hermes_core/structures/sstable.rs

1//! Async SSTable with lazy loading via FileSlice
2//!
3//! Memory-efficient design - only loads minimal metadata into memory,
4//! blocks are loaded on-demand.
5//!
6//! ## Key Features
7//!
8//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
9//!    - Can be mmap'd directly without parsing into heap-allocated structures
10//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
11//!
12//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
13//!    - Minimal memory footprint for block metadata
14//!
15//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
16//!
17//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
18//!
19//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
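//!
//! ## Example
//!
//! A minimal usage sketch of the write/read round trip. Obtaining the
//! `LazyFileHandle` depends on the `directories` API and is assumed here;
//! everything else uses the types defined in this module.
//!
//! ```ignore
//! // Write: keys must be inserted in sorted order.
//! let mut writer = SSTableWriter::<Vec<u8>, u64>::with_config(Vec::new(), SSTableWriterConfig::fast());
//! writer.insert(b"apple", &1)?;
//! writer.insert(b"banana", &2)?;
//! let bytes = writer.finish()?;
//!
//! // Read: open lazily, caching up to 32 decompressed blocks.
//! let reader = AsyncSSTableReader::<u64>::open(file_handle, 32).await?;
//! assert_eq!(reader.get(b"banana").await?, Some(2));
//! ```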
20
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
36
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
47pub const BLOOM_HASH_COUNT: usize = 7;
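// For 10 bits per key, the optimal hash count is k = 10 * ln 2 ≈ 6.93, rounded to 7,
// giving a theoretical false-positive rate of (1 - e^(-7/10))^7 ≈ 0.82%, i.e. the
// "≈ 1%" quoted above.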
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: Vec<u64>,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61impl BloomFilter {
62    /// Create a new bloom filter sized for expected number of keys
63    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
64        let num_bits = (expected_keys * bits_per_key).max(64);
65        let num_words = num_bits.div_ceil(64);
66        Self {
67            bits: vec![0u64; num_words],
68            num_bits,
69            num_hashes: BLOOM_HASH_COUNT,
70        }
71    }
72
73    /// Create from serialized bytes
74    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
75        if data.len() < 12 {
76            return Err(io::Error::new(
77                io::ErrorKind::InvalidData,
78                "Bloom filter data too short",
79            ));
80        }
81        let mut reader = data;
82        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
83        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
84        let num_words = reader.read_u32::<LittleEndian>()? as usize;
85
86        if reader.len() < num_words * 8 {
87            return Err(io::Error::new(
88                io::ErrorKind::InvalidData,
89                "Bloom filter data truncated",
90            ));
91        }
92
93        let mut bits = Vec::with_capacity(num_words);
94        for _ in 0..num_words {
95            bits.push(reader.read_u64::<LittleEndian>()?);
96        }
97
98        Ok(Self {
99            bits,
100            num_bits,
101            num_hashes,
102        })
103    }
104
105    /// Serialize to bytes
106    pub fn to_bytes(&self) -> Vec<u8> {
107        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
108        data.write_u32::<LittleEndian>(self.num_bits as u32)
109            .unwrap();
110        data.write_u32::<LittleEndian>(self.num_hashes as u32)
111            .unwrap();
112        data.write_u32::<LittleEndian>(self.bits.len() as u32)
113            .unwrap();
114        for &word in &self.bits {
115            data.write_u64::<LittleEndian>(word).unwrap();
116        }
117        data
118    }
119
120    /// Add a key to the filter
121    pub fn insert(&mut self, key: &[u8]) {
122        let (h1, h2) = self.hash_pair(key);
123        for i in 0..self.num_hashes {
124            let bit_pos = self.get_bit_pos(h1, h2, i);
125            let word_idx = bit_pos / 64;
126            let bit_idx = bit_pos % 64;
127            if word_idx < self.bits.len() {
128                self.bits[word_idx] |= 1u64 << bit_idx;
129            }
130        }
131    }
132
133    /// Check if a key might be in the filter
134    /// Returns false if definitely not present, true if possibly present
135    pub fn may_contain(&self, key: &[u8]) -> bool {
136        let (h1, h2) = self.hash_pair(key);
137        for i in 0..self.num_hashes {
138            let bit_pos = self.get_bit_pos(h1, h2, i);
139            let word_idx = bit_pos / 64;
140            let bit_idx = bit_pos % 64;
141            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
142                return false;
143            }
144        }
145        true
146    }
147
148    /// Size in bytes
149    pub fn size_bytes(&self) -> usize {
150        12 + self.bits.len() * 8
151    }
152
153    /// Compute two hash values using FNV-1a variant
154    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
155        // FNV-1a hash
156        let mut h1: u64 = 0xcbf29ce484222325;
157        for &byte in key {
158            h1 ^= byte as u64;
159            h1 = h1.wrapping_mul(0x100000001b3);
160        }
161
162        // Second hash using different seed
163        let mut h2: u64 = 0x84222325cbf29ce4;
164        for &byte in key {
165            h2 = h2.wrapping_mul(0x100000001b3);
166            h2 ^= byte as u64;
167        }
168
169        (h1, h2)
170    }
171
172    /// Get bit position for hash iteration i using double hashing
173    #[inline]
174    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
175        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
176    }
177}
178
179/// SSTable value trait
180pub trait SSTableValue: Clone + Send + Sync {
181    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
182    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
183}
184
185/// u64 value implementation
186impl SSTableValue for u64 {
187    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
188        write_vint(writer, *self)
189    }
190
191    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
192        read_vint(reader)
193    }
194}
195
196/// Vec<u8> value implementation
197impl SSTableValue for Vec<u8> {
198    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
199        write_vint(writer, self.len() as u64)?;
200        writer.write_all(self)
201    }
202
203    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
204        let len = read_vint(reader)? as usize;
205        let mut data = vec![0u8; len];
206        reader.read_exact(&mut data)?;
207        Ok(data)
208    }
209}
210
211/// Sparse dimension info for SSTable-based sparse index
212/// Stores offset and length for posting list lookup
213#[derive(Debug, Clone, Copy, PartialEq, Eq)]
214pub struct SparseDimInfo {
215    /// Offset in sparse file where posting list starts
216    pub offset: u64,
217    /// Length of serialized posting list
218    pub length: u32,
219}
220
221impl SparseDimInfo {
222    pub fn new(offset: u64, length: u32) -> Self {
223        Self { offset, length }
224    }
225}
226
227impl SSTableValue for SparseDimInfo {
228    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
229        write_vint(writer, self.offset)?;
230        write_vint(writer, self.length as u64)
231    }
232
233    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
234        let offset = read_vint(reader)?;
235        let length = read_vint(reader)? as u32;
236        Ok(Self { offset, length })
237    }
238}
239
240/// Maximum number of postings that can be inlined in TermInfo
241pub const MAX_INLINE_POSTINGS: usize = 3;
242
243/// Term info for posting list references
244///
245/// Supports two modes:
246/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
247/// - **External**: Larger posting lists stored in separate .post file
248///
249/// This eliminates a separate I/O read for rare/unique terms.
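///
/// A minimal sketch of the inline path (doc IDs must be ascending):
///
/// ```ignore
/// let info = TermInfo::try_inline(&[5, 9], &[2, 1]).expect("two postings fit inline");
/// assert!(info.is_inline());
/// assert_eq!(info.decode_inline(), Some((vec![5, 9], vec![2, 1])));
/// ```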
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub enum TermInfo {
252    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
253    /// Each entry is (doc_id, term_freq) delta-encoded
254    Inline {
255        /// Number of postings (1-3)
256        doc_freq: u8,
257        /// Inline data: delta-encoded (doc_id, term_freq) pairs
258        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
259        data: [u8; 16],
260        /// Actual length of data used
261        data_len: u8,
262    },
263    /// Reference to external posting list in .post file
264    External {
265        posting_offset: u64,
266        posting_len: u32,
267        doc_freq: u32,
268        /// Position data offset (0 if no positions)
269        position_offset: u64,
270        /// Position data length (0 if no positions)
271        position_len: u32,
272    },
273}
274
275impl TermInfo {
276    /// Create an external reference
277    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
278        TermInfo::External {
279            posting_offset,
280            posting_len,
281            doc_freq,
282            position_offset: 0,
283            position_len: 0,
284        }
285    }
286
287    /// Create an external reference with position info
288    pub fn external_with_positions(
289        posting_offset: u64,
290        posting_len: u32,
291        doc_freq: u32,
292        position_offset: u64,
293        position_len: u32,
294    ) -> Self {
295        TermInfo::External {
296            posting_offset,
297            posting_len,
298            doc_freq,
299            position_offset,
300            position_len,
301        }
302    }
303
304    /// Try to create an inline TermInfo from posting data
305    /// Returns None if posting list is too large to inline
306    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
307        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
308            return None;
309        }
310
311        let mut data = [0u8; 16];
312        let mut cursor = std::io::Cursor::new(&mut data[..]);
313        let mut prev_doc_id = 0u32;
314
315        for (i, &doc_id) in doc_ids.iter().enumerate() {
316            let delta = doc_id - prev_doc_id;
317            if write_vint(&mut cursor, delta as u64).is_err() {
318                return None;
319            }
320            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
321                return None;
322            }
323            prev_doc_id = doc_id;
324        }
325
326        let data_len = cursor.position() as u8;
327        if data_len > 16 {
328            return None;
329        }
330
331        Some(TermInfo::Inline {
332            doc_freq: doc_ids.len() as u8,
333            data,
334            data_len,
335        })
336    }
337
338    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
339    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
340    /// `count` is the number of postings (must match iterator length).
341    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
342        if count > MAX_INLINE_POSTINGS || count == 0 {
343            return None;
344        }
345
346        let mut data = [0u8; 16];
347        let mut cursor = std::io::Cursor::new(&mut data[..]);
348        let mut prev_doc_id = 0u32;
349
350        for (doc_id, tf) in iter {
351            let delta = doc_id - prev_doc_id;
352            if write_vint(&mut cursor, delta as u64).is_err() {
353                return None;
354            }
355            if write_vint(&mut cursor, tf as u64).is_err() {
356                return None;
357            }
358            prev_doc_id = doc_id;
359        }
360
361        let data_len = cursor.position() as u8;
362
363        Some(TermInfo::Inline {
364            doc_freq: count as u8,
365            data,
366            data_len,
367        })
368    }
369
370    /// Get document frequency
371    pub fn doc_freq(&self) -> u32 {
372        match self {
373            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
374            TermInfo::External { doc_freq, .. } => *doc_freq,
375        }
376    }
377
378    /// Check if this is an inline posting list
379    pub fn is_inline(&self) -> bool {
380        matches!(self, TermInfo::Inline { .. })
381    }
382
383    /// Get external posting info (offset, len) - returns None for inline
384    pub fn external_info(&self) -> Option<(u64, u32)> {
385        match self {
386            TermInfo::External {
387                posting_offset,
388                posting_len,
389                ..
390            } => Some((*posting_offset, *posting_len)),
391            TermInfo::Inline { .. } => None,
392        }
393    }
394
395    /// Get position info (offset, len) - returns None for inline or if no positions
396    pub fn position_info(&self) -> Option<(u64, u32)> {
397        match self {
398            TermInfo::External {
399                position_offset,
400                position_len,
401                ..
402            } if *position_len > 0 => Some((*position_offset, *position_len)),
403            _ => None,
404        }
405    }
406
407    /// Decode inline postings into (doc_ids, term_freqs)
408    /// Returns None if this is an external reference
409    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
410        match self {
411            TermInfo::Inline {
412                doc_freq,
413                data,
414                data_len,
415            } => {
416                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
417                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
418                let mut reader = &data[..*data_len as usize];
419                let mut prev_doc_id = 0u32;
420
421                for _ in 0..*doc_freq {
422                    let delta = read_vint(&mut reader).ok()? as u32;
423                    let tf = read_vint(&mut reader).ok()? as u32;
424                    let doc_id = prev_doc_id + delta;
425                    doc_ids.push(doc_id);
426                    term_freqs.push(tf);
427                    prev_doc_id = doc_id;
428                }
429
430                Some((doc_ids, term_freqs))
431            }
432            TermInfo::External { .. } => None,
433        }
434    }
435}
436
437impl SSTableValue for TermInfo {
438    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
439        match self {
440            TermInfo::Inline {
441                doc_freq,
442                data,
443                data_len,
444            } => {
445                // Tag byte 0xFF = inline marker
446                writer.write_u8(0xFF)?;
447                writer.write_u8(*doc_freq)?;
448                writer.write_u8(*data_len)?;
449                writer.write_all(&data[..*data_len as usize])?;
450            }
451            TermInfo::External {
452                posting_offset,
453                posting_len,
454                doc_freq,
455                position_offset,
456                position_len,
457            } => {
458                // Tag byte 0x00 = external marker (no positions)
459                // Tag byte 0x01 = external with positions
460                if *position_len > 0 {
461                    writer.write_u8(0x01)?;
462                    write_vint(writer, *doc_freq as u64)?;
463                    write_vint(writer, *posting_offset)?;
464                    write_vint(writer, *posting_len as u64)?;
465                    write_vint(writer, *position_offset)?;
466                    write_vint(writer, *position_len as u64)?;
467                } else {
468                    writer.write_u8(0x00)?;
469                    write_vint(writer, *doc_freq as u64)?;
470                    write_vint(writer, *posting_offset)?;
471                    write_vint(writer, *posting_len as u64)?;
472                }
473            }
474        }
475        Ok(())
476    }
477
478    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
479        let tag = reader.read_u8()?;
480
481        if tag == 0xFF {
482            // Inline
483            let doc_freq = reader.read_u8()?;
484            let data_len = reader.read_u8()?;
485            let mut data = [0u8; 16];
486            reader.read_exact(&mut data[..data_len as usize])?;
487            Ok(TermInfo::Inline {
488                doc_freq,
489                data,
490                data_len,
491            })
492        } else if tag == 0x00 {
493            // External (no positions)
494            let doc_freq = read_vint(reader)? as u32;
495            let posting_offset = read_vint(reader)?;
496            let posting_len = read_vint(reader)? as u32;
497            Ok(TermInfo::External {
498                posting_offset,
499                posting_len,
500                doc_freq,
501                position_offset: 0,
502                position_len: 0,
503            })
504        } else if tag == 0x01 {
505            // External with positions
506            let doc_freq = read_vint(reader)? as u32;
507            let posting_offset = read_vint(reader)?;
508            let posting_len = read_vint(reader)? as u32;
509            let position_offset = read_vint(reader)?;
510            let position_len = read_vint(reader)? as u32;
511            Ok(TermInfo::External {
512                posting_offset,
513                posting_len,
514                doc_freq,
515                position_offset,
516                position_len,
517            })
518        } else {
519            Err(io::Error::new(
520                io::ErrorKind::InvalidData,
521                format!("Invalid TermInfo tag: {}", tag),
522            ))
523        }
524    }
525}
526
527/// Write variable-length integer
528pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
529    loop {
530        let byte = (value & 0x7F) as u8;
531        value >>= 7;
532        if value == 0 {
533            writer.write_u8(byte)?;
534            return Ok(());
535        } else {
536            writer.write_u8(byte | 0x80)?;
537        }
538    }
539}
540
541/// Read variable-length integer
542pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
543    let mut result = 0u64;
544    let mut shift = 0;
545
546    loop {
547        let byte = reader.read_u8()?;
548        result |= ((byte & 0x7F) as u64) << shift;
549        if byte & 0x80 == 0 {
550            return Ok(result);
551        }
552        shift += 7;
553        if shift >= 64 {
554            return Err(io::Error::new(
555                io::ErrorKind::InvalidData,
556                "varint too long",
557            ));
558        }
559    }
560}
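
// Worked example of the encoding above: 300 (0b1_0010_1100) is written as two
// bytes [0xAC, 0x02]: the low seven bits (0x2C) with the continuation bit set,
// followed by the remaining bits (0x02). Values below 128 take a single byte.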
561
562/// Compute common prefix length
563pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
564    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
565}
566
567/// SSTable statistics for debugging
568#[derive(Debug, Clone)]
569pub struct SSTableStats {
570    pub num_blocks: usize,
571    pub num_sparse_entries: usize,
572    pub num_entries: u64,
573    pub has_bloom_filter: bool,
574    pub has_dictionary: bool,
575    pub bloom_filter_size: usize,
576    pub dictionary_size: usize,
577}
578
579/// SSTable writer configuration
580#[derive(Debug, Clone)]
581pub struct SSTableWriterConfig {
582    /// Compression level (1-22, higher = better compression but slower)
583    pub compression_level: CompressionLevel,
584    /// Whether to train and use a dictionary for compression
585    pub use_dictionary: bool,
586    /// Dictionary size in bytes (default 64KB)
587    pub dict_size: usize,
588    /// Whether to build a bloom filter
589    pub use_bloom_filter: bool,
590    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
591    pub bloom_bits_per_key: usize,
592}
593
594impl Default for SSTableWriterConfig {
595    fn default() -> Self {
596        Self::from_optimization(crate::structures::IndexOptimization::default())
597    }
598}
599
600impl SSTableWriterConfig {
601    /// Create config from IndexOptimization mode
602    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
603        use crate::structures::IndexOptimization;
604        match optimization {
605            IndexOptimization::Adaptive => Self {
606                compression_level: CompressionLevel::BETTER, // Level 9
607                use_dictionary: false,
608                dict_size: DEFAULT_DICT_SIZE,
609                use_bloom_filter: false,
610                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
611            },
612            IndexOptimization::SizeOptimized => Self {
613                compression_level: CompressionLevel::MAX, // Level 22
614                use_dictionary: true,
615                dict_size: DEFAULT_DICT_SIZE,
616                use_bloom_filter: true,
617                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
618            },
619            IndexOptimization::PerformanceOptimized => Self {
620                compression_level: CompressionLevel::FAST, // Level 1
621                use_dictionary: false,
622                dict_size: DEFAULT_DICT_SIZE,
623                use_bloom_filter: true, // Bloom helps skip blocks fast
624                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
625            },
626        }
627    }
628
629    /// Fast configuration - prioritize write speed over compression
630    pub fn fast() -> Self {
631        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
632    }
633
634    /// Maximum compression configuration - prioritize size over speed
635    pub fn max_compression() -> Self {
636        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
637    }
638}
639
640/// SSTable writer with optimizations:
641/// - Dictionary compression for blocks (if dictionary provided)
642/// - Configurable compression level
643/// - Block index prefix compression
644/// - Bloom filter for fast negative lookups
645pub struct SSTableWriter<W: Write, V: SSTableValue> {
646    writer: W,
647    block_buffer: Vec<u8>,
648    prev_key: Vec<u8>,
649    index: Vec<BlockIndexEntry>,
650    current_offset: u64,
651    num_entries: u64,
652    block_first_key: Option<Vec<u8>>,
653    config: SSTableWriterConfig,
654    /// Pre-trained dictionary for compression (optional)
655    dictionary: Option<CompressionDict>,
656    /// All keys for bloom filter
657    all_keys: Vec<Vec<u8>>,
658    /// Bloom filter (built at finish time)
659    bloom_filter: Option<BloomFilter>,
660    _phantom: std::marker::PhantomData<V>,
661}
662
663impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
664    /// Create a new SSTable writer with default configuration
665    pub fn new(writer: W) -> Self {
666        Self::with_config(writer, SSTableWriterConfig::default())
667    }
668
669    /// Create a new SSTable writer with custom configuration
670    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
671        Self {
672            writer,
673            block_buffer: Vec::with_capacity(BLOCK_SIZE),
674            prev_key: Vec::new(),
675            index: Vec::new(),
676            current_offset: 0,
677            num_entries: 0,
678            block_first_key: None,
679            config,
680            dictionary: None,
681            all_keys: Vec::new(),
682            bloom_filter: None,
683            _phantom: std::marker::PhantomData,
684        }
685    }
686
687    /// Create a new SSTable writer with a pre-trained dictionary
688    pub fn with_dictionary(
689        writer: W,
690        config: SSTableWriterConfig,
691        dictionary: CompressionDict,
692    ) -> Self {
693        Self {
694            writer,
695            block_buffer: Vec::with_capacity(BLOCK_SIZE),
696            prev_key: Vec::new(),
697            index: Vec::new(),
698            current_offset: 0,
699            num_entries: 0,
700            block_first_key: None,
701            config,
702            dictionary: Some(dictionary),
703            all_keys: Vec::new(),
704            bloom_filter: None,
705            _phantom: std::marker::PhantomData,
706        }
707    }
708
709    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
710        if self.block_first_key.is_none() {
711            self.block_first_key = Some(key.to_vec());
712        }
713
714        // Collect keys for bloom filter
715        if self.config.use_bloom_filter {
716            self.all_keys.push(key.to_vec());
717        }
718
719        let prefix_len = common_prefix_len(&self.prev_key, key);
720        let suffix = &key[prefix_len..];
721
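        // Entry layout within a block: [prefix_len vint][suffix_len vint][suffix bytes][value].
        // E.g. writing "apple" then "applet" stores the second entry with
        // prefix_len = 5, suffix_len = 1, suffix = b"t".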
722        write_vint(&mut self.block_buffer, prefix_len as u64)?;
723        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
724        self.block_buffer.extend_from_slice(suffix);
725        value.serialize(&mut self.block_buffer)?;
726
727        self.prev_key.clear();
728        self.prev_key.extend_from_slice(key);
729        self.num_entries += 1;
730
731        if self.block_buffer.len() >= BLOCK_SIZE {
732            self.flush_block()?;
733        }
734
735        Ok(())
736    }
737
738    /// Flush and compress the current block
739    fn flush_block(&mut self) -> io::Result<()> {
740        if self.block_buffer.is_empty() {
741            return Ok(());
742        }
743
744        // Compress block with dictionary if available
745        let compressed = if let Some(ref dict) = self.dictionary {
746            crate::compression::compress_with_dict(
747                &self.block_buffer,
748                self.config.compression_level,
749                dict,
750            )?
751        } else {
752            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
753        };
754
755        if let Some(first_key) = self.block_first_key.take() {
756            self.index.push(BlockIndexEntry {
757                first_key,
758                offset: self.current_offset,
759                length: compressed.len() as u32,
760            });
761        }
762
763        self.writer.write_all(&compressed)?;
764        self.current_offset += compressed.len() as u64;
765        self.block_buffer.clear();
766        self.prev_key.clear();
767
768        Ok(())
769    }
770
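    /// Finish writing: flush the last block, then append the index, the optional
    /// bloom filter, the optional dictionary, and the 37-byte footer.
    ///
    /// Resulting on-disk layout (offsets recorded in the footer):
    ///
    /// ```text
    /// [compressed blocks]
    /// [index_len: u32][index bytes]
    /// [bloom filter bytes]          (optional)
    /// [dict_len: u32][dict bytes]   (optional)
    /// [data_end: u64][num_entries: u64][bloom_offset: u64][dict_offset: u64]
    /// [compression_level: u8][magic: u32]
    /// ```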
771    pub fn finish(mut self) -> io::Result<W> {
772        // Flush any remaining data
773        self.flush_block()?;
774
775        // Build bloom filter from collected keys
776        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
777            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
778            for key in &self.all_keys {
779                bloom.insert(key);
780            }
781            self.bloom_filter = Some(bloom);
782        }
783
784        let data_end_offset = self.current_offset;
785
786        // Build memory-efficient block index (v4 format)
787        // Convert to (key, BlockAddr) pairs for the new index format
788        let entries: Vec<(Vec<u8>, BlockAddr)> = self
789            .index
790            .iter()
791            .map(|e| {
792                (
793                    e.first_key.clone(),
794                    BlockAddr {
795                        offset: e.offset,
796                        length: e.length,
797                    },
798                )
799            })
800            .collect();
801
802        // Build FST-based index if native feature is enabled, otherwise use mmap index
803        #[cfg(feature = "native")]
804        let index_bytes = FstBlockIndex::build(&entries)?;
805        #[cfg(not(feature = "native"))]
806        let index_bytes = MmapBlockIndex::build(&entries)?;
807
808        // Write index bytes with length prefix
809        self.writer
810            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
811        self.writer.write_all(&index_bytes)?;
812        self.current_offset += 4 + index_bytes.len() as u64;
813
814        // Write bloom filter if present
815        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
816            let bloom_data = bloom.to_bytes();
817            let offset = self.current_offset;
818            self.writer.write_all(&bloom_data)?;
819            self.current_offset += bloom_data.len() as u64;
820            offset
821        } else {
822            0
823        };
824
825        // Write dictionary if present
826        let dict_offset = if let Some(ref dict) = self.dictionary {
827            let dict_bytes = dict.as_bytes();
828            let offset = self.current_offset;
829            self.writer
830                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
831            self.writer.write_all(dict_bytes)?;
832            self.current_offset += 4 + dict_bytes.len() as u64;
833            offset
834        } else {
835            0
836        };
837
838        // Write extended footer (v4 format)
839        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
840        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
841        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
842        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
843        self.writer
844            .write_u8(self.config.compression_level.0 as u8)?;
845        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
846
847        Ok(self.writer)
848    }
849}
850
851/// Block index entry
852#[derive(Debug, Clone)]
853struct BlockIndexEntry {
854    first_key: Vec<u8>,
855    offset: u64,
856    length: u32,
857}
858
859/// Async SSTable reader - loads blocks on demand via LazyFileSlice
860///
861/// Memory-efficient design (v4 format):
862/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
863/// - Block addresses stored in bitpacked format
864/// - Bloom filter and dictionary optional
865pub struct AsyncSSTableReader<V: SSTableValue> {
866    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
867    data_slice: LazyFileSlice,
868    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
869    block_index: BlockIndex,
870    num_entries: u64,
871    /// Hot cache for decompressed blocks
872    cache: RwLock<BlockCache>,
873    /// Bloom filter for fast negative lookups (optional)
874    bloom_filter: Option<BloomFilter>,
875    /// Compression dictionary (optional)
876    dictionary: Option<CompressionDict>,
877    /// Compression level used
878    #[allow(dead_code)]
879    compression_level: CompressionLevel,
880    _phantom: std::marker::PhantomData<V>,
881}
882
883/// FIFO block cache — O(1) lookup, insert, and eviction.
884///
885/// Uses VecDeque for eviction order (pop_front = O(1)) instead of
886/// Vec::remove(0) which is O(n). For small caches (16-64 blocks),
887/// FIFO performs nearly identically to LRU.
888struct BlockCache {
889    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
890    insert_order: std::collections::VecDeque<u64>,
891    max_blocks: usize,
892}
893
894impl BlockCache {
895    fn new(max_blocks: usize) -> Self {
896        Self {
897            blocks: FxHashMap::default(),
898            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
899            max_blocks,
900        }
901    }
902
903    fn get(&self, offset: u64) -> Option<Arc<Vec<u8>>> {
904        self.blocks.get(&offset).map(Arc::clone)
905    }
906
907    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
908        if self.blocks.contains_key(&offset) {
909            return; // already cached
910        }
911        while self.blocks.len() >= self.max_blocks {
912            if let Some(evict_offset) = self.insert_order.pop_front() {
913                self.blocks.remove(&evict_offset);
914            } else {
915                break;
916            }
917        }
918        self.blocks.insert(offset, block);
919        self.insert_order.push_back(offset);
920    }
921}
922
923impl<V: SSTableValue> AsyncSSTableReader<V> {
924    /// Open an SSTable from a LazyFileHandle
925    /// Only loads the footer and index into memory, data blocks fetched on-demand
926    ///
    /// Expects the v4 (memory-efficient) format identified by `SSTABLE_MAGIC`;
    /// files with any other magic number are rejected. The block index is either
    /// FST-based (native) or mmap'd raw bytes, so keys are never heap-allocated.
930    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
931        let file_len = file_handle.len();
932        if file_len < 37 {
933            return Err(io::Error::new(
934                io::ErrorKind::InvalidData,
935                "SSTable too small",
936            ));
937        }
938
939        // Read footer (37 bytes)
940        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
941        let footer_bytes = file_handle
942            .read_bytes_range(file_len - 37..file_len)
943            .await?;
944
945        let mut reader = footer_bytes.as_slice();
946        let data_end_offset = reader.read_u64::<LittleEndian>()?;
947        let num_entries = reader.read_u64::<LittleEndian>()?;
948        let bloom_offset = reader.read_u64::<LittleEndian>()?;
949        let dict_offset = reader.read_u64::<LittleEndian>()?;
950        let compression_level = CompressionLevel(reader.read_u8()? as i32);
951        let magic = reader.read_u32::<LittleEndian>()?;
952
953        if magic != SSTABLE_MAGIC {
954            return Err(io::Error::new(
955                io::ErrorKind::InvalidData,
956                format!("Invalid SSTable magic: 0x{:08X}", magic),
957            ));
958        }
959
960        // Read index section
961        let index_start = data_end_offset;
962        let index_end = file_len - 37;
963        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
964
965        // Parse block index (length-prefixed FST or mmap index)
966        let mut idx_reader = index_bytes.as_slice();
967        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
968
969        if index_len > idx_reader.len() {
970            return Err(io::Error::new(
971                io::ErrorKind::InvalidData,
972                "Index data truncated",
973            ));
974        }
975
976        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());
977
978        // Try FST first (native), fall back to mmap
979        #[cfg(feature = "native")]
980        let block_index = match FstBlockIndex::load(index_data.clone()) {
981            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
982            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
983        };
984        #[cfg(not(feature = "native"))]
985        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
986
987        // Load bloom filter if present
988        let bloom_filter = if bloom_offset > 0 {
989            let bloom_start = bloom_offset;
990            // Read bloom filter size first (12 bytes header)
991            let bloom_header = file_handle
992                .read_bytes_range(bloom_start..bloom_start + 12)
993                .await?;
994            let num_words = u32::from_le_bytes([
995                bloom_header[8],
996                bloom_header[9],
997                bloom_header[10],
998                bloom_header[11],
999            ]) as u64;
1000            let bloom_size = 12 + num_words * 8;
1001            let bloom_data = file_handle
1002                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1003                .await?;
1004            Some(BloomFilter::from_bytes(&bloom_data)?)
1005        } else {
1006            None
1007        };
1008
1009        // Load dictionary if present
1010        let dictionary = if dict_offset > 0 {
1011            let dict_start = dict_offset;
1012            // Read dictionary size first
1013            let dict_len_bytes = file_handle
1014                .read_bytes_range(dict_start..dict_start + 4)
1015                .await?;
1016            let dict_len = u32::from_le_bytes([
1017                dict_len_bytes[0],
1018                dict_len_bytes[1],
1019                dict_len_bytes[2],
1020                dict_len_bytes[3],
1021            ]) as u64;
1022            let dict_data = file_handle
1023                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1024                .await?;
1025            Some(CompressionDict::from_bytes(dict_data.to_vec()))
1026        } else {
1027            None
1028        };
1029
1030        // Create a lazy slice for just the data portion
1031        let data_slice = file_handle.slice(0..data_end_offset);
1032
1033        Ok(Self {
1034            data_slice,
1035            block_index,
1036            num_entries,
1037            cache: RwLock::new(BlockCache::new(cache_blocks)),
1038            bloom_filter,
1039            dictionary,
1040            compression_level,
1041            _phantom: std::marker::PhantomData,
1042        })
1043    }
1044
1045    /// Number of entries
1046    pub fn num_entries(&self) -> u64 {
1047        self.num_entries
1048    }
1049
1050    /// Get stats about this SSTable for debugging
1051    pub fn stats(&self) -> SSTableStats {
1052        SSTableStats {
1053            num_blocks: self.block_index.len(),
1054            num_sparse_entries: 0, // No longer using sparse index separately
1055            num_entries: self.num_entries,
1056            has_bloom_filter: self.bloom_filter.is_some(),
1057            has_dictionary: self.dictionary.is_some(),
1058            bloom_filter_size: self
1059                .bloom_filter
1060                .as_ref()
1061                .map(|b| b.size_bytes())
1062                .unwrap_or(0),
1063            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1064        }
1065    }
1066
1067    /// Number of blocks currently in the cache
1068    pub fn cached_blocks(&self) -> usize {
1069        self.cache.read().blocks.len()
1070    }
1071
1072    /// Look up a key (async - may need to load block)
1073    ///
1074    /// Uses bloom filter for fast negative lookups, then memory-efficient
1075    /// block index to locate the block, reducing I/O to typically 1 block read.
1076    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1077        log::debug!(
1078            "SSTable::get called, key_len={}, total_blocks={}",
1079            key.len(),
1080            self.block_index.len()
1081        );
1082
1083        // Check bloom filter first - fast negative lookup
1084        if let Some(ref bloom) = self.bloom_filter
1085            && !bloom.may_contain(key)
1086        {
1087            log::debug!("SSTable::get bloom filter negative");
1088            return Ok(None);
1089        }
1090
1091        // Use block index to find the block that could contain the key
1092        let block_idx = match self.block_index.locate(key) {
1093            Some(idx) => idx,
1094            None => {
1095                log::debug!("SSTable::get key not found (before first block)");
1096                return Ok(None);
1097            }
1098        };
1099
1100        log::debug!("SSTable::get loading block_idx={}", block_idx);
1101
1102        // Now we know exactly which block to load - single I/O
1103        let block_data = self.load_block(block_idx).await?;
1104        self.search_block(&block_data, key)
1105    }
1106
1107    /// Batch lookup multiple keys with optimized I/O
1108    ///
    /// Groups keys by block and loads each block only once, so I/O is bounded by
    /// the number of distinct blocks touched rather than the number of keys.
1111    /// Uses bloom filter to skip keys that definitely don't exist.
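    ///
    /// A usage sketch (the `reader` binding is assumed):
    ///
    /// ```ignore
    /// let keys: [&[u8]; 2] = [b"apple", b"banana"];
    /// let values: Vec<Option<u64>> = reader.get_batch(&keys).await?;
    /// ```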
1112    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1113        if keys.is_empty() {
1114            return Ok(Vec::new());
1115        }
1116
1117        // Map each key to its block index
1118        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1119        for (key_idx, key) in keys.iter().enumerate() {
1120            // Check bloom filter first
1121            if let Some(ref bloom) = self.bloom_filter
1122                && !bloom.may_contain(key)
1123            {
1124                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1125                continue;
1126            }
1127
1128            match self.block_index.locate(key) {
1129                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1130                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1131            }
1132        }
1133
1134        // Group keys by block
1135        let mut blocks_to_load: Vec<usize> = key_to_block
1136            .iter()
1137            .filter(|(_, b)| *b != usize::MAX)
1138            .map(|(_, b)| *b)
1139            .collect();
1140        blocks_to_load.sort_unstable();
1141        blocks_to_load.dedup();
1142
1143        // Load all needed blocks (this is where I/O happens)
1144        for &block_idx in &blocks_to_load {
1145            let _ = self.load_block(block_idx).await?;
1146        }
1147
1148        // Now search each key in its block (all blocks are cached)
1149        let mut results = vec![None; keys.len()];
1150        for (key_idx, block_idx) in key_to_block {
1151            if block_idx == usize::MAX {
1152                continue;
1153            }
1154            let block_data = self.load_block(block_idx).await?; // Will hit cache
1155            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1156        }
1157
1158        Ok(results)
1159    }
1160
1161    /// Preload all data blocks into memory
1162    ///
    /// Call this after open() to avoid further I/O during lookups, as long as the
    /// block cache created at open time is large enough to hold every block.
    /// Unlike `prefetch_all_data_bulk`, this does not grow the cache.
1165    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1166        for block_idx in 0..self.block_index.len() {
1167            self.load_block(block_idx).await?;
1168        }
1169        Ok(())
1170    }
1171
1172    /// Prefetch all data blocks via a single bulk I/O operation.
1173    ///
1174    /// Reads the entire compressed data section in one call, then decompresses
1175    /// each block and populates the cache. This turns N individual reads into 1.
1176    /// Cache capacity is expanded to hold all blocks.
1177    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1178        let num_blocks = self.block_index.len();
1179        if num_blocks == 0 {
1180            return Ok(());
1181        }
1182
1183        // Find total data extent
1184        let mut max_end: u64 = 0;
1185        for i in 0..num_blocks {
1186            if let Some(addr) = self.block_index.get_addr(i) {
1187                max_end = max_end.max(addr.offset + addr.length as u64);
1188            }
1189        }
1190
1191        // Single bulk read of entire data section
1192        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1193        let buf = all_data.as_slice();
1194
1195        // Expand cache and decompress all blocks
1196        let mut cache = self.cache.write();
1197        cache.max_blocks = cache.max_blocks.max(num_blocks);
1198        for i in 0..num_blocks {
1199            let addr = self.block_index.get_addr(i).unwrap();
1200            if cache.get(addr.offset).is_some() {
1201                continue;
1202            }
1203            let compressed =
1204                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1205            let decompressed = if let Some(ref dict) = self.dictionary {
1206                crate::compression::decompress_with_dict(compressed, dict)?
1207            } else {
1208                crate::compression::decompress(compressed)?
1209            };
1210            cache.insert(addr.offset, Arc::new(decompressed));
1211        }
1212
1213        Ok(())
1214    }
1215
1216    /// Load a block (checks cache first, then loads from FileSlice)
1217    /// Uses dictionary decompression if dictionary is present
1218    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
1219        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1220            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1221        })?;
1222
1223        // Check cache first (read lock — concurrent cache hits don't serialize)
1224        {
1225            if let Some(block) = self.cache.read().get(addr.offset) {
1226                return Ok(block);
1227            }
1228        }
1229
1230        log::debug!(
1231            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1232            block_idx,
1233            addr.offset,
1234            addr.offset + addr.length as u64
1235        );
1236
1237        // Load from FileSlice
1238        let range = addr.byte_range();
1239        let compressed = self.data_slice.read_bytes_range(range).await?;
1240
1241        // Decompress with dictionary if available
1242        let decompressed = if let Some(ref dict) = self.dictionary {
1243            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1244        } else {
1245            crate::compression::decompress(compressed.as_slice())?
1246        };
1247
1248        let block = Arc::new(decompressed);
1249
1250        // Insert into cache
1251        {
1252            let mut cache = self.cache.write();
1253            cache.insert(addr.offset, Arc::clone(&block));
1254        }
1255
1256        Ok(block)
1257    }
1258
1259    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1260        let mut reader = block_data;
1261        let mut current_key = Vec::new();
1262
1263        while !reader.is_empty() {
1264            let common_prefix_len = read_vint(&mut reader)? as usize;
1265            let suffix_len = read_vint(&mut reader)? as usize;
1266
1267            current_key.truncate(common_prefix_len);
1268            let mut suffix = vec![0u8; suffix_len];
1269            reader.read_exact(&mut suffix)?;
1270            current_key.extend_from_slice(&suffix);
1271
1272            let value = V::deserialize(&mut reader)?;
1273
1274            match current_key.as_slice().cmp(target_key) {
1275                std::cmp::Ordering::Equal => return Ok(Some(value)),
1276                std::cmp::Ordering::Greater => return Ok(None),
1277                std::cmp::Ordering::Less => continue,
1278            }
1279        }
1280
1281        Ok(None)
1282    }
1283
1284    /// Prefetch blocks for a key range
1285    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1286        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1287        let end_block = self
1288            .block_index
1289            .locate(end_key)
1290            .unwrap_or(self.block_index.len().saturating_sub(1));
1291
1292        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1293            let _ = self.load_block(block_idx).await?;
1294        }
1295
1296        Ok(())
1297    }
1298
1299    /// Iterate over all entries (loads blocks as needed)
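    ///
    /// A usage sketch (assuming an opened `reader`):
    ///
    /// ```ignore
    /// let mut it = reader.iter();
    /// while let Some((key, value)) = it.next().await? {
    ///     // process (key, value)
    /// }
    /// ```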
1300    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1301        AsyncSSTableIterator::new(self)
1302    }
1303
1304    /// Get all entries as a vector (for merging)
1305    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1306        let mut results = Vec::new();
1307
1308        for block_idx in 0..self.block_index.len() {
1309            let block_data = self.load_block(block_idx).await?;
1310            let mut reader = block_data.as_slice();
1311            let mut current_key = Vec::new();
1312
1313            while !reader.is_empty() {
1314                let common_prefix_len = read_vint(&mut reader)? as usize;
1315                let suffix_len = read_vint(&mut reader)? as usize;
1316
1317                current_key.truncate(common_prefix_len);
1318                let mut suffix = vec![0u8; suffix_len];
1319                reader.read_exact(&mut suffix)?;
1320                current_key.extend_from_slice(&suffix);
1321
1322                let value = V::deserialize(&mut reader)?;
1323                results.push((current_key.clone(), value));
1324            }
1325        }
1326
1327        Ok(results)
1328    }
1329}
1330
1331/// Async iterator over SSTable entries
1332pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1333    reader: &'a AsyncSSTableReader<V>,
1334    current_block: usize,
1335    block_data: Option<Arc<Vec<u8>>>,
1336    block_offset: usize,
1337    current_key: Vec<u8>,
1338    finished: bool,
1339}
1340
1341impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1342    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1343        Self {
1344            reader,
1345            current_block: 0,
1346            block_data: None,
1347            block_offset: 0,
1348            current_key: Vec::new(),
1349            finished: reader.block_index.is_empty(),
1350        }
1351    }
1352
1353    async fn load_next_block(&mut self) -> io::Result<bool> {
1354        if self.current_block >= self.reader.block_index.len() {
1355            self.finished = true;
1356            return Ok(false);
1357        }
1358
1359        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1360        self.block_offset = 0;
1361        self.current_key.clear();
1362        self.current_block += 1;
1363        Ok(true)
1364    }
1365
1366    /// Advance to next entry (async)
1367    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1368        if self.finished {
1369            return Ok(None);
1370        }
1371
1372        if self.block_data.is_none() && !self.load_next_block().await? {
1373            return Ok(None);
1374        }
1375
1376        loop {
1377            let block = self.block_data.as_ref().unwrap();
1378            if self.block_offset >= block.len() {
1379                if !self.load_next_block().await? {
1380                    return Ok(None);
1381                }
1382                continue;
1383            }
1384
1385            let mut reader = &block[self.block_offset..];
1386            let start_len = reader.len();
1387
1388            let common_prefix_len = read_vint(&mut reader)? as usize;
1389            let suffix_len = read_vint(&mut reader)? as usize;
1390
1391            self.current_key.truncate(common_prefix_len);
1392            let mut suffix = vec![0u8; suffix_len];
1393            reader.read_exact(&mut suffix)?;
1394            self.current_key.extend_from_slice(&suffix);
1395
1396            let value = V::deserialize(&mut reader)?;
1397
1398            self.block_offset += start_len - reader.len();
1399
1400            return Ok(Some((self.current_key.clone(), value)));
1401        }
1402    }
1403}
1404
1405#[cfg(test)]
1406mod tests {
1407    use super::*;
1408
1409    #[test]
1410    fn test_bloom_filter_basic() {
1411        let mut bloom = BloomFilter::new(100, 10);
1412
1413        bloom.insert(b"hello");
1414        bloom.insert(b"world");
1415        bloom.insert(b"test");
1416
1417        assert!(bloom.may_contain(b"hello"));
1418        assert!(bloom.may_contain(b"world"));
1419        assert!(bloom.may_contain(b"test"));
1420
1421        // These should likely return false (with ~1% false positive rate)
1422        assert!(!bloom.may_contain(b"notfound"));
1423        assert!(!bloom.may_contain(b"missing"));
1424    }
1425
1426    #[test]
1427    fn test_bloom_filter_serialization() {
1428        let mut bloom = BloomFilter::new(100, 10);
1429        bloom.insert(b"key1");
1430        bloom.insert(b"key2");
1431
1432        let bytes = bloom.to_bytes();
1433        let restored = BloomFilter::from_bytes(&bytes).unwrap();
1434
1435        assert!(restored.may_contain(b"key1"));
1436        assert!(restored.may_contain(b"key2"));
1437        assert!(!restored.may_contain(b"key3"));
1438    }
1439
1440    #[test]
1441    fn test_bloom_filter_false_positive_rate() {
1442        let num_keys = 10000;
1443        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1444
1445        // Insert keys
1446        for i in 0..num_keys {
1447            let key = format!("key_{}", i);
1448            bloom.insert(key.as_bytes());
1449        }
1450
1451        // All inserted keys should be found
1452        for i in 0..num_keys {
1453            let key = format!("key_{}", i);
1454            assert!(bloom.may_contain(key.as_bytes()));
1455        }
1456
1457        // Check false positive rate on non-existent keys
1458        let mut false_positives = 0;
1459        let test_count = 10000;
1460        for i in 0..test_count {
1461            let key = format!("nonexistent_{}", i);
1462            if bloom.may_contain(key.as_bytes()) {
1463                false_positives += 1;
1464            }
1465        }
1466
1467        // With 10 bits per key, expect ~1% false positive rate
1468        // Allow up to 3% due to hash function variance
1469        let fp_rate = false_positives as f64 / test_count as f64;
1470        assert!(
1471            fp_rate < 0.03,
1472            "False positive rate {} is too high",
1473            fp_rate
1474        );
1475    }
1476
1477    #[test]
1478    fn test_sstable_writer_config() {
1479        use crate::structures::IndexOptimization;
1480
1481        // Default = Adaptive
1482        let config = SSTableWriterConfig::default();
1483        assert_eq!(config.compression_level.0, 9); // BETTER
1484        assert!(!config.use_bloom_filter);
1485        assert!(!config.use_dictionary);
1486
1487        // Adaptive
1488        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1489        assert_eq!(adaptive.compression_level.0, 9);
1490        assert!(!adaptive.use_bloom_filter);
1491        assert!(!adaptive.use_dictionary);
1492
1493        // SizeOptimized
1494        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1495        assert_eq!(size.compression_level.0, 22); // MAX
1496        assert!(size.use_bloom_filter);
1497        assert!(size.use_dictionary);
1498
1499        // PerformanceOptimized
1500        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1501        assert_eq!(perf.compression_level.0, 1); // FAST
1502        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1503        assert!(!perf.use_dictionary);
1504
1505        // Aliases
1506        let fast = SSTableWriterConfig::fast();
1507        assert_eq!(fast.compression_level.0, 1);
1508
1509        let max = SSTableWriterConfig::max_compression();
1510        assert_eq!(max.compression_level.0, 22);
1511    }
1512
1513    #[test]
1514    fn test_vint_roundtrip() {
1515        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1516
1517        for &val in &test_values {
1518            let mut buf = Vec::new();
1519            write_vint(&mut buf, val).unwrap();
1520            let mut reader = buf.as_slice();
1521            let decoded = read_vint(&mut reader).unwrap();
1522            assert_eq!(val, decoded, "Failed for value {}", val);
1523        }
1524    }
1525
1526    #[test]
1527    fn test_common_prefix_len() {
1528        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1529        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1530        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1531        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1532        assert_eq!(common_prefix_len(b"hello", b""), 0);
1533    }
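
    #[test]
    fn test_term_info_inline_roundtrip() {
        // Sketch: round-trip a small inline TermInfo through its SSTableValue impl.
        let info = TermInfo::try_inline(&[5, 9], &[2, 1]).expect("two postings should inline");
        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let mut reader = buf.as_slice();
        let decoded = TermInfo::deserialize(&mut reader).unwrap();
        assert_eq!(info, decoded);
        assert_eq!(decoded.decode_inline(), Some((vec![5, 9], vec![2, 1])));
    }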
1534}