rocksdb_fileformat/
index_block.rs1use crate::block_handle::BlockHandle;
2use crate::compression::decompress;
3use crate::error::{Error, Result};
4use crate::types::CompressionType;
5use byteorder::{LittleEndian, ReadBytesExt};
6use std::io::Cursor;
7
8pub struct IndexEntry {
9 pub key: Vec<u8>,
10 pub block_handle: BlockHandle,
11}
12
13pub struct IndexBlock {
14 data: Vec<u8>,
15 restart_offset: usize,
16 num_restarts: u32,
17 restart_points: Vec<u32>,
18}
19
20impl IndexBlock {
21 pub fn new(compressed_data: &[u8], compression_type: CompressionType) -> Result<Self> {
22 let raw_data = decompress(compressed_data, compression_type)?;
23
24 let data = if raw_data.len() >= 5 {
26 raw_data[..raw_data.len() - 5].to_vec()
27 } else {
28 raw_data
29 };
30
31 if data.len() < 4 {
32 return Err(Error::InvalidBlockFormat(
33 "Index block too small to contain restart info".to_string(),
34 ));
35 }
36
37 let mut cursor = Cursor::new(&data);
38 cursor.set_position((data.len() - 4) as u64);
39 let num_restarts = cursor.read_u32::<LittleEndian>()?;
40
41 if num_restarts == 0 || num_restarts > 1000 {
43 let data_len = data.len();
46 return Ok(IndexBlock {
47 data,
48 restart_offset: data_len,
49 num_restarts: 1,
50 restart_points: vec![0],
51 });
52 }
53
54 if data.len() < 4 + (num_restarts as usize * 4) {
55 let data_len = data.len();
57 return Ok(IndexBlock {
58 data,
59 restart_offset: data_len,
60 num_restarts: 1,
61 restart_points: vec![0],
62 });
63 }
64
65 let restart_offset = data.len() - 4 - (num_restarts as usize * 4);
66 if restart_offset >= data.len() {
67 return Err(Error::InvalidBlockFormat(
68 "Invalid restart offset in index block".to_string(),
69 ));
70 }
71
72 let mut restart_points = Vec::with_capacity(num_restarts as usize);
73 cursor.set_position(restart_offset as u64);
74
75 for _ in 0..num_restarts {
76 restart_points.push(cursor.read_u32::<LittleEndian>()?);
77 }
78
79 Ok(IndexBlock {
80 data,
81 restart_offset,
82 num_restarts,
83 restart_points,
84 })
85 }
86
87 pub fn get_entries(&self) -> Result<Vec<IndexEntry>> {
88 let mut entries = Vec::new();
89 let mut cursor = Cursor::new(&self.data);
90 let mut last_key = Vec::new();
91
92 let mut start_pos = 0;
94 if self.data.len() > 0 && self.data[0] != 0 {
95 for &restart_pos in &self.restart_points {
97 if restart_pos < self.data.len() as u32 && restart_pos > 0 {
98 if (restart_pos as usize) < self.data.len()
99 && self.data[restart_pos as usize] == 0
100 {
101 start_pos = restart_pos as usize;
102 break;
103 }
104 }
105 }
106 }
107
108 cursor.set_position(start_pos as u64);
109
110 while (cursor.position() as usize) < self.restart_offset {
111 let entry_start = cursor.position();
112
113 let shared_key_len = self.read_varint(&mut cursor)?;
114 let unshared_key_len = self.read_varint(&mut cursor)?;
115 let value_len = self.read_varint(&mut cursor)?;
116
117 if shared_key_len > last_key.len() as u32 {
118 return Err(Error::InvalidBlockFormat(
119 "Shared key length exceeds previous key length in index block".to_string(),
120 ));
121 }
122
123 let mut key = Vec::new();
124 key.extend_from_slice(&last_key[..shared_key_len as usize]);
125
126 if unshared_key_len > 0 {
127 let pos = cursor.position() as usize;
128 if pos + unshared_key_len as usize > self.data.len() {
129 return Err(Error::InvalidBlockFormat(
130 "Index key extends beyond block".to_string(),
131 ));
132 }
133 key.extend_from_slice(&self.data[pos..pos + unshared_key_len as usize]);
134 cursor.set_position((pos + unshared_key_len as usize) as u64);
135 }
136
137 if value_len == 0 {
138 return Err(Error::InvalidBlockFormat(
139 "Index entry must have value (block handle)".to_string(),
140 ));
141 }
142
143 let value_start = cursor.position() as usize;
144 if value_start + value_len as usize > self.data.len() {
145 return Err(Error::InvalidBlockFormat(
146 "Index value extends beyond block".to_string(),
147 ));
148 }
149
150 let value_data = &self.data[value_start..value_start + value_len as usize];
151 let block_handle = self.parse_block_handle(value_data)?;
152 cursor.set_position((value_start + value_len as usize) as u64);
153
154 last_key = key.clone();
155 entries.push(IndexEntry { key, block_handle });
156
157 if self.is_restart_point(entry_start as u32) {
158 last_key.clear();
159 }
160 }
161
162 Ok(entries)
163 }
164
165 fn parse_block_handle(&self, data: &[u8]) -> Result<BlockHandle> {
166 let mut cursor = Cursor::new(data);
167
168 let offset = self.read_varint_from_slice(&mut cursor)?;
169 let size = self.read_varint_from_slice(&mut cursor)?;
170
171 Ok(BlockHandle {
172 offset: offset as u64,
173 size: size as u64,
174 })
175 }
176
177 fn read_varint_from_slice(&self, cursor: &mut Cursor<&[u8]>) -> Result<u32> {
178 let mut result = 0u32;
179 let mut shift = 0;
180
181 loop {
182 let data = cursor.get_ref();
183 let pos = cursor.position() as usize;
184
185 if pos >= data.len() {
186 return Err(Error::InvalidVarint);
187 }
188
189 let byte = data[pos];
190 cursor.set_position(cursor.position() + 1);
191
192 result |= ((byte & 0x7F) as u32) << shift;
193
194 if (byte & 0x80) == 0 {
195 break;
196 }
197
198 shift += 7;
199 if shift >= 32 {
200 return Err(Error::InvalidVarint);
201 }
202 }
203
204 Ok(result)
205 }
206
207 fn read_varint(&self, cursor: &mut Cursor<&Vec<u8>>) -> Result<u32> {
208 let mut result = 0u32;
209 let mut shift = 0;
210
211 loop {
212 if (cursor.position() as usize) >= self.data.len() {
213 return Err(Error::InvalidVarint);
214 }
215
216 let byte = self.data[cursor.position() as usize];
217 cursor.set_position(cursor.position() + 1);
218
219 result |= ((byte & 0x7F) as u32) << shift;
220
221 if (byte & 0x80) == 0 {
222 break;
223 }
224
225 shift += 7;
226 if shift >= 32 {
227 return Err(Error::InvalidVarint);
228 }
229 }
230
231 Ok(result)
232 }
233
234 fn is_restart_point(&self, offset: u32) -> bool {
235 self.restart_points.contains(&offset)
236 }
237
238 pub fn find_block_for_key(&self, target_key: &[u8]) -> Result<Option<BlockHandle>> {
239 let entries = self.get_entries()?;
240
241 for entry in entries.iter() {
242 if entry.key.as_slice() >= target_key {
243 return Ok(Some(entry.block_handle.clone()));
244 }
245 }
246
247 if let Some(last_entry) = entries.last() {
248 Ok(Some(last_entry.block_handle.clone()))
249 } else {
250 Ok(None)
251 }
252 }
253
254 pub fn get_all_block_handles(&self) -> Result<Vec<BlockHandle>> {
255 let entries = self.get_entries()?;
256 Ok(entries
257 .into_iter()
258 .map(|entry| entry.block_handle)
259 .collect())
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use crate::block_builder::IndexBlockBuilder;
267 use crate::error::Result;
268 use crate::types::{ChecksumType, CompressionType};
269
270 #[test]
271 fn test_roundtrip_index_block_single_entry() -> Result<()> {
272 let key1 = b"key001";
273 let handle1 = BlockHandle {
274 offset: 100,
275 size: 200,
276 };
277
278 let mut builder = IndexBlockBuilder::new(16);
279 builder.add_index_entry(key1, &handle1);
280 let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
281
282 let index_block = IndexBlock::new(&block_data, CompressionType::None)?;
283 let entries = index_block.get_entries()?;
284
285 assert_eq!(entries.len(), 1);
286 assert_eq!(entries[0].key, key1);
287 assert_eq!(entries[0].block_handle.offset, handle1.offset);
288 assert_eq!(entries[0].block_handle.size, handle1.size);
289 Ok(())
290 }
291
292 #[test]
293 fn test_roundtrip_find_block_for_key() -> Result<()> {
294 let key1 = b"key001";
295 let key2 = b"key002";
296 let handle1 = BlockHandle {
297 offset: 100,
298 size: 200,
299 };
300 let handle2 = BlockHandle {
301 offset: 300,
302 size: 150,
303 };
304
305 let mut builder = IndexBlockBuilder::new(1); builder.add_index_entry(key1, &handle1);
307 builder.add_index_entry(key2, &handle2);
308 let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
309
310 let index_block = IndexBlock::new(&block_data, CompressionType::None)?;
311
312 let result = index_block.find_block_for_key(b"key000")?;
313 assert!(result.is_some());
314 assert_eq!(result.unwrap().offset, handle1.offset);
315
316 let result = index_block.find_block_for_key(b"key001")?;
317 assert!(result.is_some());
318 assert_eq!(result.unwrap().offset, handle1.offset);
319
320 let result = index_block.find_block_for_key(b"key002")?;
321 assert!(result.is_some());
322 assert_eq!(result.unwrap().offset, handle2.offset);
323 Ok(())
324 }
325}