hermes_core/structures/sstable.rs

1//! Async SSTable with lazy loading via FileSlice
2//!
3//! Memory-efficient design: only minimal metadata is loaded into memory;
4//! blocks are loaded on demand.
5//!
6//! ## Key Features
7//!
8//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
9//!    - Can be mmap'd directly without parsing into heap-allocated structures
10//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
11//!
12//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
13//!    - Minimal memory footprint for block metadata
14//!
15//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
16//!
17//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
18//!
19//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O
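//!
//! ## Usage sketch
//!
//! An illustrative write/read round trip. This is a sketch: the `LazyFileHandle`
//! setup depends on the directory implementation and is assumed here, and the
//! import path is indicative only.
//!
//! ```ignore
//! // Write side: keys are expected in sorted order; Vec<u8> works as the sink.
//! let mut writer: SSTableWriter<Vec<u8>, u64> = SSTableWriter::new(Vec::new());
//! writer.insert(b"apple", &1)?;
//! writer.insert(b"banana", &2)?;
//! let bytes = writer.finish()?;
//!
//! // Read side: `file_handle` is a LazyFileHandle over those bytes (assumed).
//! let reader = AsyncSSTableReader::<u64>::open(file_handle, 32).await?;
//! assert_eq!(reader.get(b"banana").await?, Some(2));
//! assert_eq!(reader.get(b"missing").await?, None);
//! ```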
20
21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};
32
33/// SSTable magic number - version 4 with memory-efficient index
34/// Uses FST-based or mmap'd block index to avoid heap allocation
35pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"
36
37/// Block size for SSTable (16KB default)
38pub const BLOCK_SIZE: usize = 16 * 1024;
39
40/// Default dictionary size (64KB)
41pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
42
43/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
44pub const BLOOM_BITS_PER_KEY: usize = 10;
45
46/// Bloom filter hash count (optimal for 10 bits/key)
47pub const BLOOM_HASH_COUNT: usize = 7;
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
53/// Simple bloom filter for key existence checks
54#[derive(Debug, Clone)]
55pub struct BloomFilter {
56    bits: BloomBits,
57    num_bits: usize,
58    num_hashes: usize,
59}
60
61/// Bloom filter storage — Vec for write path, OwnedBytes for zero-copy read path.
62#[derive(Debug, Clone)]
63enum BloomBits {
64    /// Mutable storage for building (SSTable writer)
65    Vec(Vec<u64>),
66    /// Zero-copy mmap reference for reading (raw LE u64 words, no header)
67    Bytes(OwnedBytes),
68}
69
70impl BloomBits {
71    #[inline]
72    fn len(&self) -> usize {
73        match self {
74            BloomBits::Vec(v) => v.len(),
75            BloomBits::Bytes(b) => b.len() / 8,
76        }
77    }
78
79    #[inline]
80    fn get(&self, word_idx: usize) -> u64 {
81        match self {
82            BloomBits::Vec(v) => v[word_idx],
83            BloomBits::Bytes(b) => {
84                let off = word_idx * 8;
85                u64::from_le_bytes([
86                    b[off],
87                    b[off + 1],
88                    b[off + 2],
89                    b[off + 3],
90                    b[off + 4],
91                    b[off + 5],
92                    b[off + 6],
93                    b[off + 7],
94                ])
95            }
96        }
97    }
98
99    #[inline]
100    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
101        match self {
102            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
103            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
104        }
105    }
106
107    fn size_bytes(&self) -> usize {
108        match self {
109            BloomBits::Vec(v) => v.len() * 8,
110            BloomBits::Bytes(b) => b.len(),
111        }
112    }
113}
114
115impl BloomFilter {
116    /// Create a new bloom filter sized for expected number of keys
117    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
118        let num_bits = (expected_keys * bits_per_key).max(64);
119        let num_words = num_bits.div_ceil(64);
120        Self {
121            bits: BloomBits::Vec(vec![0u64; num_words]),
122            num_bits,
123            num_hashes: BLOOM_HASH_COUNT,
124        }
125    }
126
127    /// Create from serialized OwnedBytes (zero-copy for mmap)
128    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
129        if data.len() < 12 {
130            return Err(io::Error::new(
131                io::ErrorKind::InvalidData,
132                "Bloom filter data too short",
133            ));
134        }
135        let d = data.as_slice();
136        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
137        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
138        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
139
140        if d.len() < 12 + num_words * 8 {
141            return Err(io::Error::new(
142                io::ErrorKind::InvalidData,
143                "Bloom filter data truncated",
144            ));
145        }
146
147        // Slice past the 12-byte header to get raw u64 LE words (zero-copy)
148        let bits_bytes = data.slice(12..12 + num_words * 8);
149
150        Ok(Self {
151            bits: BloomBits::Bytes(bits_bytes),
152            num_bits,
153            num_hashes,
154        })
155    }
156
157    /// Serialize to bytes
158    pub fn to_bytes(&self) -> Vec<u8> {
159        let num_words = self.bits.len();
160        let mut data = Vec::with_capacity(12 + num_words * 8);
161        data.write_u32::<LittleEndian>(self.num_bits as u32)
162            .unwrap();
163        data.write_u32::<LittleEndian>(self.num_hashes as u32)
164            .unwrap();
165        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
166        for i in 0..num_words {
167            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
168        }
169        data
170    }
171
172    /// Add a key to the filter
173    pub fn insert(&mut self, key: &[u8]) {
174        let (h1, h2) = self.hash_pair(key);
175        for i in 0..self.num_hashes {
176            let bit_pos = self.get_bit_pos(h1, h2, i);
177            let word_idx = bit_pos / 64;
178            let bit_idx = bit_pos % 64;
179            if word_idx < self.bits.len() {
180                self.bits.set_bit(word_idx, bit_idx);
181            }
182        }
183    }
184
185    /// Check if a key might be in the filter
186    /// Returns false if definitely not present, true if possibly present
187    pub fn may_contain(&self, key: &[u8]) -> bool {
188        let (h1, h2) = self.hash_pair(key);
189        for i in 0..self.num_hashes {
190            let bit_pos = self.get_bit_pos(h1, h2, i);
191            let word_idx = bit_pos / 64;
192            let bit_idx = bit_pos % 64;
193            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
194                return false;
195            }
196        }
197        true
198    }
199
200    /// Size in bytes
201    pub fn size_bytes(&self) -> usize {
202        12 + self.bits.size_bytes()
203    }
204
205    /// Insert a pre-computed hash pair into the filter
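    /// (as produced by the standalone `bloom_hash_pair` helper below).
    ///
    /// Illustrative writer-side pattern, a sketch using only items in this module:
    /// ```ignore
    /// let (h1, h2) = bloom_hash_pair(b"apple");
    /// let mut bloom = BloomFilter::new(1, BLOOM_BITS_PER_KEY);
    /// bloom.insert_hashed(h1, h2);
    /// assert!(bloom.may_contain(b"apple"));
    /// ```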
206    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
207        for i in 0..self.num_hashes {
208            let bit_pos = self.get_bit_pos(h1, h2, i);
209            let word_idx = bit_pos / 64;
210            let bit_idx = bit_pos % 64;
211            if word_idx < self.bits.len() {
212                self.bits.set_bit(word_idx, bit_idx);
213            }
214        }
215    }
216
217    /// Compute two hash values using FNV-1a variant
218    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
219        // FNV-1a hash
220        let mut h1: u64 = 0xcbf29ce484222325;
221        for &byte in key {
222            h1 ^= byte as u64;
223            h1 = h1.wrapping_mul(0x100000001b3);
224        }
225
226        // Second hash using different seed
227        let mut h2: u64 = 0x84222325cbf29ce4;
228        for &byte in key {
229            h2 = h2.wrapping_mul(0x100000001b3);
230            h2 ^= byte as u64;
231        }
232
233        (h1, h2)
234    }
235
236    /// Get bit position for hash iteration i using double hashing
237    #[inline]
238    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
239        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
240    }
241}
242
243/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
244/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair.
245fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
246    let mut h1: u64 = 0xcbf29ce484222325;
247    for &byte in key {
248        h1 ^= byte as u64;
249        h1 = h1.wrapping_mul(0x100000001b3);
250    }
251    let mut h2: u64 = 0x84222325cbf29ce4;
252    for &byte in key {
253        h2 = h2.wrapping_mul(0x100000001b3);
254        h2 ^= byte as u64;
255    }
256    (h1, h2)
257}
258
259/// SSTable value trait
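///
/// Implementors control how values are encoded inside a block. An illustrative
/// custom impl (a sketch; `Score` is a hypothetical type, not part of the crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct Score(u32);
///
/// impl SSTableValue for Score {
///     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
///         write_vint(writer, self.0 as u64)
///     }
///     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
///         Ok(Score(read_vint(reader)? as u32))
///     }
/// }
/// ```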
260pub trait SSTableValue: Clone + Send + Sync {
261    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
262    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
263}
264
265/// u64 value implementation
266impl SSTableValue for u64 {
267    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
268        write_vint(writer, *self)
269    }
270
271    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
272        read_vint(reader)
273    }
274}
275
276/// Vec<u8> value implementation
277impl SSTableValue for Vec<u8> {
278    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
279        write_vint(writer, self.len() as u64)?;
280        writer.write_all(self)
281    }
282
283    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
284        let len = read_vint(reader)? as usize;
285        let mut data = vec![0u8; len];
286        reader.read_exact(&mut data)?;
287        Ok(data)
288    }
289}
290
291/// Sparse dimension info for SSTable-based sparse index
292/// Stores offset and length for posting list lookup
293#[derive(Debug, Clone, Copy, PartialEq, Eq)]
294pub struct SparseDimInfo {
295    /// Offset in sparse file where posting list starts
296    pub offset: u64,
297    /// Length of serialized posting list
298    pub length: u32,
299}
300
301impl SparseDimInfo {
302    pub fn new(offset: u64, length: u32) -> Self {
303        Self { offset, length }
304    }
305}
306
307impl SSTableValue for SparseDimInfo {
308    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
309        write_vint(writer, self.offset)?;
310        write_vint(writer, self.length as u64)
311    }
312
313    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
314        let offset = read_vint(reader)?;
315        let length = read_vint(reader)? as u32;
316        Ok(Self { offset, length })
317    }
318}
319
320/// Maximum number of postings that can be inlined in TermInfo
321pub const MAX_INLINE_POSTINGS: usize = 3;
322
323/// Term info for posting list references
324///
325/// Supports two modes:
326/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
327/// - **External**: Larger posting lists stored in separate .post file
328///
329/// This eliminates a separate I/O read for rare/unique terms.
330#[derive(Debug, Clone, PartialEq, Eq)]
331pub enum TermInfo {
332    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
333    /// Each entry is (doc_id, term_freq) delta-encoded
334    Inline {
335        /// Number of postings (1-3)
336        doc_freq: u8,
337        /// Inline data: delta-encoded (doc_id, term_freq) pairs
338        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
339        data: [u8; 16],
340        /// Actual length of data used
341        data_len: u8,
342    },
343    /// Reference to external posting list in .post file
344    External {
345        posting_offset: u64,
346        posting_len: u32,
347        doc_freq: u32,
348        /// Position data offset (0 if no positions)
349        position_offset: u64,
350        /// Position data length (0 if no positions)
351        position_len: u32,
352    },
353}
354
355impl TermInfo {
356    /// Create an external reference
357    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
358        TermInfo::External {
359            posting_offset,
360            posting_len,
361            doc_freq,
362            position_offset: 0,
363            position_len: 0,
364        }
365    }
366
367    /// Create an external reference with position info
368    pub fn external_with_positions(
369        posting_offset: u64,
370        posting_len: u32,
371        doc_freq: u32,
372        position_offset: u64,
373        position_len: u32,
374    ) -> Self {
375        TermInfo::External {
376            posting_offset,
377            posting_len,
378            doc_freq,
379            position_offset,
380            position_len,
381        }
382    }
383
384    /// Try to create an inline TermInfo from posting data
385    /// Returns None if posting list is too large to inline
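    ///
    /// Illustrative round trip (values arbitrary):
    /// ```ignore
    /// let info = TermInfo::try_inline(&[10, 15, 42], &[1, 2, 1]).unwrap();
    /// assert_eq!(info.doc_freq(), 3);
    /// assert_eq!(info.decode_inline(), Some((vec![10, 15, 42], vec![1, 2, 1])));
    /// ```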
386    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
387        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
388            return None;
389        }
390
391        let mut data = [0u8; 16];
392        let mut cursor = std::io::Cursor::new(&mut data[..]);
393        let mut prev_doc_id = 0u32;
394
395        for (i, &doc_id) in doc_ids.iter().enumerate() {
396            let delta = doc_id - prev_doc_id;
397            if write_vint(&mut cursor, delta as u64).is_err() {
398                return None;
399            }
400            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
401                return None;
402            }
403            prev_doc_id = doc_id;
404        }
405
406        let data_len = cursor.position() as u8;
407        if data_len > 16 {
408            return None;
409        }
410
411        Some(TermInfo::Inline {
412            doc_freq: doc_ids.len() as u8,
413            data,
414            data_len,
415        })
416    }
417
418    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
419    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
420    /// `count` is the number of postings (must match iterator length).
421    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
422        if count > MAX_INLINE_POSTINGS || count == 0 {
423            return None;
424        }
425
426        let mut data = [0u8; 16];
427        let mut cursor = std::io::Cursor::new(&mut data[..]);
428        let mut prev_doc_id = 0u32;
429
430        for (doc_id, tf) in iter {
431            let delta = doc_id - prev_doc_id;
432            if write_vint(&mut cursor, delta as u64).is_err() {
433                return None;
434            }
435            if write_vint(&mut cursor, tf as u64).is_err() {
436                return None;
437            }
438            prev_doc_id = doc_id;
439        }
440
441        let data_len = cursor.position() as u8;
442
443        Some(TermInfo::Inline {
444            doc_freq: count as u8,
445            data,
446            data_len,
447        })
448    }
449
450    /// Get document frequency
451    pub fn doc_freq(&self) -> u32 {
452        match self {
453            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
454            TermInfo::External { doc_freq, .. } => *doc_freq,
455        }
456    }
457
458    /// Check if this is an inline posting list
459    pub fn is_inline(&self) -> bool {
460        matches!(self, TermInfo::Inline { .. })
461    }
462
463    /// Get external posting info (offset, len) - returns None for inline
464    pub fn external_info(&self) -> Option<(u64, u32)> {
465        match self {
466            TermInfo::External {
467                posting_offset,
468                posting_len,
469                ..
470            } => Some((*posting_offset, *posting_len)),
471            TermInfo::Inline { .. } => None,
472        }
473    }
474
475    /// Get position info (offset, len) - returns None for inline or if no positions
476    pub fn position_info(&self) -> Option<(u64, u32)> {
477        match self {
478            TermInfo::External {
479                position_offset,
480                position_len,
481                ..
482            } if *position_len > 0 => Some((*position_offset, *position_len)),
483            _ => None,
484        }
485    }
486
487    /// Decode inline postings into (doc_ids, term_freqs)
488    /// Returns None if this is an external reference
489    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
490        match self {
491            TermInfo::Inline {
492                doc_freq,
493                data,
494                data_len,
495            } => {
496                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
497                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
498                let mut reader = &data[..*data_len as usize];
499                let mut prev_doc_id = 0u32;
500
501                for _ in 0..*doc_freq {
502                    let delta = read_vint(&mut reader).ok()? as u32;
503                    let tf = read_vint(&mut reader).ok()? as u32;
504                    let doc_id = prev_doc_id + delta;
505                    doc_ids.push(doc_id);
506                    term_freqs.push(tf);
507                    prev_doc_id = doc_id;
508                }
509
510                Some((doc_ids, term_freqs))
511            }
512            TermInfo::External { .. } => None,
513        }
514    }
515}
516
517impl SSTableValue for TermInfo {
518    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
519        match self {
520            TermInfo::Inline {
521                doc_freq,
522                data,
523                data_len,
524            } => {
525                // Tag byte 0xFF = inline marker
526                writer.write_u8(0xFF)?;
527                writer.write_u8(*doc_freq)?;
528                writer.write_u8(*data_len)?;
529                writer.write_all(&data[..*data_len as usize])?;
530            }
531            TermInfo::External {
532                posting_offset,
533                posting_len,
534                doc_freq,
535                position_offset,
536                position_len,
537            } => {
538                // Tag byte 0x00 = external marker (no positions)
539                // Tag byte 0x01 = external with positions
540                if *position_len > 0 {
541                    writer.write_u8(0x01)?;
542                    write_vint(writer, *doc_freq as u64)?;
543                    write_vint(writer, *posting_offset)?;
544                    write_vint(writer, *posting_len as u64)?;
545                    write_vint(writer, *position_offset)?;
546                    write_vint(writer, *position_len as u64)?;
547                } else {
548                    writer.write_u8(0x00)?;
549                    write_vint(writer, *doc_freq as u64)?;
550                    write_vint(writer, *posting_offset)?;
551                    write_vint(writer, *posting_len as u64)?;
552                }
553            }
554        }
555        Ok(())
556    }
557
558    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
559        let tag = reader.read_u8()?;
560
561        if tag == 0xFF {
562            // Inline
563            let doc_freq = reader.read_u8()?;
564            let data_len = reader.read_u8()?;
565            let mut data = [0u8; 16];
566            reader.read_exact(&mut data[..data_len as usize])?;
567            Ok(TermInfo::Inline {
568                doc_freq,
569                data,
570                data_len,
571            })
572        } else if tag == 0x00 {
573            // External (no positions)
574            let doc_freq = read_vint(reader)? as u32;
575            let posting_offset = read_vint(reader)?;
576            let posting_len = read_vint(reader)? as u32;
577            Ok(TermInfo::External {
578                posting_offset,
579                posting_len,
580                doc_freq,
581                position_offset: 0,
582                position_len: 0,
583            })
584        } else if tag == 0x01 {
585            // External with positions
586            let doc_freq = read_vint(reader)? as u32;
587            let posting_offset = read_vint(reader)?;
588            let posting_len = read_vint(reader)? as u32;
589            let position_offset = read_vint(reader)?;
590            let position_len = read_vint(reader)? as u32;
591            Ok(TermInfo::External {
592                posting_offset,
593                posting_len,
594                doc_freq,
595                position_offset,
596                position_len,
597            })
598        } else {
599            Err(io::Error::new(
600                io::ErrorKind::InvalidData,
601                format!("Invalid TermInfo tag: {}", tag),
602            ))
603        }
604    }
605}
606
607/// Write variable-length integer
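///
/// LEB128-style encoding: 7 data bits per byte, with the high bit set on every
/// byte except the last. For example, 300 encodes as `[0xAC, 0x02]`:
/// ```ignore
/// let mut buf = Vec::new();
/// write_vint(&mut buf, 300).unwrap();
/// assert_eq!(buf, vec![0xAC, 0x02]);
/// assert_eq!(read_vint(&mut buf.as_slice()).unwrap(), 300);
/// ```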
608pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
609    loop {
610        let byte = (value & 0x7F) as u8;
611        value >>= 7;
612        if value == 0 {
613            writer.write_u8(byte)?;
614            return Ok(());
615        } else {
616            writer.write_u8(byte | 0x80)?;
617        }
618    }
619}
620
621/// Read variable-length integer
622pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
623    let mut result = 0u64;
624    let mut shift = 0;
625
626    loop {
627        let byte = reader.read_u8()?;
628        result |= ((byte & 0x7F) as u64) << shift;
629        if byte & 0x80 == 0 {
630            return Ok(result);
631        }
632        shift += 7;
633        if shift >= 64 {
634            return Err(io::Error::new(
635                io::ErrorKind::InvalidData,
636                "varint too long",
637            ));
638        }
639    }
640}
641
642/// Compute common prefix length
643pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
644    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
645}
646
647/// SSTable statistics for debugging
648#[derive(Debug, Clone)]
649pub struct SSTableStats {
650    pub num_blocks: usize,
651    pub num_sparse_entries: usize,
652    pub num_entries: u64,
653    pub has_bloom_filter: bool,
654    pub has_dictionary: bool,
655    pub bloom_filter_size: usize,
656    pub dictionary_size: usize,
657}
658
659/// SSTable writer configuration
660#[derive(Debug, Clone)]
661pub struct SSTableWriterConfig {
662    /// Compression level (1-22, higher = better compression but slower)
663    pub compression_level: CompressionLevel,
664    /// Whether to train and use a dictionary for compression
665    pub use_dictionary: bool,
666    /// Dictionary size in bytes (default 64KB)
667    pub dict_size: usize,
668    /// Whether to build a bloom filter
669    pub use_bloom_filter: bool,
670    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
671    pub bloom_bits_per_key: usize,
672}
673
674impl Default for SSTableWriterConfig {
675    fn default() -> Self {
676        Self::from_optimization(crate::structures::IndexOptimization::default())
677    }
678}
679
680impl SSTableWriterConfig {
681    /// Create config from IndexOptimization mode
682    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
683        use crate::structures::IndexOptimization;
684        match optimization {
685            IndexOptimization::Adaptive => Self {
686                compression_level: CompressionLevel::BETTER, // Level 9
687                use_dictionary: false,
688                dict_size: DEFAULT_DICT_SIZE,
689                use_bloom_filter: false,
690                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
691            },
692            IndexOptimization::SizeOptimized => Self {
693                compression_level: CompressionLevel::MAX, // Level 22
694                use_dictionary: true,
695                dict_size: DEFAULT_DICT_SIZE,
696                use_bloom_filter: true,
697                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
698            },
699            IndexOptimization::PerformanceOptimized => Self {
700                compression_level: CompressionLevel::FAST, // Level 1
701                use_dictionary: false,
702                dict_size: DEFAULT_DICT_SIZE,
703                use_bloom_filter: true, // Bloom helps skip blocks fast
704                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
705            },
706        }
707    }
708
709    /// Fast configuration - prioritize write speed over compression
710    pub fn fast() -> Self {
711        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
712    }
713
714    /// Maximum compression configuration - prioritize size over speed
715    pub fn max_compression() -> Self {
716        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
717    }
718}
719
720/// SSTable writer with optimizations:
721/// - Dictionary compression for blocks (if dictionary provided)
722/// - Configurable compression level
723/// - Block index prefix compression
724/// - Bloom filter for fast negative lookups
725pub struct SSTableWriter<W: Write, V: SSTableValue> {
726    writer: W,
727    block_buffer: Vec<u8>,
728    prev_key: Vec<u8>,
729    index: Vec<BlockIndexEntry>,
730    current_offset: u64,
731    num_entries: u64,
732    block_first_key: Option<Vec<u8>>,
733    config: SSTableWriterConfig,
734    /// Pre-trained dictionary for compression (optional)
735    dictionary: Option<CompressionDict>,
736    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
737    /// Filter is built at finish() time with correct sizing.
738    bloom_hashes: Vec<(u64, u64)>,
739    _phantom: std::marker::PhantomData<V>,
740}
741
742impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
743    /// Create a new SSTable writer with default configuration
744    pub fn new(writer: W) -> Self {
745        Self::with_config(writer, SSTableWriterConfig::default())
746    }
747
748    /// Create a new SSTable writer with custom configuration
749    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
750        Self {
751            writer,
752            block_buffer: Vec::with_capacity(BLOCK_SIZE),
753            prev_key: Vec::new(),
754            index: Vec::new(),
755            current_offset: 0,
756            num_entries: 0,
757            block_first_key: None,
758            config,
759            dictionary: None,
760            bloom_hashes: Vec::new(),
761            _phantom: std::marker::PhantomData,
762        }
763    }
764
765    /// Create a new SSTable writer with a pre-trained dictionary
766    pub fn with_dictionary(
767        writer: W,
768        config: SSTableWriterConfig,
769        dictionary: CompressionDict,
770    ) -> Self {
771        Self {
772            writer,
773            block_buffer: Vec::with_capacity(BLOCK_SIZE),
774            prev_key: Vec::new(),
775            index: Vec::new(),
776            current_offset: 0,
777            num_entries: 0,
778            block_first_key: None,
779            config,
780            dictionary: Some(dictionary),
781            bloom_hashes: Vec::new(),
782            _phantom: std::marker::PhantomData,
783        }
784    }
785
786    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
787        if self.block_first_key.is_none() {
788            self.block_first_key = Some(key.to_vec());
789        }
790
791        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
792        if self.config.use_bloom_filter {
793            self.bloom_hashes.push(bloom_hash_pair(key));
794        }
795
796        let prefix_len = common_prefix_len(&self.prev_key, key);
797        let suffix = &key[prefix_len..];
798
799        write_vint(&mut self.block_buffer, prefix_len as u64)?;
800        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
801        self.block_buffer.extend_from_slice(suffix);
802        value.serialize(&mut self.block_buffer)?;
803
804        self.prev_key.clear();
805        self.prev_key.extend_from_slice(key);
806        self.num_entries += 1;
807
808        if self.block_buffer.len() >= BLOCK_SIZE {
809            self.flush_block()?;
810        }
811
812        Ok(())
813    }
814
815    /// Flush and compress the current block
816    fn flush_block(&mut self) -> io::Result<()> {
817        if self.block_buffer.is_empty() {
818            return Ok(());
819        }
820
821        // Compress block with dictionary if available
822        let compressed = if let Some(ref dict) = self.dictionary {
823            crate::compression::compress_with_dict(
824                &self.block_buffer,
825                self.config.compression_level,
826                dict,
827            )?
828        } else {
829            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
830        };
831
832        if let Some(first_key) = self.block_first_key.take() {
833            self.index.push(BlockIndexEntry {
834                first_key,
835                offset: self.current_offset,
836                length: compressed.len() as u32,
837            });
838        }
839
840        self.writer.write_all(&compressed)?;
841        self.current_offset += compressed.len() as u64;
842        self.block_buffer.clear();
843        self.prev_key.clear();
844
845        Ok(())
846    }
847
848    pub fn finish(mut self) -> io::Result<W> {
849        // Flush any remaining data
850        self.flush_block()?;
851
852        // Build bloom filter from collected hashes (properly sized)
853        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
854            let mut bloom =
855                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
856            for (h1, h2) in &self.bloom_hashes {
857                bloom.insert_hashed(*h1, *h2);
858            }
859            Some(bloom)
860        } else {
861            None
862        };
863
864        let data_end_offset = self.current_offset;
865
866        // Build memory-efficient block index (v4 format)
867        // Convert to (key, BlockAddr) pairs for the new index format
868        let entries: Vec<(Vec<u8>, BlockAddr)> = self
869            .index
870            .iter()
871            .map(|e| {
872                (
873                    e.first_key.clone(),
874                    BlockAddr {
875                        offset: e.offset,
876                        length: e.length,
877                    },
878                )
879            })
880            .collect();
881
882        // Build FST-based index if native feature is enabled, otherwise use mmap index
883        #[cfg(feature = "native")]
884        let index_bytes = FstBlockIndex::build(&entries)?;
885        #[cfg(not(feature = "native"))]
886        let index_bytes = MmapBlockIndex::build(&entries)?;
887
888        // Write index bytes with length prefix
889        self.writer
890            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
891        self.writer.write_all(&index_bytes)?;
892        self.current_offset += 4 + index_bytes.len() as u64;
893
894        // Write bloom filter if present
895        let bloom_offset = if let Some(ref bloom) = bloom_filter {
896            let bloom_data = bloom.to_bytes();
897            let offset = self.current_offset;
898            self.writer.write_all(&bloom_data)?;
899            self.current_offset += bloom_data.len() as u64;
900            offset
901        } else {
902            0
903        };
904
905        // Write dictionary if present
906        let dict_offset = if let Some(ref dict) = self.dictionary {
907            let dict_bytes = dict.as_bytes();
908            let offset = self.current_offset;
909            self.writer
910                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
911            self.writer.write_all(dict_bytes)?;
912            self.current_offset += 4 + dict_bytes.len() as u64;
913            offset
914        } else {
915            0
916        };
917
918        // Write extended footer (v4 format)
919        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
920        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
921        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
922        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
923        self.writer
924            .write_u8(self.config.compression_level.0 as u8)?;
925        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
926
927        Ok(self.writer)
928    }
929}
930
931/// Block index entry
932#[derive(Debug, Clone)]
933struct BlockIndexEntry {
934    first_key: Vec<u8>,
935    offset: u64,
936    length: u32,
937}
938
939/// Async SSTable reader - loads blocks on demand via LazyFileSlice
940///
941/// Memory-efficient design (v4 format):
942/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
943/// - Block addresses stored in bitpacked format
944/// - Bloom filter and dictionary optional
945pub struct AsyncSSTableReader<V: SSTableValue> {
946    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
947    data_slice: LazyFileSlice,
948    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
949    block_index: BlockIndex,
950    num_entries: u64,
951    /// Hot cache for decompressed blocks
952    cache: RwLock<BlockCache>,
953    /// Bloom filter for fast negative lookups (optional)
954    bloom_filter: Option<BloomFilter>,
955    /// Compression dictionary (optional)
956    dictionary: Option<CompressionDict>,
957    /// Compression level used
958    #[allow(dead_code)]
959    compression_level: CompressionLevel,
960    _phantom: std::marker::PhantomData<V>,
961}
962
963/// LRU block cache — O(1) lookup/insert; promotion is O(n) in the number of cached blocks.
964///
965/// On `get()`, promotes the accessed entry to MRU position.
966/// On eviction, removes the LRU entry (front of VecDeque).
967/// For typical cache sizes (16-64 blocks), the linear scan in
968/// `promote()` is negligible compared to I/O savings.
969struct BlockCache {
970    blocks: FxHashMap<u64, Arc<[u8]>>,
971    lru_order: std::collections::VecDeque<u64>,
972    max_blocks: usize,
973}
974
975impl BlockCache {
976    fn new(max_blocks: usize) -> Self {
977        Self {
978            blocks: FxHashMap::default(),
979            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
980            max_blocks,
981        }
982    }
983
984    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
985        if self.blocks.contains_key(&offset) {
986            self.promote(offset);
987            self.blocks.get(&offset).map(Arc::clone)
988        } else {
989            None
990        }
991    }
992
993    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
994        if self.blocks.contains_key(&offset) {
995            self.promote(offset);
996            return;
997        }
998        while self.blocks.len() >= self.max_blocks {
999            if let Some(evict_offset) = self.lru_order.pop_front() {
1000                self.blocks.remove(&evict_offset);
1001            } else {
1002                break;
1003            }
1004        }
1005        self.blocks.insert(offset, block);
1006        self.lru_order.push_back(offset);
1007    }
1008
1009    /// Move entry to MRU position (back of deque)
1010    fn promote(&mut self, offset: u64) {
1011        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
1012            self.lru_order.remove(pos);
1013            self.lru_order.push_back(offset);
1014        }
1015    }
1016}
1017
1018impl<V: SSTableValue> AsyncSSTableReader<V> {
1019    /// Open an SSTable from a LazyFileHandle
1020    /// Only the footer and index are loaded into memory; data blocks are fetched on demand.
1021    ///
1022    /// Supports both v3 (legacy) and v4 (memory-efficient) formats:
1023    /// - v4: FST-based or mmap'd block index (no heap allocation for keys)
1024    /// - v3: Prefix-compressed block index loaded into Vec (legacy fallback)
1025    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
1026        let file_len = file_handle.len();
1027        if file_len < 37 {
1028            return Err(io::Error::new(
1029                io::ErrorKind::InvalidData,
1030                "SSTable too small",
1031            ));
1032        }
1033
1034        // Read footer (37 bytes)
1035        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
1036        let footer_bytes = file_handle
1037            .read_bytes_range(file_len - 37..file_len)
1038            .await?;
1039
1040        let mut reader = footer_bytes.as_slice();
1041        let data_end_offset = reader.read_u64::<LittleEndian>()?;
1042        let num_entries = reader.read_u64::<LittleEndian>()?;
1043        let bloom_offset = reader.read_u64::<LittleEndian>()?;
1044        let dict_offset = reader.read_u64::<LittleEndian>()?;
1045        let compression_level = CompressionLevel(reader.read_u8()? as i32);
1046        let magic = reader.read_u32::<LittleEndian>()?;
1047
1048        if magic != SSTABLE_MAGIC {
1049            return Err(io::Error::new(
1050                io::ErrorKind::InvalidData,
1051                format!("Invalid SSTable magic: 0x{:08X}", magic),
1052            ));
1053        }
1054
1055        // Read index section
1056        let index_start = data_end_offset;
1057        let index_end = file_len - 37;
1058        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
1059
1060        // Parse block index (length-prefixed FST or mmap index)
1061        let mut idx_reader = index_bytes.as_slice();
1062        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
1063
1064        if index_len > idx_reader.len() {
1065            return Err(io::Error::new(
1066                io::ErrorKind::InvalidData,
1067                "Index data truncated",
1068            ));
1069        }
1070
1071        let index_data = index_bytes.slice(4..4 + index_len);
1072
1073        // Try FST first (native), fall back to mmap
1074        #[cfg(feature = "native")]
1075        let block_index = match FstBlockIndex::load(index_data.clone()) {
1076            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
1077            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
1078        };
1079        #[cfg(not(feature = "native"))]
1080        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
1081
1082        // Load bloom filter if present
1083        let bloom_filter = if bloom_offset > 0 {
1084            let bloom_start = bloom_offset;
1085            // Read the fixed 12-byte bloom filter header first to get the word count
1086            let bloom_header = file_handle
1087                .read_bytes_range(bloom_start..bloom_start + 12)
1088                .await?;
1089            let num_words = u32::from_le_bytes([
1090                bloom_header[8],
1091                bloom_header[9],
1092                bloom_header[10],
1093                bloom_header[11],
1094            ]) as u64;
1095            let bloom_size = 12 + num_words * 8;
1096            let bloom_data = file_handle
1097                .read_bytes_range(bloom_start..bloom_start + bloom_size)
1098                .await?;
1099            Some(BloomFilter::from_owned_bytes(bloom_data)?)
1100        } else {
1101            None
1102        };
1103
1104        // Load dictionary if present
1105        let dictionary = if dict_offset > 0 {
1106            let dict_start = dict_offset;
1107            // Read dictionary size first
1108            let dict_len_bytes = file_handle
1109                .read_bytes_range(dict_start..dict_start + 4)
1110                .await?;
1111            let dict_len = u32::from_le_bytes([
1112                dict_len_bytes[0],
1113                dict_len_bytes[1],
1114                dict_len_bytes[2],
1115                dict_len_bytes[3],
1116            ]) as u64;
1117            let dict_data = file_handle
1118                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
1119                .await?;
1120            Some(CompressionDict::from_owned_bytes(dict_data))
1121        } else {
1122            None
1123        };
1124
1125        // Create a lazy slice for just the data portion
1126        let data_slice = file_handle.slice(0..data_end_offset);
1127
1128        Ok(Self {
1129            data_slice,
1130            block_index,
1131            num_entries,
1132            cache: RwLock::new(BlockCache::new(cache_blocks)),
1133            bloom_filter,
1134            dictionary,
1135            compression_level,
1136            _phantom: std::marker::PhantomData,
1137        })
1138    }
1139
1140    /// Number of entries
1141    pub fn num_entries(&self) -> u64 {
1142        self.num_entries
1143    }
1144
1145    /// Get stats about this SSTable for debugging
1146    pub fn stats(&self) -> SSTableStats {
1147        SSTableStats {
1148            num_blocks: self.block_index.len(),
1149            num_sparse_entries: 0, // No longer using sparse index separately
1150            num_entries: self.num_entries,
1151            has_bloom_filter: self.bloom_filter.is_some(),
1152            has_dictionary: self.dictionary.is_some(),
1153            bloom_filter_size: self
1154                .bloom_filter
1155                .as_ref()
1156                .map(|b| b.size_bytes())
1157                .unwrap_or(0),
1158            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
1159        }
1160    }
1161
1162    /// Number of blocks currently in the cache
1163    pub fn cached_blocks(&self) -> usize {
1164        self.cache.read().blocks.len()
1165    }
1166
1167    /// Look up a key (async - may need to load block)
1168    ///
1169    /// Uses bloom filter for fast negative lookups, then memory-efficient
1170    /// block index to locate the block, reducing I/O to typically 1 block read.
1171    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1172        log::debug!(
1173            "SSTable::get called, key_len={}, total_blocks={}",
1174            key.len(),
1175            self.block_index.len()
1176        );
1177
1178        // Check bloom filter first - fast negative lookup
1179        if let Some(ref bloom) = self.bloom_filter
1180            && !bloom.may_contain(key)
1181        {
1182            log::debug!("SSTable::get bloom filter negative");
1183            return Ok(None);
1184        }
1185
1186        // Use block index to find the block that could contain the key
1187        let block_idx = match self.block_index.locate(key) {
1188            Some(idx) => idx,
1189            None => {
1190                log::debug!("SSTable::get key not found (before first block)");
1191                return Ok(None);
1192            }
1193        };
1194
1195        log::debug!("SSTable::get loading block_idx={}", block_idx);
1196
1197        // Now we know exactly which block to load - single I/O
1198        let block_data = self.load_block(block_idx).await?;
1199        self.search_block(&block_data, key)
1200    }
1201
1202    /// Batch lookup multiple keys with optimized I/O
1203    ///
1204    /// Groups keys by block and loads each block only once, reducing
1205    /// I/O from one read per key to at most one read per distinct block.
1206    /// Uses bloom filter to skip keys that definitely don't exist.
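    ///
    /// Illustrative call (a sketch; `reader` as in the module-level example):
    /// ```ignore
    /// let keys: [&[u8]; 3] = [b"apple", b"banana", b"missing"];
    /// let results = reader.get_batch(&keys).await?;
    /// assert_eq!(results.len(), 3);
    /// assert_eq!(results[2], None);
    /// ```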
1207    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1208        if keys.is_empty() {
1209            return Ok(Vec::new());
1210        }
1211
1212        // Map each key to its block index
1213        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1214        for (key_idx, key) in keys.iter().enumerate() {
1215            // Check bloom filter first
1216            if let Some(ref bloom) = self.bloom_filter
1217                && !bloom.may_contain(key)
1218            {
1219                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1220                continue;
1221            }
1222
1223            match self.block_index.locate(key) {
1224                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1225                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1226            }
1227        }
1228
1229        // Group keys by block
1230        let mut blocks_to_load: Vec<usize> = key_to_block
1231            .iter()
1232            .filter(|(_, b)| *b != usize::MAX)
1233            .map(|(_, b)| *b)
1234            .collect();
1235        blocks_to_load.sort_unstable();
1236        blocks_to_load.dedup();
1237
1238        // Load all needed blocks (this is where I/O happens)
1239        for &block_idx in &blocks_to_load {
1240            let _ = self.load_block(block_idx).await?;
1241        }
1242
1243        // Now search each key in its block (all blocks are cached)
1244        let mut results = vec![None; keys.len()];
1245        for (key_idx, block_idx) in key_to_block {
1246            if block_idx == usize::MAX {
1247                continue;
1248            }
1249            let block_data = self.load_block(block_idx).await?; // Will hit cache
1250            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1251        }
1252
1253        Ok(results)
1254    }
1255
1256    /// Preload all data blocks into memory
1257    ///
1258    /// Call this after open() to eliminate all I/O during subsequent lookups.
1259    /// Useful when the SSTable is small enough to fit in memory.
1260    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1261        for block_idx in 0..self.block_index.len() {
1262            self.load_block(block_idx).await?;
1263        }
1264        Ok(())
1265    }
1266
1267    /// Prefetch all data blocks via a single bulk I/O operation.
1268    ///
1269    /// Reads the entire compressed data section in one call, then decompresses
1270    /// each block and populates the cache. This turns N individual reads into 1.
1271    /// Cache capacity is expanded to hold all blocks.
1272    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1273        let num_blocks = self.block_index.len();
1274        if num_blocks == 0 {
1275            return Ok(());
1276        }
1277
1278        // Find total data extent
1279        let mut max_end: u64 = 0;
1280        for i in 0..num_blocks {
1281            if let Some(addr) = self.block_index.get_addr(i) {
1282                max_end = max_end.max(addr.offset + addr.length as u64);
1283            }
1284        }
1285
1286        // Single bulk read of entire data section
1287        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1288        let buf = all_data.as_slice();
1289
1290        // Expand cache and decompress all blocks
1291        let mut cache = self.cache.write();
1292        cache.max_blocks = cache.max_blocks.max(num_blocks);
1293        for i in 0..num_blocks {
1294            let addr = self.block_index.get_addr(i).unwrap();
1295            if cache.get(addr.offset).is_some() {
1296                continue;
1297            }
1298            let compressed =
1299                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1300            let decompressed = if let Some(ref dict) = self.dictionary {
1301                crate::compression::decompress_with_dict(compressed, dict)?
1302            } else {
1303                crate::compression::decompress(compressed)?
1304            };
1305            cache.insert(addr.offset, Arc::from(decompressed));
1306        }
1307
1308        Ok(())
1309    }
1310
1311    /// Load a block (checks cache first, then loads from FileSlice)
1312    /// Uses dictionary decompression if dictionary is present
1313    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
1314        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
1315            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
1316        })?;
1317
1318        // Check cache (write lock for LRU promotion on hit)
1319        {
1320            if let Some(block) = self.cache.write().get(addr.offset) {
1321                return Ok(block);
1322            }
1323        }
1324
1325        log::debug!(
1326            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
1327            block_idx,
1328            addr.offset,
1329            addr.offset + addr.length as u64
1330        );
1331
1332        // Load from FileSlice
1333        let range = addr.byte_range();
1334        let compressed = self.data_slice.read_bytes_range(range).await?;
1335
1336        // Decompress with dictionary if available
1337        let decompressed = if let Some(ref dict) = self.dictionary {
1338            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
1339        } else {
1340            crate::compression::decompress(compressed.as_slice())?
1341        };
1342
1343        let block: Arc<[u8]> = Arc::from(decompressed);
1344
1345        // Insert into cache
1346        {
1347            let mut cache = self.cache.write();
1348            cache.insert(addr.offset, Arc::clone(&block));
1349        }
1350
1351        Ok(block)
1352    }
1353
1354    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
1355        let mut reader = block_data;
1356        let mut current_key = Vec::new();
1357
1358        while !reader.is_empty() {
1359            let common_prefix_len = read_vint(&mut reader)? as usize;
1360            let suffix_len = read_vint(&mut reader)? as usize;
1361
1362            current_key.truncate(common_prefix_len);
1363            let mut suffix = vec![0u8; suffix_len];
1364            reader.read_exact(&mut suffix)?;
1365            current_key.extend_from_slice(&suffix);
1366
1367            let value = V::deserialize(&mut reader)?;
1368
1369            match current_key.as_slice().cmp(target_key) {
1370                std::cmp::Ordering::Equal => return Ok(Some(value)),
1371                std::cmp::Ordering::Greater => return Ok(None),
1372                std::cmp::Ordering::Less => continue,
1373            }
1374        }
1375
1376        Ok(None)
1377    }
1378
1379    /// Prefetch blocks for a key range
1380    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1381        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1382        let end_block = self
1383            .block_index
1384            .locate(end_key)
1385            .unwrap_or(self.block_index.len().saturating_sub(1));
1386
1387        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1388            let _ = self.load_block(block_idx).await?;
1389        }
1390
1391        Ok(())
1392    }
1393
1394    /// Iterate over all entries (loads blocks as needed)
1395    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
1396        AsyncSSTableIterator::new(self)
1397    }
1398
1399    /// Get all entries as a vector (for merging)
1400    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1401        let mut results = Vec::new();
1402
1403        for block_idx in 0..self.block_index.len() {
1404            let block_data = self.load_block(block_idx).await?;
1405            let mut reader = &block_data[..];
1406            let mut current_key = Vec::new();
1407
1408            while !reader.is_empty() {
1409                let common_prefix_len = read_vint(&mut reader)? as usize;
1410                let suffix_len = read_vint(&mut reader)? as usize;
1411
1412                current_key.truncate(common_prefix_len);
1413                let mut suffix = vec![0u8; suffix_len];
1414                reader.read_exact(&mut suffix)?;
1415                current_key.extend_from_slice(&suffix);
1416
1417                let value = V::deserialize(&mut reader)?;
1418                results.push((current_key.clone(), value));
1419            }
1420        }
1421
1422        Ok(results)
1423    }
1424}
1425
1426/// Async iterator over SSTable entries
1427pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
1428    reader: &'a AsyncSSTableReader<V>,
1429    current_block: usize,
1430    block_data: Option<Arc<[u8]>>,
1431    block_offset: usize,
1432    current_key: Vec<u8>,
1433    finished: bool,
1434}
1435
1436impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
1437    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
1438        Self {
1439            reader,
1440            current_block: 0,
1441            block_data: None,
1442            block_offset: 0,
1443            current_key: Vec::new(),
1444            finished: reader.block_index.is_empty(),
1445        }
1446    }
1447
1448    async fn load_next_block(&mut self) -> io::Result<bool> {
1449        if self.current_block >= self.reader.block_index.len() {
1450            self.finished = true;
1451            return Ok(false);
1452        }
1453
1454        self.block_data = Some(self.reader.load_block(self.current_block).await?);
1455        self.block_offset = 0;
1456        self.current_key.clear();
1457        self.current_block += 1;
1458        Ok(true)
1459    }
1460
1461    /// Advance to next entry (async)
1462    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
1463        if self.finished {
1464            return Ok(None);
1465        }
1466
1467        if self.block_data.is_none() && !self.load_next_block().await? {
1468            return Ok(None);
1469        }
1470
1471        loop {
1472            let block = self.block_data.as_ref().unwrap();
1473            if self.block_offset >= block.len() {
1474                if !self.load_next_block().await? {
1475                    return Ok(None);
1476                }
1477                continue;
1478            }
1479
1480            let mut reader = &block[self.block_offset..];
1481            let start_len = reader.len();
1482
1483            let common_prefix_len = read_vint(&mut reader)? as usize;
1484            let suffix_len = read_vint(&mut reader)? as usize;
1485
1486            self.current_key.truncate(common_prefix_len);
1487            let mut suffix = vec![0u8; suffix_len];
1488            reader.read_exact(&mut suffix)?;
1489            self.current_key.extend_from_slice(&suffix);
1490
1491            let value = V::deserialize(&mut reader)?;
1492
1493            self.block_offset += start_len - reader.len();
1494
1495            return Ok(Some((self.current_key.clone(), value)));
1496        }
1497    }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502    use super::*;
1503
1504    #[test]
1505    fn test_bloom_filter_basic() {
1506        let mut bloom = BloomFilter::new(100, 10);
1507
1508        bloom.insert(b"hello");
1509        bloom.insert(b"world");
1510        bloom.insert(b"test");
1511
1512        assert!(bloom.may_contain(b"hello"));
1513        assert!(bloom.may_contain(b"world"));
1514        assert!(bloom.may_contain(b"test"));
1515
1516        // These should likely return false (with ~1% false positive rate)
1517        assert!(!bloom.may_contain(b"notfound"));
1518        assert!(!bloom.may_contain(b"missing"));
1519    }
1520
1521    #[test]
1522    fn test_bloom_filter_serialization() {
1523        let mut bloom = BloomFilter::new(100, 10);
1524        bloom.insert(b"key1");
1525        bloom.insert(b"key2");
1526
1527        let bytes = bloom.to_bytes();
1528        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();
1529
1530        assert!(restored.may_contain(b"key1"));
1531        assert!(restored.may_contain(b"key2"));
1532        assert!(!restored.may_contain(b"key3"));
1533    }
1534
1535    #[test]
1536    fn test_bloom_filter_false_positive_rate() {
1537        let num_keys = 10000;
1538        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
1539
1540        // Insert keys
1541        for i in 0..num_keys {
1542            let key = format!("key_{}", i);
1543            bloom.insert(key.as_bytes());
1544        }
1545
1546        // All inserted keys should be found
1547        for i in 0..num_keys {
1548            let key = format!("key_{}", i);
1549            assert!(bloom.may_contain(key.as_bytes()));
1550        }
1551
1552        // Check false positive rate on non-existent keys
1553        let mut false_positives = 0;
1554        let test_count = 10000;
1555        for i in 0..test_count {
1556            let key = format!("nonexistent_{}", i);
1557            if bloom.may_contain(key.as_bytes()) {
1558                false_positives += 1;
1559            }
1560        }
1561
1562        // With 10 bits per key, expect ~1% false positive rate
1563        // Allow up to 3% due to hash function variance
1564        let fp_rate = false_positives as f64 / test_count as f64;
1565        assert!(
1566            fp_rate < 0.03,
1567            "False positive rate {} is too high",
1568            fp_rate
1569        );
1570    }
1571
1572    #[test]
1573    fn test_sstable_writer_config() {
1574        use crate::structures::IndexOptimization;
1575
1576        // Default = Adaptive
1577        let config = SSTableWriterConfig::default();
1578        assert_eq!(config.compression_level.0, 9); // BETTER
1579        assert!(!config.use_bloom_filter);
1580        assert!(!config.use_dictionary);
1581
1582        // Adaptive
1583        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
1584        assert_eq!(adaptive.compression_level.0, 9);
1585        assert!(!adaptive.use_bloom_filter);
1586        assert!(!adaptive.use_dictionary);
1587
1588        // SizeOptimized
1589        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
1590        assert_eq!(size.compression_level.0, 22); // MAX
1591        assert!(size.use_bloom_filter);
1592        assert!(size.use_dictionary);
1593
1594        // PerformanceOptimized
1595        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
1596        assert_eq!(perf.compression_level.0, 1); // FAST
1597        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
1598        assert!(!perf.use_dictionary);
1599
1600        // Aliases
1601        let fast = SSTableWriterConfig::fast();
1602        assert_eq!(fast.compression_level.0, 1);
1603
1604        let max = SSTableWriterConfig::max_compression();
1605        assert_eq!(max.compression_level.0, 22);
1606    }
1607
1608    #[test]
1609    fn test_vint_roundtrip() {
1610        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
1611
1612        for &val in &test_values {
1613            let mut buf = Vec::new();
1614            write_vint(&mut buf, val).unwrap();
1615            let mut reader = buf.as_slice();
1616            let decoded = read_vint(&mut reader).unwrap();
1617            assert_eq!(val, decoded, "Failed for value {}", val);
1618        }
1619    }
1620
1621    #[test]
1622    fn test_common_prefix_len() {
1623        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
1624        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
1625        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
1626        assert_eq!(common_prefix_len(b"", b"hello"), 0);
1627        assert_eq!(common_prefix_len(b"hello", b""), 0);
1628    }
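
    // Illustrative additions: a TermInfo tag-byte round trip (covering the
    // 0xFF inline and 0x01 external-with-positions markers described in the
    // SSTableValue impl above) and a check of BlockCache's LRU eviction order.
    // Both are sketches that use only items defined in this module.
    #[test]
    fn test_term_info_roundtrip() {
        // Inline variant (tag 0xFF)
        let inline = TermInfo::try_inline(&[10, 15, 42], &[1, 2, 1]).unwrap();
        let mut buf = Vec::new();
        inline.serialize(&mut buf).unwrap();
        assert_eq!(TermInfo::deserialize(&mut buf.as_slice()).unwrap(), inline);

        // External variant with positions (tag 0x01)
        let external = TermInfo::external_with_positions(1024, 256, 17, 4096, 128);
        let mut buf = Vec::new();
        external.serialize(&mut buf).unwrap();
        assert_eq!(TermInfo::deserialize(&mut buf.as_slice()).unwrap(), external);
    }

    #[test]
    fn test_block_cache_lru_eviction() {
        let mut cache = BlockCache::new(2);
        cache.insert(0, Arc::from(vec![0u8; 4]));
        cache.insert(16, Arc::from(vec![1u8; 4]));

        // Touching offset 0 makes offset 16 the least recently used entry.
        assert!(cache.get(0).is_some());

        // Inserting a third block evicts offset 16, not offset 0.
        cache.insert(32, Arc::from(vec![2u8; 4]));
        assert!(cache.get(16).is_none());
        assert!(cache.get(0).is_some());
        assert!(cache.get(32).is_some());
    }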
1629}