Skip to main content

sochdb_storage/sstable/
table.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SSTable Reader
19//!
20//! This module provides an SSTable reader with:
21//! - Memory-mapped I/O for efficient access
22//! - Lazy block loading
23//! - Block cache integration
24//! - Binary search in index for O(log n) lookups
25//! - Filter-based negative lookup optimization
26
27use std::cmp::Ordering;
28use std::collections::HashMap;
29use std::fs::File;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32
33use memmap2::{Mmap, MmapOptions};
34use parking_lot::RwLock;
35
36use super::block::{Block, BlockHandle, BlockType};
37use super::filter::FilterReader;
38use super::format::{Footer, HEADER_SIZE, Header, SectionType};
39
40/// Block cache entry
41pub struct CachedBlock {
42    /// Raw block data
43    pub data: Vec<u8>,
44    /// Block type (compression)
45    pub block_type: BlockType,
46    /// Decompressed data (if applicable)
47    pub decompressed: Vec<u8>,
48}
49
50/// Simple block cache (HashMap-based for simplicity)
51pub struct BlockCache {
52    /// Cache entries by (file_id, block_offset)
53    entries: RwLock<HashMap<(u64, u64), Arc<CachedBlock>>>,
54    /// Maximum capacity
55    capacity: usize,
56}
57
58impl BlockCache {
59    /// Create a new block cache
60    pub fn new(capacity: usize) -> Self {
61        Self {
62            entries: RwLock::new(HashMap::with_capacity(capacity)),
63            capacity,
64        }
65    }
66
67    /// Get a cached block
68    pub fn get(&self, file_id: u64, offset: u64) -> Option<Arc<CachedBlock>> {
69        self.entries.read().get(&(file_id, offset)).cloned()
70    }
71
72    /// Insert a block into cache
73    pub fn insert(&self, file_id: u64, offset: u64, block: CachedBlock) -> Arc<CachedBlock> {
74        let block = Arc::new(block);
75        let mut entries = self.entries.write();
76
77        // Simple eviction: clear when full
78        if entries.len() >= self.capacity {
79            entries.clear();
80        }
81
82        entries.insert((file_id, offset), block.clone());
83        block
84    }
85}
86
87/// Read options
88#[derive(Debug, Clone)]
89pub struct ReadOptions {
90    /// Verify checksums when reading blocks
91    pub verify_checksums: bool,
92    /// Fill block cache
93    pub fill_cache: bool,
94    /// Use filter to skip blocks
95    pub use_filter: bool,
96}
97
98impl Default for ReadOptions {
99    fn default() -> Self {
100        Self {
101            verify_checksums: true,
102            fill_cache: true,
103            use_filter: true,
104        }
105    }
106}
107
108/// SSTable reader for reading SSTable files
109pub struct SSTable {
110    /// File path
111    path: PathBuf,
112    /// Unique file ID for caching
113    file_id: u64,
114    /// Memory-mapped file
115    mmap: Mmap,
116    /// Parsed header
117    header: Header,
118    /// Parsed footer with sections
119    footer: Footer,
120    /// Index block (cached)
121    index: Vec<u8>,
122    /// Parsed index entries
123    index_entries: Vec<IndexEntry>,
124    /// Filter reader (if filter section exists)
125    filter: Option<FilterReader>,
126    /// File metadata
127    metadata: TableMetadata,
128    /// Block cache reference
129    cache: Option<Arc<BlockCache>>,
130}
131
132/// Index entry
133#[derive(Debug, Clone)]
134struct IndexEntry {
135    /// Largest key in this block (separator)
136    largest_key: Vec<u8>,
137    /// Block handle
138    handle: BlockHandle,
139}
140
141/// Table metadata
142#[derive(Debug, Clone)]
143pub struct TableMetadata {
144    /// File size
145    pub file_size: u64,
146    /// Number of data blocks
147    pub num_data_blocks: usize,
148    /// Smallest key
149    pub smallest_key: Option<Vec<u8>>,
150    /// Largest key
151    pub largest_key: Option<Vec<u8>>,
152}
153
154impl SSTable {
155    /// Open an SSTable file
156    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
157        Self::open_with_cache(path, None)
158    }
159
160    /// Open an SSTable file with a block cache
161    pub fn open_with_cache<P: AsRef<Path>>(
162        path: P,
163        cache: Option<Arc<BlockCache>>,
164    ) -> std::io::Result<Self> {
165        let path = path.as_ref();
166        let file = File::open(path)?;
167        let file_size = file.metadata()?.len();
168
169        // Memory-map the file
170        let mmap = unsafe { MmapOptions::new().map(&file)? };
171
172        // Generate file ID from path hash
173        let file_id = {
174            use std::hash::{Hash, Hasher};
175            let mut hasher = std::collections::hash_map::DefaultHasher::new();
176            path.hash(&mut hasher);
177            hasher.finish()
178        };
179
180        // Parse header
181        if mmap.len() < HEADER_SIZE {
182            return Err(std::io::Error::new(
183                std::io::ErrorKind::InvalidData,
184                "File too small for SSTable header",
185            ));
186        }
187
188        let header = Header::decode(&mmap[..HEADER_SIZE]).ok_or_else(|| {
189            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable header")
190        })?;
191
192        // Parse footer
193        let footer_offset = header.footer_offset as usize;
194        if footer_offset >= mmap.len() {
195            return Err(std::io::Error::new(
196                std::io::ErrorKind::InvalidData,
197                "Footer offset beyond file",
198            ));
199        }
200
201        let footer =
202            Footer::decode(&mmap[footer_offset..], header.num_sections).ok_or_else(|| {
203                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid SSTable footer")
204            })?;
205
206        // Load index section
207        let index_section = footer
208            .sections
209            .iter()
210            .find(|s| s.section_type == SectionType::Index)
211            .ok_or_else(|| {
212                std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing index section")
213            })?;
214
215        let index_start = index_section.offset as usize;
216        let index_end = index_start + index_section.size as usize;
217        let index = mmap[index_start..index_end].to_vec();
218
219        // Parse index entries
220        let index_entries = Self::parse_index(&index)?;
221
222        // Load filter section if present
223        let filter = footer
224            .sections
225            .iter()
226            .find(|s| s.section_type == SectionType::Filter)
227            .and_then(|section| {
228                let start = section.offset as usize;
229                let end = start + section.size as usize;
230                FilterReader::from_bytes(&mmap[start..end])
231            });
232
233        // Extract metadata
234        let metadata = TableMetadata {
235            file_size,
236            num_data_blocks: index_entries.len(),
237            smallest_key: index_entries.first().map(|e| e.largest_key.clone()),
238            largest_key: index_entries.last().map(|e| e.largest_key.clone()),
239        };
240
241        Ok(Self {
242            path: path.to_path_buf(),
243            file_id,
244            mmap,
245            header,
246            footer,
247            index,
248            index_entries,
249            filter,
250            metadata,
251            cache,
252        })
253    }
254
255    /// Parse index entries from index block data
256    fn parse_index(data: &[u8]) -> std::io::Result<Vec<IndexEntry>> {
257        let mut entries = Vec::new();
258        let block = Block::new(data.to_vec()).ok_or_else(|| {
259            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid index block")
260        })?;
261        let mut iter = block.iter();
262
263        while iter.valid() {
264            let key = iter.key().to_vec();
265            let value = iter.value();
266
267            let (handle, _bytes_read) = BlockHandle::decode(value).ok_or_else(|| {
268                std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid block handle")
269            })?;
270
271            entries.push(IndexEntry {
272                largest_key: key,
273                handle,
274            });
275
276            iter.next();
277        }
278
279        Ok(entries)
280    }
281
282    /// Get a value by key
283    pub fn get(&self, key: &[u8], options: &ReadOptions) -> std::io::Result<Option<Vec<u8>>> {
284        // Use filter to check if key might exist
285        if options.use_filter {
286            if let Some(ref filter) = self.filter {
287                if !filter.may_contain(key) {
288                    return Ok(None);
289                }
290            }
291        }
292
293        // Binary search in index to find the right block
294        let block_idx = self.find_block_for_key(key);
295        if block_idx >= self.index_entries.len() {
296            return Ok(None);
297        }
298
299        // Load and search the block
300        let block_data = self.read_block(&self.index_entries[block_idx].handle, options)?;
301        let block = Block::new(block_data).ok_or_else(|| {
302            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid data block")
303        })?;
304
305        let iter = block.seek(key);
306        if iter.valid() && iter.key() == key {
307            Ok(Some(iter.value().to_vec()))
308        } else {
309            Ok(None)
310        }
311    }
312
313    /// Binary search to find block that might contain the key
314    fn find_block_for_key(&self, key: &[u8]) -> usize {
315        // Binary search for first block where largest_key >= key
316        self.index_entries
317            .binary_search_by(|entry| {
318                if entry.largest_key.as_slice() < key {
319                    Ordering::Less
320                } else {
321                    Ordering::Greater
322                }
323            })
324            .unwrap_or_else(|i| i)
325    }
326
327    /// Read a block from file
328    fn read_block(&self, handle: &BlockHandle, options: &ReadOptions) -> std::io::Result<Vec<u8>> {
329        let offset = handle.offset();
330        let size = handle.size();
331
332        // Try cache first
333        if let Some(ref cache) = self.cache {
334            if let Some(block) = cache.get(self.file_id, offset) {
335                return Ok(block.decompressed.clone());
336            }
337        }
338
339        // Read from mmap
340        let start = offset as usize;
341        let end = start + size as usize;
342
343        if end + 5 > self.mmap.len() {
344            return Err(std::io::Error::new(
345                std::io::ErrorKind::InvalidData,
346                "Block extends beyond file",
347            ));
348        }
349
350        let block_data = &self.mmap[start..end];
351        let block_type = BlockType::from_u8(self.mmap[end]);
352        let stored_checksum = u32::from_le_bytes([
353            self.mmap[end + 1],
354            self.mmap[end + 2],
355            self.mmap[end + 3],
356            self.mmap[end + 4],
357        ]);
358
359        // Verify checksum if requested
360        if options.verify_checksums {
361            let computed_checksum = crc32fast::hash(block_data);
362            if computed_checksum != stored_checksum {
363                return Err(std::io::Error::new(
364                    std::io::ErrorKind::InvalidData,
365                    "Block checksum mismatch",
366                ));
367            }
368        }
369
370        // Decompress if needed
371        let decompressed = match block_type {
372            BlockType::Uncompressed => block_data.to_vec(),
373            BlockType::Lz4 => lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
374                std::io::Error::new(std::io::ErrorKind::InvalidData, format!("LZ4 error: {}", e))
375            })?,
376            BlockType::Zstd => zstd::decode_all(block_data).map_err(|e| {
377                std::io::Error::new(
378                    std::io::ErrorKind::InvalidData,
379                    format!("Zstd error: {}", e),
380                )
381            })?,
382            BlockType::Snappy => {
383                return Err(std::io::Error::new(
384                    std::io::ErrorKind::InvalidData,
385                    "Snappy not supported",
386                ));
387            }
388        };
389
390        // Cache the block
391        if options.fill_cache {
392            if let Some(ref cache) = self.cache {
393                cache.insert(
394                    self.file_id,
395                    offset,
396                    CachedBlock {
397                        data: block_data.to_vec(),
398                        block_type,
399                        decompressed: decompressed.clone(),
400                    },
401                );
402            }
403        }
404
405        Ok(decompressed)
406    }
407
408    /// Create an iterator over all entries
409    pub fn iter(&self) -> SSTableIterator<'_> {
410        SSTableIterator::new(self)
411    }
412
413    /// Create a range iterator
414    pub fn range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> RangeIterator<'_> {
415        RangeIterator::new(self, start, end)
416    }
417
418    /// Get table metadata
419    pub fn metadata(&self) -> &TableMetadata {
420        &self.metadata
421    }
422
423    /// Get file path
424    pub fn path(&self) -> &Path {
425        &self.path
426    }
427
428    /// Get number of data blocks
429    pub fn num_blocks(&self) -> usize {
430        self.index_entries.len()
431    }
432
433    /// Check if key might exist (using filter)
434    pub fn may_contain(&self, key: &[u8]) -> bool {
435        self.filter
436            .as_ref()
437            .map(|f| f.may_contain(key))
438            .unwrap_or(true)
439    }
440}
441
442/// Iterator over all entries in an SSTable
443///
444/// Iterates through all data blocks sequentially, yielding every key-value
445/// entry. Loads each block, uses the proven `BlockIterator` to collect entries,
446/// then advances through them. This avoids self-referential borrows while
447/// reusing the correct prefix-decompression logic in `BlockIterator`.
448pub struct SSTableIterator<'a> {
449    table: &'a SSTable,
450    /// Current block index into `table.index_entries`
451    block_idx: usize,
452    /// Collected entries from current block: (key, value)
453    entries: Vec<(Vec<u8>, Vec<u8>)>,
454    /// Current entry index within `entries`
455    entry_idx: usize,
456    /// Read options
457    options: ReadOptions,
458    /// Is iterator positioned on a valid entry?
459    valid: bool,
460}
461
462impl<'a> SSTableIterator<'a> {
463    fn new(table: &'a SSTable) -> Self {
464        let mut iter = Self {
465            table,
466            block_idx: 0,
467            entries: Vec::new(),
468            entry_idx: 0,
469            options: ReadOptions::default(),
470            valid: false,
471        };
472        iter.load_block(0);
473        iter
474    }
475
476    /// Load block at `block_idx`, collect all entries via `BlockIterator`,
477    /// and position on the first entry.
478    fn load_block(&mut self, block_idx: usize) {
479        self.block_idx = block_idx;
480        self.entries.clear();
481        self.entry_idx = 0;
482        self.valid = false;
483
484        while self.block_idx < self.table.index_entries.len() {
485            let handle = &self.table.index_entries[self.block_idx].handle;
486            match self.table.read_block(handle, &self.options) {
487                Ok(data) => {
488                    if let Some(block) = Block::new(data) {
489                        // Use BlockIterator to collect all entries
490                        let mut bi = block.iter();
491                        while bi.valid() {
492                            self.entries.push((bi.key().to_vec(), bi.value().to_vec()));
493                            bi.next();
494                        }
495                        if !self.entries.is_empty() {
496                            self.entry_idx = 0;
497                            self.valid = true;
498                            return;
499                        }
500                        // Block had no entries — try next
501                    }
502                }
503                Err(_) => {
504                    // I/O error — skip this block
505                }
506            }
507            self.block_idx += 1;
508        }
509    }
510
511    /// Check if iterator is valid
512    pub fn valid(&self) -> bool {
513        self.valid
514    }
515
516    /// Get current key (only valid when `valid() == true`)
517    pub fn key(&self) -> Option<&[u8]> {
518        if self.valid {
519            Some(&self.entries[self.entry_idx].0)
520        } else {
521            None
522        }
523    }
524
525    /// Get current value (only valid when `valid() == true`)
526    pub fn value(&self) -> Option<&[u8]> {
527        if self.valid {
528            Some(&self.entries[self.entry_idx].1)
529        } else {
530            None
531        }
532    }
533
534    /// Advance to the next entry. If the current block is exhausted,
535    /// loads the next block and positions on its first entry.
536    pub fn next(&mut self) {
537        if !self.valid {
538            return;
539        }
540
541        self.entry_idx += 1;
542        if self.entry_idx < self.entries.len() {
543            return; // still within current block
544        }
545
546        // Current block exhausted — move to next block
547        self.load_block(self.block_idx + 1);
548    }
549
550    /// Seek to the first entry with key >= `target`.
551    pub fn seek(&mut self, target: &[u8]) {
552        // Binary search to find starting block
553        let start_block = self.table.find_block_for_key(target);
554        self.load_block(start_block);
555
556        // Scan forward until key >= target
557        while self.valid {
558            if self.entries[self.entry_idx].0.as_slice() >= target {
559                return;
560            }
561            self.next();
562        }
563    }
564
565    /// Seek to the very first entry in the SSTable.
566    pub fn seek_to_first(&mut self) {
567        self.load_block(0);
568    }
569}
570
571/// Range iterator over entries in [start, end) of an SSTable.
572///
573/// Uses `SSTableIterator` internally, adding upper-bound checking.
574pub struct RangeIterator<'a> {
575    inner: SSTableIterator<'a>,
576    end: Option<Vec<u8>>,
577    exhausted: bool,
578}
579
580impl<'a> RangeIterator<'a> {
581    fn new(table: &'a SSTable, start: Option<&[u8]>, end: Option<&[u8]>) -> Self {
582        let mut inner = SSTableIterator::new(table);
583
584        // Seek to start if provided
585        if let Some(start_key) = start {
586            inner.seek(start_key);
587        }
588
589        let mut ri = Self {
590            inner,
591            end: end.map(|e| e.to_vec()),
592            exhausted: false,
593        };
594
595        // Check if first entry is already beyond end
596        ri.check_end();
597        ri
598    }
599
600    /// Check if current key >= end bound; if so, mark exhausted.
601    fn check_end(&mut self) {
602        if self.exhausted {
603            return;
604        }
605        if !self.inner.valid() {
606            self.exhausted = true;
607            return;
608        }
609        if let Some(ref end_key) = self.end {
610            if let Some(key) = self.inner.key() {
611                if key >= end_key.as_slice() {
612                    self.exhausted = true;
613                }
614            }
615        }
616    }
617
618    /// Check if range is exhausted
619    pub fn exhausted(&self) -> bool {
620        self.exhausted
621    }
622
623    /// Check if iterator is valid (positioned on an entry within bounds)
624    pub fn valid(&self) -> bool {
625        !self.exhausted && self.inner.valid()
626    }
627
628    /// Get current key
629    pub fn key(&self) -> Option<&[u8]> {
630        if self.exhausted {
631            None
632        } else {
633            self.inner.key()
634        }
635    }
636
637    /// Get current value
638    pub fn value(&self) -> Option<&[u8]> {
639        if self.exhausted {
640            None
641        } else {
642            self.inner.value()
643        }
644    }
645
646    /// Advance to next entry within the range
647    pub fn next(&mut self) {
648        if self.exhausted {
649            return;
650        }
651        self.inner.next();
652        self.check_end();
653    }
654}
655
656// =============================================================================
657// Tests
658// =============================================================================
659
660#[cfg(test)]
661mod tests {
662    use super::*;
663    use crate::sstable::builder::{SSTableBuilder, SSTableBuilderOptions};
664    use tempfile::tempdir;
665
666    #[test]
667    fn test_roundtrip() {
668        let dir = tempdir().unwrap();
669        let path = dir.path().join("test.sst");
670
671        // Build SSTable
672        let options = SSTableBuilderOptions {
673            block_size: 256,
674            filter_policy: None,
675            ..Default::default()
676        };
677
678        let mut builder = SSTableBuilder::new(&path, options).unwrap();
679
680        for i in 0..100 {
681            let key = format!("key{:05}", i);
682            let value = format!("value{:05}", i);
683            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
684        }
685
686        builder.finish().unwrap();
687
688        // Read SSTable
689        let table = SSTable::open(&path).unwrap();
690
691        assert_eq!(table.num_blocks(), table.metadata.num_data_blocks);
692    }
693
694    #[test]
695    fn test_get() {
696        let dir = tempdir().unwrap();
697        let path = dir.path().join("test_get.sst");
698
699        let options = SSTableBuilderOptions {
700            block_size: 256,
701            filter_policy: None,
702            ..Default::default()
703        };
704
705        let mut builder = SSTableBuilder::new(&path, options).unwrap();
706
707        for i in 0..100 {
708            let key = format!("key{:05}", i);
709            let value = format!("value{:05}", i);
710            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
711        }
712
713        builder.finish().unwrap();
714
715        let table = SSTable::open(&path).unwrap();
716        let read_opts = ReadOptions::default();
717
718        // Test existing key
719        let result = table.get(b"key00050", &read_opts).unwrap();
720        assert!(result.is_some());
721        assert_eq!(result.unwrap(), b"value00050");
722
723        // Test non-existing key
724        let result = table.get(b"nonexistent", &read_opts).unwrap();
725        assert!(result.is_none());
726    }
727
728    #[test]
729    fn test_block_cache() {
730        let cache = BlockCache::new(100);
731
732        let block = CachedBlock {
733            data: vec![1, 2, 3],
734            block_type: BlockType::Uncompressed,
735            decompressed: vec![1, 2, 3],
736        };
737
738        cache.insert(1, 0, block);
739
740        let cached = cache.get(1, 0);
741        assert!(cached.is_some());
742        assert_eq!(cached.unwrap().data, vec![1, 2, 3]);
743
744        let missing = cache.get(1, 100);
745        assert!(missing.is_none());
746    }
747
748    #[test]
749    fn test_sstable_iterator_full_scan() {
750        let dir = tempdir().unwrap();
751        let path = dir.path().join("test_iter.sst");
752
753        // Use small blocks to force multiple blocks
754        let options = SSTableBuilderOptions {
755            block_size: 64,
756            filter_policy: None,
757            ..Default::default()
758        };
759
760        let mut builder = SSTableBuilder::new(&path, options).unwrap();
761
762        let n = 200;
763        for i in 0..n {
764            let key = format!("key{:05}", i);
765            let value = format!("value{:05}", i);
766            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
767        }
768
769        builder.finish().unwrap();
770
771        let table = SSTable::open(&path).unwrap();
772        eprintln!("num_blocks = {}", table.num_blocks());
773        assert!(table.num_blocks() > 1, "Need multiple blocks for this test");
774
775        // Full iteration should return all entries in order
776        let mut iter = table.iter();
777        let mut count = 0;
778        let mut prev_key: Option<Vec<u8>> = None;
779
780        while iter.valid() {
781            let key = iter.key().unwrap().to_vec();
782            let value = iter.value().unwrap().to_vec();
783
784            let expected_key = format!("key{:05}", count);
785            let expected_val = format!("value{:05}", count);
786            assert_eq!(
787                String::from_utf8_lossy(&key),
788                expected_key,
789                "key mismatch at entry {}",
790                count
791            );
792            assert_eq!(
793                String::from_utf8_lossy(&value),
794                expected_val,
795                "value mismatch at entry {}",
796                count
797            );
798
799            // Keys must be strictly increasing
800            if let Some(ref pk) = prev_key {
801                assert!(key > *pk, "keys not in order at {}", count);
802            }
803            prev_key = Some(key);
804
805            count += 1;
806            iter.next();
807        }
808
809        assert_eq!(count, n, "iterator did not return all {} entries", n);
810    }
811
812    #[test]
813    fn test_sstable_iterator_seek() {
814        let dir = tempdir().unwrap();
815        let path = dir.path().join("test_seek.sst");
816
817        let options = SSTableBuilderOptions {
818            block_size: 64,
819            filter_policy: None,
820            ..Default::default()
821        };
822
823        let mut builder = SSTableBuilder::new(&path, options).unwrap();
824
825        for i in (0..100).step_by(2) {
826            let key = format!("key{:05}", i);
827            let value = format!("value{:05}", i);
828            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
829        }
830
831        builder.finish().unwrap();
832
833        let table = SSTable::open(&path).unwrap();
834
835        // Seek to exact key
836        let mut iter = table.iter();
837        iter.seek(b"key00010");
838        assert!(iter.valid());
839        assert_eq!(iter.key().unwrap(), b"key00010");
840
841        // Seek to key between entries (should land on next key)
842        iter.seek(b"key00011");
843        assert!(iter.valid());
844        assert_eq!(iter.key().unwrap(), b"key00012");
845
846        // Seek past end
847        iter.seek(b"key99999");
848        assert!(!iter.valid());
849
850        // Seek to first
851        iter.seek_to_first();
852        assert!(iter.valid());
853        assert_eq!(iter.key().unwrap(), b"key00000");
854    }
855
856    #[test]
857    fn test_range_iterator() {
858        let dir = tempdir().unwrap();
859        let path = dir.path().join("test_range.sst");
860
861        let options = SSTableBuilderOptions {
862            block_size: 64,
863            filter_policy: None,
864            ..Default::default()
865        };
866
867        let mut builder = SSTableBuilder::new(&path, options).unwrap();
868
869        for i in 0..100 {
870            let key = format!("key{:05}", i);
871            let value = format!("value{:05}", i);
872            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
873        }
874
875        builder.finish().unwrap();
876
877        let table = SSTable::open(&path).unwrap();
878
879        // Range [key00010, key00020)
880        let mut range = table.range(Some(b"key00010"), Some(b"key00020"));
881        let mut count = 0;
882
883        while range.valid() {
884            let key = range.key().unwrap();
885            assert!(key >= b"key00010".as_slice());
886            assert!(key < b"key00020".as_slice());
887            count += 1;
888            range.next();
889        }
890
891        assert_eq!(count, 10, "expected 10 keys in range [10, 20)");
892        assert!(range.exhausted());
893
894        // Full range (no bounds)
895        let mut range = table.range(None, None);
896        let mut total = 0;
897        while range.valid() {
898            total += 1;
899            range.next();
900        }
901        assert_eq!(total, 100);
902    }
903}