hermes_core/structures/sstable.rs

//! Async SSTable with lazy loading via FileSlice
//!
//! Memory-efficient design: only minimal metadata is loaded into memory;
//! blocks are loaded on demand.
//!
//! ## Key Features
//!
//! 1. **FST-based Block Index**: Uses a Finite State Transducer for key lookup
//!    - Can be mmap'd directly without parsing into heap-allocated structures
//!    - ~90% memory reduction compared to Vec<BlockIndexEntry>
//!
//! 2. **Bitpacked Block Addresses**: Offsets and lengths stored with delta encoding
//!    - Minimal memory footprint for block metadata
//!
//! 3. **Dictionary Compression**: Zstd dictionary for 15-30% better compression
//!
//! 4. **Configurable Compression Level**: Levels 1-22 for space/speed tradeoff
//!
//! 5. **Bloom Filter**: Fast negative lookups to skip unnecessary I/O

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;

#[cfg(feature = "native")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice, OwnedBytes};

/// SSTable magic number - version 4 with memory-efficient index
/// Uses FST-based or mmap'd block index to avoid heap allocation
pub const SSTABLE_MAGIC: u32 = 0x53544234; // "STB4"

/// Block size for SSTable (16KB default)
pub const BLOCK_SIZE: usize = 16 * 1024;

/// Default dictionary size (64KB)
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;

/// Bloom filter bits per key (10 bits ≈ 1% false positive rate)
pub const BLOOM_BITS_PER_KEY: usize = 10;

/// Bloom filter hash count (optimal for 10 bits/key)
pub const BLOOM_HASH_COUNT: usize = 7;

// ============================================================================
// Bloom Filter Implementation
// ============================================================================

/// Simple bloom filter for key existence checks
#[derive(Debug, Clone)]
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: usize,
}

impl BloomFilter {
    /// Create a new bloom filter sized for expected number of keys
    pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
        let num_bits = (expected_keys * bits_per_key).max(64);
        let num_words = num_bits.div_ceil(64);
        Self {
            bits: vec![0u64; num_words],
            num_bits,
            num_hashes: BLOOM_HASH_COUNT,
        }
    }

    /// Create from serialized bytes
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data too short",
            ));
        }
        let mut reader = data;
        let num_bits = reader.read_u32::<LittleEndian>()? as usize;
        let num_hashes = reader.read_u32::<LittleEndian>()? as usize;
        let num_words = reader.read_u32::<LittleEndian>()? as usize;

        if reader.len() < num_words * 8 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Bloom filter data truncated",
            ));
        }

        let mut bits = Vec::with_capacity(num_words);
        for _ in 0..num_words {
            bits.push(reader.read_u64::<LittleEndian>()?);
        }

        Ok(Self {
            bits,
            num_bits,
            num_hashes,
        })
    }

    /// Serialize to bytes
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut data = Vec::with_capacity(12 + self.bits.len() * 8);
        data.write_u32::<LittleEndian>(self.num_bits as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.num_hashes as u32)
            .unwrap();
        data.write_u32::<LittleEndian>(self.bits.len() as u32)
            .unwrap();
        for &word in &self.bits {
            data.write_u64::<LittleEndian>(word).unwrap();
        }
        data
    }

    /// Add a key to the filter
    pub fn insert(&mut self, key: &[u8]) {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx < self.bits.len() {
                self.bits[word_idx] |= 1u64 << bit_idx;
            }
        }
    }

    /// Check if a key might be in the filter
    /// Returns false if definitely not present, true if possibly present
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = self.hash_pair(key);
        for i in 0..self.num_hashes {
            let bit_pos = self.get_bit_pos(h1, h2, i);
            let word_idx = bit_pos / 64;
            let bit_idx = bit_pos % 64;
            if word_idx >= self.bits.len() || (self.bits[word_idx] & (1u64 << bit_idx)) == 0 {
                return false;
            }
        }
        true
    }

    /// Size in bytes
    pub fn size_bytes(&self) -> usize {
        12 + self.bits.len() * 8
    }

    /// Compute two hash values using FNV-1a variant
    fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
        // FNV-1a hash
        let mut h1: u64 = 0xcbf29ce484222325;
        for &byte in key {
            h1 ^= byte as u64;
            h1 = h1.wrapping_mul(0x100000001b3);
        }

        // Second hash using different seed
        let mut h2: u64 = 0x84222325cbf29ce4;
        for &byte in key {
            h2 = h2.wrapping_mul(0x100000001b3);
            h2 ^= byte as u64;
        }

        (h1, h2)
    }

    /// Get bit position for hash iteration i using double hashing
    #[inline]
    fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
        (h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
    }
}

/// SSTable value trait
pub trait SSTableValue: Clone + Send + Sync {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}

/// u64 value implementation
impl SSTableValue for u64 {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, *self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        read_vint(reader)
    }
}

/// Vec<u8> value implementation
impl SSTableValue for Vec<u8> {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self)
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut data = vec![0u8; len];
        reader.read_exact(&mut data)?;
        Ok(data)
    }
}

/// Maximum number of postings that can be inlined in TermInfo
pub const MAX_INLINE_POSTINGS: usize = 3;

/// Term info for posting list references
///
/// Supports two modes:
/// - **Inline**: Small posting lists (1-3 docs) stored directly in TermInfo
/// - **External**: Larger posting lists stored in a separate .post file
///
/// This eliminates a separate I/O read for rare/unique terms.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// Small posting list inlined directly (up to MAX_INLINE_POSTINGS entries)
    /// Each entry is (doc_id, term_freq) delta-encoded
    Inline {
        /// Number of postings (1-3)
        doc_freq: u8,
        /// Inline data: delta-encoded (doc_id, term_freq) pairs
        /// Format: [delta_doc_id, term_freq, delta_doc_id, term_freq, ...]
        data: [u8; 16],
        /// Actual length of data used
        data_len: u8,
    },
    /// Reference to external posting list in .post file
    External {
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        /// Position data offset (0 if no positions)
        position_offset: u64,
        /// Position data length (0 if no positions)
        position_len: u32,
    },
}

impl TermInfo {
    /// Create an external reference
    pub fn external(posting_offset: u64, posting_len: u32, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// Create an external reference with position info
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u32,
        doc_freq: u32,
        position_offset: u64,
        position_len: u32,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Try to create an inline TermInfo from posting data
    /// Returns None if posting list is too large to inline
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS || doc_ids.is_empty() {
            return None;
        }

        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;

        for (i, &doc_id) in doc_ids.iter().enumerate() {
            let delta = doc_id - prev_doc_id;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, term_freqs[i] as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }

        let data_len = cursor.position() as u8;
        if data_len > 16 {
            return None;
        }

        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Get document frequency
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    /// Check if this is an inline posting list
    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// Get external posting info (offset, len) - returns None for inline
    pub fn external_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// Get position info (offset, len) - returns None for inline or if no positions
    pub fn position_info(&self) -> Option<(u64, u32)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decode inline postings into (doc_ids, term_freqs)
    /// Returns None if this is an external reference
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;

                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }

                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}

impl SSTableValue for TermInfo {
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                // Tag byte 0xFF = inline marker
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                // Tag byte 0x00 = external marker (no positions)
                // Tag byte 0x01 = external with positions
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len as u64)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len as u64)?;
                }
            }
        }
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;

        if tag == 0xFF {
            // Inline
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            // External (no positions)
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            // External with positions
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)? as u32;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)? as u32;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}

/// Write variable-length integer
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            return Ok(());
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
}

/// Read variable-length integer
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;

    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}

/// Compute common prefix length
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

/// SSTable statistics for debugging
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    pub bloom_filter_size: usize,
    pub dictionary_size: usize,
}

/// SSTable writer configuration
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    /// Compression level (1-22, higher = better compression but slower)
    pub compression_level: CompressionLevel,
    /// Whether to train and use a dictionary for compression
    pub use_dictionary: bool,
    /// Dictionary size in bytes (default 64KB)
    pub dict_size: usize,
    /// Whether to build a bloom filter
    pub use_bloom_filter: bool,
    /// Bloom filter bits per key (default 10 = ~1% false positive rate)
    pub bloom_bits_per_key: usize,
}

impl Default for SSTableWriterConfig {
    fn default() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::default())
    }
}

impl SSTableWriterConfig {
    /// Create config from IndexOptimization mode
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        match optimization {
            IndexOptimization::Adaptive => Self {
                compression_level: CompressionLevel::BETTER, // Level 9
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: false,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::SizeOptimized => Self {
                compression_level: CompressionLevel::MAX, // Level 22
                use_dictionary: true,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true,
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
            IndexOptimization::PerformanceOptimized => Self {
                compression_level: CompressionLevel::FAST, // Level 1
                use_dictionary: false,
                dict_size: DEFAULT_DICT_SIZE,
                use_bloom_filter: true, // Bloom helps skip blocks fast
                bloom_bits_per_key: BLOOM_BITS_PER_KEY,
            },
        }
    }

    /// Fast configuration - prioritize write speed over compression
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Maximum compression configuration - prioritize size over speed
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}

/// SSTable writer with optimizations:
/// - Dictionary compression for blocks (if dictionary provided)
/// - Configurable compression level
/// - Block index prefix compression
/// - Bloom filter for fast negative lookups
pub struct SSTableWriter<'a, V: SSTableValue> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    prev_key: Vec<u8>,
    index: Vec<BlockIndexEntry>,
    current_offset: u64,
    num_entries: u64,
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    /// Pre-trained dictionary for compression (optional)
    dictionary: Option<CompressionDict>,
    /// All keys for bloom filter
    all_keys: Vec<Vec<u8>>,
    /// Bloom filter (built at finish time)
    bloom_filter: Option<BloomFilter>,
    _phantom: std::marker::PhantomData<V>,
}

impl<'a, V: SSTableValue> SSTableWriter<'a, V> {
    /// Create a new SSTable writer with default configuration
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    /// Create a new SSTable writer with custom configuration
    pub fn with_config(writer: &'a mut dyn Write, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Create a new SSTable writer with a pre-trained dictionary
    pub fn with_dictionary(
        writer: &'a mut dyn Write,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: Some(dictionary),
            all_keys: Vec::new(),
            bloom_filter: None,
            _phantom: std::marker::PhantomData,
        }
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }

        // Collect keys for bloom filter
        if self.config.use_bloom_filter {
            self.all_keys.push(key.to_vec());
        }

        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];

        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;

        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;

        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(())
    }

    /// Flush and compress the current block
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        // Compress block with dictionary if available
        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };

        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        self.prev_key.clear();

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<()> {
        // Flush any remaining data
        self.flush_block()?;

        // Build bloom filter from collected keys
        if self.config.use_bloom_filter && !self.all_keys.is_empty() {
            let mut bloom = BloomFilter::new(self.all_keys.len(), self.config.bloom_bits_per_key);
            for key in &self.all_keys {
                bloom.insert(key);
            }
            self.bloom_filter = Some(bloom);
        }

        let data_end_offset = self.current_offset;

        // Build memory-efficient block index (v4 format)
        // Convert to (key, BlockAddr) pairs for the new index format
        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();

        // Build FST-based index if native feature is enabled, otherwise use mmap index
        #[cfg(feature = "native")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "native"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;

        // Write index bytes with length prefix
        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;

        // Write bloom filter if present
        let bloom_offset = if let Some(ref bloom) = self.bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };

        // Write extended footer (v4 format)
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?; // 0 if no bloom
        self.writer.write_u64::<LittleEndian>(dict_offset)?; // 0 if no dict
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;

        Ok(())
    }
}

/// Block index entry
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}

/// Async SSTable reader - loads blocks on demand via LazyFileSlice
///
/// Memory-efficient design (v4 format):
/// - Block index uses FST (native) or mmap'd raw bytes - no heap allocation for keys
/// - Block addresses stored in bitpacked format
/// - Bloom filter and dictionary optional
pub struct AsyncSSTableReader<V: SSTableValue> {
    /// LazyFileSlice for the data portion (blocks only) - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Memory-efficient block index (v4: FST or mmap, v3: legacy Vec)
    block_index: BlockIndex,
    num_entries: u64,
    /// Hot cache for decompressed blocks
    cache: RwLock<BlockCache>,
    /// Bloom filter for fast negative lookups (optional)
    bloom_filter: Option<BloomFilter>,
    /// Compression dictionary (optional)
    dictionary: Option<CompressionDict>,
    /// Compression level used
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}

/// LRU-style block cache
struct BlockCache {
    blocks: FxHashMap<u64, Arc<Vec<u8>>>,
    access_order: Vec<u64>,
    max_blocks: usize,
}

impl BlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, offset: u64) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&offset) {
            if let Some(pos) = self.access_order.iter().position(|&o| o == offset) {
                self.access_order.remove(pos);
                self.access_order.push(offset);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, offset: u64, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict_offset = self.access_order.remove(0);
            self.blocks.remove(&evict_offset);
        }
        self.blocks.insert(offset, block);
        self.access_order.push(offset);
    }
}

impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Open an SSTable from a LazyFileHandle
    /// Only loads the footer and index into memory; data blocks are fetched on demand
    ///
    /// Supports both v3 (legacy) and v4 (memory-efficient) formats:
    /// - v4: FST-based or mmap'd block index (no heap allocation for keys)
    /// - v3: Prefix-compressed block index loaded into Vec (legacy fallback)
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }

        // Read footer (37 bytes)
        // Format: data_end(8) + num_entries(8) + bloom_offset(8) + dict_offset(8) + compression_level(1) + magic(4)
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;

        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }

        // Read index section
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;

        // Parse block index (length-prefixed FST or mmap index)
        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;

        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }

        let index_data = OwnedBytes::new(idx_reader[..index_len].to_vec());

        // Try FST first (native), fall back to mmap
        #[cfg(feature = "native")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "native"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);

        // Load bloom filter if present
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read bloom filter size first (12 bytes header)
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_bytes(&bloom_data)?)
        } else {
            None
        };

        // Load dictionary if present
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            // Read dictionary size first
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_bytes(dict_data.to_vec()))
        } else {
            None
        };

        // Create a lazy slice for just the data portion
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Number of entries
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Get stats about this SSTable for debugging
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0, // No longer using sparse index separately
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Look up a key (async - may need to load block)
    ///
    /// Uses the bloom filter for fast negative lookups, then the memory-efficient
    /// block index to locate the block, typically reducing I/O to a single block read.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );

        // Check bloom filter first - fast negative lookup
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }

        // Use block index to find the block that could contain the key
        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };

        log::debug!("SSTable::get loading block_idx={}", block_idx);

        // Now we know exactly which block to load - single I/O
        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batch lookup multiple keys with optimized I/O
    ///
    /// Groups keys by block and loads each block only once, reducing I/O from
    /// one read per key to one read per distinct block (often far fewer when
    /// keys share blocks). Uses the bloom filter to skip keys that definitely
    /// don't exist.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        // Map each key to its block index
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            // Check bloom filter first
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX)); // Definitely not present
                continue;
            }

            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)), // Mark as not found
            }
        }

        // Group keys by block
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();

        // Load all needed blocks (this is where I/O happens)
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }

        // Now search each key in its block (all blocks are cached)
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            let block_data = self.load_block(block_idx).await?; // Will hit cache
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }

        Ok(results)
    }

    /// Preload all data blocks into memory
    ///
    /// Call this after open() to eliminate all I/O during subsequent lookups.
    /// Useful when the SSTable is small enough to fit in memory.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Load a block (checks cache first, then loads from FileSlice)
    /// Uses dictionary decompression if dictionary is present
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<Vec<u8>>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;

        // Check cache first
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(addr.offset) {
                log::debug!("SSTable::load_block idx={} CACHE HIT", block_idx);
                return Ok(block);
            }
        }

        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );

        // Load from FileSlice
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;

        // Decompress with dictionary if available
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }

        Ok(block)
    }

    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();

        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }

        Ok(None)
    }

    /// Prefetch blocks for a key range
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));

        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }

        Ok(())
    }

    /// Iterate over all entries (loads blocks as needed)
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Get all entries as a vector (for merging)
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();

        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = block_data.as_slice();
            let mut current_key = Vec::new();

            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;

                current_key.truncate(common_prefix_len);
                let mut suffix = vec![0u8; suffix_len];
                reader.read_exact(&mut suffix)?;
                current_key.extend_from_slice(&suffix);

                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }

        Ok(results)
    }
}

/// Async iterator over SSTable entries
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    current_block: usize,
    block_data: Option<Arc<Vec<u8>>>,
    block_offset: usize,
    current_key: Vec<u8>,
    finished: bool,
}

impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            finished: reader.block_index.is_empty(),
        }
    }

    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }

        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Advance to next entry (async)
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }

        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }

        loop {
            let block = self.block_data.as_ref().unwrap();
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }

            let mut reader = &block[self.block_offset..];
            let start_len = reader.len();

            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;

            self.current_key.truncate(common_prefix_len);
            let mut suffix = vec![0u8; suffix_len];
            reader.read_exact(&mut suffix)?;
            self.current_key.extend_from_slice(&suffix);

            let value = V::deserialize(&mut reader)?;

            self.block_offset += start_len - reader.len();

            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);

        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");

        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));

        // These should likely return false (with ~1% false positive rate)
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }
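
    // A worked example of the sizing math in `BloomFilter::new` and
    // `size_bytes`: 100 keys at 10 bits/key gives 1000 bits, which rounds up
    // to 16 u64 words, so the serialized size is the 12-byte header plus
    // 16 * 8 data bytes. A minimal sketch to pin down that arithmetic.
    #[test]
    fn test_bloom_filter_sizing() {
        let bloom = BloomFilter::new(100, 10);
        // 1000 bits / 64 bits per word = 15.625, rounded up to 16 words
        assert_eq!(bloom.size_bytes(), 12 + 16 * 8);
        assert_eq!(bloom.to_bytes().len(), bloom.size_bytes());
    }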

    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");

        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_bytes(&bytes).unwrap();

        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }
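
    // A minimal sketch of the LRU behavior in `BlockCache` above: `get`
    // refreshes recency, and `insert` evicts the least recently used entry
    // once `max_blocks` is reached.
    #[test]
    fn test_block_cache_lru_eviction() {
        let mut cache = BlockCache::new(2);
        cache.insert(0, Arc::new(vec![0u8]));
        cache.insert(100, Arc::new(vec![1u8]));

        // Touch offset 0 so offset 100 becomes the eviction candidate
        assert!(cache.get(0).is_some());

        // Inserting a third block evicts the least recently used (100)
        cache.insert(200, Arc::new(vec![2u8]));
        assert!(cache.get(100).is_none());
        assert!(cache.get(0).is_some());
        assert!(cache.get(200).is_some());
    }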

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);

        // Insert keys
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }

        // All inserted keys should be found
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }

        // Check false positive rate on non-existent keys
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }

        // With 10 bits per key, expect ~1% false positive rate
        // Allow up to 3% due to hash function variance
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }
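
    // Sketch of the inline-postings path: `try_inline` delta-encodes up to
    // MAX_INLINE_POSTINGS (doc_id, term_freq) pairs into the fixed 16-byte
    // buffer, and `decode_inline` recovers them.
    #[test]
    fn test_term_info_inline_roundtrip() {
        let doc_ids = vec![2u32, 5, 9];
        let term_freqs = vec![1u32, 3, 2];

        let info = TermInfo::try_inline(&doc_ids, &term_freqs).expect("should fit inline");
        assert!(info.is_inline());
        assert_eq!(info.doc_freq(), 3);

        let (decoded_ids, decoded_tfs) = info.decode_inline().unwrap();
        assert_eq!(decoded_ids, doc_ids);
        assert_eq!(decoded_tfs, term_freqs);

        // More than MAX_INLINE_POSTINGS entries cannot be inlined
        assert!(TermInfo::try_inline(&[1, 2, 3, 4], &[1, 1, 1, 1]).is_none());
    }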

    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;

        // Default = Adaptive
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9); // BETTER
        assert!(!config.use_bloom_filter);
        assert!(!config.use_dictionary);

        // Adaptive
        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(!adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);

        // SizeOptimized
        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22); // MAX
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);

        // PerformanceOptimized
        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1); // FAST
        assert!(perf.use_bloom_filter); // Bloom helps skip blocks fast
        assert!(!perf.use_dictionary);

        // Aliases
        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);

        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }
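
    // Roundtrip sketch of the TermInfo wire format implemented above:
    // tag 0xFF marks an inline entry, 0x00 an external entry without
    // positions, and 0x01 an external entry with positions.
    #[test]
    fn test_term_info_serialization_tags() {
        let inline = TermInfo::try_inline(&[7], &[2]).unwrap();
        let mut buf = Vec::new();
        inline.serialize(&mut buf).unwrap();
        assert_eq!(buf[0], 0xFF);
        let decoded = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(decoded, inline);

        let external = TermInfo::external_with_positions(1024, 256, 42, 2048, 128);
        let mut buf = Vec::new();
        external.serialize(&mut buf).unwrap();
        assert_eq!(buf[0], 0x01);
        let decoded = TermInfo::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(decoded, external);
    }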

    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }
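
    // Worked example of the varint byte layout produced by `write_vint`:
    // 7 payload bits per byte, low bits first, continuation bit set on all
    // but the final byte. 300 = 0b1_0010_1100 encodes as [0xAC, 0x02].
    #[test]
    fn test_vint_byte_layout() {
        let mut buf = Vec::new();
        write_vint(&mut buf, 127).unwrap();
        assert_eq!(buf, [0x7F]);

        buf.clear();
        write_vint(&mut buf, 128).unwrap();
        assert_eq!(buf, [0x80, 0x01]);

        buf.clear();
        write_vint(&mut buf, 300).unwrap();
        assert_eq!(buf, [0xAC, 0x02]);
    }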

    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
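
    // End-to-end smoke sketch of the writer path, exercising the
    // prefix-compressed block format described in `insert`. This assumes the
    // crate's compression and block-index helpers round-trip the way the
    // writer and reader above use them.
    #[test]
    fn test_sstable_writer_smoke() {
        let mut buf = Vec::new();
        {
            let mut writer = SSTableWriter::<u64>::new(&mut buf);
            writer.insert(b"apple", &1).unwrap();
            writer.insert(b"applesauce", &2).unwrap();
            writer.insert(b"banana", &3).unwrap();
            writer.finish().unwrap();
        }

        // Footer layout: data_end(8) + num_entries(8) + bloom_offset(8)
        //              + dict_offset(8) + compression_level(1) + magic(4) = 37 bytes
        assert!(buf.len() > 37);
        let magic = u32::from_le_bytes(buf[buf.len() - 4..].try_into().unwrap());
        assert_eq!(magic, SSTABLE_MAGIC);

        // Decompress the single data block and observe the prefix compression:
        // the footer's first field gives the end of the data section.
        let footer = &buf[buf.len() - 37..];
        let data_end = u64::from_le_bytes(footer[..8].try_into().unwrap()) as usize;
        let block = crate::compression::decompress(&buf[..data_end]).unwrap();

        let mut reader = block.as_slice();
        // First entry shares no prefix with the (empty) previous key
        assert_eq!(read_vint(&mut reader).unwrap(), 0); // prefix_len
        assert_eq!(read_vint(&mut reader).unwrap(), 5); // suffix_len
        let mut suffix = vec![0u8; 5];
        reader.read_exact(&mut suffix).unwrap();
        assert_eq!(suffix, b"apple");
        assert_eq!(u64::deserialize(&mut reader).unwrap(), 1);

        // Second entry shares the 5-byte prefix "apple" and stores only "sauce"
        assert_eq!(read_vint(&mut reader).unwrap(), 5); // prefix_len
        assert_eq!(read_vint(&mut reader).unwrap(), 5); // suffix_len
    }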
}