rocksdb_fileformat/
block_builder.rs

1use crate::block_handle::BlockHandle;
2use crate::compression::compress;
3use crate::error::Result;
4use crate::types::{ChecksumType, CompressionType, checksum_modifier_for_context};
5use byteorder::{LittleEndian, WriteBytesExt};
6
/// Tunable settings consumed by `DataBlockBuilder`.
#[derive(Debug, Clone)]
pub struct DataBlockBuilderOptions {
    /// How many entries are written between consecutive restart points
    /// (prefix compression starts over at each restart point).
    pub restart_interval: usize,
    /// Desired block size in bytes (reserved for future use).
    pub block_size_target: Option<usize>,
    /// Checksum-verification toggle (reserved for future use).
    pub enable_checksums: bool,
}

impl Default for DataBlockBuilderOptions {
    /// Defaults: a restart point every 16 entries, no block size target,
    /// checksum verification switched off.
    fn default() -> Self {
        DataBlockBuilderOptions {
            enable_checksums: false,
            block_size_target: None,
            restart_interval: 16,
        }
    }
}

impl DataBlockBuilderOptions {
    /// Returns the options with `restart_interval` replaced.
    pub fn with_restart_interval(self, restart_interval: usize) -> Self {
        Self {
            restart_interval,
            ..self
        }
    }

    /// Returns the options with the target block size set to `Some(size)`.
    pub fn with_block_size_target(self, size: usize) -> Self {
        Self {
            block_size_target: Some(size),
            ..self
        }
    }

    /// Returns the options with checksum verification turned on or off.
    pub fn with_checksums(self, enable: bool) -> Self {
        Self {
            enable_checksums: enable,
            ..self
        }
    }
}
47
48/// Builder for data blocks with prefix compression and restart points
49pub struct DataBlockBuilder {
50    buffer: Vec<u8>,
51    restarts: Vec<u32>,
52    counter: usize,
53    options: DataBlockBuilderOptions,
54    last_key: Vec<u8>,
55    finished: bool,
56}
57
58impl DataBlockBuilder {
59    /// Create a new DataBlockBuilder with the specified options
60    pub fn new(options: DataBlockBuilderOptions) -> Self {
61        let mut builder = DataBlockBuilder {
62            buffer: Vec::new(),
63            restarts: Vec::new(),
64            counter: 0,
65            options,
66            last_key: Vec::new(),
67            finished: false,
68        };
69
70        // Add first restart point
71        builder.restarts.push(0);
72        builder
73    }
74
75    pub fn add(&mut self, key: &[u8], value: &[u8]) {
76        assert!(!self.finished);
77        assert!(self.counter <= self.options.restart_interval);
78        assert!(self.buffer.len() < u32::MAX as usize);
79
80        let mut shared = 0;
81        if self.counter < self.options.restart_interval {
82            // Find shared prefix with last key
83            let min_len = std::cmp::min(self.last_key.len(), key.len());
84            while shared < min_len && self.last_key[shared] == key[shared] {
85                shared += 1;
86            }
87        } else {
88            // Restart
89            self.restarts.push(self.buffer.len() as u32);
90            self.counter = 0;
91        }
92
93        let non_shared = key.len() - shared;
94
95        // Encode entry: shared_length(varint) non_shared_length(varint) value_length(varint) key_delta value
96        self.encode_varint(shared as u32);
97        self.encode_varint(non_shared as u32);
98        self.encode_varint(value.len() as u32);
99
100        // Add key delta
101        self.buffer.extend_from_slice(&key[shared..]);
102
103        // Add value
104        self.buffer.extend_from_slice(value);
105
106        // Update state
107        self.last_key.clear();
108        self.last_key.extend_from_slice(key);
109        self.counter += 1;
110    }
111
112    pub fn finish(
113        &mut self,
114        compression_type: CompressionType,
115        checksum_type: ChecksumType,
116        file_offset: Option<u64>,
117        base_context_checksum: Option<u32>,
118    ) -> Result<Vec<u8>> {
119        if self.finished {
120            panic!("DataBlockBuilder already finished");
121        }
122        self.finished = true;
123
124        // Add restart array
125        for restart in &self.restarts {
126            self.buffer.write_u32::<LittleEndian>(*restart).unwrap();
127        }
128
129        // Add restart count
130        self.buffer
131            .write_u32::<LittleEndian>(self.restarts.len() as u32)
132            .unwrap();
133
134        // Calculate checksum over the block data + compression type
135        let mut checksum_data = self.buffer.clone();
136        checksum_data.push(compression_type as u8);
137        let mut checksum = checksum_type.calculate(&checksum_data);
138
139        // Apply context-based checksum modification if needed
140        if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
141            let modifier = checksum_modifier_for_context(base_checksum, offset);
142            checksum = checksum.wrapping_add(modifier);
143        }
144
145        // For uncompressed blocks
146        if compression_type == CompressionType::None {
147            let mut result = self.buffer.clone();
148            result.push(compression_type as u8);
149            result.write_u32::<LittleEndian>(checksum).unwrap();
150            Ok(result)
151        } else {
152            // Compress the data (without the trailer)
153            let compressed_data = compress(&self.buffer, compression_type)?;
154
155            // Recalculate checksum over compressed data + compression type
156            let mut compressed_checksum_data = compressed_data.clone();
157            compressed_checksum_data.push(compression_type as u8);
158            let mut compressed_checksum = checksum_type.calculate(&compressed_checksum_data);
159
160            // Apply context-based checksum modification if needed
161            if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
162                let modifier = checksum_modifier_for_context(base_checksum, offset);
163                compressed_checksum = compressed_checksum.wrapping_add(modifier);
164            }
165
166            // Add the trailer after compression
167            let mut result = compressed_data;
168            result.push(compression_type as u8);
169            result
170                .write_u32::<LittleEndian>(compressed_checksum)
171                .unwrap();
172
173            Ok(result)
174        }
175    }
176
177    pub fn reset(&mut self) {
178        self.buffer.clear();
179        self.restarts.clear();
180        self.restarts.push(0);
181        self.counter = 0;
182        self.last_key.clear();
183        self.finished = false;
184    }
185
186    pub fn empty(&self) -> bool {
187        self.buffer.is_empty()
188    }
189
190    pub fn size_estimate(&self) -> usize {
191        self.buffer.len() + 4 * self.restarts.len() + 4 + 5 // restarts + count + trailer
192    }
193
194    fn encode_varint(&mut self, mut value: u32) {
195        while value >= 0x80 {
196            self.buffer.push((value & 0x7F) as u8 | 0x80);
197            value >>= 7;
198        }
199        self.buffer.push(value as u8);
200    }
201}
202
203/// Builder for index blocks that track data block locations
204pub struct IndexBlockBuilder {
205    buffer: Vec<u8>,
206    restarts: Vec<u32>,
207    counter: usize,
208    restart_interval: usize,
209    last_key: Vec<u8>,
210    finished: bool,
211}
212
213impl IndexBlockBuilder {
214    pub fn new(restart_interval: usize) -> Self {
215        let mut builder = IndexBlockBuilder {
216            buffer: Vec::new(),
217            restarts: Vec::new(),
218            counter: 0,
219            restart_interval,
220            last_key: Vec::new(),
221            finished: false,
222        };
223
224        // Add first restart point
225        builder.restarts.push(0);
226        builder
227    }
228
229    pub fn add_index_entry(&mut self, key: &[u8], block_handle: &BlockHandle) {
230        assert!(!self.finished);
231        assert!(self.counter <= self.restart_interval);
232        assert!(self.buffer.len() < u32::MAX as usize);
233
234        let mut shared = 0;
235        if self.counter < self.restart_interval {
236            // Find shared prefix with last key
237            let min_len = std::cmp::min(self.last_key.len(), key.len());
238            while shared < min_len && self.last_key[shared] == key[shared] {
239                shared += 1;
240            }
241        } else {
242            // Restart
243            self.restarts.push(self.buffer.len() as u32);
244            self.counter = 0;
245        }
246
247        let non_shared = key.len() - shared;
248
249        // Encode block handle as value
250        let mut handle_data = Vec::new();
251        self.encode_varint_to(&mut handle_data, block_handle.offset as u32);
252        self.encode_varint_to(&mut handle_data, block_handle.size as u32);
253
254        // Encode entry: shared_length(varint) non_shared_length(varint) value_length(varint) key_delta block_handle
255        self.encode_varint(shared as u32);
256        self.encode_varint(non_shared as u32);
257        self.encode_varint(handle_data.len() as u32);
258
259        // Add key delta
260        self.buffer.extend_from_slice(&key[shared..]);
261
262        // Add block handle
263        self.buffer.extend_from_slice(&handle_data);
264
265        // Update state
266        self.last_key.clear();
267        self.last_key.extend_from_slice(key);
268        self.counter += 1;
269    }
270
271    pub fn finish(
272        &mut self,
273        compression_type: CompressionType,
274        checksum_type: ChecksumType,
275        file_offset: Option<u64>,
276        base_context_checksum: Option<u32>,
277    ) -> Result<Vec<u8>> {
278        if self.finished {
279            panic!("IndexBlockBuilder already finished");
280        }
281        self.finished = true;
282
283        // Add restart array
284        for restart in &self.restarts {
285            self.buffer.write_u32::<LittleEndian>(*restart).unwrap();
286        }
287
288        // Add restart count
289        self.buffer
290            .write_u32::<LittleEndian>(self.restarts.len() as u32)
291            .unwrap();
292
293        // Calculate checksum over the block data + compression type
294        let mut checksum_data = self.buffer.clone();
295        checksum_data.push(compression_type as u8);
296        let mut checksum = checksum_type.calculate(&checksum_data);
297
298        // Apply context-based checksum modification if needed
299        if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
300            let modifier = checksum_modifier_for_context(base_checksum, offset);
301            checksum = checksum.wrapping_add(modifier);
302        }
303
304        // For uncompressed blocks
305        if compression_type == CompressionType::None {
306            let mut result = self.buffer.clone();
307            result.push(compression_type as u8);
308            result.write_u32::<LittleEndian>(checksum).unwrap();
309            Ok(result)
310        } else {
311            // Compress the data (without the trailer)
312            let compressed_data = compress(&self.buffer, compression_type)?;
313
314            // Recalculate checksum over compressed data + compression type
315            let mut compressed_checksum_data = compressed_data.clone();
316            compressed_checksum_data.push(compression_type as u8);
317            let mut compressed_checksum = checksum_type.calculate(&compressed_checksum_data);
318
319            // Apply context-based checksum modification if needed
320            if let (Some(offset), Some(base_checksum)) = (file_offset, base_context_checksum) {
321                let modifier = checksum_modifier_for_context(base_checksum, offset);
322                compressed_checksum = compressed_checksum.wrapping_add(modifier);
323            }
324
325            // Add the trailer after compression
326            let mut result = compressed_data;
327            result.push(compression_type as u8);
328            result
329                .write_u32::<LittleEndian>(compressed_checksum)
330                .unwrap();
331
332            Ok(result)
333        }
334    }
335
336    pub fn empty(&self) -> bool {
337        self.buffer.is_empty()
338    }
339
340    fn encode_varint(&mut self, mut value: u32) {
341        while value >= 0x80 {
342            self.buffer.push((value & 0x7F) as u8 | 0x80);
343            value >>= 7;
344        }
345        self.buffer.push(value as u8);
346    }
347
348    fn encode_varint_to(&self, buffer: &mut Vec<u8>, mut value: u32) {
349        while value >= 0x80 {
350            buffer.push((value & 0x7F) as u8 | 0x80);
351            value >>= 7;
352        }
353        buffer.push(value as u8);
354    }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ChecksumType, CompressionType};

    // Block trailer: one compression-type byte followed by a 4-byte checksum.
    const TRAILER_LEN: usize = 5;

    // Splits a finished block into (payload, compression-type byte).
    fn split_trailer(block: &[u8]) -> (&[u8], u8) {
        assert!(block.len() >= TRAILER_LEN, "block shorter than its trailer");
        let payload_len = block.len() - TRAILER_LEN;
        (&block[..payload_len], block[payload_len])
    }

    #[test]
    fn test_data_block_builder_simple() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));

        builder.add(b"key1", b"value1");
        builder.add(b"key2", b"value2");

        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
        let (payload, compression_byte) = split_trailer(&block_data);

        let mut expected: Vec<u8> = Vec::new();
        // "key1": shared=0, non_shared=4, value_len=6
        expected.extend_from_slice(&[0, 4, 6]);
        expected.extend_from_slice(b"key1value1");
        // "key2": shares "key" with the previous key
        expected.extend_from_slice(&[3, 1, 6]);
        expected.extend_from_slice(b"2value2");
        // restart array [0] and restart count 1, little-endian
        expected.extend_from_slice(&[0, 0, 0, 0]);
        expected.extend_from_slice(&[1, 0, 0, 0]);

        assert_eq!(payload, expected.as_slice());
        assert_eq!(compression_byte, CompressionType::None as u8);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_restart_points() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(2));

        builder.add(b"a", b"1");
        builder.add(b"ab", b"2");
        // Third entry starts a new restart run, so its key is stored in full.
        builder.add(b"abc", b"3");

        let estimate = builder.size_estimate();
        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
        let (payload, _) = split_trailer(&block_data);

        let mut expected: Vec<u8> = Vec::new();
        expected.extend_from_slice(&[0, 1, 1]);
        expected.extend_from_slice(b"a1");
        expected.extend_from_slice(&[1, 1, 1]);
        expected.extend_from_slice(b"b2");
        expected.extend_from_slice(&[0, 3, 1]);
        expected.extend_from_slice(b"abc3");
        // restart array [0, 10] and restart count 2, little-endian
        expected.extend_from_slice(&[0, 0, 0, 0]);
        expected.extend_from_slice(&[10, 0, 0, 0]);
        expected.extend_from_slice(&[2, 0, 0, 0]);

        assert_eq!(payload, expected.as_slice());
        // For an uncompressed block the pre-finish estimate is exact.
        assert_eq!(estimate, block_data.len());
        Ok(())
    }

    #[test]
    fn test_data_block_builder_with_compression() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));

        // Add multiple entries to test compression
        for i in 0..10 {
            let key = format!("key{:03}", i);
            let value = format!("value{:03}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let compressed_block =
            builder.finish(CompressionType::Snappy, ChecksumType::CRC32c, None, None)?;
        let (payload, compression_byte) = split_trailer(&compressed_block);

        assert!(!payload.is_empty());
        assert_eq!(compression_byte, CompressionType::Snappy as u8);
        Ok(())
    }

    #[test]
    fn test_index_block_builder() -> Result<()> {
        let mut builder = IndexBlockBuilder::new(16);

        let handle1 = BlockHandle {
            offset: 0,
            size: 100,
        };
        let handle2 = BlockHandle {
            offset: 100,
            size: 150,
        };

        builder.add_index_entry(b"key1", &handle1);
        builder.add_index_entry(b"key2", &handle2);

        let block_data = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
        let (payload, compression_byte) = split_trailer(&block_data);

        let mut expected: Vec<u8> = Vec::new();
        // "key1" -> handle (0, 100): two single-byte varints
        expected.extend_from_slice(&[0, 4, 2]);
        expected.extend_from_slice(b"key1");
        expected.extend_from_slice(&[0, 100]);
        // "key2" -> handle (100, 150): 150 needs a two-byte varint (0x96 0x01)
        expected.extend_from_slice(&[3, 1, 3]);
        expected.extend_from_slice(b"2");
        expected.extend_from_slice(&[100, 0x96, 0x01]);
        // restart array [0] and restart count 1, little-endian
        expected.extend_from_slice(&[0, 0, 0, 0]);
        expected.extend_from_slice(&[1, 0, 0, 0]);

        assert_eq!(payload, expected.as_slice());
        assert_eq!(compression_byte, CompressionType::None as u8);
        Ok(())
    }

    #[test]
    fn test_data_block_builder_empty() {
        let builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));
        assert!(builder.empty());
    }

    #[test]
    fn test_data_block_builder_reset() -> Result<()> {
        let mut builder =
            DataBlockBuilder::new(DataBlockBuilderOptions::default().with_restart_interval(16));
        builder.add(b"key1", b"value1");
        assert!(!builder.empty());
        let first = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;

        builder.reset();
        assert!(builder.empty());

        // A reset builder can be reused and reproduces the same block.
        builder.add(b"key1", b"value1");
        let second = builder.finish(CompressionType::None, ChecksumType::CRC32c, None, None)?;
        assert_eq!(first, second);
        Ok(())
    }
}
433}