rocksdb_fileformat/
index_block.rs

1use crate::block_handle::BlockHandle;
2use crate::compression::decompress;
3use crate::error::{Error, Result};
4use crate::types::CompressionType;
5use byteorder::{LittleEndian, ReadBytesExt};
6use std::io::Cursor;
7
8pub struct IndexEntry {
9    pub key: Vec<u8>,
10    pub block_handle: BlockHandle,
11}
12
13pub struct IndexBlock {
14    data: Vec<u8>,
15    restart_offset: usize,
16    num_restarts: u32,
17    restart_points: Vec<u32>,
18}
19
20impl IndexBlock {
21    pub fn new(compressed_data: &[u8], compression_type: CompressionType) -> Result<Self> {
22        let raw_data = decompress(compressed_data, compression_type)?;
23
24        // RocksDB blocks have a 5-byte trailer: compression_type (1) + checksum (4)
25        let data = if raw_data.len() >= 5 {
26            raw_data[..raw_data.len() - 5].to_vec()
27        } else {
28            raw_data
29        };
30
31        if data.len() < 4 {
32            return Err(Error::InvalidBlockFormat(
33                "Index block too small to contain restart info".to_string(),
34            ));
35        }
36
37        let mut cursor = Cursor::new(&data);
38        cursor.set_position((data.len() - 4) as u64);
39        let num_restarts = cursor.read_u32::<LittleEndian>()?;
40
41        // Check if this looks like a valid restart count (reasonable small number)
42        if num_restarts == 0 || num_restarts > 1000 {
43            // This might not be a standard block format
44            // Try to parse as a single-entry index with no restart points
45            let data_len = data.len();
46            return Ok(IndexBlock {
47                data,
48                restart_offset: data_len,
49                num_restarts: 1,
50                restart_points: vec![0],
51            });
52        }
53
54        if data.len() < 4 + (num_restarts as usize * 4) {
55            // Fallback to simple format
56            let data_len = data.len();
57            return Ok(IndexBlock {
58                data,
59                restart_offset: data_len,
60                num_restarts: 1,
61                restart_points: vec![0],
62            });
63        }
64
65        let restart_offset = data.len() - 4 - (num_restarts as usize * 4);
66        if restart_offset >= data.len() {
67            return Err(Error::InvalidBlockFormat(
68                "Invalid restart offset in index block".to_string(),
69            ));
70        }
71
72        let mut restart_points = Vec::with_capacity(num_restarts as usize);
73        cursor.set_position(restart_offset as u64);
74
75        for _ in 0..num_restarts {
76            restart_points.push(cursor.read_u32::<LittleEndian>()?);
77        }
78
79        Ok(IndexBlock {
80            data,
81            restart_offset,
82            num_restarts,
83            restart_points,
84        })
85    }
86
87    pub fn get_entries(&self) -> Result<Vec<IndexEntry>> {
88        let mut entries = Vec::new();
89        let mut cursor = Cursor::new(&self.data);
90        let mut last_key = Vec::new();
91
92        // Try to find a valid starting point by looking for an entry with shared_len=0
93        let mut start_pos = 0;
94        if self.data.len() > 0 && self.data[0] != 0 {
95            // First byte is not 0 (shared_len), so look for a restart point
96            for &restart_pos in &self.restart_points {
97                if restart_pos < self.data.len() as u32 && restart_pos > 0 {
98                    if (restart_pos as usize) < self.data.len()
99                        && self.data[restart_pos as usize] == 0
100                    {
101                        start_pos = restart_pos as usize;
102                        break;
103                    }
104                }
105            }
106        }
107
108        cursor.set_position(start_pos as u64);
109
110        while (cursor.position() as usize) < self.restart_offset {
111            let entry_start = cursor.position();
112
113            let shared_key_len = self.read_varint(&mut cursor)?;
114            let unshared_key_len = self.read_varint(&mut cursor)?;
115            let value_len = self.read_varint(&mut cursor)?;
116
117            if shared_key_len > last_key.len() as u32 {
118                return Err(Error::InvalidBlockFormat(
119                    "Shared key length exceeds previous key length in index block".to_string(),
120                ));
121            }
122
123            let mut key = Vec::new();
124            key.extend_from_slice(&last_key[..shared_key_len as usize]);
125
126            if unshared_key_len > 0 {
127                let pos = cursor.position() as usize;
128                if pos + unshared_key_len as usize > self.data.len() {
129                    return Err(Error::InvalidBlockFormat(
130                        "Index key extends beyond block".to_string(),
131                    ));
132                }
133                key.extend_from_slice(&self.data[pos..pos + unshared_key_len as usize]);
134                cursor.set_position((pos + unshared_key_len as usize) as u64);
135            }
136
137            if value_len == 0 {
138                return Err(Error::InvalidBlockFormat(
139                    "Index entry must have value (block handle)".to_string(),
140                ));
141            }
142
143            let value_start = cursor.position() as usize;
144            if value_start + value_len as usize > self.data.len() {
145                return Err(Error::InvalidBlockFormat(
146                    "Index value extends beyond block".to_string(),
147                ));
148            }
149
150            let value_data = &self.data[value_start..value_start + value_len as usize];
151            let block_handle = self.parse_block_handle(value_data)?;
152            cursor.set_position((value_start + value_len as usize) as u64);
153
154            last_key = key.clone();
155            entries.push(IndexEntry { key, block_handle });
156
157            if self.is_restart_point(entry_start as u32) {
158                last_key.clear();
159            }
160        }
161
162        Ok(entries)
163    }
164
165    fn parse_block_handle(&self, data: &[u8]) -> Result<BlockHandle> {
166        let mut cursor = Cursor::new(data);
167
168        let offset = self.read_varint_from_slice(&mut cursor)?;
169        let size = self.read_varint_from_slice(&mut cursor)?;
170
171        Ok(BlockHandle {
172            offset: offset as u64,
173            size: size as u64,
174        })
175    }
176
177    fn read_varint_from_slice(&self, cursor: &mut Cursor<&[u8]>) -> Result<u32> {
178        let mut result = 0u32;
179        let mut shift = 0;
180
181        loop {
182            let data = cursor.get_ref();
183            let pos = cursor.position() as usize;
184
185            if pos >= data.len() {
186                return Err(Error::InvalidVarint);
187            }
188
189            let byte = data[pos];
190            cursor.set_position(cursor.position() + 1);
191
192            result |= ((byte & 0x7F) as u32) << shift;
193
194            if (byte & 0x80) == 0 {
195                break;
196            }
197
198            shift += 7;
199            if shift >= 32 {
200                return Err(Error::InvalidVarint);
201            }
202        }
203
204        Ok(result)
205    }
206
207    fn read_varint(&self, cursor: &mut Cursor<&Vec<u8>>) -> Result<u32> {
208        let mut result = 0u32;
209        let mut shift = 0;
210
211        loop {
212            if (cursor.position() as usize) >= self.data.len() {
213                return Err(Error::InvalidVarint);
214            }
215
216            let byte = self.data[cursor.position() as usize];
217            cursor.set_position(cursor.position() + 1);
218
219            result |= ((byte & 0x7F) as u32) << shift;
220
221            if (byte & 0x80) == 0 {
222                break;
223            }
224
225            shift += 7;
226            if shift >= 32 {
227                return Err(Error::InvalidVarint);
228            }
229        }
230
231        Ok(result)
232    }
233
234    fn is_restart_point(&self, offset: u32) -> bool {
235        self.restart_points.contains(&offset)
236    }
237
238    pub fn find_block_for_key(&self, target_key: &[u8]) -> Result<Option<BlockHandle>> {
239        let entries = self.get_entries()?;
240
241        for entry in entries.iter() {
242            if entry.key.as_slice() >= target_key {
243                return Ok(Some(entry.block_handle.clone()));
244            }
245        }
246
247        if let Some(last_entry) = entries.last() {
248            Ok(Some(last_entry.block_handle.clone()))
249        } else {
250            Ok(None)
251        }
252    }
253
254    pub fn get_all_block_handles(&self) -> Result<Vec<BlockHandle>> {
255        let entries = self.get_entries()?;
256        Ok(entries
257            .into_iter()
258            .map(|entry| entry.block_handle)
259            .collect())
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use crate::block_builder::IndexBlockBuilder;
267    use crate::error::Result;
268    use crate::types::{ChecksumType, CompressionType};
269
270    #[test]
271    fn test_roundtrip_index_block_single_entry() -> Result<()> {
272        let key1 = b"key001";
273        let handle1 = BlockHandle {
274            offset: 100,
275            size: 200,
276        };
277
278        let mut builder = IndexBlockBuilder::new(16);
279        builder.add_index_entry(key1, &handle1);
280        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
281
282        let index_block = IndexBlock::new(&block_data, CompressionType::None)?;
283        let entries = index_block.get_entries()?;
284
285        assert_eq!(entries.len(), 1);
286        assert_eq!(entries[0].key, key1);
287        assert_eq!(entries[0].block_handle.offset, handle1.offset);
288        assert_eq!(entries[0].block_handle.size, handle1.size);
289        Ok(())
290    }
291
292    #[test]
293    fn test_roundtrip_find_block_for_key() -> Result<()> {
294        let key1 = b"key001";
295        let key2 = b"key002";
296        let handle1 = BlockHandle {
297            offset: 100,
298            size: 200,
299        };
300        let handle2 = BlockHandle {
301            offset: 300,
302            size: 150,
303        };
304
305        let mut builder = IndexBlockBuilder::new(1); // Use restart_interval of 1 to create restart points
306        builder.add_index_entry(key1, &handle1);
307        builder.add_index_entry(key2, &handle2);
308        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
309
310        let index_block = IndexBlock::new(&block_data, CompressionType::None)?;
311
312        let result = index_block.find_block_for_key(b"key000")?;
313        assert!(result.is_some());
314        assert_eq!(result.unwrap().offset, handle1.offset);
315
316        let result = index_block.find_block_for_key(b"key001")?;
317        assert!(result.is_some());
318        assert_eq!(result.unwrap().offset, handle1.offset);
319
320        let result = index_block.find_block_for_key(b"key002")?;
321        assert!(result.is_some());
322        assert_eq!(result.unwrap().offset, handle2.offset);
323        Ok(())
324    }
325}