Skip to main content

sochdb_storage/sstable/
table.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SSTable Reader
19//!
20//! This module provides an SSTable reader with:
21//! - Memory-mapped I/O for efficient access
22//! - Lazy block loading
23//! - Block cache integration
24//! - Binary search in index for O(log n) lookups
25//! - Filter-based negative lookup optimization
26
27use std::cmp::Ordering;
28use std::collections::HashMap;
29use std::fs::File;
30use std::io::{Read, Seek, SeekFrom};
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33
34use memmap2::{Mmap, MmapOptions};
35use parking_lot::RwLock;
36
37use super::block::{Block, BlockHandle, BlockIterator, BlockType};
38use super::filter::FilterReader;
39use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
40
41/// Block cache entry
42pub struct CachedBlock {
43    /// Raw block data
44    pub data: Vec<u8>,
45    /// Block type (compression)
46    pub block_type: BlockType,
47    /// Decompressed data (if applicable)
48    pub decompressed: Vec<u8>,
49}
50
51/// Simple block cache (HashMap-based for simplicity)
52pub struct BlockCache {
53    /// Cache entries by (file_id, block_offset)
54    entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
55    /// Maximum capacity
56    capacity: usize,
57}
58
59impl BlockCache {
60    /// Create a new block cache
61    pub fn new(capacity: usize) -> Self {
62        Self {
63            entries: RwLock::new(HashMap::with_capacity(capacity)),
64            capacity,
65        }
66    }
67
68    /// Get a cached block
69    pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
70        self.entries.read().get(&(file_id, offset)).cloned()
71    }
72
73    /// Insert a block into cache
74    pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
75        let block = Arc::new(block);
76        let mut entries = self.entries.write();
77        
78        // Simple eviction: clear when full
79        if entries.len() >= self.capacity {
80            entries.clear();
81        }
82        
83        entries.insert((file_id, offset), block.clone());
84        block
85    }
86}
87
/// Per-read tuning knobs for SSTable lookups and scans.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Compare each block's stored CRC32 against a freshly computed one.
    pub verify_checksums: bool,
    /// Insert blocks read from the file into the block cache.
    pub fill_cache: bool,
    /// Consult the table's filter before touching the index.
    pub use_filter: bool,
}

impl Default for ReadOptions {
    /// Defaults to the safest, most cache-friendly settings: every option on.
    fn default() -> Self {
        let enabled = true;
        Self {
            verify_checksums: enabled,
            fill_cache: enabled,
            use_filter: enabled,
        }
    }
}
108
109/// SSTable reader for reading SSTable files
110pub struct SSTable {
111    /// File path
112    path: PathBuf,
113    /// Unique file ID for caching
114    file_id: u64,
115    /// Memory-mapped file
116    mmap: Mmap,
117    /// Parsed header
118    header: Header,
119    /// Parsed footer with sections
120    footer: Footer,
121    /// Index block (cached)
122    index: Vec<u8>,
123    /// Parsed index entries
124    index_entries: Vec<IndexEntry>,
125    /// Filter reader (if filter section exists)
126    filter: Option<FilterReader>,
127    /// File metadata
128    metadata: TableMetadata,
129    /// Block cache reference
130    cache: Option<Arc<BlockCache>>,
131}
132
133/// Index entry
134#[derive(Debug, Clone)]
135struct IndexEntry {
136    /// Largest key in this block (separator)
137    largest_key: Vec<u8>,
138    /// Block handle
139    handle: BlockHandle,
140}
141
/// Summary information about an opened SSTable.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Size of the underlying file in bytes.
    pub file_size: u64,
    /// How many data blocks the index describes.
    pub num_data_blocks: usize,
    /// Lower key bound of the table, `None` for a table with no blocks.
    ///
    /// As populated by `SSTable::open_with_cache`, this is the separator key
    /// of the first index entry rather than the first key actually stored.
    pub smallest_key: Option<Vec<u8>>,
    /// Separator key of the last index entry, `None` for a table with no blocks.
    pub largest_key: Option<Vec<u8>>,
}
154
155impl SSTable {
156    /// Open an SSTable file
157    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
158        Self::open_with_cache(path, None)
159    }
160
161    /// Open an SSTable file with a block cache
162    pub fn open_with_cache<P: AsRef<Path>>(
163        path: P,
164        cache: Option<Arc<BlockCache>>,
165    ) -> std::io::Result<Self> {
166        let path = path.as_ref();
167        let file = File::open(path)?;
168        let file_size = file.metadata()?.len();
169
170        // Memory-map the file
171        let mmap = unsafe { MmapOptions::new().map(&file)? };
172
173        // Generate file ID from path hash
174        let file_id = {
175            use std::hash::{Hash, Hasher};
176            let mut hasher = std::collections::hash_map::DefaultHasher::new();
177            path.hash(&mut hasher);
178            hasher.finish()
179        };
180
181        // Parse header
182        if mmap.len() < HEADER_SIZE {
183            return Err(std::io::Error::new(
184                std::io::ErrorKind::InvalidData,
185                "File too small for SSTable header",
186            ));
187        }
188
189        let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
190            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
191        })?;
192
193        // Parse footer
194        let footer_offset = header.footer_offset as usize;
195        if footer_offset >= mmap.len() {
196            return Err(std::io::Error::new(
197                std::io::ErrorKind::InvalidData,
198                "Footer offset beyond file",
199            ));
200        }
201
202        let footer = Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
203            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
204        })?;
205
206        // Load index section
207        let index_section = footer
208            .sections
209            .iter()
210            .find(|s| s.section_type == SectionType::Index)
211            .ok_or_else(|| {
212                std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
213            })?;
214
215        let index_start = index_section.offset as usize;
216        let index_end = index_start + index_section.size as usize;
217        let index = mmap[index_start..index_end].to_vec();
218
219        // Parse index entries
220        let index_entries = Self::parse_index(&index)?;
221
222        // Load filter section if present
223        let filter = footer
224            .sections
225            .iter()
226            .find(|s| s.section_type == SectionType::Filter)
227            .and_then(|section| {
228                let start = section.offset as usize;
229                let end = start + section.size as usize;
230                FilterReader::from_bytes(&mmap[start..end])
231            });
232
233        // Extract metadata
234        let metadata = TableMetadata {
235            file_size,
236            num_data_blocks: index_entries.len(),
237            smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
238            largest_key: index_entries.last().map(|e| e.largest_key.clone()),
239        };
240
241        Ok(Self {
242            path: path.to_path_buf(),
243            file_id,
244            mmap,
245            header,
246            footer,
247            index,
248            index_entries,
249            filter,
250            metadata,
251            cache,
252        })
253    }
254
255    /// Parse index entries from index block data
256    fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
257        let mut entries = Vec::new();
258        let block = Block::new(data.to_vec()).ok_or_else(|| {
259            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
260        })?;
261        let mut iter = block.iter();
262
263        while iter.valid() {
264            let key = iter.key().to_vec();
265            let value = iter.value();
266
267            let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
268                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
269            })?;
270
271            entries.push(IndexEntry {
272                largest_key: key,
273                handle,
274            });
275
276            iter.next();
277        }
278
279        Ok(entries)
280    }
281
282    /// Get a value by key
283    pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
284        // Use filter to check if key might exist
285        if options.use_filter {
286            if let Some(ref filter) = self.filter {
287                if !filter.may_contain(key) {
288                    return Ok(None);
289                }
290            }
291        }
292
293        // Binary search in index to find the right block
294        let block_idx = self.find_block_for_key(key);
295        if block_idx >= self.index_entries.len() {
296            return Ok(None);
297        }
298
299        // Load and search the block
300        let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
301        let block = Block::new(block_data).ok_or_else(|| {
302            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
303        })?;
304
305        let iter = block.seek(key);
306        if iter.valid() && iter.key() == key {
307            Ok(Some(iter.value().to_vec()))
308        } else {
309            Ok(None)
310        }
311    }
312
313    /// Binary search to find block that might contain the key
314    fn find_block_for_key(&self, key: &[u8]) -> usize {
315        // Binary search for first block where largest_key >= key
316        self.index_entries
317            .binary_search_by(|entry| {
318                if entry.largest_key.as_slice() < key {
319                    Ordering::Less
320                } else {
321                    Ordering::Greater
322                }
323            })
324            .unwrap_or_else(|i| i)
325    }
326
327    /// Read a block from file
328    fn read_block(
329        &self,
330        handle: &BlockHandle,
331        options: &ReadOptions,
332    ) -> std::io::Result<Vec<u8>> {
333        let offset = handle.offset();
334        let size = handle.size();
335
336        // Try cache first
337        if let Some(ref cache) = self.cache {
338            if let Some(block) = cache.get(self.file_id, offset) {
339                return Ok(block.decompressed.clone());
340            }
341        }
342
343        // Read from mmap
344        let start = offset as usize;
345        let end = start + size as usize;
346
347        if end + 5 > self.mmap.len() {
348            return Err(std::io::Error::new(
349                std::io::ErrorKind::InvalidData,
350                "Block extends beyond file",
351            ));
352        }
353
354        let block_data = &self.mmap[start..end];
355        let block_type = BlockType::from_u8(self.mmap[end]);
356        let stored_checksum = u32::from_le_bytes([
357            self.mmap[end + 1],
358            self.mmap[end + 2],
359            self.mmap[end + 3],
360            self.mmap[end + 4],
361        ]);
362
363        // Verify checksum if requested
364        if options.verify_checksums {
365            let computed_checksum = crc32fast::hash(block_data);
366            if computed_checksum != stored_checksum {
367                return Err(std::io::Error::new(
368                    std::io::ErrorKind::InvalidData,
369                    "Block checksum mismatch",
370                ));
371            }
372        }
373
374        // Decompress if needed
375        let decompressed = match block_type {
376            BlockType::Uncompressed => block_data.to_vec(),
377            BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
378                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
379            })?,
380            BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
381                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Zstd error: {}", e))
382            })?,
383            BlockType::Snappy => {
384                return Err(std::io::Error::new(
385                    std::io::ErrorKind::InvalidData,
386                    "Snappy not supported",
387                ))
388            }
389        };
390
391        // Cache the block
392        if options.fill_cache {
393            if let Some(ref cache) = self.cache {
394                cache.insert(
395                    self.file_id,
396                    offset,
397                    CachedBlock {
398                        data: block_data.to_vec(),
399                        block_type,
400                        decompressed: decompressed.clone(),
401                    },
402                );
403            }
404        }
405
406        Ok(decompressed)
407    }
408
409    /// Create an iterator over all entries
410    pub fn iter(&self) -> SSTableIterator {
411        SSTableIterator::new(self)
412    }
413
414    /// Create a range iterator
415    pub fn range(
416        &self,
417        start: Option<&[u8]>,
418        end: Option<&[u8]>,
419    ) -> RangeIterator {
420        RangeIterator::new(self, start, end)
421    }
422
423    /// Get table metadata
424    pub fn metadata(&self) -> &TableMetadata {
425        &self.metadata
426    }
427
428    /// Get file path
429    pub fn path(&self) -> &Path {
430        &self.path
431    }
432
433    /// Get number of data blocks
434    pub fn num_blocks(&self) -> usize {
435        self.index_entries.len()
436    }
437
438    /// Check if key might exist (using filter)
439    pub fn may_contain(&self, key: &[u8]) -> bool {
440        self.filter
441            .as_ref()
442            .map(|f| f.may_contain(key))
443            .unwrap_or(true)
444    }
445}
446
447/// Iterator over all entries in an SSTable
448pub struct SSTableIterator<'a> {
449    table: &'a SSTable,
450    /// Current block index
451    block_idx: usize,
452    /// Current block data
453    block_data: Option<Vec<u8>>,
454    /// Current block iterator
455    block_iter: Option<BlockIterator<'a>>,
456    /// Read options
457    options: ReadOptions,
458    /// Is iterator valid
459    valid: bool,
460}
461
462impl<'a> SSTableIterator<'a> {
463    fn new(table: &'a SSTable) -> Self {
464        let mut iter = Self {
465            table,
466            block_idx: 0,
467            block_data: None,
468            block_iter: None,
469            options: ReadOptions::default(),
470            valid: false,
471        };
472        iter.load_block();
473        iter
474    }
475
476    /// Load current block
477    fn load_block(&mut self) {
478        if self.block_idx >= self.table.index_entries.len() {
479            self.valid = false;
480            return;
481        }
482
483        let handle = &self.table.index_entries[self.block_idx].handle;
484        match self.table.read_block(handle, &self.options) {
485            Ok(data) => {
486                self.block_data = Some(data);
487                self.valid = true;
488            }
489            Err(_) => {
490                self.valid = false;
491            }
492        }
493    }
494
495    /// Check if iterator is valid
496    pub fn valid(&self) -> bool {
497        self.valid
498    }
499
500    /// Get current key
501    pub fn key(&self) -> Option<&[u8]> {
502        if !self.valid {
503            return None;
504        }
505        // Note: In a full implementation, this would return the current key from block_iter
506        // This is a simplified version
507        self.block_data.as_ref().map(|_| &b""[..])
508    }
509
510    /// Get current value
511    pub fn value(&self) -> Option<&[u8]> {
512        if !self.valid {
513            return None;
514        }
515        self.block_data.as_ref().map(|_| &b""[..])
516    }
517
518    /// Move to next entry
519    pub fn next(&mut self) {
520        // In a full implementation:
521        // 1. Advance block_iter
522        // 2. If block_iter exhausted, load next block
523        self.block_idx += 1;
524        self.load_block();
525    }
526
527    /// Seek to key
528    pub fn seek(&mut self, target: &[u8]) {
529        // Binary search to find starting block
530        self.block_idx = self.table.find_block_for_key(target);
531        self.load_block();
532        // Then seek within the block
533    }
534}
535
536/// Range iterator
537pub struct RangeIterator<'a> {
538    table: &'a SSTable,
539    start: Option<Vec<u8>>,
540    end: Option<Vec<u8>>,
541    current_block: usize,
542    exhausted: bool,
543}
544
545impl<'a> RangeIterator<'a> {
546    fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
547        let start_block = start
548            .map(|k| table.find_block_for_key(k))
549            .unwrap_or(0);
550
551        Self {
552            table,
553            start: start.map(|s| s.to_vec()),
554            end: end.map(|e| e.to_vec()),
555            current_block: start_block,
556            exhausted: false,
557        }
558    }
559
560    /// Check if range is exhausted
561    pub fn exhausted(&self) -> bool {
562        self.exhausted
563    }
564}
565
566// =============================================================================
567// Tests
568// =============================================================================
569
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
    use tempfile::tempdir;

    /// Write a 100-entry table (`key00000`..`key00099` mapped to
    /// `value00000`..`value00099`) at `path`, using 256-byte blocks and no filter.
    fn write_sample_table(path: &PathBuf) {
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };

        let mut builder = SSTableBuilder::new(path, options).unwrap();

        for i in 0..100 {
            let key = format!("key{:05}", i);
            let value = format!("value{:05}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }

        builder.finish().unwrap();
    }

    /// A freshly built table opens and reports a consistent block count.
    #[test]
    fn test_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");
        write_sample_table(&path);

        let table = SSTable::open(&path).unwrap();

        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
    }

    /// Point lookups find stored keys and miss absent ones.
    #[test]
    fn test_get() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_get.sst");
        write_sample_table(&path);

        let table = SSTable::open(&path).unwrap();
        let read_opts = ReadOptions::default();

        // Key that was written.
        let found = table.get(b"key00050", &read_opts).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap(), b"value00050");

        // Key that was never written.
        let absent = table.get(b"nonexistent", &read_opts).unwrap();
        assert!(absent.is_none());
    }

    /// The block cache returns what was inserted and misses on other offsets.
    #[test]
    fn test_block_cache() {
        let cache = BlockCache::new(100);

        cache.insert(
            1,
            0,
            CachedBlock {
                data: vec![1, 2, 3],
                block_type: BlockType::Uncompressed,
                decompressed: vec![1, 2, 3],
            },
        );

        let hit = cache.get(1, 0);
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().data, vec![1, 2, 3]);

        let miss = cache.get(1, 100);
        assert!(miss.is_none());
    }
}