rustlite_storage/
sstable.rs

1//! SSTable - Sorted String Table format and I/O
2//!
3//! SSTables are immutable on-disk files that store key-value pairs in sorted order.
4//! They are the primary on-disk storage format for LSM-tree databases.
5//!
6//! ## File Format
7//!
8//! ```text
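//! +------------------+
//! | Header           |  <- Magic bytes "RSSL" + format version (v1.0+ files only)
//! +------------------+
//! | Data Blocks      |  <- Length-prefixed key-value entries grouped in blocks, each block followed by its CRC32
//! +------------------+
//! | Index Block      |  <- Sparse index pointing to data blocks
//! +------------------+
//! | Footer           |  <- Index offset + magic number + CRC
//! +------------------+
//! | Footer Length    |  <- Size of the footer as a little-endian u32
//! +------------------+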
16//! ```
17
18use crate::memtable::MemtableEntry;
19use rustlite_core::{Error, Result};
20use serde::{Deserialize, Serialize};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24
/// Leading file magic: the ASCII bytes `RSSL` ("RustLite SSTable").
const SSTABLE_MAGIC_HEADER: [u8; 4] = [b'R', b'S', b'S', b'L'];

/// Magic value stored in the footer (kept for backward compatibility).
///
/// Numerically this is the ASCII sequence `SSTBLIT` packed big-endian into the low seven bytes.
const SSTABLE_MAGIC: u64 = u64::from_be_bytes([0, b'S', b'S', b'T', b'B', b'L', b'I', b'T']);

/// On-disk format version (v1.0.0+).
/// Bump this whenever the layout changes incompatibly.
const SSTABLE_FORMAT_VERSION: u16 = 1;

/// Default data-block size threshold (4 KiB).
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;

/// Entry type tag for a live value.
const ENTRY_TYPE_VALUE: u8 = 0;
/// Entry type tag for a tombstone.
const ENTRY_TYPE_TOMBSTONE: u8 = 1;
41
42/// A single entry in an SSTable
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SSTableEntry {
45    /// The key
46    pub key: Vec<u8>,
47    /// Entry type: 0 = value, 1 = tombstone
48    pub entry_type: u8,
49    /// The value (empty for tombstones)
50    pub value: Vec<u8>,
51}
52
53impl SSTableEntry {
54    /// Create a value entry
55    pub fn value(key: Vec<u8>, value: Vec<u8>) -> Self {
56        Self {
57            key,
58            entry_type: ENTRY_TYPE_VALUE,
59            value,
60        }
61    }
62
63    /// Create a tombstone entry
64    pub fn tombstone(key: Vec<u8>) -> Self {
65        Self {
66            key,
67            entry_type: ENTRY_TYPE_TOMBSTONE,
68            value: Vec::new(),
69        }
70    }
71
72    /// Check if this is a tombstone
73    pub fn is_tombstone(&self) -> bool {
74        self.entry_type == ENTRY_TYPE_TOMBSTONE
75    }
76}
77
78/// Index entry pointing to a data block
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct IndexEntry {
81    /// First key in the block
82    pub first_key: Vec<u8>,
83    /// Offset of the block in the file
84    pub offset: u64,
85    /// Size of the block in bytes
86    pub size: u32,
87}
88
89/// SSTable footer containing metadata
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SSTableFooter {
92    /// Format version (v1.0.0+)
93    pub format_version: u16,
94    /// Offset of the index block
95    pub index_offset: u64,
96    /// Size of the index block
97    pub index_size: u32,
98    /// Number of entries in the SSTable
99    pub entry_count: u64,
100    /// Minimum key in the SSTable
101    pub min_key: Vec<u8>,
102    /// Maximum key in the SSTable
103    pub max_key: Vec<u8>,
104    /// Magic number for validation (kept for backward compat with footer)
105    pub magic: u64,
106    /// CRC32 of the footer data
107    pub crc: u32,
108}
109
110/// File header written at the start of SSTable files (v1.0+)
111#[derive(Debug, Clone)]
112pub struct SSTableHeader {
113    /// Magic bytes: "RSSL"
114    pub magic: [u8; 4],
115    /// Format version
116    pub version: u16,
117}
118
119impl SSTableHeader {
120    /// Size of header in bytes
121    pub const SIZE: usize = 6; // 4 bytes magic + 2 bytes version
122
123    /// Create a new header with current version
124    pub fn new() -> Self {
125        Self {
126            magic: SSTABLE_MAGIC_HEADER,
127            version: SSTABLE_FORMAT_VERSION,
128        }
129    }
130
131    /// Write header to a writer
132    pub fn write_to<W: Write>(&self, writer: &mut W) -> Result<()> {
133        writer.write_all(&self.magic)?;
134        writer.write_all(&self.version.to_le_bytes())?;
135        Ok(())
136    }
137
138    /// Read header from a reader
139    pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
140        let mut magic = [0u8; 4];
141        reader.read_exact(&mut magic)?;
142
143        if magic != SSTABLE_MAGIC_HEADER {
144            return Err(Error::Corruption(format!(
145                "Invalid SSTable magic: expected {:?}, got {:?}",
146                SSTABLE_MAGIC_HEADER, magic
147            )));
148        }
149
150        let mut version_bytes = [0u8; 2];
151        reader.read_exact(&mut version_bytes)?;
152        let version = u16::from_le_bytes(version_bytes);
153
154        if version > SSTABLE_FORMAT_VERSION {
155            return Err(Error::Corruption(format!(
156                "Unsupported SSTable version: {} (current: {})",
157                version, SSTABLE_FORMAT_VERSION
158            )));
159        }
160
161        Ok(Self { magic, version })
162    }
163}
164
/// In-memory summary of an SSTable file.
#[derive(Clone, Debug)]
pub struct SSTableMeta {
    /// Location of the SSTable file on disk
    pub path: PathBuf,
    /// Smallest key stored in the table
    pub min_key: Vec<u8>,
    /// Largest key stored in the table
    pub max_key: Vec<u8>,
    /// How many entries the table holds
    pub entry_count: u64,
    /// Size of the file in bytes
    pub file_size: u64,
    /// LSM-tree level the table belongs to (0 = newest)
    pub level: u32,
    /// Sequence number assigned at creation
    pub sequence: u64,
}
183
184/// SSTable writer - creates new SSTable files
185pub struct SSTableWriter {
186    /// Output file path
187    path: PathBuf,
188    /// Buffered writer
189    writer: BufWriter<File>,
190    /// Current position in file
191    position: u64,
192    /// Index entries
193    index: Vec<IndexEntry>,
194    /// Current block buffer
195    block_buffer: Vec<u8>,
196    /// Block size threshold
197    block_size: usize,
198    /// First key of current block
199    current_block_first_key: Option<Vec<u8>>,
200    /// Entry count
201    entry_count: u64,
202    /// Minimum key
203    min_key: Option<Vec<u8>>,
204    /// Maximum key
205    max_key: Option<Vec<u8>>,
206}
207
208impl SSTableWriter {
209    /// Create a new SSTable writer
210    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
211        Self::with_block_size(path, DEFAULT_BLOCK_SIZE)
212    }
213
214    /// Create a new SSTable writer with custom block size
215    pub fn with_block_size(path: impl AsRef<Path>, block_size: usize) -> Result<Self> {
216        let path = path.as_ref().to_path_buf();
217        let file = File::create(&path)?;
218        let mut writer = BufWriter::new(file);
219
220        // Write file header (v1.0+)
221        let header = SSTableHeader::new();
222        header.write_to(&mut writer)?;
223        let header_size = SSTableHeader::SIZE as u64;
224
225        Ok(Self {
226            path,
227            writer,
228            position: header_size,
229            index: Vec::new(),
230            block_buffer: Vec::with_capacity(block_size),
231            block_size,
232            current_block_first_key: None,
233            entry_count: 0,
234            min_key: None,
235            max_key: None,
236        })
237    }
238
239    /// Add an entry to the SSTable
240    pub fn add(&mut self, entry: SSTableEntry) -> Result<()> {
241        // Track min/max keys
242        if self.min_key.is_none() {
243            self.min_key = Some(entry.key.clone());
244        }
245        self.max_key = Some(entry.key.clone());
246
247        // Track first key of block
248        if self.current_block_first_key.is_none() {
249            self.current_block_first_key = Some(entry.key.clone());
250        }
251
252        // Serialize entry
253        let encoded =
254            bincode::serialize(&entry).map_err(|e| Error::Serialization(e.to_string()))?;
255
256        // Write length prefix + entry
257        let len = encoded.len() as u32;
258        self.block_buffer.extend_from_slice(&len.to_le_bytes());
259        self.block_buffer.extend_from_slice(&encoded);
260
261        self.entry_count += 1;
262
263        // Flush block if it exceeds threshold
264        if self.block_buffer.len() >= self.block_size {
265            self.flush_block()?;
266        }
267
268        Ok(())
269    }
270
271    /// Flush the current block to disk
272    fn flush_block(&mut self) -> Result<()> {
273        if self.block_buffer.is_empty() {
274            return Ok(());
275        }
276
277        // Calculate CRC
278        let crc = crc32fast::hash(&self.block_buffer);
279
280        // Create index entry
281        if let Some(first_key) = self.current_block_first_key.take() {
282            self.index.push(IndexEntry {
283                first_key,
284                offset: self.position,
285                size: self.block_buffer.len() as u32 + 4, // +4 for CRC
286            });
287        }
288
289        // Write block data
290        self.writer.write_all(&self.block_buffer)?;
291        self.position += self.block_buffer.len() as u64;
292
293        // Write block CRC
294        self.writer.write_all(&crc.to_le_bytes())?;
295        self.position += 4;
296
297        self.block_buffer.clear();
298
299        Ok(())
300    }
301
302    /// Finish writing and close the SSTable
303    pub fn finish(mut self) -> Result<SSTableMeta> {
304        // Flush any remaining data
305        self.flush_block()?;
306
307        // Write index block
308        let index_offset = self.position;
309        let index_encoded =
310            bincode::serialize(&self.index).map_err(|e| Error::Serialization(e.to_string()))?;
311        let index_size = index_encoded.len() as u32;
312
313        self.writer.write_all(&index_encoded)?;
314        self.position += index_size as u64;
315
316        // Write footer
317        let min_key = self.min_key.clone().unwrap_or_default();
318        let max_key = self.max_key.clone().unwrap_or_default();
319
320        let footer_data = SSTableFooter {
321            format_version: SSTABLE_FORMAT_VERSION,
322            index_offset,
323            index_size,
324            entry_count: self.entry_count,
325            min_key: min_key.clone(),
326            max_key: max_key.clone(),
327            magic: SSTABLE_MAGIC,
328            crc: 0, // Will be set after computing CRC
329        };
330
331        let footer_encoded =
332            bincode::serialize(&footer_data).map_err(|e| Error::Serialization(e.to_string()))?;
333        let footer_crc = crc32fast::hash(&footer_encoded);
334
335        // Write footer with correct CRC
336        let final_footer = SSTableFooter {
337            crc: footer_crc,
338            ..footer_data
339        };
340        let final_footer_encoded =
341            bincode::serialize(&final_footer).map_err(|e| Error::Serialization(e.to_string()))?;
342
343        // Write footer length + footer
344        let footer_len = final_footer_encoded.len() as u32;
345        self.writer.write_all(&final_footer_encoded)?;
346        self.writer.write_all(&footer_len.to_le_bytes())?;
347
348        self.writer.flush()?;
349
350        let file_size = self.position + final_footer_encoded.len() as u64 + 4;
351
352        Ok(SSTableMeta {
353            path: self.path,
354            min_key,
355            max_key,
356            entry_count: self.entry_count,
357            file_size,
358            level: 0,
359            sequence: 0,
360        })
361    }
362
363    /// Build an SSTable from a memtable
364    pub fn from_memtable<I>(path: impl AsRef<Path>, iter: I) -> Result<SSTableMeta>
365    where
366        I: Iterator<Item = (Vec<u8>, MemtableEntry)>,
367    {
368        let mut writer = SSTableWriter::new(path)?;
369
370        for (key, entry) in iter {
371            let sstable_entry = match entry {
372                MemtableEntry::Value(v) => SSTableEntry::value(key, v),
373                MemtableEntry::Tombstone => SSTableEntry::tombstone(key),
374            };
375            writer.add(sstable_entry)?;
376        }
377
378        writer.finish()
379    }
380}
381
382/// SSTable reader - reads from existing SSTable files
383pub struct SSTableReader {
384    /// Path to the SSTable file
385    path: PathBuf,
386    /// The file handle
387    file: BufReader<File>,
388    /// Index entries
389    index: Vec<IndexEntry>,
390    /// Footer metadata
391    footer: SSTableFooter,
392    /// File size
393    file_size: u64,
394    /// Header offset (0 for legacy files, SSTableHeader::SIZE for v1.0+)
395    header_offset: u64,
396}
397
398impl SSTableReader {
399    /// Open an SSTable file for reading
400    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
401        let path = path.as_ref().to_path_buf();
402        let mut file = File::open(&path)?;
403
404        // Get file size
405        let file_size = file.metadata()?.len();
406
407        if file_size < 4 {
408            return Err(Error::Corruption("SSTable too small".into()));
409        }
410
411        // Try to read header (v1.0+)
412        // If header is missing or invalid, assume legacy format (v0.x)
413        let header_offset = if file_size >= SSTableHeader::SIZE as u64 {
414            file.seek(SeekFrom::Start(0))?;
415            match SSTableHeader::read_from(&mut file) {
416                Ok(header) => {
417                    // Valid header found, data starts after header
418                    tracing::debug!("Opened SSTable with format version {}", header.version);
419                    SSTableHeader::SIZE as u64
420                }
421                Err(_) => {
422                    // No valid header, assume legacy format
423                    tracing::debug!("Opened legacy SSTable (pre-v1.0)");
424                    0
425                }
426            }
427        } else {
428            // File too small for header, must be legacy
429            0
430        };
431
432        // Read footer length (last 4 bytes)
433        file.seek(SeekFrom::End(-4))?;
434        let mut footer_len_buf = [0u8; 4];
435        file.read_exact(&mut footer_len_buf)?;
436        let footer_len = u32::from_le_bytes(footer_len_buf) as i64;
437
438        // Read footer
439        file.seek(SeekFrom::End(-4 - footer_len))?;
440        let mut footer_buf = vec![0u8; footer_len as usize];
441        file.read_exact(&mut footer_buf)?;
442
443        let footer: SSTableFooter =
444            bincode::deserialize(&footer_buf).map_err(|e| Error::Serialization(e.to_string()))?;
445
446        // Validate magic number
447        if footer.magic != SSTABLE_MAGIC {
448            return Err(Error::Corruption("Invalid SSTable magic number".into()));
449        }
450
451        // Validate format version (v1.0.0+)
452        if footer.format_version > SSTABLE_FORMAT_VERSION {
453            return Err(Error::Corruption(format!(
454                "Unsupported SSTable format version: {} (current: {})",
455                footer.format_version, SSTABLE_FORMAT_VERSION
456            )));
457        }
458
459        // Read index (index_offset is already absolute from file start for v1.0+, or from data start for legacy)
460        let index_offset = if header_offset > 0 {
461            // New format: footer.index_offset is absolute including header
462            footer.index_offset
463        } else {
464            // Legacy format: footer.index_offset is relative to start of data (which is position 0)
465            footer.index_offset
466        };
467        file.seek(SeekFrom::Start(index_offset))?;
468        let mut index_buf = vec![0u8; footer.index_size as usize];
469        file.read_exact(&mut index_buf)?;
470
471        let index: Vec<IndexEntry> =
472            bincode::deserialize(&index_buf).map_err(|e| Error::Serialization(e.to_string()))?;
473
474        Ok(Self {
475            path,
476            file: BufReader::new(file.try_clone()?),
477            index,
478            footer,
479            file_size,
480            header_offset,
481        })
482    }
483
484    /// Get a value by key
485    pub fn get(&mut self, key: &[u8]) -> Result<Option<SSTableEntry>> {
486        // Binary search to find the block that might contain the key
487        let block_idx = self
488            .index
489            .partition_point(|entry| entry.first_key.as_slice() <= key);
490
491        // The key would be in the previous block (if any)
492        if block_idx == 0 {
493            // Key is smaller than all keys in the SSTable
494            if key < self.footer.min_key.as_slice() {
495                return Ok(None);
496            }
497        }
498
499        // Check the block
500        let block_idx = if block_idx > 0 { block_idx - 1 } else { 0 };
501
502        if block_idx >= self.index.len() {
503            return Ok(None);
504        }
505
506        // Read and search the block
507        let block = self.read_block(block_idx)?;
508
509        for entry in block {
510            if entry.key.as_slice() == key {
511                return Ok(Some(entry));
512            }
513            if entry.key.as_slice() > key {
514                break;
515            }
516        }
517
518        Ok(None)
519    }
520
521    /// Read a data block by index
522    fn read_block(&mut self, block_idx: usize) -> Result<Vec<SSTableEntry>> {
523        let index_entry = &self.index[block_idx];
524
525        // Block offsets are already absolute for v1.0+ files (include header)
526        // For legacy files, they start at position 0 (no header)
527        let absolute_offset = if self.header_offset > 0 {
528            index_entry.offset // Already absolute
529        } else {
530            index_entry.offset // Relative to start (no header)
531        };
532        self.file.seek(SeekFrom::Start(absolute_offset))?;
533
534        let data_size = index_entry.size as usize - 4; // Subtract CRC size
535        let mut data_buf = vec![0u8; data_size];
536        self.file.read_exact(&mut data_buf)?;
537
538        // Read and verify CRC
539        let mut crc_buf = [0u8; 4];
540        self.file.read_exact(&mut crc_buf)?;
541        let stored_crc = u32::from_le_bytes(crc_buf);
542        let computed_crc = crc32fast::hash(&data_buf);
543
544        if stored_crc != computed_crc {
545            return Err(Error::Corruption("Block CRC mismatch".into()));
546        }
547
548        // Parse entries from block
549        let mut entries = Vec::new();
550        let mut offset = 0;
551
552        while offset < data_buf.len() {
553            if offset + 4 > data_buf.len() {
554                break;
555            }
556
557            let len = u32::from_le_bytes([
558                data_buf[offset],
559                data_buf[offset + 1],
560                data_buf[offset + 2],
561                data_buf[offset + 3],
562            ]) as usize;
563            offset += 4;
564
565            if offset + len > data_buf.len() {
566                break;
567            }
568
569            let entry: SSTableEntry = bincode::deserialize(&data_buf[offset..offset + len])
570                .map_err(|e| Error::Serialization(e.to_string()))?;
571            entries.push(entry);
572            offset += len;
573        }
574
575        Ok(entries)
576    }
577
578    /// Get metadata about this SSTable
579    pub fn metadata(&self) -> SSTableMeta {
580        SSTableMeta {
581            path: self.path.clone(),
582            min_key: self.footer.min_key.clone(),
583            max_key: self.footer.max_key.clone(),
584            entry_count: self.footer.entry_count,
585            file_size: self.file_size,
586            level: 0,
587            sequence: 0,
588        }
589    }
590
591    /// Check if a key might be in this SSTable (range check)
592    pub fn might_contain(&self, key: &[u8]) -> bool {
593        key >= self.footer.min_key.as_slice() && key <= self.footer.max_key.as_slice()
594    }
595
596    /// Iterate over all entries in the SSTable
597    pub fn iter(&mut self) -> Result<SSTableIterator<'_>> {
598        Ok(SSTableIterator {
599            reader: self,
600            block_idx: 0,
601            block_entries: Vec::new(),
602            entry_idx: 0,
603        })
604    }
605}
606
607/// Iterator over SSTable entries
608pub struct SSTableIterator<'a> {
609    reader: &'a mut SSTableReader,
610    block_idx: usize,
611    block_entries: Vec<SSTableEntry>,
612    entry_idx: usize,
613}
614
615impl SSTableIterator<'_> {
616    /// Get the next entry
617    pub fn next_entry(&mut self) -> Result<Option<SSTableEntry>> {
618        loop {
619            // If we have entries in the current block, return the next one
620            if self.entry_idx < self.block_entries.len() {
621                let entry = self.block_entries[self.entry_idx].clone();
622                self.entry_idx += 1;
623                return Ok(Some(entry));
624            }
625
626            // Load the next block
627            if self.block_idx >= self.reader.index.len() {
628                return Ok(None);
629            }
630
631            self.block_entries = self.reader.read_block(self.block_idx)?;
632            self.block_idx += 1;
633            self.entry_idx = 0;
634        }
635    }
636}
637
638/// Delete an SSTable file
639pub fn delete_sstable(path: impl AsRef<Path>) -> Result<()> {
640    fs::remove_file(path)?;
641    Ok(())
642}
643
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Writes `entries` (given in ascending key order) to a new SSTable at `path`.
    fn write_table(path: &std::path::Path, entries: Vec<SSTableEntry>) -> SSTableMeta {
        let mut writer = SSTableWriter::new(path).unwrap();
        for entry in entries {
            writer.add(entry).unwrap();
        }
        writer.finish().unwrap()
    }

    /// Round trip: three values are written, then each is looked up by key.
    #[test]
    fn test_sstable_write_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let pairs: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];

        // Write SSTable
        let entries = pairs
            .iter()
            .map(|&(k, v)| SSTableEntry::value(k.to_vec(), v.to_vec()))
            .collect::<Vec<_>>();
        let meta = write_table(&path, entries);

        assert_eq!(meta.entry_count, 3);
        assert_eq!(meta.min_key, b"a".to_vec());
        assert_eq!(meta.max_key, b"c".to_vec());

        // Read SSTable
        let mut reader = SSTableReader::open(&path).unwrap();

        for &(k, v) in pairs.iter() {
            let entry = reader.get(k).unwrap().unwrap();
            assert_eq!(entry.value, v.to_vec());
        }

        assert!(reader.get(b"d").unwrap().is_none());
    }

    /// Tombstones are stored and returned as entries flagged as tombstones.
    #[test]
    fn test_sstable_tombstone() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Keys must be added in sorted order
        write_table(
            &path,
            vec![
                SSTableEntry::tombstone(b"deleted".to_vec()),
                SSTableEntry::value(b"key".to_vec(), b"value".to_vec()),
            ],
        );

        let mut reader = SSTableReader::open(&path).unwrap();

        let live = reader.get(b"key").unwrap().unwrap();
        assert!(!live.is_tombstone());

        let removed = reader.get(b"deleted").unwrap().unwrap();
        assert!(removed.is_tombstone());
    }

    /// A full scan yields every entry that was written.
    #[test]
    fn test_sstable_iterator() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let entries = (0..100)
            .map(|i| {
                SSTableEntry::value(
                    format!("key{:03}", i).into_bytes(),
                    format!("value{}", i).into_bytes(),
                )
            })
            .collect::<Vec<_>>();
        write_table(&path, entries);

        let mut reader = SSTableReader::open(&path).unwrap();
        let mut iter = reader.iter().unwrap();

        let mut count = 0;
        while iter.next_entry().unwrap().is_some() {
            count += 1;
        }

        assert_eq!(count, 100);
    }

    /// A drained memtable converts into values and tombstones.
    #[test]
    fn test_sstable_from_memtable() {
        use crate::memtable::Memtable;

        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let mut mt = Memtable::new();
        mt.put(b"a".to_vec(), b"1".to_vec());
        mt.put(b"b".to_vec(), b"2".to_vec());
        mt.delete(b"c".to_vec());

        let meta = SSTableWriter::from_memtable(&path, mt.drain()).unwrap();

        assert_eq!(meta.entry_count, 3);

        let mut reader = SSTableReader::open(&path).unwrap();
        let first = reader.get(b"a").unwrap().unwrap();
        assert_eq!(first.value, b"1".to_vec());
        let deleted = reader.get(b"c").unwrap().unwrap();
        assert!(deleted.is_tombstone());
    }

    /// The range check accepts keys inside `[min_key, max_key]` and rejects keys outside it.
    #[test]
    fn test_sstable_might_contain() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        write_table(
            &path,
            vec![
                SSTableEntry::value(b"b".to_vec(), b"2".to_vec()),
                SSTableEntry::value(b"d".to_vec(), b"4".to_vec()),
            ],
        );

        let reader = SSTableReader::open(&path).unwrap();

        let expectations: [(&[u8], bool); 5] = [
            (b"a", false), // Before range
            (b"b", true),  // In range
            (b"c", true),  // In range (might be there)
            (b"d", true),  // In range
            (b"e", false), // After range
        ];
        for &(key, expected) in expectations.iter() {
            assert_eq!(reader.might_contain(key), expected);
        }
    }
}
779}