Skip to main content

sochdb_storage/sstable/
block.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Block Encoding with Restart Points and Hash Index
19//!
20//! This module implements the block format used in SSTables. Each block
21//! contains a sequence of key-value pairs with prefix compression and
22//! restart points for efficient random access.
23//!
24//! ## Block Format
25//!
26//! ```text
27//! ┌─────────────────────────────────────────────────────────────────────────┐
28//! │                           Block Contents                                 │
29//! ├─────────────────────────────────────────────────────────────────────────┤
30//! │ Entry 0: [shared_len][unshared_len][value_len][unshared_key][value]     │
31//! │ Entry 1: [shared_len][unshared_len][value_len][unshared_key][value]     │
32//! │ ...                                                                      │
33//! │ Entry N-1                                                                │
34//! ├─────────────────────────────────────────────────────────────────────────┤
//! │ Optional Hash Index: [bucket_0][bucket_1]...[bucket_B-1][num_buckets]   │
//! ├─────────────────────────────────────────────────────────────────────────┤
//! │ Restart Points: [offset_0][offset_1]...[offset_R-1][num_restarts]       │
38//! ├─────────────────────────────────────────────────────────────────────────┤
39//! │ Block Trailer: [type(1)][checksum(4)]                                   │
40//! └─────────────────────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! ## Prefix Compression
44//!
45//! Keys are prefix-compressed within a restart interval:
46//! - `shared_len`: Number of bytes shared with previous key
47//! - `unshared_len`: Number of non-shared key bytes following
48//! - `value_len`: Length of value
49//!
50//! At restart points, `shared_len = 0` (full key stored).
51//!
52//! ## Complexity Analysis
53//!
54//! | Operation    | Without Hash | With Hash Index     |
55//! |--------------|--------------|---------------------|
56//! | Point lookup | O(log n/r + r) | O(1) expected     |
57//! | Seek         | O(log n/r + r) | O(log n/r + r)    |
58//! | Iterate      | O(n)           | O(n)              |
59//!
60//! Where n = entries, r = restart interval (typically 16).
61
62use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read};
65
66/// Default restart interval (entries between restart points)
67pub const DEFAULT_RESTART_INTERVAL: usize = 16;
68
69/// Default number of hash buckets per entry (for hash index)
70pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
/// Block compression type
///
/// The discriminants are the byte values understood by the `TryFrom<u8>`
/// implementation below.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    /// Uncompressed block
    Uncompressed = 0,
    /// Snappy compressed
    Snappy = 1,
    /// LZ4 compressed
    Lz4 = 2,
    /// Zstd compressed
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Map a raw tag byte to its variant; bytes above 3 yield `Err(())`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Indexed by discriminant, so position `i` holds the variant whose tag is `i`.
        const BY_TAG: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];
        BY_TAG.get(usize::from(value)).copied().ok_or(())
    }
}

impl BlockType {
    /// Convert from u8
    ///
    /// Lossy counterpart of `try_from`: an unrecognised tag is reported as
    /// [`BlockType::Uncompressed`] instead of an error.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(block_type) => block_type,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
106
107/// Handle to a block (offset + size)
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct BlockHandle {
110    /// Offset in file
111    pub offset: u64,
112    /// Size of block data (excluding trailer)
113    pub size: u64,
114}
115
116impl BlockHandle {
117    pub fn new(offset: u64, size: u64) -> Self {
118        Self { offset, size }
119    }
120
121    /// Get offset
122    pub fn offset(&self) -> u64 {
123        self.offset
124    }
125
126    /// Get size
127    pub fn size(&self) -> u64 {
128        self.size
129    }
130
131    /// Encode to bytes (varint encoded for compactness)
132    pub fn encode(&self) -> Vec<u8> {
133        let mut buf = Vec::with_capacity(20);
134        encode_varint(&mut buf, self.offset);
135        encode_varint(&mut buf, self.size);
136        buf
137    }
138
139    /// Decode from bytes
140    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141        let mut cursor = Cursor::new(data);
142        let offset = decode_varint(&mut cursor)?;
143        let size = decode_varint(&mut cursor)?;
144        Some((Self { offset, size }, cursor.position() as usize))
145    }
146}
147
/// A restart point entry
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// reads this type; `BlockBuilder` keeps its restart offsets as plain `u32`
/// values. Confirm whether it is still needed.
#[derive(Debug, Clone)]
struct RestartPoint {
    /// Offset within block data
    offset: u32,
}
154
/// Hash bucket entry for hash index
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// reads this type; the builder writes hash buckets as raw `u8` values.
/// Confirm whether it is still needed.
#[derive(Debug, Clone)]
struct HashBucket {
    /// Restart point index (0xFF = empty)
    restart_index: u8,
}
161
162// =============================================================================
163// Block Builder
164// =============================================================================
165
166/// Builder for creating blocks with prefix compression
167pub struct BlockBuilder {
168    /// Block data buffer
169    buffer: Vec<u8>,
170    /// Restart points (offsets into buffer)
171    restarts: Vec<u32>,
172    /// Entries since last restart
173    entries_since_restart: usize,
174    /// Restart interval
175    restart_interval: usize,
176    /// Last key (for prefix compression)
177    last_key: Vec<u8>,
178    /// Number of entries
179    entry_count: usize,
180    /// Whether to build hash index
181    use_hash_index: bool,
182    /// Keys for hash index (stored temporarily during build)
183    keys_for_hash: Vec<Vec<u8>>,
184}
185
186impl BlockBuilder {
187    /// Create a new block builder
188    pub fn new(restart_interval: usize) -> Self {
189        Self {
190            buffer: Vec::with_capacity(4096),
191            restarts: vec![0], // First restart at offset 0
192            entries_since_restart: 0,
193            restart_interval,
194            last_key: Vec::new(),
195            entry_count: 0,
196            use_hash_index: false,
197            keys_for_hash: Vec::new(),
198        }
199    }
200
201    /// Create with hash index enabled
202    pub fn with_hash_index(restart_interval: usize) -> Self {
203        Self {
204            use_hash_index: true,
205            ..Self::new(restart_interval)
206        }
207    }
208
209    /// Add a key-value pair to the block
210    ///
211    /// Keys must be added in sorted order.
212    pub fn add(&mut self, key: &[u8], value: &[u8]) {
213        debug_assert!(
214            self.entry_count == 0 || key > self.last_key.as_slice(),
215            "Keys must be added in sorted order"
216        );
217
218        // Check if we need a restart point
219        let shared = if self.entries_since_restart >= self.restart_interval {
220            // New restart point - store full key
221            self.restarts.push(self.buffer.len() as u32);
222            self.entries_since_restart = 0;
223            0
224        } else {
225            // Calculate shared prefix with last key
226            self.shared_prefix_len(&self.last_key, key)
227        };
228
229        let non_shared = key.len() - shared;
230        let value_len = value.len();
231
232        // Write entry: [shared_len][non_shared_len][value_len][key_delta][value]
233        encode_varint(&mut self.buffer, shared as u64);
234        encode_varint(&mut self.buffer, non_shared as u64);
235        encode_varint(&mut self.buffer, value_len as u64);
236        self.buffer.extend_from_slice(&key[shared..]);
237        self.buffer.extend_from_slice(value);
238
239        // Update state
240        self.last_key.clear();
241        self.last_key.extend_from_slice(key);
242        self.entries_since_restart += 1;
243        self.entry_count += 1;
244
245        // Store key for hash index
246        if self.use_hash_index {
247            self.keys_for_hash.push(key.to_vec());
248        }
249    }
250
251    /// Calculate shared prefix length
252    fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253        let mut shared = 0;
254        let min_len = a.len().min(b.len());
255        while shared < min_len && a[shared] == b[shared] {
256            shared += 1;
257        }
258        shared
259    }
260
261    /// Finish building the block
262    ///
263    /// Returns the block contents including restarts and optional hash index.
264    pub fn finish(&mut self) -> Vec<u8> {
265        let mut result = std::mem::take(&mut self.buffer);
266
267        // Build hash index if enabled
268        if self.use_hash_index && self.entry_count > 0 {
269            self.build_hash_index(&mut result);
270        }
271
272        // Write restart points
273        for restart in &self.restarts {
274            result.write_u32::<LittleEndian>(*restart).unwrap();
275        }
276        result
277            .write_u32::<LittleEndian>(self.restarts.len() as u32)
278            .unwrap();
279
280        result
281    }
282
283    /// Build hash index for fast point lookups
284    fn build_hash_index(&self, data: &mut Vec<u8>) {
285        // Number of buckets = entries * bucket_ratio
286        let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287        let mut buckets = vec![0xFFu8; num_buckets]; // 0xFF = empty
288
289        // For each key, compute hash and store restart point index
290        for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291            let restart_idx = key_idx / self.restart_interval;
292            let bucket = Self::hash_key(key) as usize % num_buckets;
293
294            // Simple linear probing for collisions
295            let mut probe = bucket;
296            for _ in 0..num_buckets {
297                if buckets[probe] == 0xFF {
298                    buckets[probe] = restart_idx as u8;
299                    break;
300                }
301                probe = (probe + 1) % num_buckets;
302            }
303        }
304
305        // Write hash index
306        data.extend_from_slice(&buckets);
307        data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308    }
309
310    /// Hash a key for the hash index
311    fn hash_key(key: &[u8]) -> u32 {
312        // Use xxHash for speed
313        twox_hash::xxh3::hash64(key) as u32
314    }
315
316    /// Check if block is empty
317    pub fn is_empty(&self) -> bool {
318        self.entry_count == 0
319    }
320
321    /// Get approximate size of current block
322    pub fn estimated_size(&self) -> usize {
323        self.buffer.len() + self.restarts.len() * 4 + 4
324    }
325
326    /// Reset builder for reuse
327    pub fn reset(&mut self) {
328        self.buffer.clear();
329        self.restarts.clear();
330        self.restarts.push(0);
331        self.entries_since_restart = 0;
332        self.last_key.clear();
333        self.entry_count = 0;
334        self.keys_for_hash.clear();
335    }
336}
337
338impl Default for BlockBuilder {
339    fn default() -> Self {
340        Self::new(DEFAULT_RESTART_INTERVAL)
341    }
342}
343
344// =============================================================================
345// Block
346// =============================================================================
347
348/// An SSTable block with efficient key lookup
349pub struct Block {
350    /// Raw block data
351    data: Vec<u8>,
352    /// Offset where restart array begins
353    restarts_offset: usize,
354    /// Number of restart points
355    num_restarts: usize,
356    /// Number of hash buckets (0 if no hash index)
357    num_hash_buckets: usize,
358    /// Offset where hash index begins (if present)
359    hash_index_offset: usize,
360}
361
362impl Block {
363    /// Create a block from raw data
364    pub fn new(data: Vec<u8>) -> Option<Self> {
365        if data.len() < 4 {
366            return None;
367        }
368
369        // Read number of restarts (last 4 bytes)
370        let num_restarts = {
371            let mut cursor = Cursor::new(&data[data.len() - 4..]);
372            cursor.read_u32::<LittleEndian>().ok()? as usize
373        };
374
375        if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376            return None;
377        }
378
379        let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381        // Detect hash index if present.
382        // Block format WITH hash index:
383        //   [entries][hash_buckets (N bytes)][num_buckets: u32][restart_offsets][num_restarts: u32]
384        // Block format WITHOUT hash index:
385        //   [entries][restart_offsets][num_restarts: u32]
386        //
387        // Detection: read candidate num_buckets from 4 bytes before restart offsets.
388        // Verify that (a) it fits, and (b) all bucket bytes are 0xFF (empty) or
389        // valid restart indices (< num_restarts).
390        let (num_hash_buckets, hash_index_offset) =
391            Self::detect_hash_index(&data, restarts_offset, num_restarts);
392
393        Some(Self {
394            data,
395            restarts_offset,
396            num_restarts,
397            num_hash_buckets,
398            hash_index_offset,
399        })
400    }
401
402    /// Detect hash index between entry data and restart offsets.
403    ///
404    /// Returns `(num_buckets, entries_end_offset)`. If no hash index is
405    /// detected, returns `(0, restarts_offset)`.
406    fn detect_hash_index(
407        data: &[u8],
408        restarts_offset: usize,
409        num_restarts: usize,
410    ) -> (usize, usize) {
411        // Need at least 4 bytes before restarts for the num_buckets field
412        if restarts_offset < 4 {
413            return (0, restarts_offset);
414        }
415
416        // Read candidate num_buckets (u32 LE) just before restart offsets
417        let nb_offset = restarts_offset - 4;
418        let candidate = u32::from_le_bytes([
419            data[nb_offset],
420            data[nb_offset + 1],
421            data[nb_offset + 2],
422            data[nb_offset + 3],
423        ]) as usize;
424
425        // Sanity: num_buckets must be > 0 and the hash buckets must fit
426        if candidate == 0 || candidate > nb_offset {
427            return (0, restarts_offset);
428        }
429
430        let hash_start = nb_offset - candidate;
431
432        // Verify every bucket byte is either 0xFF (empty) or a valid
433        // restart index (< num_restarts).  This is the distinguishing
434        // property of the hash index — regular entry data almost never
435        // satisfies this for all bytes.
436        let all_valid = data[hash_start..nb_offset]
437            .iter()
438            .all(|&b| b == 0xFF || (b as usize) < num_restarts);
439
440        if all_valid {
441            (candidate, hash_start)
442        } else {
443            (0, restarts_offset)
444        }
445    }
446
447    /// Get restart point offset at given index
448    fn restart_offset(&self, index: usize) -> u32 {
449        debug_assert!(index < self.num_restarts);
450        let offset = self.restarts_offset + index * 4;
451        let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
452        cursor.read_u32::<LittleEndian>().unwrap()
453    }
454
455    /// Seek to the first entry with key >= target
456    ///
457    /// Returns an iterator positioned at the target or past-the-end.
458    pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
459        // Binary search on restart points
460        let mut left = 0;
461        let mut right = self.num_restarts;
462
463        while left < right {
464            let mid = left + (right - left) / 2;
465            let offset = self.restart_offset(mid) as usize;
466
467            // Read key at restart point (shared = 0)
468            let key = self.read_key_at(offset);
469
470            match key.as_slice().cmp(target) {
471                Ordering::Less => left = mid + 1,
472                Ordering::Greater => right = mid,
473                Ordering::Equal => {
474                    return BlockIterator::new(self, offset);
475                }
476            }
477        }
478
479        // left now points to the first restart point with key > target
480        // Start from the previous restart point and scan
481        let start_restart = if left > 0 { left - 1 } else { 0 };
482        let start_offset = self.restart_offset(start_restart) as usize;
483
484        let mut iter = BlockIterator::new(self, start_offset);
485        while iter.valid() {
486            if iter.key() >= target {
487                break;
488            }
489            iter.next();
490        }
491        iter
492    }
493
494    /// Get value for exact key match
495    ///
496    /// Uses hash index if available for O(1) expected time.
497    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
498        // TODO: Use hash index if available
499
500        // Fall back to binary search
501        let iter = self.seek(key);
502        if iter.valid() && iter.key() == key {
503            Some(iter.value().to_vec())
504        } else {
505            None
506        }
507    }
508
509    /// Read the full key at a given offset (must be at a restart point)
510    fn read_key_at(&self, offset: usize) -> Vec<u8> {
511        let mut cursor = Cursor::new(&self.data[offset..self.hash_index_offset]);
512
513        let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
514        let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
515        let _value_len = decode_varint(&mut cursor);
516
517        debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
518
519        let pos = cursor.position() as usize;
520        self.data[offset + pos..offset + pos + non_shared].to_vec()
521    }
522
523    /// Create an iterator over all entries
524    pub fn iter(&self) -> BlockIterator<'_> {
525        BlockIterator::new(self, 0)
526    }
527
528    /// Get block data
529    pub fn data(&self) -> &[u8] {
530        &self.data
531    }
532}
533
534// =============================================================================
535// Block Iterator
536// =============================================================================
537
538/// Iterator over block entries with prefix decompression
539pub struct BlockIterator<'a> {
540    block: &'a Block,
541    /// Current position in data
542    offset: usize,
543    /// Current reconstructed key
544    key: Vec<u8>,
545    /// Current value slice
546    value_start: usize,
547    value_len: usize,
548    /// Whether iterator is valid
549    valid: bool,
550}
551
552impl<'a> BlockIterator<'a> {
553    /// Create a new block iterator starting at the given offset
554    pub fn new(block: &'a Block, offset: usize) -> Self {
555        let mut iter = Self {
556            block,
557            offset,
558            key: Vec::new(),
559            value_start: 0,
560            value_len: 0,
561            valid: false,
562        };
563        iter.parse_entry();
564        iter
565    }
566
567    /// Check if iterator is valid
568    pub fn valid(&self) -> bool {
569        self.valid
570    }
571
572    /// Get current key
573    pub fn key(&self) -> &[u8] {
574        &self.key
575    }
576
577    /// Get current value
578    pub fn value(&self) -> &[u8] {
579        &self.block.data[self.value_start..self.value_start + self.value_len]
580    }
581
582    /// Move to next entry
583    pub fn next(&mut self) {
584        if !self.valid {
585            return;
586        }
587
588        // Move past current value
589        self.offset = self.value_start + self.value_len;
590        self.parse_entry();
591    }
592
593    /// Parse entry at current offset
594    fn parse_entry(&mut self) {
595        // Use hash_index_offset as the boundary — entry data ends before the
596        // hash index (if present), not at restarts_offset.
597        let entries_end = self.block.hash_index_offset;
598
599        if self.offset >= entries_end {
600            self.valid = false;
601            return;
602        }
603
604        let mut cursor = Cursor::new(&self.block.data[self.offset..entries_end]);
605
606        // Read header
607        let shared = match decode_varint(&mut cursor) {
608            Some(v) => v as usize,
609            None => {
610                self.valid = false;
611                return;
612            }
613        };
614        let non_shared = match decode_varint(&mut cursor) {
615            Some(v) => v as usize,
616            None => {
617                self.valid = false;
618                return;
619            }
620        };
621        let value_len = match decode_varint(&mut cursor) {
622            Some(v) => v as usize,
623            None => {
624                self.valid = false;
625                return;
626            }
627        };
628
629        let header_len = cursor.position() as usize;
630        let data_start = self.offset + header_len;
631
632        // Bounds check
633        if data_start + non_shared + value_len > entries_end {
634            self.valid = false;
635            return;
636        }
637
638        // Reconstruct key
639        self.key.truncate(shared);
640        self.key
641            .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
642
643        // Record value location
644        self.value_start = data_start + non_shared;
645        self.value_len = value_len;
646        self.valid = true;
647    }
648
649    /// Seek to first entry >= target
650    pub fn seek(&mut self, target: &[u8]) {
651        // For now, delegate to block's seek and copy state
652        let new_iter = self.block.seek(target);
653        self.offset = new_iter.offset;
654        self.key = new_iter.key;
655        self.value_start = new_iter.value_start;
656        self.value_len = new_iter.value_len;
657        self.valid = new_iter.valid;
658    }
659
660    /// Seek to first entry
661    pub fn seek_to_first(&mut self) {
662        self.offset = 0;
663        self.key.clear();
664        self.parse_entry();
665    }
666}
667
668// =============================================================================
669// Varint Encoding/Decoding
670// =============================================================================
671
/// Append `value` to `buf` as a little-endian base-128 varint.
///
/// Each output byte carries 7 payload bits, least-significant group first;
/// the high bit is set on every byte except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low7 = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: continuation bit stays clear.
            buf.push(low7);
            return;
        }
        buf.push(low7 | 0x80);
    }
}
680
/// Decode a little-endian base-128 varint from a reader
///
/// Reads one byte at a time until a byte without the continuation bit
/// (`0x80`) is seen, and leaves the reader positioned just after it.
///
/// Returns `None` when:
/// - the reader runs out of bytes before the varint terminates,
/// - the encoding is longer than 10 bytes, or
/// - the 10th byte carries payload bits that do not fit in a `u64`
///   (only its lowest bit is representable).
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;

    // Shifts 0, 7, ..., 63: at most ten 7-bit groups cover 64 bits.
    for shift in (0..64).step_by(7) {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte).ok()?;
        let byte = byte[0];

        let payload = u64::from(byte & 0x7F);
        // At shift 63 only one bit is left; anything larger would be
        // shifted out silently and decode to the wrong value.
        if shift == 63 && payload > 1 {
            return None;
        }
        result |= payload << shift;

        if byte & 0x80 == 0 {
            return Some(result);
        }
    }

    // Continuation bit still set after ten bytes.
    None
}
700
701// =============================================================================
702// Tests
703// =============================================================================
704
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a block of `count` entries `keyNN -> valueNN` with `builder`.
    fn build_numbered_block(mut builder: BlockBuilder, count: usize) -> Block {
        for i in 0..count {
            let key = format!("key{:02}", i);
            let value = format!("value{:02}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }
        Block::new(builder.finish()).unwrap()
    }

    #[test]
    fn test_block_builder_single_entry() {
        let mut builder = BlockBuilder::new(16);
        builder.add(b"key1", b"value1");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    #[test]
    fn test_block_builder_multiple_entries() {
        let block = build_numbered_block(BlockBuilder::new(4), 20);

        // Test all keys
        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
    }

    #[test]
    fn test_block_with_hash_index_round_trip() {
        let block = build_numbered_block(BlockBuilder::with_hash_index(4), 20);

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
        assert_eq!(block.get(b"key99"), None);

        // Iteration must stop at the last real entry and never interpret
        // hash-index bytes as entries.
        let mut iter = block.iter();
        let mut count = 0;
        while iter.valid() {
            count += 1;
            iter.next();
        }
        assert_eq!(count, 20);
    }

    #[test]
    fn test_block_iterator() {
        let mut builder = BlockBuilder::new(4);

        builder.add(b"apple", b"1");
        builder.add(b"banana", b"2");
        builder.add(b"cherry", b"3");
        builder.add(b"date", b"4");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        let mut iter = block.iter();
        let mut count = 0;
        while iter.valid() {
            count += 1;
            iter.next();
        }
        assert_eq!(count, 4);
    }

    #[test]
    fn test_block_seek() {
        let mut builder = BlockBuilder::new(2);

        builder.add(b"a", b"1");
        builder.add(b"c", b"2");
        builder.add(b"e", b"3");
        builder.add(b"g", b"4");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        // Seek to existing key
        let iter = block.seek(b"c");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"c");

        // Seek to non-existing key (should find next)
        let iter = block.seek(b"d");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"e");

        // Seek past all keys
        let iter = block.seek(b"z");
        assert!(!iter.valid());
    }

    #[test]
    fn test_prefix_compression() {
        let mut builder = BlockBuilder::new(16);

        // Keys with common prefix (MUST be in sorted order!)
        builder.add(b"user:1000:age", b"30");
        builder.add(b"user:1000:email", b"alice@example.com");
        builder.add(b"user:1000:name", b"Alice");
        builder.add(b"user:1001:name", b"Bob");

        let data = builder.finish();

        let uncompressed_size = b"user:1000:age".len()
            + b"30".len()
            + b"user:1000:email".len()
            + b"alice@example.com".len()
            + b"user:1000:name".len()
            + b"Alice".len()
            + b"user:1001:name".len()
            + b"Bob".len();

        // The shared prefixes save more than the entry headers and the
        // restart array cost, so the whole block must be smaller than the
        // raw key/value bytes. (Allowing any slack here would let a block
        // with no prefix compression at all pass.)
        assert!(data.len() < uncompressed_size);

        // Verify all keys are retrievable
        let block = Block::new(data).unwrap();
        assert_eq!(block.get(b"user:1000:age"), Some(b"30".to_vec()));
        assert_eq!(
            block.get(b"user:1000:email"),
            Some(b"alice@example.com".to_vec())
        );
        assert_eq!(block.get(b"user:1000:name"), Some(b"Alice".to_vec()));
        assert_eq!(block.get(b"user:1001:name"), Some(b"Bob".to_vec()));
    }

    #[test]
    fn test_varint_encoding() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &value in &test_values {
            let mut buf = Vec::new();
            encode_varint(&mut buf, value);

            let mut cursor = Cursor::new(&buf);
            let decoded = decode_varint(&mut cursor).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(len, encoded.len());
    }
}