sochdb_storage/sstable/
table.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SSTable Reader
16//!
17//! This module provides an SSTable reader with:
18//! - Memory-mapped I/O for efficient access
19//! - Lazy block loading
20//! - Block cache integration
21//! - Binary search in index for O(log n) lookups
22//! - Filter-based negative lookup optimization
23
24use std::cmp::Ordering;
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30
31use memmap2::{Mmap, MmapOptions};
32use parking_lot::RwLock;
33
34use super::block::{Block, BlockHandle, BlockIterator, BlockType};
35use super::filter::FilterReader;
36use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
37
/// A single block as stored in [`BlockCache`].
///
/// The reader keeps both the bytes as they appear in the file and the bytes
/// after decompression; cache hits return a copy of `decompressed`.
pub struct CachedBlock {
    /// Block bytes exactly as read from the file (before decompression)
    pub data: Vec<u8>,
    /// Block type tag; the reader uses it to select the decompression codec
    pub block_type: BlockType,
    /// Block contents after decompression (for uncompressed blocks the
    /// reader stores a copy of the raw bytes here)
    pub decompressed: Vec<u8>,
}
47
48/// Simple block cache (HashMap-based for simplicity)
49pub struct BlockCache {
50    /// Cache entries by (file_id, block_offset)
51    entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
52    /// Maximum capacity
53    capacity: usize,
54}
55
56impl BlockCache {
57    /// Create a new block cache
58    pub fn new(capacity: usize) -> Self {
59        Self {
60            entries: RwLock::new(HashMap::with_capacity(capacity)),
61            capacity,
62        }
63    }
64
65    /// Get a cached block
66    pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
67        self.entries.read().get(&(file_id, offset)).cloned()
68    }
69
70    /// Insert a block into cache
71    pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
72        let block = Arc::new(block);
73        let mut entries = self.entries.write();
74        
75        // Simple eviction: clear when full
76        if entries.len() >= self.capacity {
77            entries.clear();
78        }
79        
80        entries.insert((file_id, offset), block.clone());
81        block
82    }
83}
84
/// Per-call switches controlling how blocks are read.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Compare each block's stored checksum against a freshly computed one
    pub verify_checksums: bool,
    /// Insert blocks read from the file into the block cache
    pub fill_cache: bool,
    /// Consult the table's filter before touching any block
    pub use_filter: bool,
}

impl Default for ReadOptions {
    /// All switches default to enabled.
    fn default() -> Self {
        ReadOptions {
            verify_checksums: true,
            fill_cache: true,
            use_filter: true,
        }
    }
}
105
/// SSTable reader for reading SSTable files
///
/// Everything except data blocks is decoded when the table is opened: the
/// header, the footer, the index and (when present) the filter. Data blocks
/// are read from the memory map on demand.
pub struct SSTable {
    /// Path the table was opened from
    path: PathBuf,
    /// Hash of `path`; paired with a block offset to form block-cache keys
    file_id: u64,
    /// Memory map of the table file
    mmap: Mmap,
    /// Header decoded from the first `HEADER_SIZE` bytes of the file
    header: Header,
    /// Footer (section list) decoded from the bytes at `header.footer_offset`
    footer: Footer,
    /// Owned copy of the index section bytes
    index: Vec<u8>,
    /// Index entries parsed from `index`, in the order they are stored
    index_entries: Vec<IndexEntry>,
    /// Filter reader; `None` when the footer lists no filter section or the
    /// section could not be decoded
    filter: Option<FilterReader>,
    /// Summary information computed at open time
    metadata: TableMetadata,
    /// Optional shared block cache
    cache: Option<Arc<BlockCache>>,
}
129
/// One parsed index record: the key stored in the index block together with
/// the handle decoded from that record's value.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// Largest key in this block (separator); lookups pick the first entry
    /// whose `largest_key` is not less than the search key
    largest_key: Vec<u8>,
    /// Location (offset and size) of the data block in the file
    handle: BlockHandle,
}
138
/// Summary information about an opened table, computed once at open time.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Size of the table file in bytes, as reported by the file system
    pub file_size: u64,
    /// Number of data blocks (equal to the number of index entries)
    pub num_data_blocks: usize,
    /// Smallest key; `None` for a table with no index entries.
    ///
    /// NOTE(review): the reader fills this from the separator key of the
    /// *first index entry*, which is an upper bound for the first block
    /// rather than the table's true minimum key.
    pub smallest_key: Option<Vec<u8>>,
    /// Largest key (separator key of the last index entry); `None` for a
    /// table with no index entries
    pub largest_key: Option<Vec<u8>>,
}
151
152impl SSTable {
    /// Open an SSTable file without a block cache.
    ///
    /// Equivalent to calling [`SSTable::open_with_cache`] with `None`.
    ///
    /// # Errors
    ///
    /// Propagates every error `open_with_cache` can return.
    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
        Self::open_with_cache(path, None)
    }
157
158    /// Open an SSTable file with a block cache
159    pub fn open_with_cache<P: AsRef<Path>>(
160        path: P,
161        cache: Option<Arc<BlockCache>>,
162    ) -> std::io::Result<Self> {
163        let path = path.as_ref();
164        let file = File::open(path)?;
165        let file_size = file.metadata()?.len();
166
167        // Memory-map the file
168        let mmap = unsafe { MmapOptions::new().map(&file)? };
169
170        // Generate file ID from path hash
171        let file_id = {
172            use std::hash::{Hash, Hasher};
173            let mut hasher = std::collections::hash_map::DefaultHasher::new();
174            path.hash(&mut hasher);
175            hasher.finish()
176        };
177
178        // Parse header
179        if mmap.len() < HEADER_SIZE {
180            return Err(std::io::Error::new(
181                std::io::ErrorKind::InvalidData,
182                "File too small for SSTable header",
183            ));
184        }
185
186        let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
187            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
188        })?;
189
190        // Parse footer
191        let footer_offset = header.footer_offset as usize;
192        if footer_offset >= mmap.len() {
193            return Err(std::io::Error::new(
194                std::io::ErrorKind::InvalidData,
195                "Footer offset beyond file",
196            ));
197        }
198
199        let footer = Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
200            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
201        })?;
202
203        // Load index section
204        let index_section = footer
205            .sections
206            .iter()
207            .find(|s| s.section_type == SectionType::Index)
208            .ok_or_else(|| {
209                std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
210            })?;
211
212        let index_start = index_section.offset as usize;
213        let index_end = index_start + index_section.size as usize;
214        let index = mmap[index_start..index_end].to_vec();
215
216        // Parse index entries
217        let index_entries = Self::parse_index(&index)?;
218
219        // Load filter section if present
220        let filter = footer
221            .sections
222            .iter()
223            .find(|s| s.section_type == SectionType::Filter)
224            .and_then(|section| {
225                let start = section.offset as usize;
226                let end = start + section.size as usize;
227                FilterReader::from_bytes(&mmap[start..end])
228            });
229
230        // Extract metadata
231        let metadata = TableMetadata {
232            file_size,
233            num_data_blocks: index_entries.len(),
234            smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
235            largest_key: index_entries.last().map(|e| e.largest_key.clone()),
236        };
237
238        Ok(Self {
239            path: path.to_path_buf(),
240            file_id,
241            mmap,
242            header,
243            footer,
244            index,
245            index_entries,
246            filter,
247            metadata,
248            cache,
249        })
250    }
251
252    /// Parse index entries from index block data
253    fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
254        let mut entries = Vec::new();
255        let block = Block::new(data.to_vec()).ok_or_else(|| {
256            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
257        })?;
258        let mut iter = block.iter();
259
260        while iter.valid() {
261            let key = iter.key().to_vec();
262            let value = iter.value();
263
264            let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
265                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
266            })?;
267
268            entries.push(IndexEntry {
269                largest_key: key,
270                handle,
271            });
272
273            iter.next();
274        }
275
276        Ok(entries)
277    }
278
279    /// Get a value by key
280    pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
281        // Use filter to check if key might exist
282        if options.use_filter {
283            if let Some(ref filter) = self.filter {
284                if !filter.may_contain(key) {
285                    return Ok(None);
286                }
287            }
288        }
289
290        // Binary search in index to find the right block
291        let block_idx = self.find_block_for_key(key);
292        if block_idx >= self.index_entries.len() {
293            return Ok(None);
294        }
295
296        // Load and search the block
297        let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
298        let block = Block::new(block_data).ok_or_else(|| {
299            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
300        })?;
301
302        let iter = block.seek(key);
303        if iter.valid() && iter.key() == key {
304            Ok(Some(iter.value().to_vec()))
305        } else {
306            Ok(None)
307        }
308    }
309
310    /// Binary search to find block that might contain the key
311    fn find_block_for_key(&self, key: &[u8]) -> usize {
312        // Binary search for first block where largest_key >= key
313        self.index_entries
314            .binary_search_by(|entry| {
315                if entry.largest_key.as_slice() < key {
316                    Ordering::Less
317                } else {
318                    Ordering::Greater
319                }
320            })
321            .unwrap_or_else(|i| i)
322    }
323
324    /// Read a block from file
325    fn read_block(
326        &self,
327        handle: &BlockHandle,
328        options: &ReadOptions,
329    ) -> std::io::Result<Vec<u8>> {
330        let offset = handle.offset();
331        let size = handle.size();
332
333        // Try cache first
334        if let Some(ref cache) = self.cache {
335            if let Some(block) = cache.get(self.file_id, offset) {
336                return Ok(block.decompressed.clone());
337            }
338        }
339
340        // Read from mmap
341        let start = offset as usize;
342        let end = start + size as usize;
343
344        if end + 5 > self.mmap.len() {
345            return Err(std::io::Error::new(
346                std::io::ErrorKind::InvalidData,
347                "Block extends beyond file",
348            ));
349        }
350
351        let block_data = &self.mmap[start..end];
352        let block_type = BlockType::from_u8(self.mmap[end]);
353        let stored_checksum = u32::from_le_bytes([
354            self.mmap[end + 1],
355            self.mmap[end + 2],
356            self.mmap[end + 3],
357            self.mmap[end + 4],
358        ]);
359
360        // Verify checksum if requested
361        if options.verify_checksums {
362            let computed_checksum = crc32fast::hash(block_data);
363            if computed_checksum != stored_checksum {
364                return Err(std::io::Error::new(
365                    std::io::ErrorKind::InvalidData,
366                    "Block checksum mismatch",
367                ));
368            }
369        }
370
371        // Decompress if needed
372        let decompressed = match block_type {
373            BlockType::Uncompressed => block_data.to_vec(),
374            BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
375                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
376            })?,
377            BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
378                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Zstd error: {}", e))
379            })?,
380            BlockType::Snappy => {
381                return Err(std::io::Error::new(
382                    std::io::ErrorKind::InvalidData,
383                    "Snappy not supported",
384                ))
385            }
386        };
387
388        // Cache the block
389        if options.fill_cache {
390            if let Some(ref cache) = self.cache {
391                cache.insert(
392                    self.file_id,
393                    offset,
394                    CachedBlock {
395                        data: block_data.to_vec(),
396                        block_type,
397                        decompressed: decompressed.clone(),
398                    },
399                );
400            }
401        }
402
403        Ok(decompressed)
404    }
405
    /// Create an iterator over all entries
    ///
    /// The iterator starts at the first data block, which it loads
    /// immediately using default [`ReadOptions`].
    pub fn iter(&self) -> SSTableIterator {
        SSTableIterator::new(self)
    }
410
    /// Create a range iterator
    ///
    /// `start`, when given, selects the starting block via the index search;
    /// with `None` the iterator starts at the first block. Both bounds are
    /// copied into the returned iterator.
    pub fn range(
        &self,
        start: Option<&[u8]>,
        end: Option<&[u8]>,
    ) -> RangeIterator {
        RangeIterator::new(self, start, end)
    }
419
    /// Get table metadata
    ///
    /// Returns the summary computed when the table was opened.
    pub fn metadata(&self) -> &TableMetadata {
        &self.metadata
    }
424
425    /// Get file path
426    pub fn path(&self) -> &Path {
427        &self.path
428    }
429
    /// Get number of data blocks
    ///
    /// This is the number of entries parsed from the index.
    pub fn num_blocks(&self) -> usize {
        self.index_entries.len()
    }
434
435    /// Check if key might exist (using filter)
436    pub fn may_contain(&self, key: &[u8]) -> bool {
437        self.filter
438            .as_ref()
439            .map(|f| f.may_contain(key))
440            .unwrap_or(true)
441    }
442}
443
444/// Iterator over all entries in an SSTable
445pub struct SSTableIterator<'a> {
446    table: &'a SSTable,
447    /// Current block index
448    block_idx: usize,
449    /// Current block data
450    block_data: Option<Vec<u8>>,
451    /// Current block iterator
452    block_iter: Option<BlockIterator<'a>>,
453    /// Read options
454    options: ReadOptions,
455    /// Is iterator valid
456    valid: bool,
457}
458
459impl<'a> SSTableIterator<'a> {
460    fn new(table: &'a SSTable) -> Self {
461        let mut iter = Self {
462            table,
463            block_idx: 0,
464            block_data: None,
465            block_iter: None,
466            options: ReadOptions::default(),
467            valid: false,
468        };
469        iter.load_block();
470        iter
471    }
472
473    /// Load current block
474    fn load_block(&mut self) {
475        if self.block_idx >= self.table.index_entries.len() {
476            self.valid = false;
477            return;
478        }
479
480        let handle = &self.table.index_entries[self.block_idx].handle;
481        match self.table.read_block(handle, &self.options) {
482            Ok(data) => {
483                self.block_data = Some(data);
484                self.valid = true;
485            }
486            Err(_) => {
487                self.valid = false;
488            }
489        }
490    }
491
492    /// Check if iterator is valid
493    pub fn valid(&self) -> bool {
494        self.valid
495    }
496
497    /// Get current key
498    pub fn key(&self) -> Option<&[u8]> {
499        if !self.valid {
500            return None;
501        }
502        // Note: In a full implementation, this would return the current key from block_iter
503        // This is a simplified version
504        self.block_data.as_ref().map(|_| &b""[..])
505    }
506
507    /// Get current value
508    pub fn value(&self) -> Option<&[u8]> {
509        if !self.valid {
510            return None;
511        }
512        self.block_data.as_ref().map(|_| &b""[..])
513    }
514
515    /// Move to next entry
516    pub fn next(&mut self) {
517        // In a full implementation:
518        // 1. Advance block_iter
519        // 2. If block_iter exhausted, load next block
520        self.block_idx += 1;
521        self.load_block();
522    }
523
524    /// Seek to key
525    pub fn seek(&mut self, target: &[u8]) {
526        // Binary search to find starting block
527        self.block_idx = self.table.find_block_for_key(target);
528        self.load_block();
529        // Then seek within the block
530    }
531}
532
533/// Range iterator
534pub struct RangeIterator<'a> {
535    table: &'a SSTable,
536    start: Option<Vec<u8>>,
537    end: Option<Vec<u8>>,
538    current_block: usize,
539    exhausted: bool,
540}
541
542impl<'a> RangeIterator<'a> {
543    fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
544        let start_block = start
545            .map(|k| table.find_block_for_key(k))
546            .unwrap_or(0);
547
548        Self {
549            table,
550            start: start.map(|s| s.to_vec()),
551            end: end.map(|e| e.to_vec()),
552            current_block: start_block,
553            exhausted: false,
554        }
555    }
556
557    /// Check if range is exhausted
558    pub fn exhausted(&self) -> bool {
559        self.exhausted
560    }
561}
562
563// =============================================================================
564// Tests
565// =============================================================================
566
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
    use tempfile::tempdir;

    /// Write a 100-entry table (`key00000`..`key00099`, 256-byte blocks, no
    /// filter) into a fresh temp dir and open it.
    ///
    /// The temp dir is returned so it outlives the opened table.
    fn build_sample_table(file_name: &str) -> (tempfile::TempDir, SSTable) {
        let dir = tempdir().unwrap();
        let path = dir.path().join(file_name);

        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };

        let mut builder = SSTableBuilder::new(&path, options).unwrap();
        for i in 0..100 {
            let key = format!("key{:05}", i);
            let value = format!("value{:05}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
        builder.finish().unwrap();

        let table = SSTable::open(&path).unwrap();
        (dir, table)
    }

    /// A freshly built table can be opened and its block count matches the
    /// metadata.
    #[test]
    fn test_roundtrip() {
        let (_dir, table) = build_sample_table("test.sst");

        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
    }

    /// Point lookups find stored keys and reject absent ones.
    #[test]
    fn test_get() {
        let (_dir, table) = build_sample_table("test_get.sst");
        let read_opts = ReadOptions::default();

        // Key that was written
        let hit = table.get(b"key00050", &read_opts).unwrap();
        assert_eq!(hit.as_deref(), Some(&b"value00050"[..]));

        // Key that was never written
        let miss = table.get(b"nonexistent", &read_opts).unwrap();
        assert!(miss.is_none());
    }

    /// The cache returns what was inserted and misses on unknown offsets.
    #[test]
    fn test_block_cache() {
        let cache = BlockCache::new(100);

        cache.insert(
            1,
            0,
            CachedBlock {
                data: vec![1, 2, 3],
                block_type: BlockType::Uncompressed,
                decompressed: vec![1, 2, 3],
            },
        );

        let cached = cache.get(1, 0).expect("block inserted at (1, 0)");
        assert_eq!(cached.data, vec![1, 2, 3]);

        assert!(cache.get(1, 100).is_none());
    }
}