sochdb_storage/sstable/
block.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Block Encoding with Restart Points and Hash Index
16//!
17//! This module implements the block format used in SSTables. Each block
18//! contains a sequence of key-value pairs with prefix compression and
19//! restart points for efficient random access.
20//!
21//! ## Block Format
22//!
23//! ```text
24//! ┌─────────────────────────────────────────────────────────────────────────┐
25//! │                           Block Contents                                 │
26//! ├─────────────────────────────────────────────────────────────────────────┤
27//! │ Entry 0: [shared_len][unshared_len][value_len][unshared_key][value]     │
28//! │ Entry 1: [shared_len][unshared_len][value_len][unshared_key][value]     │
29//! │ ...                                                                      │
30//! │ Entry N-1                                                                │
31//! ├─────────────────────────────────────────────────────────────────────────┤
32//! │ Restart Points: [offset_0][offset_1]...[offset_R-1][num_restarts]       │
33//! ├─────────────────────────────────────────────────────────────────────────┤
34//! │ Optional Hash Index: [bucket_0][bucket_1]...[bucket_B-1][num_buckets]   │
35//! ├─────────────────────────────────────────────────────────────────────────┤
36//! │ Block Trailer: [type(1)][checksum(4)]                                   │
37//! └─────────────────────────────────────────────────────────────────────────┘
38//! ```
39//!
40//! ## Prefix Compression
41//!
42//! Keys are prefix-compressed within a restart interval:
43//! - `shared_len`: Number of bytes shared with previous key
44//! - `unshared_len`: Number of non-shared key bytes following
45//! - `value_len`: Length of value
46//!
47//! At restart points, `shared_len = 0` (full key stored).
48//!
49//! ## Complexity Analysis
50//!
//! | Operation    | Without Hash    | With Hash Index |
//! |--------------|-----------------|-----------------|
//! | Point lookup | O(log(n/r) + r) | O(1) expected   |
//! | Seek         | O(log(n/r) + r) | O(log(n/r) + r) |
//! | Iterate      | O(n)            | O(n)            |
//!
//! Where n = entries, r = restart interval (typically 16): a binary search over
//! the n/r restart points followed by a linear scan of at most r entries.
58
59use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
60use std::cmp::Ordering;
61use std::io::{Cursor, Read, Write};
62
/// Number of entries placed between two consecutive restart points when the
/// caller has no specific preference.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;

/// Sizing factor for the hash index: the number of hash buckets per entry.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
68
/// Compression applied to a block, stored as a single byte.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum BlockType {
    /// Contents are stored as-is
    Uncompressed = 0,
    /// Contents are Snappy compressed
    Snappy = 1,
    /// Contents are LZ4 compressed
    Lz4 = 2,
    /// Contents are Zstd compressed
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Map a raw type byte to its variant; any byte outside `0..=3` is an error.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        const KNOWN: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == value)
            .ok_or(())
    }
}

impl BlockType {
    /// Lenient conversion from a raw type byte.
    ///
    /// Unknown bytes are mapped to [`BlockType::Uncompressed`] rather than
    /// reported; use `BlockType::try_from` when an unknown byte must be
    /// detected.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(block_type) => block_type,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
103
104/// Handle to a block (offset + size)
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct BlockHandle {
107    /// Offset in file
108    pub offset: u64,
109    /// Size of block data (excluding trailer)
110    pub size: u64,
111}
112
113impl BlockHandle {
114    pub fn new(offset: u64, size: u64) -> Self {
115        Self { offset, size }
116    }
117
118    /// Get offset
119    pub fn offset(&self) -> u64 {
120        self.offset
121    }
122
123    /// Get size
124    pub fn size(&self) -> u64 {
125        self.size
126    }
127
128    /// Encode to bytes (varint encoded for compactness)
129    pub fn encode(&self) -> Vec<u8> {
130        let mut buf = Vec::with_capacity(20);
131        encode_varint(&mut buf, self.offset);
132        encode_varint(&mut buf, self.size);
133        buf
134    }
135
136    /// Decode from bytes
137    pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
138        let mut cursor = Cursor::new(data);
139        let offset = decode_varint(&mut cursor)?;
140        let size = decode_varint(&mut cursor)?;
141        Some((Self { offset, size }, cursor.position() as usize))
142    }
143}
144
/// One restart point of a block.
#[derive(Clone, Debug)]
struct RestartPoint {
    /// Position of the restart entry, measured from the start of the block data
    offset: u32,
}
151
/// One slot of the block hash index.
#[derive(Clone, Debug)]
struct HashBucket {
    /// Index of the restart point this bucket refers to; `0xFF` marks an empty bucket
    restart_index: u8,
}
158
159// =============================================================================
160// Block Builder
161// =============================================================================
162
163/// Builder for creating blocks with prefix compression
164pub struct BlockBuilder {
165    /// Block data buffer
166    buffer: Vec<u8>,
167    /// Restart points (offsets into buffer)
168    restarts: Vec<u32>,
169    /// Entries since last restart
170    entries_since_restart: usize,
171    /// Restart interval
172    restart_interval: usize,
173    /// Last key (for prefix compression)
174    last_key: Vec<u8>,
175    /// Number of entries
176    entry_count: usize,
177    /// Whether to build hash index
178    use_hash_index: bool,
179    /// Keys for hash index (stored temporarily during build)
180    keys_for_hash: Vec<Vec<u8>>,
181}
182
183impl BlockBuilder {
184    /// Create a new block builder
185    pub fn new(restart_interval: usize) -> Self {
186        Self {
187            buffer: Vec::with_capacity(4096),
188            restarts: vec![0], // First restart at offset 0
189            entries_since_restart: 0,
190            restart_interval,
191            last_key: Vec::new(),
192            entry_count: 0,
193            use_hash_index: false,
194            keys_for_hash: Vec::new(),
195        }
196    }
197
198    /// Create with hash index enabled
199    pub fn with_hash_index(restart_interval: usize) -> Self {
200        Self {
201            use_hash_index: true,
202            ..Self::new(restart_interval)
203        }
204    }
205
206    /// Add a key-value pair to the block
207    ///
208    /// Keys must be added in sorted order.
209    pub fn add(&mut self, key: &[u8], value: &[u8]) {
210        debug_assert!(
211            self.entry_count == 0 || key > self.last_key.as_slice(),
212            "Keys must be added in sorted order"
213        );
214
215        // Check if we need a restart point
216        let shared = if self.entries_since_restart >= self.restart_interval {
217            // New restart point - store full key
218            self.restarts.push(self.buffer.len() as u32);
219            self.entries_since_restart = 0;
220            0
221        } else {
222            // Calculate shared prefix with last key
223            self.shared_prefix_len(&self.last_key, key)
224        };
225
226        let non_shared = key.len() - shared;
227        let value_len = value.len();
228
229        // Write entry: [shared_len][non_shared_len][value_len][key_delta][value]
230        encode_varint(&mut self.buffer, shared as u64);
231        encode_varint(&mut self.buffer, non_shared as u64);
232        encode_varint(&mut self.buffer, value_len as u64);
233        self.buffer.extend_from_slice(&key[shared..]);
234        self.buffer.extend_from_slice(value);
235
236        // Update state
237        self.last_key.clear();
238        self.last_key.extend_from_slice(key);
239        self.entries_since_restart += 1;
240        self.entry_count += 1;
241
242        // Store key for hash index
243        if self.use_hash_index {
244            self.keys_for_hash.push(key.to_vec());
245        }
246    }
247
248    /// Calculate shared prefix length
249    fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
250        let mut shared = 0;
251        let min_len = a.len().min(b.len());
252        while shared < min_len && a[shared] == b[shared] {
253            shared += 1;
254        }
255        shared
256    }
257
258    /// Finish building the block
259    ///
260    /// Returns the block contents including restarts and optional hash index.
261    pub fn finish(&mut self) -> Vec<u8> {
262        let mut result = std::mem::take(&mut self.buffer);
263
264        // Build hash index if enabled
265        if self.use_hash_index && self.entry_count > 0 {
266            self.build_hash_index(&mut result);
267        }
268
269        // Write restart points
270        for restart in &self.restarts {
271            result.write_u32::<LittleEndian>(*restart).unwrap();
272        }
273        result
274            .write_u32::<LittleEndian>(self.restarts.len() as u32)
275            .unwrap();
276
277        result
278    }
279
280    /// Build hash index for fast point lookups
281    fn build_hash_index(&self, data: &mut Vec<u8>) {
282        // Number of buckets = entries * bucket_ratio
283        let num_buckets = ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1);
284        let mut buckets = vec![0xFFu8; num_buckets]; // 0xFF = empty
285
286        // For each key, compute hash and store restart point index
287        for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
288            let restart_idx = key_idx / self.restart_interval;
289            let bucket = Self::hash_key(key) as usize % num_buckets;
290            
291            // Simple linear probing for collisions
292            let mut probe = bucket;
293            for _ in 0..num_buckets {
294                if buckets[probe] == 0xFF {
295                    buckets[probe] = restart_idx as u8;
296                    break;
297                }
298                probe = (probe + 1) % num_buckets;
299            }
300        }
301
302        // Write hash index
303        data.extend_from_slice(&buckets);
304        data.write_u32::<LittleEndian>(num_buckets as u32).unwrap();
305    }
306
307    /// Hash a key for the hash index
308    fn hash_key(key: &[u8]) -> u32 {
309        // Use xxHash for speed
310        twox_hash::xxh3::hash64(key) as u32
311    }
312
313    /// Check if block is empty
314    pub fn is_empty(&self) -> bool {
315        self.entry_count == 0
316    }
317
318    /// Get approximate size of current block
319    pub fn estimated_size(&self) -> usize {
320        self.buffer.len() + self.restarts.len() * 4 + 4
321    }
322
323    /// Reset builder for reuse
324    pub fn reset(&mut self) {
325        self.buffer.clear();
326        self.restarts.clear();
327        self.restarts.push(0);
328        self.entries_since_restart = 0;
329        self.last_key.clear();
330        self.entry_count = 0;
331        self.keys_for_hash.clear();
332    }
333}
334
335impl Default for BlockBuilder {
336    fn default() -> Self {
337        Self::new(DEFAULT_RESTART_INTERVAL)
338    }
339}
340
341// =============================================================================
342// Block
343// =============================================================================
344
345/// An SSTable block with efficient key lookup
346pub struct Block {
347    /// Raw block data
348    data: Vec<u8>,
349    /// Offset where restart array begins
350    restarts_offset: usize,
351    /// Number of restart points
352    num_restarts: usize,
353    /// Number of hash buckets (0 if no hash index)
354    num_hash_buckets: usize,
355    /// Offset where hash index begins (if present)
356    hash_index_offset: usize,
357}
358
359impl Block {
360    /// Create a block from raw data
361    pub fn new(data: Vec<u8>) -> Option<Self> {
362        if data.len() < 4 {
363            return None;
364        }
365
366        // Read number of restarts (last 4 bytes)
367        let num_restarts = {
368            let mut cursor = Cursor::new(&data[data.len() - 4..]);
369            cursor.read_u32::<LittleEndian>().ok()? as usize
370        };
371
372        if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
373            return None;
374        }
375
376        let restarts_offset = data.len() - 4 - num_restarts * 4;
377
378        // TODO: Detect and parse hash index if present
379        // For now, assume no hash index
380        let num_hash_buckets = 0;
381        let hash_index_offset = restarts_offset;
382
383        Some(Self {
384            data,
385            restarts_offset,
386            num_restarts,
387            num_hash_buckets,
388            hash_index_offset,
389        })
390    }
391
392    /// Get restart point offset at given index
393    fn restart_offset(&self, index: usize) -> u32 {
394        debug_assert!(index < self.num_restarts);
395        let offset = self.restarts_offset + index * 4;
396        let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
397        cursor.read_u32::<LittleEndian>().unwrap()
398    }
399
400    /// Seek to the first entry with key >= target
401    ///
402    /// Returns an iterator positioned at the target or past-the-end.
403    pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
404        // Binary search on restart points
405        let mut left = 0;
406        let mut right = self.num_restarts;
407
408        while left < right {
409            let mid = left + (right - left) / 2;
410            let offset = self.restart_offset(mid) as usize;
411            
412            // Read key at restart point (shared = 0)
413            let key = self.read_key_at(offset);
414            
415            match key.as_slice().cmp(target) {
416                Ordering::Less => left = mid + 1,
417                Ordering::Greater => right = mid,
418                Ordering::Equal => {
419                    return BlockIterator::new(self, offset);
420                }
421            }
422        }
423
424        // left now points to the first restart point with key > target
425        // Start from the previous restart point and scan
426        let start_restart = if left > 0 { left - 1 } else { 0 };
427        let start_offset = self.restart_offset(start_restart) as usize;
428        
429        let mut iter = BlockIterator::new(self, start_offset);
430        while iter.valid() {
431            if iter.key() >= target {
432                break;
433            }
434            iter.next();
435        }
436        iter
437    }
438
439    /// Get value for exact key match
440    ///
441    /// Uses hash index if available for O(1) expected time.
442    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
443        // TODO: Use hash index if available
444        
445        // Fall back to binary search
446        let mut iter = self.seek(key);
447        if iter.valid() && iter.key() == key {
448            Some(iter.value().to_vec())
449        } else {
450            None
451        }
452    }
453
454    /// Read the full key at a given offset (must be at a restart point)
455    fn read_key_at(&self, offset: usize) -> Vec<u8> {
456        let mut cursor = Cursor::new(&self.data[offset..self.restarts_offset]);
457        
458        let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
459        let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
460        let _value_len = decode_varint(&mut cursor);
461        
462        debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
463        
464        let pos = cursor.position() as usize;
465        self.data[offset + pos..offset + pos + non_shared].to_vec()
466    }
467
468    /// Create an iterator over all entries
469    pub fn iter(&self) -> BlockIterator<'_> {
470        BlockIterator::new(self, 0)
471    }
472
473    /// Get block data
474    pub fn data(&self) -> &[u8] {
475        &self.data
476    }
477}
478
479// =============================================================================
480// Block Iterator
481// =============================================================================
482
483/// Iterator over block entries with prefix decompression
484pub struct BlockIterator<'a> {
485    block: &'a Block,
486    /// Current position in data
487    offset: usize,
488    /// Current reconstructed key
489    key: Vec<u8>,
490    /// Current value slice
491    value_start: usize,
492    value_len: usize,
493    /// Whether iterator is valid
494    valid: bool,
495}
496
497impl<'a> BlockIterator<'a> {
498    /// Create a new block iterator starting at the given offset
499    pub fn new(block: &'a Block, offset: usize) -> Self {
500        let mut iter = Self {
501            block,
502            offset,
503            key: Vec::new(),
504            value_start: 0,
505            value_len: 0,
506            valid: false,
507        };
508        iter.parse_entry();
509        iter
510    }
511
512    /// Check if iterator is valid
513    pub fn valid(&self) -> bool {
514        self.valid
515    }
516
517    /// Get current key
518    pub fn key(&self) -> &[u8] {
519        &self.key
520    }
521
522    /// Get current value
523    pub fn value(&self) -> &[u8] {
524        &self.block.data[self.value_start..self.value_start + self.value_len]
525    }
526
527    /// Move to next entry
528    pub fn next(&mut self) {
529        if !self.valid {
530            return;
531        }
532
533        // Move past current value
534        self.offset = self.value_start + self.value_len;
535        self.parse_entry();
536    }
537
538    /// Parse entry at current offset
539    fn parse_entry(&mut self) {
540        if self.offset >= self.block.restarts_offset {
541            self.valid = false;
542            return;
543        }
544
545        let mut cursor = Cursor::new(&self.block.data[self.offset..self.block.restarts_offset]);
546
547        // Read header
548        let shared = match decode_varint(&mut cursor) {
549            Some(v) => v as usize,
550            None => {
551                self.valid = false;
552                return;
553            }
554        };
555        let non_shared = match decode_varint(&mut cursor) {
556            Some(v) => v as usize,
557            None => {
558                self.valid = false;
559                return;
560            }
561        };
562        let value_len = match decode_varint(&mut cursor) {
563            Some(v) => v as usize,
564            None => {
565                self.valid = false;
566                return;
567            }
568        };
569
570        let header_len = cursor.position() as usize;
571        let data_start = self.offset + header_len;
572
573        // Bounds check
574        if data_start + non_shared + value_len > self.block.restarts_offset {
575            self.valid = false;
576            return;
577        }
578
579        // Reconstruct key
580        self.key.truncate(shared);
581        self.key
582            .extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
583
584        // Record value location
585        self.value_start = data_start + non_shared;
586        self.value_len = value_len;
587        self.valid = true;
588    }
589
590    /// Seek to first entry >= target
591    pub fn seek(&mut self, target: &[u8]) {
592        // For now, delegate to block's seek and copy state
593        let new_iter = self.block.seek(target);
594        self.offset = new_iter.offset;
595        self.key = new_iter.key;
596        self.value_start = new_iter.value_start;
597        self.value_len = new_iter.value_len;
598        self.valid = new_iter.valid;
599    }
600
601    /// Seek to first entry
602    pub fn seek_to_first(&mut self) {
603        self.offset = 0;
604        self.key.clear();
605        self.parse_entry();
606    }
607}
608
609// =============================================================================
610// Varint Encoding/Decoding
611// =============================================================================
612
/// Append `value` to `buf` as a little-endian base-128 varint.
///
/// Seven payload bits are emitted per byte, least-significant group first;
/// the high bit of a byte is set when more bytes follow.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final byte: continuation bit left clear.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
621
/// Decode a varint from a reader
///
/// Reads a little-endian base-128 varint (seven payload bits per byte, high
/// bit set on every byte except the last).
///
/// Returns `None` when the reader runs out of bytes before the varint ends,
/// or when the encoded value does not fit in a `u64`: more than 10 bytes, or
/// a 10th byte carrying more than the single remaining bit.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;

    loop {
        let mut raw = [0u8; 1];
        reader.read_exact(&mut raw).ok()?;
        let byte = raw[0];
        let payload = u64::from(byte & 0x7F);

        // At shift 63 only bit 0 of the payload still fits; anything larger
        // would be shifted out silently and decode to a wrong value.
        if shift == 63 && payload > 1 {
            return None;
        }
        result |= payload << shift;

        if byte & 0x80 == 0 {
            return Some(result);
        }

        shift += 7;
        if shift >= 64 {
            return None; // More than 10 bytes
        }
    }
}
641
642// =============================================================================
643// Tests
644// =============================================================================
645
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a block from already-sorted string pairs.
    fn build_block(restart_interval: usize, entries: &[(&str, &str)]) -> Block {
        let mut builder = BlockBuilder::new(restart_interval);
        for (key, value) in entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        Block::new(builder.finish()).unwrap()
    }

    #[test]
    fn test_block_builder_single_entry() {
        let block = build_block(16, &[("key1", "value1")]);

        assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(block.get(b"key2"), None);
    }

    #[test]
    fn test_block_builder_multiple_entries() {
        let mut builder = BlockBuilder::new(4);
        for i in 0..20 {
            let key = format!("key{:02}", i);
            let value = format!("value{:02}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }
        let block = Block::new(builder.finish()).unwrap();

        // Every key must be retrievable across several restart intervals.
        for i in 0..20 {
            let key = format!("key{:02}", i);
            let expected_value = format!("value{:02}", i);
            assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
        }
    }

    #[test]
    fn test_block_iterator() {
        let entries = [("apple", "1"), ("banana", "2"), ("cherry", "3"), ("date", "4")];
        let block = build_block(4, &entries);

        // Check content and order, not just the number of entries.
        let mut seen: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        let mut iter = block.iter();
        while iter.valid() {
            seen.push((iter.key().to_vec(), iter.value().to_vec()));
            iter.next();
        }

        let expected: Vec<(Vec<u8>, Vec<u8>)> = entries
            .iter()
            .map(|(key, value)| (key.as_bytes().to_vec(), value.as_bytes().to_vec()))
            .collect();
        assert_eq!(seen, expected);
    }

    #[test]
    fn test_block_seek() {
        let block = build_block(2, &[("a", "1"), ("c", "2"), ("e", "3"), ("g", "4")]);

        // Seek to existing key
        let iter = block.seek(b"c");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"c");

        // Seek to non-existing key (should find next)
        let iter = block.seek(b"d");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"e");

        // Seek before all keys (should find the first)
        let iter = block.seek(b"0");
        assert!(iter.valid());
        assert_eq!(iter.key(), b"a");

        // Seek past all keys
        let iter = block.seek(b"z");
        assert!(!iter.valid());
    }

    #[test]
    fn test_prefix_compression() {
        // Keys with common prefix (MUST be in sorted order!)
        let entries = [
            ("user:1000:age", "30"),
            ("user:1000:email", "alice@example.com"),
            ("user:1000:name", "Alice"),
            ("user:1001:name", "Bob"),
        ];

        let mut builder = BlockBuilder::new(16);
        for (key, value) in &entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        let data = builder.finish();

        let uncompressed_size: usize = entries
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();

        // With prefix compression the whole block, including entry headers
        // and the restart array, is smaller than the raw key/value bytes.
        // Without compression the per-entry headers alone push it above.
        assert!(data.len() < uncompressed_size);

        // Verify all keys are retrievable
        let block = Block::new(data).unwrap();
        for (key, value) in &entries {
            assert_eq!(block.get(key.as_bytes()), Some(value.as_bytes().to_vec()));
        }
    }

    #[test]
    fn test_empty_block() {
        let mut builder = BlockBuilder::new(16);
        assert!(builder.is_empty());

        let block = Block::new(builder.finish()).unwrap();
        assert!(!block.iter().valid());
        assert!(!block.seek(b"a").valid());
        assert_eq!(block.get(b"a"), None);
    }

    #[test]
    fn test_block_new_rejects_malformed() {
        // Too short to hold the restart count
        assert!(Block::new(Vec::new()).is_none());
        // Zero restart points
        assert!(Block::new(vec![0, 0, 0, 0]).is_none());
        // Restart array larger than the block
        assert!(Block::new(vec![5, 0, 0, 0]).is_none());
    }

    #[test]
    fn test_builder_reset() {
        let mut builder = BlockBuilder::new(4);
        builder.add(b"a", b"1");
        assert!(!builder.is_empty());
        let first = builder.finish();

        builder.reset();
        assert!(builder.is_empty());

        builder.add(b"a", b"1");
        assert_eq!(builder.finish(), first);
    }

    #[test]
    fn test_varint_encoding() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];

        for &value in &test_values {
            let mut buf = Vec::new();
            encode_varint(&mut buf, value);

            let mut cursor = Cursor::new(&buf);
            let decoded = decode_varint(&mut cursor).unwrap();

            assert_eq!(value, decoded, "Failed for value {}", value);
        }
    }

    #[test]
    fn test_block_handle() {
        let handle = BlockHandle::new(12345, 67890);
        let encoded = handle.encode();

        let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
        assert_eq!(handle, decoded);
        assert_eq!(len, encoded.len());
    }
}