rustlite_storage/
sstable.rs

//! SSTable - Sorted String Table format and I/O
//!
//! SSTables are immutable on-disk files that store key-value pairs in sorted order.
//! They are the primary on-disk storage format for LSM-tree databases.
//!
//! ## File Format
//!
//! ```text
//! +------------------+
//! | Data Blocks      |  <- Length-prefixed entries; each block ends with its CRC32
//! +------------------+
//! | Index Block      |  <- Sparse index: first key, offset and size of every data block
//! +------------------+
//! | Footer           |  <- Version, index location, entry count, key range, magic, CRC
//! +------------------+
//! | Footer Length    |  <- Little-endian u32 footer size, read first to locate the footer
//! +------------------+
//! ```
17
18use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// Magic number identifying SSTable files: the ASCII bytes `"SSTBLIT"`
/// stored big-endian in the low seven bytes of a `u64`.
const SSTABLE_MAGIC: u64 = u64::from_be_bytes(*b"\0SSTBLIT");

/// On-disk format version (v1.0.0+).
/// Bump this whenever the layout changes in an incompatible way.
const SSTABLE_FORMAT_VERSION: u16 = 1;

/// Default data-block size threshold (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;

/// Entry type tag: the entry carries a live value.
const ENTRY_TYPE_VALUE: u8 = 0;
/// Entry type tag: the entry is a deletion marker (tombstone).
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
38
39/// A single entry in an SSTable
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct SSTableEntry {
42    /// The key
43    pub key: Vec<u8>,
44    /// Entry type: 0 = value, 1 = tombstone
45    pub entry_type: u8,
46    /// The value (empty for tombstones)
47    pub value: Vec<u8>,
48}
49
50impl SSTableEntry {
51    /// Create a value entry
52    pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
53        Self {
54            key,
55            entry_type: ENTRY_TYPE_VALUE,
56            value,
57        }
58    }
59
60    /// Create a tombstone entry
61    pub fn tombstone(key: Vec<u8>) -> Self {
62        Self {
63            key,
64            entry_type: ENTRY_TYPE_TOMBSTONE,
65            value: Vec::new(),
66        }
67    }
68
69    /// Check if this is a tombstone
70    pub fn is_tombstone(&self) -> bool {
71        self.entry_type == ENTRY_TYPE_TOMBSTONE
72    }
73}
74
75/// Index entry pointing to a data block
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct IndexEntry {
78    /// First key in the block
79    pub first_key: Vec<u8>,
80    /// Offset of the block in the file
81    pub offset: u64,
82    /// Size of the block in bytes
83    pub size: u32,
84}
85
86/// SSTable footer containing metadata
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct SSTableFooter {
89    /// Format version (v1.0.0+)
90    pub format_version: u16,
91    /// Offset of the index block
92    pub index_offset: u64,
93    /// Size of the index block
94    pub index_size: u32,
95    /// Number of entries in the SSTable
96    pub entry_count: u64,
97    /// Minimum key in the SSTable
98    pub min_key: Vec<u8>,
99    /// Maximum key in the SSTable
100    pub max_key: Vec<u8>,
101    /// Magic number for validation
102    pub magic: u64,
103    /// CRC32 of the footer data
104    pub crc: u32,
105}
106
/// In-memory description of an SSTable file.
#[derive(Clone, Debug)]
pub struct SSTableMeta {
    /// Location of the SSTable file on disk.
    pub path: PathBuf,
    /// Smallest key stored in the table.
    pub min_key: Vec<u8>,
    /// Largest key stored in the table.
    pub max_key: Vec<u8>,
    /// How many entries the table holds.
    pub entry_count: u64,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// LSM-tree level the table belongs to (0 = newest).
    pub level: u32,
    /// Sequence number assigned when the table was created.
    pub sequence: u64,
}
125
126/// SSTable writer - creates new SSTable files
127pub struct SSTableWriter {
128    /// Output file path
129    path: PathBuf,
130    /// Buffered writer
131    writer: BufWriter<File>,
132    /// Current position in file
133    position: u64,
134    /// Index entries
135    index: Vec<IndexEntry>,
136    /// Current block buffer
137    block_buffer: Vec<u8>,
138    /// Block size threshold
139    block_size: usize,
140    /// First key of current block
141    current_block_first_key: Option<Vec<u8>>,
142    /// Entry count
143    entry_count: u64,
144    /// Minimum key
145    min_key: Option<Vec<u8>>,
146    /// Maximum key
147    max_key: Option<Vec<u8>>,
148}
149
150impl SSTableWriter {
151    /// Create a new SSTable writer
152    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
153        Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
154    }
155
156    /// Create a new SSTable writer with custom block size
157    pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
158        let path = path.as_ref().to_path_buf();
159        let file = File::create(&path)?;
160
161        Ok(Self {
162            path,
163            writer: BufWriter::new(file),
164            position: 0,
165            index: Vec::new(),
166            block_buffer: Vec::with_capacity(block_size),
167            block_size,
168            current_block_first_key: None,
169            entry_count: 0,
170            min_key: None,
171            max_key: None,
172        })
173    }
174
175    /// Add an entry to the SSTable
176    pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
177        // Track min/max keys
178        if self.min_key.is_none() {
179            self.min_key = Some(entry.key.clone());
180        }
181        self.max_key = Some(entry.key.clone());
182
183        // Track first key of block
184        if self.current_block_first_key.is_none() {
185            self.current_block_first_key = Some(entry.key.clone());
186        }
187
188        // Serialize entry
189        let encoded =
190            bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
191
192        // Write length prefix + entry
193        let len = encoded.len() as u32;
194        self.block_buffer.extend_from_slice(&len.to_le_bytes());
195        self.block_buffer.extend_from_slice(&encoded);
196
197        self.entry_count += 1;
198
199        // Flush block if it exceeds threshold
200        if self.block_buffer.len() >= self.block_size {
201            self.flush_block()?;
202        }
203
204        Ok(())
205    }
206
207    /// Flush the current block to disk
208    fn flush_block(&mut self) -> Result<()> {
209        if self.block_buffer.is_empty() {
210            return Ok(());
211        }
212
213        // Calculate CRC
214        let crc = crc32fast::hash(&self.block_buffer);
215
216        // Create index entry
217        if let Some(first_key) = self.current_block_first_key.take() {
218            self.index.push(IndexEntry {
219                first_key,
220                offset: self.position,
221                size: self.block_buffer.len() as u32 + 4, // +4 for CRC
222            });
223        }
224
225        // Write block data
226        self.writer.write_all(&self.block_buffer)?;
227        self.position += self.block_buffer.len() as u64;
228
229        // Write block CRC
230        self.writer.write_all(&crc.to_le_bytes())?;
231        self.position += 4;
232
233        self.block_buffer.clear();
234
235        Ok(())
236    }
237
238    /// Finish writing and close the SSTable
239    pub fn finish(mut self) -> Result<SSTableMeta> {
240        // Flush any remaining data
241        self.flush_block()?;
242
243        // Write index block
244        let index_offset = self.position;
245        let index_encoded =
246            bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
247        let index_size = index_encoded.len() as u32;
248
249        self.writer.write_all(&index_encoded)?;
250        self.position += index_size as u64;
251
252        // Write footer
253        let min_key = self.min_key.clone().unwrap_or_default();
254        let max_key = self.max_key.clone().unwrap_or_default();
255
256        let footer_data = SSTableFooter {
257            format_version: SSTABLE_FORMAT_VERSION,
258            index_offset,
259            index_size,
260            entry_count: self.entry_count,
261            min_key: min_key.clone(),
262            max_key: max_key.clone(),
263            magic: SSTABLE_MAGIC,
264            crc: 0, // Will be set after computing CRC
265        };
266
267        let footer_encoded =
268            bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
269        let footer_crc = crc32fast::hash(&footer_encoded);
270
271        // Write footer with correct CRC
272        let final_footer = SSTableFooter {
273            crc: footer_crc,
274            ..footer_data
275        };
276        let final_footer_encoded =
277            bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
278
279        // Write footer length + footer
280        let footer_len = final_footer_encoded.len() as u32;
281        self.writer.write_all(&final_footer_encoded)?;
282        self.writer.write_all(&footer_len.to_le_bytes())?;
283
284        self.writer.flush()?;
285
286        let file_size = self.position + final_footer_encoded.len() as u64 + 4;
287
288        Ok(SSTableMeta {
289            path: self.path,
290            min_key,
291            max_key,
292            entry_count: self.entry_count,
293            file_size,
294            level: 0,
295            sequence: 0,
296        })
297    }
298
299    /// Build an SSTable from a memtable
300    pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
301    where
302        I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
303    {
304        let mut writer = SSTableWriter::new(path)?;
305
306        for (key, entry) in iter {
307            let sstable_entry = match entry {
308                MemtableEntry::Value(v) => SSTableEntry::value(key, v),
309                MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
310            };
311            writer.add(sstable_entry)?;
312        }
313
314        writer.finish()
315    }
316}
317
318/// SSTable reader - reads from existing SSTable files
319pub struct SSTableReader {
320    /// Path to the SSTable file
321    path: PathBuf,
322    /// The file handle
323    file: BufReader<File>,
324    /// Index entries
325    index: Vec<IndexEntry>,
326    /// Footer metadata
327    footer: SSTableFooter,
328    /// File size
329    file_size: u64,
330}
331
332impl SSTableReader {
333    /// Open an SSTable file for reading
334    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
335        let path = path.as_ref().to_path_buf();
336        let mut file = File::open(&path)?;
337
338        // Get file size
339        let file_size = file.metadata()?.len();
340
341        if file_size < 4 {
342            return Err(Error::Corruption("SSTable too small".into()));
343        }
344
345        // Read footer length (last 4 bytes)
346        file.seek(SeekFrom::End(-4))?;
347        let mut footer_len_buf = [0u8; 4];
348        file.read_exact(&mut footer_len_buf)?;
349        let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
350
351        // Read footer
352        file.seek(SeekFrom::End(-4 - footer_len))?;
353        let mut footer_buf = vec![0u8; footer_len as usize];
354        file.read_exact(&mut footer_buf)?;
355
356        let footer: SSTableFooter =
357            bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
358
359        // Validate magic number
360        if footer.magic != SSTABLE_MAGIC {
361            return Err(Error::Corruption("Invalid SSTable magic number".into()));
362        }
363
364        // Validate format version (v1.0.0+)
365        if footer.format_version != SSTABLE_FORMAT_VERSION {
366            return Err(Error::Corruption(format!(
367                "Unsupported SSTable format version: {} (expected {})",
368                footer.format_version, SSTABLE_FORMAT_VERSION
369            )));
370        }
371
372        // Read index
373        file.seek(SeekFrom::Start(footer.index_offset))?;
374        let mut index_buf = vec![0u8; footer.index_size as usize];
375        file.read_exact(&mut index_buf)?;
376
377        let index: Vec<IndexEntry> =
378            bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
379
380        Ok(Self {
381            path,
382            file: BufReader::new(file.try_clone()?),
383            index,
384            footer,
385            file_size,
386        })
387    }
388
389    /// Get a value by key
390    pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
391        // Binary search to find the block that might contain the key
392        let block_idx = self
393            .index
394            .partition_point(|entry| entry.first_key.as_slice() <= key);
395
396        // The key would be in the previous block (if any)
397        if block_idx == 0 {
398            // Key is smaller than all keys in the SSTable
399            if key < self.footer.min_key.as_slice() {
400                return Ok(None);
401            }
402        }
403
404        // Check the block
405        let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
406
407        if block_idx >= self.index.len() {
408            return Ok(None);
409        }
410
411        // Read and search the block
412        let block = self.read_block(block_idx)?;
413
414        for entry in block {
415            if entry.key.as_slice() == key {
416                return Ok(Some(entry));
417            }
418            if entry.key.as_slice() > key {
419                break;
420            }
421        }
422
423        Ok(None)
424    }
425
426    /// Read a data block by index
427    fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
428        let index_entry = &self.index[block_idx];
429
430        self.file.seek(SeekFrom::Start(index_entry.offset))?;
431
432        let data_size = index_entry.size as usize - 4; // Subtract CRC size
433        let mut data_buf = vec![0u8; data_size];
434        self.file.read_exact(&mut data_buf)?;
435
436        // Read and verify CRC
437        let mut crc_buf = [0u8; 4];
438        self.file.read_exact(&mut crc_buf)?;
439        let stored_crc = u32::from_le_bytes(crc_buf);
440        let computed_crc = crc32fast::hash(&data_buf);
441
442        if stored_crc != computed_crc {
443            return Err(Error::Corruption("Block CRC mismatch".into()));
444        }
445
446        // Parse entries from block
447        let mut entries = Vec::new();
448        let mut offset = 0;
449
450        while offset < data_buf.len() {
451            if offset + 4 > data_buf.len() {
452                break;
453            }
454
455            let len = u32::from_le_bytes([
456                data_buf[offset],
457                data_buf[offset + 1],
458                data_buf[offset + 2],
459                data_buf[offset + 3],
460            ]) as usize;
461            offset += 4;
462
463            if offset + len > data_buf.len() {
464                break;
465            }
466
467            let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
468                .map_err(|e| Error::Serialization(e.to_string()))?;
469            entries.push(entry);
470            offset += len;
471        }
472
473        Ok(entries)
474    }
475
476    /// Get metadata about this SSTable
477    pub fn metadata(&self) -> SSTableMeta {
478        SSTableMeta {
479            path: self.path.clone(),
480            min_key: self.footer.min_key.clone(),
481            max_key: self.footer.max_key.clone(),
482            entry_count: self.footer.entry_count,
483            file_size: self.file_size,
484            level: 0,
485            sequence: 0,
486        }
487    }
488
489    /// Check if a key might be in this SSTable (range check)
490    pub fn might_contain(&self, key: &[u8]) -> bool {
491        key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
492    }
493
494    /// Iterate over all entries in the SSTable
495    pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
496        Ok(SSTableIterator {
497            reader: self,
498            block_idx: 0,
499            block_entries: Vec::new(),
500            entry_idx: 0,
501        })
502    }
503}
504
505/// Iterator over SSTable entries
506pub struct SSTableIterator<'a> {
507    reader: &'a mut SSTableReader,
508    block_idx: usize,
509    block_entries: Vec<SSTableEntry>,
510    entry_idx: usize,
511}
512
513impl SSTableIterator<'_> {
514    /// Get the next entry
515    pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
516        loop {
517            // If we have entries in the current block, return the next one
518            if self.entry_idx < self.block_entries.len() {
519                let entry = self.block_entries[self.entry_idx].clone();
520                self.entry_idx += 1;
521                return Ok(Some(entry));
522            }
523
524            // Load the next block
525            if self.block_idx >= self.reader.index.len() {
526                return Ok(None);
527            }
528
529            self.block_entries = self.reader.read_block(self.block_idx)?;
530            self.block_idx += 1;
531            self.entry_idx = 0;
532        }
533    }
534}
535
536/// Delete an SSTable file
537pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
538    fs::remove_file(path)?;
539    Ok(())
540}
541
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Shorthand for a value entry built from string literals.
    fn value_entry(key: &str, value: &str) -> SSTableEntry {
        SSTableEntry::value(key.as_bytes().to_vec(), value.as_bytes().to_vec())
    }

    /// Write `entries` (already in key order) to a new SSTable at `path`.
    fn write_table(path: &std::path::Path, entries: Vec<SSTableEntry>) -> SSTableMeta {
        let mut writer = SSTableWriter::new(path).unwrap();
        for entry in entries {
            writer.add(entry).unwrap();
        }
        writer.finish().unwrap()
    }

    /// Point lookups return what was written; a missing key yields `None`.
    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let meta = write_table(
            &path,
            vec![
                value_entry("a", "1"),
                value_entry("b", "2"),
                value_entry("c", "3"),
            ],
        );

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        let mut reader = SSTableReader::open(&path).unwrap();

        let expected: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (key, value) in expected.iter() {
            let entry = reader.get(key).unwrap().unwrap();
            assert_eq!(entry.value, value.to_vec());
        }

        assert!(reader.get(b"d").unwrap().is_none());
    }

    /// Tombstones round-trip and are distinguishable from live values.
    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Keys must be added in sorted order: "deleted" sorts before "key".
        write_table(
            &path,
            vec![
                SSTableEntry::tombstone(b"deleted".to_vec()),
                value_entry("key", "value"),
            ],
        );

        let mut reader = SSTableReader::open(&path).unwrap();

        let live = reader.get(b"key").unwrap().unwrap();
        assert!(!live.is_tombstone());

        let dead = reader.get(b"deleted").unwrap().unwrap();
        assert!(dead.is_tombstone());
    }

    /// A full scan visits every entry that was written.
    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let entries = (0..100)
            .map(|i| value_entry(&format!("key{:03}", i), &format!("value{}", i)))
            .collect();
        write_table(&path, entries);

        let mut reader = SSTableReader::open(&path).unwrap();
        let mut iter = reader.iter().unwrap();

        let mut seen = 0;
        loop {
            match iter.next_entry().unwrap() {
                Some(_) => seen += 1,
                None => break,
            }
        }

        assert_eq!(seen, 100);
    }

    /// Draining a memtable into an SSTable keeps values and tombstones.
    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();
        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();

        let a = reader.get(b"a").unwrap().unwrap();
        assert_eq!(a.value, b"1".to_vec());

        let c = reader.get(b"c").unwrap().unwrap();
        assert!(c.is_tombstone());
    }

    /// The range check accepts keys inside [min, max] and rejects the rest.
    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        write_table(&path, vec![value_entry("b", "2"), value_entry("d", "4")]);

        let reader = SSTableReader::open(&path).unwrap();

        // Outside the stored key range.
        assert!(!reader.might_contain(b"a"));
        assert!(!reader.might_contain(b"e"));

        // Inside the range; "c" is absent but cannot be ruled out by range alone.
        assert!(reader.might_contain(b"b"));
        assert!(reader.might_contain(b"c"));
        assert!(reader.might_contain(b"d"));
    }
}