hermes_core/structures/
sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Only the index is loaded into memory; blocks are loaded on demand.
//!
//! ## Optimizations
//!
//! 1. **Sparse Index**: Samples every Nth block's first key, narrowing the
//!    binary search over block keys to a small window before any data block is read.
//!
//! 2. **Dictionary Compression**: Trains a Zstd dictionary from block samples for
//!    15-30% better compression on similar data patterns.
//!
//! 3. **Configurable Compression Level**: Supports levels 1-22 for space/speed tradeoff.
//!
//! 4. **Block Index Prefix Compression**: Applies prefix compression to block index
//!    keys for a 5-10% index size reduction.
//!
//! 5. **Bloom Filter**: Per-SSTable bloom filter to skip blocks that definitely
//!    don't contain a key, reducing unnecessary I/O.
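//!
//! ## Usage sketch
//!
//! A minimal, hypothetical round trip (not part of the original docs): keys must be
//! inserted in sorted order, and `handle` stands in for a `LazyFileHandle` obtained
//! from the crate's directory layer, whose construction is omitted here.
//!
//! ```ignore
//! let mut buf: Vec<u8> = Vec::new();
//! {
//!     let mut writer = SSTableWriter::<u64>::new(&mut buf);
//!     writer.insert(b"apple", &1)?;
//!     writer.insert(b"banana", &2)?;
//!     writer.finish()?;
//! }
//! // `handle` wraps the bytes written above.
//! let reader = AsyncSSTableReader::<u64>::open(handle, 64).await?;
//! assert_eq!(reader.get(b"apple").await?, Some(1));
//! assert_eq!(reader.get(b"cherry").await?, None);
//! ```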

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};

/// SSTable magic number - version 3 with optimizations
pub const SSTABLE_MAGIC: u32 = 0x53544233; // "STB3"

/// Block size for SSTable (16KB default)
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Sparse index sampling interval - sample every Nth block
/// The block index is already fully in memory; sparse index provides
/// an additional level for very large SSTables.
pub const SPARSE_INDEX_INTERVAL: usize = 16;

/// Default dictionary size (64KB)
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Bloom filter hash count (optimal for 10 bits/key)
pub const BLOOM_HASH_COUNT: usize = 7;

// ============================================================================
// Bloom Filter Implementation
// ============================================================================

/// Simple bloom filter for key existence checks
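///
/// For reference (added note, standard Bloom filter math): with `m` bits, `n` keys
/// and `k` hash functions the expected false-positive rate is roughly
/// `(1 - e^(-k*n/m))^k`; at 10 bits per key and `k = 7` that is about 0.8%,
/// which is where the "~1%" figure above comes from.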
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Create a new bloom filter sized for expected number of keys
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Create from serialized bytes
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serialize to bytes
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    /// Add a key to the filter
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Check if a key might be in the filter
    /// Returns false if definitely not present, true if possibly present
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Size in bytes
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Compute two hash values using FNV-1a variant
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        // FNV-1a hash
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        // Second hash using different seed
        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    /// Get bit position for hash iteration i using double hashing
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// SSTable value trait
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// u64 value implementation
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

/// Vec<u8> value implementation
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Maximum number of postings that can be inlined in TermInfo
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Term info for posting list references
///
/// Supports two modes:
/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
/// - **External**: Larger posting lists stored in separate .post file
///
/// This eliminates a separate I/O read for rare/unique terms.
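///
/// Encoding illustration (added, hypothetical values): doc_ids `[5, 9]` with
/// term frequencies `[2, 1]` are delta-encoded as varints
/// `[0x05, 0x02, 0x04, 0x01]` (delta 5, tf 2, delta 9-5=4, tf 1), so
/// `data_len = 4` and `doc_freq = 2`.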
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
    /// Each entry is (doc_id, term_freq) delta-encoded
    Inline {
        /// Number of postings (1-3)
        doc_freq: u8,
        /// Inline data: delta-encoded (doc_id, term_freq) pairs
        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
        data: [u8; 16],
        /// Actual length of data used
        data_len: u8,
    },
    /// Reference to external posting list in .post file
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
    },
}

impl TermInfo {
    /// Create an external reference
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
        }
    }

    /// Try to create an inline TermInfo from posting data
    /// Returns None if posting list is too large to inline
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Get document frequency
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    /// Check if this is an inline posting list
    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Get external posting info (offset, len) - returns None for inline
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Decode inline postings into (doc_ids, term_freqs)
    /// Returns None if this is an external reference
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag byte 0xFF = inline marker
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            } => {
                // Tag byte 0x00 = external marker
                writer.write_u8(0x00)?;
                write_vint(writer, *doc_freq as u64)?;
                write_vint(writer, *posting_offset)?;
                write_vint(writer, *posting_len as u64)?;
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            // Inline
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            // External
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Write variable-length integer
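///
/// Illustration (added note): this is the usual LEB128-style varint, low 7 bits
/// first with the high bit as a continuation flag; e.g. 300 encodes as
/// `[0xAC, 0x02]`, and values below 128 take a single byte.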
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Read variable-length integer
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Compute common prefix length
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// Sparse index entry - samples every Nth block for fast range narrowing
#[derive(Debug, Clone)]
struct SparseIndexEntry {
    /// First key of the sampled block
    first_key: Vec<u8>,
    /// Index into the full block index
    block_idx: u32,
}

/// SSTable statistics for debugging
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// SSTable writer configuration
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level (1-22, higher = better compression but slower)
    pub compression_level: CompressionLevel,
    /// Whether to train and use a dictionary for compression
    pub use_dictionary: bool,
    /// Dictionary size in bytes (default 64KB)
    pub dict_size: usize,
    /// Whether to build a bloom filter
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Create config from IndexOptimization mode
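    ///
    /// Summary of the presets as implemented below: `Adaptive` → level 9, no
    /// dictionary, no bloom filter; `SizeOptimized` → level 22 with dictionary and
    /// bloom filter; `PerformanceOptimized` → level 1 with bloom filter only.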
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER, // Level 9
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX, // Level 22
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST, // Level 1
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true, // Bloom helps skip blocks fast
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Fast configuration - prioritize write speed over compression
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Maximum compression configuration - prioritize size over speed
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// SSTable writer with optimizations:
/// - Dictionary compression for blocks (if dictionary provided)
/// - Configurable compression level
/// - Block index prefix compression
/// - Bloom filter for fast negative lookups
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Pre-trained dictionary for compression (optional)
    dictionary: Option<CompressionDict>,
    /// All keys for bloom filter
    all_keys: Vec<Vec<u8>>,
    /// Bloom filter (built at finish time)
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    /// Create a new SSTable writer with default configuration
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    /// Create a new SSTable writer with custom configuration
    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Create a new SSTable writer with a pre-trained dictionary
    pub fn with_dictionary(
        writer: &'a mut dyn Write,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        // Collect keys for bloom filter
        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Flush and compress the current block
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        // Compress block with dictionary if available
        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

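    /// Flush remaining data and write the index sections and footer.
    ///
    /// Resulting file layout (added summary, derived from the code below):
    /// compressed data blocks, prefix-compressed block index, sparse index,
    /// optional bloom filter, optional dictionary, then a 37-byte footer of
    /// `data_end(u64) + num_entries(u64) + bloom_offset(u64) + dict_offset(u64) +
    /// compression_level(u8) + magic(u32)`.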
    pub fn finish(mut self) -> io::Result<()> {
        // Flush any remaining data
        self.flush_block()?;

        // Build bloom filter from collected keys
        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        // Build sparse index - sample every SPARSE_INDEX_INTERVAL blocks
        let sparse_index: Vec<SparseIndexEntry> = self
            .index
            .iter()
            .enumerate()
            .filter(|(i, _)| *i % SPARSE_INDEX_INTERVAL == 0)
            .map(|(i, entry)| SparseIndexEntry {
                first_key: entry.first_key.clone(),
                block_idx: i as u32,
            })
            .collect();

        // Write block index with prefix compression.
        // This advances current_offset so the bloom filter and dictionary offsets
        // recorded in the footer below are absolute file offsets, matching what
        // AsyncSSTableReader::open expects.
        let index_clone = self.index.clone();
        self.write_block_index_compressed(&index_clone)?;

        // Write sparse index (buffered so its size is accounted for as well)
        let mut sparse_buf: Vec<u8> = Vec::new();
        sparse_buf.write_u32::<LittleEndian>(sparse_index.len() as u32)?;
        for entry in &sparse_index {
            sparse_buf.write_u16::<LittleEndian>(entry.first_key.len() as u16)?;
            sparse_buf.write_all(&entry.first_key)?;
            sparse_buf.write_u32::<LittleEndian>(entry.block_idx)?;
        }
        self.writer.write_all(&sparse_buf)?;
        self.current_offset += sparse_buf.len() as u64;

        // Write bloom filter if present
        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Write extended footer (v3 format)
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }

    /// Write block index with prefix compression for keys.
    /// Serialized into a buffer first so the byte count can be added to
    /// `current_offset`, keeping later footer offsets accurate.
    fn write_block_index_compressed(&mut self, index: &[BlockIndexEntry]) -> io::Result<()> {
        let mut buf: Vec<u8> = Vec::new();
        buf.write_u32::<LittleEndian>(index.len() as u32)?;

        let mut prev_key: Vec<u8> = Vec::new();
        for entry in index {
            // Prefix compress the key
            let prefix_len = common_prefix_len(&prev_key, &entry.first_key);
            let suffix = &entry.first_key[prefix_len..];

            // Write: prefix_len (varint) + suffix_len (varint) + suffix + offset (varint) + length (varint)
            write_vint(&mut buf, prefix_len as u64)?;
            write_vint(&mut buf, suffix.len() as u64)?;
            buf.write_all(suffix)?;
            write_vint(&mut buf, entry.offset)?;
            write_vint(&mut buf, entry.length as u64)?;

            prev_key.clear();
            prev_key.extend_from_slice(&entry.first_key);
        }

        self.writer.write_all(&buf)?;
        self.current_offset += buf.len() as u64;

        Ok(())
    }
}

/// Block index entry
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async SSTable reader - loads blocks on demand via LazyFileSlice
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index (loaded into memory - small)
    index: Vec<BlockIndexEntry>,
    /// Sparse index for fast range narrowing (samples every Nth block)
    sparse_index: Vec<SparseIndexEntry>,
    num_entries: u64,
    /// Hot cache for decompressed blocks
    cache: RwLock<BlockCache>,
    /// Bloom filter for fast negative lookups (optional)
    bloom_filter: Option<BloomFilter>,
    /// Compression dictionary (optional)
    dictionary: Option<CompressionDict>,
    /// Compression level used
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU-style block cache
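///
/// Added note: eviction is least-recently-used, tracked by `access_order`;
/// updates are O(n) in the number of cached blocks, which is acceptable for the
/// small block counts this cache is sized for.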
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable from a LazyFileHandle
    /// Only loads the footer and index into memory, data blocks fetched on-demand
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Read footer (37 bytes)
        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // Read index section
        let index_start = data_end_offset as usize;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        // Read block index (prefix-compressed)
        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);
        let mut prev_key: Vec<u8> = Vec::new();

        for _ in 0..num_blocks {
            let prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;

            // Reconstruct full key
            let mut first_key = prev_key[..prefix_len.min(prev_key.len())].to_vec();
            first_key.extend_from_slice(&suffix);

            let offset = read_vint(&mut reader)?;
            let length = read_vint(&mut reader)? as u32;

            prev_key = first_key.clone();
            index.push(BlockIndexEntry {
                first_key,
                offset,
                length,
            });
        }

        // Read sparse index
        let num_sparse = reader.read_u32::<LittleEndian>()? as usize;
        let mut sparse_index = Vec::with_capacity(num_sparse);

        for _ in 0..num_sparse {
            let key_len = reader.read_u16::<LittleEndian>()? as usize;
            let mut first_key = vec![0u8; key_len];
            reader.read_exact(&mut first_key)?;
            let block_idx = reader.read_u32::<LittleEndian>()?;

            sparse_index.push(SparseIndexEntry {
                first_key,
                block_idx,
            });
        }

        // Load bloom filter if present
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset as usize;
            // Read bloom filter size first (12 bytes header)
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as usize;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        // Load dictionary if present
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset as usize;
            // Read dictionary size first
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as usize;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        // Create a lazy slice for just the data portion
        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            sparse_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Number of entries
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Get stats about this SSTable for debugging
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.index.len(),
            num_sparse_entries: self.sparse_index.len(),
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Look up a key (async - may need to load block)
    ///
    /// Uses bloom filter for fast negative lookups, then sparse index to narrow
    /// search range before binary search, reducing I/O to typically 1 block read.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}, sparse_entries={}",
            key.len(),
            self.index.len(),
            self.sparse_index.len()
        );

        // Check bloom filter first - fast negative lookup
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        // Use sparse index to find the block range to search
        let (start_block, end_block) = self.find_block_range(key);
        log::debug!("SSTable::get sparse_range=[{}, {}]", start_block, end_block);

        // Binary search within the narrowed range (in-memory, no I/O)
        let search_range = &self.index[start_block..=end_block];
        let block_idx =
            match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                Ok(idx) => start_block + idx,
                Err(0) => {
                    if start_block == 0 {
                        log::debug!("SSTable::get key not found (before first block)");
                        return Ok(None);
                    }
                    start_block
                }
                Err(idx) => start_block + idx - 1,
            };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        // Now we know exactly which block to load - single I/O
        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batch lookup multiple keys with optimized I/O
    ///
    /// Groups keys by block and loads each block only once, so I/O is bounded by
    /// the number of distinct blocks touched rather than the number of keys.
    /// Uses the bloom filter to skip keys that definitely don't exist.
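    ///
    /// For example (added illustration): four keys that map to blocks
    /// {2, 2, 7, bloom-negative} trigger only two block loads.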
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its block index
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            // Check bloom filter first
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
                continue;
            }

            let (start_block, end_block) = self.find_block_range(key);
            let search_range = &self.index[start_block..=end_block];
            let block_idx =
                match search_range.binary_search_by(|entry| entry.first_key.as_slice().cmp(key)) {
                    Ok(idx) => start_block + idx,
                    Err(0) => {
                        if start_block == 0 {
                            key_to_block.push((key_idx, usize::MAX)); // Mark as not found
                            continue;
                        }
                        start_block
                    }
                    Err(idx) => start_block + idx - 1,
                };
            key_to_block.push((key_idx, block_idx));
        }

        // Group keys by block
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        // Load all needed blocks (this is where I/O happens)
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        // Now search each key in its block (all blocks are cached)
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?; // Will hit cache
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Find the block range that could contain the key using sparse index
    ///
    /// Returns (start_block_idx, end_block_idx) inclusive range
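    ///
    /// Illustration (added, hypothetical numbers): with `SPARSE_INDEX_INTERVAL = 16`
    /// and 100 blocks, the sparse index holds the first keys of blocks 0, 16, 32, ...
    /// A key that sorts under the entry for block 32 narrows the search to blocks
    /// 32..=47 before any block is read.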
    fn find_block_range(&self, key: &[u8]) -> (usize, usize) {
        if self.sparse_index.is_empty() {
            return (0, self.index.len().saturating_sub(1));
        }

        // Binary search in sparse index (in-memory, no I/O)
        let sparse_pos = match self
            .sparse_index
            .binary_search_by(|entry| entry.first_key.as_slice().cmp(key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let start_block = self.sparse_index[sparse_pos].block_idx as usize;
        let end_block = if sparse_pos + 1 < self.sparse_index.len() {
            // End at the next sparse entry's block (exclusive becomes inclusive -1)
            (self.sparse_index[sparse_pos + 1].block_idx as usize).saturating_sub(1)
        } else {
            // Last sparse entry - search to end of index
            self.index.len().saturating_sub(1)
        };

        (start_block, end_block.max(start_block))
    }

    /// Preload all data blocks into memory
    ///
    /// Call this after open() to eliminate all I/O during subsequent lookups.
    /// Useful when the SSTable is small enough to fit in memory.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Load a block (checks cache first, then loads from FileSlice)
    /// Uses dictionary decompression if dictionary is present
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let entry = &self.index[block_idx];

        // Check cache first
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            entry.offset,
            entry.offset + entry.length as u64
        );

        // Load from FileSlice
        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Decompress with dictionary if available
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Prefetch blocks for a key range
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(start_key))
        {
            Ok(idx) => idx,
            Err(0) => 0,
            Err(idx) => idx - 1,
        };

        let end_block = match self
            .index
            .binary_search_by(|e| e.first_key.as_slice().cmp(end_key))
        {
            Ok(idx) => idx,
            Err(idx) if idx >= self.index.len() => self.index.len().saturating_sub(1),
            Err(idx) => idx,
        };

        for block_idx in start_block..=end_block.min(self.index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Iterate over all entries (loads blocks as needed)
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Get all entries as a vector (for merging)
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

/// Async iterator over SSTable entries
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Advance to next entry (async)
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // These should likely return false (with ~1% false positive rate)
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        // Insert keys
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // All inserted keys should be found
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        // Check false positive rate on non-existent keys
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // With 10 bits per key, expect ~1% false positive rate
        // Allow up to 3% due to hash function variance
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default = Adaptive
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); // BETTER
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        // Adaptive
        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        // SizeOptimized
        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); // MAX
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        // PerformanceOptimized
        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); // FAST
        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
        assert!(!perf.use_dictionary);

        // Aliases
        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
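
    // Added illustrative test (not part of the original suite): round-trips a small
    // posting list through the inline TermInfo encoding and its SSTableValue
    // serializer.
    #[test]
    fn test_term_info_inline_roundtrip() {
        let doc_ids = vec![5u32, 9, 12];
        let term_freqs = vec![2u32, 1, 4];
        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("should fit inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);

        // decode_inline should reproduce the original postings
        let (ids, tfs) = info.decode_inline().unwrap();
        assert_eq!(ids, doc_ids);
        assert_eq!(tfs, term_freqs);

        // serialize/deserialize round trip through the SSTableValue impl
        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        let mut reader = buf.as_slice();
        let restored = TermInfo::deserialize(&mut reader).unwrap();
        assert_eq!(info, restored);
    }

    // Added illustrative test: SSTableValue round trip for Vec<u8> payloads.
    #[test]
    fn test_vec_value_roundtrip() {
        let value: Vec<u8> = b"hello world".to_vec();
        let mut buf = Vec::new();
        value.serialize(&mut buf).unwrap();
        let mut reader = buf.as_slice();
        let decoded = Vec::<u8>::deserialize(&mut reader).unwrap();
        assert_eq!(value, decoded);
    }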
}