hermes_core/structures/sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Only the index is loaded into memory; data blocks are loaded on demand.
//!
//! ## Optimizations
//!
//! 1. **Sparse Index**: Samples every Nth block's first key for fast range narrowing,
//!    reducing bisect reads from O(log N) to O(log(N/SPARSE_INDEX_INTERVAL)) ≈ 1-2 reads.
//!
//! 2. **Dictionary Compression**: Trains a Zstd dictionary from block samples for
//!    15-30% better compression on similar data patterns.
//!
//! 3. **Configurable Compression Level**: Supports levels 1-22 for a space/speed tradeoff.
//!
//! 4. **Block Index Prefix Compression**: Applies prefix compression to block index
//!    keys for a 5-10% index size reduction.
//!
//! 5. **Bloom Filter**: Per-SSTable bloom filter that skips lookups for keys that are
//!    definitely not present, avoiding unnecessary I/O.

21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27use crate::compression::{CompressionDict, CompressionLevel};
28use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
29
30/// SSTable magic number - version 3 with optimizations
31pub const SSTABLE_MAGIC: u32 = 0x53544233; // "STB3"
32
33/// Block size for SSTable (16KB default)
34pub const BLOCK_SIZE: usize = 16 * 1024;
35
36/// Sparse index sampling interval - sample every Nth block
37/// The block index is already fully in memory; sparse index provides
38/// an additional level for very large SSTables.
39pub const SPARSE_INDEX_INTERVAL: usize = 16;
40
41/// Default dictionary size (64KB)
42pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
43
44/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
45pub const BLOOM_BITS_PER_KEY: usize = 10;
46
47/// Bloom filter hash count (optimal for 10 bits/key)
48pub const BLOOM_HASH_COUNT: usize = 7;
49
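The 10-bit / 7-hash pairing follows the standard bloom filter estimates: the optimal hash count is k = (m/n)·ln 2 ≈ 6.93 for 10 bits per key, and the resulting false positive rate is p ≈ (1 − e^(−k·n/m))^k ≈ 0.8%. A standalone check of those two figures (a sketch, not part of this module):

```rust
fn main() {
    let bits_per_key = 10.0_f64; // m / n
    let k_optimal = bits_per_key * std::f64::consts::LN_2; // ≈ 6.93, rounded to 7
    let k = 7.0_f64;
    // Standard estimate: p = (1 - e^(-k * n / m))^k
    let fp_rate = (1.0 - (-k / bits_per_key).exp()).powf(k);
    println!("optimal k ≈ {:.2}, estimated false positive rate ≈ {:.4}", k_optimal, fp_rate);
    // Prints roughly: optimal k ≈ 6.93, estimated false positive rate ≈ 0.0082
}
```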
50// ============================================================================
51// Bloom Filter Implementation
52// ============================================================================
53
54/// Simple bloom filter for key existence checks
55#[derive(Debug, Clone)]
56pub struct BloomFilter {
57    bits: Vec<u64>,
58    num_bits: usize,
59    num_hashes: usize,
60}
61
62impl BloomFilter {
63    /// Create a new bloom filter sized for expected number of keys
64    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
65        let num_bits = (expected_keys * bits_per_key).max(64);
66        let num_words = num_bits.div_ceil(64);
67        Self {
68            bits: vec![0u64; num_words],
69            num_bits,
70            num_hashes: BLOOM_HASH_COUNT,
71        }
72    }
73
74    /// Create from serialized bytes
75    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
76        if data.len() < 12 {
77            return Err(io::Error::new(
78                io::ErrorKind::InvalidData,
79                "Bloom filter data too short",
80            ));
81        }
82        let mut reader = data;
83        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
84        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
85        let num_words = reader.read_u32::<LittleEndian>()? as usize;
86
87        if reader.len() < num_words * 8 {
88            return Err(io::Error::new(
89                io::ErrorKind::InvalidData,
90                "Bloom filter data truncated",
91            ));
92        }
93
94        let mut bits = Vec::with_capacity(num_words);
95        for _ in 0..num_words {
96            bits.push(reader.read_u64::<LittleEndian>()?);
97        }
98
99        Ok(Self {
100            bits,
101            num_bits,
102            num_hashes,
103        })
104    }
105
106    /// Serialize to bytes
107    pub fn to_bytes(&self) -> Vec<u8> {
108        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
109        data.write_u32::<LittleEndian>(self.num_bits as u32)
110            .unwrap();
111        data.write_u32::<LittleEndian>(self.num_hashes as u32)
112            .unwrap();
113        data.write_u32::<LittleEndian>(self.bits.len() as u32)
114            .unwrap();
115        for &word in &self.bits {
116            data.write_u64::<LittleEndian>(word).unwrap();
117        }
118        data
119    }
120
121    /// Add a key to the filter
122    pub fn insert(&mut self, key: &[u8]) {
123        let (h1, h2) = self.hash_pair(key);
124        for i in 0..self.num_hashes {
125            let bit_pos = self.get_bit_pos(h1, h2, i);
126            let word_idx = bit_pos / 64;
127            let bit_idx = bit_pos % 64;
128            if word_idx < self.bits.len() {
129                self.bits[word_idx] |= 1u64 << bit_idx;
130            }
131        }
132    }
133
134    /// Check if a key might be in the filter
135    /// Returns false if definitely not present, true if possibly present
136    pub fn may_contain(&self, key: &[u8]) -> bool {
137        let (h1, h2) = self.hash_pair(key);
138        for i in 0..self.num_hashes {
139            let bit_pos = self.get_bit_pos(h1, h2, i);
140            let word_idx = bit_pos / 64;
141            let bit_idx = bit_pos % 64;
142            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
143                return false;
144            }
145        }
146        true
147    }
148
149    /// Size in bytes
150    pub fn size_bytes(&self) -> usize {
151        12 + self.bits.len() * 8
152    }
153
154    /// Compute two hash values using FNV-1a variant
155    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
156        // FNV-1a hash
157        let mut h1: u64 = 0xcbf29ce484222325;
158        for &byte in key {
159            h1 ^= byte as u64;
160            h1 = h1.wrapping_mul(0x100000001b3);
161        }
162
163        // Second hash using different seed
164        let mut h2: u64 = 0x84222325cbf29ce4;
165        for &byte in key {
166            h2 = h2.wrapping_mul(0x100000001b3);
167            h2 ^= byte as u64;
168        }
169
170        (h1, h2)
171    }
172
173    /// Get bit position for hash iteration i using double hashing
174    #[inline]
175    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
176        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
177    }
178}
179
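As a standalone illustration of the probe scheme used above (a sketch, not crate code): two FNV-1a style hashes are each computed in a single pass over the key, and the i-th probe position comes from double hashing, `bit_i = (h1 + i·h2) mod num_bits`, so all seven probes cost only two passes.

```rust
/// FNV-1a hash, same offset basis and prime as `hash_pair` above.
fn fnv1a(key: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for &b in key {
        h ^= b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

/// Second hash with a different seed and operation order, mirroring `hash_pair`.
fn fnv1a_alt(key: &[u8]) -> u64 {
    let mut h: u64 = 0x84222325cbf29ce4;
    for &b in key {
        h = h.wrapping_mul(0x100000001b3);
        h ^= b as u64;
    }
    h
}

/// Double hashing: probe i lands at (h1 + i*h2) mod num_bits.
fn bit_positions(key: &[u8], num_bits: u64, num_hashes: u64) -> Vec<u64> {
    let (h1, h2) = (fnv1a(key), fnv1a_alt(key));
    (0..num_hashes)
        .map(|i| h1.wrapping_add(i.wrapping_mul(h2)) % num_bits)
        .collect()
}

fn main() {
    // A filter sized for 100 keys at 10 bits/key has 1000 bits and 7 probes per key.
    println!("{:?}", bit_positions(b"hello", 1000, 7));
}
```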
180/// SSTable value trait
181pub trait SSTableValue: Clone + Send + Sync {
182    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
183    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
184}
185
186/// u64 value implementation
187impl SSTableValue for u64 {
188    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
189        write_vint(writer, *self)
190    }
191
192    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
193        read_vint(reader)
194    }
195}
196
197/// Vec<u8> value implementation
198impl SSTableValue for Vec<u8> {
199    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
200        write_vint(writer, self.len() as u64)?;
201        writer.write_all(self)
202    }
203
204    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
205        let len = read_vint(reader)? as usize;
206        let mut data = vec![0u8; len];
207        reader.read_exact(&mut data)?;
208        Ok(data)
209    }
210}
211
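Any additional value type can be stored by implementing the trait with the same varint helpers. A hypothetical sketch, assuming it lives in this module so `SSTableValue`, `write_vint`, and `read_vint` are in scope (the `DocStats` type is illustrative, not part of the crate):

```rust
/// Hypothetical per-key statistics value.
#[derive(Debug, Clone)]
pub struct DocStats {
    pub doc_count: u64,
    pub total_len: u64,
}

// `Clone` is derived; `Send + Sync` hold automatically for plain u64 fields.
impl SSTableValue for DocStats {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.doc_count)?;
        write_vint(writer, self.total_len)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        Ok(Self {
            doc_count: read_vint(reader)?,
            total_len: read_vint(reader)?,
        })
    }
}
```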
212/// Maximum number of postings that can be inlined in TermInfo
213pub const MAX_INLINE_POSTINGS: usize = 3;
214
215/// Term info for posting list references
216///
217/// Supports two modes:
218/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
219/// - **External**: Larger posting lists stored in separate .post file
220///
221/// This eliminates a separate I/O read for rare/unique terms.
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub enum TermInfo {
224    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
225    /// Each entry is (doc_id, term_freq) delta-encoded
226    Inline {
227        /// Number of postings (1-3)
228        doc_freq: u8,
229        /// Inline data: delta-encoded (doc_id, term_freq) pairs
230        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
231        data: [u8; 16],
232        /// Actual length of data used
233        data_len: u8,
234    },
235    /// Reference to external posting list in .post file
236    External {
237        posting_offset: u64,
238        posting_len: u32,
239        doc_freq: u32,
240        /// Position data offset (0 if no positions)
241        position_offset: u64,
242        /// Position data length (0 if no positions)
243        position_len: u32,
244    },
245}
246
247impl TermInfo {
248    /// Create an external reference
249    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
250        TermInfo::External {
251            posting_offset,
252            posting_len,
253            doc_freq,
254            position_offset: 0,
255            position_len: 0,
256        }
257    }
258
259    /// Create an external reference with position info
260    pub fn external_with_positions(
261        posting_offset: u64,
262        posting_len: u32,
263        doc_freq: u32,
264        position_offset: u64,
265        position_len: u32,
266    ) -> Self {
267        TermInfo::External {
268            posting_offset,
269            posting_len,
270            doc_freq,
271            position_offset,
272            position_len,
273        }
274    }
275
276    /// Try to create an inline TermInfo from posting data
277    /// Returns None if posting list is too large to inline
278    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
279        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
280            return None;
281        }
282
283        let mut data = [0u8; 16];
284        let mut cursor = std::io::Cursor::new(&mut data[..]);
285        let mut prev_doc_id = 0u32;
286
287        for (i, &doc_id) in doc_ids.iter().enumerate() {
288            let delta = doc_id - prev_doc_id;
289            if write_vint(&mut cursor, delta as u64).is_err() {
290                return None;
291            }
292            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
293                return None;
294            }
295            prev_doc_id = doc_id;
296        }
297
298        let data_len = cursor.position() as u8;
299        if data_len > 16 {
300            return None;
301        }
302
303        Some(TermInfo::Inline {
304            doc_freq: doc_ids.len() as u8,
305            data,
306            data_len,
307        })
308    }
309
310    /// Get document frequency
311    pub fn doc_freq(&self) -> u32 {
312        match self {
313            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
314            TermInfo::External { doc_freq, .. } => *doc_freq,
315        }
316    }
317
318    /// Check if this is an inline posting list
319    pub fn is_inline(&self) -> bool {
320        matches!(self, TermInfo::Inline { .. })
321    }
322
323    /// Get external posting info (offset, len) - returns None for inline
324    pub fn external_info(&self) -> Option<(u64, u32)> {
325        match self {
326            TermInfo::External {
327                posting_offset,
328                posting_len,
329                ..
330            } => Some((*posting_offset, *posting_len)),
331            TermInfo::Inline { .. } => None,
332        }
333    }
334
335    /// Get position info (offset, len) - returns None for inline or if no positions
336    pub fn position_info(&self) -> Option<(u64, u32)> {
337        match self {
338            TermInfo::External {
339                position_offset,
340                position_len,
341                ..
342            } if *position_len > 0 => Some((*position_offset, *position_len)),
343            _ => None,
344        }
345    }
346
347    /// Decode inline postings into (doc_ids, term_freqs)
348    /// Returns None if this is an external reference
349    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
350        match self {
351            TermInfo::Inline {
352                doc_freq,
353                data,
354                data_len,
355            } => {
356                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
357                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
358                let mut reader = &data[..*data_len as usize];
359                let mut prev_doc_id = 0u32;
360
361                for _ in 0..*doc_freq {
362                    let delta = read_vint(&mut reader).ok()? as u32;
363                    let tf = read_vint(&mut reader).ok()? as u32;
364                    let doc_id = prev_doc_id + delta;
365                    doc_ids.push(doc_id);
366                    term_freqs.push(tf);
367                    prev_doc_id = doc_id;
368                }
369
370                Some((doc_ids, term_freqs))
371            }
372            TermInfo::External { .. } => None,
373        }
374    }
375}
376
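To see why up to three postings fit comfortably in the 16-byte inline buffer: doc ids are delta-encoded and every delta and term frequency is written as a varint, so small values cost one byte each. A standalone sketch of that encoding (mirrors the format described above; not crate code):

```rust
fn push_varint(buf: &mut Vec<u8>, mut v: u64) {
    loop {
        let byte = (v & 0x7F) as u8;
        v >>= 7;
        if v == 0 {
            buf.push(byte);
            return;
        }
        buf.push(byte | 0x80);
    }
}

fn main() {
    // Postings for a rare term: (doc_id, term_freq) pairs for docs 5, 9 and 42.
    let doc_ids = [5u32, 9, 42];
    let term_freqs = [1u32, 2, 1];

    let mut buf = Vec::new();
    let mut prev = 0u32;
    for (i, &doc_id) in doc_ids.iter().enumerate() {
        push_varint(&mut buf, (doc_id - prev) as u64); // deltas: 5, 4, 33
        push_varint(&mut buf, term_freqs[i] as u64);
        prev = doc_id;
    }

    // Every value is < 128, so each takes a single byte: 6 bytes total, well under 16.
    assert_eq!(buf, vec![5, 1, 4, 2, 33, 1]);
    println!("inline payload: {:?} ({} bytes)", buf, buf.len());
}
```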
377impl SSTableValue for TermInfo {
378    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
379        match self {
380            TermInfo::Inline {
381                doc_freq,
382                data,
383                data_len,
384            } => {
385                // Tag byte 0xFF = inline marker
386                writer.write_u8(0xFF)?;
387                writer.write_u8(*doc_freq)?;
388                writer.write_u8(*data_len)?;
389                writer.write_all(&data[..*data_len as usize])?;
390            }
391            TermInfo::External {
392                posting_offset,
393                posting_len,
394                doc_freq,
395                position_offset,
396                position_len,
397            } => {
398                // Tag byte 0x00 = external marker (no positions)
399                // Tag byte 0x01 = external with positions
400                if *position_len > 0 {
401                    writer.write_u8(0x01)?;
402                    write_vint(writer, *doc_freq as u64)?;
403                    write_vint(writer, *posting_offset)?;
404                    write_vint(writer, *posting_len as u64)?;
405                    write_vint(writer, *position_offset)?;
406                    write_vint(writer, *position_len as u64)?;
407                } else {
408                    writer.write_u8(0x00)?;
409                    write_vint(writer, *doc_freq as u64)?;
410                    write_vint(writer, *posting_offset)?;
411                    write_vint(writer, *posting_len as u64)?;
412                }
413            }
414        }
415        Ok(())
416    }
417
418    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
419        let tag = reader.read_u8()?;
420
421        if tag == 0xFF {
422            // Inline
423            let doc_freq = reader.read_u8()?;
424            let data_len = reader.read_u8()?;
425            let mut data = [0u8; 16];
426            reader.read_exact(&mut data[..data_len as usize])?;
427            Ok(TermInfo::Inline {
428                doc_freq,
429                data,
430                data_len,
431            })
432        } else if tag == 0x00 {
433            // External (no positions)
434            let doc_freq = read_vint(reader)? as u32;
435            let posting_offset = read_vint(reader)?;
436            let posting_len = read_vint(reader)? as u32;
437            Ok(TermInfo::External {
438                posting_offset,
439                posting_len,
440                doc_freq,
441                position_offset: 0,
442                position_len: 0,
443            })
444        } else if tag == 0x01 {
445            // External with positions
446            let doc_freq = read_vint(reader)? as u32;
447            let posting_offset = read_vint(reader)?;
448            let posting_len = read_vint(reader)? as u32;
449            let position_offset = read_vint(reader)?;
450            let position_len = read_vint(reader)? as u32;
451            Ok(TermInfo::External {
452                posting_offset,
453                posting_len,
454                doc_freq,
455                position_offset,
456                position_len,
457            })
458        } else {
459            Err(io::Error::new(
460                io::ErrorKind::InvalidData,
461                format!("Invalid TermInfo tag: {}", tag),
462            ))
463        }
464    }
465}
466
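For reference, the byte layouts the two variants serialize to (read directly off the match arms above):

```rust
// Tag byte first; all fields after the tag in the external variants are varints.
//
//   0xFF | doc_freq: u8 | data_len: u8 | data[data_len]         inline (≤ 3 postings)
//   0x00 | doc_freq | posting_offset | posting_len               external, no positions
//   0x01 | doc_freq | posting_offset | posting_len
//        | position_offset | position_len                        external with positions
```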
467/// Write variable-length integer
468pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
469    loop {
470        let byte = (value & 0x7F) as u8;
471        value >>= 7;
472        if value == 0 {
473            writer.write_u8(byte)?;
474            return Ok(());
475        } else {
476            writer.write_u8(byte | 0x80)?;
477        }
478    }
479}
480
481/// Read variable-length integer
482pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
483    let mut result = 0u64;
484    let mut shift = 0;
485
486    loop {
487        let byte = reader.read_u8()?;
488        result |= ((byte & 0x7F) as u64) << shift;
489        if byte & 0x80 == 0 {
490            return Ok(result);
491        }
492        shift += 7;
493        if shift >= 64 {
494            return Err(io::Error::new(
495                io::ErrorKind::InvalidData,
496                "varint too long",
497            ));
498        }
499    }
500}
501
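The encoding is the usual LEB128-style varint: seven payload bits per byte, with the high bit set on every byte except the last. For example, 300 = 0b1_0010_1100 becomes `[0xAC, 0x02]`. A small round-trip using the helpers above:

```rust
fn vint_example() -> io::Result<()> {
    let mut buf = Vec::new();
    write_vint(&mut buf, 300)?;
    // 0xAC = continuation bit + low seven bits (0b0101100), 0x02 = remaining high bits.
    assert_eq!(buf, vec![0xAC, 0x02]);
    assert_eq!(read_vint(&mut buf.as_slice())?, 300);
    Ok(())
}
```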
502/// Compute common prefix length
503pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
504    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
505}
506
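Both the data blocks and the block index use this helper for front coding: each key is stored as the length of the prefix it shares with the previous key plus the remaining suffix. A short illustration (illustrative keys, not crate code):

```rust
fn front_coding_example() {
    // key        prefix_len  suffix      stored as: prefix_len, suffix_len, suffix bytes
    // "apple"    0           "apple"
    // "applet"   5           "t"
    // "apply"    4           "y"
    let keys: [&[u8]; 3] = [&b"apple"[..], &b"applet"[..], &b"apply"[..]];
    let mut prev: &[u8] = b"";
    for key in keys {
        let p = common_prefix_len(prev, key);
        println!("prefix_len={}, suffix={:?}", p, String::from_utf8_lossy(&key[p..]));
        prev = key;
    }
}
```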
507/// Sparse index entry - samples every Nth block for fast range narrowing
508#[derive(Debug, Clone)]
509struct SparseIndexEntry {
510    /// First key of the sampled block
511    first_key: Vec<u8>,
512    /// Index into the full block index
513    block_idx: u32,
514}
515
516/// SSTable statistics for debugging
517#[derive(Debug, Clone)]
518pub struct SSTableStats {
519    pub num_blocks: usize,
520    pub num_sparse_entries: usize,
521    pub num_entries: u64,
522    pub has_bloom_filter: bool,
523    pub has_dictionary: bool,
524    pub bloom_filter_size: usize,
525    pub dictionary_size: usize,
526}
527
528/// SSTable writer configuration
529#[derive(Debug, Clone)]
530pub struct SSTableWriterConfig {
531    /// Compression level (1-22, higher = better compression but slower)
532    pub compression_level: CompressionLevel,
533    /// Whether to train and use a dictionary for compression
534    pub use_dictionary: bool,
535    /// Dictionary size in bytes (default 64KB)
536    pub dict_size: usize,
537    /// Whether to build a bloom filter
538    pub use_bloom_filter: bool,
539    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
540    pub bloom_bits_per_key: usize,
541}
542
543impl Default for SSTableWriterConfig {
544    fn default() -> Self {
545        Self::from_optimization(crate::structures::IndexOptimization::default())
546    }
547}
548
549impl SSTableWriterConfig {
550    /// Create config from IndexOptimization mode
551    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
552        use crate::structures::IndexOptimization;
553        match optimization {
554            IndexOptimization::Adaptive => Self {
555                compression_level: CompressionLevel::BETTER, // Level 9
556                use_dictionary: false,
557                dict_size: DEFAULT_DICT_SIZE,
558                use_bloom_filter: false,
559                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
560            },
561            IndexOptimization::SizeOptimized => Self {
562                compression_level: CompressionLevel::MAX, // Level 22
563                use_dictionary: true,
564                dict_size: DEFAULT_DICT_SIZE,
565                use_bloom_filter: true,
566                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
567            },
568            IndexOptimization::PerformanceOptimized => Self {
569                compression_level: CompressionLevel::FAST, // Level 1
570                use_dictionary: false,
571                dict_size: DEFAULT_DICT_SIZE,
572                use_bloom_filter: true, // Bloom helps skip blocks fast
573                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
574            },
575        }
576    }
577
578    /// Fast configuration - prioritize write speed over compression
579    pub fn fast() -> Self {
580        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
581    }
582
583    /// Maximum compression configuration - prioritize size over speed
584    pub fn max_compression() -> Self {
585        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
586    }
587}
588
589/// SSTable writer with optimizations:
590/// - Dictionary compression for blocks (if dictionary provided)
591/// - Configurable compression level
592/// - Block index prefix compression
593/// - Bloom filter for fast negative lookups
594pub struct SSTableWriter<'a, V: SSTableValue> {
595    writer: &'a mut dyn Write,
596    block_buffer: Vec<u8>,
597    prev_key: Vec<u8>,
598    index: Vec<BlockIndexEntry>,
599    current_offset: u64,
600    num_entries: u64,
601    block_first_key: Option<Vec<u8>>,
602    config: SSTableWriterConfig,
603    /// Pre-trained dictionary for compression (optional)
604    dictionary: Option<CompressionDict>,
605    /// All keys for bloom filter
606    all_keys: Vec<Vec<u8>>,
607    /// Bloom filter (built at finish time)
608    bloom_filter: Option<BloomFilter>,
609    _phantom: std::marker::PhantomData<V>,
610}
611
612impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
613    /// Create a new SSTable writer with default configuration
614    pub fn new(writer: &'a mut dyn Write) -> Self {
615        Self::with_config(writer, SSTableWriterConfig::default())
616    }
617
618    /// Create a new SSTable writer with custom configuration
619    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
620        Self {
621            writer,
622            block_buffer: Vec::with_capacity(BLOCK_SIZE),
623            prev_key: Vec::new(),
624            index: Vec::new(),
625            current_offset: 0,
626            num_entries: 0,
627            block_first_key: None,
628            config,
629            dictionary: None,
630            all_keys: Vec::new(),
631            bloom_filter: None,
632            _phantom: std::marker::PhantomData,
633        }
634    }
635
636    /// Create a new SSTable writer with a pre-trained dictionary
637    pub fn with_dictionary(
638        writer: &'a mut dyn Write,
639        config: SSTableWriterConfig,
640        dictionary: CompressionDict,
641    ) -> Self {
642        Self {
643            writer,
644            block_buffer: Vec::with_capacity(BLOCK_SIZE),
645            prev_key: Vec::new(),
646            index: Vec::new(),
647            current_offset: 0,
648            num_entries: 0,
649            block_first_key: None,
650            config,
651            dictionary: Some(dictionary),
652            all_keys: Vec::new(),
653            bloom_filter: None,
654            _phantom: std::marker::PhantomData,
655        }
656    }
657
658    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
659        if self.block_first_key.is_none() {
660            self.block_first_key = Some(key.to_vec());
661        }
662
663        // Collect keys for bloom filter
664        if self.config.use_bloom_filter {
665            self.all_keys.push(key.to_vec());
666        }
667
668        let prefix_len = common_prefix_len(&self.prev_key, key);
669        let suffix = &key[prefix_len..];
670
671        write_vint(&mut self.block_buffer, prefix_len as u64)?;
672        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
673        self.block_buffer.extend_from_slice(suffix);
674        value.serialize(&mut self.block_buffer)?;
675
676        self.prev_key.clear();
677        self.prev_key.extend_from_slice(key);
678        self.num_entries += 1;
679
680        if self.block_buffer.len() >= BLOCK_SIZE {
681            self.flush_block()?;
682        }
683
684        Ok(())
685    }
686
687    /// Flush and compress the current block
688    fn flush_block(&mut self) -> io::Result<()> {
689        if self.block_buffer.is_empty() {
690            return Ok(());
691        }
692
693        // Compress block with dictionary if available
694        let compressed = if let Some(ref dict) = self.dictionary {
695            crate::compression::compress_with_dict(
696                &self.block_buffer,
697                self.config.compression_level,
698                dict,
699            )?
700        } else {
701            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
702        };
703
704        if let Some(first_key) = self.block_first_key.take() {
705            self.index.push(BlockIndexEntry {
706                first_key,
707                offset: self.current_offset,
708                length: compressed.len() as u32,
709            });
710        }
711
712        self.writer.write_all(&compressed)?;
713        self.current_offset += compressed.len() as u64;
714        self.block_buffer.clear();
715        self.prev_key.clear();
716
717        Ok(())
718    }
719
720    pub fn finish(mut self) -> io::Result<()> {
721        // Flush any remaining data
722        self.flush_block()?;
723
724        // Build bloom filter from collected keys
725        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
726            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
727            for key in &self.all_keys {
728                bloom.insert(key);
729            }
730            self.bloom_filter = Some(bloom);
731        }
732
733        let data_end_offset = self.current_offset;
734
735        // Build sparse index - sample every SPARSE_INDEX_INTERVAL blocks
736        let sparse_index: Vec<SparseIndexEntry> = self
737            .index
738            .iter()
739            .enumerate()
740            .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
741            .map(|(i, entry)| SparseIndexEntry {
742                first_key: entry.first_key.clone(),
743                block_idx: i as u32,
744            })
745            .collect();
746
747        // Write block index with prefix compression
748        let index_clone = self.index.clone();
749        self.write_block_index_compressed(&index_clone)?;
750
        // Write sparse index, tracking bytes written so the bloom filter and
        // dictionary offsets recorded below remain absolute file offsets.
        self.writer
            .write_u32::<LittleEndian>(sparse_index.len() as u32)?;
        self.current_offset += 4;
        for entry in &sparse_index {
            self.writer
                .write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            self.writer.write_all(&entry.first_key)?;
            self.writer.write_u32::<LittleEndian>(entry.block_idx)?;
            self.current_offset += 2 + entry.first_key.len() as u64 + 4;
        }
760
761        // Write bloom filter if present
762        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
763            let bloom_data = bloom.to_bytes();
764            let offset = self.current_offset;
765            self.writer.write_all(&bloom_data)?;
766            self.current_offset += bloom_data.len() as u64;
767            offset
768        } else {
769            0
770        };
771
772        // Write dictionary if present
773        let dict_offset = if let Some(ref dict) = self.dictionary {
774            let dict_bytes = dict.as_bytes();
775            let offset = self.current_offset;
776            self.writer
777                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
778            self.writer.write_all(dict_bytes)?;
779            self.current_offset += 4 + dict_bytes.len() as u64;
780            offset
781        } else {
782            0
783        };
784
785        // Write extended footer (v3 format)
786        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
787        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
788        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
789        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
790        self.writer
791            .write_u8(self.config.compression_level.0 as u8)?;
792        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
793
794        Ok(())
795    }
796
    /// Write block index with prefix compression for keys
    fn write_block_index_compressed(&mut self, index: &[BlockIndexEntry]) -> io::Result<()> {
        // Serialize into a buffer first so the bytes written can be added to
        // `current_offset`; the bloom filter and dictionary offsets stored in the
        // footer are absolute file offsets and must account for the index section.
        let mut buf: Vec<u8> = Vec::new();
        buf.write_u32::<LittleEndian>(index.len() as u32)?;

        let mut prev_key: Vec<u8> = Vec::new();
        for entry in index {
            // Prefix compress the key
            let prefix_len = common_prefix_len(&prev_key, &entry.first_key);
            let suffix = &entry.first_key[prefix_len..];

            // Write: prefix_len (varint) + suffix_len (varint) + suffix + offset (varint) + length (varint)
            write_vint(&mut buf, prefix_len as u64)?;
            write_vint(&mut buf, suffix.len() as u64)?;
            buf.extend_from_slice(suffix);
            write_vint(&mut buf, entry.offset)?;
            write_vint(&mut buf, entry.length as u64)?;

            prev_key.clear();
            prev_key.extend_from_slice(&entry.first_key);
        }

        self.writer.write_all(&buf)?;
        self.current_offset += buf.len() as u64;
        Ok(())
    }
820}
821
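A minimal write-side sketch, assuming this module's API as defined above: keys must be inserted in ascending order (both the per-block front coding and the block index rely on it), and `finish()` appends the index sections, optional bloom filter/dictionary, and footer.

```rust
fn write_example() -> io::Result<Vec<u8>> {
    // Any `std::io::Write` sink works; an in-memory buffer keeps the sketch self-contained.
    let mut out: Vec<u8> = Vec::new();
    let mut writer: SSTableWriter<'_, u64> =
        SSTableWriter::with_config(&mut out, SSTableWriterConfig::fast());

    // Keys must arrive in sorted order.
    writer.insert(b"alpha", &1)?;
    writer.insert(b"beta", &2)?;
    writer.insert(b"gamma", &3)?;
    writer.finish()?;

    Ok(out)
}
```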
822/// Block index entry
823#[derive(Debug, Clone)]
824struct BlockIndexEntry {
825    first_key: Vec<u8>,
826    offset: u64,
827    length: u32,
828}
829
830/// Async SSTable reader - loads blocks on demand via LazyFileSlice
831pub struct AsyncSSTableReader<V: SSTableValue> {
832    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
833    data_slice: LazyFileSlice,
834    /// Block index (loaded into memory - small)
835    index: Vec<BlockIndexEntry>,
836    /// Sparse index for fast range narrowing (samples every Nth block)
837    sparse_index: Vec<SparseIndexEntry>,
838    num_entries: u64,
839    /// Hot cache for decompressed blocks
840    cache: RwLock<BlockCache>,
841    /// Bloom filter for fast negative lookups (optional)
842    bloom_filter: Option<BloomFilter>,
843    /// Compression dictionary (optional)
844    dictionary: Option<CompressionDict>,
845    /// Compression level used
846    #[allow(dead_code)]
847    compression_level: CompressionLevel,
848    _phantom: std::marker::PhantomData<V>,
849}
850
851/// LRU-style block cache
852struct BlockCache {
853    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
854    access_order: Vec<u64>,
855    max_blocks: usize,
856}
857
858impl BlockCache {
859    fn new(max_blocks: usize) -> Self {
860        Self {
861            blocks: FxHashMap::default(),
862            access_order: Vec::new(),
863            max_blocks,
864        }
865    }
866
867    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
868        if let Some(block) = self.blocks.get(&offset) {
869            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
870                self.access_order.remove(pos);
871                self.access_order.push(offset);
872            }
873            Some(Arc::clone(block))
874        } else {
875            None
876        }
877    }
878
879    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
880        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
881            let evict_offset = self.access_order.remove(0);
882            self.blocks.remove(&evict_offset);
883        }
884        self.blocks.insert(offset, block);
885        self.access_order.push(offset);
886    }
887}
888
889impl<V: SSTableValue> AsyncSSTableReader<V> {
890    /// Open an SSTable from a LazyFileHandle
891    /// Only loads the footer and index into memory, data blocks fetched on-demand
892    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
893        let file_len = file_handle.len();
894        if file_len < 37 {
895            return Err(io::Error::new(
896                io::ErrorKind::InvalidData,
897                "SSTable too small",
898            ));
899        }
900
901        // Read footer (37 bytes)
902        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
903        let footer_bytes = file_handle
904            .read_bytes_range(file_len - 37..file_len)
905            .await?;
906
907        let mut reader = footer_bytes.as_slice();
908        let data_end_offset = reader.read_u64::<LittleEndian>()?;
909        let num_entries = reader.read_u64::<LittleEndian>()?;
910        let bloom_offset = reader.read_u64::<LittleEndian>()?;
911        let dict_offset = reader.read_u64::<LittleEndian>()?;
912        let compression_level = CompressionLevel(reader.read_u8()? as i32);
913        let magic = reader.read_u32::<LittleEndian>()?;
914
915        if magic != SSTABLE_MAGIC {
916            return Err(io::Error::new(
917                io::ErrorKind::InvalidData,
918                format!("Invalid SSTable magic: 0x{:08X}", magic),
919            ));
920        }
921
922        // Read index section
923        let index_start = data_end_offset;
924        let index_end = file_len - 37;
925        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
926        let mut reader = index_bytes.as_slice();
927
928        // Read block index (prefix-compressed)
929        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
930        let mut index = Vec::with_capacity(num_blocks);
931        let mut prev_key: Vec<u8> = Vec::new();
932
933        for _ in 0..num_blocks {
934            let prefix_len = read_vint(&mut reader)? as usize;
935            let suffix_len = read_vint(&mut reader)? as usize;
936            let mut suffix = vec![0u8; suffix_len];
937            reader.read_exact(&mut suffix)?;
938
939            // Reconstruct full key
940            let mut first_key = prev_key[..prefix_len.min(prev_key.len())].to_vec();
941            first_key.extend_from_slice(&suffix);
942
943            let offset = read_vint(&mut reader)?;
944            let length = read_vint(&mut reader)? as u32;
945
946            prev_key = first_key.clone();
947            index.push(BlockIndexEntry {
948                first_key,
949                offset,
950                length,
951            });
952        }
953
954        // Read sparse index
955        let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
956        let mut sparse_index = Vec::with_capacity(num_sparse);
957
958        for _ in 0..num_sparse {
959            let key_len = reader.read_u16::<LittleEndian>()? as usize;
960            let mut first_key = vec![0u8; key_len];
961            reader.read_exact(&mut first_key)?;
962            let block_idx = reader.read_u32::<LittleEndian>()?;
963
964            sparse_index.push(SparseIndexEntry {
965                first_key,
966                block_idx,
967            });
968        }
969
970        // Load bloom filter if present
971        let bloom_filter = if bloom_offset > 0 {
972            let bloom_start = bloom_offset;
973            // Read bloom filter size first (12 bytes header)
974            let bloom_header = file_handle
975                .read_bytes_range(bloom_start..bloom_start + 12)
976                .await?;
977            let num_words = u32::from_le_bytes([
978                bloom_header[8],
979                bloom_header[9],
980                bloom_header[10],
981                bloom_header[11],
982            ]) as u64;
983            let bloom_size = 12 + num_words * 8;
984            let bloom_data = file_handle
985                .read_bytes_range(bloom_start..bloom_start + bloom_size)
986                .await?;
987            Some(BloomFilter::from_bytes(&bloom_data)?)
988        } else {
989            None
990        };
991
992        // Load dictionary if present
993        let dictionary = if dict_offset > 0 {
994            let dict_start = dict_offset;
995            // Read dictionary size first
996            let dict_len_bytes = file_handle
997                .read_bytes_range(dict_start..dict_start + 4)
998                .await?;
999            let dict_len = u32::from_le_bytes([
1000                dict_len_bytes[0],
1001                dict_len_bytes[1],
1002                dict_len_bytes[2],
1003                dict_len_bytes[3],
1004            ]) as u64;
1005            let dict_data = file_handle
1006                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1007                .await?;
1008            Some(CompressionDict::from_bytes(dict_data.to_vec()))
1009        } else {
1010            None
1011        };
1012
1013        // Create a lazy slice for just the data portion
1014        let data_slice = file_handle.slice(0..data_end_offset);
1015
1016        Ok(Self {
1017            data_slice,
1018            index,
1019            sparse_index,
1020            num_entries,
1021            cache: RwLock::new(BlockCache::new(cache_blocks)),
1022            bloom_filter,
1023            dictionary,
1024            compression_level,
1025            _phantom: std::marker::PhantomData,
1026        })
1027    }
1028
1029    /// Number of entries
1030    pub fn num_entries(&self) -> u64 {
1031        self.num_entries
1032    }
1033
1034    /// Get stats about this SSTable for debugging
1035    pub fn stats(&self) -> SSTableStats {
1036        SSTableStats {
1037            num_blocks: self.index.len(),
1038            num_sparse_entries: self.sparse_index.len(),
1039            num_entries: self.num_entries,
1040            has_bloom_filter: self.bloom_filter.is_some(),
1041            has_dictionary: self.dictionary.is_some(),
1042            bloom_filter_size: self
1043                .bloom_filter
1044                .as_ref()
1045                .map(|b| b.size_bytes())
1046                .unwrap_or(0),
1047            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1048        }
1049    }
1050
1051    /// Look up a key (async - may need to load block)
1052    ///
1053    /// Uses bloom filter for fast negative lookups, then sparse index to narrow
1054    /// search range before binary search, reducing I/O to typically 1 block read.
1055    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1056        log::debug!(
1057            "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
1058            key.len(),
1059            self.index.len(),
1060            self.sparse_index.len()
1061        );
1062
1063        // Check bloom filter first - fast negative lookup
1064        if let Some(ref bloom) = self.bloom_filter
1065            && !bloom.may_contain(key)
1066        {
1067            log::debug!("SSTable::get bloom filter negative");
1068            return Ok(None);
1069        }
1070
1071        // Use sparse index to find the block range to search
1072        let (start_block, end_block) = self.find_block_range(key);
1073        log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);
1074
1075        // Binary search within the narrowed range (in-memory, no I/O)
1076        let search_range = &self.index[start_block..=end_block];
1077        let block_idx =
1078            match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
1079                Ok(idx) => start_block + idx,
1080                Err(0) => {
1081                    if start_block == 0 {
1082                        log::debug!("SSTable::get key not found (before first block)");
1083                        return Ok(None);
1084                    }
1085                    start_block
1086                }
1087                Err(idx) => start_block + idx - 1,
1088            };
1089
1090        log::debug!("SSTable::get loading block_idx={}", block_idx);
1091
1092        // Now we know exactly which block to load - single I/O
1093        let block_data = self.load_block(block_idx).await?;
1094        self.search_block(&block_data, key)
1095    }
1096
1097    /// Batch lookup multiple keys with optimized I/O
1098    ///
    /// Groups keys by block and loads each block only once, reducing I/O from
    /// one read per key to at most one read per distinct block (often far fewer
    /// than the number of keys when keys share blocks).
1101    /// Uses bloom filter to skip keys that definitely don't exist.
1102    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1103        if keys.is_empty() {
1104            return Ok(Vec::new());
1105        }
1106
1107        // Map each key to its block index
1108        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1109        for (key_idx, key) in keys.iter().enumerate() {
1110            // Check bloom filter first
1111            if let Some(ref bloom) = self.bloom_filter
1112                && !bloom.may_contain(key)
1113            {
1114                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1115                continue;
1116            }
1117
1118            let (start_block, end_block) = self.find_block_range(key);
1119            let search_range = &self.index[start_block..=end_block];
1120            let block_idx =
1121                match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
1122                    Ok(idx) => start_block + idx,
1123                    Err(0) => {
1124                        if start_block == 0 {
1125                            key_to_block.push((key_idx, usize::MAX)); // Mark as not found
1126                            continue;
1127                        }
1128                        start_block
1129                    }
1130                    Err(idx) => start_block + idx - 1,
1131                };
1132            key_to_block.push((key_idx, block_idx));
1133        }
1134
1135        // Group keys by block
1136        let mut blocks_to_load: Vec<usize> = key_to_block
1137            .iter()
1138            .filter(|(_, b)| *b != usize::MAX)
1139            .map(|(_, b)| *b)
1140            .collect();
1141        blocks_to_load.sort_unstable();
1142        blocks_to_load.dedup();
1143
1144        // Load all needed blocks (this is where I/O happens)
1145        for &block_idx in &blocks_to_load {
1146            let _ = self.load_block(block_idx).await?;
1147        }
1148
1149        // Now search each key in its block (all blocks are cached)
1150        let mut results = vec![None; keys.len()];
1151        for (key_idx, block_idx) in key_to_block {
1152            if block_idx == usize::MAX {
1153                continue;
1154            }
1155            let block_data = self.load_block(block_idx).await?; // Will hit cache
1156            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1157        }
1158
1159        Ok(results)
1160    }
1161
1162    /// Find the block range that could contain the key using sparse index
1163    ///
1164    /// Returns (start_block_idx, end_block_idx) inclusive range
1165    fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
1166        if self.sparse_index.is_empty() {
1167            return (0, self.index.len().saturating_sub(1));
1168        }
1169
1170        // Binary search in sparse index (in-memory, no I/O)
1171        let sparse_pos = match self
1172            .sparse_index
1173            .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
1174        {
1175            Ok(idx) => idx,
1176            Err(0) => 0,
1177            Err(idx) => idx - 1,
1178        };
1179
1180        let start_block = self.sparse_index[sparse_pos].block_idx as usize;
1181        let end_block = if sparse_pos + 1 < self.sparse_index.len() {
1182            // End at the next sparse entry's block (exclusive becomes inclusive -1)
1183            (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
1184        } else {
1185            // Last sparse entry - search to end of index
1186            self.index.len().saturating_sub(1)
1187        };
1188
1189        (start_block, end_block.max(start_block))
1190    }
1191
1192    /// Preload all data blocks into memory
1193    ///
1194    /// Call this after open() to eliminate all I/O during subsequent lookups.
1195    /// Useful when the SSTable is small enough to fit in memory.
1196    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1197        for block_idx in 0..self.index.len() {
1198            self.load_block(block_idx).await?;
1199        }
1200        Ok(())
1201    }
1202
1203    /// Load a block (checks cache first, then loads from FileSlice)
1204    /// Uses dictionary decompression if dictionary is present
1205    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
1206        let entry = &self.index[block_idx];
1207
1208        // Check cache first
1209        {
1210            let mut cache = self.cache.write();
1211            if let Some(block) = cache.get(entry.offset) {
1212                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
1213                return Ok(block);
1214            }
1215        }
1216
1217        log::debug!(
1218            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1219            block_idx,
1220            entry.offset,
1221            entry.offset + entry.length as u64
1222        );
1223
1224        // Load from FileSlice
1225        let start = entry.offset;
1226        let end = start + entry.length as u64;
1227        let compressed = self.data_slice.read_bytes_range(start..end).await?;
1228
1229        // Decompress with dictionary if available
1230        let decompressed = if let Some(ref dict) = self.dictionary {
1231            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1232        } else {
1233            crate::compression::decompress(compressed.as_slice())?
1234        };
1235
1236        let block = Arc::new(decompressed);
1237
1238        // Insert into cache
1239        {
1240            let mut cache = self.cache.write();
1241            cache.insert(entry.offset, Arc::clone(&block));
1242        }
1243
1244        Ok(block)
1245    }
1246
1247    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1248        let mut reader = block_data;
1249        let mut current_key = Vec::new();
1250
1251        while !reader.is_empty() {
1252            let common_prefix_len = read_vint(&mut reader)? as usize;
1253            let suffix_len = read_vint(&mut reader)? as usize;
1254
1255            current_key.truncate(common_prefix_len);
1256            let mut suffix = vec![0u8; suffix_len];
1257            reader.read_exact(&mut suffix)?;
1258            current_key.extend_from_slice(&suffix);
1259
1260            let value = V::deserialize(&mut reader)?;
1261
1262            match current_key.as_slice().cmp(target_key) {
1263                std::cmp::Ordering::Equal => return Ok(Some(value)),
1264                std::cmp::Ordering::Greater => return Ok(None),
1265                std::cmp::Ordering::Less => continue,
1266            }
1267        }
1268
1269        Ok(None)
1270    }
1271
1272    /// Prefetch blocks for a key range
1273    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1274        let start_block = match self
1275            .index
1276            .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
1277        {
1278            Ok(idx) => idx,
1279            Err(0) => 0,
1280            Err(idx) => idx - 1,
1281        };
1282
1283        let end_block = match self
1284            .index
1285            .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
1286        {
1287            Ok(idx) => idx,
1288            Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
1289            Err(idx) => idx,
1290        };
1291
1292        for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
1293            let _ = self.load_block(block_idx).await?;
1294        }
1295
1296        Ok(())
1297    }
1298
1299    /// Iterate over all entries (loads blocks as needed)
1300    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1301        AsyncSSTableIterator::new(self)
1302    }
1303
1304    /// Get all entries as a vector (for merging)
1305    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1306        let mut results = Vec::new();
1307
1308        for block_idx in 0..self.index.len() {
1309            let block_data = self.load_block(block_idx).await?;
1310            let mut reader = block_data.as_slice();
1311            let mut current_key = Vec::new();
1312
1313            while !reader.is_empty() {
1314                let common_prefix_len = read_vint(&mut reader)? as usize;
1315                let suffix_len = read_vint(&mut reader)? as usize;
1316
1317                current_key.truncate(common_prefix_len);
1318                let mut suffix = vec![0u8; suffix_len];
1319                reader.read_exact(&mut suffix)?;
1320                current_key.extend_from_slice(&suffix);
1321
1322                let value = V::deserialize(&mut reader)?;
1323                results.push((current_key.clone(), value));
1324            }
1325        }
1326
1327        Ok(results)
1328    }
1329}
1330
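A corresponding read-side sketch; how the `LazyFileHandle` is obtained depends on the crate's directory layer, so it is taken as a parameter here. `get` does a point lookup (bloom filter, then sparse index, then one block read), while `get_batch` lets keys that land in the same block share a single read.

```rust
async fn read_example(handle: LazyFileHandle) -> io::Result<()> {
    // Keep up to 64 decompressed blocks in the in-memory cache.
    let reader: AsyncSSTableReader<u64> = AsyncSSTableReader::open(handle, 64).await?;
    println!("{:?}", reader.stats());

    // Single lookup.
    if let Some(value) = reader.get(b"alpha").await? {
        println!("alpha = {}", value);
    }

    // Batched lookup: blocks shared between keys are only read once.
    let keys: Vec<&[u8]> = vec![&b"alpha"[..], &b"beta"[..], &b"missing"[..]];
    let values = reader.get_batch(&keys).await?;
    println!("{:?}", values);

    Ok(())
}
```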
1331/// Async iterator over SSTable entries
1332pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1333    reader: &'a AsyncSSTableReader<V>,
1334    current_block: usize,
1335    block_data: Option<Arc<Vec<u8>>>,
1336    block_offset: usize,
1337    current_key: Vec<u8>,
1338    finished: bool,
1339}
1340
1341impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1342    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1343        Self {
1344            reader,
1345            current_block: 0,
1346            block_data: None,
1347            block_offset: 0,
1348            current_key: Vec::new(),
1349            finished: reader.index.is_empty(),
1350        }
1351    }
1352
1353    async fn load_next_block(&mut self) -> io::Result<bool> {
1354        if self.current_block >= self.reader.index.len() {
1355            self.finished = true;
1356            return Ok(false);
1357        }
1358
1359        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1360        self.block_offset = 0;
1361        self.current_key.clear();
1362        self.current_block += 1;
1363        Ok(true)
1364    }
1365
1366    /// Advance to next entry (async)
1367    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1368        if self.finished {
1369            return Ok(None);
1370        }
1371
1372        if self.block_data.is_none() && !self.load_next_block().await? {
1373            return Ok(None);
1374        }
1375
1376        loop {
1377            let block = self.block_data.as_ref().unwrap();
1378            if self.block_offset >= block.len() {
1379                if !self.load_next_block().await? {
1380                    return Ok(None);
1381                }
1382                continue;
1383            }
1384
1385            let mut reader = &block[self.block_offset..];
1386            let start_len = reader.len();
1387
1388            let common_prefix_len = read_vint(&mut reader)? as usize;
1389            let suffix_len = read_vint(&mut reader)? as usize;
1390
1391            self.current_key.truncate(common_prefix_len);
1392            let mut suffix = vec![0u8; suffix_len];
1393            reader.read_exact(&mut suffix)?;
1394            self.current_key.extend_from_slice(&suffix);
1395
1396            let value = V::deserialize(&mut reader)?;
1397
1398            self.block_offset += start_len - reader.len();
1399
1400            return Ok(Some((self.current_key.clone(), value)));
1401        }
1402    }
1403}
1404
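Draining the iterator follows the same pattern (a sketch; this is not a std `Iterator`, since each step may await a block read):

```rust
async fn dump_all<V: SSTableValue + std::fmt::Debug>(
    reader: &AsyncSSTableReader<V>,
) -> io::Result<()> {
    let mut it = reader.iter();
    while let Some((key, value)) = it.next().await? {
        println!("{} => {:?}", String::from_utf8_lossy(&key), value);
    }
    Ok(())
}
```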
1405#[cfg(test)]
1406mod tests {
1407    use super::*;
1408
1409    #[test]
1410    fn test_bloom_filter_basic() {
1411        let mut bloom = BloomFilter::new(100, 10);
1412
1413        bloom.insert(b"hello");
1414        bloom.insert(b"world");
1415        bloom.insert(b"test");
1416
1417        assert!(bloom.may_contain(b"hello"));
1418        assert!(bloom.may_contain(b"world"));
1419        assert!(bloom.may_contain(b"test"));
1420
1421        // These should likely return false (with ~1% false positive rate)
1422        assert!(!bloom.may_contain(b"notfound"));
1423        assert!(!bloom.may_contain(b"missing"));
1424    }
1425
1426    #[test]
1427    fn test_bloom_filter_serialization() {
1428        let mut bloom = BloomFilter::new(100, 10);
1429        bloom.insert(b"key1");
1430        bloom.insert(b"key2");
1431
1432        let bytes = bloom.to_bytes();
1433        let restored = BloomFilter::from_bytes(&bytes).unwrap();
1434
1435        assert!(restored.may_contain(b"key1"));
1436        assert!(restored.may_contain(b"key2"));
1437        assert!(!restored.may_contain(b"key3"));
1438    }
1439
1440    #[test]
1441    fn test_bloom_filter_false_positive_rate() {
1442        let num_keys = 10000;
1443        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1444
1445        // Insert keys
1446        for i in 0..num_keys {
1447            let key = format!("key_{}", i);
1448            bloom.insert(key.as_bytes());
1449        }
1450
1451        // All inserted keys should be found
1452        for i in 0..num_keys {
1453            let key = format!("key_{}", i);
1454            assert!(bloom.may_contain(key.as_bytes()));
1455        }
1456
1457        // Check false positive rate on non-existent keys
1458        let mut false_positives = 0;
1459        let test_count = 10000;
1460        for i in 0..test_count {
1461            let key = format!("nonexistent_{}", i);
1462            if bloom.may_contain(key.as_bytes()) {
1463                false_positives += 1;
1464            }
1465        }
1466
1467        // With 10 bits per key, expect ~1% false positive rate
1468        // Allow up to 3% due to hash function variance
1469        let fp_rate = false_positives as f64 / test_count as f64;
1470        assert!(
1471            fp_rate < 0.03,
1472            "False positive rate {} is too high",
1473            fp_rate
1474        );
1475    }
1476
1477    #[test]
1478    fn test_sstable_writer_config() {
1479        use crate::structures::IndexOptimization;
1480
1481        // Default = Adaptive
1482        let config = SSTableWriterConfig::default();
1483        assert_eq!(config.compression_level.0, 9); // BETTER
1484        assert!(!config.use_bloom_filter);
1485        assert!(!config.use_dictionary);
1486
1487        // Adaptive
1488        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1489        assert_eq!(adaptive.compression_level.0, 9);
1490        assert!(!adaptive.use_bloom_filter);
1491        assert!(!adaptive.use_dictionary);
1492
1493        // SizeOptimized
1494        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1495        assert_eq!(size.compression_level.0, 22); // MAX
1496        assert!(size.use_bloom_filter);
1497        assert!(size.use_dictionary);
1498
1499        // PerformanceOptimized
1500        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1501        assert_eq!(perf.compression_level.0, 1); // FAST
1502        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1503        assert!(!perf.use_dictionary);
1504
1505        // Aliases
1506        let fast = SSTableWriterConfig::fast();
1507        assert_eq!(fast.compression_level.0, 1);
1508
1509        let max = SSTableWriterConfig::max_compression();
1510        assert_eq!(max.compression_level.0, 22);
1511    }
1512
1513    #[test]
1514    fn test_vint_roundtrip() {
1515        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1516
1517        for &val in &test_values {
1518            let mut buf = Vec::new();
1519            write_vint(&mut buf, val).unwrap();
1520            let mut reader = buf.as_slice();
1521            let decoded = read_vint(&mut reader).unwrap();
1522            assert_eq!(val, decoded, "Failed for value {}", val);
1523        }
1524    }
1525
1526    #[test]
1527    fn test_common_prefix_len() {
1528        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1529        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1530        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1531        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1532        assert_eq!(common_prefix_len(b"hello", b""), 0);
1533    }
1534}