// hermes_core/structures/sstable_index.rs
1//! Memory-efficient SSTable index structures
2//!
3//! This module provides two approaches for memory-efficient block indexing:
4//!
5//! ## Option 1: FST-based Index (native feature)
6//! Uses a Finite State Transducer to map keys to block ordinals. The FST can be
7//! mmap'd directly without parsing into heap-allocated structures.
8//!
9//! ## Option 2: Mmap'd Raw Index
10//! Keeps the prefix-compressed block index as raw bytes and decodes entries
11//! on-demand during binary search. No heap allocation for the index.
12//!
13//! Both approaches use a compact BlockAddrStore with bitpacked offsets/lengths.
14
15use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
16use std::io::{self, Write};
17use std::ops::Range;
18
19use crate::directories::OwnedBytes;
20
/// Block address - offset and length in the data section
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockAddr {
    pub offset: u64,
    pub length: u32,
}

impl BlockAddr {
    /// Byte range `[offset, offset + length)` this block occupies in the
    /// data section.
    pub fn byte_range(&self) -> Range<u64> {
        let start = self.offset;
        start..start + u64::from(self.length)
    }
}
33
/// Compact storage for block addresses using delta + bitpacking
///
/// Memory layout:
/// - Header: num_blocks (u32) + offset_bits (u8) + length_bits (u8)
/// - Bitpacked data: offsets and lengths interleaved
///
/// Uses delta encoding for offsets (blocks are sequential) and
/// stores lengths directly (typically similar sizes).
#[derive(Debug)]
pub struct BlockAddrStore {
    // Raw serialized bytes (6-byte header + bitpacked entries); may be
    // mmap-backed, so entries are decoded lazily rather than parsed here.
    data: OwnedBytes,
    // Entry count, read from the header.
    num_blocks: u32,
    // Bits used to store each delta-encoded offset.
    offset_bits: u8,
    // Bits used to store each block length.
    length_bits: u8,
    // Byte offset where the bitpacked payload begins (currently always 6).
    header_size: usize,
}
50
51impl BlockAddrStore {
52    /// Build from a list of block addresses
53    pub fn build(addrs: &[BlockAddr]) -> io::Result<Vec<u8>> {
54        if addrs.is_empty() {
55            let mut buf = Vec::with_capacity(6);
56            buf.write_u32::<LittleEndian>(0)?;
57            buf.write_u8(0)?;
58            buf.write_u8(0)?;
59            return Ok(buf);
60        }
61
62        // Compute delta offsets and find max values for bit width
63        let mut deltas = Vec::with_capacity(addrs.len());
64        let mut prev_end: u64 = 0;
65        let mut max_delta: u64 = 0;
66        let mut max_length: u32 = 0;
67
68        for addr in addrs {
69            // Delta from end of previous block (handles gaps)
70            let delta = addr.offset.saturating_sub(prev_end);
71            deltas.push(delta);
72            max_delta = max_delta.max(delta);
73            max_length = max_length.max(addr.length);
74            prev_end = addr.offset + addr.length as u64;
75        }
76
77        // Compute bit widths
78        let offset_bits = if max_delta == 0 {
79            1
80        } else {
81            (64 - max_delta.leading_zeros()) as u8
82        };
83        let length_bits = if max_length == 0 {
84            1
85        } else {
86            (32 - max_length.leading_zeros()) as u8
87        };
88
89        // Calculate packed size
90        let bits_per_entry = offset_bits as usize + length_bits as usize;
91        let total_bits = bits_per_entry * addrs.len();
92        let packed_bytes = total_bits.div_ceil(8);
93
94        let mut buf = Vec::with_capacity(6 + packed_bytes);
95        buf.write_u32::<LittleEndian>(addrs.len() as u32)?;
96        buf.write_u8(offset_bits)?;
97        buf.write_u8(length_bits)?;
98
99        // Bitpack the data
100        let mut bit_writer = BitWriter::new(&mut buf);
101        for (i, addr) in addrs.iter().enumerate() {
102            bit_writer.write(deltas[i], offset_bits)?;
103            bit_writer.write(addr.length as u64, length_bits)?;
104        }
105        bit_writer.flush()?;
106
107        Ok(buf)
108    }
109
110    /// Load from raw bytes (zero-copy if mmap'd)
111    pub fn load(data: OwnedBytes) -> io::Result<Self> {
112        if data.len() < 6 {
113            return Err(io::Error::new(
114                io::ErrorKind::InvalidData,
115                "BlockAddrStore data too short",
116            ));
117        }
118
119        let mut reader = data.as_slice();
120        let num_blocks = reader.read_u32::<LittleEndian>()?;
121        let offset_bits = reader.read_u8()?;
122        let length_bits = reader.read_u8()?;
123
124        Ok(Self {
125            data,
126            num_blocks,
127            offset_bits,
128            length_bits,
129            header_size: 6,
130        })
131    }
132
133    /// Number of blocks
134    pub fn len(&self) -> usize {
135        self.num_blocks as usize
136    }
137
138    /// Check if empty
139    pub fn is_empty(&self) -> bool {
140        self.num_blocks == 0
141    }
142
143    /// Get block address by index (decodes on-demand)
144    pub fn get(&self, idx: usize) -> Option<BlockAddr> {
145        if idx >= self.num_blocks as usize {
146            return None;
147        }
148
149        let packed_data = &self.data.as_slice()[self.header_size..];
150
151        // We need to decode from the start to accumulate offsets
152        let mut bit_reader = BitReader::new(packed_data);
153        let mut current_offset: u64 = 0;
154
155        for i in 0..=idx {
156            let delta = bit_reader.read(self.offset_bits).ok()?;
157            let length = bit_reader.read(self.length_bits).ok()? as u32;
158
159            current_offset += delta;
160
161            if i == idx {
162                return Some(BlockAddr {
163                    offset: current_offset,
164                    length,
165                });
166            }
167
168            current_offset += length as u64;
169        }
170
171        None
172    }
173
174    /// Get all block addresses (for iteration)
175    pub fn all(&self) -> Vec<BlockAddr> {
176        let mut result = Vec::with_capacity(self.num_blocks as usize);
177        let packed_data = &self.data.as_slice()[self.header_size..];
178        let mut bit_reader = BitReader::new(packed_data);
179        let mut current_offset: u64 = 0;
180
181        for _ in 0..self.num_blocks {
182            if let (Ok(delta), Ok(length)) = (
183                bit_reader.read(self.offset_bits),
184                bit_reader.read(self.length_bits),
185            ) {
186                current_offset += delta;
187                result.push(BlockAddr {
188                    offset: current_offset,
189                    length: length as u32,
190                });
191                current_offset += length;
192            }
193        }
194
195        result
196    }
197}
198
/// FST-based block index (Option 1)
///
/// Maps keys to block ordinals using an FST. The FST bytes can be mmap'd
/// directly without any parsing or heap allocation.
#[cfg(feature = "native")]
pub struct FstBlockIndex {
    // FST mapping each block's first key to its ordinal (the value stored
    // at build time is the entry's index, i.e. insertion order).
    fst: fst::Map<OwnedBytes>,
    // Compact bitpacked (offset, length) store, indexed by ordinal.
    block_addrs: BlockAddrStore,
}
208
#[cfg(feature = "native")]
impl FstBlockIndex {
    /// Build FST index from keys and block addresses.
    ///
    /// Entries must be in ascending key order: `MapBuilder::insert`
    /// requires lexicographic insertion, and [`Self::locate`] relies on
    /// ordinals being assigned in key order.
    pub fn build(entries: &[(Vec<u8>, BlockAddr)]) -> io::Result<Vec<u8>> {
        use fst::MapBuilder;

        // Build FST mapping keys to block ordinals
        let mut fst_builder = MapBuilder::memory();
        for (i, (key, _)) in entries.iter().enumerate() {
            fst_builder
                .insert(key, i as u64)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        }
        let fst_bytes = fst_builder
            .into_inner()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        // Build block address store
        let addrs: Vec<BlockAddr> = entries.iter().map(|(_, addr)| *addr).collect();
        let addr_bytes = BlockAddrStore::build(&addrs)?;

        // Combine: fst_len (u32) + fst_bytes + addr_bytes
        let mut result = Vec::with_capacity(4 + fst_bytes.len() + addr_bytes.len());
        result.write_u32::<LittleEndian>(fst_bytes.len() as u32)?;
        result.extend_from_slice(&fst_bytes);
        result.extend_from_slice(&addr_bytes);

        Ok(result)
    }

    /// Load from raw bytes.
    ///
    /// # Errors
    /// Returns `InvalidData` if the length prefix/FST section is truncated
    /// or the FST bytes fail verification.
    pub fn load(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FstBlockIndex data too short",
            ));
        }

        let fst_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;

        if data.len() < 4 + fst_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FstBlockIndex FST data truncated",
            ));
        }

        let fst_data = data.slice(4..4 + fst_len);
        let addr_data = data.slice(4 + fst_len..data.len());

        let fst =
            fst::Map::new(fst_data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let block_addrs = BlockAddrStore::load(addr_data)?;

        Ok(Self { fst, block_addrs })
    }

    /// Look up the block ordinal that could contain `key` (the block with
    /// the largest first_key <= key).
    ///
    /// Ordinals are assigned in key order at build time, so one seek to the
    /// first key >= `key` suffices — O(|key|), instead of streaming the
    /// whole FST:
    /// - exact match: that block;
    /// - first key > `key` has ordinal `k`: the answer is `k - 1`, or
    ///   `None` when `k == 0` (key sorts before every block);
    /// - no key >= `key`: the last block.
    pub fn locate(&self, key: &[u8]) -> Option<usize> {
        use fst::{IntoStreamer, Streamer};

        let mut stream = self.fst.range().ge(key).into_stream();
        match stream.next() {
            Some((found_key, ordinal)) if found_key == key => Some(ordinal as usize),
            Some((_, ordinal)) => (ordinal as usize).checked_sub(1),
            None => self.block_addrs.len().checked_sub(1),
        }
    }

    /// Get block address by ordinal
    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
        self.block_addrs.get(ordinal)
    }

    /// Number of blocks
    pub fn len(&self) -> usize {
        self.block_addrs.len()
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.block_addrs.is_empty()
    }

    /// Get all block addresses
    pub fn all_addrs(&self) -> Vec<BlockAddr> {
        self.block_addrs.all()
    }
}
313
/// Mmap'd raw block index (Option 2)
///
/// Keeps the prefix-compressed block index as raw bytes and decodes
/// entries on-demand. Uses restart points every R entries for O(log N)
/// lookup via binary search instead of O(N) linear scan.
pub struct MmapBlockIndex {
    /// Full serialized index bytes (may be mmap-backed)
    data: OwnedBytes,
    /// Number of key/block entries
    num_blocks: u32,
    /// Compact bitpacked (offset, length) store for the data blocks
    block_addrs: BlockAddrStore,
    /// Offset where the prefix-compressed keys start
    keys_offset: usize,
    /// Offset where the keys section ends (restart array begins)
    keys_end: usize,
    /// Byte offset in data where the restart offsets array starts
    restart_array_offset: usize,
    /// Number of restart points
    restart_count: usize,
    /// Restart interval (R) — a restart point every R entries
    restart_interval: usize,
}

/// Restart interval: store full (uncompressed) key every R entries
/// (keys in between store only a suffix relative to the previous key)
const RESTART_INTERVAL: usize = 16;
337
338impl MmapBlockIndex {
339    /// Build mmap-friendly index from entries.
340    ///
341    /// Format: `num_blocks (u32) | BlockAddrStore | prefix-compressed keys
342    /// (with restart points) | restart_offsets[..] | restart_count (u32) | restart_interval (u16)`
343    pub fn build(entries: &[(Vec<u8>, BlockAddr)]) -> io::Result<Vec<u8>> {
344        if entries.is_empty() {
345            let mut buf = Vec::with_capacity(16);
346            buf.write_u32::<LittleEndian>(0)?; // num_blocks
347            buf.extend_from_slice(&BlockAddrStore::build(&[])?);
348            // Empty restart array + footer
349            buf.write_u32::<LittleEndian>(0)?; // restart_count
350            buf.write_u16::<LittleEndian>(RESTART_INTERVAL as u16)?;
351            return Ok(buf);
352        }
353
354        // Build block address store
355        let addrs: Vec<BlockAddr> = entries.iter().map(|(_, addr)| *addr).collect();
356        let addr_bytes = BlockAddrStore::build(&addrs)?;
357
358        // Build prefix-compressed keys with restart points
359        let mut keys_buf = Vec::new();
360        let mut prev_key: Vec<u8> = Vec::new();
361        let mut restart_offsets: Vec<u32> = Vec::new();
362
363        for (i, (key, _)) in entries.iter().enumerate() {
364            let is_restart = i % RESTART_INTERVAL == 0;
365
366            if is_restart {
367                restart_offsets.push(keys_buf.len() as u32);
368                // Store full key (no prefix compression)
369                write_vint(&mut keys_buf, 0)?;
370                write_vint(&mut keys_buf, key.len() as u64)?;
371                keys_buf.extend_from_slice(key);
372            } else {
373                let prefix_len = common_prefix_len(&prev_key, key);
374                let suffix = &key[prefix_len..];
375                write_vint(&mut keys_buf, prefix_len as u64)?;
376                write_vint(&mut keys_buf, suffix.len() as u64)?;
377                keys_buf.extend_from_slice(suffix);
378            }
379
380            prev_key.clear();
381            prev_key.extend_from_slice(key);
382        }
383
384        // Combine: num_blocks + addr_bytes + keys + restart_offsets + footer
385        let restart_count = restart_offsets.len();
386        let mut result =
387            Vec::with_capacity(4 + addr_bytes.len() + keys_buf.len() + restart_count * 4 + 6);
388        result.write_u32::<LittleEndian>(entries.len() as u32)?;
389        result.extend_from_slice(&addr_bytes);
390        result.extend_from_slice(&keys_buf);
391
392        // Write restart offsets array
393        for &off in &restart_offsets {
394            result.write_u32::<LittleEndian>(off)?;
395        }
396
397        // Write footer
398        result.write_u32::<LittleEndian>(restart_count as u32)?;
399        result.write_u16::<LittleEndian>(RESTART_INTERVAL as u16)?;
400
401        Ok(result)
402    }
403
404    /// Load from raw bytes
405    pub fn load(data: OwnedBytes) -> io::Result<Self> {
406        if data.len() < 4 {
407            return Err(io::Error::new(
408                io::ErrorKind::InvalidData,
409                "MmapBlockIndex data too short",
410            ));
411        }
412
413        let num_blocks = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
414
415        // Load block addresses
416        let addr_data_start = 4;
417        let remaining = data.slice(addr_data_start..data.len());
418        let block_addrs = BlockAddrStore::load(remaining.clone())?;
419
420        // Calculate where keys start
421        let bits_per_entry = block_addrs.offset_bits as usize + block_addrs.length_bits as usize;
422        let total_bits = bits_per_entry * num_blocks as usize;
423        let addr_packed_size = total_bits.div_ceil(8);
424        let keys_offset = addr_data_start + 6 + addr_packed_size; // 6 = header of BlockAddrStore
425
426        // Read footer (last 6 bytes: restart_count u32 + restart_interval u16)
427        if data.len() < keys_offset + 6 {
428            return Err(io::Error::new(
429                io::ErrorKind::InvalidData,
430                "MmapBlockIndex missing restart footer",
431            ));
432        }
433        let footer_start = data.len() - 6;
434        let restart_count = u32::from_le_bytes([
435            data[footer_start],
436            data[footer_start + 1],
437            data[footer_start + 2],
438            data[footer_start + 3],
439        ]) as usize;
440        let restart_interval =
441            u16::from_le_bytes([data[footer_start + 4], data[footer_start + 5]]) as usize;
442
443        // Restart offsets array: restart_count × 4 bytes, just before footer
444        let restart_array_offset = footer_start - restart_count * 4;
445
446        // Keys section spans from keys_offset to restart_array_offset
447        let keys_end = restart_array_offset;
448
449        Ok(Self {
450            data,
451            num_blocks,
452            block_addrs,
453            keys_offset,
454            keys_end,
455            restart_array_offset,
456            restart_count,
457            restart_interval,
458        })
459    }
460
461    /// Read restart offset at given index directly from mmap'd data
462    #[inline]
463    fn restart_offset(&self, idx: usize) -> u32 {
464        let pos = self.restart_array_offset + idx * 4;
465        u32::from_le_bytes([
466            self.data[pos],
467            self.data[pos + 1],
468            self.data[pos + 2],
469            self.data[pos + 3],
470        ])
471    }
472
473    /// Decode the full key at a restart point (prefix_len is always 0)
474    fn decode_restart_key<'a>(&self, keys_data: &'a [u8], restart_idx: usize) -> &'a [u8] {
475        let offset = self.restart_offset(restart_idx) as usize;
476        let mut reader = &keys_data[offset..];
477
478        let prefix_len = read_vint(&mut reader).unwrap_or(0) as usize;
479        debug_assert_eq!(prefix_len, 0, "restart point should have prefix_len=0");
480        let suffix_len = read_vint(&mut reader).unwrap_or(0) as usize;
481
482        // reader now points to the suffix bytes
483        &reader[..suffix_len]
484    }
485
486    /// O(log(N/R) + R) lookup using binary search on restart points, then
487    /// linear scan with prefix decompression within the interval.
488    pub fn locate(&self, target: &[u8]) -> Option<usize> {
489        if self.num_blocks == 0 {
490            return None;
491        }
492
493        let keys_data = &self.data.as_slice()[self.keys_offset..self.keys_end];
494
495        // Binary search on restart points to find the interval
496        let mut lo = 0usize;
497        let mut hi = self.restart_count;
498
499        while lo < hi {
500            let mid = lo + (hi - lo) / 2;
501            let key = self.decode_restart_key(keys_data, mid);
502            match key.cmp(target) {
503                std::cmp::Ordering::Equal => {
504                    return Some(mid * self.restart_interval);
505                }
506                std::cmp::Ordering::Less => lo = mid + 1,
507                std::cmp::Ordering::Greater => hi = mid,
508            }
509        }
510
511        // lo is the first restart point whose key > target (or restart_count)
512        // Search in the interval starting at restart (lo - 1), or 0 if lo == 0
513        if lo == 0 {
514            // target < first restart key — might be before all keys
515            // but we still need to scan from the beginning
516        }
517
518        let restart_idx = if lo > 0 { lo - 1 } else { 0 };
519        let start_ordinal = restart_idx * self.restart_interval;
520        let end_ordinal = if restart_idx + 1 < self.restart_count {
521            (restart_idx + 1) * self.restart_interval
522        } else {
523            self.num_blocks as usize
524        };
525
526        // Linear scan from restart point through at most R entries
527        let scan_offset = self.restart_offset(restart_idx) as usize;
528        let mut reader = &keys_data[scan_offset..];
529        let mut current_key = Vec::new();
530        let mut last_le_block: Option<usize> = None;
531
532        for i in start_ordinal..end_ordinal {
533            let prefix_len = match read_vint(&mut reader) {
534                Ok(v) => v as usize,
535                Err(_) => break,
536            };
537            let suffix_len = match read_vint(&mut reader) {
538                Ok(v) => v as usize,
539                Err(_) => break,
540            };
541
542            current_key.truncate(prefix_len);
543            if suffix_len > reader.len() {
544                break;
545            }
546            current_key.extend_from_slice(&reader[..suffix_len]);
547            reader = &reader[suffix_len..];
548
549            match current_key.as_slice().cmp(target) {
550                std::cmp::Ordering::Equal => return Some(i),
551                std::cmp::Ordering::Less => last_le_block = Some(i),
552                std::cmp::Ordering::Greater => return last_le_block,
553            }
554        }
555
556        last_le_block
557    }
558
559    /// Get block address by ordinal
560    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
561        self.block_addrs.get(ordinal)
562    }
563
564    /// Number of blocks
565    pub fn len(&self) -> usize {
566        self.num_blocks as usize
567    }
568
569    /// Check if empty
570    pub fn is_empty(&self) -> bool {
571        self.num_blocks == 0
572    }
573
574    /// Get all block addresses
575    pub fn all_addrs(&self) -> Vec<BlockAddr> {
576        self.block_addrs.all()
577    }
578
579    /// Decode all keys (for debugging/merging)
580    pub fn all_keys(&self) -> Vec<Vec<u8>> {
581        let mut result = Vec::with_capacity(self.num_blocks as usize);
582        let keys_data = &self.data.as_slice()[self.keys_offset..self.keys_end];
583        let mut reader = keys_data;
584        let mut current_key = Vec::new();
585
586        for _ in 0..self.num_blocks {
587            let prefix_len = match read_vint(&mut reader) {
588                Ok(v) => v as usize,
589                Err(_) => break,
590            };
591            let suffix_len = match read_vint(&mut reader) {
592                Ok(v) => v as usize,
593                Err(_) => break,
594            };
595
596            current_key.truncate(prefix_len);
597            if suffix_len > reader.len() {
598                break;
599            }
600            current_key.extend_from_slice(&reader[..suffix_len]);
601            reader = &reader[suffix_len..];
602
603            result.push(current_key.clone());
604        }
605
606        result
607    }
608}
609
/// Unified block index that can use either FST or mmap'd raw index
///
/// Thin dispatch wrapper so callers need not know which backend a segment
/// was built with. The FST variant is only available with the `native`
/// feature.
pub enum BlockIndex {
    #[cfg(feature = "native")]
    Fst(FstBlockIndex),
    Mmap(MmapBlockIndex),
}
616
impl BlockIndex {
    /// Locate the block that could contain the key
    ///
    /// Delegates to the underlying index; both backends return the ordinal
    /// of a candidate block, or `None` when no block can contain the key.
    pub fn locate(&self, key: &[u8]) -> Option<usize> {
        match self {
            #[cfg(feature = "native")]
            BlockIndex::Fst(idx) => idx.locate(key),
            BlockIndex::Mmap(idx) => idx.locate(key),
        }
    }

    /// Get block address by ordinal
    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
        match self {
            #[cfg(feature = "native")]
            BlockIndex::Fst(idx) => idx.get_addr(ordinal),
            BlockIndex::Mmap(idx) => idx.get_addr(ordinal),
        }
    }

    /// Number of blocks
    pub fn len(&self) -> usize {
        match self {
            #[cfg(feature = "native")]
            BlockIndex::Fst(idx) => idx.len(),
            BlockIndex::Mmap(idx) => idx.len(),
        }
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get all block addresses
    pub fn all_addrs(&self) -> Vec<BlockAddr> {
        match self {
            #[cfg(feature = "native")]
            BlockIndex::Fst(idx) => idx.all_addrs(),
            BlockIndex::Mmap(idx) => idx.all_addrs(),
        }
    }
}
659
660// ============================================================================
661// Helper functions
662// ============================================================================
663
/// Length of the longest common prefix of `a` and `b` (in bytes).
fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let max = a.len().min(b.len());
    let mut n = 0;
    while n < max && a[n] == b[n] {
        n += 1;
    }
    n
}
667
/// Write `value` as a LEB128-style varint: 7 payload bits per byte,
/// least-significant group first, high bit set on every byte but the last.
fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
    while value >= 0x80 {
        writer.write_all(&[(value as u8 & 0x7F) | 0x80])?;
        value >>= 7;
    }
    writer.write_all(&[value as u8])
}
680
/// Read a varint written by `write_vint`, advancing `reader` past it.
///
/// Errors with `UnexpectedEof` if the slice ends mid-varint, and with
/// `InvalidData` if the encoding continues past 10 bytes (a u64 needs at
/// most 10 groups of 7 bits).
fn read_vint(reader: &mut &[u8]) -> io::Result<u64> {
    let mut value = 0u64;
    // Shifts 0, 7, ..., 63: ten iterations cover the full u64 range.
    for shift in (0..64).step_by(7) {
        let byte = *reader.first().ok_or_else(|| {
            io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected end of varint")
        })?;
        *reader = &reader[1..];
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err(io::Error::new(io::ErrorKind::InvalidData, "Varint too long"))
}
707
/// Simple bit writer for packing
///
/// Accumulates bits LSB-first in `buffer` and pushes whole bytes to
/// `output`; `bits_in_buffer` is always < 8 between calls.
struct BitWriter<'a> {
    output: &'a mut Vec<u8>,
    buffer: u64,
    bits_in_buffer: u8,
}

impl<'a> BitWriter<'a> {
    fn new(output: &'a mut Vec<u8>) -> Self {
        Self {
            output,
            buffer: 0,
            bits_in_buffer: 0,
        }
    }

    /// Append the low `num_bits` bits of `value` to the stream.
    ///
    /// Widens through u128 so that `value << bits_in_buffer` cannot drop
    /// high bits: with up to 7 bits already pending, a plain u64 shift
    /// would silently lose the top bits of any value wider than 57 bits.
    fn write(&mut self, value: u64, num_bits: u8) -> io::Result<()> {
        debug_assert!(num_bits <= 64);

        // Mask off bits above num_bits so stray high bits in `value`
        // cannot corrupt neighbouring fields.
        let value = if num_bits >= 64 {
            value
        } else {
            value & ((1u64 << num_bits) - 1)
        };

        let mut acc = (self.buffer as u128) | ((value as u128) << self.bits_in_buffer);
        let mut pending = self.bits_in_buffer + num_bits;

        // Emit all complete bytes, keep the (< 8-bit) remainder buffered.
        while pending >= 8 {
            self.output.push(acc as u8);
            acc >>= 8;
            pending -= 8;
        }

        self.buffer = acc as u64;
        self.bits_in_buffer = pending;

        Ok(())
    }

    /// Write any remaining partial byte (zero-padded in the high bits).
    fn flush(&mut self) -> io::Result<()> {
        if self.bits_in_buffer > 0 {
            self.output.push(self.buffer as u8);
            self.buffer = 0;
            self.bits_in_buffer = 0;
        }
        Ok(())
    }
}
748
/// Simple bit reader for unpacking
///
/// Reads bits LSB-first, mirroring `BitWriter`'s layout.
struct BitReader<'a> {
    data: &'a [u8],
    byte_pos: usize,
    bit_pos: u8,
}

impl<'a> BitReader<'a> {
    fn new(data: &'a [u8]) -> Self {
        Self {
            data,
            byte_pos: 0,
            bit_pos: 0,
        }
    }

    /// Read the next `num_bits` bits as an LSB-first integer.
    ///
    /// Errors with `UnexpectedEof` when fewer than `num_bits` bits remain.
    fn read(&mut self, num_bits: u8) -> io::Result<u64> {
        if num_bits == 0 {
            return Ok(0);
        }

        let mut value: u64 = 0;
        let mut consumed: u8 = 0;

        while consumed < num_bits {
            let Some(&byte) = self.data.get(self.byte_pos) else {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Not enough bits",
                ));
            };

            // Take as many bits as still needed, capped by what remains in
            // the current byte.
            let available = 8 - self.bit_pos;
            let take = (num_bits - consumed).min(available);
            // take == 8 would overflow the u8 shift below, so special-case it
            let mask: u8 = if take >= 8 { 0xFF } else { (1u8 << take) - 1 };
            let chunk = (byte >> self.bit_pos) & mask;

            value |= u64::from(chunk) << consumed;
            consumed += take;
            self.bit_pos += take;

            // Advance once the current byte is exhausted
            if self.bit_pos >= 8 {
                self.byte_pos += 1;
                self.bit_pos = 0;
            }
        }

        Ok(value)
    }
}
804
#[cfg(test)]
mod tests {
    use super::*;

    // Build → load → get round-trip over adjacent blocks of varying sizes.
    #[test]
    fn test_block_addr_store_roundtrip() {
        let addrs = vec![
            BlockAddr {
                offset: 0,
                length: 1000,
            },
            BlockAddr {
                offset: 1000,
                length: 1500,
            },
            BlockAddr {
                offset: 2500,
                length: 800,
            },
            BlockAddr {
                offset: 3300,
                length: 2000,
            },
        ];

        let bytes = BlockAddrStore::build(&addrs).unwrap();
        let store = BlockAddrStore::load(OwnedBytes::new(bytes)).unwrap();

        assert_eq!(store.len(), 4);
        for (i, expected) in addrs.iter().enumerate() {
            let actual = store.get(i).unwrap();
            assert_eq!(actual.offset, expected.offset, "offset mismatch at {}", i);
            assert_eq!(actual.length, expected.length, "length mismatch at {}", i);
        }
    }

    // An empty store must round-trip and report zero blocks.
    #[test]
    fn test_block_addr_store_empty() {
        let bytes = BlockAddrStore::build(&[]).unwrap();
        let store = BlockAddrStore::load(OwnedBytes::new(bytes)).unwrap();
        assert_eq!(store.len(), 0);
        assert!(store.get(0).is_none());
    }

    // locate() semantics: exact hits, a key between two blocks, a key past
    // the last block, and a key before the first block.
    #[test]
    fn test_mmap_block_index_roundtrip() {
        let entries = vec![
            (
                b"aaa".to_vec(),
                BlockAddr {
                    offset: 0,
                    length: 100,
                },
            ),
            (
                b"bbb".to_vec(),
                BlockAddr {
                    offset: 100,
                    length: 150,
                },
            ),
            (
                b"ccc".to_vec(),
                BlockAddr {
                    offset: 250,
                    length: 200,
                },
            ),
        ];

        let bytes = MmapBlockIndex::build(&entries).unwrap();
        let index = MmapBlockIndex::load(OwnedBytes::new(bytes)).unwrap();

        assert_eq!(index.len(), 3);

        // Test locate
        assert_eq!(index.locate(b"aaa"), Some(0));
        assert_eq!(index.locate(b"bbb"), Some(1));
        assert_eq!(index.locate(b"ccc"), Some(2));
        assert_eq!(index.locate(b"aab"), Some(0)); // Between aaa and bbb
        assert_eq!(index.locate(b"ddd"), Some(2)); // After all keys
        assert_eq!(index.locate(b"000"), None); // Before all keys
    }

    // Same locate() contract for the FST-backed variant (native-only).
    #[cfg(feature = "native")]
    #[test]
    fn test_fst_block_index_roundtrip() {
        let entries = vec![
            (
                b"aaa".to_vec(),
                BlockAddr {
                    offset: 0,
                    length: 100,
                },
            ),
            (
                b"bbb".to_vec(),
                BlockAddr {
                    offset: 100,
                    length: 150,
                },
            ),
            (
                b"ccc".to_vec(),
                BlockAddr {
                    offset: 250,
                    length: 200,
                },
            ),
        ];

        let bytes = FstBlockIndex::build(&entries).unwrap();
        let index = FstBlockIndex::load(OwnedBytes::new(bytes)).unwrap();

        assert_eq!(index.len(), 3);

        // Test locate
        assert_eq!(index.locate(b"aaa"), Some(0));
        assert_eq!(index.locate(b"bbb"), Some(1));
        assert_eq!(index.locate(b"ccc"), Some(2));
        assert_eq!(index.locate(b"aab"), Some(0)); // Between aaa and bbb
        assert_eq!(index.locate(b"ddd"), Some(2)); // After all keys
    }

    // Bit-level round-trip through BitWriter/BitReader with mixed widths
    // crossing a byte boundary.
    #[test]
    fn test_bit_writer_reader() {
        let mut buf = Vec::new();
        let mut writer = BitWriter::new(&mut buf);

        writer.write(5, 3).unwrap(); // 101
        writer.write(3, 2).unwrap(); // 11
        writer.write(15, 4).unwrap(); // 1111
        writer.flush().unwrap();

        let mut reader = BitReader::new(&buf);
        assert_eq!(reader.read(3).unwrap(), 5);
        assert_eq!(reader.read(2).unwrap(), 3);
        assert_eq!(reader.read(4).unwrap(), 15);
    }
}