hermes_core/structures/sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Memory-efficient design: only minimal metadata is loaded into memory;
//! blocks are fetched on demand.
//!
//! ## Key Features
//!
//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
//!    - Can be mmap'd directly without parsing into heap-allocated structures
//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
//!
//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
//!    - Minimal memory footprint for block metadata
//!
//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
//!
//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
//!
//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
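//!
//! ## Example
//!
//! A minimal write/read round-trip sketch. It is marked `ignore` because
//! constructing a `LazyFileHandle` depends on the directories layer and is
//! elided here; everything else uses the APIs defined in this module.
//!
//! ```ignore
//! let mut buf: Vec<u8> = Vec::new();
//! {
//!     // Keys must be inserted in ascending byte order.
//!     let mut writer = SSTableWriter::<u64>::new(&mut buf);
//!     writer.insert(b"apple", &1)?;
//!     writer.insert(b"banana", &2)?;
//!     writer.finish()?;
//! }
//! // `handle`: a LazyFileHandle over the written bytes (construction elided).
//! let reader = AsyncSSTableReader::<u64>::open(handle, 32).await?;
//! assert_eq!(reader.get(b"banana").await?, Some(2));
//! ```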

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};

/// SSTable magic number - version 4 with memory-efficient index
/// Uses FST-based or mmap'd block index to avoid heap allocation
pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"

/// Block size for SSTable (16KB default)
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default dictionary size (64KB)
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
pub const BLOOM_BITS_PER_KEY: usize = 10;
/// Bloom filter hash count (k ≈ bits_per_key · ln 2 ≈ 6.9, so 7 is optimal for 10 bits/key)
pub const BLOOM_HASH_COUNT: usize = 7;

// ============================================================================
// Bloom Filter Implementation
// ============================================================================

/// Simple bloom filter for key existence checks
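///
/// Double hashing (two FNV-1a variants) derives `num_hashes` bit positions
/// per key. A small usage sketch:
///
/// ```ignore
/// let mut bloom = BloomFilter::new(1000, BLOOM_BITS_PER_KEY);
/// bloom.insert(b"hello");
/// assert!(bloom.may_contain(b"hello")); // never a false negative
/// // Absent keys return false except for ~1% false positives.
/// ```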
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Create a new bloom filter sized for expected number of keys
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Create from serialized bytes
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serialize to bytes
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    /// Add a key to the filter
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Check if a key might be in the filter
    /// Returns false if definitely not present, true if possibly present
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Size in bytes
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Compute two hash values using FNV-1a variant
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        // FNV-1a hash
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        // Second hash using different seed
        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    /// Get bit position for hash iteration i using double hashing
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// SSTable value trait
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// u64 value implementation
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

/// Vec<u8> value implementation
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Sparse dimension info for SSTable-based sparse index
/// Stores offset and length for posting list lookup
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    /// Offset in sparse file where posting list starts
    pub offset: u64,
    /// Length of serialized posting list
    pub length: u32,
}

impl SparseDimInfo {
    pub fn new(offset: u64, length: u32) -> Self {
        Self { offset, length }
    }
}

impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}

/// Maximum number of postings that can be inlined in TermInfo
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Term info for posting list references
///
/// Supports two modes:
/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
/// - **External**: Larger posting lists stored in separate .post file
///
/// This eliminates a separate I/O read for rare/unique terms.
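///
/// A round-trip sketch for the inline path:
///
/// ```ignore
/// // doc_ids must be sorted ascending; lists of up to 3 docs can inline.
/// let ti = TermInfo::try_inline(&[3, 7], &[1, 2]).unwrap();
/// assert!(ti.is_inline());
/// assert_eq!(ti.doc_freq(), 2);
/// assert_eq!(ti.decode_inline(), Some((vec![3, 7], vec![1, 2])));
/// ```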
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
    /// Each entry is (doc_id, term_freq) delta-encoded
    Inline {
        /// Number of postings (1-3)
        doc_freq: u8,
        /// Inline data: delta-encoded (doc_id, term_freq) pairs
        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
        data: [u8; 16],
        /// Actual length of data used
        data_len: u8,
    },
    /// Reference to external posting list in .post file
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Position data offset (0 if no positions)
        position_offset: u64,
        /// Position data length (0 if no positions)
        position_len: u32,
    },
}

impl TermInfo {
    /// Create an external reference
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// Create an external reference with position info
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Try to create an inline TermInfo from posting data
    /// Returns None if posting list is too large to inline
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Get document frequency
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    /// Check if this is an inline posting list
    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Get external posting info (offset, len) - returns None for inline
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Get position info (offset, len) - returns None for inline or if no positions
    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decode inline postings into (doc_ids, term_freqs)
    /// Returns None if this is an external reference
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag byte 0xFF = inline marker
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                // Tag byte 0x00 = external marker (no positions)
                // Tag byte 0x01 = external with positions
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            // Inline
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            // External (no positions)
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            // External with positions
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Write variable-length integer
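/// (LEB128-style: 7 data bits per byte, least-significant group first, high
/// bit set on continuation bytes)
///
/// ```ignore
/// let mut buf = Vec::new();
/// write_vint(&mut buf, 300).unwrap();
/// assert_eq!(buf, [0xAC, 0x02]); // 300 = 0b10_0101100
/// ```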
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Read variable-length integer
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Compute common prefix length
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// SSTable statistics for debugging
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// SSTable writer configuration
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level (1-22, higher = better compression but slower)
    pub compression_level: CompressionLevel,
    /// Whether to train and use a dictionary for compression
    pub use_dictionary: bool,
    /// Dictionary size in bytes (default 64KB)
    pub dict_size: usize,
    /// Whether to build a bloom filter
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Create config from IndexOptimization mode
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER, // Level 9
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX, // Level 22
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST, // Level 1
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true, // Bloom helps skip blocks fast
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Fast configuration - prioritize write speed over compression
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Maximum compression configuration - prioritize size over speed
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// SSTable writer with optimizations:
/// - Dictionary compression for blocks (if dictionary provided)
/// - Configurable compression level
/// - Block index prefix compression
/// - Bloom filter for fast negative lookups
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Pre-trained dictionary for compression (optional)
    dictionary: Option<CompressionDict>,
    /// All keys for bloom filter
    all_keys: Vec<Vec<u8>>,
    /// Bloom filter (built at finish time)
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    /// Create a new SSTable writer with default configuration
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    /// Create a new SSTable writer with custom configuration
    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Create a new SSTable writer with a pre-trained dictionary
    pub fn with_dictionary(
        writer: &'a mut dyn Write,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        // Collect keys for bloom filter
        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

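        // Entry layout within a block:
        //   vint(shared_prefix_len) ++ vint(suffix_len) ++ suffix ++ value
        // e.g. "apply" written right after "apple" stores prefix_len=4, suffix="y".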
        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Flush and compress the current block
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        // Compress block with dictionary if available
        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<()> {
        // Flush any remaining data
        self.flush_block()?;

        // Build bloom filter from collected keys
        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        // Build memory-efficient block index (v4 format)
        // Convert to (key, BlockAddr) pairs for the new index format
        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        // Build FST-based index if native feature is enabled, otherwise use mmap index
        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        // Write index bytes with length prefix
        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        // Write bloom filter if present
        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Write extended footer (v4 format)
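        // Layout (37 bytes total, mirrored by the reader):
        //   data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8)
        //   + compression_level(1) + magic(4)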
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }
}

/// Block index entry
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async SSTable reader - loads blocks on demand via LazyFileSlice
///
/// Memory-efficient design (v4 format):
/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
/// - Block addresses stored in bitpacked format
/// - Bloom filter and dictionary optional
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
    block_index: BlockIndex,
    num_entries: u64,
    /// Hot cache for decompressed blocks
    cache: RwLock<BlockCache>,
    /// Bloom filter for fast negative lookups (optional)
    bloom_filter: Option<BloomFilter>,
    /// Compression dictionary (optional)
    dictionary: Option<CompressionDict>,
    /// Compression level used
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU-style block cache (recency tracked with a linear-scan Vec, which is
/// adequate for the small per-table cache sizes used here)
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable from a LazyFileHandle
    /// Only loads the footer and index into memory, data blocks fetched on-demand
    ///
    /// Supports both v3 (legacy) and v4 (memory-efficient) formats:
    /// - v4: FST-based or mmap'd block index (no heap allocation for keys)
    /// - v3: Prefix-compressed block index loaded into Vec (legacy fallback)
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Read footer (37 bytes)
        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // Read index section
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        // Parse block index (length-prefixed FST or mmap index)
        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        // Try FST first (native), fall back to mmap
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        // Load bloom filter if present
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read bloom filter size first (12 bytes header)
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        // Load dictionary if present
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            // Read dictionary size first
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        // Create a lazy slice for just the data portion
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Number of entries
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Get stats about this SSTable for debugging
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0, // No longer using sparse index separately
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Look up a key (async - may need to load block)
    ///
    /// Uses bloom filter for fast negative lookups, then memory-efficient
    /// block index to locate the block, reducing I/O to typically 1 block read.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        // Check bloom filter first - fast negative lookup
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        // Use block index to find the block that could contain the key
        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        // Now we know exactly which block to load - single I/O
        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batch lookup multiple keys with optimized I/O
    ///
    /// Groups keys by block and loads each block only once, reducing I/O
    /// from one read per key to one read per distinct block (fewer when
    /// keys share blocks). Uses the bloom filter to skip keys that
    /// definitely don't exist.
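    ///
    /// ```ignore
    /// // Sketch (assumes `reader` is an open AsyncSSTableReader<u64>):
    /// let keys: Vec<&[u8]> = vec![b"apple", b"banana", b"missing"];
    /// let results = reader.get_batch(&keys).await?;
    /// assert_eq!(results.len(), 3);
    /// ```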
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its block index
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            // Check bloom filter first
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
            }
        }

        // Group keys by block
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        // Load all needed blocks (this is where I/O happens)
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        // Now search each key in its block (all blocks are cached)
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?; // Will hit cache
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Preload all data blocks into memory
    ///
    /// Call this after open() to eliminate all I/O during subsequent lookups.
    /// Useful when the SSTable is small enough to fit in memory.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Load a block (checks cache first, then loads from FileSlice)
    /// Uses dictionary decompression if dictionary is present
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        // Check cache first
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(addr.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        // Load from FileSlice
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        // Decompress with dictionary if available
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

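        // Entries are prefix-compressed and sorted, so the scan can stop as
        // soon as the reconstructed key compares greater than the target.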
        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Prefetch blocks for a key range
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Iterate over all entries (loads blocks as needed)
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Get all entries as a vector (for merging)
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

/// Async iterator over SSTable entries
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Advance to next entry (async)
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // These should likely return false (with ~1% false positive rate)
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        // Insert keys
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // All inserted keys should be found
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        // Check false positive rate on non-existent keys
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // With 10 bits per key, expect ~1% false positive rate
        // Allow up to 3% due to hash function variance
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default = Adaptive
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); // BETTER
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        // Adaptive
        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        // SizeOptimized
        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); // MAX
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        // PerformanceOptimized
        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); // FAST
        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
        assert!(!perf.use_dictionary);

        // Aliases
        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
1453}