rustlite_storage/
sstable.rs

1//! SSTable - Sorted String Table format and I/O
2//!
3//! SSTables are immutable on-disk files that store key-value pairs in sorted order.
4//! They are the primary on-disk storage format for LSM-tree databases.
5//!
6//! ## File Format
7//!
8//! ```text
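//! +------------------+
//! | Data Blocks      |  <- Length-prefixed entries; each block ends with a CRC32
//! +------------------+
//! | Index Block      |  <- Sparse index: first key, offset and size of each data block
//! +------------------+
//! | Footer           |  <- Index offset/size, entry count, min/max key, magic number, CRC32
//! +------------------+
//! | Footer Length    |  <- Little-endian u32 byte length of the footer
//! +------------------+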
16//! ```
17
18use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// File-format marker stored in every footer: the ASCII bytes of "SSTBLIT"
/// (`53 53 54 42 4C 49 54`) packed into the low seven bytes of a `u64`.
const SSTABLE_MAGIC: u64 = 0x0053_5354_424C_4954;

/// Data-block flush threshold used by `SSTableWriter::new`, in bytes (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;

/// Tag stored in `SSTableEntry::entry_type` for a live key-value pair.
const ENTRY_TYPE_VALUE: u8 = 0;
/// Tag stored in `SSTableEntry::entry_type` for a deletion marker.
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
34
35/// A single entry in an SSTable
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct SSTableEntry {
38    /// The key
39    pub key: Vec<u8>,
40    /// Entry type: 0 = value, 1 = tombstone
41    pub entry_type: u8,
42    /// The value (empty for tombstones)
43    pub value: Vec<u8>,
44}
45
46impl SSTableEntry {
47    /// Create a value entry
48    pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
49        Self {
50            key,
51            entry_type: ENTRY_TYPE_VALUE,
52            value,
53        }
54    }
55
56    /// Create a tombstone entry
57    pub fn tombstone(key: Vec<u8>) -> Self {
58        Self {
59            key,
60            entry_type: ENTRY_TYPE_TOMBSTONE,
61            value: Vec::new(),
62        }
63    }
64
65    /// Check if this is a tombstone
66    pub fn is_tombstone(&self) -> bool {
67        self.entry_type == ENTRY_TYPE_TOMBSTONE
68    }
69}
70
71/// Index entry pointing to a data block
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct IndexEntry {
74    /// First key in the block
75    pub first_key: Vec<u8>,
76    /// Offset of the block in the file
77    pub offset: u64,
78    /// Size of the block in bytes
79    pub size: u32,
80}
81
82/// SSTable footer containing metadata
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SSTableFooter {
85    /// Offset of the index block
86    pub index_offset: u64,
87    /// Size of the index block
88    pub index_size: u32,
89    /// Number of entries in the SSTable
90    pub entry_count: u64,
91    /// Minimum key in the SSTable
92    pub min_key: Vec<u8>,
93    /// Maximum key in the SSTable
94    pub max_key: Vec<u8>,
95    /// Magic number for validation
96    pub magic: u64,
97    /// CRC32 of the footer data
98    pub crc: u32,
99}
100
/// In-memory summary of an SSTable file; this struct is never serialized by
/// the code in this module.
#[derive(Clone, Debug)]
pub struct SSTableMeta {
    /// Location of the SSTable file on disk.
    pub path: PathBuf,
    /// Smallest key stored in the table.
    pub min_key: Vec<u8>,
    /// Largest key stored in the table.
    pub max_key: Vec<u8>,
    /// Number of entries stored in the table.
    pub entry_count: u64,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// LSM-tree level (0 = newest). The writer and reader in this module
    /// always report `0`.
    pub level: u32,
    /// Sequence number assigned at creation. The writer and reader in this
    /// module always report `0`.
    pub sequence: u64,
}
119
120/// SSTable writer - creates new SSTable files
121pub struct SSTableWriter {
122    /// Output file path
123    path: PathBuf,
124    /// Buffered writer
125    writer: BufWriter<File>,
126    /// Current position in file
127    position: u64,
128    /// Index entries
129    index: Vec<IndexEntry>,
130    /// Current block buffer
131    block_buffer: Vec<u8>,
132    /// Block size threshold
133    block_size: usize,
134    /// First key of current block
135    current_block_first_key: Option<Vec<u8>>,
136    /// Entry count
137    entry_count: u64,
138    /// Minimum key
139    min_key: Option<Vec<u8>>,
140    /// Maximum key
141    max_key: Option<Vec<u8>>,
142}
143
144impl SSTableWriter {
145    /// Create a new SSTable writer
146    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
147        Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
148    }
149
150    /// Create a new SSTable writer with custom block size
151    pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
152        let path = path.as_ref().to_path_buf();
153        let file = File::create(&path)?;
154
155        Ok(Self {
156            path,
157            writer: BufWriter::new(file),
158            position: 0,
159            index: Vec::new(),
160            block_buffer: Vec::with_capacity(block_size),
161            block_size,
162            current_block_first_key: None,
163            entry_count: 0,
164            min_key: None,
165            max_key: None,
166        })
167    }
168
169    /// Add an entry to the SSTable
170    pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
171        // Track min/max keys
172        if self.min_key.is_none() {
173            self.min_key = Some(entry.key.clone());
174        }
175        self.max_key = Some(entry.key.clone());
176        
177        // Track first key of block
178        if self.current_block_first_key.is_none() {
179            self.current_block_first_key = Some(entry.key.clone());
180        }
181        
182        // Serialize entry
183        let encoded = bincode::serialize(&entry)
184            .map_err(|e| Error::Serialization(e.to_string()))?;
185        
186        // Write length prefix + entry
187        let len = encoded.len() as u32;
188        self.block_buffer.extend_from_slice(&len.to_le_bytes());
189        self.block_buffer.extend_from_slice(&encoded);
190        
191        self.entry_count += 1;
192        
193        // Flush block if it exceeds threshold
194        if self.block_buffer.len() >= self.block_size {
195            self.flush_block()?;
196        }
197        
198        Ok(())
199    }
200
201    /// Flush the current block to disk
202    fn flush_block(&mut self) -> Result<()> {
203        if self.block_buffer.is_empty() {
204            return Ok(());
205        }
206        
207        // Calculate CRC
208        let crc = crc32fast::hash(&self.block_buffer);
209        
210        // Create index entry
211        if let Some(first_key) = self.current_block_first_key.take() {
212            self.index.push(IndexEntry {
213                first_key,
214                offset: self.position,
215                size: self.block_buffer.len() as u32 + 4, // +4 for CRC
216            });
217        }
218
219        // Write block data
220        self.writer.write_all(&self.block_buffer)?;
221        self.position += self.block_buffer.len() as u64;
222
223        // Write block CRC
224        self.writer.write_all(&crc.to_le_bytes())?;
225        self.position += 4;
226
227        self.block_buffer.clear();
228
229        Ok(())
230    }
231
232    /// Finish writing and close the SSTable
233    pub fn finish(mut self) -> Result<SSTableMeta> {
234        // Flush any remaining data
235        self.flush_block()?;
236        
237        // Write index block
238        let index_offset = self.position;
239        let index_encoded = bincode::serialize(&self.index)
240            .map_err(|e| Error::Serialization(e.to_string()))?;
241        let index_size = index_encoded.len() as u32;
242
243        self.writer.write_all(&index_encoded)?;
244        self.position += index_size as u64;
245
246        // Write footer
247        let min_key = self.min_key.clone().unwrap_or_default();
248        let max_key = self.max_key.clone().unwrap_or_default();
249        
250        let footer_data = SSTableFooter {
251            index_offset,
252            index_size,
253            entry_count: self.entry_count,
254            min_key: min_key.clone(),
255            max_key: max_key.clone(),
256            magic: SSTABLE_MAGIC,
257            crc: 0, // Will be set after computing CRC
258        };
259        
260        let footer_encoded = bincode::serialize(&footer_data)
261            .map_err(|e| Error::Serialization(e.to_string()))?;
262        let footer_crc = crc32fast::hash(&footer_encoded);
263        
264        // Write footer with correct CRC
265        let final_footer = SSTableFooter {
266            crc: footer_crc,
267            ..footer_data
268        };
269        let final_footer_encoded = bincode::serialize(&final_footer)
270            .map_err(|e| Error::Serialization(e.to_string()))?;
271
272        // Write footer length + footer
273        let footer_len = final_footer_encoded.len() as u32;
274        self.writer.write_all(&final_footer_encoded)?;
275        self.writer.write_all(&footer_len.to_le_bytes())?;
276
277        self.writer.flush()?;
278
279        let file_size = self.position + final_footer_encoded.len() as u64 + 4;
280        
281        Ok(SSTableMeta {
282            path: self.path,
283            min_key,
284            max_key,
285            entry_count: self.entry_count,
286            file_size,
287            level: 0,
288            sequence: 0,
289        })
290    }
291
292    /// Build an SSTable from a memtable
293    pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
294    where
295        I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
296    {
297        let mut writer = SSTableWriter::new(path)?;
298        
299        for (key, entry) in iter {
300            let sstable_entry = match entry {
301                MemtableEntry::Value(v) => SSTableEntry::value(key, v),
302                MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
303            };
304            writer.add(sstable_entry)?;
305        }
306        
307        writer.finish()
308    }
309}
310
311/// SSTable reader - reads from existing SSTable files
312pub struct SSTableReader {
313    /// Path to the SSTable file
314    path: PathBuf,
315    /// The file handle
316    file: BufReader<File>,
317    /// Index entries
318    index: Vec<IndexEntry>,
319    /// Footer metadata
320    footer: SSTableFooter,
321    /// File size
322    file_size: u64,
323}
324
325impl SSTableReader {
326    /// Open an SSTable file for reading
327    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
328        let path = path.as_ref().to_path_buf();
329        let mut file = File::open(&path)?;
330
331        // Get file size
332        let file_size = file.metadata()?.len();
333
334        if file_size < 4 {
335            return Err(Error::Corruption("SSTable too small".into()));
336        }
337
338        // Read footer length (last 4 bytes)
339        file.seek(SeekFrom::End(-4))?;
340        let mut footer_len_buf = [0u8; 4];
341        file.read_exact(&mut footer_len_buf)?;
342        let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
343
344        // Read footer
345        file.seek(SeekFrom::End(-4 - footer_len))?;
346        let mut footer_buf = vec![0u8; footer_len as usize];
347        file.read_exact(&mut footer_buf)?;
348
349        let footer: SSTableFooter = bincode::deserialize(&footer_buf)
350            .map_err(|e| Error::Serialization(e.to_string()))?;
351
352        // Validate magic number
353        if footer.magic != SSTABLE_MAGIC {
354            return Err(Error::Corruption("Invalid SSTable magic number".into()));
355        }
356
357        // Read index
358        file.seek(SeekFrom::Start(footer.index_offset))?;
359        let mut index_buf = vec![0u8; footer.index_size as usize];
360        file.read_exact(&mut index_buf)?;
361
362        let index: Vec<IndexEntry> = bincode::deserialize(&index_buf)
363            .map_err(|e| Error::Serialization(e.to_string()))?;
364
365        Ok(Self {
366            path,
367            file: BufReader::new(file.try_clone()?),
368            index,
369            footer,
370            file_size,
371        })
372    }
373
374    /// Get a value by key
375    pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
376        // Binary search to find the block that might contain the key
377        let block_idx = self.index.partition_point(|entry| entry.first_key.as_slice() <= key);
378        
379        // The key would be in the previous block (if any)
380        if block_idx == 0 {
381            // Key is smaller than all keys in the SSTable
382            if key < self.footer.min_key.as_slice() {
383                return Ok(None);
384            }
385        }
386        
387        // Check the block
388        let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
389        
390        if block_idx >= self.index.len() {
391            return Ok(None);
392        }
393        
394        // Read and search the block
395        let block = self.read_block(block_idx)?;
396        
397        for entry in block {
398            if entry.key.as_slice() == key {
399                return Ok(Some(entry));
400            }
401            if entry.key.as_slice() > key {
402                break;
403            }
404        }
405        
406        Ok(None)
407    }
408
409    /// Read a data block by index
410    fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
411        let index_entry = &self.index[block_idx];
412
413        self.file.seek(SeekFrom::Start(index_entry.offset))?;
414
415        let data_size = index_entry.size as usize - 4; // Subtract CRC size
416        let mut data_buf = vec![0u8; data_size];
417        self.file.read_exact(&mut data_buf)?;
418
419        // Read and verify CRC
420        let mut crc_buf = [0u8; 4];
421        self.file.read_exact(&mut crc_buf)?;
422        let stored_crc = u32::from_le_bytes(crc_buf);
423        let computed_crc = crc32fast::hash(&data_buf);
424
425        if stored_crc != computed_crc {
426            return Err(Error::Corruption("Block CRC mismatch".into()));
427        }
428        
429        // Parse entries from block
430        let mut entries = Vec::new();
431        let mut offset = 0;
432        
433        while offset < data_buf.len() {
434            if offset + 4 > data_buf.len() {
435                break;
436            }
437            
438            let len = u32::from_le_bytes([
439                data_buf[offset],
440                data_buf[offset + 1],
441                data_buf[offset + 2],
442                data_buf[offset + 3],
443            ]) as usize;
444            offset += 4;
445            
446            if offset + len > data_buf.len() {
447                break;
448            }
449            
450            let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
451                .map_err(|e| Error::Serialization(e.to_string()))?;
452            entries.push(entry);
453            offset += len;
454        }
455        
456        Ok(entries)
457    }
458
459    /// Get metadata about this SSTable
460    pub fn metadata(&self) -> SSTableMeta {
461        SSTableMeta {
462            path: self.path.clone(),
463            min_key: self.footer.min_key.clone(),
464            max_key: self.footer.max_key.clone(),
465            entry_count: self.footer.entry_count,
466            file_size: self.file_size,
467            level: 0,
468            sequence: 0,
469        }
470    }
471
472    /// Check if a key might be in this SSTable (range check)
473    pub fn might_contain(&self, key: &[u8]) -> bool {
474        key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
475    }
476
477    /// Iterate over all entries in the SSTable
478    pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
479        Ok(SSTableIterator {
480            reader: self,
481            block_idx: 0,
482            block_entries: Vec::new(),
483            entry_idx: 0,
484        })
485    }
486}
487
488/// Iterator over SSTable entries
489pub struct SSTableIterator<'a> {
490    reader: &'a mut SSTableReader,
491    block_idx: usize,
492    block_entries: Vec<SSTableEntry>,
493    entry_idx: usize,
494}
495
496impl<'a> SSTableIterator<'a> {
497    /// Get the next entry
498    pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
499        loop {
500            // If we have entries in the current block, return the next one
501            if self.entry_idx < self.block_entries.len() {
502                let entry = self.block_entries[self.entry_idx].clone();
503                self.entry_idx += 1;
504                return Ok(Some(entry));
505            }
506            
507            // Load the next block
508            if self.block_idx >= self.reader.index.len() {
509                return Ok(None);
510            }
511            
512            self.block_entries = self.reader.read_block(self.block_idx)?;
513            self.block_idx += 1;
514            self.entry_idx = 0;
515        }
516    }
517}
518
519/// Delete an SSTable file
520pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
521    fs::remove_file(path)?;
522    Ok(())
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Round-trips three entries through a writer and a reader.
    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Write SSTable
        let mut writer = SSTableWriter::new(&path).unwrap();
        writer.add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec())).unwrap();
        writer.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec())).unwrap();
        writer.add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec())).unwrap();
        let meta = writer.finish().unwrap();

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        // Read SSTable
        let mut reader = SSTableReader::open(&path).unwrap();

        let entry = reader.get(b"a").unwrap().unwrap();
        assert_eq!(entry.value, b"1".to_vec());

        let entry = reader.get(b"b").unwrap().unwrap();
        assert_eq!(entry.value, b"2".to_vec());

        let entry = reader.get(b"c").unwrap().unwrap();
        assert_eq!(entry.value, b"3".to_vec());

        assert!(reader.get(b"d").unwrap().is_none());
    }

    /// Tombstones survive a write/read round trip and are reported as such.
    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Keys must be added in sorted order
        let mut writer = SSTableWriter::new(&path).unwrap();
        writer
            .add(SSTableEntry::tombstone(b"deleted".to_vec()))
            .unwrap();
        writer
            .add(SSTableEntry::value(b"key".to_vec(), b"value".to_vec()))
            .unwrap();
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();

        let entry = reader.get(b"key").unwrap().unwrap();
        assert!(!entry.is_tombstone());

        let entry = reader.get(b"deleted").unwrap().unwrap();
        assert!(entry.is_tombstone());
    }

    /// Iteration yields every entry, in order, across block boundaries.
    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // A small block size forces the 100 entries to span many blocks; with
        // the 4 KiB default they would all fit in one.
        let mut writer = SSTableWriter::with_block_size(&path, 128).unwrap();
        for i in 0..100 {
            let key = format!("key{:03}", i);
            let value = format!("value{}", i);
            writer.add(SSTableEntry::value(key.into_bytes(), value.into_bytes())).unwrap();
        }
        writer.finish().unwrap();

        let mut reader = SSTableReader::open(&path).unwrap();
        assert!(reader.index.len() > 1);

        let mut iter = reader.iter().unwrap();

        let mut count = 0;
        while let Some(entry) = iter.next_entry().unwrap() {
            assert_eq!(entry.key, format!("key{:03}", count).into_bytes());
            assert_eq!(entry.value, format!("value{}", count).into_bytes());
            count += 1;
        }

        assert_eq!(count, 100);
    }

    /// Point lookups find keys in every block and reject absent keys.
    #[test]
    fn test_sstable_get_across_blocks() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::with_block_size(&path, 64).unwrap();
        for i in 0..50 {
            let key = format!("key{:03}", i);
            let value = format!("value{}", i);
            writer.add(SSTableEntry::value(key.into_bytes(), value.into_bytes())).unwrap();
        }
        let meta = writer.finish().unwrap();

        // The reported size must match what actually landed on disk.
        assert_eq!(meta.file_size, std::fs::metadata(&path).unwrap().len());

        let mut reader = SSTableReader::open(&path).unwrap();
        assert!(reader.index.len() > 1);

        for i in 0..50 {
            let key = format!("key{:03}", i).into_bytes();
            let entry = reader.get(&key).unwrap().unwrap();
            assert_eq!(entry.value, format!("value{}", i).into_bytes());
        }

        assert!(reader.get(b"key").unwrap().is_none()); // Below the minimum key
        assert!(reader.get(b"key0205").unwrap().is_none()); // Between two stored keys
        assert!(reader.get(b"key999").unwrap().is_none()); // Above the maximum key
    }

    /// A table with no entries can be written, opened, queried and iterated.
    #[test]
    fn test_sstable_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let meta = SSTableWriter::new(&path).unwrap().finish().unwrap();
        assert_eq!(meta.entry_count, 0);
        assert!(meta.min_key.is_empty());
        assert!(meta.max_key.is_empty());

        let mut reader = SSTableReader::open(&path).unwrap();
        assert_eq!(reader.metadata().entry_count, 0);
        assert!(reader.get(b"a").unwrap().is_none());
        assert!(!reader.might_contain(b"a"));

        let mut iter = reader.iter().unwrap();
        assert!(iter.next_entry().unwrap().is_none());
    }

    /// A memtable's values and tombstones are carried over into the SSTable.
    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.into_iter()).unwrap();

        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert!(reader.get(b"c").unwrap().unwrap().is_tombstone());
    }

    /// The range pre-check accepts keys inside [min, max] and rejects others.
    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut writer = SSTableWriter::new(&path).unwrap();
        writer.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec())).unwrap();
        writer.add(SSTableEntry::value(b"d".to_vec(), b"4".to_vec())).unwrap();
        writer.finish().unwrap();

        let reader = SSTableReader::open(&path).unwrap();

        assert!(!reader.might_contain(b"a")); // Before range
        assert!(reader.might_contain(b"b")); // In range
        assert!(reader.might_contain(b"c")); // In range (might be there)
        assert!(reader.might_contain(b"d")); // In range
        assert!(!reader.might_contain(b"e")); // After range
    }
}