Skip to main content

sochdb_storage/sstable/
block.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Block Encoding with Restart Points and Hash Index
19//!
20//! This module implements the block format used in SSTables. Each block
21//! contains a sequence of key-value pairs with prefix compression and
22//! restart points for efficient random access.
23//!
24//! ## Block Format
25//!
26//! ```text
27//! ┌─────────────────────────────────────────────────────────────────────────┐
28//! │                           Block Contents                                 │
29//! ├─────────────────────────────────────────────────────────────────────────┤
30//! │ Entry 0: [shared_len][unshared_len][value_len][unshared_key][value]     │
31//! │ Entry 1: [shared_len][unshared_len][value_len][unshared_key][value]     │
32//! │ ...                                                                      │
33//! │ Entry N-1                                                                │
34//! ├─────────────────────────────────────────────────────────────────────────┤
35//! │ Restart Points: [offset_0][offset_1]...[offset_R-1][num_restarts]       │
36//! ├─────────────────────────────────────────────────────────────────────────┤
37//! │ Optional Hash Index: [bucket_0][bucket_1]...[bucket_B-1][num_buckets]   │
38//! ├─────────────────────────────────────────────────────────────────────────┤
39//! │ Block Trailer: [type(1)][checksum(4)]                                   │
40//! └─────────────────────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! ## Prefix Compression
44//!
45//! Keys are prefix-compressed within a restart interval:
46//! - `shared_len`: Number of bytes shared with previous key
47//! - `unshared_len`: Number of non-shared key bytes following
48//! - `value_len`: Length of value
49//!
50//! At restart points, `shared_len = 0` (full key stored).
51//!
52//! ## Complexity Analysis
53//!
54//! | Operation    | Without Hash | With Hash Index     |
55//! |--------------|--------------|---------------------|
56//! | Point lookup | O(log n/r + r) | O(1) expected     |
57//! | Seek         | O(log n/r + r) | O(log n/r + r)    |
58//! | Iterate      | O(n)           | O(n)              |
59//!
60//! Where n = entries, r = restart interval (typically 16).
61
62use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read, Write};
65
66/// Default restart interval (entries between restart points)
67pub const DEFAULT_RESTART_INTERVAL: usize = 16;
68
69/// Default number of hash buckets per entry (for hash index)
70pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
72/// Block compression type
73#[repr(u8)]
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub enum BlockType {
76    /// Uncompressed block
77    Uncompressed = 0,
78    /// Snappy compressed
79    Snappy = 1,
80    /// LZ4 compressed
81    Lz4 = 2,
82    /// Zstd compressed
83    Zstd = 3,
84}
85
86impl TryFrom<u8> for BlockType {
87    type Error = ();
88
89    fn try_from(value: u8) -> Result<Self, Self::Error> {
90        match value {
91            0 => Ok(BlockType::Uncompressed),
92            1 => Ok(BlockType::Snappy),
93            2 => Ok(BlockType::Lz4),
94            3 => Ok(BlockType::Zstd),
95            _ => Err(()),
96        }
97    }
98}
99
100impl BlockType {
101    /// Convert from u8
102    pub fn from_u8(value: u8) -> Self {
103        Self::try_from(value).unwrap_or(BlockType::Uncompressed)
104    }
105}
106
107/// Handle to a block (offset + size)
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct BlockHandle {
110    /// Offset in file
111    pub offset: u64,
112    /// Size of block data (excluding trailer)
113    pub size: u64,
114}
115
116impl BlockHandle {
117    pub fn new(offset: u64, size: u64) -> Self {
118        Self { offset, size }
119    }
120
121    /// Get offset
122    pub fn offset(&self) -> u64 {
123        self.offset
124    }
125
126    /// Get size
127    pub fn size(&self) -> u64 {
128        self.size
129    }
130
131    /// Encode to bytes (varint encoded for compactness)
132    pub fn encode(&self) -> Vec<u8> {
133        let mut buf = Vec::with_capacity(20);
134        encode_varint(&mut buf, self.offset);
135        encode_varint(&mut buf, self.size);
136        buf
137    }
138
139    /// Decode from bytes
140    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141        let mut cursor = Cursor::new(data);
142        let offset = decode_varint(&mut cursor)?;
143        let size = decode_varint(&mut cursor)?;
144        Some((Self { offset, size }, cursor.position() as usize))
145    }
146}
147
148/// A restart point entry
149#[derive(Debug, Clone)]
150struct RestartPoint {
151    /// Offset within block data
152    offset: u32,
153}
154
155/// Hash bucket entry for hash index
156#[derive(Debug, Clone)]
157struct HashBucket {
158    /// Restart point index (0xFF = empty)
159    restart_index: u8,
160}
161
162// =============================================================================
163// Block Builder
164// =============================================================================
165
166/// Builder for creating blocks with prefix compression
167pub struct BlockBuilder {
168    /// Block data buffer
169    buffer: Vec<u8>,
170    /// Restart points (offsets into buffer)
171    restarts: Vec<u32>,
172    /// Entries since last restart
173    entries_since_restart: usize,
174    /// Restart interval
175    restart_interval: usize,
176    /// Last key (for prefix compression)
177    last_key: Vec<u8>,
178    /// Number of entries
179    entry_count: usize,
180    /// Whether to build hash index
181    use_hash_index: bool,
182    /// Keys for hash index (stored temporarily during build)
183    keys_for_hash: Vec<Vec<u8>>,
184}
185
186impl BlockBuilder {
187    /// Create a new block builder
188    pub fn new(restart_interval: usize) -> Self {
189        Self {
190            buffer: Vec::with_capacity(4096),
191            restarts: vec![0], // First restart at offset 0
192            entries_since_restart: 0,
193            restart_interval,
194            last_key: Vec::new(),
195            entry_count: 0,
196            use_hash_index: false,
197            keys_for_hash: Vec::new(),
198        }
199    }
200
201    /// Create with hash index enabled
202    pub fn with_hash_index(restart_interval: usize) -> Self {
203        Self {
204            use_hash_index: true,
205            ..Self::new(restart_interval)
206        }
207    }
208
209    /// Add a key-value pair to the block
210    ///
211    /// Keys must be added in sorted order.
212    pub fn add(&mut self, key: &[u8], value: &[u8]) {
213        debug_assert!(
214            self.entry_count == 0 || key > self.last_key.as_slice(),
215            "Keys must be added in sorted order"
216        );
217
218        // Check if we need a restart point
219        let shared = if self.entries_since_restart >= self.restart_interval {
220            // New restart point - store full key
221            self.restarts.push(self.buffer.len() as u32);
222            self.entries_since_restart = 0;
223            0
224        } else {
225            // Calculate shared prefix with last key
226            self.shared_prefix_len(&self.last_key, key)
227        };
228
229        let non_shared = key.len() - shared;
230        let value_len = value.len();
231
232        // Write entry: [shared_len][non_shared_len][value_len][key_delta][value]
233        encode_varint(&mut self.buffer, shared as u64);
234        encode_varint(&mut self.buffer, non_shared as u64);
235        encode_varint(&mut self.buffer, value_len as u64);
236        self.buffer.extend_from_slice(&key[shared..]);
237        self.buffer.extend_from_slice(value);
238
239        // Update state
240        self.last_key.clear();
241        self.last_key.extend_from_slice(key);
242        self.entries_since_restart += 1;
243        self.entry_count += 1;
244
245        // Store key for hash index
246        if self.use_hash_index {
247            self.keys_for_hash.push(key.to_vec());
248        }
249    }
250
251    /// Calculate shared prefix length
252    fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253        let mut shared = 0;
254        let min_len = a.len().min(b.len());
255        while shared < min_len && a[shared] == b[shared] {
256            shared += 1;
257        }
258        shared
259    }
260
261    /// Finish building the block
262    ///
263    /// Returns the block contents including restarts and optional hash index.
264    pub fn finish(&mut self) -> Vec<u8> {
265        let mut result = std::mem::take(&mut self.buffer);
266
267        // Build hash index if enabled
268        if self.use_hash_index && self.entry_count > 0 {
269            self.build_hash_index(&mut result);
270        }
271
272        // Write restart points
273        for restart in &self.restarts {
274            result.write_u32::<LittleEndian>(*restart).unwrap();
275        }
276        result
277            .write_u32::<LittleEndian>(self.restarts.len() as u32)
278            .unwrap();
279
280        result
281    }
282
283    /// Build hash index for fast point lookups
284    fn build_hash_index(&self, data: &mut Vec<u8>) {
285        // Number of buckets = entries * bucket_ratio
286        let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287        let mut buckets = vec![0xFFu8; num_buckets]; // 0xFF = empty
288
289        // For each key, compute hash and store restart point index
290        for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291            let restart_idx = key_idx / self.restart_interval;
292            let bucket = Self::hash_key(key) as usize % num_buckets;
293            
294            // Simple linear probing for collisions
295            let mut probe = bucket;
296            for _ in 0..num_buckets {
297                if buckets[probe] == 0xFF {
298                    buckets[probe] = restart_idx as u8;
299                    break;
300                }
301                probe = (probe + 1) % num_buckets;
302            }
303        }
304
305        // Write hash index
306        data.extend_from_slice(&buckets);
307        data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308    }
309
310    /// Hash a key for the hash index
311    fn hash_key(key: &[u8]) -> u32 {
312        // Use xxHash for speed
313        twox_hash::xxh3::hash64(key) as u32
314    }
315
316    /// Check if block is empty
317    pub fn is_empty(&self) -> bool {
318        self.entry_count == 0
319    }
320
321    /// Get approximate size of current block
322    pub fn estimated_size(&self) -> usize {
323        self.buffer.len() + self.restarts.len() * 4 + 4
324    }
325
326    /// Reset builder for reuse
327    pub fn reset(&mut self) {
328        self.buffer.clear();
329        self.restarts.clear();
330        self.restarts.push(0);
331        self.entries_since_restart = 0;
332        self.last_key.clear();
333        self.entry_count = 0;
334        self.keys_for_hash.clear();
335    }
336}
337
338impl Default for BlockBuilder {
339    fn default() -> Self {
340        Self::new(DEFAULT_RESTART_INTERVAL)
341    }
342}
343
344// =============================================================================
345// Block
346// =============================================================================
347
348/// An SSTable block with efficient key lookup
349pub struct Block {
350    /// Raw block data
351    data: Vec<u8>,
352    /// Offset where restart array begins
353    restarts_offset: usize,
354    /// Number of restart points
355    num_restarts: usize,
356    /// Number of hash buckets (0 if no hash index)
357    num_hash_buckets: usize,
358    /// Offset where hash index begins (if present)
359    hash_index_offset: usize,
360}
361
362impl Block {
363    /// Create a block from raw data
364    pub fn new(data: Vec<u8>) -> Option<Self> {
365        if data.len() < 4 {
366            return None;
367        }
368
369        // Read number of restarts (last 4 bytes)
370        let num_restarts = {
371            let mut cursor = Cursor::new(&data[data.len() - 4..]);
372            cursor.read_u32::<LittleEndian>().ok()? as usize
373        };
374
375        if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376            return None;
377        }
378
379        let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381        // Detect hash index if present.
382        // Block format WITH hash index:
383        //   [entries][hash_buckets (N bytes)][num_buckets: u32][restart_offsets][num_restarts: u32]
384        // Block format WITHOUT hash index:
385        //   [entries][restart_offsets][num_restarts: u32]
386        //
387        // Detection: read candidate num_buckets from 4 bytes before restart offsets.
388        // Verify that (a) it fits, and (b) all bucket bytes are 0xFF (empty) or
389        // valid restart indices (< num_restarts).
390        let (num_hash_buckets, hash_index_offset) = Self::detect_hash_index(&data, restarts_offset, num_restarts);
391
392        Some(Self {
393            data,
394            restarts_offset,
395            num_restarts,
396            num_hash_buckets,
397            hash_index_offset,
398        })
399    }
400
401    /// Detect hash index between entry data and restart offsets.
402    ///
403    /// Returns `(num_buckets, entries_end_offset)`. If no hash index is
404    /// detected, returns `(0, restarts_offset)`.
405    fn detect_hash_index(data: &[u8], restarts_offset: usize, num_restarts: usize) -> (usize, usize) {
406        // Need at least 4 bytes before restarts for the num_buckets field
407        if restarts_offset < 4 {
408            return (0, restarts_offset);
409        }
410
411        // Read candidate num_buckets (u32 LE) just before restart offsets
412        let nb_offset = restarts_offset - 4;
413        let candidate = u32::from_le_bytes([
414            data[nb_offset],
415            data[nb_offset + 1],
416            data[nb_offset + 2],
417            data[nb_offset + 3],
418        ]) as usize;
419
420        // Sanity: num_buckets must be > 0 and the hash buckets must fit
421        if candidate == 0 || candidate > nb_offset {
422            return (0, restarts_offset);
423        }
424
425        let hash_start = nb_offset - candidate;
426
427        // Verify every bucket byte is either 0xFF (empty) or a valid
428        // restart index (< num_restarts).  This is the distinguishing
429        // property of the hash index — regular entry data almost never
430        // satisfies this for all bytes.
431        let all_valid = data[hash_start..nb_offset]
432            .iter()
433            .all(|&b| b == 0xFF || (b as usize) < num_restarts);
434
435        if all_valid {
436            (candidate, hash_start)
437        } else {
438            (0, restarts_offset)
439        }
440    }
441
442    /// Get restart point offset at given index
443    fn restart_offset(&self, index: usize) -> u32 {
444        debug_assert!(index < self.num_restarts);
445        let offset = self.restarts_offset + index * 4;
446        let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
447        cursor.read_u32::<LittleEndian>().unwrap()
448    }
449
450    /// Seek to the first entry with key >= target
451    ///
452    /// Returns an iterator positioned at the target or past-the-end.
453    pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
454        // Binary search on restart points
455        let mut left = 0;
456        let mut right = self.num_restarts;
457
458        while left < right {
459            let mid = left + (right - left) / 2;
460            let offset = self.restart_offset(mid) as usize;
461            
462            // Read key at restart point (shared = 0)
463            let key = self.read_key_at(offset);
464            
465            match key.as_slice().cmp(target) {
466                Ordering::Less => left = mid + 1,
467                Ordering::Greater => right = mid,
468                Ordering::Equal => {
469                    return BlockIterator::new(self, offset);
470                }
471            }
472        }
473
474        // left now points to the first restart point with key > target
475        // Start from the previous restart point and scan
476        let start_restart = if left > 0 { left - 1 } else { 0 };
477        let start_offset = self.restart_offset(start_restart) as usize;
478        
479        let mut iter = BlockIterator::new(self, start_offset);
480        while iter.valid() {
481            if iter.key() >= target {
482                break;
483            }
484            iter.next();
485        }
486        iter
487    }
488
489    /// Get value for exact key match
490    ///
491    /// Uses hash index if available for O(1) expected time.
492    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
493        // TODO: Use hash index if available
494        
495        // Fall back to binary search
496        let mut iter = self.seek(key);
497        if iter.valid() && iter.key() == key {
498            Some(iter.value().to_vec())
499        } else {
500            None
501        }
502    }
503
504    /// Read the full key at a given offset (must be at a restart point)
505    fn read_key_at(&self, offset: usize) -> Vec<u8> {
506        let mut cursor = Cursor::new(&self.data[offset..self.hash_index_offset]);
507        
508        let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
509        let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
510        let _value_len = decode_varint(&mut cursor);
511        
512        debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
513        
514        let pos = cursor.position() as usize;
515        self.data[offset + pos..offset + pos + non_shared].to_vec()
516    }
517
518    /// Create an iterator over all entries
519    pub fn iter(&self) -> BlockIterator<'_> {
520        BlockIterator::new(self, 0)
521    }
522
523    /// Get block data
524    pub fn data(&self) -> &[u8] {
525        &self.data
526    }
527}
528
529// =============================================================================
530// Block Iterator
531// =============================================================================
532
533/// Iterator over block entries with prefix decompression
534pub struct BlockIterator<'a> {
535    block: &'a Block,
536    /// Current position in data
537    offset: usize,
538    /// Current reconstructed key
539    key: Vec<u8>,
540    /// Current value slice
541    value_start: usize,
542    value_len: usize,
543    /// Whether iterator is valid
544    valid: bool,
545}
546
547impl<'a> BlockIterator<'a> {
548    /// Create a new block iterator starting at the given offset
549    pub fn new(block: &'a Block, offset: usize) -> Self {
550        let mut iter = Self {
551            block,
552            offset,
553            key: Vec::new(),
554            value_start: 0,
555            value_len: 0,
556            valid: false,
557        };
558        iter.parse_entry();
559        iter
560    }
561
562    /// Check if iterator is valid
563    pub fn valid(&self) -> bool {
564        self.valid
565    }
566
567    /// Get current key
568    pub fn key(&self) -> &[u8] {
569        &self.key
570    }
571
572    /// Get current value
573    pub fn value(&self) -> &[u8] {
574        &self.block.data[self.value_start..self.value_start + self.value_len]
575    }
576
577    /// Move to next entry
578    pub fn next(&mut self) {
579        if !self.valid {
580            return;
581        }
582
583        // Move past current value
584        self.offset = self.value_start + self.value_len;
585        self.parse_entry();
586    }
587
588    /// Parse entry at current offset
589    fn parse_entry(&mut self) {
590        // Use hash_index_offset as the boundary — entry data ends before the
591        // hash index (if present), not at restarts_offset.
592        let entries_end = self.block.hash_index_offset;
593
594        if self.offset >= entries_end {
595            self.valid = false;
596            return;
597        }
598
599        let mut cursor = Cursor::new(&self.block.data[self.offset..entries_end]);
600
601        // Read header
602        let shared = match decode_varint(&mut cursor) {
603            Some(v) => v as usize,
604            None => {
605                self.valid = false;
606                return;
607            }
608        };
609        let non_shared = match decode_varint(&mut cursor) {
610            Some(v) => v as usize,
611            None => {
612                self.valid = false;
613                return;
614            }
615        };
616        let value_len = match decode_varint(&mut cursor) {
617            Some(v) => v as usize,
618            None => {
619                self.valid = false;
620                return;
621            }
622        };
623
624        let header_len = cursor.position() as usize;
625        let data_start = self.offset + header_len;
626
627        // Bounds check
628        if data_start + non_shared + value_len > entries_end {
629            self.valid = false;
630            return;
631        }
632
633        // Reconstruct key
634        self.key.truncate(shared);
635        self.key
636            .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
637
638        // Record value location
639        self.value_start = data_start + non_shared;
640        self.value_len = value_len;
641        self.valid = true;
642    }
643
644    /// Seek to first entry >= target
645    pub fn seek(&mut self, target: &[u8]) {
646        // For now, delegate to block's seek and copy state
647        let new_iter = self.block.seek(target);
648        self.offset = new_iter.offset;
649        self.key = new_iter.key;
650        self.value_start = new_iter.value_start;
651        self.value_len = new_iter.value_len;
652        self.valid = new_iter.valid;
653    }
654
655    /// Seek to first entry
656    pub fn seek_to_first(&mut self) {
657        self.offset = 0;
658        self.key.clear();
659        self.parse_entry();
660    }
661}
662
663// =============================================================================
664// Varint Encoding/Decoding
665// =============================================================================
666
667/// Encode a u64 as a varint
668fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
669    while value >= 0x80 {
670        buf.push((value as u8) | 0x80);
671        value >>= 7;
672    }
673    buf.push(value as u8);
674}
675
676/// Decode a varint from a cursor
677fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
678    let mut result: u64 = 0;
679    let mut shift = 0;
680
681    loop {
682        let byte = reader.read_u8().ok()?;
683        result |= ((byte & 0x7F) as u64) << shift;
684        if byte & 0x80 == 0 {
685            break;
686        }
687        shift += 7;
688        if shift >= 64 {
689            return None; // Overflow
690        }
691    }
692
693    Some(result)
694}
695
696// =============================================================================
697// Tests
698// =============================================================================
699
700#[cfg(test)]
701mod tests {
702    use super::*;
703
704    #[test]
705    fn test_block_builder_single_entry() {
706        let mut builder = BlockBuilder::new(16);
707        builder.add(b"key1", b"value1");
708        
709        let data = builder.finish();
710        let block = Block::new(data).unwrap();
711        
712        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
713        assert_eq!(block.get(b"key2"), None);
714    }
715
716    #[test]
717    fn test_block_builder_multiple_entries() {
718        let mut builder = BlockBuilder::new(4);
719        
720        for i in 0..20 {
721            let key = format!("key{:02}", i);
722            let value = format!("value{:02}", i);
723            builder.add(key.as_bytes(), value.as_bytes());
724        }
725        
726        let data = builder.finish();
727        let block = Block::new(data).unwrap();
728        
729        // Test all keys
730        for i in 0..20 {
731            let key = format!("key{:02}", i);
732            let expected_value = format!("value{:02}", i);
733            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
734        }
735    }
736
737    #[test]
738    fn test_block_iterator() {
739        let mut builder = BlockBuilder::new(4);
740        
741        builder.add(b"apple", b"1");
742        builder.add(b"banana", b"2");
743        builder.add(b"cherry", b"3");
744        builder.add(b"date", b"4");
745        
746        let data = builder.finish();
747        let block = Block::new(data).unwrap();
748        
749        let mut iter = block.iter();
750        let mut count = 0;
751        while iter.valid() {
752            count += 1;
753            iter.next();
754        }
755        assert_eq!(count, 4);
756    }
757
758    #[test]
759    fn test_block_seek() {
760        let mut builder = BlockBuilder::new(2);
761        
762        builder.add(b"a", b"1");
763        builder.add(b"c", b"2");
764        builder.add(b"e", b"3");
765        builder.add(b"g", b"4");
766        
767        let data = builder.finish();
768        let block = Block::new(data).unwrap();
769        
770        // Seek to existing key
771        let iter = block.seek(b"c");
772        assert!(iter.valid());
773        assert_eq!(iter.key(), b"c");
774        
775        // Seek to non-existing key (should find next)
776        let iter = block.seek(b"d");
777        assert!(iter.valid());
778        assert_eq!(iter.key(), b"e");
779        
780        // Seek past all keys
781        let iter = block.seek(b"z");
782        assert!(!iter.valid());
783    }
784
785    #[test]
786    fn test_prefix_compression() {
787        let mut builder = BlockBuilder::new(16);
788        
789        // Keys with common prefix (MUST be in sorted order!)
790        builder.add(b"user:1000:age", b"30");
791        builder.add(b"user:1000:email", b"alice@example.com");
792        builder.add(b"user:1000:name", b"Alice");
793        builder.add(b"user:1001:name", b"Bob");
794        
795        let data = builder.finish();
796        
797        // Verify compression happened (data should be smaller than uncompressed)
798        let uncompressed_size = 
799            b"user:1000:age".len() + b"30".len() +
800            b"user:1000:email".len() + b"alice@example.com".len() +
801            b"user:1000:name".len() + b"Alice".len() +
802            b"user:1001:name".len() + b"Bob".len();
803        
804        // Block should be smaller due to prefix compression
805        // (accounting for some overhead)
806        assert!(data.len() < uncompressed_size + 50);
807        
808        // Verify all keys are retrievable
809        let block = Block::new(data).unwrap();
810        assert_eq!(block.get(b"user:1000:age"), Some(b"30".to_vec()));
811        assert_eq!(block.get(b"user:1000:email"), Some(b"alice@example.com".to_vec()));
812        assert_eq!(block.get(b"user:1000:name"), Some(b"Alice".to_vec()));
813        assert_eq!(block.get(b"user:1001:name"), Some(b"Bob".to_vec()));
814    }
815
816    #[test]
817    fn test_varint_encoding() {
818        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
819        
820        for &value in &test_values {
821            let mut buf = Vec::new();
822            encode_varint(&mut buf, value);
823            
824            let mut cursor = Cursor::new(&buf);
825            let decoded = decode_varint(&mut cursor).unwrap();
826            
827            assert_eq!(value, decoded, "Failed for value {}", value);
828        }
829    }
830
831    #[test]
832    fn test_block_handle() {
833        let handle = BlockHandle::new(12345, 67890);
834        let encoded = handle.encode();
835        
836        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
837        assert_eq!(handle, decoded);
838        assert_eq!(len, encoded.len());
839    }
840}