rocksdb_fileformat/
sst_file_writer.rs

1use crate::block_builder::{DataBlockBuilder, DataBlockBuilderOptions, IndexBlockBuilder};
2use crate::block_handle::BlockHandle;
3use crate::error::{Error, Result};
4use crate::footer::Footer;
5use crate::types::{CompressionType, FormatVersion, WriteOptions};
6use byteorder::{LittleEndian, WriteBytesExt};
7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::path::Path;
10
/// Kind of operation recorded for a key in the SST file.
///
/// The numeric discriminant of each variant is what `encode_entry_value`
/// writes as the one-byte prefix of the stored value, so the values are
/// spelled out explicitly here.
///
/// NOTE(review): RocksDB's own value-type tags use a different numbering
/// (deletion = 0, value = 1, merge = 2) — confirm that readers of these files
/// expect this crate's mapping before changing anything here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EntryType {
    /// A regular key/value insertion (tag byte `0`).
    Put = 0,
    /// A deletion marker; written with an empty value (tag byte `1`).
    Delete = 1,
    /// A merge operand (tag byte `2`).
    Merge = 2,
}
18
/// SST file writer that matches RocksDB's SstFileWriter API
///
/// Typical use: build the writer with `create`, attach an output file with
/// `open`, add entries in strictly increasing key order with `put` / `merge` /
/// `delete`, then call `finish` to write the index block, metaindex block and
/// footer.
pub struct SstFileWriter {
    /// Writer configuration, cloned from the options passed to `create`.
    options: WriteOptions,
    /// Buffered output file; `None` until `open` succeeds.
    writer: Option<BufWriter<File>>,
    /// Accumulates the entries of the data block currently being built.
    data_block_builder: DataBlockBuilder,
    /// Accumulates one index entry per flushed data block.
    index_block_builder: IndexBlockBuilder,
    /// Number of bytes handed to `writer` so far, i.e. the file offset at which
    /// the next block will start.
    offset: u64,
    /// Number of entries added since the file was opened.
    num_entries: u64,
    /// Most recently added key; used for the ordering check and as the index
    /// key of a data block when it is flushed.
    last_key: Vec<u8>,
    /// Set once `finish` has completed successfully.
    finished: bool,
    /// Last key and handle of the most recently flushed data block, held back
    /// until the following flush adds it to the index builder.
    pending_index_entry: Option<(Vec<u8>, BlockHandle)>,
    /// Base value for context-dependent checksums: `Some` for format versions
    /// >= 6, `None` otherwise. Passed to the block builders and the footer.
    base_context_checksum: Option<u32>,
}
32
33impl SstFileWriter {
34    /// Create a new SstFileWriter with the given options
35    pub fn create(opts: &WriteOptions) -> Self {
36        // Initialize base context checksum for format versions >= 6
37        let base_context_checksum = if opts.format_version >= FormatVersion::V6 {
38            Some(0) // TODO: Generate proper base context checksum
39        } else {
40            None
41        };
42
43        SstFileWriter {
44            options: opts.clone(),
45            writer: None,
46            data_block_builder: DataBlockBuilder::new(
47                DataBlockBuilderOptions::default()
48                    .with_restart_interval(opts.block_restart_interval),
49            ),
50            index_block_builder: IndexBlockBuilder::new(opts.block_restart_interval),
51            offset: 0,
52            num_entries: 0,
53            last_key: Vec::new(),
54            finished: false,
55            pending_index_entry: None,
56            base_context_checksum,
57        }
58    }
59
60    /// Open a file for writing
61    pub fn open<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
62        if self.writer.is_some() {
63            return Err(Error::InvalidArgument("File already open".to_string()));
64        }
65
66        let file = File::create(path)?;
67        self.writer = Some(BufWriter::new(file));
68        self.offset = 0;
69        self.num_entries = 0;
70        self.last_key.clear();
71        self.finished = false;
72
73        Ok(())
74    }
75
76    /// Add a key-value pair to the SST file
77    pub fn put<K, V>(&mut self, key: K, value: V) -> Result<()>
78    where
79        K: AsRef<[u8]>,
80        V: AsRef<[u8]>,
81    {
82        self.add_entry(key.as_ref(), value.as_ref(), EntryType::Put)
83    }
84
85    /// Add a merge entry to the SST file
86    pub fn merge<K, V>(&mut self, key: K, value: V) -> Result<()>
87    where
88        K: AsRef<[u8]>,
89        V: AsRef<[u8]>,
90    {
91        self.add_entry(key.as_ref(), value.as_ref(), EntryType::Merge)
92    }
93
94    /// Add a delete entry to the SST file
95    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) -> Result<()> {
96        self.add_entry(key.as_ref(), &[], EntryType::Delete)
97    }
98
99    /// Finish writing the SST file
100    pub fn finish(&mut self) -> Result<()> {
101        if self.finished {
102            return Err(Error::InvalidArgument("Already finished".to_string()));
103        }
104
105        if self.writer.is_none() {
106            return Err(Error::InvalidArgument("No file open".to_string()));
107        }
108
109        // Flush any remaining data block
110        if !self.data_block_builder.empty() {
111            self.flush_data_block()?;
112        }
113
114        // Prepare all data to write
115        let index_block_data = self.index_block_builder.finish(
116            CompressionType::None,
117            self.options.checksum_type,
118            Some(self.offset),
119            self.base_context_checksum,
120        )?;
121        let index_handle = BlockHandle {
122            offset: self.offset,
123            size: index_block_data.len() as u64,
124        };
125
126        let metaindex_offset = self.offset + index_block_data.len() as u64;
127        let metaindex_data = self.create_empty_metaindex_block(metaindex_offset)?;
128        let metaindex_handle = BlockHandle {
129            offset: metaindex_offset,
130            size: metaindex_data.len() as u64,
131        };
132
133        let footer = Footer {
134            checksum_type: self.options.checksum_type,
135            metaindex_handle,
136            index_handle,
137            format_version: self.options.format_version as u32,
138            base_context_checksum: self.base_context_checksum,
139        };
140        // Calculate where the footer will be written (after index and metaindex blocks)
141        let footer_offset =
142            self.offset + index_block_data.len() as u64 + metaindex_data.len() as u64;
143        let footer_data = footer.encode_to_bytes(footer_offset)?;
144
145        // Now write everything
146        let writer = self.writer.as_mut().unwrap();
147        writer.write_all(&index_block_data)?;
148        self.offset += index_block_data.len() as u64;
149
150        writer.write_all(&metaindex_data)?;
151        self.offset += metaindex_data.len() as u64;
152
153        writer.write_all(&footer_data)?;
154
155        writer.flush()?;
156        self.finished = true;
157
158        Ok(())
159    }
160
161    /// Get the current file size
162    pub fn file_size(&self) -> u64 {
163        self.offset
164    }
165
166    fn add_entry(&mut self, key: &[u8], value: &[u8], entry_type: EntryType) -> Result<()> {
167        if self.finished {
168            return Err(Error::InvalidArgument("Writer is finished".to_string()));
169        }
170
171        if self.writer.is_none() {
172            return Err(Error::InvalidArgument("No file open".to_string()));
173        }
174
175        // Check key ordering
176        if !self.last_key.is_empty() && key <= self.last_key.as_slice() {
177            return Err(Error::InvalidArgument(
178                "Keys must be added in strictly increasing order".to_string(),
179            ));
180        }
181
182        // Check if we need to flush the current data block
183        if self.data_block_builder.size_estimate() >= self.options.block_size
184            && !self.data_block_builder.empty()
185        {
186            self.flush_data_block()?;
187        }
188
189        // Encode the value with entry type
190        let encoded_value = self.encode_entry_value(value, entry_type);
191
192        // Add to current data block
193        self.data_block_builder.add(key, &encoded_value);
194
195        self.last_key.clear();
196        self.last_key.extend_from_slice(key);
197        self.num_entries += 1;
198
199        Ok(())
200    }
201
202    fn flush_data_block(&mut self) -> Result<()> {
203        if self.data_block_builder.empty() {
204            return Ok(());
205        }
206
207        let writer = self.writer.as_mut().unwrap();
208
209        // Finish the current data block
210        let block_data = self.data_block_builder.finish(
211            self.options.compression,
212            self.options.checksum_type,
213            Some(self.offset),
214            self.base_context_checksum,
215        )?;
216
217        // Create block handle
218        let block_handle = BlockHandle {
219            offset: self.offset,
220            size: block_data.len() as u64,
221        };
222
223        // Write data block
224        writer.write_all(&block_data)?;
225        self.offset += block_data.len() as u64;
226
227        // Add to pending index entry (we'll use the last key of this block)
228        if let Some((prev_key, prev_handle)) = self.pending_index_entry.take() {
229            self.index_block_builder
230                .add_index_entry(&prev_key, &prev_handle);
231        }
232
233        // Store this block's info for the next index entry
234        self.pending_index_entry = Some((self.last_key.clone(), block_handle));
235
236        // Reset data block builder
237        self.data_block_builder.reset();
238
239        Ok(())
240    }
241
242    fn encode_entry_value(&self, value: &[u8], entry_type: EntryType) -> Vec<u8> {
243        // For simplicity, we'll encode the entry type as a prefix byte
244        // In a real implementation, you might want to follow RocksDB's internal key format more closely
245        let mut encoded = Vec::with_capacity(value.len() + 1);
246        encoded.push(entry_type as u8);
247        encoded.extend_from_slice(value);
248        encoded
249    }
250
251    fn create_empty_metaindex_block(&self, file_offset: u64) -> Result<Vec<u8>> {
252        // Create an empty metaindex block
253        let mut block_data = Vec::new();
254
255        // Empty block with just restart info
256        block_data.write_u32::<LittleEndian>(0)?; // restart point at 0
257        block_data.write_u32::<LittleEndian>(1)?; // one restart point
258
259        // Calculate checksum over block data + compression type
260        let mut checksum_data = block_data.clone();
261        checksum_data.push(CompressionType::None as u8);
262        let mut checksum = self.options.checksum_type.calculate(&checksum_data);
263
264        // Apply context-based checksum modification if needed
265        if let Some(base_checksum) = self.base_context_checksum {
266            let modifier = crate::types::checksum_modifier_for_context(base_checksum, file_offset);
267            checksum = checksum.wrapping_add(modifier);
268        }
269
270        // Add block trailer: compression type (1 byte) + checksum (4 bytes)
271        block_data.push(CompressionType::None as u8);
272        block_data.write_u32::<LittleEndian>(checksum)?;
273
274        Ok(block_data)
275    }
276}
277
278impl Drop for SstFileWriter {
279    fn drop(&mut self) {
280        if !self.finished && self.writer.is_some() {
281            // Try to finish gracefully, but don't panic on error
282            let _ = self.finish();
283        }
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use crate::sst_reader::SstReader;
    use crate::types::{ChecksumType, CompressionType, FormatVersion};
    use tempfile::tempdir;

    /// Creates a fresh temporary directory and returns it together with the
    /// path of `file_name` inside it. Keep the returned directory guard alive
    /// for as long as the path is in use.
    fn scratch_file(file_name: &str) -> Result<(tempfile::TempDir, std::path::PathBuf)> {
        let dir =
            tempdir().map_err(|e| Error::InvalidArgument(format!("Temp dir failed: {}", e)))?;
        let path = dir.path().join(file_name);
        Ok((dir, path))
    }

    /// Uncompressed 4 KiB-block options with the given format version and checksum.
    fn uncompressed_options(
        format_version: FormatVersion,
        checksum_type: ChecksumType,
    ) -> WriteOptions {
        WriteOptions {
            compression: CompressionType::None,
            block_size: 4096,
            block_restart_interval: 16,
            format_version,
            checksum_type,
        }
    }

    /// Writes `key1..=keyN` / `value1..=valueN` to `path` and finishes the file.
    /// The writer is dropped before this returns.
    fn write_numbered_puts(path: &std::path::Path, opts: &WriteOptions, count: u32) -> Result<()> {
        let mut writer = SstFileWriter::create(opts);
        writer.open(path)?;
        for i in 1..=count {
            let key = format!("key{}", i);
            let value = format!("value{}", i);
            writer.put(key.as_bytes(), value.as_bytes())?;
        }
        writer.finish()
    }

    /// Writes two entries with XXH3 checksums at `format_version` and checks
    /// that the footer read back reports XXH3.
    fn check_xxh3_footer(format_version: FormatVersion, file_name: &str) -> Result<()> {
        let (_dir, path) = scratch_file(file_name)?;
        let opts = uncompressed_options(format_version, ChecksumType::XXH3);

        write_numbered_puts(&path, &opts, 2)?;

        // Read data back and verify the checksum type was used
        let reader = SstReader::open(&path)?;
        assert_eq!(reader.get_footer().checksum_type, ChecksumType::XXH3);
        Ok(())
    }

    #[test]
    fn test_create_writer() -> Result<()> {
        let writer = SstFileWriter::create(&WriteOptions::default());
        assert_eq!(writer.file_size(), 0);
        Ok(())
    }

    #[test]
    fn test_write_and_read_simple() -> Result<()> {
        let (_dir, path) = scratch_file("test.sst")?;
        let opts = uncompressed_options(FormatVersion::V5, ChecksumType::CRC32c);

        // Write data
        write_numbered_puts(&path, &opts, 3)?;

        // Read data back
        let reader = SstReader::open(&path)?;
        let footer = reader.get_footer();
        assert!(footer.index_handle.size > 0);
        assert_eq!(footer.checksum_type, ChecksumType::CRC32c);

        Ok(())
    }

    #[test]
    fn test_key_ordering_enforced() -> Result<()> {
        let (_dir, path) = scratch_file("test.sst")?;

        let mut writer = SstFileWriter::create(&WriteOptions::default());
        writer.open(&path)?;
        writer.put(b"key2", b"value2")?;

        // This should fail because key1 < key2
        assert!(writer.put(b"key1", b"value1").is_err());
        Ok(())
    }

    #[test]
    fn test_different_operations() -> Result<()> {
        let (_dir, path) = scratch_file("test.sst")?;

        let mut writer = SstFileWriter::create(&WriteOptions::default());
        writer.open(&path)?;

        writer.put(b"key1", b"value1")?;
        writer.delete(b"key2")?;
        writer.merge(b"key3", b"merge_value")?;
        writer.finish()?;

        assert!(writer.file_size() > 0);
        Ok(())
    }

    #[test]
    fn test_compression() -> Result<()> {
        let (_dir, path) = scratch_file("test.sst")?;

        let opts = WriteOptions {
            compression: CompressionType::Snappy,
            block_size: 1024, // Small block size to ensure compression
            block_restart_interval: 16,
            format_version: FormatVersion::V5,
            checksum_type: ChecksumType::CRC32c,
        };

        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;

        // Add many similar keys to get good compression
        (0..100).try_for_each(|i| {
            let key = format!("key{:03}", i);
            let value = format!("value{:03}_some_long_repeated_data", i);
            writer.put(key.as_bytes(), value.as_bytes())
        })?;

        writer.finish()?;
        assert!(writer.file_size() > 0);
        Ok(())
    }

    #[test]
    fn test_empty_file() -> Result<()> {
        let (_dir, path) = scratch_file("empty.sst")?;

        let mut writer = SstFileWriter::create(&WriteOptions::default());
        writer.open(&path)?;
        writer.finish()?;

        // Should be able to create an empty SST file
        assert!(writer.file_size() > 0); // Will have at least footer
        Ok(())
    }

    #[test]
    fn test_file_not_open() -> Result<()> {
        let mut writer = SstFileWriter::create(&WriteOptions::default());

        // Should fail when no file is open
        assert!(writer.put(b"key1", b"value1").is_err());
        Ok(())
    }

    #[test]
    fn test_already_finished() -> Result<()> {
        let (_dir, path) = scratch_file("test.sst")?;

        let mut writer = SstFileWriter::create(&WriteOptions::default());
        writer.open(&path)?;
        writer.finish()?;

        // Should fail after finish
        assert!(writer.put(b"key1", b"value1").is_err());

        // Should fail to finish again
        assert!(writer.finish().is_err());
        Ok(())
    }

    #[test]
    fn test_rountrip_v5_xxh3() -> Result<()> {
        check_xxh3_footer(FormatVersion::V5, "checksum_test_v5_xxh3.sst")
    }

    #[test]
    fn test_rountrip_v6_xxh3() -> Result<()> {
        check_xxh3_footer(FormatVersion::V6, "checksum_test_v6_xxh3.sst")
    }

    #[test]
    fn test_rountrip_v7_xxh3() -> Result<()> {
        check_xxh3_footer(FormatVersion::V7, "checksum_test_v7_xxh3.sst")
    }
}
543}