hermes_core/structures/sstable.rs

1//! Async SSTable with lazy loading via FileSlice
2//!
3//! Memory-efficient design: only minimal metadata is loaded into memory;
4//! blocks are loaded on demand.
5//!
6//! ## Key Features
7//!
8//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
9//!    - Can be mmap'd directly without parsing into heap-allocated structures
10//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
11//!
12//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
13//!    - Minimal memory footprint for block metadata
14//!
15//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
16//!
17//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
18//!
19//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
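//!
//! ## Usage sketch
//!
//! An illustrative (non-compiled) sketch of the write/read round trip. The
//! import path and the `file_handle` below are assumptions for the example;
//! keys are inserted in ascending order, as lookups over the sorted,
//! prefix-compressed blocks expect.
//!
//! ```ignore
//! use hermes_core::structures::sstable::{AsyncSSTableReader, SSTableWriter};
//!
//! // Write side: stream sorted (key, value) pairs, then finish() to append
//! // the block index, optional bloom filter / dictionary, and the footer.
//! let mut writer = SSTableWriter::<Vec<u8>, u64>::new(Vec::new());
//! writer.insert(b"apple", &1)?;
//! writer.insert(b"banana", &2)?;
//! let bytes = writer.finish()?;
//!
//! // Read side: open() loads only the footer and index; get() reads at most
//! // one data block.
//! let reader = AsyncSSTableReader::<u64>::open(file_handle, 16).await?;
//! assert_eq!(reader.get(b"banana").await?, Some(2));
//! ```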
20
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
36
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
47pub const BLOOM_HASH_COUNT: usize = 7;
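
// Back-of-envelope estimate (standard bloom-filter math, not a guarantee):
// with k = 7 hash functions and m/n = 10 bits per key, the expected false
// positive rate is (1 - e^(-k*n/m))^k = (1 - e^(-0.7))^7 ≈ 0.8%.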
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: BloomBits,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61/// Bloom filter storage — Vec for write path, OwnedBytes for zero-copy read path.
62#[derive(Debug, Clone)]
63enum BloomBits {
64    /// Mutable storage for building (SSTable writer)
65    Vec(Vec<u64>),
66    /// Zero-copy mmap reference for reading (raw LE u64 words, no header)
67    Bytes(OwnedBytes),
68}
69
70impl BloomBits {
71    #[inline]
72    fn len(&self) -> usize {
73        match self {
74            BloomBits::Vec(v) => v.len(),
75            BloomBits::Bytes(b) => b.len() / 8,
76        }
77    }
78
79    #[inline]
80    fn get(&self, word_idx: usize) -> u64 {
81        match self {
82            BloomBits::Vec(v) => v[word_idx],
83            BloomBits::Bytes(b) => {
84                let off = word_idx * 8;
85                u64::from_le_bytes([
86                    b[off],
87                    b[off + 1],
88                    b[off + 2],
89                    b[off + 3],
90                    b[off + 4],
91                    b[off + 5],
92                    b[off + 6],
93                    b[off + 7],
94                ])
95            }
96        }
97    }
98
99    #[inline]
100    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
101        match self {
102            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
103            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
104        }
105    }
106
107    fn size_bytes(&self) -> usize {
108        match self {
109            BloomBits::Vec(v) => v.len() * 8,
110            BloomBits::Bytes(b) => b.len(),
111        }
112    }
113}
114
115impl BloomFilter {
116    /// Create a new bloom filter sized for expected number of keys
117    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
118        let num_bits = (expected_keys * bits_per_key).max(64);
119        let num_words = num_bits.div_ceil(64);
120        Self {
121            bits: BloomBits::Vec(vec![0u64; num_words]),
122            num_bits,
123            num_hashes: BLOOM_HASH_COUNT,
124        }
125    }
126
127    /// Create from serialized OwnedBytes (zero-copy for mmap)
128    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
129        if data.len() < 12 {
130            return Err(io::Error::new(
131                io::ErrorKind::InvalidData,
132                "Bloom filter data too short",
133            ));
134        }
135        let d = data.as_slice();
136        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
137        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
138        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
139
140        if d.len() < 12 + num_words * 8 {
141            return Err(io::Error::new(
142                io::ErrorKind::InvalidData,
143                "Bloom filter data truncated",
144            ));
145        }
146
147        // Slice past the 12-byte header to get raw u64 LE words (zero-copy)
148        let bits_bytes = data.slice(12..12 + num_words * 8);
149
150        Ok(Self {
151            bits: BloomBits::Bytes(bits_bytes),
152            num_bits,
153            num_hashes,
154        })
155    }
156
157    /// Serialize to bytes
158    pub fn to_bytes(&self) -> Vec<u8> {
159        let num_words = self.bits.len();
160        let mut data = Vec::with_capacity(12 + num_words * 8);
161        data.write_u32::<LittleEndian>(self.num_bits as u32)
162            .unwrap();
163        data.write_u32::<LittleEndian>(self.num_hashes as u32)
164            .unwrap();
165        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
166        for i in 0..num_words {
167            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
168        }
169        data
170    }
171
172    /// Add a key to the filter
173    pub fn insert(&mut self, key: &[u8]) {
174        let (h1, h2) = self.hash_pair(key);
175        for i in 0..self.num_hashes {
176            let bit_pos = self.get_bit_pos(h1, h2, i);
177            let word_idx = bit_pos / 64;
178            let bit_idx = bit_pos % 64;
179            if word_idx < self.bits.len() {
180                self.bits.set_bit(word_idx, bit_idx);
181            }
182        }
183    }
184
185    /// Check if a key might be in the filter
186    /// Returns false if definitely not present, true if possibly present
187    pub fn may_contain(&self, key: &[u8]) -> bool {
188        let (h1, h2) = self.hash_pair(key);
189        for i in 0..self.num_hashes {
190            let bit_pos = self.get_bit_pos(h1, h2, i);
191            let word_idx = bit_pos / 64;
192            let bit_idx = bit_pos % 64;
193            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
194                return false;
195            }
196        }
197        true
198    }
199
200    /// Size in bytes
201    pub fn size_bytes(&self) -> usize {
202        12 + self.bits.size_bytes()
203    }
204
205    /// Insert a pre-computed hash pair into the filter
206    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
207        for i in 0..self.num_hashes {
208            let bit_pos = self.get_bit_pos(h1, h2, i);
209            let word_idx = bit_pos / 64;
210            let bit_idx = bit_pos % 64;
211            if word_idx < self.bits.len() {
212                self.bits.set_bit(word_idx, bit_idx);
213            }
214        }
215    }
216
217    /// Compute two hash values using FNV-1a variant (single pass over key bytes)
218    #[inline]
219    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
220        let mut h1: u64 = 0xcbf29ce484222325;
221        let mut h2: u64 = 0x84222325cbf29ce4;
222        for &byte in key {
223            h1 ^= byte as u64;
224            h1 = h1.wrapping_mul(0x100000001b3);
225            h2 = h2.wrapping_mul(0x100000001b3);
226            h2 ^= byte as u64;
227        }
228        (h1, h2)
229    }
230
231    /// Get bit position for hash iteration i using double hashing
232    #[inline]
233    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
234        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
235    }
236}
237
238/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
239/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair (single pass).
240#[inline]
241fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
242    let mut h1: u64 = 0xcbf29ce484222325;
243    let mut h2: u64 = 0x84222325cbf29ce4;
244    for &byte in key {
245        h1 ^= byte as u64;
246        h1 = h1.wrapping_mul(0x100000001b3);
247        h2 = h2.wrapping_mul(0x100000001b3);
248        h2 ^= byte as u64;
249    }
250    (h1, h2)
251}
252
253/// SSTable value trait
254pub trait SSTableValue: Clone + Send + Sync {
255    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
256    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
257}
258
259/// u64 value implementation
260impl SSTableValue for u64 {
261    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
262        write_vint(writer, *self)
263    }
264
265    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
266        read_vint(reader)
267    }
268}
269
270/// Vec<u8> value implementation
271impl SSTableValue for Vec<u8> {
272    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
273        write_vint(writer, self.len() as u64)?;
274        writer.write_all(self)
275    }
276
277    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
278        let len = read_vint(reader)? as usize;
279        let mut data = vec![0u8; len];
280        reader.read_exact(&mut data)?;
281        Ok(data)
282    }
283}
284
285/// Sparse dimension info for SSTable-based sparse index
286/// Stores offset and length for posting list lookup
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub struct SparseDimInfo {
289    /// Offset in sparse file where posting list starts
290    pub offset: u64,
291    /// Length of serialized posting list
292    pub length: u32,
293}
294
295impl SparseDimInfo {
296    pub fn new(offset: u64, length: u32) -> Self {
297        Self { offset, length }
298    }
299}
300
301impl SSTableValue for SparseDimInfo {
302    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
303        write_vint(writer, self.offset)?;
304        write_vint(writer, self.length as u64)
305    }
306
307    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
308        let offset = read_vint(reader)?;
309        let length = read_vint(reader)? as u32;
310        Ok(Self { offset, length })
311    }
312}
313
314/// Maximum number of postings that can be inlined in TermInfo
315pub const MAX_INLINE_POSTINGS: usize = 3;
316
317/// Term info for posting list references
318///
319/// Supports two modes:
320/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
321/// - **External**: Larger posting lists stored in separate .post file
322///
323/// This eliminates a separate I/O read for rare/unique terms.
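///
/// Illustrative round trip (a sketch, not compiled as a doc-test):
///
/// ```ignore
/// // Up to MAX_INLINE_POSTINGS (3) postings are inlined; larger lists use External.
/// let info = TermInfo::try_inline(&[2, 5, 9], &[1, 3, 1]).expect("small list fits inline");
/// assert!(info.is_inline());
/// assert_eq!(info.doc_freq(), 3);
/// assert_eq!(info.decode_inline(), Some((vec![2, 5, 9], vec![1, 3, 1])));
/// ```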
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub enum TermInfo {
326    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
327    /// Each entry is (doc_id, term_freq) delta-encoded
328    Inline {
329        /// Number of postings (1-3)
330        doc_freq: u8,
331        /// Inline data: delta-encoded (doc_id, term_freq) pairs
332        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
333        data: [u8; 16],
334        /// Actual length of data used
335        data_len: u8,
336    },
337    /// Reference to external posting list in .post file
338    External {
339        posting_offset: u64,
340        posting_len: u32,
341        doc_freq: u32,
342        /// Position data offset (0 if no positions)
343        position_offset: u64,
344        /// Position data length (0 if no positions)
345        position_len: u32,
346    },
347}
348
349impl TermInfo {
350    /// Create an external reference
351    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
352        TermInfo::External {
353            posting_offset,
354            posting_len,
355            doc_freq,
356            position_offset: 0,
357            position_len: 0,
358        }
359    }
360
361    /// Create an external reference with position info
362    pub fn external_with_positions(
363        posting_offset: u64,
364        posting_len: u32,
365        doc_freq: u32,
366        position_offset: u64,
367        position_len: u32,
368    ) -> Self {
369        TermInfo::External {
370            posting_offset,
371            posting_len,
372            doc_freq,
373            position_offset,
374            position_len,
375        }
376    }
377
378    /// Try to create an inline TermInfo from posting data
379    /// Returns None if posting list is too large to inline
380    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
381        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
382            return None;
383        }
384
385        let mut data = [0u8; 16];
386        let mut cursor = std::io::Cursor::new(&mut data[..]);
387        let mut prev_doc_id = 0u32;
388
389        for (i, &doc_id) in doc_ids.iter().enumerate() {
390            let delta = doc_id - prev_doc_id;
391            if write_vint(&mut cursor, delta as u64).is_err() {
392                return None;
393            }
394            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
395                return None;
396            }
397            prev_doc_id = doc_id;
398        }
399
400        let data_len = cursor.position() as u8;
401        if data_len > 16 {
402            return None;
403        }
404
405        Some(TermInfo::Inline {
406            doc_freq: doc_ids.len() as u8,
407            data,
408            data_len,
409        })
410    }
411
412    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
413    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
414    /// `count` is the number of postings (must match iterator length).
415    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
416        if count > MAX_INLINE_POSTINGS || count == 0 {
417            return None;
418        }
419
420        let mut data = [0u8; 16];
421        let mut cursor = std::io::Cursor::new(&mut data[..]);
422        let mut prev_doc_id = 0u32;
423
424        for (doc_id, tf) in iter {
425            let delta = doc_id - prev_doc_id;
426            if write_vint(&mut cursor, delta as u64).is_err() {
427                return None;
428            }
429            if write_vint(&mut cursor, tf as u64).is_err() {
430                return None;
431            }
432            prev_doc_id = doc_id;
433        }
434
435        let data_len = cursor.position() as u8;
436
437        Some(TermInfo::Inline {
438            doc_freq: count as u8,
439            data,
440            data_len,
441        })
442    }
443
444    /// Get document frequency
445    pub fn doc_freq(&self) -> u32 {
446        match self {
447            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
448            TermInfo::External { doc_freq, .. } => *doc_freq,
449        }
450    }
451
452    /// Check if this is an inline posting list
453    pub fn is_inline(&self) -> bool {
454        matches!(self, TermInfo::Inline { .. })
455    }
456
457    /// Get external posting info (offset, len) - returns None for inline
458    pub fn external_info(&self) -> Option<(u64, u32)> {
459        match self {
460            TermInfo::External {
461                posting_offset,
462                posting_len,
463                ..
464            } => Some((*posting_offset, *posting_len)),
465            TermInfo::Inline { .. } => None,
466        }
467    }
468
469    /// Get position info (offset, len) - returns None for inline or if no positions
470    pub fn position_info(&self) -> Option<(u64, u32)> {
471        match self {
472            TermInfo::External {
473                position_offset,
474                position_len,
475                ..
476            } if *position_len > 0 => Some((*position_offset, *position_len)),
477            _ => None,
478        }
479    }
480
481    /// Decode inline postings into (doc_ids, term_freqs)
482    /// Returns None if this is an external reference
483    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
484        match self {
485            TermInfo::Inline {
486                doc_freq,
487                data,
488                data_len,
489            } => {
490                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
491                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
492                let mut reader = &data[..*data_len as usize];
493                let mut prev_doc_id = 0u32;
494
495                for _ in 0..*doc_freq {
496                    let delta = read_vint(&mut reader).ok()? as u32;
497                    let tf = read_vint(&mut reader).ok()? as u32;
498                    let doc_id = prev_doc_id + delta;
499                    doc_ids.push(doc_id);
500                    term_freqs.push(tf);
501                    prev_doc_id = doc_id;
502                }
503
504                Some((doc_ids, term_freqs))
505            }
506            TermInfo::External { .. } => None,
507        }
508    }
509}
510
511impl SSTableValue for TermInfo {
512    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
513        match self {
514            TermInfo::Inline {
515                doc_freq,
516                data,
517                data_len,
518            } => {
519                // Tag byte 0xFF = inline marker
520                writer.write_u8(0xFF)?;
521                writer.write_u8(*doc_freq)?;
522                writer.write_u8(*data_len)?;
523                writer.write_all(&data[..*data_len as usize])?;
524            }
525            TermInfo::External {
526                posting_offset,
527                posting_len,
528                doc_freq,
529                position_offset,
530                position_len,
531            } => {
532                // Tag byte 0x00 = external marker (no positions)
533                // Tag byte 0x01 = external with positions
534                if *position_len > 0 {
535                    writer.write_u8(0x01)?;
536                    write_vint(writer, *doc_freq as u64)?;
537                    write_vint(writer, *posting_offset)?;
538                    write_vint(writer, *posting_len as u64)?;
539                    write_vint(writer, *position_offset)?;
540                    write_vint(writer, *position_len as u64)?;
541                } else {
542                    writer.write_u8(0x00)?;
543                    write_vint(writer, *doc_freq as u64)?;
544                    write_vint(writer, *posting_offset)?;
545                    write_vint(writer, *posting_len as u64)?;
546                }
547            }
548        }
549        Ok(())
550    }
551
552    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
553        let tag = reader.read_u8()?;
554
555        if tag == 0xFF {
556            // Inline
557            let doc_freq = reader.read_u8()?;
558            let data_len = reader.read_u8()?;
559            let mut data = [0u8; 16];
560            reader.read_exact(&mut data[..data_len as usize])?;
561            Ok(TermInfo::Inline {
562                doc_freq,
563                data,
564                data_len,
565            })
566        } else if tag == 0x00 {
567            // External (no positions)
568            let doc_freq = read_vint(reader)? as u32;
569            let posting_offset = read_vint(reader)?;
570            let posting_len = read_vint(reader)? as u32;
571            Ok(TermInfo::External {
572                posting_offset,
573                posting_len,
574                doc_freq,
575                position_offset: 0,
576                position_len: 0,
577            })
578        } else if tag == 0x01 {
579            // External with positions
580            let doc_freq = read_vint(reader)? as u32;
581            let posting_offset = read_vint(reader)?;
582            let posting_len = read_vint(reader)? as u32;
583            let position_offset = read_vint(reader)?;
584            let position_len = read_vint(reader)? as u32;
585            Ok(TermInfo::External {
586                posting_offset,
587                posting_len,
588                doc_freq,
589                position_offset,
590                position_len,
591            })
592        } else {
593            Err(io::Error::new(
594                io::ErrorKind::InvalidData,
595                format!("Invalid TermInfo tag: {}", tag),
596            ))
597        }
598    }
599}
600
601/// Write variable-length integer
602pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
603    loop {
604        let byte = (value & 0x7F) as u8;
605        value >>= 7;
606        if value == 0 {
607            writer.write_u8(byte)?;
608            return Ok(());
609        } else {
610            writer.write_u8(byte | 0x80)?;
611        }
612    }
613}
614
615/// Read variable-length integer
616pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
617    let mut result = 0u64;
618    let mut shift = 0;
619
620    loop {
621        let byte = reader.read_u8()?;
622        result |= ((byte & 0x7F) as u64) << shift;
623        if byte & 0x80 == 0 {
624            return Ok(result);
625        }
626        shift += 7;
627        if shift >= 64 {
628            return Err(io::Error::new(
629                io::ErrorKind::InvalidData,
630                "varint too long",
631            ));
632        }
633    }
634}
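
// Worked example of the 7-bit varint encoding above: 300 = 0b1_0010_1100 is
// split into 7-bit groups, least-significant group first, so it serializes as
// [0xAC, 0x02] (0xAC = the low 7 bits with the continuation bit set, 0x02 =
// the remaining high bits).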
635
636/// Compute common prefix length
637pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
638    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
639}
640
641/// SSTable statistics for debugging
642#[derive(Debug, Clone)]
643pub struct SSTableStats {
644    pub num_blocks: usize,
645    pub num_sparse_entries: usize,
646    pub num_entries: u64,
647    pub has_bloom_filter: bool,
648    pub has_dictionary: bool,
649    pub bloom_filter_size: usize,
650    pub dictionary_size: usize,
651}
652
653/// SSTable writer configuration
654#[derive(Debug, Clone)]
655pub struct SSTableWriterConfig {
656    /// Compression level (1-22, higher = better compression but slower)
657    pub compression_level: CompressionLevel,
658    /// Whether to train and use a dictionary for compression
659    pub use_dictionary: bool,
660    /// Dictionary size in bytes (default 64KB)
661    pub dict_size: usize,
662    /// Whether to build a bloom filter
663    pub use_bloom_filter: bool,
664    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
665    pub bloom_bits_per_key: usize,
666}
667
668impl Default for SSTableWriterConfig {
669    fn default() -> Self {
670        Self::from_optimization(crate::structures::IndexOptimization::default())
671    }
672}
673
674impl SSTableWriterConfig {
675    /// Create config from IndexOptimization mode
676    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
677        use crate::structures::IndexOptimization;
678        match optimization {
679            IndexOptimization::Adaptive => Self {
680                compression_level: CompressionLevel::BETTER, // Level 9
681                use_dictionary: false,
682                dict_size: DEFAULT_DICT_SIZE,
683                use_bloom_filter: false,
684                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
685            },
686            IndexOptimization::SizeOptimized => Self {
687                compression_level: CompressionLevel::MAX, // Level 22
688                use_dictionary: true,
689                dict_size: DEFAULT_DICT_SIZE,
690                use_bloom_filter: true,
691                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
692            },
693            IndexOptimization::PerformanceOptimized => Self {
694                compression_level: CompressionLevel::FAST, // Level 1
695                use_dictionary: false,
696                dict_size: DEFAULT_DICT_SIZE,
697                use_bloom_filter: true, // Bloom helps skip blocks fast
698                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
699            },
700        }
701    }
702
703    /// Fast configuration - prioritize write speed over compression
704    pub fn fast() -> Self {
705        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
706    }
707
708    /// Maximum compression configuration - prioritize size over speed
709    pub fn max_compression() -> Self {
710        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
711    }
712}
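
// Illustrative preset choice (a sketch; individual fields can also be tweaked
// after construction), e.g.:
//
//     let cfg = SSTableWriterConfig::fast();            // favor write speed
//     // or: SSTableWriterConfig::max_compression()     // favor on-disk size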
713
714/// SSTable writer with optimizations:
715/// - Dictionary compression for blocks (if dictionary provided)
716/// - Configurable compression level
717/// - Block index prefix compression
718/// - Bloom filter for fast negative lookups
719pub struct SSTableWriter<W: Write, V: SSTableValue> {
720    writer: W,
721    block_buffer: Vec<u8>,
722    prev_key: Vec<u8>,
723    index: Vec<BlockIndexEntry>,
724    current_offset: u64,
725    num_entries: u64,
726    block_first_key: Option<Vec<u8>>,
727    config: SSTableWriterConfig,
728    /// Pre-trained dictionary for compression (optional)
729    dictionary: Option<CompressionDict>,
730    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
731    /// Filter is built at finish() time with correct sizing.
732    bloom_hashes: Vec<(u64, u64)>,
733    _phantom: std::marker::PhantomData<V>,
734}
735
736impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
737    /// Create a new SSTable writer with default configuration
738    pub fn new(writer: W) -> Self {
739        Self::with_config(writer, SSTableWriterConfig::default())
740    }
741
742    /// Create a new SSTable writer with custom configuration
743    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
744        Self {
745            writer,
746            block_buffer: Vec::with_capacity(BLOCK_SIZE),
747            prev_key: Vec::new(),
748            index: Vec::new(),
749            current_offset: 0,
750            num_entries: 0,
751            block_first_key: None,
752            config,
753            dictionary: None,
754            bloom_hashes: Vec::new(),
755            _phantom: std::marker::PhantomData,
756        }
757    }
758
759    /// Create a new SSTable writer with a pre-trained dictionary
760    pub fn with_dictionary(
761        writer: W,
762        config: SSTableWriterConfig,
763        dictionary: CompressionDict,
764    ) -> Self {
765        Self {
766            writer,
767            block_buffer: Vec::with_capacity(BLOCK_SIZE),
768            prev_key: Vec::new(),
769            index: Vec::new(),
770            current_offset: 0,
771            num_entries: 0,
772            block_first_key: None,
773            config,
774            dictionary: Some(dictionary),
775            bloom_hashes: Vec::new(),
776            _phantom: std::marker::PhantomData,
777        }
778    }
779
780    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
781        if self.block_first_key.is_none() {
782            self.block_first_key = Some(key.to_vec());
783        }
784
785        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
786        if self.config.use_bloom_filter {
787            self.bloom_hashes.push(bloom_hash_pair(key));
788        }
789
790        let prefix_len = common_prefix_len(&self.prev_key, key);
791        let suffix = &key[prefix_len..];
792
793        write_vint(&mut self.block_buffer, prefix_len as u64)?;
794        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
795        self.block_buffer.extend_from_slice(suffix);
796        value.serialize(&mut self.block_buffer)?;
797
798        self.prev_key.clear();
799        self.prev_key.extend_from_slice(key);
800        self.num_entries += 1;
801
802        if self.block_buffer.len() >= BLOCK_SIZE {
803            self.flush_block()?;
804        }
805
806        Ok(())
807    }
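
    // Worked example of the entry encoding emitted by `insert` above: after
    // writing the key "apple", inserting "apricot" stores prefix_len = 2
    // ("ap"), suffix_len = 5, the suffix bytes "ricot", and then the
    // serialized value, so within a block each key only pays for the bytes
    // that differ from its predecessor.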
808
809    /// Flush and compress the current block
810    fn flush_block(&mut self) -> io::Result<()> {
811        if self.block_buffer.is_empty() {
812            return Ok(());
813        }
814
815        // Compress block with dictionary if available
816        let compressed = if let Some(ref dict) = self.dictionary {
817            crate::compression::compress_with_dict(
818                &self.block_buffer,
819                self.config.compression_level,
820                dict,
821            )?
822        } else {
823            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
824        };
825
826        if let Some(first_key) = self.block_first_key.take() {
827            self.index.push(BlockIndexEntry {
828                first_key,
829                offset: self.current_offset,
830                length: compressed.len() as u32,
831            });
832        }
833
834        self.writer.write_all(&compressed)?;
835        self.current_offset += compressed.len() as u64;
836        self.block_buffer.clear();
837        self.prev_key.clear();
838
839        Ok(())
840    }
841
842    pub fn finish(mut self) -> io::Result<W> {
843        // Flush any remaining data
844        self.flush_block()?;
845
846        // Build bloom filter from collected hashes (properly sized)
847        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
848            let mut bloom =
849                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
850            for (h1, h2) in &self.bloom_hashes {
851                bloom.insert_hashed(*h1, *h2);
852            }
853            Some(bloom)
854        } else {
855            None
856        };
857
858        let data_end_offset = self.current_offset;
859
860        // Build memory-efficient block index (v4 format)
861        // Convert to (key, BlockAddr) pairs for the new index format
862        let entries: Vec<(Vec<u8>, BlockAddr)> = self
863            .index
864            .iter()
865            .map(|e| {
866                (
867                    e.first_key.clone(),
868                    BlockAddr {
869                        offset: e.offset,
870                        length: e.length,
871                    },
872                )
873            })
874            .collect();
875
876        // Build FST-based index if native feature is enabled, otherwise use mmap index
877        #[cfg(feature = "native")]
878        let index_bytes = FstBlockIndex::build(&entries)?;
879        #[cfg(not(feature = "native"))]
880        let index_bytes = MmapBlockIndex::build(&entries)?;
881
882        // Write index bytes with length prefix
883        self.writer
884            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
885        self.writer.write_all(&index_bytes)?;
886        self.current_offset += 4 + index_bytes.len() as u64;
887
888        // Write bloom filter if present
889        let bloom_offset = if let Some(ref bloom) = bloom_filter {
890            let bloom_data = bloom.to_bytes();
891            let offset = self.current_offset;
892            self.writer.write_all(&bloom_data)?;
893            self.current_offset += bloom_data.len() as u64;
894            offset
895        } else {
896            0
897        };
898
899        // Write dictionary if present
900        let dict_offset = if let Some(ref dict) = self.dictionary {
901            let dict_bytes = dict.as_bytes();
902            let offset = self.current_offset;
903            self.writer
904                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
905            self.writer.write_all(dict_bytes)?;
906            self.current_offset += 4 + dict_bytes.len() as u64;
907            offset
908        } else {
909            0
910        };
911
912        // Write extended footer (v4 format)
913        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
914        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
915        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
916        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
917        self.writer
918            .write_u8(self.config.compression_level.0 as u8)?;
919        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
920
921        Ok(self.writer)
922    }
923}
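
// On-disk layout produced by `finish()` above (v4 format), collected here for
// reference; offsets stored in the footer are absolute file offsets:
//
//   [compressed data blocks]
//   [u32 index_len][block index bytes (FST or mmap format)]
//   [bloom filter (optional): 12-byte header + bitset words]
//   [dictionary (optional): u32 dict_len + dictionary bytes]
//   [footer, 37 bytes: data_end u64 | num_entries u64 | bloom_offset u64 |
//                      dict_offset u64 | compression_level u8 | magic u32]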
924
925/// Block index entry
926#[derive(Debug, Clone)]
927struct BlockIndexEntry {
928    first_key: Vec<u8>,
929    offset: u64,
930    length: u32,
931}
932
933/// Async SSTable reader - loads blocks on demand via LazyFileSlice
934///
935/// Memory-efficient design (v4 format):
936/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
937/// - Block addresses stored in bitpacked format
938/// - Bloom filter and dictionary optional
939pub struct AsyncSSTableReader<V: SSTableValue> {
940    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
941    data_slice: LazyFileSlice,
942    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
943    block_index: BlockIndex,
944    num_entries: u64,
945    /// Hot cache for decompressed blocks
946    cache: RwLock<BlockCache>,
947    /// Bloom filter for fast negative lookups (optional)
948    bloom_filter: Option<BloomFilter>,
949    /// Compression dictionary (optional)
950    dictionary: Option<CompressionDict>,
951    /// Compression level used
952    #[allow(dead_code)]
953    compression_level: CompressionLevel,
954    _phantom: std::marker::PhantomData<V>,
955}
956
957/// LRU block cache — O(1) lookup/insert; O(n) promotion (linear scan of the LRU queue).
958///
959/// On `get()`, promotes the accessed entry to MRU position.
960/// On eviction, removes the LRU entry (front of VecDeque).
961/// For typical cache sizes (16-64 blocks), the linear scan in
962/// `promote()` is negligible compared to I/O savings.
963struct BlockCache {
964    blocks: FxHashMap<u64, Arc<[u8]>>,
965    lru_order: std::collections::VecDeque<u64>,
966    max_blocks: usize,
967}
968
969impl BlockCache {
970    fn new(max_blocks: usize) -> Self {
971        Self {
972            blocks: FxHashMap::default(),
973            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
974            max_blocks,
975        }
976    }
977
978    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
979        if self.blocks.contains_key(&offset) {
980            self.promote(offset);
981            self.blocks.get(&offset).map(Arc::clone)
982        } else {
983            None
984        }
985    }
986
987    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
988        if self.blocks.contains_key(&offset) {
989            self.promote(offset);
990            return;
991        }
992        while self.blocks.len() >= self.max_blocks {
993            if let Some(evict_offset) = self.lru_order.pop_front() {
994                self.blocks.remove(&evict_offset);
995            } else {
996                break;
997            }
998        }
999        self.blocks.insert(offset, block);
1000        self.lru_order.push_back(offset);
1001    }
1002
1003    /// Move entry to MRU position (back of deque)
1004    fn promote(&mut self, offset: u64) {
1005        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
1006            self.lru_order.remove(pos);
1007            self.lru_order.push_back(offset);
1008        }
1009    }
1010}
1011
1012impl<V: SSTableValue> AsyncSSTableReader<V> {
1013    /// Open an SSTable from a LazyFileHandle
1014    /// Only loads the footer and index into memory, data blocks fetched on-demand
1015    ///
1016    /// Expects the v4 (memory-efficient) format; files with any other magic are rejected:
1017    /// - FST-based block index when the `native` feature is enabled
1018    /// - mmap'd raw-bytes block index otherwise (no heap allocation for keys)
1019    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
1020        let file_len = file_handle.len();
1021        if file_len < 37 {
1022            return Err(io::Error::new(
1023                io::ErrorKind::InvalidData,
1024                "SSTable too small",
1025            ));
1026        }
1027
1028        // Read footer (37 bytes)
1029        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
1030        let footer_bytes = file_handle
1031            .read_bytes_range(file_len - 37..file_len)
1032            .await?;
1033
1034        let mut reader = footer_bytes.as_slice();
1035        let data_end_offset = reader.read_u64::<LittleEndian>()?;
1036        let num_entries = reader.read_u64::<LittleEndian>()?;
1037        let bloom_offset = reader.read_u64::<LittleEndian>()?;
1038        let dict_offset = reader.read_u64::<LittleEndian>()?;
1039        let compression_level = CompressionLevel(reader.read_u8()? as i32);
1040        let magic = reader.read_u32::<LittleEndian>()?;
1041
1042        if magic != SSTABLE_MAGIC {
1043            return Err(io::Error::new(
1044                io::ErrorKind::InvalidData,
1045                format!("Invalid SSTable magic: 0x{:08X}", magic),
1046            ));
1047        }
1048
1049        // Read index section
1050        let index_start = data_end_offset;
1051        let index_end = file_len - 37;
1052        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
1053
1054        // Parse block index (length-prefixed FST or mmap index)
1055        let mut idx_reader = index_bytes.as_slice();
1056        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
1057
1058        if index_len > idx_reader.len() {
1059            return Err(io::Error::new(
1060                io::ErrorKind::InvalidData,
1061                "Index data truncated",
1062            ));
1063        }
1064
1065        let index_data = index_bytes.slice(4..4 + index_len);
1066
1067        // Try FST first (native), fall back to mmap
1068        #[cfg(feature = "native")]
1069        let block_index = match FstBlockIndex::load(index_data.clone()) {
1070            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
1071            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
1072        };
1073        #[cfg(not(feature = "native"))]
1074        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
1075
1076        // Load bloom filter if present
1077        let bloom_filter = if bloom_offset > 0 {
1078            let bloom_start = bloom_offset;
1079            // Read bloom filter size first (12 bytes header)
1080            let bloom_header = file_handle
1081                .read_bytes_range(bloom_start..bloom_start + 12)
1082                .await?;
1083            let num_words = u32::from_le_bytes([
1084                bloom_header[8],
1085                bloom_header[9],
1086                bloom_header[10],
1087                bloom_header[11],
1088            ]) as u64;
1089            let bloom_size = 12 + num_words * 8;
1090            let bloom_data = file_handle
1091                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1092                .await?;
1093            Some(BloomFilter::from_owned_bytes(bloom_data)?)
1094        } else {
1095            None
1096        };
1097
1098        // Load dictionary if present
1099        let dictionary = if dict_offset > 0 {
1100            let dict_start = dict_offset;
1101            // Read dictionary size first
1102            let dict_len_bytes = file_handle
1103                .read_bytes_range(dict_start..dict_start + 4)
1104                .await?;
1105            let dict_len = u32::from_le_bytes([
1106                dict_len_bytes[0],
1107                dict_len_bytes[1],
1108                dict_len_bytes[2],
1109                dict_len_bytes[3],
1110            ]) as u64;
1111            let dict_data = file_handle
1112                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1113                .await?;
1114            Some(CompressionDict::from_owned_bytes(dict_data))
1115        } else {
1116            None
1117        };
1118
1119        // Create a lazy slice for just the data portion
1120        let data_slice = file_handle.slice(0..data_end_offset);
1121
1122        Ok(Self {
1123            data_slice,
1124            block_index,
1125            num_entries,
1126            cache: RwLock::new(BlockCache::new(cache_blocks)),
1127            bloom_filter,
1128            dictionary,
1129            compression_level,
1130            _phantom: std::marker::PhantomData,
1131        })
1132    }
1133
1134    /// Number of entries
1135    pub fn num_entries(&self) -> u64 {
1136        self.num_entries
1137    }
1138
1139    /// Get stats about this SSTable for debugging
1140    pub fn stats(&self) -> SSTableStats {
1141        SSTableStats {
1142            num_blocks: self.block_index.len(),
1143            num_sparse_entries: 0, // No longer using sparse index separately
1144            num_entries: self.num_entries,
1145            has_bloom_filter: self.bloom_filter.is_some(),
1146            has_dictionary: self.dictionary.is_some(),
1147            bloom_filter_size: self
1148                .bloom_filter
1149                .as_ref()
1150                .map(|b| b.size_bytes())
1151                .unwrap_or(0),
1152            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1153        }
1154    }
1155
1156    /// Number of blocks currently in the cache
1157    pub fn cached_blocks(&self) -> usize {
1158        self.cache.read().blocks.len()
1159    }
1160
1161    /// Look up a key (async - may need to load block)
1162    ///
1163    /// Uses bloom filter for fast negative lookups, then memory-efficient
1164    /// block index to locate the block, reducing I/O to typically 1 block read.
1165    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1166        log::debug!(
1167            "SSTable::get called, key_len={}, total_blocks={}",
1168            key.len(),
1169            self.block_index.len()
1170        );
1171
1172        // Check bloom filter first - fast negative lookup
1173        if let Some(ref bloom) = self.bloom_filter
1174            && !bloom.may_contain(key)
1175        {
1176            log::debug!("SSTable::get bloom filter negative");
1177            return Ok(None);
1178        }
1179
1180        // Use block index to find the block that could contain the key
1181        let block_idx = match self.block_index.locate(key) {
1182            Some(idx) => idx,
1183            None => {
1184                log::debug!("SSTable::get key not found (before first block)");
1185                return Ok(None);
1186            }
1187        };
1188
1189        log::debug!("SSTable::get loading block_idx={}", block_idx);
1190
1191        // Now we know exactly which block to load - single I/O
1192        let block_data = self.load_block(block_idx).await?;
1193        self.search_block(&block_data, key)
1194    }
1195
1196    /// Batch lookup multiple keys with optimized I/O
1197    ///
1198    /// Groups keys by block and loads each block only once, reducing
1199    /// I/O from one read per key to at most one read per distinct block.
1200    /// Uses bloom filter to skip keys that definitely don't exist.
1201    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1202        if keys.is_empty() {
1203            return Ok(Vec::new());
1204        }
1205
1206        // Map each key to its block index
1207        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1208        for (key_idx, key) in keys.iter().enumerate() {
1209            // Check bloom filter first
1210            if let Some(ref bloom) = self.bloom_filter
1211                && !bloom.may_contain(key)
1212            {
1213                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1214                continue;
1215            }
1216
1217            match self.block_index.locate(key) {
1218                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1219                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1220            }
1221        }
1222
1223        // Group keys by block
1224        let mut blocks_to_load: Vec<usize> = key_to_block
1225            .iter()
1226            .filter(|(_, b)| *b != usize::MAX)
1227            .map(|(_, b)| *b)
1228            .collect();
1229        blocks_to_load.sort_unstable();
1230        blocks_to_load.dedup();
1231
1232        // Load all needed blocks (this is where I/O happens)
1233        for &block_idx in &blocks_to_load {
1234            let _ = self.load_block(block_idx).await?;
1235        }
1236
1237        // Now search each key in its block (all blocks are cached)
1238        let mut results = vec![None; keys.len()];
1239        for (key_idx, block_idx) in key_to_block {
1240            if block_idx == usize::MAX {
1241                continue;
1242            }
1243            let block_data = self.load_block(block_idx).await?; // Will hit cache
1244            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1245        }
1246
1247        Ok(results)
1248    }
1249
1250    /// Preload all data blocks into memory
1251    ///
1252    /// Call this after open() to eliminate all I/O during subsequent lookups.
1253    /// Useful when the SSTable is small enough to fit in memory.
1254    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1255        for block_idx in 0..self.block_index.len() {
1256            self.load_block(block_idx).await?;
1257        }
1258        Ok(())
1259    }
1260
1261    /// Prefetch all data blocks via a single bulk I/O operation.
1262    ///
1263    /// Reads the entire compressed data section in one call, then decompresses
1264    /// each block and populates the cache. This turns N individual reads into 1.
1265    /// Cache capacity is expanded to hold all blocks.
1266    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1267        let num_blocks = self.block_index.len();
1268        if num_blocks == 0 {
1269            return Ok(());
1270        }
1271
1272        // Find total data extent
1273        let mut max_end: u64 = 0;
1274        for i in 0..num_blocks {
1275            if let Some(addr) = self.block_index.get_addr(i) {
1276                max_end = max_end.max(addr.offset + addr.length as u64);
1277            }
1278        }
1279
1280        // Single bulk read of entire data section
1281        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1282        let buf = all_data.as_slice();
1283
1284        // Expand cache and decompress all blocks
1285        let mut cache = self.cache.write();
1286        cache.max_blocks = cache.max_blocks.max(num_blocks);
1287        for i in 0..num_blocks {
1288            let addr = self.block_index.get_addr(i).unwrap();
1289            if cache.get(addr.offset).is_some() {
1290                continue;
1291            }
1292            let compressed =
1293                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1294            let decompressed = if let Some(ref dict) = self.dictionary {
1295                crate::compression::decompress_with_dict(compressed, dict)?
1296            } else {
1297                crate::compression::decompress(compressed)?
1298            };
1299            cache.insert(addr.offset, Arc::from(decompressed));
1300        }
1301
1302        Ok(())
1303    }
1304
1305    /// Load a block (checks cache first, then loads from FileSlice)
1306    /// Uses dictionary decompression if dictionary is present
1307    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1308        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1309            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1310        })?;
1311
1312        // Check cache (write lock for LRU promotion on hit)
1313        {
1314            if let Some(block) = self.cache.write().get(addr.offset) {
1315                return Ok(block);
1316            }
1317        }
1318
1319        log::debug!(
1320            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1321            block_idx,
1322            addr.offset,
1323            addr.offset + addr.length as u64
1324        );
1325
1326        // Load from FileSlice
1327        let range = addr.byte_range();
1328        let compressed = self.data_slice.read_bytes_range(range).await?;
1329
1330        // Decompress with dictionary if available
1331        let decompressed = if let Some(ref dict) = self.dictionary {
1332            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1333        } else {
1334            crate::compression::decompress(compressed.as_slice())?
1335        };
1336
1337        let block: Arc<[u8]> = Arc::from(decompressed);
1338
1339        // Insert into cache
1340        {
1341            let mut cache = self.cache.write();
1342            cache.insert(addr.offset, Arc::clone(&block));
1343        }
1344
1345        Ok(block)
1346    }
1347
1348    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1349        let mut reader = block_data;
1350        let mut current_key = Vec::new();
1351
1352        while !reader.is_empty() {
1353            let common_prefix_len = read_vint(&mut reader)? as usize;
1354            let suffix_len = read_vint(&mut reader)? as usize;
1355
1356            current_key.truncate(common_prefix_len);
1357            let mut suffix = vec![0u8; suffix_len];
1358            reader.read_exact(&mut suffix)?;
1359            current_key.extend_from_slice(&suffix);
1360
1361            let value = V::deserialize(&mut reader)?;
1362
1363            match current_key.as_slice().cmp(target_key) {
1364                std::cmp::Ordering::Equal => return Ok(Some(value)),
1365                std::cmp::Ordering::Greater => return Ok(None),
1366                std::cmp::Ordering::Less => continue,
1367            }
1368        }
1369
1370        Ok(None)
1371    }
1372
1373    /// Prefetch blocks for a key range
1374    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1375        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1376        let end_block = self
1377            .block_index
1378            .locate(end_key)
1379            .unwrap_or(self.block_index.len().saturating_sub(1));
1380
1381        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1382            let _ = self.load_block(block_idx).await?;
1383        }
1384
1385        Ok(())
1386    }
1387
1388    /// Iterate over all entries (loads blocks as needed)
1389    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1390        AsyncSSTableIterator::new(self)
1391    }
1392
1393    /// Get all entries as a vector (for merging)
1394    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1395        let mut results = Vec::new();
1396
1397        for block_idx in 0..self.block_index.len() {
1398            let block_data = self.load_block(block_idx).await?;
1399            let mut reader = &block_data[..];
1400            let mut current_key = Vec::new();
1401
1402            while !reader.is_empty() {
1403                let common_prefix_len = read_vint(&mut reader)? as usize;
1404                let suffix_len = read_vint(&mut reader)? as usize;
1405
1406                current_key.truncate(common_prefix_len);
1407                let mut suffix = vec![0u8; suffix_len];
1408                reader.read_exact(&mut suffix)?;
1409                current_key.extend_from_slice(&suffix);
1410
1411                let value = V::deserialize(&mut reader)?;
1412                results.push((current_key.clone(), value));
1413            }
1414        }
1415
1416        Ok(results)
1417    }
1418}
1419
1420/// Async iterator over SSTable entries
1421pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1422    reader: &'a AsyncSSTableReader<V>,
1423    current_block: usize,
1424    block_data: Option<Arc<[u8]>>,
1425    block_offset: usize,
1426    current_key: Vec<u8>,
1427    finished: bool,
1428}
1429
1430impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1431    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1432        Self {
1433            reader,
1434            current_block: 0,
1435            block_data: None,
1436            block_offset: 0,
1437            current_key: Vec::new(),
1438            finished: reader.block_index.is_empty(),
1439        }
1440    }
1441
1442    async fn load_next_block(&mut self) -> io::Result<bool> {
1443        if self.current_block >= self.reader.block_index.len() {
1444            self.finished = true;
1445            return Ok(false);
1446        }
1447
1448        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1449        self.block_offset = 0;
1450        self.current_key.clear();
1451        self.current_block += 1;
1452        Ok(true)
1453    }
1454
1455    /// Advance to next entry (async)
1456    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1457        if self.finished {
1458            return Ok(None);
1459        }
1460
1461        if self.block_data.is_none() && !self.load_next_block().await? {
1462            return Ok(None);
1463        }
1464
1465        loop {
1466            let block = self.block_data.as_ref().unwrap();
1467            if self.block_offset >= block.len() {
1468                if !self.load_next_block().await? {
1469                    return Ok(None);
1470                }
1471                continue;
1472            }
1473
1474            let mut reader = &block[self.block_offset..];
1475            let start_len = reader.len();
1476
1477            let common_prefix_len = read_vint(&mut reader)? as usize;
1478            let suffix_len = read_vint(&mut reader)? as usize;
1479
1480            self.current_key.truncate(common_prefix_len);
1481            let mut suffix = vec![0u8; suffix_len];
1482            reader.read_exact(&mut suffix)?;
1483            self.current_key.extend_from_slice(&suffix);
1484
1485            let value = V::deserialize(&mut reader)?;
1486
1487            self.block_offset += start_len - reader.len();
1488
1489            return Ok(Some((self.current_key.clone(), value)));
1490        }
1491    }
1492}
1493
1494#[cfg(test)]
1495mod tests {
1496    use super::*;
1497
1498    #[test]
1499    fn test_bloom_filter_basic() {
1500        let mut bloom = BloomFilter::new(100, 10);
1501
1502        bloom.insert(b"hello");
1503        bloom.insert(b"world");
1504        bloom.insert(b"test");
1505
1506        assert!(bloom.may_contain(b"hello"));
1507        assert!(bloom.may_contain(b"world"));
1508        assert!(bloom.may_contain(b"test"));
1509
1510        // These should likely return false (with ~1% false positive rate)
1511        assert!(!bloom.may_contain(b"notfound"));
1512        assert!(!bloom.may_contain(b"missing"));
1513    }
1514
1515    #[test]
1516    fn test_bloom_filter_serialization() {
1517        let mut bloom = BloomFilter::new(100, 10);
1518        bloom.insert(b"key1");
1519        bloom.insert(b"key2");
1520
1521        let bytes = bloom.to_bytes();
1522        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();
1523
1524        assert!(restored.may_contain(b"key1"));
1525        assert!(restored.may_contain(b"key2"));
1526        assert!(!restored.may_contain(b"key3"));
1527    }
1528
1529    #[test]
1530    fn test_bloom_filter_false_positive_rate() {
1531        let num_keys = 10000;
1532        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1533
1534        // Insert keys
1535        for i in 0..num_keys {
1536            let key = format!("key_{}", i);
1537            bloom.insert(key.as_bytes());
1538        }
1539
1540        // All inserted keys should be found
1541        for i in 0..num_keys {
1542            let key = format!("key_{}", i);
1543            assert!(bloom.may_contain(key.as_bytes()));
1544        }
1545
1546        // Check false positive rate on non-existent keys
1547        let mut false_positives = 0;
1548        let test_count = 10000;
1549        for i in 0..test_count {
1550            let key = format!("nonexistent_{}", i);
1551            if bloom.may_contain(key.as_bytes()) {
1552                false_positives += 1;
1553            }
1554        }
1555
1556        // With 10 bits per key, expect ~1% false positive rate
1557        // Allow up to 3% due to hash function variance
1558        let fp_rate = false_positives as f64 / test_count as f64;
1559        assert!(
1560            fp_rate < 0.03,
1561            "False positive rate {} is too high",
1562            fp_rate
1563        );
1564    }
1565
1566    #[test]
1567    fn test_sstable_writer_config() {
1568        use crate::structures::IndexOptimization;
1569
1570        // Default = Adaptive
1571        let config = SSTableWriterConfig::default();
1572        assert_eq!(config.compression_level.0, 9); // BETTER
1573        assert!(!config.use_bloom_filter);
1574        assert!(!config.use_dictionary);
1575
1576        // Adaptive
1577        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1578        assert_eq!(adaptive.compression_level.0, 9);
1579        assert!(!adaptive.use_bloom_filter);
1580        assert!(!adaptive.use_dictionary);
1581
1582        // SizeOptimized
1583        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1584        assert_eq!(size.compression_level.0, 22); // MAX
1585        assert!(size.use_bloom_filter);
1586        assert!(size.use_dictionary);
1587
1588        // PerformanceOptimized
1589        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1590        assert_eq!(perf.compression_level.0, 1); // FAST
1591        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1592        assert!(!perf.use_dictionary);
1593
1594        // Aliases
1595        let fast = SSTableWriterConfig::fast();
1596        assert_eq!(fast.compression_level.0, 1);
1597
1598        let max = SSTableWriterConfig::max_compression();
1599        assert_eq!(max.compression_level.0, 22);
1600    }
1601
1602    #[test]
1603    fn test_vint_roundtrip() {
1604        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1605
1606        for &val in &test_values {
1607            let mut buf = Vec::new();
1608            write_vint(&mut buf, val).unwrap();
1609            let mut reader = buf.as_slice();
1610            let decoded = read_vint(&mut reader).unwrap();
1611            assert_eq!(val, decoded, "Failed for value {}", val);
1612        }
1613    }
1614
1615    #[test]
1616    fn test_common_prefix_len() {
1617        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1618        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1619        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1620        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1621        assert_eq!(common_prefix_len(b"hello", b""), 0);
1622    }
1623}