Skip to main content

sochdb_storage/
optimized_scan.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Optimized Range Scan with O(log n + k log m) Asymptotics
//!
//! This module replaces the O(n) full-table scan with proper index utilization:
//!
//! ## Optimizations Applied
//!
//! 1. **Sparse Index Lookup**: O(log n) binary search in level metadata
//! 2. **Block-Level Skipping**: Only read blocks that overlap [start, end)
//! 3. **Version Filtering**: Skip blocks with no visible versions
//! 4. **Bloom Filter Pre-check**: Skip SSTables that definitely don't contain range
//! 5. **K-way Merge with Tournament Tree**: O(k log m) merging where m = levels
//!
//! ## Complexity Analysis
//!
//! - Index lookup: O(log n) per level
//! - Block reads: O(k / block_size) where k = result count
//! - Merge: O(k log m) where m = number of sources
//! - Total: O(log n + k log m)
//!
//! Compare to naive scan: O(n) where n = total entries
38
39use std::cmp::Ordering;
40use std::collections::BinaryHeap;
41use std::sync::Arc;
42
43use parking_lot::RwLock;
44
/// Timestamp type used for MVCC snapshots and entry versions.
pub type Timestamp = u64;

/// One version of a key together with the metadata used to order versions.
#[derive(Debug, Clone)]
pub struct VersionedEntry {
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// Payload bytes; `None` marks a tombstone (deletion).
    pub value: Option<Vec<u8>>,
    /// Timestamp of this version.
    pub timestamp: Timestamp,
    /// Sequence number, used to order versions that share a timestamp.
    pub sequence: u64,
}

impl VersionedEntry {
    /// Returns `true` when the entry carries no value, i.e. it is a deletion marker.
    pub fn is_tombstone(&self) -> bool {
        let Self { value, .. } = self;
        value.is_none()
    }
}
67
68/// Source of entries for merging
69pub trait EntrySource: Send + Sync {
70    /// Get current entry (None if exhausted)
71    fn current(&self) -> Option<&VersionedEntry>;
72
73    /// Advance to next entry
74    fn next(&mut self) -> bool;
75
76    /// Seek to first entry >= key
77    fn seek(&mut self, key: &[u8]);
78
79    /// Check if source is exhausted
80    fn exhausted(&self) -> bool;
81
82    /// Source priority (lower = higher priority for same key)
83    fn priority(&self) -> u32;
84}
85
86/// Tournament tree node for k-way merge
87struct TournamentNode {
88    /// Index into sources array (u32::MAX = not valid)
89    source_idx: u32,
90    /// Cached entry for comparison
91    entry: Option<VersionedEntry>,
92}
93
94impl TournamentNode {
95    fn empty() -> Self {
96        Self {
97            source_idx: u32::MAX,
98            entry: None,
99        }
100    }
101
102    fn with_entry(source_idx: u32, entry: VersionedEntry) -> Self {
103        Self {
104            source_idx,
105            entry: Some(entry),
106        }
107    }
108
109    fn is_valid(&self) -> bool {
110        self.source_idx != u32::MAX && self.entry.is_some()
111    }
112
113    /// Compare two nodes (for tournament)
114    /// Returns true if self should "win" (come first in output)
115    fn beats(&self, other: &TournamentNode, sources: &[Box<dyn EntrySource>]) -> bool {
116        match (&self.entry, &other.entry) {
117            (None, _) => false,
118            (_, None) => true,
119            (Some(a), Some(b)) => {
120                match a.key.cmp(&b.key) {
121                    Ordering::Less => true,
122                    Ordering::Greater => false,
123                    Ordering::Equal => {
124                        // Same key: higher timestamp wins (newer version)
125                        if a.timestamp != b.timestamp {
126                            a.timestamp > b.timestamp
127                        } else {
128                            // Same timestamp: higher sequence wins
129                            if a.sequence != b.sequence {
130                                a.sequence > b.sequence
131                            } else {
132                                // Same sequence: lower priority source wins
133                                let a_priority = sources.get(self.source_idx as usize)
134                                    .map(|s| s.priority())
135                                    .unwrap_or(u32::MAX);
136                                let b_priority = sources.get(other.source_idx as usize)
137                                    .map(|s| s.priority())
138                                    .unwrap_or(u32::MAX);
139                                a_priority < b_priority
140                            }
141                        }
142                    }
143                }
144            }
145        }
146    }
147}
148
149/// Tournament tree for efficient k-way merge
150///
151/// Uses a complete binary tree where each internal node contains the
152/// "winner" of comparing its two children.
153pub struct TournamentTree {
154    /// The tree nodes (size = 2 * num_sources - 1)
155    nodes: Vec<TournamentNode>,
156    /// Number of sources (leaves)
157    num_sources: usize,
158}
159
160impl TournamentTree {
161    /// Create a new tournament tree
162    pub fn new(num_sources: usize) -> Self {
163        let tree_size = if num_sources == 0 { 0 } else { 2 * num_sources - 1 };
164        Self {
165            nodes: (0..tree_size).map(|_| TournamentNode::empty()).collect(),
166            num_sources,
167        }
168    }
169
170    /// Initialize tree with current entries from sources
171    pub fn initialize(&mut self, sources: &[Box<dyn EntrySource>]) {
172        if self.num_sources == 0 {
173            return;
174        }
175
176        // Fill leaves
177        let leaf_start = self.num_sources - 1;
178        for (i, source) in sources.iter().enumerate() {
179            let leaf_idx = leaf_start + i;
180            if leaf_idx < self.nodes.len() {
181                self.nodes[leaf_idx] = match source.current() {
182                    Some(entry) => TournamentNode::with_entry(i as u32, entry.clone()),
183                    None => TournamentNode::empty(),
184                };
185            }
186        }
187
188        // Build tree bottom-up
189        if leaf_start > 0 {
190            for i in (0..leaf_start).rev() {
191                let left = 2 * i + 1;
192                let right = 2 * i + 2;
193                self.nodes[i] = self.compare_nodes(left, right, sources);
194            }
195        }
196    }
197
198    /// Compare two nodes and return the winner
199    fn compare_nodes(
200        &self,
201        left_idx: usize,
202        right_idx: usize,
203        sources: &[Box<dyn EntrySource>],
204    ) -> TournamentNode {
205        let left = self.nodes.get(left_idx);
206        let right = self.nodes.get(right_idx);
207
208        match (left, right) {
209            (None, None) => TournamentNode::empty(),
210            (Some(l), None) => l.clone(),
211            (None, Some(r)) => r.clone(),
212            (Some(l), Some(r)) => {
213                if l.beats(r, sources) {
214                    l.clone()
215                } else {
216                    r.clone()
217                }
218            }
219        }
220    }
221
222    /// Get the current winner (minimum key)
223    pub fn peek(&self) -> Option<&VersionedEntry> {
224        self.nodes.first().and_then(|n| n.entry.as_ref())
225    }
226
227    /// Get the source index of the current winner
228    pub fn winner_source(&self) -> Option<u32> {
229        self.nodes.first().and_then(|n| {
230            if n.is_valid() {
231                Some(n.source_idx)
232            } else {
233                None
234            }
235        })
236    }
237
238    /// Advance the winning source and rebuild the tree
239    pub fn pop(&mut self, sources: &mut [Box<dyn EntrySource>]) -> Option<VersionedEntry> {
240        if self.num_sources == 0 {
241            return None;
242        }
243
244        let winner_source = self.winner_source()?;
245        let result = self.nodes[0].entry.clone();
246
247        // Advance the winning source
248        if let Some(source) = sources.get_mut(winner_source as usize) {
249            source.next();
250        }
251
252        // Update the leaf for this source
253        let leaf_idx = self.num_sources - 1 + winner_source as usize;
254        if leaf_idx < self.nodes.len() {
255            self.nodes[leaf_idx] = match sources.get(winner_source as usize).and_then(|s| s.current()) {
256                Some(entry) => TournamentNode::with_entry(winner_source, entry.clone()),
257                None => TournamentNode::empty(),
258            };
259        }
260
261        // Rebuild path from leaf to root
262        self.rebuild_path(leaf_idx, sources);
263
264        result
265    }
266
267    /// Rebuild the path from a leaf to the root
268    fn rebuild_path(&mut self, leaf_idx: usize, sources: &[Box<dyn EntrySource>]) {
269        let mut idx = leaf_idx;
270        while idx > 0 {
271            let parent = (idx - 1) / 2;
272            let left = 2 * parent + 1;
273            let right = 2 * parent + 2;
274            self.nodes[parent] = self.compare_nodes(left, right, sources);
275            idx = parent;
276        }
277    }
278}
279
280impl Clone for TournamentNode {
281    fn clone(&self) -> Self {
282        Self {
283            source_idx: self.source_idx,
284            entry: self.entry.clone(),
285        }
286    }
287}
288
289/// Range scan configuration
290#[derive(Debug, Clone)]
291pub struct ScanConfig {
292    /// Start key (inclusive)
293    pub start_key: Option<Vec<u8>>,
294    /// End key (exclusive)
295    pub end_key: Option<Vec<u8>>,
296    /// Snapshot timestamp for visibility
297    pub snapshot: Timestamp,
298    /// Maximum entries to return
299    pub limit: Option<usize>,
300    /// Skip tombstones in output
301    pub skip_tombstones: bool,
302    /// Only return latest version per key
303    pub latest_only: bool,
304}
305
306impl Default for ScanConfig {
307    fn default() -> Self {
308        Self {
309            start_key: None,
310            end_key: None,
311            snapshot: u64::MAX,
312            limit: None,
313            skip_tombstones: true,
314            latest_only: true,
315        }
316    }
317}
318
319impl ScanConfig {
320    /// Create a full table scan
321    pub fn full_scan() -> Self {
322        Self::default()
323    }
324
325    /// Create a range scan
326    pub fn range(start: Vec<u8>, end: Vec<u8>) -> Self {
327        Self {
328            start_key: Some(start),
329            end_key: Some(end),
330            ..Default::default()
331        }
332    }
333
334    /// Set snapshot timestamp
335    pub fn with_snapshot(mut self, ts: Timestamp) -> Self {
336        self.snapshot = ts;
337        self
338    }
339
340    /// Set limit
341    pub fn with_limit(mut self, limit: usize) -> Self {
342        self.limit = Some(limit);
343        self
344    }
345}
346
347/// File metadata for range-based file selection
348#[derive(Debug, Clone)]
349pub struct FileRange {
350    /// File ID
351    pub id: u64,
352    /// Smallest key in file
353    pub smallest_key: Vec<u8>,
354    /// Largest key in file
355    pub largest_key: Vec<u8>,
356    /// Smallest timestamp
357    pub min_timestamp: Timestamp,
358    /// Largest timestamp
359    pub max_timestamp: Timestamp,
360    /// Number of entries
361    pub num_entries: u64,
362}
363
364impl FileRange {
365    /// Check if file might contain keys in range
366    pub fn overlaps_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> bool {
367        // Check key range overlap
368        let key_overlaps = match (start, end) {
369            (None, None) => true,
370            (Some(s), None) => self.largest_key.as_slice() >= s,
371            (None, Some(e)) => self.smallest_key.as_slice() < e,
372            (Some(s), Some(e)) => {
373                self.largest_key.as_slice() >= s && self.smallest_key.as_slice() < e
374            }
375        };
376
377        key_overlaps
378    }
379
380    /// Check if file might have visible versions
381    pub fn has_visible_versions(&self, snapshot: Timestamp) -> bool {
382        self.min_timestamp <= snapshot
383    }
384}
385
386/// Level metadata for file selection
387#[derive(Debug, Clone)]
388pub struct LevelFiles {
389    /// Level number
390    pub level: u32,
391    /// Files in this level (sorted by smallest_key for L1+)
392    pub files: Vec<FileRange>,
393}
394
395impl LevelFiles {
396    /// Find files that overlap the range
397    pub fn find_overlapping(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
398        if self.level == 0 {
399            // L0 files may overlap, check all
400            self.files
401                .iter()
402                .filter(|f| f.overlaps_range(start, end))
403                .collect()
404        } else {
405            // L1+ files are sorted and non-overlapping
406            // Use binary search to find range
407            self.binary_search_range(start, end)
408        }
409    }
410
411    /// Binary search for overlapping files in sorted levels
412    fn binary_search_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
413        if self.files.is_empty() {
414            return vec![];
415        }
416
417        // Find first file that might contain start
418        let start_idx = match start {
419            None => 0,
420            Some(s) => {
421                self.files
422                    .binary_search_by(|f| {
423                        if f.largest_key.as_slice() < s {
424                            Ordering::Less
425                        } else {
426                            Ordering::Greater
427                        }
428                    })
429                    .unwrap_or_else(|i| i)
430            }
431        };
432
433        // Find last file that might contain end
434        let end_idx = match end {
435            None => self.files.len(),
436            Some(e) => {
437                let idx = self.files
438                    .binary_search_by(|f| {
439                        if f.smallest_key.as_slice() >= e {
440                            Ordering::Greater
441                        } else {
442                            Ordering::Less
443                        }
444                    })
445                    .unwrap_or_else(|i| i);
446                idx.min(self.files.len())
447            }
448        };
449
450        self.files[start_idx..end_idx].iter().collect()
451    }
452}
453
454/// Optimized range scanner
455pub struct RangeScanner {
456    /// Scan configuration
457    config: ScanConfig,
458    /// Tournament tree for merging
459    tournament: TournamentTree,
460    /// Entry sources
461    sources: Vec<Box<dyn EntrySource>>,
462    /// Last emitted key (for deduplication)
463    last_key: Option<Vec<u8>>,
464    /// Count of entries returned
465    count: usize,
466    /// Scan statistics
467    stats: ScanStats,
468}
469
470/// Scan statistics
471#[derive(Debug, Default, Clone)]
472pub struct ScanStats {
473    /// Files considered
474    pub files_considered: usize,
475    /// Files skipped by range
476    pub files_skipped_range: usize,
477    /// Files skipped by timestamp
478    pub files_skipped_timestamp: usize,
479    /// Blocks read
480    pub blocks_read: usize,
481    /// Entries scanned
482    pub entries_scanned: usize,
483    /// Entries returned
484    pub entries_returned: usize,
485    /// Tombstones encountered
486    pub tombstones_seen: usize,
487    /// Duplicate versions skipped
488    pub duplicates_skipped: usize,
489}
490
491impl RangeScanner {
492    /// Create a new range scanner
493    pub fn new(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Self {
494        let num_sources = sources.len();
495        let mut scanner = Self {
496            config,
497            tournament: TournamentTree::new(num_sources),
498            sources,
499            last_key: None,
500            count: 0,
501            stats: ScanStats::default(),
502        };
503
504        // Seek all sources to start position
505        if let Some(ref start) = scanner.config.start_key {
506            for source in &mut scanner.sources {
507                source.seek(start);
508            }
509        }
510
511        // Initialize tournament tree
512        scanner.tournament.initialize(&scanner.sources);
513
514        scanner
515    }
516
517    /// Get next visible entry
518    pub fn next(&mut self) -> Option<VersionedEntry> {
519        loop {
520            // Check limit
521            if let Some(limit) = self.config.limit {
522                if self.count >= limit {
523                    return None;
524                }
525            }
526
527            // Get next entry from tournament
528            let entry = self.tournament.pop(&mut self.sources)?;
529            self.stats.entries_scanned += 1;
530
531            // Check end key
532            if let Some(ref end) = self.config.end_key {
533                if entry.key.as_slice() >= end.as_slice() {
534                    return None;
535                }
536            }
537
538            // Check visibility (MVCC)
539            if entry.timestamp > self.config.snapshot {
540                continue; // Version too new
541            }
542
543            // Handle tombstones
544            if entry.is_tombstone() {
545                self.stats.tombstones_seen += 1;
546                if self.config.skip_tombstones {
547                    // Mark key as seen (for dedup) but don't return
548                    if self.config.latest_only {
549                        self.last_key = Some(entry.key);
550                    }
551                    continue;
552                }
553            }
554
555            // Handle deduplication for latest_only
556            if self.config.latest_only {
557                if let Some(ref last) = self.last_key {
558                    if entry.key == *last {
559                        self.stats.duplicates_skipped += 1;
560                        continue;
561                    }
562                }
563                self.last_key = Some(entry.key.clone());
564            }
565
566            self.count += 1;
567            self.stats.entries_returned += 1;
568            return Some(entry);
569        }
570    }
571
572    /// Get scan statistics
573    pub fn stats(&self) -> &ScanStats {
574        &self.stats
575    }
576
577    /// Collect all results into a Vec
578    pub fn collect(mut self) -> Vec<VersionedEntry> {
579        let mut results = Vec::new();
580        while let Some(entry) = self.next() {
581            results.push(entry);
582        }
583        results
584    }
585}
586
587/// Helper to select files for a range scan
588pub fn select_files_for_range(
589    levels: &[LevelFiles],
590    config: &ScanConfig,
591) -> (Vec<FileRange>, ScanStats) {
592    let mut selected = Vec::new();
593    let mut stats = ScanStats::default();
594
595    let start = config.start_key.as_deref();
596    let end = config.end_key.as_deref();
597
598    for level in levels {
599        for file in level.find_overlapping(start, end) {
600            stats.files_considered += 1;
601
602            // Check timestamp visibility
603            if !file.has_visible_versions(config.snapshot) {
604                stats.files_skipped_timestamp += 1;
605                continue;
606            }
607
608            selected.push(file.clone());
609        }
610
611        // Count files that were skipped by range
612        let total_files = level.files.len();
613        let overlapping = level.find_overlapping(start, end).len();
614        stats.files_skipped_range += total_files - overlapping;
615    }
616
617    (selected, stats)
618}
619
620// =============================================================================
621// Tests
622// =============================================================================
623
#[cfg(test)]
mod tests {
    use super::*;

    /// Simple in-memory source for testing.
    ///
    /// Entries are served in the order given, so fixtures must already be in
    /// merge order: ascending key, newest version first within a key.
    struct VecSource {
        entries: Vec<VersionedEntry>,
        pos: usize,
        priority: u32,
    }

    impl VecSource {
        fn new(entries: Vec<VersionedEntry>, priority: u32) -> Self {
            Self {
                entries,
                pos: 0,
                priority,
            }
        }

        /// Convenience constructor returning the source as a trait object.
        fn boxed(entries: Vec<VersionedEntry>, priority: u32) -> Box<dyn EntrySource> {
            Box::new(Self::new(entries, priority))
        }
    }

    impl EntrySource for VecSource {
        fn current(&self) -> Option<&VersionedEntry> {
            self.entries.get(self.pos)
        }

        fn next(&mut self) -> bool {
            if self.pos < self.entries.len() {
                self.pos += 1;
            }
            self.pos < self.entries.len()
        }

        fn seek(&mut self, key: &[u8]) {
            self.pos = self
                .entries
                .iter()
                .position(|e| e.key.as_slice() >= key)
                .unwrap_or(self.entries.len());
        }

        fn exhausted(&self) -> bool {
            self.pos >= self.entries.len()
        }

        fn priority(&self) -> u32 {
            self.priority
        }
    }

    fn make_entry(key: &str, value: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: Some(value.as_bytes().to_vec()),
            timestamp: ts,
            sequence: 0,
        }
    }

    fn make_tombstone(key: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: None,
            timestamp: ts,
            sequence: 0,
        }
    }

    /// Builds a file descriptor with the given key span and oldest timestamp.
    fn file_range(id: u64, smallest: &[u8], largest: &[u8], min_timestamp: Timestamp) -> FileRange {
        FileRange {
            id,
            smallest_key: smallest.to_vec(),
            largest_key: largest.to_vec(),
            min_timestamp,
            max_timestamp: min_timestamp + 100,
            num_entries: 100,
        }
    }

    /// Runs a scan to completion and returns everything it produced.
    fn scan(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Vec<VersionedEntry> {
        RangeScanner::new(config, sources).collect()
    }

    #[test]
    fn test_tournament_tree_single_source() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
        ];

        let results = scan(ScanConfig::default(), vec![VecSource::boxed(entries, 0)]);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[2].key, b"c");
    }

    #[test]
    fn test_tournament_tree_merge() {
        // Simulate L0 and L1 with overlapping keys
        let l0_entries = vec![make_entry("a", "new", 200), make_entry("c", "new", 200)];
        let l1_entries = vec![
            make_entry("a", "old", 100),
            make_entry("b", "old", 100),
            make_entry("c", "old", 100),
        ];

        let sources = vec![
            VecSource::boxed(l0_entries, 0), // L0 higher priority
            VecSource::boxed(l1_entries, 1), // L1 lower priority
        ];

        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };

        let results = scan(config, sources);

        assert_eq!(results.len(), 3);
        // Should have newer versions for a and c
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[0].timestamp, 200);
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[1].timestamp, 100);
        assert_eq!(results[2].key, b"c");
        assert_eq!(results[2].timestamp, 200);
    }

    #[test]
    fn test_tombstone_handling() {
        let entries = vec![
            make_entry("a", "value", 100),
            make_tombstone("b", 200),
            make_entry("c", "value", 100),
        ];

        let config = ScanConfig {
            skip_tombstones: true,
            ..Default::default()
        };

        let results = scan(config, vec![VecSource::boxed(entries, 0)]);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"c");
    }

    #[test]
    fn test_tombstone_shadows_older_value_in_lower_level() {
        // The newer tombstone in L0 must hide the older value of "a" in L1.
        let l0_entries = vec![make_tombstone("a", 200)];
        let l1_entries = vec![make_entry("a", "old", 100), make_entry("b", "old", 100)];

        let sources = vec![
            VecSource::boxed(l0_entries, 0),
            VecSource::boxed(l1_entries, 1),
        ];

        let results = scan(ScanConfig::default(), sources);

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"b");
    }

    #[test]
    fn test_range_bounds() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
            make_entry("d", "4", 100),
        ];

        let config = ScanConfig {
            start_key: Some(b"b".to_vec()),
            end_key: Some(b"d".to_vec()),
            ..Default::default()
        };

        let results = scan(config, vec![VecSource::boxed(entries, 0)]);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"b");
        assert_eq!(results[1].key, b"c");
    }

    #[test]
    fn test_limit() {
        let entries: Vec<_> = (0..100)
            .map(|i| make_entry(&format!("key{:03}", i), &format!("val{}", i), 100))
            .collect();

        let config = ScanConfig {
            limit: Some(10),
            ..Default::default()
        };

        let results = scan(config, vec![VecSource::boxed(entries, 0)]);

        assert_eq!(results.len(), 10);
    }

    #[test]
    fn test_mvcc_visibility() {
        // Versions are listed newest-first, matching merge order, so the
        // scanner has to skip the invisible versions before it finds a match.
        let versions = || {
            vec![
                make_entry("a", "v3", 300),
                make_entry("a", "v2", 200),
                make_entry("a", "v1", 100),
            ]
        };

        // Snapshot at 150 should only see v1
        let config = ScanConfig {
            snapshot: 150,
            latest_only: true,
            ..Default::default()
        };
        let results = scan(config, vec![VecSource::boxed(versions(), 0)]);

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 100);

        // Snapshot at 250 should see v2 and hide the older v1
        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };
        let results = scan(config, vec![VecSource::boxed(versions(), 0)]);

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 200);
    }

    #[test]
    fn test_file_range_overlap() {
        let file = file_range(1, b"c", b"h", 100);

        // Should overlap
        assert!(file.overlaps_range(Some(b"a"), Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), Some(b"z")));
        assert!(file.overlaps_range(Some(b"d"), Some(b"g")));
        assert!(file.overlaps_range(None, Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), None));
        assert!(file.overlaps_range(None, None));

        // Should not overlap
        assert!(!file.overlaps_range(Some(b"a"), Some(b"c")));
        assert!(!file.overlaps_range(Some(b"i"), Some(b"z")));
    }

    #[test]
    fn test_level_binary_search() {
        let level = LevelFiles {
            level: 1,
            files: vec![
                file_range(1, b"a", b"d", 100),
                file_range(2, b"e", b"h", 100),
                file_range(3, b"i", b"l", 100),
            ],
        };

        // Query c-g should return files 1 and 2
        let result = level.find_overlapping(Some(b"c"), Some(b"g"));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].id, 1);
        assert_eq!(result[1].id, 2);
    }

    #[test]
    fn test_select_files_for_range_stats() {
        let levels = vec![LevelFiles {
            level: 1,
            files: vec![
                file_range(1, b"a", b"d", 100),
                file_range(2, b"e", b"h", 500), // too new for the snapshot
                file_range(3, b"i", b"l", 100), // outside the key range
            ],
        }];

        let config = ScanConfig::range(b"c".to_vec(), b"g".to_vec()).with_snapshot(150);
        let (selected, stats) = select_files_for_range(&levels, &config);

        assert_eq!(selected.len(), 1);
        assert_eq!(selected[0].id, 1);
        assert_eq!(stats.files_considered, 2);
        assert_eq!(stats.files_skipped_timestamp, 1);
        assert_eq!(stats.files_skipped_range, 1);
    }
}