// hermes_core/structures/sstable.rs
//! Async SSTable with lazy loading via FileSlice
//!
//! Memory-efficient design - only loads minimal metadata into memory,
//! blocks are loaded on-demand.
//!
//! ## Key Features
//!
//! 1. **FST-based Block Index**: Uses Finite State Transducer for key lookup
//!    - Can be mmap'd directly without parsing into heap-allocated structures
//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
//!
//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
//!    - Minimal memory footprint for block metadata
//!
//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
//!
//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
//!
//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O

21use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
22use parking_lot::RwLock;
23use rustc_hash::FxHashMap;
24use std::io::{self, Read, Write};
25use std::sync::Arc;
26
27#[cfg(feature = "native")]
28use super::sstable_index::FstBlockIndex;
29use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
30use crate::compression::{CompressionDict, CompressionLevel};
31use crate::directories::{FileHandle, OwnedBytes};
32
/// SSTable magic number - version 4 with memory-efficient index
/// Uses FST-based or mmap'd block index to avoid heap allocation
pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"

/// Block size for SSTable (16KB default) — uncompressed threshold at which
/// the writer flushes and compresses the current block
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default dictionary size (64KB) for trained zstd dictionaries
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Bloom filter hash count (optimal for 10 bits/key)
pub const BLOOM_HASH_COUNT: usize = 7;
48
49// ============================================================================
50// Bloom Filter Implementation
51// ============================================================================
52
/// Simple bloom filter for key existence checks
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit array (mutable Vec while building, raw mmap bytes when reading)
    bits: BloomBits,
    /// Total number of addressable bits in the filter
    num_bits: usize,
    /// Number of hash probes performed per key
    num_hashes: usize,
}
60
/// Bloom filter storage — Vec for write path, OwnedBytes for zero-copy read path.
#[derive(Debug, Clone)]
enum BloomBits {
    /// Mutable storage for building (SSTable writer)
    Vec(Vec<u64>),
    /// Zero-copy mmap reference for reading (raw LE u64 words, no header).
    /// Read-only: `set_bit` panics on this variant.
    Bytes(OwnedBytes),
}
69
70impl BloomBits {
71    #[inline]
72    fn len(&self) -> usize {
73        match self {
74            BloomBits::Vec(v) => v.len(),
75            BloomBits::Bytes(b) => b.len() / 8,
76        }
77    }
78
79    #[inline]
80    fn get(&self, word_idx: usize) -> u64 {
81        match self {
82            BloomBits::Vec(v) => v[word_idx],
83            BloomBits::Bytes(b) => {
84                let off = word_idx * 8;
85                u64::from_le_bytes([
86                    b[off],
87                    b[off + 1],
88                    b[off + 2],
89                    b[off + 3],
90                    b[off + 4],
91                    b[off + 5],
92                    b[off + 6],
93                    b[off + 7],
94                ])
95            }
96        }
97    }
98
99    #[inline]
100    fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
101        match self {
102            BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
103            BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
104        }
105    }
106
107    fn size_bytes(&self) -> usize {
108        match self {
109            BloomBits::Vec(v) => v.len() * 8,
110            BloomBits::Bytes(b) => b.len(),
111        }
112    }
113}
114
115impl BloomFilter {
116    /// Create a new bloom filter sized for expected number of keys
117    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
118        let num_bits = (expected_keys * bits_per_key).max(64);
119        let num_words = num_bits.div_ceil(64);
120        Self {
121            bits: BloomBits::Vec(vec![0u64; num_words]),
122            num_bits,
123            num_hashes: BLOOM_HASH_COUNT,
124        }
125    }
126
127    /// Create from serialized bytes into a mutable Vec (for building/mutation).
128    /// Unlike `from_owned_bytes`, this copies data into a `Vec<u64>` so that
129    /// `insert()` works. Used by the primary-key bloom cache.
130    pub fn from_bytes_mutable(data: &[u8]) -> io::Result<Self> {
131        if data.len() < 12 {
132            return Err(io::Error::new(
133                io::ErrorKind::InvalidData,
134                "Bloom filter data too short",
135            ));
136        }
137        let num_bits = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
138        let num_hashes = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
139        let num_words = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
140
141        if data.len() < 12 + num_words * 8 {
142            return Err(io::Error::new(
143                io::ErrorKind::InvalidData,
144                "Bloom filter data truncated",
145            ));
146        }
147
148        let mut vec = vec![0u64; num_words];
149        for (i, v) in vec.iter_mut().enumerate() {
150            let off = 12 + i * 8;
151            *v = u64::from_le_bytes(data[off..off + 8].try_into().unwrap());
152        }
153
154        Ok(Self {
155            bits: BloomBits::Vec(vec),
156            num_bits,
157            num_hashes,
158        })
159    }
160
161    /// Create from serialized OwnedBytes (zero-copy for mmap)
162    pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
163        if data.len() < 12 {
164            return Err(io::Error::new(
165                io::ErrorKind::InvalidData,
166                "Bloom filter data too short",
167            ));
168        }
169        let d = data.as_slice();
170        let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
171        let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
172        let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
173
174        if d.len() < 12 + num_words * 8 {
175            return Err(io::Error::new(
176                io::ErrorKind::InvalidData,
177                "Bloom filter data truncated",
178            ));
179        }
180
181        // Slice past the 12-byte header to get raw u64 LE words (zero-copy)
182        let bits_bytes = data.slice(12..12 + num_words * 8);
183
184        Ok(Self {
185            bits: BloomBits::Bytes(bits_bytes),
186            num_bits,
187            num_hashes,
188        })
189    }
190
191    /// Serialize to bytes
192    pub fn to_bytes(&self) -> Vec<u8> {
193        let num_words = self.bits.len();
194        let mut data = Vec::with_capacity(12 + num_words * 8);
195        data.write_u32::<LittleEndian>(self.num_bits as u32)
196            .unwrap();
197        data.write_u32::<LittleEndian>(self.num_hashes as u32)
198            .unwrap();
199        data.write_u32::<LittleEndian>(num_words as u32).unwrap();
200        for i in 0..num_words {
201            data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
202        }
203        data
204    }
205
206    /// Add a key to the filter
207    pub fn insert(&mut self, key: &[u8]) {
208        let (h1, h2) = self.hash_pair(key);
209        for i in 0..self.num_hashes {
210            let bit_pos = self.get_bit_pos(h1, h2, i);
211            let word_idx = bit_pos / 64;
212            let bit_idx = bit_pos % 64;
213            if word_idx < self.bits.len() {
214                self.bits.set_bit(word_idx, bit_idx);
215            }
216        }
217    }
218
219    /// Check if a key might be in the filter
220    /// Returns false if definitely not present, true if possibly present
221    pub fn may_contain(&self, key: &[u8]) -> bool {
222        let (h1, h2) = self.hash_pair(key);
223        for i in 0..self.num_hashes {
224            let bit_pos = self.get_bit_pos(h1, h2, i);
225            let word_idx = bit_pos / 64;
226            let bit_idx = bit_pos % 64;
227            if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
228                return false;
229            }
230        }
231        true
232    }
233
234    /// Size in bytes
235    pub fn size_bytes(&self) -> usize {
236        12 + self.bits.size_bytes()
237    }
238
239    /// Insert a pre-computed hash pair into the filter
240    pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
241        for i in 0..self.num_hashes {
242            let bit_pos = self.get_bit_pos(h1, h2, i);
243            let word_idx = bit_pos / 64;
244            let bit_idx = bit_pos % 64;
245            if word_idx < self.bits.len() {
246                self.bits.set_bit(word_idx, bit_idx);
247            }
248        }
249    }
250
251    /// Compute two hash values using FNV-1a variant (single pass over key bytes)
252    #[inline]
253    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
254        let mut h1: u64 = 0xcbf29ce484222325;
255        let mut h2: u64 = 0x84222325cbf29ce4;
256        for &byte in key {
257            h1 ^= byte as u64;
258            h1 = h1.wrapping_mul(0x100000001b3);
259            h2 = h2.wrapping_mul(0x100000001b3);
260            h2 ^= byte as u64;
261        }
262        (h1, h2)
263    }
264
265    /// Get bit position for hash iteration i using double hashing
266    #[inline]
267    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
268        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
269    }
270}
271
/// Compute bloom filter hash pair for a key (standalone, no BloomFilter needed).
/// Uses the same FNV-1a double-hashing as BloomFilter::hash_pair (single pass).
#[inline]
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    const FNV_PRIME: u64 = 0x100000001b3;
    key.iter().fold(
        (0xcbf29ce484222325u64, 0x84222325cbf29ce4u64),
        |(h1, h2), &byte| {
            // h1 is classic FNV-1a (xor then multiply); h2 swaps the order
            // (multiply then xor) to decorrelate the two streams.
            (
                (h1 ^ u64::from(byte)).wrapping_mul(FNV_PRIME),
                h2.wrapping_mul(FNV_PRIME) ^ u64::from(byte),
            )
        },
    )
}
286
/// SSTable value trait — a type storable as an SSTable value must round-trip
/// through these two methods.
pub trait SSTableValue: Clone + Send + Sync {
    /// Write this value's binary encoding to `writer`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    /// Read a value previously written by `serialize`.
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}
292
/// u64 value implementation — encoded as a single variable-length integer
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}
303
/// Vec<u8> value implementation — vint length prefix followed by raw bytes
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        // NOTE(review): `len` is read straight from the stream and used to
        // pre-allocate; a corrupt stream could request a huge buffer.
        // Presumably input is a locally-written index file — confirm if this
        // is ever fed untrusted data.
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}
318
/// Sparse dimension info for SSTable-based sparse index
/// Stores offset and length for posting list lookup
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    /// Offset in sparse file where posting list starts
    pub offset: u64,
    /// Length of serialized posting list
    pub length: u32,
}
328
329impl SparseDimInfo {
330    pub fn new(offset: u64, length: u32) -> Self {
331        Self { offset, length }
332    }
333}
334
/// Encoded as two vints: offset then length.
impl SSTableValue for SparseDimInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.offset)?;
        write_vint(writer, self.length as u64)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let offset = read_vint(reader)?;
        let length = read_vint(reader)? as u32;
        Ok(Self { offset, length })
    }
}
347
/// Maximum number of postings that can be inlined in TermInfo
/// (larger lists are written to the external .post file)
pub const MAX_INLINE_POSTINGS: usize = 3;
350
/// Term info for posting list references
///
/// Supports two modes:
/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
/// - **External**: Larger posting lists stored in separate .post file
///
/// This eliminates a separate I/O read for rare/unique terms.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
    /// Each entry is (doc_id, term_freq) delta-encoded
    Inline {
        /// Number of postings (1-3)
        doc_freq: u8,
        /// Inline data: delta-encoded (doc_id, term_freq) pairs as vints
        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
        data: [u8; 16],
        /// Actual length of data used (bytes of `data` that are valid)
        data_len: u8,
    },
    /// Reference to external posting list in .post file
    External {
        /// Byte offset of the posting list in the .post file
        posting_offset: u64,
        /// Byte length of the serialized posting list
        posting_len: u64,
        /// Number of documents containing the term
        doc_freq: u32,
        /// Position data offset (0 if no positions)
        position_offset: u64,
        /// Position data length (0 if no positions)
        position_len: u64,
    },
}
382
383impl TermInfo {
384    /// Create an external reference
385    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
386        TermInfo::External {
387            posting_offset,
388            posting_len,
389            doc_freq,
390            position_offset: 0,
391            position_len: 0,
392        }
393    }
394
395    /// Create an external reference with position info
396    pub fn external_with_positions(
397        posting_offset: u64,
398        posting_len: u64,
399        doc_freq: u32,
400        position_offset: u64,
401        position_len: u64,
402    ) -> Self {
403        TermInfo::External {
404            posting_offset,
405            posting_len,
406            doc_freq,
407            position_offset,
408            position_len,
409        }
410    }
411
412    /// Try to create an inline TermInfo from posting data
413    /// Returns None if posting list is too large to inline
414    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
415        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
416            return None;
417        }
418
419        let mut data = [0u8; 16];
420        let mut cursor = std::io::Cursor::new(&mut data[..]);
421        let mut prev_doc_id = 0u32;
422
423        for (i, &doc_id) in doc_ids.iter().enumerate() {
424            let delta = doc_id - prev_doc_id;
425            if write_vint(&mut cursor, delta as u64).is_err() {
426                return None;
427            }
428            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
429                return None;
430            }
431            prev_doc_id = doc_id;
432        }
433
434        let data_len = cursor.position() as u8;
435        if data_len > 16 {
436            return None;
437        }
438
439        Some(TermInfo::Inline {
440            doc_freq: doc_ids.len() as u8,
441            data,
442            data_len,
443        })
444    }
445
446    /// Try to create an inline TermInfo from an iterator of (doc_id, term_freq) pairs.
447    /// Zero-allocation alternative to `try_inline` — avoids collecting into Vec<u32>.
448    /// `count` is the number of postings (must match iterator length).
449    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
450        if count > MAX_INLINE_POSTINGS || count == 0 {
451            return None;
452        }
453
454        let mut data = [0u8; 16];
455        let mut cursor = std::io::Cursor::new(&mut data[..]);
456        let mut prev_doc_id = 0u32;
457
458        for (doc_id, tf) in iter {
459            let delta = doc_id - prev_doc_id;
460            if write_vint(&mut cursor, delta as u64).is_err() {
461                return None;
462            }
463            if write_vint(&mut cursor, tf as u64).is_err() {
464                return None;
465            }
466            prev_doc_id = doc_id;
467        }
468
469        let data_len = cursor.position() as u8;
470
471        Some(TermInfo::Inline {
472            doc_freq: count as u8,
473            data,
474            data_len,
475        })
476    }
477
478    /// Get document frequency
479    pub fn doc_freq(&self) -> u32 {
480        match self {
481            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
482            TermInfo::External { doc_freq, .. } => *doc_freq,
483        }
484    }
485
486    /// Check if this is an inline posting list
487    pub fn is_inline(&self) -> bool {
488        matches!(self, TermInfo::Inline { .. })
489    }
490
491    /// Get external posting info (offset, len) - returns None for inline
492    pub fn external_info(&self) -> Option<(u64, u64)> {
493        match self {
494            TermInfo::External {
495                posting_offset,
496                posting_len,
497                ..
498            } => Some((*posting_offset, *posting_len)),
499            TermInfo::Inline { .. } => None,
500        }
501    }
502
503    /// Get position info (offset, len) - returns None for inline or if no positions
504    pub fn position_info(&self) -> Option<(u64, u64)> {
505        match self {
506            TermInfo::External {
507                position_offset,
508                position_len,
509                ..
510            } if *position_len > 0 => Some((*position_offset, *position_len)),
511            _ => None,
512        }
513    }
514
515    /// Decode inline postings into (doc_ids, term_freqs)
516    /// Returns None if this is an external reference
517    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
518        match self {
519            TermInfo::Inline {
520                doc_freq,
521                data,
522                data_len,
523            } => {
524                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
525                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
526                let mut reader = &data[..*data_len as usize];
527                let mut prev_doc_id = 0u32;
528
529                for _ in 0..*doc_freq {
530                    let delta = read_vint(&mut reader).ok()? as u32;
531                    let tf = read_vint(&mut reader).ok()? as u32;
532                    let doc_id = prev_doc_id + delta;
533                    doc_ids.push(doc_id);
534                    term_freqs.push(tf);
535                    prev_doc_id = doc_id;
536                }
537
538                Some((doc_ids, term_freqs))
539            }
540            TermInfo::External { .. } => None,
541        }
542    }
543}
544
545impl SSTableValue for TermInfo {
546    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
547        match self {
548            TermInfo::Inline {
549                doc_freq,
550                data,
551                data_len,
552            } => {
553                // Tag byte 0xFF = inline marker
554                writer.write_u8(0xFF)?;
555                writer.write_u8(*doc_freq)?;
556                writer.write_u8(*data_len)?;
557                writer.write_all(&data[..*data_len as usize])?;
558            }
559            TermInfo::External {
560                posting_offset,
561                posting_len,
562                doc_freq,
563                position_offset,
564                position_len,
565            } => {
566                // Tag byte 0x00 = external marker (no positions)
567                // Tag byte 0x01 = external with positions
568                if *position_len > 0 {
569                    writer.write_u8(0x01)?;
570                    write_vint(writer, *doc_freq as u64)?;
571                    write_vint(writer, *posting_offset)?;
572                    write_vint(writer, *posting_len)?;
573                    write_vint(writer, *position_offset)?;
574                    write_vint(writer, *position_len)?;
575                } else {
576                    writer.write_u8(0x00)?;
577                    write_vint(writer, *doc_freq as u64)?;
578                    write_vint(writer, *posting_offset)?;
579                    write_vint(writer, *posting_len)?;
580                }
581            }
582        }
583        Ok(())
584    }
585
586    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
587        let tag = reader.read_u8()?;
588
589        if tag == 0xFF {
590            // Inline
591            let doc_freq = reader.read_u8()?;
592            let data_len = reader.read_u8()?;
593            let mut data = [0u8; 16];
594            reader.read_exact(&mut data[..data_len as usize])?;
595            Ok(TermInfo::Inline {
596                doc_freq,
597                data,
598                data_len,
599            })
600        } else if tag == 0x00 {
601            // External (no positions)
602            let doc_freq = read_vint(reader)? as u32;
603            let posting_offset = read_vint(reader)?;
604            let posting_len = read_vint(reader)?;
605            Ok(TermInfo::External {
606                posting_offset,
607                posting_len,
608                doc_freq,
609                position_offset: 0,
610                position_len: 0,
611            })
612        } else if tag == 0x01 {
613            // External with positions
614            let doc_freq = read_vint(reader)? as u32;
615            let posting_offset = read_vint(reader)?;
616            let posting_len = read_vint(reader)?;
617            let position_offset = read_vint(reader)?;
618            let position_len = read_vint(reader)?;
619            Ok(TermInfo::External {
620                posting_offset,
621                posting_len,
622                doc_freq,
623                position_offset,
624                position_len,
625            })
626        } else {
627            Err(io::Error::new(
628                io::ErrorKind::InvalidData,
629                format!("Invalid TermInfo tag: {}", tag),
630            ))
631        }
632    }
633}
634
/// Write variable-length integer (LEB128: 7 data bits per byte, LSB-first;
/// the high bit of each byte signals that more bytes follow).
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let low = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final byte: continuation bit clear.
            writer.write_all(&[low])?;
            return Ok(());
        }
        // More bytes follow: set the continuation bit.
        writer.write_all(&[low | 0x80])?;
    }
}
648
/// Read variable-length integer written by `write_vint`.
/// Errors on EOF mid-varint and on varints longer than 10 bytes.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;
    let mut byte_buf = [0u8; 1];

    loop {
        reader.read_exact(&mut byte_buf)?;
        let byte = byte_buf[0];
        value |= u64::from(byte & 0x7F) << shift;
        // High bit clear means this was the final byte.
        if byte & 0x80 == 0 {
            return Ok(value);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
669
/// Compute the length of the longest common byte prefix of `a` and `b`.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let limit = a.len().min(b.len());
    let mut n = 0;
    while n < limit && a[n] == b[n] {
        n += 1;
    }
    n
}
674
/// SSTable statistics for debugging
#[derive(Debug, Clone)]
pub struct SSTableStats {
    /// Number of compressed data blocks
    pub num_blocks: usize,
    /// Number of sparse index entries
    pub num_sparse_entries: usize,
    /// Total key/value entries across all blocks
    pub num_entries: u64,
    /// Whether a bloom filter section is present
    pub has_bloom_filter: bool,
    /// Whether a zstd dictionary section is present
    pub has_dictionary: bool,
    /// Serialized bloom filter size in bytes (0 if absent)
    pub bloom_filter_size: usize,
    /// Serialized dictionary size in bytes (0 if absent)
    pub dictionary_size: usize,
}
686
/// SSTable writer configuration
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level (1-22, higher = better compression but slower)
    pub compression_level: CompressionLevel,
    /// Whether to train and use a dictionary for compression
    pub use_dictionary: bool,
    /// Dictionary size in bytes (default 64KB)
    pub dict_size: usize,
    /// Whether to build a bloom filter
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
    pub bloom_bits_per_key: usize,
}
701
impl Default for SSTableWriterConfig {
    /// Default config mirrors `IndexOptimization::default()`.
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}
707
impl SSTableWriterConfig {
    /// Create config from IndexOptimization mode.
    /// Maps the three optimization profiles onto compression level,
    /// dictionary usage, and bloom filter settings.
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER, // Level 9
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true, // Bloom is cheap (~1.25 B/key) and avoids needless block reads
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX, // Level 22
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST, // Level 1
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true, // Bloom helps skip blocks fast
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Fast configuration - prioritize write speed over compression
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Maximum compression configuration - prioritize size over speed
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}
747
/// SSTable writer with optimizations:
/// - Dictionary compression for blocks (if dictionary provided)
/// - Configurable compression level
/// - Block index prefix compression
/// - Bloom filter for fast negative lookups
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    /// Underlying output sink
    writer: W,
    /// Uncompressed entries for the block currently being built
    block_buffer: Vec<u8>,
    /// Previous key inserted, used for prefix compression
    prev_key: Vec<u8>,
    /// One entry per flushed block (first key + offset + compressed length)
    index: Vec<BlockIndexEntry>,
    /// Bytes written to `writer` so far
    current_offset: u64,
    /// Total number of key/value entries inserted
    num_entries: u64,
    /// First key of the in-progress block (None until first insert after a flush)
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Pre-trained dictionary for compression (optional)
    dictionary: Option<CompressionDict>,
    /// Bloom filter key hashes — compact (u64, u64) pairs instead of full keys.
    /// Filter is built at finish() time with correct sizing.
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}
769
770impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    /// Create a new SSTable writer with default configuration
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }
775
    /// Create a new SSTable writer with custom configuration (no dictionary)
    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }
792
793    /// Create a new SSTable writer with a pre-trained dictionary
794    pub fn with_dictionary(
795        writer: W,
796        config: SSTableWriterConfig,
797        dictionary: CompressionDict,
798    ) -> Self {
799        Self {
800            writer,
801            block_buffer: Vec::with_capacity(BLOCK_SIZE),
802            prev_key: Vec::new(),
803            index: Vec::new(),
804            current_offset: 0,
805            num_entries: 0,
806            block_first_key: None,
807            config,
808            dictionary: Some(dictionary),
809            bloom_hashes: Vec::new(),
810            _phantom: std::marker::PhantomData,
811        }
812    }
813
    /// Insert a key/value pair.
    ///
    /// Keys are prefix-compressed against the previously inserted key, and
    /// the block index records each block's first key — so callers are
    /// assumed to insert keys in ascending order (not checked here; confirm
    /// against the reader's lookup logic).
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        // Remember the first key of a fresh block for the block index.
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        // Store compact hash pair for bloom filter (16 bytes vs ~48+ per key)
        if self.config.use_bloom_filter {
            self.bloom_hashes.push(bloom_hash_pair(key));
        }

        // Prefix compression: store shared-prefix length + suffix bytes only.
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        // Flush once the uncompressed block reaches the target size.
        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }
842
    /// Flush and compress the current block.
    ///
    /// No-op if the buffer is empty. Otherwise: compresses the buffered
    /// entries (with the dictionary when present), records a block index
    /// entry keyed by the block's first key, writes the compressed bytes,
    /// and resets buffer + prefix state so the next block's first key is
    /// stored in full (blocks must be independently decodable).
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        // Compress block with dictionary if available
        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Reset prefix compression so the next block starts with a full key.
        self.prev_key.clear();

        Ok(())
    }
875
    /// Finalize the SSTable and return the underlying writer.
    ///
    /// Trailer layout written after the data blocks:
    ///   [index: u32 len + index bytes]
    ///   [bloom filter bytes, optional]
    ///   [dict: u32 len + dict bytes, optional]
    ///   [footer: data_end(8) + num_entries(8) + bloom_offset(8) +
    ///    dict_offset(8) + compression_level(1) + magic(4)] = 37 bytes
    /// This must stay in lock-step with the 37-byte footer parse in
    /// `AsyncSSTableReader::open`.
    pub fn finish(mut self) -> io::Result<W> {
        // Flush any remaining data
        self.flush_block()?;

        // Build bloom filter from collected hashes (properly sized)
        // Sizing by the final key count keeps the false-positive rate at the
        // configured bits-per-key target.
        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };

        // Everything written so far is block data; this offset is where the
        // index section begins (recorded in the footer as data_end).
        let data_end_offset = self.current_offset;

        // Build memory-efficient block index
        // Convert to (key, BlockAddr) pairs for the new index format
        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        // Build FST-based index if native feature is enabled, otherwise use mmap index
        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        // Write index bytes with length prefix
        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        // Write bloom filter if present
        // A zero offset in the footer signals "no bloom filter" to the reader.
        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        // Write dictionary if present
        // Like the bloom filter, offset 0 means "absent".
        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Write extended footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(self.writer)
    }
957}
958
/// Block index entry
///
/// In-memory form used only while writing; `finish()` converts these into
/// the compact FST/mmap index that readers consume.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    /// Full (uncompressed) first key of the block.
    first_key: Vec<u8>,
    /// Byte offset of the compressed block within the data section.
    offset: u64,
    /// Compressed length of the block in bytes.
    length: u32,
}
966
/// Async SSTable reader - loads blocks on demand via FileHandle
///
/// Memory-efficient design:
/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
/// - Block addresses stored in bitpacked format
/// - Bloom filter and dictionary optional
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// FileHandle for the data portion (blocks only) - fetches ranges on demand
    data_slice: FileHandle,
    /// Memory-efficient block index (FST or mmap)
    block_index: BlockIndex,
    /// Total entry count, read from the footer.
    num_entries: u64,
    /// Hot cache for decompressed blocks
    cache: RwLock<BlockCache>,
    /// Bloom filter for fast negative lookups (optional)
    bloom_filter: Option<BloomFilter>,
    /// Compression dictionary (optional)
    dictionary: Option<CompressionDict>,
    /// Compression level used
    // Kept for diagnostics/symmetry with the writer; not needed to decompress.
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    // Ties the reader to its value type without storing a value.
    _phantom: std::marker::PhantomData<V>,
}
990
/// LRU block cache — O(1) lookup/insert, amortized O(n) promotion.
///
/// On `get()`, promotes the accessed entry to MRU position.
/// On eviction, removes the LRU entry (front of VecDeque).
/// For typical cache sizes (16-64 blocks), the linear scan in
/// `promote()` is negligible compared to I/O savings.
struct BlockCache {
    /// Decompressed blocks keyed by their data-section byte offset.
    blocks: FxHashMap<u64, Arc<[u8]>>,
    /// Offsets ordered least- to most-recently used (front = LRU victim).
    lru_order: std::collections::VecDeque<u64>,
    /// Capacity bound; `insert` evicts until under this count.
    max_blocks: usize,
}
1002
1003impl BlockCache {
1004    fn new(max_blocks: usize) -> Self {
1005        Self {
1006            blocks: FxHashMap::default(),
1007            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
1008            max_blocks,
1009        }
1010    }
1011
1012    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
1013        if self.blocks.contains_key(&offset) {
1014            self.promote(offset);
1015            self.blocks.get(&offset).map(Arc::clone)
1016        } else {
1017            None
1018        }
1019    }
1020
1021    /// Read-only cache probe — no LRU promotion, safe behind a read lock.
1022    fn peek(&self, offset: u64) -> Option<Arc<[u8]>> {
1023        self.blocks.get(&offset).map(Arc::clone)
1024    }
1025
1026    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
1027        if self.blocks.contains_key(&offset) {
1028            self.promote(offset);
1029            return;
1030        }
1031        while self.blocks.len() >= self.max_blocks {
1032            if let Some(evict_offset) = self.lru_order.pop_front() {
1033                self.blocks.remove(&evict_offset);
1034            } else {
1035                break;
1036            }
1037        }
1038        self.blocks.insert(offset, block);
1039        self.lru_order.push_back(offset);
1040    }
1041
1042    /// Move entry to MRU position (back of deque)
1043    fn promote(&mut self, offset: u64) {
1044        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
1045            self.lru_order.remove(pos);
1046            self.lru_order.push_back(offset);
1047        }
1048    }
1049}
1050
1051impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable from a FileHandle
    /// Only loads the footer and index into memory, data blocks fetched on-demand
    ///
    /// Uses FST-based (native) or mmap'd block index (no heap allocation for keys)
    ///
    /// # Errors
    /// Returns `InvalidData` when the file is smaller than the footer, the
    /// magic doesn't match, or the index section is truncated.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // 37 bytes is the fixed footer size written by `finish()`.
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Read footer (37 bytes)
        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // Read index section
        // The index sits between the end of block data and the footer.
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        // Parse block index (length-prefixed FST or mmap index)
        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        // Skip the 4-byte length prefix; keep only the index payload.
        let index_data = index_bytes.slice(4..4 + index_len);

        // Try FST first (native), fall back to mmap
        // A load failure here likely means the table was written without the
        // native feature, so the bytes are in the mmap index format.
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        // Load bloom filter if present (offset 0 in the footer means absent)
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read bloom filter size first (12 bytes header)
            // NOTE(review): assumes header bytes 8..12 hold the u64-word
            // count — must stay in sync with `BloomFilter::to_bytes`.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_owned_bytes(bloom_data)?)
        } else {
            None
        };

        // Load dictionary if present (offset 0 in the footer means absent)
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            // Read dictionary size first
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_owned_bytes(dict_data))
        } else {
            None
        };

        // Create a lazy slice for just the data portion
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }
1170
    /// Number of entries
    ///
    /// Total key/value count across all blocks, as recorded in the footer.
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }
1175
    /// Get stats about this SSTable for debugging
    ///
    /// Cheap: reads only already-loaded metadata, no I/O.
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0, // No longer using sparse index separately
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            // Size fields report 0 when the optional structure is absent.
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }
1192
    /// Number of blocks currently in the cache
    ///
    /// Briefly takes the cache read lock; intended for tests/diagnostics.
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }
1197
1198    /// Look up a key (async - may need to load block)
1199    ///
1200    /// Uses bloom filter for fast negative lookups, then memory-efficient
1201    /// block index to locate the block, reducing I/O to typically 1 block read.
1202    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
1203        log::debug!(
1204            "SSTable::get called, key_len={}, total_blocks={}",
1205            key.len(),
1206            self.block_index.len()
1207        );
1208
1209        // Check bloom filter first - fast negative lookup
1210        if let Some(ref bloom) = self.bloom_filter
1211            && !bloom.may_contain(key)
1212        {
1213            log::debug!("SSTable::get bloom filter negative");
1214            return Ok(None);
1215        }
1216
1217        // Use block index to find the block that could contain the key
1218        let block_idx = match self.block_index.locate(key) {
1219            Some(idx) => idx,
1220            None => {
1221                log::debug!("SSTable::get key not found (before first block)");
1222                return Ok(None);
1223            }
1224        };
1225
1226        log::debug!("SSTable::get loading block_idx={}", block_idx);
1227
1228        // Now we know exactly which block to load - single I/O
1229        let block_data = self.load_block(block_idx).await?;
1230        self.search_block(&block_data, key)
1231    }
1232
1233    /// Batch lookup multiple keys with optimized I/O
1234    ///
1235    /// Groups keys by block and loads each block only once, reducing
1236    /// I/O from N reads to at most N reads (often fewer if keys share blocks).
1237    /// Uses bloom filter to skip keys that definitely don't exist.
1238    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
1239        if keys.is_empty() {
1240            return Ok(Vec::new());
1241        }
1242
1243        // Map each key to its block index
1244        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
1245        for (key_idx, key) in keys.iter().enumerate() {
1246            // Check bloom filter first
1247            if let Some(ref bloom) = self.bloom_filter
1248                && !bloom.may_contain(key)
1249            {
1250                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
1251                continue;
1252            }
1253
1254            match self.block_index.locate(key) {
1255                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
1256                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
1257            }
1258        }
1259
1260        // Group keys by block
1261        let mut blocks_to_load: Vec<usize> = key_to_block
1262            .iter()
1263            .filter(|(_, b)| *b != usize::MAX)
1264            .map(|(_, b)| *b)
1265            .collect();
1266        blocks_to_load.sort_unstable();
1267        blocks_to_load.dedup();
1268
1269        // Load all needed blocks (this is where I/O happens)
1270        for &block_idx in &blocks_to_load {
1271            let _ = self.load_block(block_idx).await?;
1272        }
1273
1274        // Now search each key in its block (all blocks are cached)
1275        let mut results = vec![None; keys.len()];
1276        for (key_idx, block_idx) in key_to_block {
1277            if block_idx == usize::MAX {
1278                continue;
1279            }
1280            let block_data = self.load_block(block_idx).await?; // Will hit cache
1281            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
1282        }
1283
1284        Ok(results)
1285    }
1286
1287    /// Preload all data blocks into memory
1288    ///
1289    /// Call this after open() to eliminate all I/O during subsequent lookups.
1290    /// Useful when the SSTable is small enough to fit in memory.
1291    pub async fn preload_all_blocks(&self) -> io::Result<()> {
1292        for block_idx in 0..self.block_index.len() {
1293            self.load_block(block_idx).await?;
1294        }
1295        Ok(())
1296    }
1297
1298    /// Prefetch all data blocks via a single bulk I/O operation.
1299    ///
1300    /// Reads the entire compressed data section in one call, then decompresses
1301    /// each block and populates the cache. This turns N individual reads into 1.
1302    /// Cache capacity is expanded to hold all blocks.
1303    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
1304        let num_blocks = self.block_index.len();
1305        if num_blocks == 0 {
1306            return Ok(());
1307        }
1308
1309        // Find total data extent
1310        let mut max_end: u64 = 0;
1311        for i in 0..num_blocks {
1312            if let Some(addr) = self.block_index.get_addr(i) {
1313                max_end = max_end.max(addr.offset + addr.length as u64);
1314            }
1315        }
1316
1317        // Single bulk read of entire data section
1318        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
1319        let buf = all_data.as_slice();
1320
1321        // Expand cache and decompress all blocks
1322        let mut cache = self.cache.write();
1323        cache.max_blocks = cache.max_blocks.max(num_blocks);
1324        for i in 0..num_blocks {
1325            let addr = self.block_index.get_addr(i).unwrap();
1326            if cache.get(addr.offset).is_some() {
1327                continue;
1328            }
1329            let compressed =
1330                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
1331            let decompressed = if let Some(ref dict) = self.dictionary {
1332                crate::compression::decompress_with_dict(compressed, dict)?
1333            } else {
1334                crate::compression::decompress(compressed)?
1335            };
1336            cache.insert(addr.offset, Arc::from(decompressed));
1337        }
1338
1339        Ok(())
1340    }
1341
    /// Load a block (checks cache first, then loads from FileSlice)
    /// Uses dictionary decompression if dictionary is present
    ///
    /// Concurrency note: two tasks can both miss the `peek` and decompress
    /// the same block; the second `insert` then just promotes the existing
    /// entry, so the race only wastes work — the data is identical.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        // Fast path: read-lock peek (no LRU promotion, zero writer contention)
        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        // Load from FileSlice
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        // Decompress with dictionary if available
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        // Insert into cache (write lock, promotes LRU)
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }
1384
    /// Synchronous block load — only works for Inline (mmap/RAM) file handles.
    ///
    /// Mirror of `load_block` with a blocking read; keep the two in sync.
    #[cfg(feature = "sync")]
    fn load_block_sync(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        // Fast path: read-lock peek (no LRU promotion, zero writer contention)
        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }

        // Load from FileSlice (sync — requires Inline handle)
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range_sync(range)?;

        // Decompress with dictionary if available
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block: Arc<[u8]> = Arc::from(decompressed);

        // Insert into cache (write lock, promotes LRU)
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }
1420
1421    /// Synchronous key lookup — only works for Inline (mmap/RAM) file handles.
1422    #[cfg(feature = "sync")]
1423    pub fn get_sync(&self, key: &[u8]) -> io::Result<Option<V>> {
1424        // Check bloom filter first — fast negative lookup
1425        if let Some(ref bloom) = self.bloom_filter
1426            && !bloom.may_contain(key)
1427        {
1428            return Ok(None);
1429        }
1430
1431        // Use block index to find the block that could contain the key
1432        let block_idx = match self.block_index.locate(key) {
1433            Some(idx) => idx,
1434            None => {
1435                return Ok(None);
1436            }
1437        };
1438
1439        let block_data = self.load_block_sync(block_idx)?;
1440        self.search_block(&block_data, key)
1441    }
1442
    /// Linear scan of a decompressed block for `target_key`.
    ///
    /// Entry wire format: vint(common_prefix_len) + vint(suffix_len) +
    /// suffix bytes + serialized value. Each key shares its first
    /// `common_prefix_len` bytes with the previous entry's key.
    ///
    /// Early-exits with `None` once a decoded key exceeds the target —
    /// NOTE(review): this relies on the writer emitting entries in ascending
    /// key order; that invariant is not checked here.
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            // Rebuild the full key: shared prefix from the previous key,
            // then the new suffix bytes.
            current_key.truncate(common_prefix_len);
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            // Value must be decoded even when the key doesn't match, to
            // advance `reader` past it.
            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }
1472
1473    /// Prefetch blocks for a key range
1474    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
1475        let start_block = self.block_index.locate(start_key).unwrap_or(0);
1476        let end_block = self
1477            .block_index
1478            .locate(end_key)
1479            .unwrap_or(self.block_index.len().saturating_sub(1));
1480
1481        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
1482            let _ = self.load_block(block_idx).await?;
1483        }
1484
1485        Ok(())
1486    }
1487
    /// Iterate over all entries (loads blocks as needed)
    ///
    /// Returns an async pull-style iterator; advance with `.next().await`.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }
1492
1493    /// Get all entries as a vector (for merging)
1494    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
1495        let mut results = Vec::new();
1496
1497        for block_idx in 0..self.block_index.len() {
1498            let block_data = self.load_block(block_idx).await?;
1499            let mut reader = &block_data[..];
1500            let mut current_key = Vec::new();
1501
1502            while !reader.is_empty() {
1503                let common_prefix_len = read_vint(&mut reader)? as usize;
1504                let suffix_len = read_vint(&mut reader)? as usize;
1505
1506                if suffix_len > reader.len() {
1507                    return Err(io::Error::new(
1508                        io::ErrorKind::UnexpectedEof,
1509                        "SSTable block suffix truncated",
1510                    ));
1511                }
1512                current_key.truncate(common_prefix_len);
1513                current_key.extend_from_slice(&reader[..suffix_len]);
1514                reader = &reader[suffix_len..];
1515
1516                let value = V::deserialize(&mut reader)?;
1517                results.push((current_key.clone(), value));
1518            }
1519        }
1520
1521        Ok(results)
1522    }
1523}
1524
/// Async iterator over SSTable entries
///
/// Streams entries in on-disk order, pulling blocks through the reader's
/// cache one at a time.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    /// Backing reader used to fetch and decompress blocks.
    reader: &'a AsyncSSTableReader<V>,
    /// Index of the NEXT block to load (the loaded one is `current_block - 1`).
    current_block: usize,
    /// Currently decoded block, if any.
    block_data: Option<Arc<[u8]>>,
    /// Byte offset of the next entry within `block_data`.
    block_offset: usize,
    /// Most recently decoded key — needed to undo prefix compression.
    current_key: Vec<u8>,
    /// True once all blocks are exhausted (or the table is empty).
    finished: bool,
}
1534
impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    /// Create an iterator positioned before the first entry.
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            // An empty table is finished before it starts.
            finished: reader.block_index.is_empty(),
        }
    }

    /// Load the next block into `block_data`; returns false when exhausted.
    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        // Prefix compression restarts at each block boundary, so the
        // carried key must be cleared.
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Advance to next entry (async)
    ///
    /// Returns `Ok(None)` once all entries have been yielded.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        // Lazily load the first block on the first call.
        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            // Current block consumed — move on to the next one.
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            // Remember how much was unread so consumed bytes can be computed
            // after decoding (see below).
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            // Rebuild the full key: shared prefix + new suffix bytes.
            self.current_key.truncate(common_prefix_len);
            self.current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];

            let value = V::deserialize(&mut reader)?;

            // Advance past exactly the bytes this entry consumed.
            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
1603
#[cfg(test)]
mod tests {
    use super::*;

    // Membership: inserted keys must always be reported present; absent keys
    // are expected negative with these specific inputs (deterministic hashes).
    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // These should likely return false (with ~1% false positive rate)
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    // Round-trip through to_bytes/from_owned_bytes must preserve membership.
    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    // Statistical check: no false negatives, and the false-positive rate at
    // 10 bits/key stays within a generous 3% bound.
    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        // Insert keys
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // All inserted keys should be found
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        // Check false positive rate on non-existent keys
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // With 10 bits per key, expect ~1% false positive rate
        // Allow up to 3% due to hash function variance
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    // Pins the compression-level/bloom/dictionary choices for each
    // optimization preset; changing a preset must be a deliberate edit here.
    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default = Adaptive
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); // BETTER
        assert!(config.use_bloom_filter); // Bloom always on — cheap and fast
        assert!(!config.use_dictionary);

        // Adaptive
        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        // SizeOptimized
        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); // MAX
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        // PerformanceOptimized
        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); // FAST
        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
        assert!(!perf.use_dictionary);

        // Aliases
        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    // Varint encode/decode must round-trip across 1-byte, 2-byte, and
    // maximum-width boundary values.
    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    // Shared-prefix length, including empty-input edge cases.
    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}