
hermes_core/structures/sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Memory-efficient design: only minimal metadata is loaded into memory;
//! data blocks are loaded on demand.
//!
//! ## Key Features
//!
//! 1. **FST-based Block Index**: uses a Finite State Transducer for key lookup
//!    - Can be mmap'd directly without parsing into heap-allocated structures
//!    - ~90% memory reduction compared to a Vec<BlockIndexEntry>
//!
//! 2. **Bitpacked Block Addresses**: offsets and lengths stored with delta encoding
//!    - Minimal memory footprint for block metadata
//!
//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
//!
//! 4. **Configurable Compression Level**: levels 1-22 for a space/speed tradeoff
//!
//! 5. **Bloom Filter**: fast negative lookups that skip unnecessary I/O
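//!
//! ## Usage sketch
//!
//! A minimal, hedged sketch of the write/read round trip. The writer side uses
//! only APIs defined in this module; how a `FileHandle` is obtained depends on
//! the `directories` backend, so that part is left as a placeholder.
//!
//! ```ignore
//! use hermes_core::structures::sstable::{AsyncSSTableReader, SSTableWriter};
//!
//! // Write: keys must be inserted in ascending byte order.
//! let mut writer = SSTableWriter::<_, u64>::new(Vec::new());
//! writer.insert(b"apple", &1)?;
//! writer.insert(b"banana", &2)?;
//! let bytes = writer.finish()?;
//!
//! // Read: wrap the bytes in a FileHandle (constructor depends on the chosen
//! // directory backend) and open with a small block cache (16 blocks here).
//! let handle: FileHandle = /* in-memory or mmap handle over `bytes` */;
//! let reader = AsyncSSTableReader::<u64>::open(handle, 16).await?;
//! assert_eq!(reader.get(b"banana").await?, Some(2));
//! ```
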
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{FileHandle, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
36
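// On-disk layout written by `SSTableWriter::finish` and parsed by
// `AsyncSSTableReader::open` (summarized from the code below):
//
//   [compressed data blocks]
//   [u32 index_len][block index bytes (FST or mmap format)]
//   [bloom filter: u32 num_bits, u32 num_hashes, u32 num_words, num_words x u64]  (optional)
//   [u32 dict_len][zstd dictionary bytes]                                          (optional)
//   [footer: u64 data_end, u64 num_entries, u64 bloom_offset, u64 dict_offset,
//            u8 compression_level, u32 magic]  = 37 bytes
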
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
47pub const BLOOM_HASH_COUNT: usize = 7;
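
// Sizing note (standard Bloom filter estimates, not measured on this code):
// with m/n = BLOOM_BITS_PER_KEY = 10 bits per key, the optimal hash count is
//   k = (m/n) * ln(2) ≈ 10 * 0.693 ≈ 6.9  ->  7 hashes,
// and the expected false-positive rate is approximately
//   p ≈ (1 - e^(-k*n/m))^k = (1 - e^(-0.7))^7 ≈ 0.008  (~1%),
// which is where the "10 bits ≈ 1% false positive rate" figure above comes from.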
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: BloomBits,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61/// Bloom filter storage — Vec for write path, OwnedBytes for zero-copy read path.
62#[derive(Debug, Clone)]
63enum BloomBits {
64    /// Mutable storage for building (SSTable writer)
65    Vec(Vec<u64>),
66    /// Zero-copy mmap reference for reading (raw LE u64 words, no header)
67    Bytes(OwnedBytes),
68}
69
70impl BloomBits {
71    #[inline]
72    fn len(&self) -> usize {
73        match self {
74            BloomBits::Vec(v) => v.len(),
75            BloomBits::Bytes(b) => b.len() / 8,
76        }
77    }
78
79    #[inline]
80    fn get(&self, word_idx: usize) -> u64 {
81        match self {
82            BloomBits::Vec(v) => v[word_idx],
83            BloomBits::Bytes(b) => {
84                let off = word_idx * 8;
85                u64::from_le_bytes([
86                    b[off],
87                    b[off + 1],
88                    b[off + 2],
89                    b[off + 3],
90                    b[off + 4],
91                    b[off + 5],
92                    b[off + 6],
93                    b[off + 7],
94                ])
95            }
96        }
97    }
98
99    #[inline]
100    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
101        match self {
102            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
103            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
104        }
105    }
106
107    fn size_bytes(&self) -> usize {
108        match self {
109            BloomBits::Vec(v) => v.len() * 8,
110            BloomBits::Bytes(b) => b.len(),
111        }
112    }
113}
114
115impl BloomFilter {
116    /// Create a new bloom filter sized for expected number of keys
117    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
118        let num_bits = (expected_keys * bits_per_key).max(64);
119        let num_words = num_bits.div_ceil(64);
120        Self {
121            bits: BloomBits::Vec(vec![0u64; num_words]),
122            num_bits,
123            num_hashes: BLOOM_HASH_COUNT,
124        }
125    }
126
127    /// Create from serialized OwnedBytes (zero-copy for mmap)
128    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
129        if data.len() < 12 {
130            return Err(io::Error::new(
131                io::ErrorKind::InvalidData,
132                "Bloom filter data too short",
133            ));
134        }
135        let d = data.as_slice();
136        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
137        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
138        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
139
140        if d.len() < 12 + num_words * 8 {
141            return Err(io::Error::new(
142                io::ErrorKind::InvalidData,
143                "Bloom filter data truncated",
144            ));
145        }
146
147        // Slice past the 12-byte header to get raw u64 LE words (zero-copy)
148        let bits_bytes = data.slice(12..12 + num_words * 8);
149
150        Ok(Self {
151            bits: BloomBits::Bytes(bits_bytes),
152            num_bits,
153            num_hashes,
154        })
155    }
156
157    /// Serialize to bytes
158    pub fn to_bytes(&self) -> Vec<u8> {
159        let num_words = self.bits.len();
160        let mut data = Vec::with_capacity(12 + num_words * 8);
161        data.write_u32::<LittleEndian>(self.num_bits as u32)
162            .unwrap();
163        data.write_u32::<LittleEndian>(self.num_hashes as u32)
164            .unwrap();
165        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
166        for i in 0..num_words {
167            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
168        }
169        data
170    }
171
172    /// Add a key to the filter
173    pub fn insert(&mut self, key: &[u8]) {
174        let (h1, h2) = self.hash_pair(key);
175        for i in 0..self.num_hashes {
176            let bit_pos = self.get_bit_pos(h1, h2, i);
177            let word_idx = bit_pos / 64;
178            let bit_idx = bit_pos % 64;
179            if word_idx < self.bits.len() {
180                self.bits.set_bit(word_idx, bit_idx);
181            }
182        }
183    }
184
185    /// Check if a key might be in the filter
186    /// Returns false if definitely not present, true if possibly present
187    pub fn may_contain(&self, key: &[u8]) -> bool {
188        let (h1, h2) = self.hash_pair(key);
189        for i in 0..self.num_hashes {
190            let bit_pos = self.get_bit_pos(h1, h2, i);
191            let word_idx = bit_pos / 64;
192            let bit_idx = bit_pos % 64;
193            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
194                return false;
195            }
196        }
197        true
198    }
199
200    /// Size in bytes
201    pub fn size_bytes(&self) -> usize {
202        12 + self.bits.size_bytes()
203    }
204
205    /// Insert a pre-computed hash pair into the filter
206    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
207        for i in 0..self.num_hashes {
208            let bit_pos = self.get_bit_pos(h1, h2, i);
209            let word_idx = bit_pos / 64;
210            let bit_idx = bit_pos % 64;
211            if word_idx < self.bits.len() {
212                self.bits.set_bit(word_idx, bit_idx);
213            }
214        }
215    }
216
217    /// Compute two hash values using FNV-1a variant (single pass over key bytes)
218    #[inline]
219    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
220        let mut h1: u64 = 0xcbf29ce484222325;
221        let mut h2: u64 = 0x84222325cbf29ce4;
222        for &byte in key {
223            h1 ^= byte as u64;
224            h1 = h1.wrapping_mul(0x100000001b3);
225            h2 = h2.wrapping_mul(0x100000001b3);
226            h2 ^= byte as u64;
227        }
228        (h1, h2)
229    }
230
231    /// Get bit position for hash iteration i using double hashing
232    #[inline]
233    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
234        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
235    }
236}
237
238/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
239/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair (single pass).
240#[inline]
241fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
242    let mut h1: u64 = 0xcbf29ce484222325;
243    let mut h2: u64 = 0x84222325cbf29ce4;
244    for &byte in key {
245        h1 ^= byte as u64;
246        h1 = h1.wrapping_mul(0x100000001b3);
247        h2 = h2.wrapping_mul(0x100000001b3);
248        h2 ^= byte as u64;
249    }
250    (h1, h2)
251}
252
253/// SSTable value trait
254pub trait SSTableValue: Clone + Send + Sync {
255    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
256    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
257}
258
259/// u64 value implementation
260impl SSTableValue for u64 {
261    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
262        write_vint(writer, *self)
263    }
264
265    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
266        read_vint(reader)
267    }
268}
269
270/// Vec<u8> value implementation
271impl SSTableValue for Vec<u8> {
272    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
273        write_vint(writer, self.len() as u64)?;
274        writer.write_all(self)
275    }
276
277    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
278        let len = read_vint(reader)? as usize;
279        let mut data = vec![0u8; len];
280        reader.read_exact(&mut data)?;
281        Ok(data)
282    }
283}
284
285/// Sparse dimension info for SSTable-based sparse index
286/// Stores offset and length for posting list lookup
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub struct SparseDimInfo {
289    /// Offset in sparse file where posting list starts
290    pub offset: u64,
291    /// Length of serialized posting list
292    pub length: u32,
293}
294
295impl SparseDimInfo {
296    pub fn new(offset: u64, length: u32) -> Self {
297        Self { offset, length }
298    }
299}
300
301impl SSTableValue for SparseDimInfo {
302    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
303        write_vint(writer, self.offset)?;
304        write_vint(writer, self.length as u64)
305    }
306
307    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
308        let offset = read_vint(reader)?;
309        let length = read_vint(reader)? as u32;
310        Ok(Self { offset, length })
311    }
312}
313
314/// Maximum number of postings that can be inlined in TermInfo
315pub const MAX_INLINE_POSTINGS: usize = 3;
316
317/// Term info for posting list references
318///
319/// Supports two modes:
320/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
321/// - **External**: Larger posting lists stored in separate .post file
322///
323/// This eliminates a separate I/O read for rare/unique terms.
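///
/// A small illustrative sketch of the inline round trip (doc IDs must be in
/// ascending order for the delta encoding to be valid):
///
/// ```ignore
/// // Three postings fit within MAX_INLINE_POSTINGS, so this inlines.
/// let info = TermInfo::try_inline(&[2, 5, 9], &[1, 3, 2]).unwrap();
/// assert!(info.is_inline());
/// assert_eq!(info.doc_freq(), 3);
/// assert_eq!(info.decode_inline(), Some((vec![2, 5, 9], vec![1, 3, 2])));
///
/// // Four postings exceed MAX_INLINE_POSTINGS and must go external.
/// assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
/// ```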
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub enum TermInfo {
326    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
327    /// Each entry is (doc_id, term_freq) delta-encoded
328    Inline {
329        /// Number of postings (1-3)
330        doc_freq: u8,
331        /// Inline data: delta-encoded (doc_id, term_freq) pairs
332        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
333        data: [u8; 16],
334        /// Actual length of data used
335        data_len: u8,
336    },
337    /// Reference to external posting list in .post file
338    External {
339        posting_offset: u64,
340        posting_len: u64,
341        doc_freq: u32,
342        /// Position data offset (0 if no positions)
343        position_offset: u64,
344        /// Position data length (0 if no positions)
345        position_len: u64,
346    },
347}
348
349impl TermInfo {
350    /// Create an external reference
351    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
352        TermInfo::External {
353            posting_offset,
354            posting_len,
355            doc_freq,
356            position_offset: 0,
357            position_len: 0,
358        }
359    }
360
361    /// Create an external reference with position info
362    pub fn external_with_positions(
363        posting_offset: u64,
364        posting_len: u64,
365        doc_freq: u32,
366        position_offset: u64,
367        position_len: u64,
368    ) -> Self {
369        TermInfo::External {
370            posting_offset,
371            posting_len,
372            doc_freq,
373            position_offset,
374            position_len,
375        }
376    }
377
378    /// Try to create an inline TermInfo from posting data
379    /// Returns None if posting list is too large to inline
380    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
381        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
382            return None;
383        }
384
385        let mut data = [0u8; 16];
386        let mut cursor = std::io::Cursor::new(&mut data[..]);
387        let mut prev_doc_id = 0u32;
388
389        for (i, &doc_id) in doc_ids.iter().enumerate() {
390            let delta = doc_id - prev_doc_id;
391            if write_vint(&mut cursor, delta as u64).is_err() {
392                return None;
393            }
394            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
395                return None;
396            }
397            prev_doc_id = doc_id;
398        }
399
400        let data_len = cursor.position() as u8;
401        if data_len > 16 {
402            return None;
403        }
404
405        Some(TermInfo::Inline {
406            doc_freq: doc_ids.len() as u8,
407            data,
408            data_len,
409        })
410    }
411
412    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
413    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
414    /// `count` is the number of postings (must match iterator length).
415    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
416        if count > MAX_INLINE_POSTINGS || count == 0 {
417            return None;
418        }
419
420        let mut data = [0u8; 16];
421        let mut cursor = std::io::Cursor::new(&mut data[..]);
422        let mut prev_doc_id = 0u32;
423
424        for (doc_id, tf) in iter {
425            let delta = doc_id - prev_doc_id;
426            if write_vint(&mut cursor, delta as u64).is_err() {
427                return None;
428            }
429            if write_vint(&mut cursor, tf as u64).is_err() {
430                return None;
431            }
432            prev_doc_id = doc_id;
433        }
434
435        let data_len = cursor.position() as u8;
436
437        Some(TermInfo::Inline {
438            doc_freq: count as u8,
439            data,
440            data_len,
441        })
442    }
443
444    /// Get document frequency
445    pub fn doc_freq(&self) -> u32 {
446        match self {
447            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
448            TermInfo::External { doc_freq, .. } => *doc_freq,
449        }
450    }
451
452    /// Check if this is an inline posting list
453    pub fn is_inline(&self) -> bool {
454        matches!(self, TermInfo::Inline { .. })
455    }
456
457    /// Get external posting info (offset, len) - returns None for inline
458    pub fn external_info(&self) -> Option<(u64, u64)> {
459        match self {
460            TermInfo::External {
461                posting_offset,
462                posting_len,
463                ..
464            } => Some((*posting_offset, *posting_len)),
465            TermInfo::Inline { .. } => None,
466        }
467    }
468
469    /// Get position info (offset, len) - returns None for inline or if no positions
470    pub fn position_info(&self) -> Option<(u64, u64)> {
471        match self {
472            TermInfo::External {
473                position_offset,
474                position_len,
475                ..
476            } if *position_len > 0 => Some((*position_offset, *position_len)),
477            _ => None,
478        }
479    }
480
481    /// Decode inline postings into (doc_ids, term_freqs)
482    /// Returns None if this is an external reference
483    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
484        match self {
485            TermInfo::Inline {
486                doc_freq,
487                data,
488                data_len,
489            } => {
490                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
491                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
492                let mut reader = &data[..*data_len as usize];
493                let mut prev_doc_id = 0u32;
494
495                for _ in 0..*doc_freq {
496                    let delta = read_vint(&mut reader).ok()? as u32;
497                    let tf = read_vint(&mut reader).ok()? as u32;
498                    let doc_id = prev_doc_id + delta;
499                    doc_ids.push(doc_id);
500                    term_freqs.push(tf);
501                    prev_doc_id = doc_id;
502                }
503
504                Some((doc_ids, term_freqs))
505            }
506            TermInfo::External { .. } => None,
507        }
508    }
509}
510
511impl SSTableValue for TermInfo {
512    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
513        match self {
514            TermInfo::Inline {
515                doc_freq,
516                data,
517                data_len,
518            } => {
519                // Tag byte 0xFF = inline marker
520                writer.write_u8(0xFF)?;
521                writer.write_u8(*doc_freq)?;
522                writer.write_u8(*data_len)?;
523                writer.write_all(&data[..*data_len as usize])?;
524            }
525            TermInfo::External {
526                posting_offset,
527                posting_len,
528                doc_freq,
529                position_offset,
530                position_len,
531            } => {
532                // Tag byte 0x00 = external marker (no positions)
533                // Tag byte 0x01 = external with positions
534                if *position_len > 0 {
535                    writer.write_u8(0x01)?;
536                    write_vint(writer, *doc_freq as u64)?;
537                    write_vint(writer, *posting_offset)?;
538                    write_vint(writer, *posting_len)?;
539                    write_vint(writer, *position_offset)?;
540                    write_vint(writer, *position_len)?;
541                } else {
542                    writer.write_u8(0x00)?;
543                    write_vint(writer, *doc_freq as u64)?;
544                    write_vint(writer, *posting_offset)?;
545                    write_vint(writer, *posting_len)?;
546                }
547            }
548        }
549        Ok(())
550    }
551
552    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
553        let tag = reader.read_u8()?;
554
555        if tag == 0xFF {
556            // Inline
557            let doc_freq = reader.read_u8()?;
558            let data_len = reader.read_u8()?;
559            let mut data = [0u8; 16];
560            reader.read_exact(&mut data[..data_len as usize])?;
561            Ok(TermInfo::Inline {
562                doc_freq,
563                data,
564                data_len,
565            })
566        } else if tag == 0x00 {
567            // External (no positions)
568            let doc_freq = read_vint(reader)? as u32;
569            let posting_offset = read_vint(reader)?;
570            let posting_len = read_vint(reader)?;
571            Ok(TermInfo::External {
572                posting_offset,
573                posting_len,
574                doc_freq,
575                position_offset: 0,
576                position_len: 0,
577            })
578        } else if tag == 0x01 {
579            // External with positions
580            let doc_freq = read_vint(reader)? as u32;
581            let posting_offset = read_vint(reader)?;
582            let posting_len = read_vint(reader)?;
583            let position_offset = read_vint(reader)?;
584            let position_len = read_vint(reader)?;
585            Ok(TermInfo::External {
586                posting_offset,
587                posting_len,
588                doc_freq,
589                position_offset,
590                position_len,
591            })
592        } else {
593            Err(io::Error::new(
594                io::ErrorKind::InvalidData,
595                format!("Invalid TermInfo tag: {}", tag),
596            ))
597        }
598    }
599}
600
/// Write a variable-length integer (LEB128-style: 7 bits per byte,
/// least-significant group first, high bit set on continuation bytes)
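///
/// Worked example (illustrative; follows directly from the encoding above):
///
/// ```ignore
/// let mut buf = Vec::new();
/// write_vint(&mut buf, 300).unwrap();
/// assert_eq!(buf, vec![0xAC, 0x02]); // 300 = 0b10_0101100 -> 0x2C | 0x80, then 0x02
/// assert_eq!(read_vint(&mut buf.as_slice()).unwrap(), 300);
/// ```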
602pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
603    loop {
604        let byte = (value & 0x7F) as u8;
605        value >>= 7;
606        if value == 0 {
607            writer.write_u8(byte)?;
608            return Ok(());
609        } else {
610            writer.write_u8(byte | 0x80)?;
611        }
612    }
613}
614
615/// Read variable-length integer
616pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
617    let mut result = 0u64;
618    let mut shift = 0;
619
620    loop {
621        let byte = reader.read_u8()?;
622        result |= ((byte & 0x7F) as u64) << shift;
623        if byte & 0x80 == 0 {
624            return Ok(result);
625        }
626        shift += 7;
627        if shift >= 64 {
628            return Err(io::Error::new(
629                io::ErrorKind::InvalidData,
630                "varint too long",
631            ));
632        }
633    }
634}
635
636/// Compute common prefix length
637pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
638    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
639}
640
641/// SSTable statistics for debugging
642#[derive(Debug, Clone)]
643pub struct SSTableStats {
644    pub num_blocks: usize,
645    pub num_sparse_entries: usize,
646    pub num_entries: u64,
647    pub has_bloom_filter: bool,
648    pub has_dictionary: bool,
649    pub bloom_filter_size: usize,
650    pub dictionary_size: usize,
651}
652
653/// SSTable writer configuration
654#[derive(Debug, Clone)]
655pub struct SSTableWriterConfig {
656    /// Compression level (1-22, higher = better compression but slower)
657    pub compression_level: CompressionLevel,
658    /// Whether to train and use a dictionary for compression
659    pub use_dictionary: bool,
660    /// Dictionary size in bytes (default 64KB)
661    pub dict_size: usize,
662    /// Whether to build a bloom filter
663    pub use_bloom_filter: bool,
664    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
665    pub bloom_bits_per_key: usize,
666}
667
668impl Default for SSTableWriterConfig {
669    fn default() -> Self {
670        Self::from_optimization(crate::structures::IndexOptimization::default())
671    }
672}
673
674impl SSTableWriterConfig {
675    /// Create config from IndexOptimization mode
676    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
677        use crate::structures::IndexOptimization;
678        match optimization {
679            IndexOptimization::Adaptive => Self {
680                compression_level: CompressionLevel::BETTER, // Level 9
681                use_dictionary: false,
682                dict_size: DEFAULT_DICT_SIZE,
683                use_bloom_filter: false,
684                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
685            },
686            IndexOptimization::SizeOptimized => Self {
687                compression_level: CompressionLevel::MAX, // Level 22
688                use_dictionary: true,
689                dict_size: DEFAULT_DICT_SIZE,
690                use_bloom_filter: true,
691                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
692            },
693            IndexOptimization::PerformanceOptimized => Self {
694                compression_level: CompressionLevel::FAST, // Level 1
695                use_dictionary: false,
696                dict_size: DEFAULT_DICT_SIZE,
697                use_bloom_filter: true, // Bloom helps skip blocks fast
698                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
699            },
700        }
701    }
702
703    /// Fast configuration - prioritize write speed over compression
704    pub fn fast() -> Self {
705        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
706    }
707
708    /// Maximum compression configuration - prioritize size over speed
709    pub fn max_compression() -> Self {
710        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
711    }
712}
713
714/// SSTable writer with optimizations:
715/// - Dictionary compression for blocks (if dictionary provided)
716/// - Configurable compression level
717/// - Block index prefix compression
718/// - Bloom filter for fast negative lookups
719pub struct SSTableWriter<W: Write, V: SSTableValue> {
720    writer: W,
721    block_buffer: Vec<u8>,
722    prev_key: Vec<u8>,
723    index: Vec<BlockIndexEntry>,
724    current_offset: u64,
725    num_entries: u64,
726    block_first_key: Option<Vec<u8>>,
727    config: SSTableWriterConfig,
728    /// Pre-trained dictionary for compression (optional)
729    dictionary: Option<CompressionDict>,
730    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
731    /// Filter is built at finish() time with correct sizing.
732    bloom_hashes: Vec<(u64, u64)>,
733    _phantom: std::marker::PhantomData<V>,
734}
735
736impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
737    /// Create a new SSTable writer with default configuration
738    pub fn new(writer: W) -> Self {
739        Self::with_config(writer, SSTableWriterConfig::default())
740    }
741
742    /// Create a new SSTable writer with custom configuration
743    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
744        Self {
745            writer,
746            block_buffer: Vec::with_capacity(BLOCK_SIZE),
747            prev_key: Vec::new(),
748            index: Vec::new(),
749            current_offset: 0,
750            num_entries: 0,
751            block_first_key: None,
752            config,
753            dictionary: None,
754            bloom_hashes: Vec::new(),
755            _phantom: std::marker::PhantomData,
756        }
757    }
758
759    /// Create a new SSTable writer with a pre-trained dictionary
760    pub fn with_dictionary(
761        writer: W,
762        config: SSTableWriterConfig,
763        dictionary: CompressionDict,
764    ) -> Self {
765        Self {
766            writer,
767            block_buffer: Vec::with_capacity(BLOCK_SIZE),
768            prev_key: Vec::new(),
769            index: Vec::new(),
770            current_offset: 0,
771            num_entries: 0,
772            block_first_key: None,
773            config,
774            dictionary: Some(dictionary),
775            bloom_hashes: Vec::new(),
776            _phantom: std::marker::PhantomData,
777        }
778    }
779
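    /// Insert a key/value pair.
    ///
    /// Keys must be inserted in ascending byte order. Each entry is appended
    /// to the current block as prefix-compressed bytes:
    /// `vint(shared_prefix_len) + vint(suffix_len) + suffix + serialized value`,
    /// where the prefix is shared with the previous key in the same block
    /// (e.g. inserting "apple" then "apply" stores the second entry with
    /// prefix_len = 4 and suffix = "y"). When the block buffer reaches
    /// BLOCK_SIZE it is compressed and flushed, and prefix compression
    /// restarts for the next block.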
780    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
781        if self.block_first_key.is_none() {
782            self.block_first_key = Some(key.to_vec());
783        }
784
785        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
786        if self.config.use_bloom_filter {
787            self.bloom_hashes.push(bloom_hash_pair(key));
788        }
789
790        let prefix_len = common_prefix_len(&self.prev_key, key);
791        let suffix = &key[prefix_len..];
792
793        write_vint(&mut self.block_buffer, prefix_len as u64)?;
794        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
795        self.block_buffer.extend_from_slice(suffix);
796        value.serialize(&mut self.block_buffer)?;
797
798        self.prev_key.clear();
799        self.prev_key.extend_from_slice(key);
800        self.num_entries += 1;
801
802        if self.block_buffer.len() >= BLOCK_SIZE {
803            self.flush_block()?;
804        }
805
806        Ok(())
807    }
808
809    /// Flush and compress the current block
810    fn flush_block(&mut self) -> io::Result<()> {
811        if self.block_buffer.is_empty() {
812            return Ok(());
813        }
814
815        // Compress block with dictionary if available
816        let compressed = if let Some(ref dict) = self.dictionary {
817            crate::compression::compress_with_dict(
818                &self.block_buffer,
819                self.config.compression_level,
820                dict,
821            )?
822        } else {
823            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
824        };
825
826        if let Some(first_key) = self.block_first_key.take() {
827            self.index.push(BlockIndexEntry {
828                first_key,
829                offset: self.current_offset,
830                length: compressed.len() as u32,
831            });
832        }
833
834        self.writer.write_all(&compressed)?;
835        self.current_offset += compressed.len() as u64;
836        self.block_buffer.clear();
837        self.prev_key.clear();
838
839        Ok(())
840    }
841
842    pub fn finish(mut self) -> io::Result<W> {
843        // Flush any remaining data
844        self.flush_block()?;
845
846        // Build bloom filter from collected hashes (properly sized)
847        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
848            let mut bloom =
849                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
850            for (h1, h2) in &self.bloom_hashes {
851                bloom.insert_hashed(*h1, *h2);
852            }
853            Some(bloom)
854        } else {
855            None
856        };
857
858        let data_end_offset = self.current_offset;
859
860        // Build memory-efficient block index
861        // Convert to (key, BlockAddr) pairs for the new index format
862        let entries: Vec<(Vec<u8>, BlockAddr)> = self
863            .index
864            .iter()
865            .map(|e| {
866                (
867                    e.first_key.clone(),
868                    BlockAddr {
869                        offset: e.offset,
870                        length: e.length,
871                    },
872                )
873            })
874            .collect();
875
876        // Build FST-based index if native feature is enabled, otherwise use mmap index
877        #[cfg(feature = "native")]
878        let index_bytes = FstBlockIndex::build(&entries)?;
879        #[cfg(not(feature = "native"))]
880        let index_bytes = MmapBlockIndex::build(&entries)?;
881
882        // Write index bytes with length prefix
883        self.writer
884            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
885        self.writer.write_all(&index_bytes)?;
886        self.current_offset += 4 + index_bytes.len() as u64;
887
888        // Write bloom filter if present
889        let bloom_offset = if let Some(ref bloom) = bloom_filter {
890            let bloom_data = bloom.to_bytes();
891            let offset = self.current_offset;
892            self.writer.write_all(&bloom_data)?;
893            self.current_offset += bloom_data.len() as u64;
894            offset
895        } else {
896            0
897        };
898
899        // Write dictionary if present
900        let dict_offset = if let Some(ref dict) = self.dictionary {
901            let dict_bytes = dict.as_bytes();
902            let offset = self.current_offset;
903            self.writer
904                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
905            self.writer.write_all(dict_bytes)?;
906            self.current_offset += 4 + dict_bytes.len() as u64;
907            offset
908        } else {
909            0
910        };
911
912        // Write extended footer
913        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
914        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
915        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
916        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
917        self.writer
918            .write_u8(self.config.compression_level.0 as u8)?;
919        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
920
921        Ok(self.writer)
922    }
923}
924
925/// Block index entry
926#[derive(Debug, Clone)]
927struct BlockIndexEntry {
928    first_key: Vec<u8>,
929    offset: u64,
930    length: u32,
931}
932
933/// Async SSTable reader - loads blocks on demand via FileHandle
934///
935/// Memory-efficient design:
936/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
937/// - Block addresses stored in bitpacked format
938/// - Bloom filter and dictionary optional
939pub struct AsyncSSTableReader<V: SSTableValue> {
940    /// FileHandle for the data portion (blocks only) - fetches ranges on demand
941    data_slice: FileHandle,
942    /// Memory-efficient block index (FST or mmap)
943    block_index: BlockIndex,
944    num_entries: u64,
945    /// Hot cache for decompressed blocks
946    cache: RwLock<BlockCache>,
947    /// Bloom filter for fast negative lookups (optional)
948    bloom_filter: Option<BloomFilter>,
949    /// Compression dictionary (optional)
950    dictionary: Option<CompressionDict>,
951    /// Compression level used
952    #[allow(dead_code)]
953    compression_level: CompressionLevel,
954    _phantom: std::marker::PhantomData<V>,
955}
956
/// LRU block cache: O(1) lookup/insert; promotion is an O(n) scan of the LRU list.
///
/// On `get()`, the accessed entry is promoted to the MRU position.
/// On eviction, the LRU entry (front of the VecDeque) is removed.
/// For typical cache sizes (16-64 blocks), the linear scan in
/// `promote()` is negligible compared to the I/O it saves.
963struct BlockCache {
964    blocks: FxHashMap<u64, Arc<[u8]>>,
965    lru_order: std::collections::VecDeque<u64>,
966    max_blocks: usize,
967}
968
969impl BlockCache {
970    fn new(max_blocks: usize) -> Self {
971        Self {
972            blocks: FxHashMap::default(),
973            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
974            max_blocks,
975        }
976    }
977
978    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
979        if self.blocks.contains_key(&offset) {
980            self.promote(offset);
981            self.blocks.get(&offset).map(Arc::clone)
982        } else {
983            None
984        }
985    }
986
987    /// Read-only cache probe — no LRU promotion, safe behind a read lock.
988    fn peek(&self, offset: u64) -> Option<Arc<[u8]>> {
989        self.blocks.get(&offset).map(Arc::clone)
990    }
991
992    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
993        if self.blocks.contains_key(&offset) {
994            self.promote(offset);
995            return;
996        }
997        while self.blocks.len() >= self.max_blocks {
998            if let Some(evict_offset) = self.lru_order.pop_front() {
999                self.blocks.remove(&evict_offset);
1000            } else {
1001                break;
1002            }
1003        }
1004        self.blocks.insert(offset, block);
1005        self.lru_order.push_back(offset);
1006    }
1007
1008    /// Move entry to MRU position (back of deque)
1009    fn promote(&mut self, offset: u64) {
1010        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
1011            self.lru_order.remove(pos);
1012            self.lru_order.push_back(offset);
1013        }
1014    }
1015}
1016
1017impl<V: SSTableValue> AsyncSSTableReader<V> {
1018    /// Open an SSTable from a FileHandle
1019    /// Only loads the footer and index into memory, data blocks fetched on-demand
1020    ///
1021    /// Uses FST-based (native) or mmap'd block index (no heap allocation for keys)
1022    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
1023        let file_len = file_handle.len();
1024        if file_len < 37 {
1025            return Err(io::Error::new(
1026                io::ErrorKind::InvalidData,
1027                "SSTable too small",
1028            ));
1029        }
1030
1031        // Read footer (37 bytes)
1032        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
1033        let footer_bytes = file_handle
1034            .read_bytes_range(file_len - 37..file_len)
1035            .await?;
1036
1037        let mut reader = footer_bytes.as_slice();
1038        let data_end_offset = reader.read_u64::<LittleEndian>()?;
1039        let num_entries = reader.read_u64::<LittleEndian>()?;
1040        let bloom_offset = reader.read_u64::<LittleEndian>()?;
1041        let dict_offset = reader.read_u64::<LittleEndian>()?;
1042        let compression_level = CompressionLevel(reader.read_u8()? as i32);
1043        let magic = reader.read_u32::<LittleEndian>()?;
1044
1045        if magic != SSTABLE_MAGIC {
1046            return Err(io::Error::new(
1047                io::ErrorKind::InvalidData,
1048                format!("Invalid SSTable magic: 0x{:08X}", magic),
1049            ));
1050        }
1051
1052        // Read index section
1053        let index_start = data_end_offset;
1054        let index_end = file_len - 37;
1055        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
1056
1057        // Parse block index (length-prefixed FST or mmap index)
1058        let mut idx_reader = index_bytes.as_slice();
1059        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
1060
1061        if index_len > idx_reader.len() {
1062            return Err(io::Error::new(
1063                io::ErrorKind::InvalidData,
1064                "Index data truncated",
1065            ));
1066        }
1067
1068        let index_data = index_bytes.slice(4..4 + index_len);
1069
1070        // Try FST first (native), fall back to mmap
1071        #[cfg(feature = "native")]
1072        let block_index = match FstBlockIndex::load(index_data.clone()) {
1073            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
1074            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
1075        };
1076        #[cfg(not(feature = "native"))]
1077        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
1078
1079        // Load bloom filter if present
1080        let bloom_filter = if bloom_offset > 0 {
1081            let bloom_start = bloom_offset;
1082            // Read bloom filter size first (12 bytes header)
1083            let bloom_header = file_handle
1084                .read_bytes_range(bloom_start..bloom_start + 12)
1085                .await?;
1086            let num_words = u32::from_le_bytes([
1087                bloom_header[8],
1088                bloom_header[9],
1089                bloom_header[10],
1090                bloom_header[11],
1091            ]) as u64;
1092            let bloom_size = 12 + num_words * 8;
1093            let bloom_data = file_handle
1094                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1095                .await?;
1096            Some(BloomFilter::from_owned_bytes(bloom_data)?)
1097        } else {
1098            None
1099        };
1100
1101        // Load dictionary if present
1102        let dictionary = if dict_offset > 0 {
1103            let dict_start = dict_offset;
1104            // Read dictionary size first
1105            let dict_len_bytes = file_handle
1106                .read_bytes_range(dict_start..dict_start + 4)
1107                .await?;
1108            let dict_len = u32::from_le_bytes([
1109                dict_len_bytes[0],
1110                dict_len_bytes[1],
1111                dict_len_bytes[2],
1112                dict_len_bytes[3],
1113            ]) as u64;
1114            let dict_data = file_handle
1115                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1116                .await?;
1117            Some(CompressionDict::from_owned_bytes(dict_data))
1118        } else {
1119            None
1120        };
1121
1122        // Create a lazy slice for just the data portion
1123        let data_slice = file_handle.slice(0..data_end_offset);
1124
1125        Ok(Self {
1126            data_slice,
1127            block_index,
1128            num_entries,
1129            cache: RwLock::new(BlockCache::new(cache_blocks)),
1130            bloom_filter,
1131            dictionary,
1132            compression_level,
1133            _phantom: std::marker::PhantomData,
1134        })
1135    }
1136
1137    /// Number of entries
1138    pub fn num_entries(&self) -> u64 {
1139        self.num_entries
1140    }
1141
1142    /// Get stats about this SSTable for debugging
1143    pub fn stats(&self) -> SSTableStats {
1144        SSTableStats {
1145            num_blocks: self.block_index.len(),
1146            num_sparse_entries: 0, // No longer using sparse index separately
1147            num_entries: self.num_entries,
1148            has_bloom_filter: self.bloom_filter.is_some(),
1149            has_dictionary: self.dictionary.is_some(),
1150            bloom_filter_size: self
1151                .bloom_filter
1152                .as_ref()
1153                .map(|b| b.size_bytes())
1154                .unwrap_or(0),
1155            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1156        }
1157    }
1158
1159    /// Number of blocks currently in the cache
1160    pub fn cached_blocks(&self) -> usize {
1161        self.cache.read().blocks.len()
1162    }
1163
1164    /// Look up a key (async - may need to load block)
1165    ///
1166    /// Uses bloom filter for fast negative lookups, then memory-efficient
1167    /// block index to locate the block, reducing I/O to typically 1 block read.
1168    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1169        log::debug!(
1170            "SSTable::get called, key_len={}, total_blocks={}",
1171            key.len(),
1172            self.block_index.len()
1173        );
1174
1175        // Check bloom filter first - fast negative lookup
1176        if let Some(ref bloom) = self.bloom_filter
1177            && !bloom.may_contain(key)
1178        {
1179            log::debug!("SSTable::get bloom filter negative");
1180            return Ok(None);
1181        }
1182
1183        // Use block index to find the block that could contain the key
1184        let block_idx = match self.block_index.locate(key) {
1185            Some(idx) => idx,
1186            None => {
1187                log::debug!("SSTable::get key not found (before first block)");
1188                return Ok(None);
1189            }
1190        };
1191
1192        log::debug!("SSTable::get loading block_idx={}", block_idx);
1193
1194        // Now we know exactly which block to load - single I/O
1195        let block_data = self.load_block(block_idx).await?;
1196        self.search_block(&block_data, key)
1197    }
1198
    /// Batch lookup of multiple keys with optimized I/O.
    ///
    /// Groups keys by block and loads each block only once, reducing I/O from
    /// one read per key to at most one read per distinct block (often far fewer
    /// than the number of keys when keys share blocks).
    /// Uses the bloom filter to skip keys that definitely don't exist.
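    ///
    /// A hedged usage sketch (keys and value type are illustrative):
    ///
    /// ```ignore
    /// let keys: [&[u8]; 3] = [b"apple", b"banana", b"missing"];
    /// let values: Vec<Option<u64>> = reader.get_batch(&keys).await?;
    /// assert_eq!(values.len(), keys.len()); // results align positionally with `keys`
    /// ```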
1204    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1205        if keys.is_empty() {
1206            return Ok(Vec::new());
1207        }
1208
1209        // Map each key to its block index
1210        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1211        for (key_idx, key) in keys.iter().enumerate() {
1212            // Check bloom filter first
1213            if let Some(ref bloom) = self.bloom_filter
1214                && !bloom.may_contain(key)
1215            {
1216                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1217                continue;
1218            }
1219
1220            match self.block_index.locate(key) {
1221                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1222                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1223            }
1224        }
1225
1226        // Group keys by block
1227        let mut blocks_to_load: Vec<usize> = key_to_block
1228            .iter()
1229            .filter(|(_, b)| *b != usize::MAX)
1230            .map(|(_, b)| *b)
1231            .collect();
1232        blocks_to_load.sort_unstable();
1233        blocks_to_load.dedup();
1234
1235        // Load all needed blocks (this is where I/O happens)
1236        for &block_idx in &blocks_to_load {
1237            let _ = self.load_block(block_idx).await?;
1238        }
1239
1240        // Now search each key in its block (all blocks are cached)
1241        let mut results = vec![None; keys.len()];
1242        for (key_idx, block_idx) in key_to_block {
1243            if block_idx == usize::MAX {
1244                continue;
1245            }
1246            let block_data = self.load_block(block_idx).await?; // Will hit cache
1247            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1248        }
1249
1250        Ok(results)
1251    }
1252
1253    /// Preload all data blocks into memory
1254    ///
1255    /// Call this after open() to eliminate all I/O during subsequent lookups.
1256    /// Useful when the SSTable is small enough to fit in memory.
1257    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1258        for block_idx in 0..self.block_index.len() {
1259            self.load_block(block_idx).await?;
1260        }
1261        Ok(())
1262    }
1263
1264    /// Prefetch all data blocks via a single bulk I/O operation.
1265    ///
1266    /// Reads the entire compressed data section in one call, then decompresses
1267    /// each block and populates the cache. This turns N individual reads into 1.
1268    /// Cache capacity is expanded to hold all blocks.
1269    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1270        let num_blocks = self.block_index.len();
1271        if num_blocks == 0 {
1272            return Ok(());
1273        }
1274
1275        // Find total data extent
1276        let mut max_end: u64 = 0;
1277        for i in 0..num_blocks {
1278            if let Some(addr) = self.block_index.get_addr(i) {
1279                max_end = max_end.max(addr.offset + addr.length as u64);
1280            }
1281        }
1282
1283        // Single bulk read of entire data section
1284        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1285        let buf = all_data.as_slice();
1286
1287        // Expand cache and decompress all blocks
1288        let mut cache = self.cache.write();
1289        cache.max_blocks = cache.max_blocks.max(num_blocks);
1290        for i in 0..num_blocks {
1291            let addr = self.block_index.get_addr(i).unwrap();
1292            if cache.get(addr.offset).is_some() {
1293                continue;
1294            }
1295            let compressed =
1296                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1297            let decompressed = if let Some(ref dict) = self.dictionary {
1298                crate::compression::decompress_with_dict(compressed, dict)?
1299            } else {
1300                crate::compression::decompress(compressed)?
1301            };
1302            cache.insert(addr.offset, Arc::from(decompressed));
1303        }
1304
1305        Ok(())
1306    }
1307
1308    /// Load a block (checks cache first, then loads from FileSlice)
1309    /// Uses dictionary decompression if dictionary is present
1310    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1311        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1312            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1313        })?;
1314
1315        // Fast path: read-lock peek (no LRU promotion, zero writer contention)
1316        {
1317            if let Some(block) = self.cache.read().peek(addr.offset) {
1318                return Ok(block);
1319            }
1320        }
1321
1322        log::debug!(
1323            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1324            block_idx,
1325            addr.offset,
1326            addr.offset + addr.length as u64
1327        );
1328
1329        // Load from FileSlice
1330        let range = addr.byte_range();
1331        let compressed = self.data_slice.read_bytes_range(range).await?;
1332
1333        // Decompress with dictionary if available
1334        let decompressed = if let Some(ref dict) = self.dictionary {
1335            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1336        } else {
1337            crate::compression::decompress(compressed.as_slice())?
1338        };
1339
1340        let block: Arc<[u8]> = Arc::from(decompressed);
1341
1342        // Insert into cache (write lock, promotes LRU)
1343        {
1344            let mut cache = self.cache.write();
1345            cache.insert(addr.offset, Arc::clone(&block));
1346        }
1347
1348        Ok(block)
1349    }
1350
1351    /// Synchronous block load — only works for Inline (mmap/RAM) file handles.
1352    #[cfg(feature = "sync")]
1353    fn load_block_sync(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1354        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1355            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1356        })?;
1357
1358        // Fast path: read-lock peek (no LRU promotion, zero writer contention)
1359        {
1360            if let Some(block) = self.cache.read().peek(addr.offset) {
1361                return Ok(block);
1362            }
1363        }
1364
1365        // Load from FileSlice (sync — requires Inline handle)
1366        let range = addr.byte_range();
1367        let compressed = self.data_slice.read_bytes_range_sync(range)?;
1368
1369        // Decompress with dictionary if available
1370        let decompressed = if let Some(ref dict) = self.dictionary {
1371            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1372        } else {
1373            crate::compression::decompress(compressed.as_slice())?
1374        };
1375
1376        let block: Arc<[u8]> = Arc::from(decompressed);
1377
1378        // Insert into cache (write lock, promotes LRU)
1379        {
1380            let mut cache = self.cache.write();
1381            cache.insert(addr.offset, Arc::clone(&block));
1382        }
1383
1384        Ok(block)
1385    }
1386
1387    /// Synchronous key lookup — only works for Inline (mmap/RAM) file handles.
1388    #[cfg(feature = "sync")]
1389    pub fn get_sync(&self, key: &[u8]) -> io::Result<Option<V>> {
1390        // Check bloom filter first — fast negative lookup
1391        if let Some(ref bloom) = self.bloom_filter
1392            && !bloom.may_contain(key)
1393        {
1394            return Ok(None);
1395        }
1396
1397        // Use block index to find the block that could contain the key
1398        let block_idx = match self.block_index.locate(key) {
1399            Some(idx) => idx,
1400            None => {
1401                return Ok(None);
1402            }
1403        };
1404
1405        let block_data = self.load_block_sync(block_idx)?;
1406        self.search_block(&block_data, key)
1407    }
1408
1409    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1410        let mut reader = block_data;
1411        let mut current_key = Vec::new();
1412
1413        while !reader.is_empty() {
1414            let common_prefix_len = read_vint(&mut reader)? as usize;
1415            let suffix_len = read_vint(&mut reader)? as usize;
1416
1417            if suffix_len > reader.len() {
1418                return Err(io::Error::new(
1419                    io::ErrorKind::UnexpectedEof,
1420                    "SSTable block suffix truncated",
1421                ));
1422            }
1423            current_key.truncate(common_prefix_len);
1424            current_key.extend_from_slice(&reader[..suffix_len]);
1425            reader = &reader[suffix_len..];
1426
1427            let value = V::deserialize(&mut reader)?;
1428
1429            match current_key.as_slice().cmp(target_key) {
1430                std::cmp::Ordering::Equal => return Ok(Some(value)),
1431                std::cmp::Ordering::Greater => return Ok(None),
1432                std::cmp::Ordering::Less => continue,
1433            }
1434        }
1435
1436        Ok(None)
1437    }
1438
1439    /// Prefetch blocks for a key range
1440    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1441        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1442        let end_block = self
1443            .block_index
1444            .locate(end_key)
1445            .unwrap_or(self.block_index.len().saturating_sub(1));
1446
1447        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1448            let _ = self.load_block(block_idx).await?;
1449        }
1450
1451        Ok(())
1452    }
1453
1454    /// Iterate over all entries (loads blocks as needed)
1455    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1456        AsyncSSTableIterator::new(self)
1457    }
1458
1459    /// Get all entries as a vector (for merging)
1460    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1461        let mut results = Vec::new();
1462
1463        for block_idx in 0..self.block_index.len() {
1464            let block_data = self.load_block(block_idx).await?;
1465            let mut reader = &block_data[..];
1466            let mut current_key = Vec::new();
1467
1468            while !reader.is_empty() {
1469                let common_prefix_len = read_vint(&mut reader)? as usize;
1470                let suffix_len = read_vint(&mut reader)? as usize;
1471
1472                if suffix_len > reader.len() {
1473                    return Err(io::Error::new(
1474                        io::ErrorKind::UnexpectedEof,
1475                        "SSTable block suffix truncated",
1476                    ));
1477                }
1478                current_key.truncate(common_prefix_len);
1479                current_key.extend_from_slice(&reader[..suffix_len]);
1480                reader = &reader[suffix_len..];
1481
1482                let value = V::deserialize(&mut reader)?;
1483                results.push((current_key.clone(), value));
1484            }
1485        }
1486
1487        Ok(results)
1488    }
1489}
1490
1491/// Async iterator over SSTable entries
1492pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1493    reader: &'a AsyncSSTableReader<V>,
1494    current_block: usize,
1495    block_data: Option<Arc<[u8]>>,
1496    block_offset: usize,
1497    current_key: Vec<u8>,
1498    finished: bool,
1499}
1500
1501impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1502    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1503        Self {
1504            reader,
1505            current_block: 0,
1506            block_data: None,
1507            block_offset: 0,
1508            current_key: Vec::new(),
1509            finished: reader.block_index.is_empty(),
1510        }
1511    }
1512
1513    async fn load_next_block(&mut self) -> io::Result<bool> {
1514        if self.current_block >= self.reader.block_index.len() {
1515            self.finished = true;
1516            return Ok(false);
1517        }
1518
1519        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1520        self.block_offset = 0;
1521        self.current_key.clear();
1522        self.current_block += 1;
1523        Ok(true)
1524    }
1525
1526    /// Advance to next entry (async)
1527    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1528        if self.finished {
1529            return Ok(None);
1530        }
1531
1532        if self.block_data.is_none() && !self.load_next_block().await? {
1533            return Ok(None);
1534        }
1535
1536        loop {
1537            let block = self.block_data.as_ref().unwrap();
1538            if self.block_offset >= block.len() {
1539                if !self.load_next_block().await? {
1540                    return Ok(None);
1541                }
1542                continue;
1543            }
1544
1545            let mut reader = &block[self.block_offset..];
1546            let start_len = reader.len();
1547
1548            let common_prefix_len = read_vint(&mut reader)? as usize;
1549            let suffix_len = read_vint(&mut reader)? as usize;
1550
1551            if suffix_len > reader.len() {
1552                return Err(io::Error::new(
1553                    io::ErrorKind::UnexpectedEof,
1554                    "SSTable block suffix truncated",
1555                ));
1556            }
1557            self.current_key.truncate(common_prefix_len);
1558            self.current_key.extend_from_slice(&reader[..suffix_len]);
1559            reader = &reader[suffix_len..];
1560
1561            let value = V::deserialize(&mut reader)?;
1562
1563            self.block_offset += start_len - reader.len();
1564
1565            return Ok(Some((self.current_key.clone(), value)));
1566        }
1567    }
1568}
1569
1570#[cfg(test)]
1571mod tests {
1572    use super::*;
1573
1574    #[test]
1575    fn test_bloom_filter_basic() {
1576        let mut bloom = BloomFilter::new(100, 10);
1577
1578        bloom.insert(b"hello");
1579        bloom.insert(b"world");
1580        bloom.insert(b"test");
1581
1582        assert!(bloom.may_contain(b"hello"));
1583        assert!(bloom.may_contain(b"world"));
1584        assert!(bloom.may_contain(b"test"));
1585
        // These specific keys are not expected to be false positives (~1% FP rate)
1587        assert!(!bloom.may_contain(b"notfound"));
1588        assert!(!bloom.may_contain(b"missing"));
1589    }
1590
1591    #[test]
1592    fn test_bloom_filter_serialization() {
1593        let mut bloom = BloomFilter::new(100, 10);
1594        bloom.insert(b"key1");
1595        bloom.insert(b"key2");
1596
1597        let bytes = bloom.to_bytes();
1598        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();
1599
1600        assert!(restored.may_contain(b"key1"));
1601        assert!(restored.may_contain(b"key2"));
1602        assert!(!restored.may_contain(b"key3"));
1603    }
1604
1605    #[test]
1606    fn test_bloom_filter_false_positive_rate() {
1607        let num_keys = 10000;
1608        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1609
1610        // Insert keys
1611        for i in 0..num_keys {
1612            let key = format!("key_{}", i);
1613            bloom.insert(key.as_bytes());
1614        }
1615
1616        // All inserted keys should be found
1617        for i in 0..num_keys {
1618            let key = format!("key_{}", i);
1619            assert!(bloom.may_contain(key.as_bytes()));
1620        }
1621
1622        // Check false positive rate on non-existent keys
1623        let mut false_positives = 0;
1624        let test_count = 10000;
1625        for i in 0..test_count {
1626            let key = format!("nonexistent_{}", i);
1627            if bloom.may_contain(key.as_bytes()) {
1628                false_positives += 1;
1629            }
1630        }
1631
1632        // With 10 bits per key, expect ~1% false positive rate
1633        // Allow up to 3% due to hash function variance
1634        let fp_rate = false_positives as f64 / test_count as f64;
1635        assert!(
1636            fp_rate < 0.03,
1637            "False positive rate {} is too high",
1638            fp_rate
1639        );
1640    }
1641
1642    #[test]
1643    fn test_sstable_writer_config() {
1644        use crate::structures::IndexOptimization;
1645
1646        // Default = Adaptive
1647        let config = SSTableWriterConfig::default();
1648        assert_eq!(config.compression_level.0, 9); // BETTER
1649        assert!(!config.use_bloom_filter);
1650        assert!(!config.use_dictionary);
1651
1652        // Adaptive
1653        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1654        assert_eq!(adaptive.compression_level.0, 9);
1655        assert!(!adaptive.use_bloom_filter);
1656        assert!(!adaptive.use_dictionary);
1657
1658        // SizeOptimized
1659        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1660        assert_eq!(size.compression_level.0, 22); // MAX
1661        assert!(size.use_bloom_filter);
1662        assert!(size.use_dictionary);
1663
1664        // PerformanceOptimized
1665        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1666        assert_eq!(perf.compression_level.0, 1); // FAST
1667        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1668        assert!(!perf.use_dictionary);
1669
1670        // Aliases
1671        let fast = SSTableWriterConfig::fast();
1672        assert_eq!(fast.compression_level.0, 1);
1673
1674        let max = SSTableWriterConfig::max_compression();
1675        assert_eq!(max.compression_level.0, 22);
1676    }
1677
1678    #[test]
1679    fn test_vint_roundtrip() {
1680        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1681
1682        for &val in &test_values {
1683            let mut buf = Vec::new();
1684            write_vint(&mut buf, val).unwrap();
1685            let mut reader = buf.as_slice();
1686            let decoded = read_vint(&mut reader).unwrap();
1687            assert_eq!(val, decoded, "Failed for value {}", val);
1688        }
1689    }
1690
1691    #[test]
1692    fn test_common_prefix_len() {
1693        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1694        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1695        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1696        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1697        assert_eq!(common_prefix_len(b"hello", b""), 0);
1698    }
1699}