rustlite_storage/
sstable.rs

1//! SSTable - Sorted String Table format and I/O
2//!
3//! SSTables are immutable on-disk files that store key-value pairs in sorted order.
4//! They are the primary on-disk storage format for LSM-tree databases.
5//!
6//! ## File Format
7//!
8//! ```text
9//! +------------------+
10//! | Data Blocks      |  <- Key-value pairs grouped in blocks
11//! +------------------+
12//! | Index Block      |  <- Sparse index pointing to data blocks
13//! +------------------+
14//! | Footer           |  <- Index offset + magic number + CRC
15//! +------------------+
16//! ```
17
18use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// Magic number stored in every SSTable footer.
///
/// The low seven bytes spell the ASCII string `SSTBLIT`; the top byte is zero.
const SSTABLE_MAGIC: u64 = u64::from_be_bytes(*b"\0SSTBLIT");

/// Default data-block size threshold in bytes (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;

/// Tag stored in `SSTableEntry::entry_type` for an entry that carries a value.
const ENTRY_TYPE_VALUE: u8 = 0;
/// Tag stored in `SSTableEntry::entry_type` for a deletion marker (tombstone).
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
34
35/// A single entry in an SSTable
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct SSTableEntry {
38    /// The key
39    pub key: Vec<u8>,
40    /// Entry type: 0 = value, 1 = tombstone
41    pub entry_type: u8,
42    /// The value (empty for tombstones)
43    pub value: Vec<u8>,
44}
45
46impl SSTableEntry {
47    /// Create a value entry
48    pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
49        Self {
50            key,
51            entry_type: ENTRY_TYPE_VALUE,
52            value,
53        }
54    }
55
56    /// Create a tombstone entry
57    pub fn tombstone(key: Vec<u8>) -> Self {
58        Self {
59            key,
60            entry_type: ENTRY_TYPE_TOMBSTONE,
61            value: Vec::new(),
62        }
63    }
64
65    /// Check if this is a tombstone
66    pub fn is_tombstone(&self) -> bool {
67        self.entry_type == ENTRY_TYPE_TOMBSTONE
68    }
69}
70
71/// Index entry pointing to a data block
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct IndexEntry {
74    /// First key in the block
75    pub first_key: Vec<u8>,
76    /// Offset of the block in the file
77    pub offset: u64,
78    /// Size of the block in bytes
79    pub size: u32,
80}
81
82/// SSTable footer containing metadata
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SSTableFooter {
85    /// Offset of the index block
86    pub index_offset: u64,
87    /// Size of the index block
88    pub index_size: u32,
89    /// Number of entries in the SSTable
90    pub entry_count: u64,
91    /// Minimum key in the SSTable
92    pub min_key: Vec<u8>,
93    /// Maximum key in the SSTable
94    pub max_key: Vec<u8>,
95    /// Magic number for validation
96    pub magic: u64,
97    /// CRC32 of the footer data
98    pub crc: u32,
99}
100
/// SSTable metadata (in-memory representation).
///
/// Produced by the writer when a table is finished and by the reader for an
/// opened table; it is never serialised by this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SSTableMeta {
    /// Path to the SSTable file
    pub path: PathBuf,
    /// Minimum key
    pub min_key: Vec<u8>,
    /// Maximum key
    pub max_key: Vec<u8>,
    /// Number of entries
    pub entry_count: u64,
    /// File size in bytes
    pub file_size: u64,
    /// Level in the LSM tree (0 = newest)
    pub level: u32,
    /// Sequence number when created
    pub sequence: u64,
}
119
120/// SSTable writer - creates new SSTable files
121pub struct SSTableWriter {
122    /// Output file path
123    path: PathBuf,
124    /// Buffered writer
125    writer: BufWriter<File>,
126    /// Current position in file
127    position: u64,
128    /// Index entries
129    index: Vec<IndexEntry>,
130    /// Current block buffer
131    block_buffer: Vec<u8>,
132    /// Block size threshold
133    block_size: usize,
134    /// First key of current block
135    current_block_first_key: Option<Vec<u8>>,
136    /// Entry count
137    entry_count: u64,
138    /// Minimum key
139    min_key: Option<Vec<u8>>,
140    /// Maximum key
141    max_key: Option<Vec<u8>>,
142}
143
144impl SSTableWriter {
145    /// Create a new SSTable writer
146    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
147        Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
148    }
149
150    /// Create a new SSTable writer with custom block size
151    pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
152        let path = path.as_ref().to_path_buf();
153        let file = File::create(&path)?;
154
155        Ok(Self {
156            path,
157            writer: BufWriter::new(file),
158            position: 0,
159            index: Vec::new(),
160            block_buffer: Vec::with_capacity(block_size),
161            block_size,
162            current_block_first_key: None,
163            entry_count: 0,
164            min_key: None,
165            max_key: None,
166        })
167    }
168
169    /// Add an entry to the SSTable
170    pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
171        // Track min/max keys
172        if self.min_key.is_none() {
173            self.min_key = Some(entry.key.clone());
174        }
175        self.max_key = Some(entry.key.clone());
176
177        // Track first key of block
178        if self.current_block_first_key.is_none() {
179            self.current_block_first_key = Some(entry.key.clone());
180        }
181
182        // Serialize entry
183        let encoded =
184            bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
185
186        // Write length prefix + entry
187        let len = encoded.len() as u32;
188        self.block_buffer.extend_from_slice(&len.to_le_bytes());
189        self.block_buffer.extend_from_slice(&encoded);
190
191        self.entry_count += 1;
192
193        // Flush block if it exceeds threshold
194        if self.block_buffer.len() >= self.block_size {
195            self.flush_block()?;
196        }
197
198        Ok(())
199    }
200
201    /// Flush the current block to disk
202    fn flush_block(&mut self) -> Result<()> {
203        if self.block_buffer.is_empty() {
204            return Ok(());
205        }
206
207        // Calculate CRC
208        let crc = crc32fast::hash(&self.block_buffer);
209
210        // Create index entry
211        if let Some(first_key) = self.current_block_first_key.take() {
212            self.index.push(IndexEntry {
213                first_key,
214                offset: self.position,
215                size: self.block_buffer.len() as u32 + 4, // +4 for CRC
216            });
217        }
218
219        // Write block data
220        self.writer.write_all(&self.block_buffer)?;
221        self.position += self.block_buffer.len() as u64;
222
223        // Write block CRC
224        self.writer.write_all(&crc.to_le_bytes())?;
225        self.position += 4;
226
227        self.block_buffer.clear();
228
229        Ok(())
230    }
231
232    /// Finish writing and close the SSTable
233    pub fn finish(mut self) -> Result<SSTableMeta> {
234        // Flush any remaining data
235        self.flush_block()?;
236
237        // Write index block
238        let index_offset = self.position;
239        let index_encoded =
240            bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
241        let index_size = index_encoded.len() as u32;
242
243        self.writer.write_all(&index_encoded)?;
244        self.position += index_size as u64;
245
246        // Write footer
247        let min_key = self.min_key.clone().unwrap_or_default();
248        let max_key = self.max_key.clone().unwrap_or_default();
249
250        let footer_data = SSTableFooter {
251            index_offset,
252            index_size,
253            entry_count: self.entry_count,
254            min_key: min_key.clone(),
255            max_key: max_key.clone(),
256            magic: SSTABLE_MAGIC,
257            crc: 0, // Will be set after computing CRC
258        };
259
260        let footer_encoded =
261            bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
262        let footer_crc = crc32fast::hash(&footer_encoded);
263
264        // Write footer with correct CRC
265        let final_footer = SSTableFooter {
266            crc: footer_crc,
267            ..footer_data
268        };
269        let final_footer_encoded =
270            bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
271
272        // Write footer length + footer
273        let footer_len = final_footer_encoded.len() as u32;
274        self.writer.write_all(&final_footer_encoded)?;
275        self.writer.write_all(&footer_len.to_le_bytes())?;
276
277        self.writer.flush()?;
278
279        let file_size = self.position + final_footer_encoded.len() as u64 + 4;
280
281        Ok(SSTableMeta {
282            path: self.path,
283            min_key,
284            max_key,
285            entry_count: self.entry_count,
286            file_size,
287            level: 0,
288            sequence: 0,
289        })
290    }
291
292    /// Build an SSTable from a memtable
293    pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
294    where
295        I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
296    {
297        let mut writer = SSTableWriter::new(path)?;
298
299        for (key, entry) in iter {
300            let sstable_entry = match entry {
301                MemtableEntry::Value(v) => SSTableEntry::value(key, v),
302                MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
303            };
304            writer.add(sstable_entry)?;
305        }
306
307        writer.finish()
308    }
309}
310
311/// SSTable reader - reads from existing SSTable files
312pub struct SSTableReader {
313    /// Path to the SSTable file
314    path: PathBuf,
315    /// The file handle
316    file: BufReader<File>,
317    /// Index entries
318    index: Vec<IndexEntry>,
319    /// Footer metadata
320    footer: SSTableFooter,
321    /// File size
322    file_size: u64,
323}
324
325impl SSTableReader {
326    /// Open an SSTable file for reading
327    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
328        let path = path.as_ref().to_path_buf();
329        let mut file = File::open(&path)?;
330
331        // Get file size
332        let file_size = file.metadata()?.len();
333
334        if file_size < 4 {
335            return Err(Error::Corruption("SSTable too small".into()));
336        }
337
338        // Read footer length (last 4 bytes)
339        file.seek(SeekFrom::End(-4))?;
340        let mut footer_len_buf = [0u8; 4];
341        file.read_exact(&mut footer_len_buf)?;
342        let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
343
344        // Read footer
345        file.seek(SeekFrom::End(-4 - footer_len))?;
346        let mut footer_buf = vec![0u8; footer_len as usize];
347        file.read_exact(&mut footer_buf)?;
348
349        let footer: SSTableFooter =
350            bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
351
352        // Validate magic number
353        if footer.magic != SSTABLE_MAGIC {
354            return Err(Error::Corruption("Invalid SSTable magic number".into()));
355        }
356
357        // Read index
358        file.seek(SeekFrom::Start(footer.index_offset))?;
359        let mut index_buf = vec![0u8; footer.index_size as usize];
360        file.read_exact(&mut index_buf)?;
361
362        let index: Vec<IndexEntry> =
363            bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
364
365        Ok(Self {
366            path,
367            file: BufReader::new(file.try_clone()?),
368            index,
369            footer,
370            file_size,
371        })
372    }
373
374    /// Get a value by key
375    pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
376        // Binary search to find the block that might contain the key
377        let block_idx = self
378            .index
379            .partition_point(|entry| entry.first_key.as_slice() <= key);
380
381        // The key would be in the previous block (if any)
382        if block_idx == 0 {
383            // Key is smaller than all keys in the SSTable
384            if key < self.footer.min_key.as_slice() {
385                return Ok(None);
386            }
387        }
388
389        // Check the block
390        let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
391
392        if block_idx >= self.index.len() {
393            return Ok(None);
394        }
395
396        // Read and search the block
397        let block = self.read_block(block_idx)?;
398
399        for entry in block {
400            if entry.key.as_slice() == key {
401                return Ok(Some(entry));
402            }
403            if entry.key.as_slice() > key {
404                break;
405            }
406        }
407
408        Ok(None)
409    }
410
411    /// Read a data block by index
412    fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
413        let index_entry = &self.index[block_idx];
414
415        self.file.seek(SeekFrom::Start(index_entry.offset))?;
416
417        let data_size = index_entry.size as usize - 4; // Subtract CRC size
418        let mut data_buf = vec![0u8; data_size];
419        self.file.read_exact(&mut data_buf)?;
420
421        // Read and verify CRC
422        let mut crc_buf = [0u8; 4];
423        self.file.read_exact(&mut crc_buf)?;
424        let stored_crc = u32::from_le_bytes(crc_buf);
425        let computed_crc = crc32fast::hash(&data_buf);
426
427        if stored_crc != computed_crc {
428            return Err(Error::Corruption("Block CRC mismatch".into()));
429        }
430
431        // Parse entries from block
432        let mut entries = Vec::new();
433        let mut offset = 0;
434
435        while offset < data_buf.len() {
436            if offset + 4 > data_buf.len() {
437                break;
438            }
439
440            let len = u32::from_le_bytes([
441                data_buf[offset],
442                data_buf[offset + 1],
443                data_buf[offset + 2],
444                data_buf[offset + 3],
445            ]) as usize;
446            offset += 4;
447
448            if offset + len > data_buf.len() {
449                break;
450            }
451
452            let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
453                .map_err(|e| Error::Serialization(e.to_string()))?;
454            entries.push(entry);
455            offset += len;
456        }
457
458        Ok(entries)
459    }
460
461    /// Get metadata about this SSTable
462    pub fn metadata(&self) -> SSTableMeta {
463        SSTableMeta {
464            path: self.path.clone(),
465            min_key: self.footer.min_key.clone(),
466            max_key: self.footer.max_key.clone(),
467            entry_count: self.footer.entry_count,
468            file_size: self.file_size,
469            level: 0,
470            sequence: 0,
471        }
472    }
473
474    /// Check if a key might be in this SSTable (range check)
475    pub fn might_contain(&self, key: &[u8]) -> bool {
476        key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
477    }
478
479    /// Iterate over all entries in the SSTable
480    pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
481        Ok(SSTableIterator {
482            reader: self,
483            block_idx: 0,
484            block_entries: Vec::new(),
485            entry_idx: 0,
486        })
487    }
488}
489
490/// Iterator over SSTable entries
491pub struct SSTableIterator<'a> {
492    reader: &'a mut SSTableReader,
493    block_idx: usize,
494    block_entries: Vec<SSTableEntry>,
495    entry_idx: usize,
496}
497
498impl SSTableIterator<'_> {
499    /// Get the next entry
500    pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
501        loop {
502            // If we have entries in the current block, return the next one
503            if self.entry_idx < self.block_entries.len() {
504                let entry = self.block_entries[self.entry_idx].clone();
505                self.entry_idx += 1;
506                return Ok(Some(entry));
507            }
508
509            // Load the next block
510            if self.block_idx >= self.reader.index.len() {
511                return Ok(None);
512            }
513
514            self.block_entries = self.reader.read_block(self.block_idx)?;
515            self.block_idx += 1;
516            self.entry_idx = 0;
517        }
518    }
519}
520
521/// Delete an SSTable file
522pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
523    fs::remove_file(path)?;
524    Ok(())
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Key for the `i`-th numbered entry; zero-padded so byte order matches numeric order.
    fn numbered_key(i: usize) -> Vec<u8> {
        format!("key{:03}", i).into_bytes()
    }

    /// Value paired with `numbered_key(i)`.
    fn numbered_value(i: usize) -> Vec<u8> {
        format!("value{}", i).into_bytes()
    }

    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Write SSTable
        let mut writer = SSTableWriter::new(&path).unwrap();
        writer
            .add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec()))
            .unwrap();
        let meta = writer.finish().unwrap();

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        // Read SSTable
        let mut reader = SSTableReader::open(&path).unwrap();

        let entry = reader.get(b"a").unwrap().unwrap();
        assert_eq!(entry.value, b"1".to_vec());

        let entry = reader.get(b"b").unwrap().unwrap();
        assert_eq!(entry.value, b"2".to_vec());

        let entry = reader.get(b"c").unwrap().unwrap();
        assert_eq!(entry.value, b"3".to_vec());

        assert!(reader.get(b"d").unwrap().is_none());
    }

    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Keys must be added in sorted order
        let mut writer = SSTableWriter::new(&path).unwrap();
        writer
            .add(SSTableEntry::tombstone(b"deleted".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"key".to_vec(), b"value".to_vec()))
            .unwrap();
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();

        let entry = reader.get(b"key").unwrap().unwrap();
        assert!(!entry.is_tombstone());

        let entry = reader.get(b"deleted").unwrap().unwrap();
        assert!(entry.is_tombstone());
    }

    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        for i in 0..100 {
            writer
                .add(SSTableEntry::value(numbered_key(i), numbered_value(i)))
                .unwrap();
        }
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();
        let mut iter = reader.iter().unwrap();

        // Entries must come back in insertion order with their contents intact.
        let mut seen = 0usize;
        while let Some(entry) = iter.next_entry().unwrap() {
            assert_eq!(entry.key, numbered_key(seen));
            assert_eq!(entry.value, numbered_value(seen));
            assert!(!entry.is_tombstone());
            seen += 1;
        }

        assert_eq!(seen, 100);
    }

    #[test]
    fn test_sstable_get_across_blocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // A tiny block size forces many data blocks, so lookups go through the sparse index.
        let mut writer = SSTableWriter::with_block_size(&path, 64).unwrap();
        for i in (0..200).step_by(2) {
            writer
                .add(SSTableEntry::value(numbered_key(i), numbered_value(i)))
                .unwrap();
        }
        let meta = writer.finish().unwrap();

        assert_eq!(meta.entry_count, 100);
        assert_eq!(meta.file_size, std::fs::metadata(&path).unwrap().len());

        let mut reader = SSTableReader::open(&path).unwrap();
        assert!(reader.index.len() > 1);

        for i in 0..200 {
            let found = reader.get(&numbered_key(i)).unwrap();
            if i % 2 == 0 {
                assert_eq!(found.unwrap().value, numbered_value(i));
            } else {
                // Odd keys fall between stored keys, including across block boundaries.
                assert!(found.is_none());
            }
        }

        assert!(reader.get(b"a").unwrap().is_none()); // Before the first key
        assert!(reader.get(b"z").unwrap().is_none()); // After the last key
    }

    #[test]
    fn test_sstable_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let meta = SSTableWriter::new(&path).unwrap().finish().unwrap();
        assert_eq!(meta.entry_count, 0);

        let mut reader = SSTableReader::open(&path).unwrap();
        assert!(reader.get(b"a").unwrap().is_none());

        let mut iter = reader.iter().unwrap();
        assert!(iter.next_entry().unwrap().is_none());
    }

    #[test]
    fn test_sstable_detects_corrupted_block() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        writer
            .add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
            .unwrap();
        writer.finish().unwrap();

        // Flip a byte inside the first data block; footer and index stay intact.
        let mut bytes = std::fs::read(&path).unwrap();
        bytes[0] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();
        assert!(reader.get(b"a").is_err());
    }

    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();

        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert!(reader.get(b"c").unwrap().unwrap().is_tombstone());
    }

    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        writer
            .add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"d".to_vec(), b"4".to_vec()))
            .unwrap();
        writer.finish().unwrap();

        let reader = SSTableReader::open(&path).unwrap();

        assert!(!reader.might_contain(b"a")); // Before range
        assert!(reader.might_contain(b"b")); // In range
        assert!(reader.might_contain(b"c")); // In range (might be there)
        assert!(reader.might_contain(b"d")); // In range
        assert!(!reader.might_contain(b"e")); // After range
    }
}