Skip to main content

sochdb_storage/sstable/
block.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Block Encoding with Restart Points and Hash Index
19//!
20//! This module implements the block format used in SSTables. Each block
21//! contains a sequence of key-value pairs with prefix compression and
22//! restart points for efficient random access.
23//!
24//! ## Block Format
25//!
26//! ```text
27//! ┌─────────────────────────────────────────────────────────────────────────┐
28//! │                           Block Contents                                 │
29//! ├─────────────────────────────────────────────────────────────────────────┤
30//! │ Entry 0: [shared_len][unshared_len][value_len][unshared_key][value]     │
31//! │ Entry 1: [shared_len][unshared_len][value_len][unshared_key][value]     │
32//! │ ...                                                                      │
33//! │ Entry N-1                                                                │
34//! ├─────────────────────────────────────────────────────────────────────────┤
35//! │ Restart Points: [offset_0][offset_1]...[offset_R-1][num_restarts]       │
36//! ├─────────────────────────────────────────────────────────────────────────┤
37//! │ Optional Hash Index: [bucket_0][bucket_1]...[bucket_B-1][num_buckets]   │
38//! ├─────────────────────────────────────────────────────────────────────────┤
39//! │ Block Trailer: [type(1)][checksum(4)]                                   │
40//! └─────────────────────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! ## Prefix Compression
44//!
45//! Keys are prefix-compressed within a restart interval:
46//! - `shared_len`: Number of bytes shared with previous key
47//! - `unshared_len`: Number of non-shared key bytes following
48//! - `value_len`: Length of value
49//!
50//! At restart points, `shared_len = 0` (full key stored).
51//!
52//! ## Complexity Analysis
53//!
//! | Operation    | Without Hash Index | With Hash Index    |
//! |--------------|--------------------|--------------------|
//! | Point lookup | O(log(n/r) + r)    | O(1) expected      |
//! | Seek         | O(log(n/r) + r)    | O(log(n/r) + r)    |
//! | Iterate      | O(n)               | O(n)               |
59//!
60//! Where n = entries, r = restart interval (typically 16).
61
62use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
63use std::cmp::Ordering;
64use std::io::{Cursor, Read, Write};
65
/// Default restart interval: the number of entries between restart points.
///
/// Every restart point stores its key in full (no prefix sharing), so a
/// smaller interval speeds up seeks at the cost of a larger block.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;

/// Default ratio of hash-index buckets to block entries, used to size the
/// bucket array when a block is built with a hash index.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
71
/// Compression codec applied to a block's contents.
///
/// Each variant's discriminant is the `u8` code accepted by
/// `TryFrom<u8>` / [`BlockType::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum BlockType {
    /// Stored without compression
    Uncompressed = 0,
    /// Compressed with Snappy
    Snappy = 1,
    /// Compressed with LZ4
    Lz4 = 2,
    /// Compressed with Zstandard
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    /// Unknown codes carry no additional information.
    type Error = ();

    fn try_from(code: u8) -> Result<Self, Self::Error> {
        // Indexed by code: position i holds the variant whose discriminant is i.
        const BY_CODE: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];
        BY_CODE.get(usize::from(code)).copied().ok_or(())
    }
}

impl BlockType {
    /// Decode a `u8` code, treating any unrecognized value as
    /// [`BlockType::Uncompressed`].
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(kind) => kind,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
106
107/// Handle to a block (offset + size)
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct BlockHandle {
110    /// Offset in file
111    pub offset: u64,
112    /// Size of block data (excluding trailer)
113    pub size: u64,
114}
115
116impl BlockHandle {
117    pub fn new(offset: u64, size: u64) -> Self {
118        Self { offset, size }
119    }
120
121    /// Get offset
122    pub fn offset(&self) -> u64 {
123        self.offset
124    }
125
126    /// Get size
127    pub fn size(&self) -> u64 {
128        self.size
129    }
130
131    /// Encode to bytes (varint encoded for compactness)
132    pub fn encode(&self) -> Vec<u8> {
133        let mut buf = Vec::with_capacity(20);
134        encode_varint(&mut buf, self.offset);
135        encode_varint(&mut buf, self.size);
136        buf
137    }
138
139    /// Decode from bytes
140    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
141        let mut cursor = Cursor::new(data);
142        let offset = decode_varint(&mut cursor)?;
143        let size = decode_varint(&mut cursor)?;
144        Some((Self { offset, size }, cursor.position() as usize))
145    }
146}
147
/// A restart point entry.
///
/// NOTE(review): not referenced by the builder or reader in this module,
/// which keep restart offsets as plain `u32`s.
#[derive(Debug, Clone)]
struct RestartPoint {
    /// Byte offset of the restart entry within the block data
    offset: u32,
}
154
/// Hash bucket entry for the hash index.
///
/// NOTE(review): not referenced by the builder or reader in this module,
/// which keep buckets as raw `u8`s.
#[derive(Debug, Clone)]
struct HashBucket {
    /// Restart point index; 0xFF marks an empty bucket
    restart_index: u8,
}
161
162// =============================================================================
163// Block Builder
164// =============================================================================
165
166/// Builder for creating blocks with prefix compression
167pub struct BlockBuilder {
168    /// Block data buffer
169    buffer: Vec<u8>,
170    /// Restart points (offsets into buffer)
171    restarts: Vec<u32>,
172    /// Entries since last restart
173    entries_since_restart: usize,
174    /// Restart interval
175    restart_interval: usize,
176    /// Last key (for prefix compression)
177    last_key: Vec<u8>,
178    /// Number of entries
179    entry_count: usize,
180    /// Whether to build hash index
181    use_hash_index: bool,
182    /// Keys for hash index (stored temporarily during build)
183    keys_for_hash: Vec<Vec<u8>>,
184}
185
186impl BlockBuilder {
187    /// Create a new block builder
188    pub fn new(restart_interval: usize) -> Self {
189        Self {
190            buffer: Vec::with_capacity(4096),
191            restarts: vec![0], // First restart at offset 0
192            entries_since_restart: 0,
193            restart_interval,
194            last_key: Vec::new(),
195            entry_count: 0,
196            use_hash_index: false,
197            keys_for_hash: Vec::new(),
198        }
199    }
200
201    /// Create with hash index enabled
202    pub fn with_hash_index(restart_interval: usize) -> Self {
203        Self {
204            use_hash_index: true,
205            ..Self::new(restart_interval)
206        }
207    }
208
209    /// Add a key-value pair to the block
210    ///
211    /// Keys must be added in sorted order.
212    pub fn add(&mut self, key: &[u8], value: &[u8]) {
213        debug_assert!(
214            self.entry_count == 0 || key > self.last_key.as_slice(),
215            "Keys must be added in sorted order"
216        );
217
218        // Check if we need a restart point
219        let shared = if self.entries_since_restart >= self.restart_interval {
220            // New restart point - store full key
221            self.restarts.push(self.buffer.len() as u32);
222            self.entries_since_restart = 0;
223            0
224        } else {
225            // Calculate shared prefix with last key
226            self.shared_prefix_len(&self.last_key, key)
227        };
228
229        let non_shared = key.len() - shared;
230        let value_len = value.len();
231
232        // Write entry: [shared_len][non_shared_len][value_len][key_delta][value]
233        encode_varint(&mut self.buffer, shared as u64);
234        encode_varint(&mut self.buffer, non_shared as u64);
235        encode_varint(&mut self.buffer, value_len as u64);
236        self.buffer.extend_from_slice(&key[shared..]);
237        self.buffer.extend_from_slice(value);
238
239        // Update state
240        self.last_key.clear();
241        self.last_key.extend_from_slice(key);
242        self.entries_since_restart += 1;
243        self.entry_count += 1;
244
245        // Store key for hash index
246        if self.use_hash_index {
247            self.keys_for_hash.push(key.to_vec());
248        }
249    }
250
251    /// Calculate shared prefix length
252    fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
253        let mut shared = 0;
254        let min_len = a.len().min(b.len());
255        while shared < min_len && a[shared] == b[shared] {
256            shared += 1;
257        }
258        shared
259    }
260
261    /// Finish building the block
262    ///
263    /// Returns the block contents including restarts and optional hash index.
264    pub fn finish(&mut self) -> Vec<u8> {
265        let mut result = std::mem::take(&mut self.buffer);
266
267        // Build hash index if enabled
268        if self.use_hash_index && self.entry_count > 0 {
269            self.build_hash_index(&mut result);
270        }
271
272        // Write restart points
273        for restart in &self.restarts {
274            result.write_u32::<LittleEndian>(*restart).unwrap();
275        }
276        result
277            .write_u32::<LittleEndian>(self.restarts.len() as u32)
278            .unwrap();
279
280        result
281    }
282
283    /// Build hash index for fast point lookups
284    fn build_hash_index(&self, data: &mut Vec<u8>) {
285        // Number of buckets = entries * bucket_ratio
286        let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
287        let mut buckets = vec![0xFFu8; num_buckets]; // 0xFF = empty
288
289        // For each key, compute hash and store restart point index
290        for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
291            let restart_idx = key_idx / self.restart_interval;
292            let bucket = Self::hash_key(key) as usize % num_buckets;
293            
294            // Simple linear probing for collisions
295            let mut probe = bucket;
296            for _ in 0..num_buckets {
297                if buckets[probe] == 0xFF {
298                    buckets[probe] = restart_idx as u8;
299                    break;
300                }
301                probe = (probe + 1) % num_buckets;
302            }
303        }
304
305        // Write hash index
306        data.extend_from_slice(&buckets);
307        data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
308    }
309
310    /// Hash a key for the hash index
311    fn hash_key(key: &[u8]) -> u32 {
312        // Use xxHash for speed
313        twox_hash::xxh3::hash64(key) as u32
314    }
315
316    /// Check if block is empty
317    pub fn is_empty(&self) -> bool {
318        self.entry_count == 0
319    }
320
321    /// Get approximate size of current block
322    pub fn estimated_size(&self) -> usize {
323        self.buffer.len() + self.restarts.len() * 4 + 4
324    }
325
326    /// Reset builder for reuse
327    pub fn reset(&mut self) {
328        self.buffer.clear();
329        self.restarts.clear();
330        self.restarts.push(0);
331        self.entries_since_restart = 0;
332        self.last_key.clear();
333        self.entry_count = 0;
334        self.keys_for_hash.clear();
335    }
336}
337
338impl Default for BlockBuilder {
339    fn default() -> Self {
340        Self::new(DEFAULT_RESTART_INTERVAL)
341    }
342}
343
344// =============================================================================
345// Block
346// =============================================================================
347
348/// An SSTable block with efficient key lookup
349pub struct Block {
350    /// Raw block data
351    data: Vec<u8>,
352    /// Offset where restart array begins
353    restarts_offset: usize,
354    /// Number of restart points
355    num_restarts: usize,
356    /// Number of hash buckets (0 if no hash index)
357    num_hash_buckets: usize,
358    /// Offset where hash index begins (if present)
359    hash_index_offset: usize,
360}
361
362impl Block {
363    /// Create a block from raw data
364    pub fn new(data: Vec<u8>) -> Option<Self> {
365        if data.len() < 4 {
366            return None;
367        }
368
369        // Read number of restarts (last 4 bytes)
370        let num_restarts = {
371            let mut cursor = Cursor::new(&data[data.len() - 4..]);
372            cursor.read_u32::<LittleEndian>().ok()? as usize
373        };
374
375        if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
376            return None;
377        }
378
379        let restarts_offset = data.len() - 4 - num_restarts * 4;
380
381        // TODO: Detect and parse hash index if present
382        // For now, assume no hash index
383        let num_hash_buckets = 0;
384        let hash_index_offset = restarts_offset;
385
386        Some(Self {
387            data,
388            restarts_offset,
389            num_restarts,
390            num_hash_buckets,
391            hash_index_offset,
392        })
393    }
394
395    /// Get restart point offset at given index
396    fn restart_offset(&self, index: usize) -> u32 {
397        debug_assert!(index < self.num_restarts);
398        let offset = self.restarts_offset + index * 4;
399        let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
400        cursor.read_u32::<LittleEndian>().unwrap()
401    }
402
403    /// Seek to the first entry with key >= target
404    ///
405    /// Returns an iterator positioned at the target or past-the-end.
406    pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
407        // Binary search on restart points
408        let mut left = 0;
409        let mut right = self.num_restarts;
410
411        while left < right {
412            let mid = left + (right - left) / 2;
413            let offset = self.restart_offset(mid) as usize;
414            
415            // Read key at restart point (shared = 0)
416            let key = self.read_key_at(offset);
417            
418            match key.as_slice().cmp(target) {
419                Ordering::Less => left = mid + 1,
420                Ordering::Greater => right = mid,
421                Ordering::Equal => {
422                    return BlockIterator::new(self, offset);
423                }
424            }
425        }
426
427        // left now points to the first restart point with key > target
428        // Start from the previous restart point and scan
429        let start_restart = if left > 0 { left - 1 } else { 0 };
430        let start_offset = self.restart_offset(start_restart) as usize;
431        
432        let mut iter = BlockIterator::new(self, start_offset);
433        while iter.valid() {
434            if iter.key() >= target {
435                break;
436            }
437            iter.next();
438        }
439        iter
440    }
441
442    /// Get value for exact key match
443    ///
444    /// Uses hash index if available for O(1) expected time.
445    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
446        // TODO: Use hash index if available
447        
448        // Fall back to binary search
449        let mut iter = self.seek(key);
450        if iter.valid() && iter.key() == key {
451            Some(iter.value().to_vec())
452        } else {
453            None
454        }
455    }
456
457    /// Read the full key at a given offset (must be at a restart point)
458    fn read_key_at(&self, offset: usize) -> Vec<u8> {
459        let mut cursor = Cursor::new(&self.data[offset..self.restarts_offset]);
460        
461        let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
462        let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
463        let _value_len = decode_varint(&mut cursor);
464        
465        debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
466        
467        let pos = cursor.position() as usize;
468        self.data[offset + pos..offset + pos + non_shared].to_vec()
469    }
470
471    /// Create an iterator over all entries
472    pub fn iter(&self) -> BlockIterator<'_> {
473        BlockIterator::new(self, 0)
474    }
475
476    /// Get block data
477    pub fn data(&self) -> &[u8] {
478        &self.data
479    }
480}
481
482// =============================================================================
483// Block Iterator
484// =============================================================================
485
/// Iterator over block entries with prefix decompression.
///
/// The current key is rebuilt into an owned buffer; the current value is a
/// slice of the block's data.
pub struct BlockIterator<'a> {
    /// Block being iterated
    block: &'a Block,
    /// Byte offset of the current entry within the block data
    offset: usize,
    /// Current key: the previous key's shared prefix plus this entry's suffix
    key: Vec<u8>,
    /// Byte offset of the current value within the block data
    value_start: usize,
    /// Length of the current value in bytes
    value_len: usize,
    /// Whether the iterator is positioned on a decoded entry
    valid: bool,
}
499
500impl<'a> BlockIterator<'a> {
501    /// Create a new block iterator starting at the given offset
502    pub fn new(block: &'a Block, offset: usize) -> Self {
503        let mut iter = Self {
504            block,
505            offset,
506            key: Vec::new(),
507            value_start: 0,
508            value_len: 0,
509            valid: false,
510        };
511        iter.parse_entry();
512        iter
513    }
514
515    /// Check if iterator is valid
516    pub fn valid(&self) -> bool {
517        self.valid
518    }
519
520    /// Get current key
521    pub fn key(&self) -> &[u8] {
522        &self.key
523    }
524
525    /// Get current value
526    pub fn value(&self) -> &[u8] {
527        &self.block.data[self.value_start..self.value_start + self.value_len]
528    }
529
530    /// Move to next entry
531    pub fn next(&mut self) {
532        if !self.valid {
533            return;
534        }
535
536        // Move past current value
537        self.offset = self.value_start + self.value_len;
538        self.parse_entry();
539    }
540
541    /// Parse entry at current offset
542    fn parse_entry(&mut self) {
543        if self.offset >= self.block.restarts_offset {
544            self.valid = false;
545            return;
546        }
547
548        let mut cursor = Cursor::new(&self.block.data[self.offset..self.block.restarts_offset]);
549
550        // Read header
551        let shared = match decode_varint(&mut cursor) {
552            Some(v) => v as usize,
553            None => {
554                self.valid = false;
555                return;
556            }
557        };
558        let non_shared = match decode_varint(&mut cursor) {
559            Some(v) => v as usize,
560            None => {
561                self.valid = false;
562                return;
563            }
564        };
565        let value_len = match decode_varint(&mut cursor) {
566            Some(v) => v as usize,
567            None => {
568                self.valid = false;
569                return;
570            }
571        };
572
573        let header_len = cursor.position() as usize;
574        let data_start = self.offset + header_len;
575
576        // Bounds check
577        if data_start + non_shared + value_len > self.block.restarts_offset {
578            self.valid = false;
579            return;
580        }
581
582        // Reconstruct key
583        self.key.truncate(shared);
584        self.key
585            .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
586
587        // Record value location
588        self.value_start = data_start + non_shared;
589        self.value_len = value_len;
590        self.valid = true;
591    }
592
593    /// Seek to first entry >= target
594    pub fn seek(&mut self, target: &[u8]) {
595        // For now, delegate to block's seek and copy state
596        let new_iter = self.block.seek(target);
597        self.offset = new_iter.offset;
598        self.key = new_iter.key;
599        self.value_start = new_iter.value_start;
600        self.value_len = new_iter.value_len;
601        self.valid = new_iter.valid;
602    }
603
604    /// Seek to first entry
605    pub fn seek_to_first(&mut self) {
606        self.offset = 0;
607        self.key.clear();
608        self.parse_entry();
609    }
610}
611
612// =============================================================================
613// Varint Encoding/Decoding
614// =============================================================================
615
/// Append `value` to `buf` as a LEB128-style varint.
///
/// Each output byte carries 7 bits of payload, least-significant group
/// first. The high bit is set on every byte except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low7 = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            buf.push(low7);
            return;
        }
        buf.push(low7 | 0x80);
    }
}
624
/// Decode a LEB128-style varint (as written by `encode_varint`) from `reader`.
///
/// Returns `None` in three cases:
/// - the input ends mid-varint;
/// - the varint is longer than 10 bytes;
/// - the 10th byte carries more than the single bit left in a `u64`.
///
/// The last case would otherwise be silently truncated.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;

    // Shifts 0, 7, ..., 63: at most 10 bytes can contribute to a u64.
    for shift in (0..64).step_by(7) {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte).ok()?;
        let byte = byte[0];
        let payload = u64::from(byte & 0x7F);

        // At shift 63 only the lowest payload bit still fits in a u64.
        if shift == 63 && payload > 1 {
            return None;
        }
        result |= payload << shift;

        if byte & 0x80 == 0 {
            return Some(result);
        }
    }

    // A continuation bit on the 10th byte: too long for a u64.
    None
}
644
645// =============================================================================
646// Tests
647// =============================================================================
648
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a finished block from `(key, value)` pairs given in sorted order.
    fn build_block(restart_interval: usize, entries: &[(&str, &str)]) -> Vec<u8> {
        let mut builder = BlockBuilder::new(restart_interval);
        for (key, value) in entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        builder.finish()
    }

    #[test]
    fn test_block_builder_single_entry() {
        let mut builder = BlockBuilder::new(16);
        builder.add(b"key1", b"value1");

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    #[test]
    fn test_block_builder_multiple_entries() {
        let mut builder = BlockBuilder::new(4);

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let value = format!("value{:02}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }

        let data = builder.finish();
        let block = Block::new(data).unwrap();

        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
        assert_eq!(block.get(b"key20"), None);
        assert_eq!(block.get(b"key"), None);
    }

    #[test]
    fn test_block_iterator() {
        let data = build_block(
            4,
            &[("apple", "1"), ("banana", "2"), ("cherry", "3"), ("date", "4")],
        );
        let block = Block::new(data).unwrap();

        // Iteration must yield every entry, in order, with its value.
        let mut seen = Vec::new();
        let mut iter = block.iter();
        while iter.valid() {
            seen.push((iter.key().to_vec(), iter.value().to_vec()));
            iter.next();
        }
        assert_eq!(
            seen,
            vec![
                (b"apple".to_vec(), b"1".to_vec()),
                (b"banana".to_vec(), b"2".to_vec()),
                (b"cherry".to_vec(), b"3".to_vec()),
                (b"date".to_vec(), b"4".to_vec()),
            ]
        );
    }

    #[test]
    fn test_block_seek() {
        let data = build_block(2, &[("a", "1"), ("c", "2"), ("e", "3"), ("g", "4")]);
        let block = Block::new(data).unwrap();

        // Seek to existing key
        let iter = block.seek(b"c");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"c");

        // Seek to non-existing key (should find next)
        let iter = block.seek(b"d");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"e");

        // Seek before all keys lands on the first entry
        let iter = block.seek(b"");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"a");

        // Seek past all keys
        let iter = block.seek(b"z");
        assert!(!iter.valid());
    }

    #[test]
    fn test_iterator_seek_and_seek_to_first() {
        let data = build_block(2, &[("a", "1"), ("c", "2"), ("e", "3"), ("g", "4")]);
        let block = Block::new(data).unwrap();

        let mut iter = block.iter();
        iter.seek(b"f");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"g");
        assert_eq!(iter.value(), b"4");

        iter.seek_to_first();
        assert!(iter.valid());
        assert_eq!(iter.key(), b"a");
        iter.next();
        assert_eq!(iter.key(), b"c");
    }

    #[test]
    fn test_empty_block() {
        let data = BlockBuilder::new(16).finish();
        let block = Block::new(data).unwrap();

        assert!(!block.iter().valid());
        assert!(!block.seek(b"").valid());
        assert_eq!(block.get(b"anything"), None);
    }

    #[test]
    fn test_block_new_rejects_malformed_footer() {
        // Too short to hold the restart count.
        assert!(Block::new(Vec::new()).is_none());
        assert!(Block::new(vec![1, 0, 0]).is_none());
        // Zero restart points.
        assert!(Block::new(vec![0, 0, 0, 0]).is_none());
        // Five restart points declared, but no room for them.
        assert!(Block::new(vec![5, 0, 0, 0]).is_none());
    }

    #[test]
    fn test_prefix_compression() {
        // Keys with common prefix (MUST be in sorted order!)
        let entries = [
            ("user:1000:age", "30"),
            ("user:1000:email", "alice@example.com"),
            ("user:1000:name", "Alice"),
            ("user:1001:name", "Bob"),
        ];

        let compressed = build_block(16, &entries);
        // A restart interval of 1 makes every entry a restart point, so no
        // prefix is shared: this is the uncompressed baseline.
        let uncompressed = build_block(1, &entries);
        assert!(
            compressed.len() < uncompressed.len(),
            "prefix compression should shrink the block ({} >= {})",
            compressed.len(),
            uncompressed.len()
        );

        // Verify all keys are retrievable
        let block = Block::new(compressed).unwrap();
        for (key, value) in entries.iter() {
            assert_eq!(block.get(key.as_bytes()), Some(value.as_bytes().to_vec()));
        }
    }

    #[test]
    fn test_varint_encoding() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &value in &test_values {
            let mut buf = Vec::new();
            encode_varint(&mut buf, value);

            let mut cursor = Cursor::new(&buf);
            let decoded = decode_varint(&mut cursor).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
            assert_eq!(cursor.position() as usize, buf.len());
        }
    }

    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(len, encoded.len());
        assert!(BlockHandle::decode(&encoded[..1]).is_none());
    }
}