Skip to main content

sochdb_storage/sstable/
table.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SSTable Reader
19//!
20//! This module provides an SSTable reader with:
21//! - Memory-mapped I/O for efficient access
22//! - Lazy block loading
23//! - Block cache integration
24//! - Binary search in index for O(log n) lookups
25//! - Filter-based negative lookup optimization
26
27use std::cmp::Ordering;
28use std::collections::HashMap;
29use std::fs::File;
30use std::io::{Read, Seek, SeekFrom};
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33
34use memmap2::{Mmap, MmapOptions};
35use parking_lot::RwLock;
36
37use super::block::{Block, BlockHandle, BlockIterator, BlockType};
38use super::filter::FilterReader;
39use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
40
41/// Block cache entry
42pub struct CachedBlock {
43    /// Raw block data
44    pub data: Vec<u8>,
45    /// Block type (compression)
46    pub block_type: BlockType,
47    /// Decompressed data (if applicable)
48    pub decompressed: Vec<u8>,
49}
50
51/// Simple block cache (HashMap-based for simplicity)
52pub struct BlockCache {
53    /// Cache entries by (file_id, block_offset)
54    entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
55    /// Maximum capacity
56    capacity: usize,
57}
58
59impl BlockCache {
60    /// Create a new block cache
61    pub fn new(capacity: usize) -> Self {
62        Self {
63            entries: RwLock::new(HashMap::with_capacity(capacity)),
64            capacity,
65        }
66    }
67
68    /// Get a cached block
69    pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
70        self.entries.read().get(&(file_id, offset)).cloned()
71    }
72
73    /// Insert a block into cache
74    pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
75        let block = Arc::new(block);
76        let mut entries = self.entries.write();
77        
78        // Simple eviction: clear when full
79        if entries.len() >= self.capacity {
80            entries.clear();
81        }
82        
83        entries.insert((file_id, offset), block.clone());
84        block
85    }
86}
87
88/// Read options
89#[derive(Debug, Clone)]
90pub struct ReadOptions {
91    /// Verify checksums when reading blocks
92    pub verify_checksums: bool,
93    /// Fill block cache
94    pub fill_cache: bool,
95    /// Use filter to skip blocks
96    pub use_filter: bool,
97}
98
99impl Default for ReadOptions {
100    fn default() -> Self {
101        Self {
102            verify_checksums: true,
103            fill_cache: true,
104            use_filter: true,
105        }
106    }
107}
108
109/// SSTable reader for reading SSTable files
110pub struct SSTable {
111    /// File path
112    path: PathBuf,
113    /// Unique file ID for caching
114    file_id: u64,
115    /// Memory-mapped file
116    mmap: Mmap,
117    /// Parsed header
118    header: Header,
119    /// Parsed footer with sections
120    footer: Footer,
121    /// Index block (cached)
122    index: Vec<u8>,
123    /// Parsed index entries
124    index_entries: Vec<IndexEntry>,
125    /// Filter reader (if filter section exists)
126    filter: Option<FilterReader>,
127    /// File metadata
128    metadata: TableMetadata,
129    /// Block cache reference
130    cache: Option<Arc<BlockCache>>,
131}
132
133/// Index entry
134#[derive(Debug, Clone)]
135struct IndexEntry {
136    /// Largest key in this block (separator)
137    largest_key: Vec<u8>,
138    /// Block handle
139    handle: BlockHandle,
140}
141
142/// Table metadata
143#[derive(Debug, Clone)]
144pub struct TableMetadata {
145    /// File size
146    pub file_size: u64,
147    /// Number of data blocks
148    pub num_data_blocks: usize,
149    /// Smallest key
150    pub smallest_key: Option<Vec<u8>>,
151    /// Largest key
152    pub largest_key: Option<Vec<u8>>,
153}
154
155impl SSTable {
156    /// Open an SSTable file
157    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
158        Self::open_with_cache(path, None)
159    }
160
161    /// Open an SSTable file with a block cache
162    pub fn open_with_cache<P: AsRef<Path>>(
163        path: P,
164        cache: Option<Arc<BlockCache>>,
165    ) -> std::io::Result<Self> {
166        let path = path.as_ref();
167        let file = File::open(path)?;
168        let file_size = file.metadata()?.len();
169
170        // Memory-map the file
171        let mmap = unsafe { MmapOptions::new().map(&file)? };
172
173        // Generate file ID from path hash
174        let file_id = {
175            use std::hash::{Hash, Hasher};
176            let mut hasher = std::collections::hash_map::DefaultHasher::new();
177            path.hash(&mut hasher);
178            hasher.finish()
179        };
180
181        // Parse header
182        if mmap.len() < HEADER_SIZE {
183            return Err(std::io::Error::new(
184                std::io::ErrorKind::InvalidData,
185                "File too small for SSTable header",
186            ));
187        }
188
189        let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
190            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
191        })?;
192
193        // Parse footer
194        let footer_offset = header.footer_offset as usize;
195        if footer_offset >= mmap.len() {
196            return Err(std::io::Error::new(
197                std::io::ErrorKind::InvalidData,
198                "Footer offset beyond file",
199            ));
200        }
201
202        let footer = Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
203            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
204        })?;
205
206        // Load index section
207        let index_section = footer
208            .sections
209            .iter()
210            .find(|s| s.section_type == SectionType::Index)
211            .ok_or_else(|| {
212                std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
213            })?;
214
215        let index_start = index_section.offset as usize;
216        let index_end = index_start + index_section.size as usize;
217        let index = mmap[index_start..index_end].to_vec();
218
219        // Parse index entries
220        let index_entries = Self::parse_index(&index)?;
221
222        // Load filter section if present
223        let filter = footer
224            .sections
225            .iter()
226            .find(|s| s.section_type == SectionType::Filter)
227            .and_then(|section| {
228                let start = section.offset as usize;
229                let end = start + section.size as usize;
230                FilterReader::from_bytes(&mmap[start..end])
231            });
232
233        // Extract metadata
234        let metadata = TableMetadata {
235            file_size,
236            num_data_blocks: index_entries.len(),
237            smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
238            largest_key: index_entries.last().map(|e| e.largest_key.clone()),
239        };
240
241        Ok(Self {
242            path: path.to_path_buf(),
243            file_id,
244            mmap,
245            header,
246            footer,
247            index,
248            index_entries,
249            filter,
250            metadata,
251            cache,
252        })
253    }
254
255    /// Parse index entries from index block data
256    fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
257        let mut entries = Vec::new();
258        let block = Block::new(data.to_vec()).ok_or_else(|| {
259            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
260        })?;
261        let mut iter = block.iter();
262
263        while iter.valid() {
264            let key = iter.key().to_vec();
265            let value = iter.value();
266
267            let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
268                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
269            })?;
270
271            entries.push(IndexEntry {
272                largest_key: key,
273                handle,
274            });
275
276            iter.next();
277        }
278
279        Ok(entries)
280    }
281
282    /// Get a value by key
283    pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
284        // Use filter to check if key might exist
285        if options.use_filter {
286            if let Some(ref filter) = self.filter {
287                if !filter.may_contain(key) {
288                    return Ok(None);
289                }
290            }
291        }
292
293        // Binary search in index to find the right block
294        let block_idx = self.find_block_for_key(key);
295        if block_idx >= self.index_entries.len() {
296            return Ok(None);
297        }
298
299        // Load and search the block
300        let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
301        let block = Block::new(block_data).ok_or_else(|| {
302            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
303        })?;
304
305        let iter = block.seek(key);
306        if iter.valid() && iter.key() == key {
307            Ok(Some(iter.value().to_vec()))
308        } else {
309            Ok(None)
310        }
311    }
312
313    /// Binary search to find block that might contain the key
314    fn find_block_for_key(&self, key: &[u8]) -> usize {
315        // Binary search for first block where largest_key >= key
316        self.index_entries
317            .binary_search_by(|entry| {
318                if entry.largest_key.as_slice() < key {
319                    Ordering::Less
320                } else {
321                    Ordering::Greater
322                }
323            })
324            .unwrap_or_else(|i| i)
325    }
326
327    /// Read a block from file
328    fn read_block(
329        &self,
330        handle: &BlockHandle,
331        options: &ReadOptions,
332    ) -> std::io::Result<Vec<u8>> {
333        let offset = handle.offset();
334        let size = handle.size();
335
336        // Try cache first
337        if let Some(ref cache) = self.cache {
338            if let Some(block) = cache.get(self.file_id, offset) {
339                return Ok(block.decompressed.clone());
340            }
341        }
342
343        // Read from mmap
344        let start = offset as usize;
345        let end = start + size as usize;
346
347        if end + 5 > self.mmap.len() {
348            return Err(std::io::Error::new(
349                std::io::ErrorKind::InvalidData,
350                "Block extends beyond file",
351            ));
352        }
353
354        let block_data = &self.mmap[start..end];
355        let block_type = BlockType::from_u8(self.mmap[end]);
356        let stored_checksum = u32::from_le_bytes([
357            self.mmap[end + 1],
358            self.mmap[end + 2],
359            self.mmap[end + 3],
360            self.mmap[end + 4],
361        ]);
362
363        // Verify checksum if requested
364        if options.verify_checksums {
365            let computed_checksum = crc32fast::hash(block_data);
366            if computed_checksum != stored_checksum {
367                return Err(std::io::Error::new(
368                    std::io::ErrorKind::InvalidData,
369                    "Block checksum mismatch",
370                ));
371            }
372        }
373
374        // Decompress if needed
375        let decompressed = match block_type {
376            BlockType::Uncompressed => block_data.to_vec(),
377            BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
378                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
379            })?,
380            BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
381                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Zstd error: {}", e))
382            })?,
383            BlockType::Snappy => {
384                return Err(std::io::Error::new(
385                    std::io::ErrorKind::InvalidData,
386                    "Snappy not supported",
387                ))
388            }
389        };
390
391        // Cache the block
392        if options.fill_cache {
393            if let Some(ref cache) = self.cache {
394                cache.insert(
395                    self.file_id,
396                    offset,
397                    CachedBlock {
398                        data: block_data.to_vec(),
399                        block_type,
400                        decompressed: decompressed.clone(),
401                    },
402                );
403            }
404        }
405
406        Ok(decompressed)
407    }
408
409    /// Create an iterator over all entries
410    pub fn iter(&self) -> SSTableIterator {
411        SSTableIterator::new(self)
412    }
413
414    /// Create a range iterator
415    pub fn range(
416        &self,
417        start: Option<&[u8]>,
418        end: Option<&[u8]>,
419    ) -> RangeIterator {
420        RangeIterator::new(self, start, end)
421    }
422
423    /// Get table metadata
424    pub fn metadata(&self) -> &TableMetadata {
425        &self.metadata
426    }
427
428    /// Get file path
429    pub fn path(&self) -> &Path {
430        &self.path
431    }
432
433    /// Get number of data blocks
434    pub fn num_blocks(&self) -> usize {
435        self.index_entries.len()
436    }
437
438    /// Check if key might exist (using filter)
439    pub fn may_contain(&self, key: &[u8]) -> bool {
440        self.filter
441            .as_ref()
442            .map(|f| f.may_contain(key))
443            .unwrap_or(true)
444    }
445}
446
447/// Iterator over all entries in an SSTable
448///
449/// Iterates through all data blocks sequentially, yielding every key-value
450/// entry. Loads each block, uses the proven `BlockIterator` to collect entries, 
451/// then advances through them. This avoids self-referential borrows while 
452/// reusing the correct prefix-decompression logic in `BlockIterator`.
453pub struct SSTableIterator<'a> {
454    table: &'a SSTable,
455    /// Current block index into `table.index_entries`
456    block_idx: usize,
457    /// Collected entries from current block: (key, value)
458    entries: Vec<(Vec<u8>, Vec<u8>)>,
459    /// Current entry index within `entries`
460    entry_idx: usize,
461    /// Read options
462    options: ReadOptions,
463    /// Is iterator positioned on a valid entry?
464    valid: bool,
465}
466
467impl<'a> SSTableIterator<'a> {
468    fn new(table: &'a SSTable) -> Self {
469        let mut iter = Self {
470            table,
471            block_idx: 0,
472            entries: Vec::new(),
473            entry_idx: 0,
474            options: ReadOptions::default(),
475            valid: false,
476        };
477        iter.load_block(0);
478        iter
479    }
480
481    /// Load block at `block_idx`, collect all entries via `BlockIterator`,
482    /// and position on the first entry.
483    fn load_block(&mut self, block_idx: usize) {
484        self.block_idx = block_idx;
485        self.entries.clear();
486        self.entry_idx = 0;
487        self.valid = false;
488
489        while self.block_idx < self.table.index_entries.len() {
490            let handle = &self.table.index_entries[self.block_idx].handle;
491            match self.table.read_block(handle, &self.options) {
492                Ok(data) => {
493                    if let Some(block) = Block::new(data) {
494                        // Use BlockIterator to collect all entries
495                        let mut bi = block.iter();
496                        while bi.valid() {
497                            self.entries.push((
498                                bi.key().to_vec(),
499                                bi.value().to_vec(),
500                            ));
501                            bi.next();
502                        }
503                        if !self.entries.is_empty() {
504                            self.entry_idx = 0;
505                            self.valid = true;
506                            return;
507                        }
508                        // Block had no entries — try next
509                    }
510                }
511                Err(_) => {
512                    // I/O error — skip this block
513                }
514            }
515            self.block_idx += 1;
516        }
517    }
518
519    /// Check if iterator is valid
520    pub fn valid(&self) -> bool {
521        self.valid
522    }
523
524    /// Get current key (only valid when `valid() == true`)
525    pub fn key(&self) -> Option<&[u8]> {
526        if self.valid {
527            Some(&self.entries[self.entry_idx].0)
528        } else {
529            None
530        }
531    }
532
533    /// Get current value (only valid when `valid() == true`)
534    pub fn value(&self) -> Option<&[u8]> {
535        if self.valid {
536            Some(&self.entries[self.entry_idx].1)
537        } else {
538            None
539        }
540    }
541
542    /// Advance to the next entry. If the current block is exhausted,
543    /// loads the next block and positions on its first entry.
544    pub fn next(&mut self) {
545        if !self.valid {
546            return;
547        }
548
549        self.entry_idx += 1;
550        if self.entry_idx < self.entries.len() {
551            return; // still within current block
552        }
553
554        // Current block exhausted — move to next block
555        self.load_block(self.block_idx + 1);
556    }
557
558    /// Seek to the first entry with key >= `target`.
559    pub fn seek(&mut self, target: &[u8]) {
560        // Binary search to find starting block
561        let start_block = self.table.find_block_for_key(target);
562        self.load_block(start_block);
563
564        // Scan forward until key >= target
565        while self.valid {
566            if self.entries[self.entry_idx].0.as_slice() >= target {
567                return;
568            }
569            self.next();
570        }
571    }
572
573    /// Seek to the very first entry in the SSTable.
574    pub fn seek_to_first(&mut self) {
575        self.load_block(0);
576    }
577}
578
579/// Range iterator over entries in [start, end) of an SSTable.
580///
581/// Uses `SSTableIterator` internally, adding upper-bound checking.
582pub struct RangeIterator<'a> {
583    inner: SSTableIterator<'a>,
584    end: Option<Vec<u8>>,
585    exhausted: bool,
586}
587
588impl<'a> RangeIterator<'a> {
589    fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
590        let mut inner = SSTableIterator::new(table);
591
592        // Seek to start if provided
593        if let Some(start_key) = start {
594            inner.seek(start_key);
595        }
596
597        let mut ri = Self {
598            inner,
599            end: end.map(|e| e.to_vec()),
600            exhausted: false,
601        };
602
603        // Check if first entry is already beyond end
604        ri.check_end();
605        ri
606    }
607
608    /// Check if current key >= end bound; if so, mark exhausted.
609    fn check_end(&mut self) {
610        if self.exhausted {
611            return;
612        }
613        if !self.inner.valid() {
614            self.exhausted = true;
615            return;
616        }
617        if let Some(ref end_key) = self.end {
618            if let Some(key) = self.inner.key() {
619                if key >= end_key.as_slice() {
620                    self.exhausted = true;
621                }
622            }
623        }
624    }
625
626    /// Check if range is exhausted
627    pub fn exhausted(&self) -> bool {
628        self.exhausted
629    }
630
631    /// Check if iterator is valid (positioned on an entry within bounds)
632    pub fn valid(&self) -> bool {
633        !self.exhausted && self.inner.valid()
634    }
635
636    /// Get current key
637    pub fn key(&self) -> Option<&[u8]> {
638        if self.exhausted { None } else { self.inner.key() }
639    }
640
641    /// Get current value
642    pub fn value(&self) -> Option<&[u8]> {
643        if self.exhausted { None } else { self.inner.value() }
644    }
645
646    /// Advance to next entry within the range
647    pub fn next(&mut self) {
648        if self.exhausted {
649            return;
650        }
651        self.inner.next();
652        self.check_end();
653    }
654}
655
656// =============================================================================
657// Tests
658// =============================================================================
659
660#[cfg(test)]
661mod tests {
662    use super::*;
663    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
664    use tempfile::tempdir;
665
666    #[test]
667    fn test_roundtrip() {
668        let dir = tempdir().unwrap();
669        let path = dir.path().join("test.sst");
670
671        // Build SSTable
672        let options = SSTableBuilderOptions {
673            block_size: 256,
674            filter_policy: None,
675            ..Default::default()
676        };
677
678        let mut builder = SSTableBuilder::new(&path, options).unwrap();
679
680        for i in 0..100 {
681            let key = format!("key{:05}", i);
682            let value = format!("value{:05}", i);
683            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
684        }
685
686        builder.finish().unwrap();
687
688        // Read SSTable
689        let table = SSTable::open(&path).unwrap();
690
691        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
692    }
693
694    #[test]
695    fn test_get() {
696        let dir = tempdir().unwrap();
697        let path = dir.path().join("test_get.sst");
698
699        let options = SSTableBuilderOptions {
700            block_size: 256,
701            filter_policy: None,
702            ..Default::default()
703        };
704
705        let mut builder = SSTableBuilder::new(&path, options).unwrap();
706
707        for i in 0..100 {
708            let key = format!("key{:05}", i);
709            let value = format!("value{:05}", i);
710            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
711        }
712
713        builder.finish().unwrap();
714
715        let table = SSTable::open(&path).unwrap();
716        let read_opts = ReadOptions::default();
717
718        // Test existing key
719        let result = table.get(b"key00050", &read_opts).unwrap();
720        assert!(result.is_some());
721        assert_eq!(result.unwrap(), b"value00050");
722
723        // Test non-existing key
724        let result = table.get(b"nonexistent", &read_opts).unwrap();
725        assert!(result.is_none());
726    }
727
728    #[test]
729    fn test_block_cache() {
730        let cache = BlockCache::new(100);
731
732        let block = CachedBlock {
733            data: vec![1, 2, 3],
734            block_type: BlockType::Uncompressed,
735            decompressed: vec![1, 2, 3],
736        };
737
738        cache.insert(1, 0, block);
739
740        let cached = cache.get(1, 0);
741        assert!(cached.is_some());
742        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);
743
744        let missing = cache.get(1, 100);
745        assert!(missing.is_none());
746    }
747
748    #[test]
749    fn test_sstable_iterator_full_scan() {
750        let dir = tempdir().unwrap();
751        let path = dir.path().join("test_iter.sst");
752
753        // Use small blocks to force multiple blocks
754        let options = SSTableBuilderOptions {
755            block_size: 64,
756            filter_policy: None,
757            ..Default::default()
758        };
759
760        let mut builder = SSTableBuilder::new(&path, options).unwrap();
761
762        let n = 200;
763        for i in 0..n {
764            let key = format!("key{:05}", i);
765            let value = format!("value{:05}", i);
766            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
767        }
768
769        builder.finish().unwrap();
770
771        let table = SSTable::open(&path).unwrap();
772        eprintln!("num_blocks = {}", table.num_blocks());
773        assert!(table.num_blocks() > 1, "Need multiple blocks for this test");
774
775        // Full iteration should return all entries in order
776        let mut iter = table.iter();
777        let mut count = 0;
778        let mut prev_key: Option<Vec<u8>> = None;
779
780        while iter.valid() {
781            let key = iter.key().unwrap().to_vec();
782            let value = iter.value().unwrap().to_vec();
783
784            let expected_key = format!("key{:05}", count);
785            let expected_val = format!("value{:05}", count);
786            assert_eq!(
787                String::from_utf8_lossy(&key),
788                expected_key,
789                "key mismatch at entry {}",
790                count
791            );
792            assert_eq!(
793                String::from_utf8_lossy(&value),
794                expected_val,
795                "value mismatch at entry {}",
796                count
797            );
798
799            // Keys must be strictly increasing
800            if let Some(ref pk) = prev_key {
801                assert!(key > *pk, "keys not in order at {}", count);
802            }
803            prev_key = Some(key);
804
805            count += 1;
806            iter.next();
807        }
808
809        assert_eq!(count, n, "iterator did not return all {} entries", n);
810    }
811
812    #[test]
813    fn test_sstable_iterator_seek() {
814        let dir = tempdir().unwrap();
815        let path = dir.path().join("test_seek.sst");
816
817        let options = SSTableBuilderOptions {
818            block_size: 64,
819            filter_policy: None,
820            ..Default::default()
821        };
822
823        let mut builder = SSTableBuilder::new(&path, options).unwrap();
824
825        for i in (0..100).step_by(2) {
826            let key = format!("key{:05}", i);
827            let value = format!("value{:05}", i);
828            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
829        }
830
831        builder.finish().unwrap();
832
833        let table = SSTable::open(&path).unwrap();
834
835        // Seek to exact key
836        let mut iter = table.iter();
837        iter.seek(b"key00010");
838        assert!(iter.valid());
839        assert_eq!(iter.key().unwrap(), b"key00010");
840
841        // Seek to key between entries (should land on next key)
842        iter.seek(b"key00011");
843        assert!(iter.valid());
844        assert_eq!(iter.key().unwrap(), b"key00012");
845
846        // Seek past end
847        iter.seek(b"key99999");
848        assert!(!iter.valid());
849
850        // Seek to first
851        iter.seek_to_first();
852        assert!(iter.valid());
853        assert_eq!(iter.key().unwrap(), b"key00000");
854    }
855
856    #[test]
857    fn test_range_iterator() {
858        let dir = tempdir().unwrap();
859        let path = dir.path().join("test_range.sst");
860
861        let options = SSTableBuilderOptions {
862            block_size: 64,
863            filter_policy: None,
864            ..Default::default()
865        };
866
867        let mut builder = SSTableBuilder::new(&path, options).unwrap();
868
869        for i in 0..100 {
870            let key = format!("key{:05}", i);
871            let value = format!("value{:05}", i);
872            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
873        }
874
875        builder.finish().unwrap();
876
877        let table = SSTable::open(&path).unwrap();
878
879        // Range [key00010, key00020)
880        let mut range = table.range(Some(b"key00010"), Some(b"key00020"));
881        let mut count = 0;
882
883        while range.valid() {
884            let key = range.key().unwrap();
885            assert!(key >= b"key00010".as_slice());
886            assert!(key < b"key00020".as_slice());
887            count += 1;
888            range.next();
889        }
890
891        assert_eq!(count, 10, "expected 10 keys in range [10, 20)");
892        assert!(range.exhausted());
893
894        // Full range (no bounds)
895        let mut range = table.range(None, None);
896        let mut total = 0;
897        while range.valid() {
898            total += 1;
899            range.next();
900        }
901        assert_eq!(total, 100);
902    }
903}