
hermes_core/structures/sstable.rs

1//! Async SSTable with lazy loading via FileSlice
2//!
3//! Memory-efficient design: only minimal metadata is held in memory;
4//! blocks are loaded on demand.
5//!
6//! ## Key Features
7//!
8//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
9//!    - Can be mmap'd directly without parsing into heap-allocated structures
10//!    - ~90% memory reduction compared to `Vec<BlockIndexEntry>`
11//!
12//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
13//!    - Minimal memory footprint for block metadata
14//!
15//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
16//!
17//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
18//!
19//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
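//!
//! ## Example
//!
//! A minimal write-path sketch (import paths and error handling are illustrative);
//! reading back requires a `LazyFileHandle` from the directories layer and is
//! omitted here.
//!
//! ```ignore
//! use hermes_core::structures::sstable::{SSTableWriter, SSTableWriterConfig};
//!
//! let mut writer: SSTableWriter<Vec<u8>, u64> =
//!     SSTableWriter::with_config(Vec::new(), SSTableWriterConfig::fast());
//! writer.insert(b"alpha", &1u64)?; // keys must be inserted in sorted order
//! writer.insert(b"beta", &2u64)?;
//! let bytes = writer.finish()?;    // blocks + index + footer, ready to persist
//! ```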
20
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
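// On-disk layout produced by SSTableWriter::finish and read back by
// AsyncSSTableReader::open (v4):
//
//   [compressed blocks]
//   [index_len: u32 LE][block index bytes (FST or mmap format)]
//   [bloom filter bytes]               (optional; located via footer offset)
//   [dict_len: u32 LE][dictionary]     (optional; located via footer offset)
//   [footer: data_end u64 | num_entries u64 | bloom_offset u64 |
//            dict_offset u64 | compression_level u8 | magic u32]   (37 bytes)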
36
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
47pub const BLOOM_HASH_COUNT: usize = 7;
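// Sizing rationale: with m/n = 10 bits per key, the optimal hash count is
// k = (m/n) * ln 2 ≈ 10 * 0.693 ≈ 6.9, rounded to 7. The expected false-positive
// rate is then (1 - e^(-k*n/m))^k ≈ 0.008, i.e. roughly 1%.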
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: Vec<u64>,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61impl BloomFilter {
62    /// Create a new bloom filter sized for expected number of keys
63    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
64        let num_bits = (expected_keys * bits_per_key).max(64);
65        let num_words = num_bits.div_ceil(64);
66        Self {
67            bits: vec![0u64; num_words],
68            num_bits,
69            num_hashes: BLOOM_HASH_COUNT,
70        }
71    }
72
73    /// Create from serialized bytes
74    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
75        if data.len() < 12 {
76            return Err(io::Error::new(
77                io::ErrorKind::InvalidData,
78                "Bloom filter data too short",
79            ));
80        }
81        let mut reader = data;
82        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
83        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
84        let num_words = reader.read_u32::<LittleEndian>()? as usize;
85
86        if reader.len() < num_words * 8 {
87            return Err(io::Error::new(
88                io::ErrorKind::InvalidData,
89                "Bloom filter data truncated",
90            ));
91        }
92
93        let mut bits = Vec::with_capacity(num_words);
94        for _ in 0..num_words {
95            bits.push(reader.read_u64::<LittleEndian>()?);
96        }
97
98        Ok(Self {
99            bits,
100            num_bits,
101            num_hashes,
102        })
103    }
104
105    /// Serialize to bytes
106    pub fn to_bytes(&self) -> Vec<u8> {
107        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
108        data.write_u32::<LittleEndian>(self.num_bits as u32)
109            .unwrap();
110        data.write_u32::<LittleEndian>(self.num_hashes as u32)
111            .unwrap();
112        data.write_u32::<LittleEndian>(self.bits.len() as u32)
113            .unwrap();
114        for &word in &self.bits {
115            data.write_u64::<LittleEndian>(word).unwrap();
116        }
117        data
118    }
119
120    /// Add a key to the filter
121    pub fn insert(&mut self, key: &[u8]) {
122        let (h1, h2) = self.hash_pair(key);
123        for i in 0..self.num_hashes {
124            let bit_pos = self.get_bit_pos(h1, h2, i);
125            let word_idx = bit_pos / 64;
126            let bit_idx = bit_pos % 64;
127            if word_idx < self.bits.len() {
128                self.bits[word_idx] |= 1u64 << bit_idx;
129            }
130        }
131    }
132
133    /// Check if a key might be in the filter
134    /// Returns false if definitely not present, true if possibly present
135    pub fn may_contain(&self, key: &[u8]) -> bool {
136        let (h1, h2) = self.hash_pair(key);
137        for i in 0..self.num_hashes {
138            let bit_pos = self.get_bit_pos(h1, h2, i);
139            let word_idx = bit_pos / 64;
140            let bit_idx = bit_pos % 64;
141            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
142                return false;
143            }
144        }
145        true
146    }
147
148    /// Size in bytes
149    pub fn size_bytes(&self) -> usize {
150        12 + self.bits.len() * 8
151    }
152
153    /// Insert a pre-computed hash pair into the filter
154    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
155        for i in 0..self.num_hashes {
156            let bit_pos = self.get_bit_pos(h1, h2, i);
157            let word_idx = bit_pos / 64;
158            let bit_idx = bit_pos % 64;
159            if word_idx < self.bits.len() {
160                self.bits[word_idx] |= 1u64 << bit_idx;
161            }
162        }
163    }
164
165    /// Compute two hash values using FNV-1a variant
166    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
167        // FNV-1a hash
168        let mut h1: u64 = 0xcbf29ce484222325;
169        for &byte in key {
170            h1 ^= byte as u64;
171            h1 = h1.wrapping_mul(0x100000001b3);
172        }
173
174        // Second hash using different seed
175        let mut h2: u64 = 0x84222325cbf29ce4;
176        for &byte in key {
177            h2 = h2.wrapping_mul(0x100000001b3);
178            h2 ^= byte as u64;
179        }
180
181        (h1, h2)
182    }
183
184    /// Get bit position for hash iteration i using double hashing
185    #[inline]
186    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
187        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
188    }
189}
190
191/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
192/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair.
193fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
194    let mut h1: u64 = 0xcbf29ce484222325;
195    for &byte in key {
196        h1 ^= byte as u64;
197        h1 = h1.wrapping_mul(0x100000001b3);
198    }
199    let mut h2: u64 = 0x84222325cbf29ce4;
200    for &byte in key {
201        h2 = h2.wrapping_mul(0x100000001b3);
202        h2 ^= byte as u64;
203    }
204    (h1, h2)
205}
206
207/// SSTable value trait
208pub trait SSTableValue: Clone + Send + Sync {
209    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
210    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
211}
212
213/// u64 value implementation
214impl SSTableValue for u64 {
215    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
216        write_vint(writer, *self)
217    }
218
219    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
220        read_vint(reader)
221    }
222}
223
224/// Vec<u8> value implementation
225impl SSTableValue for Vec<u8> {
226    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
227        write_vint(writer, self.len() as u64)?;
228        writer.write_all(self)
229    }
230
231    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
232        let len = read_vint(reader)? as usize;
233        let mut data = vec![0u8; len];
234        reader.read_exact(&mut data)?;
235        Ok(data)
236    }
237}
238
239/// Sparse dimension info for SSTable-based sparse index
240/// Stores offset and length for posting list lookup
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub struct SparseDimInfo {
243    /// Offset in sparse file where posting list starts
244    pub offset: u64,
245    /// Length of serialized posting list
246    pub length: u32,
247}
248
249impl SparseDimInfo {
250    pub fn new(offset: u64, length: u32) -> Self {
251        Self { offset, length }
252    }
253}
254
255impl SSTableValue for SparseDimInfo {
256    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
257        write_vint(writer, self.offset)?;
258        write_vint(writer, self.length as u64)
259    }
260
261    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
262        let offset = read_vint(reader)?;
263        let length = read_vint(reader)? as u32;
264        Ok(Self { offset, length })
265    }
266}
267
268/// Maximum number of postings that can be inlined in TermInfo
269pub const MAX_INLINE_POSTINGS: usize = 3;
270
271/// Term info for posting list references
272///
273/// Supports two modes:
274/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
275/// - **External**: Larger posting lists stored in separate .post file
276///
277/// This eliminates a separate I/O read for rare/unique terms.
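///
/// # Example
///
/// A small sketch of the inline path (`try_inline` / `decode_inline`):
///
/// ```ignore
/// let info = TermInfo::try_inline(&[5, 9], &[2, 1]).expect("two postings fit inline");
/// assert!(info.is_inline());
/// assert_eq!(info.doc_freq(), 2);
/// assert_eq!(info.decode_inline(), Some((vec![5, 9], vec![2, 1])));
/// ```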
278#[derive(Debug, Clone, PartialEq, Eq)]
279pub enum TermInfo {
280    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
281    /// Each entry is (doc_id, term_freq) delta-encoded
282    Inline {
283        /// Number of postings (1-3)
284        doc_freq: u8,
285        /// Inline data: delta-encoded (doc_id, term_freq) pairs
286        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
287        data: [u8; 16],
288        /// Actual length of data used
289        data_len: u8,
290    },
291    /// Reference to external posting list in .post file
292    External {
293        posting_offset: u64,
294        posting_len: u32,
295        doc_freq: u32,
296        /// Position data offset (0 if no positions)
297        position_offset: u64,
298        /// Position data length (0 if no positions)
299        position_len: u32,
300    },
301}
302
303impl TermInfo {
304    /// Create an external reference
305    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
306        TermInfo::External {
307            posting_offset,
308            posting_len,
309            doc_freq,
310            position_offset: 0,
311            position_len: 0,
312        }
313    }
314
315    /// Create an external reference with position info
316    pub fn external_with_positions(
317        posting_offset: u64,
318        posting_len: u32,
319        doc_freq: u32,
320        position_offset: u64,
321        position_len: u32,
322    ) -> Self {
323        TermInfo::External {
324            posting_offset,
325            posting_len,
326            doc_freq,
327            position_offset,
328            position_len,
329        }
330    }
331
332    /// Try to create an inline TermInfo from posting data
333    /// Returns None if posting list is too large to inline
334    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
335        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
336            return None;
337        }
338
339        let mut data = [0u8; 16];
340        let mut cursor = std::io::Cursor::new(&mut data[..]);
341        let mut prev_doc_id = 0u32;
342
343        for (i, &doc_id) in doc_ids.iter().enumerate() {
344            let delta = doc_id - prev_doc_id;
345            if write_vint(&mut cursor, delta as u64).is_err() {
346                return None;
347            }
348            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
349                return None;
350            }
351            prev_doc_id = doc_id;
352        }
353
354        let data_len = cursor.position() as u8;
355        if data_len > 16 {
356            return None;
357        }
358
359        Some(TermInfo::Inline {
360            doc_freq: doc_ids.len() as u8,
361            data,
362            data_len,
363        })
364    }
365
366    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
367    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
368    /// `count` is the number of postings (must match iterator length).
369    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
370        if count > MAX_INLINE_POSTINGS || count == 0 {
371            return None;
372        }
373
374        let mut data = [0u8; 16];
375        let mut cursor = std::io::Cursor::new(&mut data[..]);
376        let mut prev_doc_id = 0u32;
377
378        for (doc_id, tf) in iter {
379            let delta = doc_id - prev_doc_id;
380            if write_vint(&mut cursor, delta as u64).is_err() {
381                return None;
382            }
383            if write_vint(&mut cursor, tf as u64).is_err() {
384                return None;
385            }
386            prev_doc_id = doc_id;
387        }
388
389        let data_len = cursor.position() as u8;
390
391        Some(TermInfo::Inline {
392            doc_freq: count as u8,
393            data,
394            data_len,
395        })
396    }
397
398    /// Get document frequency
399    pub fn doc_freq(&self) -> u32 {
400        match self {
401            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
402            TermInfo::External { doc_freq, .. } => *doc_freq,
403        }
404    }
405
406    /// Check if this is an inline posting list
407    pub fn is_inline(&self) -> bool {
408        matches!(self, TermInfo::Inline { .. })
409    }
410
411    /// Get external posting info (offset, len) - returns None for inline
412    pub fn external_info(&self) -> Option<(u64, u32)> {
413        match self {
414            TermInfo::External {
415                posting_offset,
416                posting_len,
417                ..
418            } => Some((*posting_offset, *posting_len)),
419            TermInfo::Inline { .. } => None,
420        }
421    }
422
423    /// Get position info (offset, len) - returns None for inline or if no positions
424    pub fn position_info(&self) -> Option<(u64, u32)> {
425        match self {
426            TermInfo::External {
427                position_offset,
428                position_len,
429                ..
430            } if *position_len > 0 => Some((*position_offset, *position_len)),
431            _ => None,
432        }
433    }
434
435    /// Decode inline postings into (doc_ids, term_freqs)
436    /// Returns None if this is an external reference
437    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
438        match self {
439            TermInfo::Inline {
440                doc_freq,
441                data,
442                data_len,
443            } => {
444                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
445                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
446                let mut reader = &data[..*data_len as usize];
447                let mut prev_doc_id = 0u32;
448
449                for _ in 0..*doc_freq {
450                    let delta = read_vint(&mut reader).ok()? as u32;
451                    let tf = read_vint(&mut reader).ok()? as u32;
452                    let doc_id = prev_doc_id + delta;
453                    doc_ids.push(doc_id);
454                    term_freqs.push(tf);
455                    prev_doc_id = doc_id;
456                }
457
458                Some((doc_ids, term_freqs))
459            }
460            TermInfo::External { .. } => None,
461        }
462    }
463}
464
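// Serialized TermInfo wire layout (see serialize/deserialize below):
//   0xFF | doc_freq: u8 | data_len: u8 | data[..data_len]                 inline
//   0x00 | doc_freq vint | posting_offset vint | posting_len vint         external
//   0x01 | same as 0x00, then position_offset vint | position_len vint    external + positions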
465impl SSTableValue for TermInfo {
466    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
467        match self {
468            TermInfo::Inline {
469                doc_freq,
470                data,
471                data_len,
472            } => {
473                // Tag byte 0xFF = inline marker
474                writer.write_u8(0xFF)?;
475                writer.write_u8(*doc_freq)?;
476                writer.write_u8(*data_len)?;
477                writer.write_all(&data[..*data_len as usize])?;
478            }
479            TermInfo::External {
480                posting_offset,
481                posting_len,
482                doc_freq,
483                position_offset,
484                position_len,
485            } => {
486                // Tag byte 0x00 = external marker (no positions)
487                // Tag byte 0x01 = external with positions
488                if *position_len > 0 {
489                    writer.write_u8(0x01)?;
490                    write_vint(writer, *doc_freq as u64)?;
491                    write_vint(writer, *posting_offset)?;
492                    write_vint(writer, *posting_len as u64)?;
493                    write_vint(writer, *position_offset)?;
494                    write_vint(writer, *position_len as u64)?;
495                } else {
496                    writer.write_u8(0x00)?;
497                    write_vint(writer, *doc_freq as u64)?;
498                    write_vint(writer, *posting_offset)?;
499                    write_vint(writer, *posting_len as u64)?;
500                }
501            }
502        }
503        Ok(())
504    }
505
506    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
507        let tag = reader.read_u8()?;
508
509        if tag == 0xFF {
510            // Inline
511            let doc_freq = reader.read_u8()?;
512            let data_len = reader.read_u8()?;
513            let mut data = [0u8; 16];
514            reader.read_exact(&mut data[..data_len as usize])?;
515            Ok(TermInfo::Inline {
516                doc_freq,
517                data,
518                data_len,
519            })
520        } else if tag == 0x00 {
521            // External (no positions)
522            let doc_freq = read_vint(reader)? as u32;
523            let posting_offset = read_vint(reader)?;
524            let posting_len = read_vint(reader)? as u32;
525            Ok(TermInfo::External {
526                posting_offset,
527                posting_len,
528                doc_freq,
529                position_offset: 0,
530                position_len: 0,
531            })
532        } else if tag == 0x01 {
533            // External with positions
534            let doc_freq = read_vint(reader)? as u32;
535            let posting_offset = read_vint(reader)?;
536            let posting_len = read_vint(reader)? as u32;
537            let position_offset = read_vint(reader)?;
538            let position_len = read_vint(reader)? as u32;
539            Ok(TermInfo::External {
540                posting_offset,
541                posting_len,
542                doc_freq,
543                position_offset,
544                position_len,
545            })
546        } else {
547            Err(io::Error::new(
548                io::ErrorKind::InvalidData,
549                format!("Invalid TermInfo tag: {}", tag),
550            ))
551        }
552    }
553}
554
555/// Write variable-length integer
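/// (LEB128-style: 7 payload bits per byte, high bit set on every byte except the last).
///
/// For example, 300 = 0b1_0010_1100 encodes as [0xAC, 0x02]: the low 7 bits 0x2C
/// plus the continuation bit give 0xAC, and the remaining bits give 0x02.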
556pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
557    loop {
558        let byte = (value & 0x7F) as u8;
559        value >>= 7;
560        if value == 0 {
561            writer.write_u8(byte)?;
562            return Ok(());
563        } else {
564            writer.write_u8(byte | 0x80)?;
565        }
566    }
567}
568
569/// Read variable-length integer
570pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
571    let mut result = 0u64;
572    let mut shift = 0;
573
574    loop {
575        let byte = reader.read_u8()?;
576        result |= ((byte & 0x7F) as u64) << shift;
577        if byte & 0x80 == 0 {
578            return Ok(result);
579        }
580        shift += 7;
581        if shift >= 64 {
582            return Err(io::Error::new(
583                io::ErrorKind::InvalidData,
584                "varint too long",
585            ));
586        }
587    }
588}
589
590/// Compute common prefix length
591pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
592    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
593}
594
595/// SSTable statistics for debugging
596#[derive(Debug, Clone)]
597pub struct SSTableStats {
598    pub num_blocks: usize,
599    pub num_sparse_entries: usize,
600    pub num_entries: u64,
601    pub has_bloom_filter: bool,
602    pub has_dictionary: bool,
603    pub bloom_filter_size: usize,
604    pub dictionary_size: usize,
605}
606
607/// SSTable writer configuration
608#[derive(Debug, Clone)]
609pub struct SSTableWriterConfig {
610    /// Compression level (1-22, higher = better compression but slower)
611    pub compression_level: CompressionLevel,
612    /// Whether to train and use a dictionary for compression
613    pub use_dictionary: bool,
614    /// Dictionary size in bytes (default 64KB)
615    pub dict_size: usize,
616    /// Whether to build a bloom filter
617    pub use_bloom_filter: bool,
618    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
619    pub bloom_bits_per_key: usize,
620}
621
622impl Default for SSTableWriterConfig {
623    fn default() -> Self {
624        Self::from_optimization(crate::structures::IndexOptimization::default())
625    }
626}
627
628impl SSTableWriterConfig {
629    /// Create config from IndexOptimization mode
630    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
631        use crate::structures::IndexOptimization;
632        match optimization {
633            IndexOptimization::Adaptive => Self {
634                compression_level: CompressionLevel::BETTER, // Level 9
635                use_dictionary: false,
636                dict_size: DEFAULT_DICT_SIZE,
637                use_bloom_filter: false,
638                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
639            },
640            IndexOptimization::SizeOptimized => Self {
641                compression_level: CompressionLevel::MAX, // Level 22
642                use_dictionary: true,
643                dict_size: DEFAULT_DICT_SIZE,
644                use_bloom_filter: true,
645                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
646            },
647            IndexOptimization::PerformanceOptimized => Self {
648                compression_level: CompressionLevel::FAST, // Level 1
649                use_dictionary: false,
650                dict_size: DEFAULT_DICT_SIZE,
651                use_bloom_filter: true, // Bloom helps skip blocks fast
652                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
653            },
654        }
655    }
656
657    /// Fast configuration - prioritize write speed over compression
658    pub fn fast() -> Self {
659        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
660    }
661
662    /// Maximum compression configuration - prioritize size over speed
663    pub fn max_compression() -> Self {
664        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
665    }
666}
667
668/// SSTable writer with optimizations:
669/// - Dictionary compression for blocks (if dictionary provided)
670/// - Configurable compression level
671/// - Block index prefix compression
672/// - Bloom filter for fast negative lookups
673pub struct SSTableWriter<W: Write, V: SSTableValue> {
674    writer: W,
675    block_buffer: Vec<u8>,
676    prev_key: Vec<u8>,
677    index: Vec<BlockIndexEntry>,
678    current_offset: u64,
679    num_entries: u64,
680    block_first_key: Option<Vec<u8>>,
681    config: SSTableWriterConfig,
682    /// Pre-trained dictionary for compression (optional)
683    dictionary: Option<CompressionDict>,
684    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
685    /// Filter is built at finish() time with correct sizing.
686    bloom_hashes: Vec<(u64, u64)>,
687    _phantom: std::marker::PhantomData<V>,
688}
689
690impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
691    /// Create a new SSTable writer with default configuration
692    pub fn new(writer: W) -> Self {
693        Self::with_config(writer, SSTableWriterConfig::default())
694    }
695
696    /// Create a new SSTable writer with custom configuration
697    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
698        Self {
699            writer,
700            block_buffer: Vec::with_capacity(BLOCK_SIZE),
701            prev_key: Vec::new(),
702            index: Vec::new(),
703            current_offset: 0,
704            num_entries: 0,
705            block_first_key: None,
706            config,
707            dictionary: None,
708            bloom_hashes: Vec::new(),
709            _phantom: std::marker::PhantomData,
710        }
711    }
712
713    /// Create a new SSTable writer with a pre-trained dictionary
714    pub fn with_dictionary(
715        writer: W,
716        config: SSTableWriterConfig,
717        dictionary: CompressionDict,
718    ) -> Self {
719        Self {
720            writer,
721            block_buffer: Vec::with_capacity(BLOCK_SIZE),
722            prev_key: Vec::new(),
723            index: Vec::new(),
724            current_offset: 0,
725            num_entries: 0,
726            block_first_key: None,
727            config,
728            dictionary: Some(dictionary),
729            bloom_hashes: Vec::new(),
730            _phantom: std::marker::PhantomData,
731        }
732    }
733
734    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
735        if self.block_first_key.is_none() {
736            self.block_first_key = Some(key.to_vec());
737        }
738
739        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
740        if self.config.use_bloom_filter {
741            self.bloom_hashes.push(bloom_hash_pair(key));
742        }
743
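        // Entry layout within a block (prefix-compressed against the previous key):
        //   [shared_prefix_len vint][suffix_len vint][suffix bytes][serialized value]
        // e.g. writing "apply" right after "apple" stores prefix_len = 4, suffix = "y".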
744        let prefix_len = common_prefix_len(&self.prev_key, key);
745        let suffix = &key[prefix_len..];
746
747        write_vint(&mut self.block_buffer, prefix_len as u64)?;
748        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
749        self.block_buffer.extend_from_slice(suffix);
750        value.serialize(&mut self.block_buffer)?;
751
752        self.prev_key.clear();
753        self.prev_key.extend_from_slice(key);
754        self.num_entries += 1;
755
756        if self.block_buffer.len() >= BLOCK_SIZE {
757            self.flush_block()?;
758        }
759
760        Ok(())
761    }
762
763    /// Flush and compress the current block
764    fn flush_block(&mut self) -> io::Result<()> {
765        if self.block_buffer.is_empty() {
766            return Ok(());
767        }
768
769        // Compress block with dictionary if available
770        let compressed = if let Some(ref dict) = self.dictionary {
771            crate::compression::compress_with_dict(
772                &self.block_buffer,
773                self.config.compression_level,
774                dict,
775            )?
776        } else {
777            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
778        };
779
780        if let Some(first_key) = self.block_first_key.take() {
781            self.index.push(BlockIndexEntry {
782                first_key,
783                offset: self.current_offset,
784                length: compressed.len() as u32,
785            });
786        }
787
788        self.writer.write_all(&compressed)?;
789        self.current_offset += compressed.len() as u64;
790        self.block_buffer.clear();
791        self.prev_key.clear();
792
793        Ok(())
794    }
795
796    pub fn finish(mut self) -> io::Result<W> {
797        // Flush any remaining data
798        self.flush_block()?;
799
800        // Build bloom filter from collected hashes (properly sized)
801        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
802            let mut bloom =
803                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
804            for (h1, h2) in &self.bloom_hashes {
805                bloom.insert_hashed(*h1, *h2);
806            }
807            Some(bloom)
808        } else {
809            None
810        };
811
812        let data_end_offset = self.current_offset;
813
814        // Build memory-efficient block index (v4 format)
815        // Convert to (key, BlockAddr) pairs for the new index format
816        let entries: Vec<(Vec<u8>, BlockAddr)> = self
817            .index
818            .iter()
819            .map(|e| {
820                (
821                    e.first_key.clone(),
822                    BlockAddr {
823                        offset: e.offset,
824                        length: e.length,
825                    },
826                )
827            })
828            .collect();
829
830        // Build FST-based index if native feature is enabled, otherwise use mmap index
831        #[cfg(feature = "native")]
832        let index_bytes = FstBlockIndex::build(&entries)?;
833        #[cfg(not(feature = "native"))]
834        let index_bytes = MmapBlockIndex::build(&entries)?;
835
836        // Write index bytes with length prefix
837        self.writer
838            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
839        self.writer.write_all(&index_bytes)?;
840        self.current_offset += 4 + index_bytes.len() as u64;
841
842        // Write bloom filter if present
843        let bloom_offset = if let Some(ref bloom) = bloom_filter {
844            let bloom_data = bloom.to_bytes();
845            let offset = self.current_offset;
846            self.writer.write_all(&bloom_data)?;
847            self.current_offset += bloom_data.len() as u64;
848            offset
849        } else {
850            0
851        };
852
853        // Write dictionary if present
854        let dict_offset = if let Some(ref dict) = self.dictionary {
855            let dict_bytes = dict.as_bytes();
856            let offset = self.current_offset;
857            self.writer
858                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
859            self.writer.write_all(dict_bytes)?;
860            self.current_offset += 4 + dict_bytes.len() as u64;
861            offset
862        } else {
863            0
864        };
865
866        // Write extended footer (v4 format)
867        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
868        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
869        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
870        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
871        self.writer
872            .write_u8(self.config.compression_level.0 as u8)?;
873        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
874
875        Ok(self.writer)
876    }
877}
878
879/// Block index entry
880#[derive(Debug, Clone)]
881struct BlockIndexEntry {
882    first_key: Vec<u8>,
883    offset: u64,
884    length: u32,
885}
886
887/// Async SSTable reader - loads blocks on demand via LazyFileSlice
888///
889/// Memory-efficient design (v4 format):
890/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
891/// - Block addresses stored in bitpacked format
892/// - Bloom filter and dictionary optional
893pub struct AsyncSSTableReader<V: SSTableValue> {
894    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
895    data_slice: LazyFileSlice,
896    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
897    block_index: BlockIndex,
898    num_entries: u64,
899    /// Hot cache for decompressed blocks
900    cache: RwLock<BlockCache>,
901    /// Bloom filter for fast negative lookups (optional)
902    bloom_filter: Option<BloomFilter>,
903    /// Compression dictionary (optional)
904    dictionary: Option<CompressionDict>,
905    /// Compression level used
906    #[allow(dead_code)]
907    compression_level: CompressionLevel,
908    _phantom: std::marker::PhantomData<V>,
909}
910
911/// LRU block cache: O(1) lookup and insert; promotion is an O(n) scan of the LRU queue.
912///
913/// On `get()`, promotes the accessed entry to MRU position.
914/// On eviction, removes the LRU entry (front of VecDeque).
915/// For typical cache sizes (16-64 blocks), the linear scan in
916/// `promote()` is negligible compared to I/O savings.
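///
/// Memory footprint is roughly `max_blocks` decompressed blocks, i.e. about
/// `max_blocks * BLOCK_SIZE` bytes (e.g. 32 blocks * 16 KiB ≈ 512 KiB).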
917struct BlockCache {
918    blocks: FxHashMap<u64, Arc<[u8]>>,
919    lru_order: std::collections::VecDeque<u64>,
920    max_blocks: usize,
921}
922
923impl BlockCache {
924    fn new(max_blocks: usize) -> Self {
925        Self {
926            blocks: FxHashMap::default(),
927            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
928            max_blocks,
929        }
930    }
931
932    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
933        if self.blocks.contains_key(&offset) {
934            self.promote(offset);
935            self.blocks.get(&offset).map(Arc::clone)
936        } else {
937            None
938        }
939    }
940
941    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
942        if self.blocks.contains_key(&offset) {
943            self.promote(offset);
944            return;
945        }
946        while self.blocks.len() >= self.max_blocks {
947            if let Some(evict_offset) = self.lru_order.pop_front() {
948                self.blocks.remove(&evict_offset);
949            } else {
950                break;
951            }
952        }
953        self.blocks.insert(offset, block);
954        self.lru_order.push_back(offset);
955    }
956
957    /// Move entry to MRU position (back of deque)
958    fn promote(&mut self, offset: u64) {
959        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
960            self.lru_order.remove(pos);
961            self.lru_order.push_back(offset);
962        }
963    }
964}
965
966impl<V: SSTableValue> AsyncSSTableReader<V> {
967    /// Open an SSTable from a LazyFileHandle
968    /// Only loads the footer and index into memory, data blocks fetched on-demand
969    ///
970    /// Expects the v4 (memory-efficient) format:
971    /// - FST-based (native) or mmap'd block index (no heap allocation for keys)
972    /// - Files whose footer magic is not SSTABLE_MAGIC are rejected
973    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
974        let file_len = file_handle.len();
975        if file_len < 37 {
976            return Err(io::Error::new(
977                io::ErrorKind::InvalidData,
978                "SSTable too small",
979            ));
980        }
981
982        // Read footer (37 bytes)
983        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
984        let footer_bytes = file_handle
985            .read_bytes_range(file_len - 37..file_len)
986            .await?;
987
988        let mut reader = footer_bytes.as_slice();
989        let data_end_offset = reader.read_u64::<LittleEndian>()?;
990        let num_entries = reader.read_u64::<LittleEndian>()?;
991        let bloom_offset = reader.read_u64::<LittleEndian>()?;
992        let dict_offset = reader.read_u64::<LittleEndian>()?;
993        let compression_level = CompressionLevel(reader.read_u8()? as i32);
994        let magic = reader.read_u32::<LittleEndian>()?;
995
996        if magic != SSTABLE_MAGIC {
997            return Err(io::Error::new(
998                io::ErrorKind::InvalidData,
999                format!("Invalid SSTable magic: 0x{:08X}", magic),
1000            ));
1001        }
1002
1003        // Read index section
1004        let index_start = data_end_offset;
1005        let index_end = file_len - 37;
1006        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
1007
1008        // Parse block index (length-prefixed FST or mmap index)
1009        let mut idx_reader = index_bytes.as_slice();
1010        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
1011
1012        if index_len > idx_reader.len() {
1013            return Err(io::Error::new(
1014                io::ErrorKind::InvalidData,
1015                "Index data truncated",
1016            ));
1017        }
1018
1019        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());
1020
1021        // Try FST first (native), fall back to mmap
1022        #[cfg(feature = "native")]
1023        let block_index = match FstBlockIndex::load(index_data.clone()) {
1024            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
1025            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
1026        };
1027        #[cfg(not(feature = "native"))]
1028        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
1029
1030        // Load bloom filter if present
1031        let bloom_filter = if bloom_offset > 0 {
1032            let bloom_start = bloom_offset;
1033            // Read bloom filter size first (12 bytes header)
1034            let bloom_header = file_handle
1035                .read_bytes_range(bloom_start..bloom_start + 12)
1036                .await?;
1037            let num_words = u32::from_le_bytes([
1038                bloom_header[8],
1039                bloom_header[9],
1040                bloom_header[10],
1041                bloom_header[11],
1042            ]) as u64;
1043            let bloom_size = 12 + num_words * 8;
1044            let bloom_data = file_handle
1045                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1046                .await?;
1047            Some(BloomFilter::from_bytes(&bloom_data)?)
1048        } else {
1049            None
1050        };
1051
1052        // Load dictionary if present
1053        let dictionary = if dict_offset > 0 {
1054            let dict_start = dict_offset;
1055            // Read dictionary size first
1056            let dict_len_bytes = file_handle
1057                .read_bytes_range(dict_start..dict_start + 4)
1058                .await?;
1059            let dict_len = u32::from_le_bytes([
1060                dict_len_bytes[0],
1061                dict_len_bytes[1],
1062                dict_len_bytes[2],
1063                dict_len_bytes[3],
1064            ]) as u64;
1065            let dict_data = file_handle
1066                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1067                .await?;
1068            Some(CompressionDict::from_bytes(dict_data.to_vec()))
1069        } else {
1070            None
1071        };
1072
1073        // Create a lazy slice for just the data portion
1074        let data_slice = file_handle.slice(0..data_end_offset);
1075
1076        Ok(Self {
1077            data_slice,
1078            block_index,
1079            num_entries,
1080            cache: RwLock::new(BlockCache::new(cache_blocks)),
1081            bloom_filter,
1082            dictionary,
1083            compression_level,
1084            _phantom: std::marker::PhantomData,
1085        })
1086    }
1087
1088    /// Number of entries
1089    pub fn num_entries(&self) -> u64 {
1090        self.num_entries
1091    }
1092
1093    /// Get stats about this SSTable for debugging
1094    pub fn stats(&self) -> SSTableStats {
1095        SSTableStats {
1096            num_blocks: self.block_index.len(),
1097            num_sparse_entries: 0, // No longer using sparse index separately
1098            num_entries: self.num_entries,
1099            has_bloom_filter: self.bloom_filter.is_some(),
1100            has_dictionary: self.dictionary.is_some(),
1101            bloom_filter_size: self
1102                .bloom_filter
1103                .as_ref()
1104                .map(|b| b.size_bytes())
1105                .unwrap_or(0),
1106            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1107        }
1108    }
1109
1110    /// Number of blocks currently in the cache
1111    pub fn cached_blocks(&self) -> usize {
1112        self.cache.read().blocks.len()
1113    }
1114
1115    /// Look up a key (async - may need to load block)
1116    ///
1117    /// Uses the bloom filter for fast negative lookups, then the memory-efficient
1118    /// block index to locate the block, so a lookup typically costs a single block read.
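    ///
    /// # Example (sketch, assuming `reader` is an open `AsyncSSTableReader<u64>`)
    ///
    /// ```ignore
    /// if let Some(value) = reader.get(b"some-key").await? {
    ///     println!("found: {value}");
    /// }
    /// ```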
1119    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1120        log::debug!(
1121            "SSTable::get called, key_len={}, total_blocks={}",
1122            key.len(),
1123            self.block_index.len()
1124        );
1125
1126        // Check bloom filter first - fast negative lookup
1127        if let Some(ref bloom) = self.bloom_filter
1128            && !bloom.may_contain(key)
1129        {
1130            log::debug!("SSTable::get bloom filter negative");
1131            return Ok(None);
1132        }
1133
1134        // Use block index to find the block that could contain the key
1135        let block_idx = match self.block_index.locate(key) {
1136            Some(idx) => idx,
1137            None => {
1138                log::debug!("SSTable::get key not found (before first block)");
1139                return Ok(None);
1140            }
1141        };
1142
1143        log::debug!("SSTable::get loading block_idx={}", block_idx);
1144
1145        // Now we know exactly which block to load - single I/O
1146        let block_data = self.load_block(block_idx).await?;
1147        self.search_block(&block_data, key)
1148    }
1149
1150    /// Batch lookup multiple keys with optimized I/O
1151    ///
1152    /// Groups keys by block and loads each block only once, reducing I/O from
1153    /// one read per key to at most one read per distinct block.
1154    /// Uses bloom filter to skip keys that definitely don't exist.
1155    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1156        if keys.is_empty() {
1157            return Ok(Vec::new());
1158        }
1159
1160        // Map each key to its block index
1161        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1162        for (key_idx, key) in keys.iter().enumerate() {
1163            // Check bloom filter first
1164            if let Some(ref bloom) = self.bloom_filter
1165                && !bloom.may_contain(key)
1166            {
1167                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1168                continue;
1169            }
1170
1171            match self.block_index.locate(key) {
1172                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1173                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1174            }
1175        }
1176
1177        // Group keys by block
1178        let mut blocks_to_load: Vec<usize> = key_to_block
1179            .iter()
1180            .filter(|(_, b)| *b != usize::MAX)
1181            .map(|(_, b)| *b)
1182            .collect();
1183        blocks_to_load.sort_unstable();
1184        blocks_to_load.dedup();
1185
1186        // Load all needed blocks (this is where I/O happens)
1187        for &block_idx in &blocks_to_load {
1188            let _ = self.load_block(block_idx).await?;
1189        }
1190
1191        // Now search each key in its block (all blocks are cached)
1192        let mut results = vec![None; keys.len()];
1193        for (key_idx, block_idx) in key_to_block {
1194            if block_idx == usize::MAX {
1195                continue;
1196            }
1197            let block_data = self.load_block(block_idx).await?; // Will hit cache
1198            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1199        }
1200
1201        Ok(results)
1202    }
1203
1204    /// Preload all data blocks into memory
1205    ///
1206    /// Call this after open() to eliminate I/O during subsequent lookups. Useful when
1207    /// the SSTable is small enough to fit in memory; note that `cache_blocks` must be
    /// large enough to hold every block, otherwise earlier blocks are evicted
    /// (`prefetch_all_data_bulk` expands the cache automatically instead).
1208    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1209        for block_idx in 0..self.block_index.len() {
1210            self.load_block(block_idx).await?;
1211        }
1212        Ok(())
1213    }
1214
1215    /// Prefetch all data blocks via a single bulk I/O operation.
1216    ///
1217    /// Reads the entire compressed data section in one call, then decompresses
1218    /// each block and populates the cache. This turns N individual reads into 1.
1219    /// Cache capacity is expanded to hold all blocks.
1220    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1221        let num_blocks = self.block_index.len();
1222        if num_blocks == 0 {
1223            return Ok(());
1224        }
1225
1226        // Find total data extent
1227        let mut max_end: u64 = 0;
1228        for i in 0..num_blocks {
1229            if let Some(addr) = self.block_index.get_addr(i) {
1230                max_end = max_end.max(addr.offset + addr.length as u64);
1231            }
1232        }
1233
1234        // Single bulk read of entire data section
1235        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1236        let buf = all_data.as_slice();
1237
1238        // Expand cache and decompress all blocks
1239        let mut cache = self.cache.write();
1240        cache.max_blocks = cache.max_blocks.max(num_blocks);
1241        for i in 0..num_blocks {
1242            let addr = self.block_index.get_addr(i).unwrap();
1243            if cache.get(addr.offset).is_some() {
1244                continue;
1245            }
1246            let compressed =
1247                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1248            let decompressed = if let Some(ref dict) = self.dictionary {
1249                crate::compression::decompress_with_dict(compressed, dict)?
1250            } else {
1251                crate::compression::decompress(compressed)?
1252            };
1253            cache.insert(addr.offset, Arc::from(decompressed));
1254        }
1255
1256        Ok(())
1257    }
1258
1259    /// Load a block (checks cache first, then loads from FileSlice)
1260    /// Uses dictionary decompression if dictionary is present
1261    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1262        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1263            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1264        })?;
1265
1266        // Check cache (write lock for LRU promotion on hit)
1267        {
1268            if let Some(block) = self.cache.write().get(addr.offset) {
1269                return Ok(block);
1270            }
1271        }
1272
1273        log::debug!(
1274            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1275            block_idx,
1276            addr.offset,
1277            addr.offset + addr.length as u64
1278        );
1279
1280        // Load from FileSlice
1281        let range = addr.byte_range();
1282        let compressed = self.data_slice.read_bytes_range(range).await?;
1283
1284        // Decompress with dictionary if available
1285        let decompressed = if let Some(ref dict) = self.dictionary {
1286            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1287        } else {
1288            crate::compression::decompress(compressed.as_slice())?
1289        };
1290
1291        let block: Arc<[u8]> = Arc::from(decompressed);
1292
1293        // Insert into cache
1294        {
1295            let mut cache = self.cache.write();
1296            cache.insert(addr.offset, Arc::clone(&block));
1297        }
1298
1299        Ok(block)
1300    }
1301
1302    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1303        let mut reader = block_data;
1304        let mut current_key = Vec::new();
1305
1306        while !reader.is_empty() {
1307            let common_prefix_len = read_vint(&mut reader)? as usize;
1308            let suffix_len = read_vint(&mut reader)? as usize;
1309
1310            current_key.truncate(common_prefix_len);
1311            let mut suffix = vec![0u8; suffix_len];
1312            reader.read_exact(&mut suffix)?;
1313            current_key.extend_from_slice(&suffix);
1314
1315            let value = V::deserialize(&mut reader)?;
1316
1317            match current_key.as_slice().cmp(target_key) {
1318                std::cmp::Ordering::Equal => return Ok(Some(value)),
1319                std::cmp::Ordering::Greater => return Ok(None),
1320                std::cmp::Ordering::Less => continue,
1321            }
1322        }
1323
1324        Ok(None)
1325    }
1326
1327    /// Prefetch blocks for a key range
1328    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1329        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1330        let end_block = self
1331            .block_index
1332            .locate(end_key)
1333            .unwrap_or(self.block_index.len().saturating_sub(1));
1334
1335        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1336            let _ = self.load_block(block_idx).await?;
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Iterate over all entries (loads blocks as needed)
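    ///
    /// The iterator is pulled asynchronously, e.g. (sketch):
    ///
    /// ```ignore
    /// let mut it = reader.iter();
    /// while let Some((key, value)) = it.next().await? {
    ///     // keys are yielded in sorted order
    /// }
    /// ```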
1343    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1344        AsyncSSTableIterator::new(self)
1345    }
1346
1347    /// Get all entries as a vector (for merging)
1348    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1349        let mut results = Vec::new();
1350
1351        for block_idx in 0..self.block_index.len() {
1352            let block_data = self.load_block(block_idx).await?;
1353            let mut reader = &block_data[..];
1354            let mut current_key = Vec::new();
1355
1356            while !reader.is_empty() {
1357                let common_prefix_len = read_vint(&mut reader)? as usize;
1358                let suffix_len = read_vint(&mut reader)? as usize;
1359
1360                current_key.truncate(common_prefix_len);
1361                let mut suffix = vec![0u8; suffix_len];
1362                reader.read_exact(&mut suffix)?;
1363                current_key.extend_from_slice(&suffix);
1364
1365                let value = V::deserialize(&mut reader)?;
1366                results.push((current_key.clone(), value));
1367            }
1368        }
1369
1370        Ok(results)
1371    }
1372}
1373
1374/// Async iterator over SSTable entries
1375pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1376    reader: &'a AsyncSSTableReader<V>,
1377    current_block: usize,
1378    block_data: Option<Arc<[u8]>>,
1379    block_offset: usize,
1380    current_key: Vec<u8>,
1381    finished: bool,
1382}
1383
1384impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1385    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1386        Self {
1387            reader,
1388            current_block: 0,
1389            block_data: None,
1390            block_offset: 0,
1391            current_key: Vec::new(),
1392            finished: reader.block_index.is_empty(),
1393        }
1394    }
1395
1396    async fn load_next_block(&mut self) -> io::Result<bool> {
1397        if self.current_block >= self.reader.block_index.len() {
1398            self.finished = true;
1399            return Ok(false);
1400        }
1401
1402        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1403        self.block_offset = 0;
1404        self.current_key.clear();
1405        self.current_block += 1;
1406        Ok(true)
1407    }
1408
1409    /// Advance to next entry (async)
1410    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1411        if self.finished {
1412            return Ok(None);
1413        }
1414
1415        if self.block_data.is_none() && !self.load_next_block().await? {
1416            return Ok(None);
1417        }
1418
1419        loop {
1420            let block = self.block_data.as_ref().unwrap();
1421            if self.block_offset >= block.len() {
1422                if !self.load_next_block().await? {
1423                    return Ok(None);
1424                }
1425                continue;
1426            }
1427
1428            let mut reader = &block[self.block_offset..];
1429            let start_len = reader.len();
1430
1431            let common_prefix_len = read_vint(&mut reader)? as usize;
1432            let suffix_len = read_vint(&mut reader)? as usize;
1433
1434            self.current_key.truncate(common_prefix_len);
1435            let mut suffix = vec![0u8; suffix_len];
1436            reader.read_exact(&mut suffix)?;
1437            self.current_key.extend_from_slice(&suffix);
1438
1439            let value = V::deserialize(&mut reader)?;
1440
1441            self.block_offset += start_len - reader.len();
1442
1443            return Ok(Some((self.current_key.clone(), value)));
1444        }
1445    }
1446}
1447
1448#[cfg(test)]
1449mod tests {
1450    use super::*;
1451
1452    #[test]
1453    fn test_bloom_filter_basic() {
1454        let mut bloom = BloomFilter::new(100, 10);
1455
1456        bloom.insert(b"hello");
1457        bloom.insert(b"world");
1458        bloom.insert(b"test");
1459
1460        assert!(bloom.may_contain(b"hello"));
1461        assert!(bloom.may_contain(b"world"));
1462        assert!(bloom.may_contain(b"test"));
1463
1464        // These should likely return false (with ~1% false positive rate)
1465        assert!(!bloom.may_contain(b"notfound"));
1466        assert!(!bloom.may_contain(b"missing"));
1467    }
1468
1469    #[test]
1470    fn test_bloom_filter_serialization() {
1471        let mut bloom = BloomFilter::new(100, 10);
1472        bloom.insert(b"key1");
1473        bloom.insert(b"key2");
1474
1475        let bytes = bloom.to_bytes();
1476        let restored = BloomFilter::from_bytes(&bytes).unwrap();
1477
1478        assert!(restored.may_contain(b"key1"));
1479        assert!(restored.may_contain(b"key2"));
1480        assert!(!restored.may_contain(b"key3"));
1481    }
1482
1483    #[test]
1484    fn test_bloom_filter_false_positive_rate() {
1485        let num_keys = 10000;
1486        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1487
1488        // Insert keys
1489        for i in 0..num_keys {
1490            let key = format!("key_{}", i);
1491            bloom.insert(key.as_bytes());
1492        }
1493
1494        // All inserted keys should be found
1495        for i in 0..num_keys {
1496            let key = format!("key_{}", i);
1497            assert!(bloom.may_contain(key.as_bytes()));
1498        }
1499
1500        // Check false positive rate on non-existent keys
1501        let mut false_positives = 0;
1502        let test_count = 10000;
1503        for i in 0..test_count {
1504            let key = format!("nonexistent_{}", i);
1505            if bloom.may_contain(key.as_bytes()) {
1506                false_positives += 1;
1507            }
1508        }
1509
1510        // With 10 bits per key, expect ~1% false positive rate
1511        // Allow up to 3% due to hash function variance
1512        let fp_rate = false_positives as f64 / test_count as f64;
1513        assert!(
1514            fp_rate < 0.03,
1515            "False positive rate {} is too high",
1516            fp_rate
1517        );
1518    }
1519
1520    #[test]
1521    fn test_sstable_writer_config() {
1522        use crate::structures::IndexOptimization;
1523
1524        // Default = Adaptive
1525        let config = SSTableWriterConfig::default();
1526        assert_eq!(config.compression_level.0, 9); // BETTER
1527        assert!(!config.use_bloom_filter);
1528        assert!(!config.use_dictionary);
1529
1530        // Adaptive
1531        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1532        assert_eq!(adaptive.compression_level.0, 9);
1533        assert!(!adaptive.use_bloom_filter);
1534        assert!(!adaptive.use_dictionary);
1535
1536        // SizeOptimized
1537        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1538        assert_eq!(size.compression_level.0, 22); // MAX
1539        assert!(size.use_bloom_filter);
1540        assert!(size.use_dictionary);
1541
1542        // PerformanceOptimized
1543        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1544        assert_eq!(perf.compression_level.0, 1); // FAST
1545        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1546        assert!(!perf.use_dictionary);
1547
1548        // Aliases
1549        let fast = SSTableWriterConfig::fast();
1550        assert_eq!(fast.compression_level.0, 1);
1551
1552        let max = SSTableWriterConfig::max_compression();
1553        assert_eq!(max.compression_level.0, 22);
1554    }
1555
1556    #[test]
1557    fn test_vint_roundtrip() {
1558        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1559
1560        for &val in &test_values {
1561            let mut buf = Vec::new();
1562            write_vint(&mut buf, val).unwrap();
1563            let mut reader = buf.as_slice();
1564            let decoded = read_vint(&mut reader).unwrap();
1565            assert_eq!(val, decoded, "Failed for value {}", val);
1566        }
1567    }
1568
1569    #[test]
1570    fn test_common_prefix_len() {
1571        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1572        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1573        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1574        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1575        assert_eq!(common_prefix_len(b"hello", b""), 0);
1576    }
1577}