
hermes_core/structures/sstable.rs

1//! Async SSTable with lazy loading via FileSlice
2//!
3//! Memory-efficient design: only minimal metadata is loaded into memory;
4//! data blocks are fetched on demand (see the usage sketch below).
5//!
6//! ## Key Features
7//!
8//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
9//!    - Can be mmap'd directly without parsing into heap-allocated structures
10//!    - ~90% memory reduction compared to `Vec<BlockIndexEntry>`
11//!
12//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
13//!    - Minimal memory footprint for block metadata
14//!
15//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
16//!
17//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
18//!
19//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
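//!
//! ## Usage sketch
//!
//! A minimal sketch of the write path (the import path and the sorted-key
//! contract are assumed from this file; the read path needs a `FileHandle`
//! from the `directories` module and is only outlined):
//!
//! ```ignore
//! use crate::structures::sstable::{SSTableWriter, SSTableWriterConfig};
//!
//! // Keys are inserted in ascending byte order.
//! let mut writer = SSTableWriter::<_, u64>::with_config(Vec::new(), SSTableWriterConfig::fast());
//! writer.insert(b"apple", &1u64)?;
//! writer.insert(b"banana", &2u64)?;
//! let bytes: Vec<u8> = writer.finish()?;
//!
//! // Read side (outline): wrap the bytes in a FileHandle, then call
//! // `AsyncSSTableReader::<u64>::open(handle, /* cache_blocks */ 32).await?`.
//! ```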
20
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{FileHandle, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
36
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
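///
/// Standard Bloom-filter math: with `m/n = 10` bits per key the optimal hash
/// count is `k = (m/n) * ln 2 ≈ 6.93`, rounded to 7, giving an expected
/// false-positive rate of `(1 - e^(-k*n/m))^k ≈ 0.008`, i.e. roughly 1%.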
47pub const BLOOM_HASH_COUNT: usize = 7;
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: BloomBits,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61/// Bloom filter storage — Vec for write path, OwnedBytes for zero-copy read path.
62#[derive(Debug, Clone)]
63enum BloomBits {
64    /// Mutable storage for building (SSTable writer)
65    Vec(Vec<u64>),
66    /// Zero-copy mmap reference for reading (raw LE u64 words, no header)
67    Bytes(OwnedBytes),
68}
69
70impl BloomBits {
71    #[inline]
72    fn len(&self) -> usize {
73        match self {
74            BloomBits::Vec(v) => v.len(),
75            BloomBits::Bytes(b) => b.len() / 8,
76        }
77    }
78
79    #[inline]
80    fn get(&self, word_idx: usize) -> u64 {
81        match self {
82            BloomBits::Vec(v) => v[word_idx],
83            BloomBits::Bytes(b) => {
84                let off = word_idx * 8;
85                u64::from_le_bytes([
86                    b[off],
87                    b[off + 1],
88                    b[off + 2],
89                    b[off + 3],
90                    b[off + 4],
91                    b[off + 5],
92                    b[off + 6],
93                    b[off + 7],
94                ])
95            }
96        }
97    }
98
99    #[inline]
100    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
101        match self {
102            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
103            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
104        }
105    }
106
107    fn size_bytes(&self) -> usize {
108        match self {
109            BloomBits::Vec(v) => v.len() * 8,
110            BloomBits::Bytes(b) => b.len(),
111        }
112    }
113}
114
115impl BloomFilter {
116    /// Create a new bloom filter sized for expected number of keys
117    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
118        let num_bits = (expected_keys * bits_per_key).max(64);
119        let num_words = num_bits.div_ceil(64);
120        Self {
121            bits: BloomBits::Vec(vec![0u64; num_words]),
122            num_bits,
123            num_hashes: BLOOM_HASH_COUNT,
124        }
125    }
126
127    /// Create from serialized OwnedBytes (zero-copy for mmap)
128    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
129        if data.len() < 12 {
130            return Err(io::Error::new(
131                io::ErrorKind::InvalidData,
132                "Bloom filter data too short",
133            ));
134        }
135        let d = data.as_slice();
136        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
137        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
138        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
139
140        if d.len() < 12 + num_words * 8 {
141            return Err(io::Error::new(
142                io::ErrorKind::InvalidData,
143                "Bloom filter data truncated",
144            ));
145        }
146
147        // Slice past the 12-byte header to get raw u64 LE words (zero-copy)
148        let bits_bytes = data.slice(12..12 + num_words * 8);
149
150        Ok(Self {
151            bits: BloomBits::Bytes(bits_bytes),
152            num_bits,
153            num_hashes,
154        })
155    }
156
157    /// Serialize to bytes
158    pub fn to_bytes(&self) -> Vec<u8> {
159        let num_words = self.bits.len();
160        let mut data = Vec::with_capacity(12 + num_words * 8);
161        data.write_u32::<LittleEndian>(self.num_bits as u32)
162            .unwrap();
163        data.write_u32::<LittleEndian>(self.num_hashes as u32)
164            .unwrap();
165        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
166        for i in 0..num_words {
167            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
168        }
169        data
170    }
171
172    /// Add a key to the filter
173    pub fn insert(&mut self, key: &[u8]) {
174        let (h1, h2) = self.hash_pair(key);
175        for i in 0..self.num_hashes {
176            let bit_pos = self.get_bit_pos(h1, h2, i);
177            let word_idx = bit_pos / 64;
178            let bit_idx = bit_pos % 64;
179            if word_idx < self.bits.len() {
180                self.bits.set_bit(word_idx, bit_idx);
181            }
182        }
183    }
184
185    /// Check if a key might be in the filter
186    /// Returns false if definitely not present, true if possibly present
187    pub fn may_contain(&self, key: &[u8]) -> bool {
188        let (h1, h2) = self.hash_pair(key);
189        for i in 0..self.num_hashes {
190            let bit_pos = self.get_bit_pos(h1, h2, i);
191            let word_idx = bit_pos / 64;
192            let bit_idx = bit_pos % 64;
193            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
194                return false;
195            }
196        }
197        true
198    }
199
200    /// Size in bytes
201    pub fn size_bytes(&self) -> usize {
202        12 + self.bits.size_bytes()
203    }
204
205    /// Insert a pre-computed hash pair into the filter
206    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
207        for i in 0..self.num_hashes {
208            let bit_pos = self.get_bit_pos(h1, h2, i);
209            let word_idx = bit_pos / 64;
210            let bit_idx = bit_pos % 64;
211            if word_idx < self.bits.len() {
212                self.bits.set_bit(word_idx, bit_idx);
213            }
214        }
215    }
216
217    /// Compute two hash values using FNV-1a variant (single pass over key bytes)
218    #[inline]
219    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
220        let mut h1: u64 = 0xcbf29ce484222325;
221        let mut h2: u64 = 0x84222325cbf29ce4;
222        for &byte in key {
223            h1 ^= byte as u64;
224            h1 = h1.wrapping_mul(0x100000001b3);
225            h2 = h2.wrapping_mul(0x100000001b3);
226            h2 ^= byte as u64;
227        }
228        (h1, h2)
229    }
230
231    /// Get bit position for hash iteration i using double hashing
232    #[inline]
233    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
234        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
235    }
236}
237
238/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
239/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair (single pass).
240#[inline]
241fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
242    let mut h1: u64 = 0xcbf29ce484222325;
243    let mut h2: u64 = 0x84222325cbf29ce4;
244    for &byte in key {
245        h1 ^= byte as u64;
246        h1 = h1.wrapping_mul(0x100000001b3);
247        h2 = h2.wrapping_mul(0x100000001b3);
248        h2 ^= byte as u64;
249    }
250    (h1, h2)
251}
252
253/// SSTable value trait
254pub trait SSTableValue: Clone + Send + Sync {
255    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
256    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
257}
258
259/// u64 value implementation
260impl SSTableValue for u64 {
261    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
262        write_vint(writer, *self)
263    }
264
265    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
266        read_vint(reader)
267    }
268}
269
270/// Vec<u8> value implementation
271impl SSTableValue for Vec<u8> {
272    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
273        write_vint(writer, self.len() as u64)?;
274        writer.write_all(self)
275    }
276
277    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
278        let len = read_vint(reader)? as usize;
279        let mut data = vec![0u8; len];
280        reader.read_exact(&mut data)?;
281        Ok(data)
282    }
283}
284
285/// Sparse dimension info for SSTable-based sparse index
286/// Stores offset and length for posting list lookup
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub struct SparseDimInfo {
289    /// Offset in sparse file where posting list starts
290    pub offset: u64,
291    /// Length of serialized posting list
292    pub length: u32,
293}
294
295impl SparseDimInfo {
296    pub fn new(offset: u64, length: u32) -> Self {
297        Self { offset, length }
298    }
299}
300
301impl SSTableValue for SparseDimInfo {
302    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
303        write_vint(writer, self.offset)?;
304        write_vint(writer, self.length as u64)
305    }
306
307    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
308        let offset = read_vint(reader)?;
309        let length = read_vint(reader)? as u32;
310        Ok(Self { offset, length })
311    }
312}
313
314/// Maximum number of postings that can be inlined in TermInfo
315pub const MAX_INLINE_POSTINGS: usize = 3;
316
317/// Term info for posting list references
318///
319/// Supports two modes:
320/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
321/// - **External**: Larger posting lists stored in separate .post file
322///
323/// This eliminates a separate I/O read for rare/unique terms.
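///
/// Sketch of both modes (illustrative values only):
///
/// ```ignore
/// // A term appearing in two documents fits inline:
/// let ti = TermInfo::try_inline(&[5, 9], &[2, 1]).unwrap();
/// assert!(ti.is_inline());
/// assert_eq!(ti.doc_freq(), 2);
/// assert_eq!(ti.decode_inline(), Some((vec![5, 9], vec![2, 1])));
///
/// // A frequent term references the external .post file instead:
/// let ti = TermInfo::external(4096, 512, 1000);
/// assert_eq!(ti.external_info(), Some((4096, 512)));
/// ```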
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub enum TermInfo {
326    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
327    /// Each entry is (doc_id, term_freq) delta-encoded
328    Inline {
329        /// Number of postings (1-3)
330        doc_freq: u8,
331        /// Inline data: delta-encoded (doc_id, term_freq) pairs
332        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
333        data: [u8; 16],
334        /// Actual length of data used
335        data_len: u8,
336    },
337    /// Reference to external posting list in .post file
338    External {
339        posting_offset: u64,
340        posting_len: u64,
341        doc_freq: u32,
342        /// Position data offset (0 if no positions)
343        position_offset: u64,
344        /// Position data length (0 if no positions)
345        position_len: u64,
346    },
347}
348
349impl TermInfo {
350    /// Create an external reference
351    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
352        TermInfo::External {
353            posting_offset,
354            posting_len,
355            doc_freq,
356            position_offset: 0,
357            position_len: 0,
358        }
359    }
360
361    /// Create an external reference with position info
362    pub fn external_with_positions(
363        posting_offset: u64,
364        posting_len: u64,
365        doc_freq: u32,
366        position_offset: u64,
367        position_len: u64,
368    ) -> Self {
369        TermInfo::External {
370            posting_offset,
371            posting_len,
372            doc_freq,
373            position_offset,
374            position_len,
375        }
376    }
377
378    /// Try to create an inline TermInfo from posting data
379    /// Returns None if posting list is too large to inline
380    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
381        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
382            return None;
383        }
384
385        let mut data = [0u8; 16];
386        let mut cursor = std::io::Cursor::new(&mut data[..]);
387        let mut prev_doc_id = 0u32;
388
389        for (i, &doc_id) in doc_ids.iter().enumerate() {
390            let delta = doc_id - prev_doc_id;
391            if write_vint(&mut cursor, delta as u64).is_err() {
392                return None;
393            }
394            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
395                return None;
396            }
397            prev_doc_id = doc_id;
398        }
399
400        let data_len = cursor.position() as u8;
401        if data_len > 16 {
402            return None;
403        }
404
405        Some(TermInfo::Inline {
406            doc_freq: doc_ids.len() as u8,
407            data,
408            data_len,
409        })
410    }
411
412    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
413    /// Zero-allocation alternative to `try_inline` — avoids collecting into `Vec<u32>`.
414    /// `count` is the number of postings (must match iterator length).
415    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
416        if count > MAX_INLINE_POSTINGS || count == 0 {
417            return None;
418        }
419
420        let mut data = [0u8; 16];
421        let mut cursor = std::io::Cursor::new(&mut data[..]);
422        let mut prev_doc_id = 0u32;
423
424        for (doc_id, tf) in iter {
425            let delta = doc_id - prev_doc_id;
426            if write_vint(&mut cursor, delta as u64).is_err() {
427                return None;
428            }
429            if write_vint(&mut cursor, tf as u64).is_err() {
430                return None;
431            }
432            prev_doc_id = doc_id;
433        }
434
435        let data_len = cursor.position() as u8;
436
437        Some(TermInfo::Inline {
438            doc_freq: count as u8,
439            data,
440            data_len,
441        })
442    }
443
444    /// Get document frequency
445    pub fn doc_freq(&self) -> u32 {
446        match self {
447            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
448            TermInfo::External { doc_freq, .. } => *doc_freq,
449        }
450    }
451
452    /// Check if this is an inline posting list
453    pub fn is_inline(&self) -> bool {
454        matches!(self, TermInfo::Inline { .. })
455    }
456
457    /// Get external posting info (offset, len) - returns None for inline
458    pub fn external_info(&self) -> Option<(u64, u64)> {
459        match self {
460            TermInfo::External {
461                posting_offset,
462                posting_len,
463                ..
464            } => Some((*posting_offset, *posting_len)),
465            TermInfo::Inline { .. } => None,
466        }
467    }
468
469    /// Get position info (offset, len) - returns None for inline or if no positions
470    pub fn position_info(&self) -> Option<(u64, u64)> {
471        match self {
472            TermInfo::External {
473                position_offset,
474                position_len,
475                ..
476            } if *position_len > 0 => Some((*position_offset, *position_len)),
477            _ => None,
478        }
479    }
480
481    /// Decode inline postings into (doc_ids, term_freqs)
482    /// Returns None if this is an external reference
483    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
484        match self {
485            TermInfo::Inline {
486                doc_freq,
487                data,
488                data_len,
489            } => {
490                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
491                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
492                let mut reader = &data[..*data_len as usize];
493                let mut prev_doc_id = 0u32;
494
495                for _ in 0..*doc_freq {
496                    let delta = read_vint(&mut reader).ok()? as u32;
497                    let tf = read_vint(&mut reader).ok()? as u32;
498                    let doc_id = prev_doc_id + delta;
499                    doc_ids.push(doc_id);
500                    term_freqs.push(tf);
501                    prev_doc_id = doc_id;
502                }
503
504                Some((doc_ids, term_freqs))
505            }
506            TermInfo::External { .. } => None,
507        }
508    }
509}
510
511impl SSTableValue for TermInfo {
512    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
513        match self {
514            TermInfo::Inline {
515                doc_freq,
516                data,
517                data_len,
518            } => {
519                // Tag byte 0xFF = inline marker
520                writer.write_u8(0xFF)?;
521                writer.write_u8(*doc_freq)?;
522                writer.write_u8(*data_len)?;
523                writer.write_all(&data[..*data_len as usize])?;
524            }
525            TermInfo::External {
526                posting_offset,
527                posting_len,
528                doc_freq,
529                position_offset,
530                position_len,
531            } => {
532                // Tag byte 0x00 = external marker (no positions)
533                // Tag byte 0x01 = external with positions
534                if *position_len > 0 {
535                    writer.write_u8(0x01)?;
536                    write_vint(writer, *doc_freq as u64)?;
537                    write_vint(writer, *posting_offset)?;
538                    write_vint(writer, *posting_len)?;
539                    write_vint(writer, *position_offset)?;
540                    write_vint(writer, *position_len)?;
541                } else {
542                    writer.write_u8(0x00)?;
543                    write_vint(writer, *doc_freq as u64)?;
544                    write_vint(writer, *posting_offset)?;
545                    write_vint(writer, *posting_len)?;
546                }
547            }
548        }
549        Ok(())
550    }
551
552    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
553        let tag = reader.read_u8()?;
554
555        if tag == 0xFF {
556            // Inline
557            let doc_freq = reader.read_u8()?;
558            let data_len = reader.read_u8()?;
559            let mut data = [0u8; 16];
560            reader.read_exact(&mut data[..data_len as usize])?;
561            Ok(TermInfo::Inline {
562                doc_freq,
563                data,
564                data_len,
565            })
566        } else if tag == 0x00 {
567            // External (no positions)
568            let doc_freq = read_vint(reader)? as u32;
569            let posting_offset = read_vint(reader)?;
570            let posting_len = read_vint(reader)?;
571            Ok(TermInfo::External {
572                posting_offset,
573                posting_len,
574                doc_freq,
575                position_offset: 0,
576                position_len: 0,
577            })
578        } else if tag == 0x01 {
579            // External with positions
580            let doc_freq = read_vint(reader)? as u32;
581            let posting_offset = read_vint(reader)?;
582            let posting_len = read_vint(reader)?;
583            let position_offset = read_vint(reader)?;
584            let position_len = read_vint(reader)?;
585            Ok(TermInfo::External {
586                posting_offset,
587                posting_len,
588                doc_freq,
589                position_offset,
590                position_len,
591            })
592        } else {
593            Err(io::Error::new(
594                io::ErrorKind::InvalidData,
595                format!("Invalid TermInfo tag: {}", tag),
596            ))
597        }
598    }
599}
600
601/// Write variable-length integer
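///
/// LEB128-style encoding: low 7 bits first, with the high bit set on every
/// byte except the last. For example, 300 encodes as `[0xAC, 0x02]`.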
602pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
603    loop {
604        let byte = (value & 0x7F) as u8;
605        value >>= 7;
606        if value == 0 {
607            writer.write_u8(byte)?;
608            return Ok(());
609        } else {
610            writer.write_u8(byte | 0x80)?;
611        }
612    }
613}
614
615/// Read variable-length integer
616pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
617    let mut result = 0u64;
618    let mut shift = 0;
619
620    loop {
621        let byte = reader.read_u8()?;
622        result |= ((byte & 0x7F) as u64) << shift;
623        if byte & 0x80 == 0 {
624            return Ok(result);
625        }
626        shift += 7;
627        if shift >= 64 {
628            return Err(io::Error::new(
629                io::ErrorKind::InvalidData,
630                "varint too long",
631            ));
632        }
633    }
634}
635
636/// Compute common prefix length
637pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
638    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
639}
640
641/// SSTable statistics for debugging
642#[derive(Debug, Clone)]
643pub struct SSTableStats {
644    pub num_blocks: usize,
645    pub num_sparse_entries: usize,
646    pub num_entries: u64,
647    pub has_bloom_filter: bool,
648    pub has_dictionary: bool,
649    pub bloom_filter_size: usize,
650    pub dictionary_size: usize,
651}
652
653/// SSTable writer configuration
654#[derive(Debug, Clone)]
655pub struct SSTableWriterConfig {
656    /// Compression level (1-22, higher = better compression but slower)
657    pub compression_level: CompressionLevel,
658    /// Whether to train and use a dictionary for compression
659    pub use_dictionary: bool,
660    /// Dictionary size in bytes (default 64KB)
661    pub dict_size: usize,
662    /// Whether to build a bloom filter
663    pub use_bloom_filter: bool,
664    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
665    pub bloom_bits_per_key: usize,
666}
667
668impl Default for SSTableWriterConfig {
669    fn default() -> Self {
670        Self::from_optimization(crate::structures::IndexOptimization::default())
671    }
672}
673
674impl SSTableWriterConfig {
675    /// Create config from IndexOptimization mode
676    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
677        use crate::structures::IndexOptimization;
678        match optimization {
679            IndexOptimization::Adaptive => Self {
680                compression_level: CompressionLevel::BETTER, // Level 9
681                use_dictionary: false,
682                dict_size: DEFAULT_DICT_SIZE,
683                use_bloom_filter: false,
684                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
685            },
686            IndexOptimization::SizeOptimized => Self {
687                compression_level: CompressionLevel::MAX, // Level 22
688                use_dictionary: true,
689                dict_size: DEFAULT_DICT_SIZE,
690                use_bloom_filter: true,
691                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
692            },
693            IndexOptimization::PerformanceOptimized => Self {
694                compression_level: CompressionLevel::FAST, // Level 1
695                use_dictionary: false,
696                dict_size: DEFAULT_DICT_SIZE,
697                use_bloom_filter: true, // Bloom helps skip blocks fast
698                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
699            },
700        }
701    }
702
703    /// Fast configuration - prioritize write speed over compression
704    pub fn fast() -> Self {
705        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
706    }
707
708    /// Maximum compression configuration - prioritize size over speed
709    pub fn max_compression() -> Self {
710        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
711    }
712}
713
714/// SSTable writer with optimizations:
715/// - Dictionary compression for blocks (if dictionary provided)
716/// - Configurable compression level
717/// - Block index prefix compression
718/// - Bloom filter for fast negative lookups
719pub struct SSTableWriter<W: Write, V: SSTableValue> {
720    writer: W,
721    block_buffer: Vec<u8>,
722    prev_key: Vec<u8>,
723    index: Vec<BlockIndexEntry>,
724    current_offset: u64,
725    num_entries: u64,
726    block_first_key: Option<Vec<u8>>,
727    config: SSTableWriterConfig,
728    /// Pre-trained dictionary for compression (optional)
729    dictionary: Option<CompressionDict>,
730    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
731    /// Filter is built at finish() time with correct sizing.
732    bloom_hashes: Vec<(u64, u64)>,
733    _phantom: std::marker::PhantomData<V>,
734}
735
736impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
737    /// Create a new SSTable writer with default configuration
738    pub fn new(writer: W) -> Self {
739        Self::with_config(writer, SSTableWriterConfig::default())
740    }
741
742    /// Create a new SSTable writer with custom configuration
743    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
744        Self {
745            writer,
746            block_buffer: Vec::with_capacity(BLOCK_SIZE),
747            prev_key: Vec::new(),
748            index: Vec::new(),
749            current_offset: 0,
750            num_entries: 0,
751            block_first_key: None,
752            config,
753            dictionary: None,
754            bloom_hashes: Vec::new(),
755            _phantom: std::marker::PhantomData,
756        }
757    }
758
759    /// Create a new SSTable writer with a pre-trained dictionary
760    pub fn with_dictionary(
761        writer: W,
762        config: SSTableWriterConfig,
763        dictionary: CompressionDict,
764    ) -> Self {
765        Self {
766            writer,
767            block_buffer: Vec::with_capacity(BLOCK_SIZE),
768            prev_key: Vec::new(),
769            index: Vec::new(),
770            current_offset: 0,
771            num_entries: 0,
772            block_first_key: None,
773            config,
774            dictionary: Some(dictionary),
775            bloom_hashes: Vec::new(),
776            _phantom: std::marker::PhantomData,
777        }
778    }
779
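    /// Insert a key/value pair. Keys are expected to arrive in ascending byte
    /// order (the usual SSTable contract; prefix compression against the
    /// previous key and the block index's first-key lookup both rely on it).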
780    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
781        if self.block_first_key.is_none() {
782            self.block_first_key = Some(key.to_vec());
783        }
784
785        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
786        if self.config.use_bloom_filter {
787            self.bloom_hashes.push(bloom_hash_pair(key));
788        }
789
790        let prefix_len = common_prefix_len(&self.prev_key, key);
791        let suffix = &key[prefix_len..];
792
793        write_vint(&mut self.block_buffer, prefix_len as u64)?;
794        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
795        self.block_buffer.extend_from_slice(suffix);
796        value.serialize(&mut self.block_buffer)?;
797
798        self.prev_key.clear();
799        self.prev_key.extend_from_slice(key);
800        self.num_entries += 1;
801
802        if self.block_buffer.len() >= BLOCK_SIZE {
803            self.flush_block()?;
804        }
805
806        Ok(())
807    }
808
809    /// Flush and compress the current block
810    fn flush_block(&mut self) -> io::Result<()> {
811        if self.block_buffer.is_empty() {
812            return Ok(());
813        }
814
815        // Compress block with dictionary if available
816        let compressed = if let Some(ref dict) = self.dictionary {
817            crate::compression::compress_with_dict(
818                &self.block_buffer,
819                self.config.compression_level,
820                dict,
821            )?
822        } else {
823            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
824        };
825
826        if let Some(first_key) = self.block_first_key.take() {
827            self.index.push(BlockIndexEntry {
828                first_key,
829                offset: self.current_offset,
830                length: compressed.len() as u32,
831            });
832        }
833
834        self.writer.write_all(&compressed)?;
835        self.current_offset += compressed.len() as u64;
836        self.block_buffer.clear();
837        self.prev_key.clear();
838
839        Ok(())
840    }
841
842    pub fn finish(mut self) -> io::Result<W> {
843        // Flush any remaining data
844        self.flush_block()?;
845
846        // Build bloom filter from collected hashes (properly sized)
847        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
848            let mut bloom =
849                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
850            for (h1, h2) in &self.bloom_hashes {
851                bloom.insert_hashed(*h1, *h2);
852            }
853            Some(bloom)
854        } else {
855            None
856        };
857
858        let data_end_offset = self.current_offset;
859
860        // Build memory-efficient block index
861        // Convert to (key, BlockAddr) pairs for the new index format
862        let entries: Vec<(Vec<u8>, BlockAddr)> = self
863            .index
864            .iter()
865            .map(|e| {
866                (
867                    e.first_key.clone(),
868                    BlockAddr {
869                        offset: e.offset,
870                        length: e.length,
871                    },
872                )
873            })
874            .collect();
875
876        // Build FST-based index if native feature is enabled, otherwise use mmap index
877        #[cfg(feature = "native")]
878        let index_bytes = FstBlockIndex::build(&entries)?;
879        #[cfg(not(feature = "native"))]
880        let index_bytes = MmapBlockIndex::build(&entries)?;
881
882        // Write index bytes with length prefix
883        self.writer
884            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
885        self.writer.write_all(&index_bytes)?;
886        self.current_offset += 4 + index_bytes.len() as u64;
887
888        // Write bloom filter if present
889        let bloom_offset = if let Some(ref bloom) = bloom_filter {
890            let bloom_data = bloom.to_bytes();
891            let offset = self.current_offset;
892            self.writer.write_all(&bloom_data)?;
893            self.current_offset += bloom_data.len() as u64;
894            offset
895        } else {
896            0
897        };
898
899        // Write dictionary if present
900        let dict_offset = if let Some(ref dict) = self.dictionary {
901            let dict_bytes = dict.as_bytes();
902            let offset = self.current_offset;
903            self.writer
904                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
905            self.writer.write_all(dict_bytes)?;
906            self.current_offset += 4 + dict_bytes.len() as u64;
907            offset
908        } else {
909            0
910        };
911
912        // Write extended footer
913        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
914        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
915        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
916        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
917        self.writer
918            .write_u8(self.config.compression_level.0 as u8)?;
919        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
920
921        Ok(self.writer)
922    }
923}
924
925/// Block index entry
926#[derive(Debug, Clone)]
927struct BlockIndexEntry {
928    first_key: Vec<u8>,
929    offset: u64,
930    length: u32,
931}
932
933/// Async SSTable reader - loads blocks on demand via FileHandle
934///
935/// Memory-efficient design:
936/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
937/// - Block addresses stored in bitpacked format
938/// - Bloom filter and dictionary optional
939pub struct AsyncSSTableReader<V: SSTableValue> {
940    /// FileHandle for the data portion (blocks only) - fetches ranges on demand
941    data_slice: FileHandle,
942    /// Memory-efficient block index (FST or mmap)
943    block_index: BlockIndex,
944    num_entries: u64,
945    /// Hot cache for decompressed blocks
946    cache: RwLock<BlockCache>,
947    /// Bloom filter for fast negative lookups (optional)
948    bloom_filter: Option<BloomFilter>,
949    /// Compression dictionary (optional)
950    dictionary: Option<CompressionDict>,
951    /// Compression level used
952    #[allow(dead_code)]
953    compression_level: CompressionLevel,
954    _phantom: std::marker::PhantomData<V>,
955}
956
957/// LRU block cache — O(1) lookup/insert, O(n) promotion on cache hit.
958///
959/// On `get()`, promotes the accessed entry to MRU position.
960/// On eviction, removes the LRU entry (front of VecDeque).
961/// For typical cache sizes (16-64 blocks), the linear scan in
962/// `promote()` is negligible compared to I/O savings.
963struct BlockCache {
964    blocks: FxHashMap<u64, Arc<[u8]>>,
965    lru_order: std::collections::VecDeque<u64>,
966    max_blocks: usize,
967}
968
969impl BlockCache {
970    fn new(max_blocks: usize) -> Self {
971        Self {
972            blocks: FxHashMap::default(),
973            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
974            max_blocks,
975        }
976    }
977
978    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
979        if self.blocks.contains_key(&offset) {
980            self.promote(offset);
981            self.blocks.get(&offset).map(Arc::clone)
982        } else {
983            None
984        }
985    }
986
987    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
988        if self.blocks.contains_key(&offset) {
989            self.promote(offset);
990            return;
991        }
992        while self.blocks.len() >= self.max_blocks {
993            if let Some(evict_offset) = self.lru_order.pop_front() {
994                self.blocks.remove(&evict_offset);
995            } else {
996                break;
997            }
998        }
999        self.blocks.insert(offset, block);
1000        self.lru_order.push_back(offset);
1001    }
1002
1003    /// Move entry to MRU position (back of deque)
1004    fn promote(&mut self, offset: u64) {
1005        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
1006            self.lru_order.remove(pos);
1007            self.lru_order.push_back(offset);
1008        }
1009    }
1010}
1011
1012impl<V: SSTableValue> AsyncSSTableReader<V> {
1013    /// Open an SSTable from a FileHandle
1014    /// Only loads the footer and index into memory, data blocks fetched on-demand
1015    ///
1016    /// Uses FST-based (native) or mmap'd block index (no heap allocation for keys)
1017    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
1018        let file_len = file_handle.len();
1019        if file_len < 37 {
1020            return Err(io::Error::new(
1021                io::ErrorKind::InvalidData,
1022                "SSTable too small",
1023            ));
1024        }
1025
1026        // Read footer (37 bytes)
1027        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
1028        let footer_bytes = file_handle
1029            .read_bytes_range(file_len - 37..file_len)
1030            .await?;
1031
1032        let mut reader = footer_bytes.as_slice();
1033        let data_end_offset = reader.read_u64::<LittleEndian>()?;
1034        let num_entries = reader.read_u64::<LittleEndian>()?;
1035        let bloom_offset = reader.read_u64::<LittleEndian>()?;
1036        let dict_offset = reader.read_u64::<LittleEndian>()?;
1037        let compression_level = CompressionLevel(reader.read_u8()? as i32);
1038        let magic = reader.read_u32::<LittleEndian>()?;
1039
1040        if magic != SSTABLE_MAGIC {
1041            return Err(io::Error::new(
1042                io::ErrorKind::InvalidData,
1043                format!("Invalid SSTable magic: 0x{:08X}", magic),
1044            ));
1045        }
1046
1047        // Read index section
1048        let index_start = data_end_offset;
1049        let index_end = file_len - 37;
1050        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
1051
1052        // Parse block index (length-prefixed FST or mmap index)
1053        let mut idx_reader = index_bytes.as_slice();
1054        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
1055
1056        if index_len > idx_reader.len() {
1057            return Err(io::Error::new(
1058                io::ErrorKind::InvalidData,
1059                "Index data truncated",
1060            ));
1061        }
1062
1063        let index_data = index_bytes.slice(4..4 + index_len);
1064
1065        // Try FST first (native), fall back to mmap
1066        #[cfg(feature = "native")]
1067        let block_index = match FstBlockIndex::load(index_data.clone()) {
1068            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
1069            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
1070        };
1071        #[cfg(not(feature = "native"))]
1072        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
1073
1074        // Load bloom filter if present
1075        let bloom_filter = if bloom_offset > 0 {
1076            let bloom_start = bloom_offset;
1077            // Read bloom filter size first (12 bytes header)
1078            let bloom_header = file_handle
1079                .read_bytes_range(bloom_start..bloom_start + 12)
1080                .await?;
1081            let num_words = u32::from_le_bytes([
1082                bloom_header[8],
1083                bloom_header[9],
1084                bloom_header[10],
1085                bloom_header[11],
1086            ]) as u64;
1087            let bloom_size = 12 + num_words * 8;
1088            let bloom_data = file_handle
1089                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1090                .await?;
1091            Some(BloomFilter::from_owned_bytes(bloom_data)?)
1092        } else {
1093            None
1094        };
1095
1096        // Load dictionary if present
1097        let dictionary = if dict_offset > 0 {
1098            let dict_start = dict_offset;
1099            // Read dictionary size first
1100            let dict_len_bytes = file_handle
1101                .read_bytes_range(dict_start..dict_start + 4)
1102                .await?;
1103            let dict_len = u32::from_le_bytes([
1104                dict_len_bytes[0],
1105                dict_len_bytes[1],
1106                dict_len_bytes[2],
1107                dict_len_bytes[3],
1108            ]) as u64;
1109            let dict_data = file_handle
1110                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1111                .await?;
1112            Some(CompressionDict::from_owned_bytes(dict_data))
1113        } else {
1114            None
1115        };
1116
1117        // Create a lazy slice for just the data portion
1118        let data_slice = file_handle.slice(0..data_end_offset);
1119
1120        Ok(Self {
1121            data_slice,
1122            block_index,
1123            num_entries,
1124            cache: RwLock::new(BlockCache::new(cache_blocks)),
1125            bloom_filter,
1126            dictionary,
1127            compression_level,
1128            _phantom: std::marker::PhantomData,
1129        })
1130    }
1131
1132    /// Number of entries
1133    pub fn num_entries(&self) -> u64 {
1134        self.num_entries
1135    }
1136
1137    /// Get stats about this SSTable for debugging
1138    pub fn stats(&self) -> SSTableStats {
1139        SSTableStats {
1140            num_blocks: self.block_index.len(),
1141            num_sparse_entries: 0, // No longer using sparse index separately
1142            num_entries: self.num_entries,
1143            has_bloom_filter: self.bloom_filter.is_some(),
1144            has_dictionary: self.dictionary.is_some(),
1145            bloom_filter_size: self
1146                .bloom_filter
1147                .as_ref()
1148                .map(|b| b.size_bytes())
1149                .unwrap_or(0),
1150            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1151        }
1152    }
1153
1154    /// Number of blocks currently in the cache
1155    pub fn cached_blocks(&self) -> usize {
1156        self.cache.read().blocks.len()
1157    }
1158
1159    /// Look up a key (async - may need to load block)
1160    ///
1161    /// Uses bloom filter for fast negative lookups, then memory-efficient
1162    /// block index to locate the block, reducing I/O to typically 1 block read.
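    ///
    /// Sketch (inside an async context, assuming an already-opened `reader`):
    ///
    /// ```ignore
    /// if let Some(value) = reader.get(b"apple").await? {
    ///     // hit: served from at most one (possibly cached) block read
    /// }
    /// ```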
1163    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1164        log::debug!(
1165            "SSTable::get called, key_len={}, total_blocks={}",
1166            key.len(),
1167            self.block_index.len()
1168        );
1169
1170        // Check bloom filter first - fast negative lookup
1171        if let Some(ref bloom) = self.bloom_filter
1172            && !bloom.may_contain(key)
1173        {
1174            log::debug!("SSTable::get bloom filter negative");
1175            return Ok(None);
1176        }
1177
1178        // Use block index to find the block that could contain the key
1179        let block_idx = match self.block_index.locate(key) {
1180            Some(idx) => idx,
1181            None => {
1182                log::debug!("SSTable::get key not found (before first block)");
1183                return Ok(None);
1184            }
1185        };
1186
1187        log::debug!("SSTable::get loading block_idx={}", block_idx);
1188
1189        // Now we know exactly which block to load - single I/O
1190        let block_data = self.load_block(block_idx).await?;
1191        self.search_block(&block_data, key)
1192    }
1193
1194    /// Batch lookup multiple keys with optimized I/O
1195    ///
1196    /// Groups keys by block and loads each block only once, reducing
1197    /// I/O from one read per key to at most one read per distinct block.
1198    /// Uses bloom filter to skip keys that definitely don't exist.
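    ///
    /// Sketch (inside an async context, assuming an already-opened `reader`):
    ///
    /// ```ignore
    /// let keys: [&[u8]; 3] = [b"apple", b"banana", b"missing"];
    /// let values = reader.get_batch(&keys).await?;
    /// assert_eq!(values.len(), keys.len());
    /// ```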
1199    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1200        if keys.is_empty() {
1201            return Ok(Vec::new());
1202        }
1203
1204        // Map each key to its block index
1205        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1206        for (key_idx, key) in keys.iter().enumerate() {
1207            // Check bloom filter first
1208            if let Some(ref bloom) = self.bloom_filter
1209                && !bloom.may_contain(key)
1210            {
1211                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1212                continue;
1213            }
1214
1215            match self.block_index.locate(key) {
1216                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1217                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1218            }
1219        }
1220
1221        // Group keys by block
1222        let mut blocks_to_load: Vec<usize> = key_to_block
1223            .iter()
1224            .filter(|(_, b)| *b != usize::MAX)
1225            .map(|(_, b)| *b)
1226            .collect();
1227        blocks_to_load.sort_unstable();
1228        blocks_to_load.dedup();
1229
1230        // Load all needed blocks (this is where I/O happens)
1231        for &block_idx in &blocks_to_load {
1232            let _ = self.load_block(block_idx).await?;
1233        }
1234
1235        // Now search each key in its block (all blocks are cached)
1236        let mut results = vec![None; keys.len()];
1237        for (key_idx, block_idx) in key_to_block {
1238            if block_idx == usize::MAX {
1239                continue;
1240            }
1241            let block_data = self.load_block(block_idx).await?; // Will hit cache
1242            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1243        }
1244
1245        Ok(results)
1246    }
1247
1248    /// Preload all data blocks into memory
1249    ///
1250    /// Call this after open() to eliminate all I/O during subsequent lookups.
1251    /// Useful when the SSTable is small enough to fit in memory.
1252    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1253        for block_idx in 0..self.block_index.len() {
1254            self.load_block(block_idx).await?;
1255        }
1256        Ok(())
1257    }
1258
1259    /// Prefetch all data blocks via a single bulk I/O operation.
1260    ///
1261    /// Reads the entire compressed data section in one call, then decompresses
1262    /// each block and populates the cache. This turns N individual reads into 1.
1263    /// Cache capacity is expanded to hold all blocks.
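    ///
    /// Sketch: warm the cache once right after opening a small table:
    ///
    /// ```ignore
    /// reader.prefetch_all_data_bulk().await?;
    /// assert_eq!(reader.cached_blocks(), reader.stats().num_blocks);
    /// ```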
1264    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1265        let num_blocks = self.block_index.len();
1266        if num_blocks == 0 {
1267            return Ok(());
1268        }
1269
1270        // Find total data extent
1271        let mut max_end: u64 = 0;
1272        for i in 0..num_blocks {
1273            if let Some(addr) = self.block_index.get_addr(i) {
1274                max_end = max_end.max(addr.offset + addr.length as u64);
1275            }
1276        }
1277
1278        // Single bulk read of entire data section
1279        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1280        let buf = all_data.as_slice();
1281
1282        // Expand cache and decompress all blocks
1283        let mut cache = self.cache.write();
1284        cache.max_blocks = cache.max_blocks.max(num_blocks);
1285        for i in 0..num_blocks {
1286            let addr = self.block_index.get_addr(i).unwrap();
1287            if cache.get(addr.offset).is_some() {
1288                continue;
1289            }
1290            let compressed =
1291                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1292            let decompressed = if let Some(ref dict) = self.dictionary {
1293                crate::compression::decompress_with_dict(compressed, dict)?
1294            } else {
1295                crate::compression::decompress(compressed)?
1296            };
1297            cache.insert(addr.offset, Arc::from(decompressed));
1298        }
1299
1300        Ok(())
1301    }
1302
1303    /// Load a block (checks cache first, then loads from FileSlice)
1304    /// Uses dictionary decompression if dictionary is present
1305    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1306        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1307            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1308        })?;
1309
1310        // Check cache (write lock for LRU promotion on hit)
1311        {
1312            if let Some(block) = self.cache.write().get(addr.offset) {
1313                return Ok(block);
1314            }
1315        }
1316
1317        log::debug!(
1318            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1319            block_idx,
1320            addr.offset,
1321            addr.offset + addr.length as u64
1322        );
1323
1324        // Load from FileSlice
1325        let range = addr.byte_range();
1326        let compressed = self.data_slice.read_bytes_range(range).await?;
1327
1328        // Decompress with dictionary if available
1329        let decompressed = if let Some(ref dict) = self.dictionary {
1330            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1331        } else {
1332            crate::compression::decompress(compressed.as_slice())?
1333        };
1334
1335        let block: Arc<[u8]> = Arc::from(decompressed);
1336
1337        // Insert into cache
1338        {
1339            let mut cache = self.cache.write();
1340            cache.insert(addr.offset, Arc::clone(&block));
1341        }
1342
1343        Ok(block)
1344    }
1345
1346    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1347        let mut reader = block_data;
1348        let mut current_key = Vec::new();
1349
1350        while !reader.is_empty() {
1351            let common_prefix_len = read_vint(&mut reader)? as usize;
1352            let suffix_len = read_vint(&mut reader)? as usize;
1353
1354            current_key.truncate(common_prefix_len);
1355            let mut suffix = vec![0u8; suffix_len];
1356            reader.read_exact(&mut suffix)?;
1357            current_key.extend_from_slice(&suffix);
1358
1359            let value = V::deserialize(&mut reader)?;
1360
1361            match current_key.as_slice().cmp(target_key) {
1362                std::cmp::Ordering::Equal => return Ok(Some(value)),
1363                std::cmp::Ordering::Greater => return Ok(None),
1364                std::cmp::Ordering::Less => continue,
1365            }
1366        }
1367
1368        Ok(None)
1369    }
1370
1371    /// Prefetch blocks for a key range
1372    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1373        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1374        let end_block = self
1375            .block_index
1376            .locate(end_key)
1377            .unwrap_or(self.block_index.len().saturating_sub(1));
1378
1379        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1380            let _ = self.load_block(block_idx).await?;
1381        }
1382
1383        Ok(())
1384    }
1385
1386    /// Iterate over all entries (loads blocks as needed)
1387    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1388        AsyncSSTableIterator::new(self)
1389    }
1390
1391    /// Get all entries as a vector (for merging)
1392    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1393        let mut results = Vec::new();
1394
1395        for block_idx in 0..self.block_index.len() {
1396            let block_data = self.load_block(block_idx).await?;
1397            let mut reader = &block_data[..];
1398            let mut current_key = Vec::new();
1399
1400            while !reader.is_empty() {
1401                let common_prefix_len = read_vint(&mut reader)? as usize;
1402                let suffix_len = read_vint(&mut reader)? as usize;
1403
1404                current_key.truncate(common_prefix_len);
1405                let mut suffix = vec![0u8; suffix_len];
1406                reader.read_exact(&mut suffix)?;
1407                current_key.extend_from_slice(&suffix);
1408
1409                let value = V::deserialize(&mut reader)?;
1410                results.push((current_key.clone(), value));
1411            }
1412        }
1413
1414        Ok(results)
1415    }
1416}
1417
1418/// Async iterator over SSTable entries
1419pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1420    reader: &'a AsyncSSTableReader<V>,
1421    current_block: usize,
1422    block_data: Option<Arc<[u8]>>,
1423    block_offset: usize,
1424    current_key: Vec<u8>,
1425    finished: bool,
1426}
1427
1428impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1429    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1430        Self {
1431            reader,
1432            current_block: 0,
1433            block_data: None,
1434            block_offset: 0,
1435            current_key: Vec::new(),
1436            finished: reader.block_index.is_empty(),
1437        }
1438    }
1439
1440    async fn load_next_block(&mut self) -> io::Result<bool> {
1441        if self.current_block >= self.reader.block_index.len() {
1442            self.finished = true;
1443            return Ok(false);
1444        }
1445
1446        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1447        self.block_offset = 0;
1448        self.current_key.clear();
1449        self.current_block += 1;
1450        Ok(true)
1451    }
1452
1453    /// Advance to next entry (async)
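    ///
    /// Sketch: drain all entries in key order (inside an async context):
    ///
    /// ```ignore
    /// let mut it = reader.iter();
    /// while let Some((key, value)) = it.next().await? {
    ///     // keys arrive in ascending byte order
    /// }
    /// ```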
1454    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1455        if self.finished {
1456            return Ok(None);
1457        }
1458
1459        if self.block_data.is_none() && !self.load_next_block().await? {
1460            return Ok(None);
1461        }
1462
1463        loop {
1464            let block = self.block_data.as_ref().unwrap();
1465            if self.block_offset >= block.len() {
1466                if !self.load_next_block().await? {
1467                    return Ok(None);
1468                }
1469                continue;
1470            }
1471
1472            let mut reader = &block[self.block_offset..];
1473            let start_len = reader.len();
1474
1475            let common_prefix_len = read_vint(&mut reader)? as usize;
1476            let suffix_len = read_vint(&mut reader)? as usize;
1477
1478            self.current_key.truncate(common_prefix_len);
1479            let mut suffix = vec![0u8; suffix_len];
1480            reader.read_exact(&mut suffix)?;
1481            self.current_key.extend_from_slice(&suffix);
1482
1483            let value = V::deserialize(&mut reader)?;
1484
1485            self.block_offset += start_len - reader.len();
1486
1487            return Ok(Some((self.current_key.clone(), value)));
1488        }
1489    }
1490}
1491
1492#[cfg(test)]
1493mod tests {
1494    use super::*;
1495
1496    #[test]
1497    fn test_bloom_filter_basic() {
1498        let mut bloom = BloomFilter::new(100, 10);
1499
1500        bloom.insert(b"hello");
1501        bloom.insert(b"world");
1502        bloom.insert(b"test");
1503
1504        assert!(bloom.may_contain(b"hello"));
1505        assert!(bloom.may_contain(b"world"));
1506        assert!(bloom.may_contain(b"test"));
1507
1508        // These should likely return false (with ~1% false positive rate)
1509        assert!(!bloom.may_contain(b"notfound"));
1510        assert!(!bloom.may_contain(b"missing"));
1511    }
1512
1513    #[test]
1514    fn test_bloom_filter_serialization() {
1515        let mut bloom = BloomFilter::new(100, 10);
1516        bloom.insert(b"key1");
1517        bloom.insert(b"key2");
1518
1519        let bytes = bloom.to_bytes();
1520        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();
1521
1522        assert!(restored.may_contain(b"key1"));
1523        assert!(restored.may_contain(b"key2"));
1524        assert!(!restored.may_contain(b"key3"));
1525    }
1526
1527    #[test]
1528    fn test_bloom_filter_false_positive_rate() {
1529        let num_keys = 10000;
1530        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1531
1532        // Insert keys
1533        for i in 0..num_keys {
1534            let key = format!("key_{}", i);
1535            bloom.insert(key.as_bytes());
1536        }
1537
1538        // All inserted keys should be found
1539        for i in 0..num_keys {
1540            let key = format!("key_{}", i);
1541            assert!(bloom.may_contain(key.as_bytes()));
1542        }
1543
1544        // Check false positive rate on non-existent keys
1545        let mut false_positives = 0;
1546        let test_count = 10000;
1547        for i in 0..test_count {
1548            let key = format!("nonexistent_{}", i);
1549            if bloom.may_contain(key.as_bytes()) {
1550                false_positives += 1;
1551            }
1552        }
1553
1554        // With 10 bits per key, expect ~1% false positive rate
1555        // Allow up to 3% due to hash function variance
1556        let fp_rate = false_positives as f64 / test_count as f64;
1557        assert!(
1558            fp_rate < 0.03,
1559            "False positive rate {} is too high",
1560            fp_rate
1561        );
1562    }
1563
1564    #[test]
1565    fn test_sstable_writer_config() {
1566        use crate::structures::IndexOptimization;
1567
1568        // Default = Adaptive
1569        let config = SSTableWriterConfig::default();
1570        assert_eq!(config.compression_level.0, 9); // BETTER
1571        assert!(!config.use_bloom_filter);
1572        assert!(!config.use_dictionary);
1573
1574        // Adaptive
1575        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1576        assert_eq!(adaptive.compression_level.0, 9);
1577        assert!(!adaptive.use_bloom_filter);
1578        assert!(!adaptive.use_dictionary);
1579
1580        // SizeOptimized
1581        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1582        assert_eq!(size.compression_level.0, 22); // MAX
1583        assert!(size.use_bloom_filter);
1584        assert!(size.use_dictionary);
1585
1586        // PerformanceOptimized
1587        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1588        assert_eq!(perf.compression_level.0, 1); // FAST
1589        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1590        assert!(!perf.use_dictionary);
1591
1592        // Aliases
1593        let fast = SSTableWriterConfig::fast();
1594        assert_eq!(fast.compression_level.0, 1);
1595
1596        let max = SSTableWriterConfig::max_compression();
1597        assert_eq!(max.compression_level.0, 22);
1598    }
1599
1600    #[test]
1601    fn test_vint_roundtrip() {
1602        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1603
1604        for &val in &test_values {
1605            let mut buf = Vec::new();
1606            write_vint(&mut buf, val).unwrap();
1607            let mut reader = buf.as_slice();
1608            let decoded = read_vint(&mut reader).unwrap();
1609            assert_eq!(val, decoded, "Failed for value {}", val);
1610        }
1611    }
1612
1613    #[test]
1614    fn test_common_prefix_len() {
1615        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1616        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1617        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1618        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1619        assert_eq!(common_prefix_len(b"hello", b""), 0);
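
    // Additional round-trip sketches exercising only the APIs defined above.

    #[test]
    fn test_terminfo_roundtrip() {
        // Inline path: two postings fit within MAX_INLINE_POSTINGS.
        let ti = TermInfo::try_inline(&[5, 9], &[2, 1]).expect("should inline");
        assert!(ti.is_inline());
        assert_eq!(ti.doc_freq(), 2);
        let mut buf = Vec::new();
        ti.serialize(&mut buf).unwrap();
        let restored = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(ti, restored);
        assert_eq!(restored.decode_inline(), Some((vec![5, 9], vec![2, 1])));

        // External path with positions survives the 0x01-tagged encoding.
        let ext = TermInfo::external_with_positions(100, 50, 7, 200, 30);
        let mut buf = Vec::new();
        ext.serialize(&mut buf).unwrap();
        assert_eq!(ext, TermInfo::deserialize(&mut buf.as_slice()).unwrap());
    }

    #[test]
    fn test_sparse_dim_info_roundtrip() {
        let info = SparseDimInfo::new(123_456, 789);
        let mut buf = Vec::new();
        info.serialize(&mut buf).unwrap();
        assert_eq!(info, SparseDimInfo::deserialize(&mut buf.as_slice()).unwrap());
    }

    #[test]
    fn test_bloom_insert_hashed_matches_insert() {
        // The writer stores bloom_hash_pair() results and replays them through
        // insert_hashed(); that must agree with the direct insert() path.
        let mut direct = BloomFilter::new(100, BLOOM_BITS_PER_KEY);
        let mut hashed = BloomFilter::new(100, BLOOM_BITS_PER_KEY);
        let keys: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];
        for key in keys {
            direct.insert(key);
            let (h1, h2) = bloom_hash_pair(key);
            hashed.insert_hashed(h1, h2);
        }
        for key in keys {
            assert!(hashed.may_contain(key));
            assert_eq!(direct.may_contain(key), hashed.may_contain(key));
        }
    }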
1620    }
1621}