Skip to main content

sochdb_storage/
optimized_scan.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Optimized Range Scan with O(log n + k log m) Asymptotics
//!
//! This module replaces the O(n) full-table scan with proper index utilization:
//!
//! ## Optimizations Applied
//!
//! 1. **Sparse Index Lookup**: O(log n) binary search in level metadata
//! 2. **Block-Level Skipping**: Only read blocks that overlap [start, end)
//! 3. **Version Filtering**: Skip blocks with no visible versions
//! 4. **Bloom Filter Pre-check**: Skip SSTables that definitely don't contain range
//! 5. **K-way Merge with Tournament Tree**: O(k log m) merging where m = number of sources
//!
//! ## Complexity Analysis
//!
//! - Index lookup: O(log n) per level
//! - Block reads: O(k / block_size) where k = result count
//! - Merge: O(k log m) where m = number of sources
//! - Total: O(log n + k log m)
//!
//! Compare to naive scan: O(n) where n = total entries
38
39use std::cmp::Ordering;
40
/// Timestamp type used for snapshots and for MVCC visibility checks.
pub type Timestamp = u64;

/// One version of a key together with the metadata that orders it.
#[derive(Debug, Clone)]
pub struct VersionedEntry {
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// Payload bytes; `None` marks a deletion (tombstone).
    pub value: Option<Vec<u8>>,
    /// Timestamp of this version.
    pub timestamp: Timestamp,
    /// Sequence number that orders versions sharing the same timestamp.
    pub sequence: u64,
}

impl VersionedEntry {
    /// Returns `true` when this version is a deletion marker, i.e. it has no value.
    ///
    /// An entry holding an empty value (`Some(vec![])`) is *not* a tombstone.
    pub fn is_tombstone(&self) -> bool {
        matches!(self.value, None)
    }
}
63
64/// Source of entries for merging
65pub trait EntrySource: Send + Sync {
66    /// Get current entry (None if exhausted)
67    fn current(&self) -> Option<&VersionedEntry>;
68
69    /// Advance to next entry
70    fn next(&mut self) -> bool;
71
72    /// Seek to first entry >= key
73    fn seek(&mut self, key: &[u8]);
74
75    /// Check if source is exhausted
76    fn exhausted(&self) -> bool;
77
78    /// Source priority (lower = higher priority for same key)
79    fn priority(&self) -> u32;
80}
81
82/// Tournament tree node for k-way merge
83struct TournamentNode {
84    /// Index into sources array (u32::MAX = not valid)
85    source_idx: u32,
86    /// Cached entry for comparison
87    entry: Option<VersionedEntry>,
88}
89
90impl TournamentNode {
91    fn empty() -> Self {
92        Self {
93            source_idx: u32::MAX,
94            entry: None,
95        }
96    }
97
98    fn with_entry(source_idx: u32, entry: VersionedEntry) -> Self {
99        Self {
100            source_idx,
101            entry: Some(entry),
102        }
103    }
104
105    fn is_valid(&self) -> bool {
106        self.source_idx != u32::MAX && self.entry.is_some()
107    }
108
109    /// Compare two nodes (for tournament)
110    /// Returns true if self should "win" (come first in output)
111    fn beats(&self, other: &TournamentNode, sources: &[Box<dyn EntrySource>]) -> bool {
112        match (&self.entry, &other.entry) {
113            (None, _) => false,
114            (_, None) => true,
115            (Some(a), Some(b)) => {
116                match a.key.cmp(&b.key) {
117                    Ordering::Less => true,
118                    Ordering::Greater => false,
119                    Ordering::Equal => {
120                        // Same key: higher timestamp wins (newer version)
121                        if a.timestamp != b.timestamp {
122                            a.timestamp > b.timestamp
123                        } else {
124                            // Same timestamp: higher sequence wins
125                            if a.sequence != b.sequence {
126                                a.sequence > b.sequence
127                            } else {
128                                // Same sequence: lower priority source wins
129                                let a_priority = sources
130                                    .get(self.source_idx as usize)
131                                    .map(|s| s.priority())
132                                    .unwrap_or(u32::MAX);
133                                let b_priority = sources
134                                    .get(other.source_idx as usize)
135                                    .map(|s| s.priority())
136                                    .unwrap_or(u32::MAX);
137                                a_priority < b_priority
138                            }
139                        }
140                    }
141                }
142            }
143        }
144    }
145}
146
147/// Tournament tree for efficient k-way merge
148///
149/// Uses a complete binary tree where each internal node contains the
150/// "winner" of comparing its two children.
151pub struct TournamentTree {
152    /// The tree nodes (size = 2 * num_sources - 1)
153    nodes: Vec<TournamentNode>,
154    /// Number of sources (leaves)
155    num_sources: usize,
156}
157
158impl TournamentTree {
159    /// Create a new tournament tree
160    pub fn new(num_sources: usize) -> Self {
161        let tree_size = if num_sources == 0 {
162            0
163        } else {
164            2 * num_sources - 1
165        };
166        Self {
167            nodes: (0..tree_size).map(|_| TournamentNode::empty()).collect(),
168            num_sources,
169        }
170    }
171
172    /// Initialize tree with current entries from sources
173    pub fn initialize(&mut self, sources: &[Box<dyn EntrySource>]) {
174        if self.num_sources == 0 {
175            return;
176        }
177
178        // Fill leaves
179        let leaf_start = self.num_sources - 1;
180        for (i, source) in sources.iter().enumerate() {
181            let leaf_idx = leaf_start + i;
182            if leaf_idx < self.nodes.len() {
183                self.nodes[leaf_idx] = match source.current() {
184                    Some(entry) => TournamentNode::with_entry(i as u32, entry.clone()),
185                    None => TournamentNode::empty(),
186                };
187            }
188        }
189
190        // Build tree bottom-up
191        if leaf_start > 0 {
192            for i in (0..leaf_start).rev() {
193                let left = 2 * i + 1;
194                let right = 2 * i + 2;
195                self.nodes[i] = self.compare_nodes(left, right, sources);
196            }
197        }
198    }
199
200    /// Compare two nodes and return the winner
201    fn compare_nodes(
202        &self,
203        left_idx: usize,
204        right_idx: usize,
205        sources: &[Box<dyn EntrySource>],
206    ) -> TournamentNode {
207        let left = self.nodes.get(left_idx);
208        let right = self.nodes.get(right_idx);
209
210        match (left, right) {
211            (None, None) => TournamentNode::empty(),
212            (Some(l), None) => l.clone(),
213            (None, Some(r)) => r.clone(),
214            (Some(l), Some(r)) => {
215                if l.beats(r, sources) {
216                    l.clone()
217                } else {
218                    r.clone()
219                }
220            }
221        }
222    }
223
224    /// Get the current winner (minimum key)
225    pub fn peek(&self) -> Option<&VersionedEntry> {
226        self.nodes.first().and_then(|n| n.entry.as_ref())
227    }
228
229    /// Get the source index of the current winner
230    pub fn winner_source(&self) -> Option<u32> {
231        self.nodes.first().and_then(|n| {
232            if n.is_valid() {
233                Some(n.source_idx)
234            } else {
235                None
236            }
237        })
238    }
239
240    /// Advance the winning source and rebuild the tree
241    pub fn pop(&mut self, sources: &mut [Box<dyn EntrySource>]) -> Option<VersionedEntry> {
242        if self.num_sources == 0 {
243            return None;
244        }
245
246        let winner_source = self.winner_source()?;
247        let result = self.nodes[0].entry.clone();
248
249        // Advance the winning source
250        if let Some(source) = sources.get_mut(winner_source as usize) {
251            source.next();
252        }
253
254        // Update the leaf for this source
255        let leaf_idx = self.num_sources - 1 + winner_source as usize;
256        if leaf_idx < self.nodes.len() {
257            self.nodes[leaf_idx] = match sources
258                .get(winner_source as usize)
259                .and_then(|s| s.current())
260            {
261                Some(entry) => TournamentNode::with_entry(winner_source, entry.clone()),
262                None => TournamentNode::empty(),
263            };
264        }
265
266        // Rebuild path from leaf to root
267        self.rebuild_path(leaf_idx, sources);
268
269        result
270    }
271
272    /// Rebuild the path from a leaf to the root
273    fn rebuild_path(&mut self, leaf_idx: usize, sources: &[Box<dyn EntrySource>]) {
274        let mut idx = leaf_idx;
275        while idx > 0 {
276            let parent = (idx - 1) / 2;
277            let left = 2 * parent + 1;
278            let right = 2 * parent + 2;
279            self.nodes[parent] = self.compare_nodes(left, right, sources);
280            idx = parent;
281        }
282    }
283}
284
285impl Clone for TournamentNode {
286    fn clone(&self) -> Self {
287        Self {
288            source_idx: self.source_idx,
289            entry: self.entry.clone(),
290        }
291    }
292}
293
294/// Range scan configuration
295#[derive(Debug, Clone)]
296pub struct ScanConfig {
297    /// Start key (inclusive)
298    pub start_key: Option<Vec<u8>>,
299    /// End key (exclusive)
300    pub end_key: Option<Vec<u8>>,
301    /// Snapshot timestamp for visibility
302    pub snapshot: Timestamp,
303    /// Maximum entries to return
304    pub limit: Option<usize>,
305    /// Skip tombstones in output
306    pub skip_tombstones: bool,
307    /// Only return latest version per key
308    pub latest_only: bool,
309}
310
311impl Default for ScanConfig {
312    fn default() -> Self {
313        Self {
314            start_key: None,
315            end_key: None,
316            snapshot: u64::MAX,
317            limit: None,
318            skip_tombstones: true,
319            latest_only: true,
320        }
321    }
322}
323
324impl ScanConfig {
325    /// Create a full table scan
326    pub fn full_scan() -> Self {
327        Self::default()
328    }
329
330    /// Create a range scan
331    pub fn range(start: Vec<u8>, end: Vec<u8>) -> Self {
332        Self {
333            start_key: Some(start),
334            end_key: Some(end),
335            ..Default::default()
336        }
337    }
338
339    /// Set snapshot timestamp
340    pub fn with_snapshot(mut self, ts: Timestamp) -> Self {
341        self.snapshot = ts;
342        self
343    }
344
345    /// Set limit
346    pub fn with_limit(mut self, limit: usize) -> Self {
347        self.limit = Some(limit);
348        self
349    }
350}
351
352/// File metadata for range-based file selection
353#[derive(Debug, Clone)]
354pub struct FileRange {
355    /// File ID
356    pub id: u64,
357    /// Smallest key in file
358    pub smallest_key: Vec<u8>,
359    /// Largest key in file
360    pub largest_key: Vec<u8>,
361    /// Smallest timestamp
362    pub min_timestamp: Timestamp,
363    /// Largest timestamp
364    pub max_timestamp: Timestamp,
365    /// Number of entries
366    pub num_entries: u64,
367}
368
369impl FileRange {
370    /// Check if file might contain keys in range
371    pub fn overlaps_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> bool {
372        // Check key range overlap
373        let key_overlaps = match (start, end) {
374            (None, None) => true,
375            (Some(s), None) => self.largest_key.as_slice() >= s,
376            (None, Some(e)) => self.smallest_key.as_slice() < e,
377            (Some(s), Some(e)) => {
378                self.largest_key.as_slice() >= s && self.smallest_key.as_slice() < e
379            }
380        };
381
382        key_overlaps
383    }
384
385    /// Check if file might have visible versions
386    pub fn has_visible_versions(&self, snapshot: Timestamp) -> bool {
387        self.min_timestamp <= snapshot
388    }
389}
390
391/// Level metadata for file selection
392#[derive(Debug, Clone)]
393pub struct LevelFiles {
394    /// Level number
395    pub level: u32,
396    /// Files in this level (sorted by smallest_key for L1+)
397    pub files: Vec<FileRange>,
398}
399
400impl LevelFiles {
401    /// Find files that overlap the range
402    pub fn find_overlapping(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
403        if self.level == 0 {
404            // L0 files may overlap, check all
405            self.files
406                .iter()
407                .filter(|f| f.overlaps_range(start, end))
408                .collect()
409        } else {
410            // L1+ files are sorted and non-overlapping
411            // Use binary search to find range
412            self.binary_search_range(start, end)
413        }
414    }
415
416    /// Binary search for overlapping files in sorted levels
417    fn binary_search_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
418        if self.files.is_empty() {
419            return vec![];
420        }
421
422        // Find first file that might contain start
423        let start_idx = match start {
424            None => 0,
425            Some(s) => self
426                .files
427                .binary_search_by(|f| {
428                    if f.largest_key.as_slice() < s {
429                        Ordering::Less
430                    } else {
431                        Ordering::Greater
432                    }
433                })
434                .unwrap_or_else(|i| i),
435        };
436
437        // Find last file that might contain end
438        let end_idx = match end {
439            None => self.files.len(),
440            Some(e) => {
441                let idx = self
442                    .files
443                    .binary_search_by(|f| {
444                        if f.smallest_key.as_slice() >= e {
445                            Ordering::Greater
446                        } else {
447                            Ordering::Less
448                        }
449                    })
450                    .unwrap_or_else(|i| i);
451                idx.min(self.files.len())
452            }
453        };
454
455        self.files[start_idx..end_idx].iter().collect()
456    }
457}
458
459/// Optimized range scanner
460pub struct RangeScanner {
461    /// Scan configuration
462    config: ScanConfig,
463    /// Tournament tree for merging
464    tournament: TournamentTree,
465    /// Entry sources
466    sources: Vec<Box<dyn EntrySource>>,
467    /// Last emitted key (for deduplication)
468    last_key: Option<Vec<u8>>,
469    /// Count of entries returned
470    count: usize,
471    /// Scan statistics
472    stats: ScanStats,
473}
474
/// Counters describing the work done by file selection and scanning.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Default, Clone)]
pub struct ScanStats {
    /// Number of files examined as candidates.
    pub files_considered: usize,
    /// Number of files ruled out because their key span misses the range.
    pub files_skipped_range: usize,
    /// Number of files ruled out because nothing in them is visible at the snapshot.
    pub files_skipped_timestamp: usize,
    /// Number of blocks read.
    pub blocks_read: usize,
    /// Number of entries pulled from the merge.
    pub entries_scanned: usize,
    /// Number of entries handed back to the caller.
    pub entries_returned: usize,
    /// Number of tombstones encountered.
    pub tombstones_seen: usize,
    /// Number of versions dropped as duplicates of an already-handled key.
    pub duplicates_skipped: usize,
}
495
496impl RangeScanner {
497    /// Create a new range scanner
498    pub fn new(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Self {
499        let num_sources = sources.len();
500        let mut scanner = Self {
501            config,
502            tournament: TournamentTree::new(num_sources),
503            sources,
504            last_key: None,
505            count: 0,
506            stats: ScanStats::default(),
507        };
508
509        // Seek all sources to start position
510        if let Some(ref start) = scanner.config.start_key {
511            for source in &mut scanner.sources {
512                source.seek(start);
513            }
514        }
515
516        // Initialize tournament tree
517        scanner.tournament.initialize(&scanner.sources);
518
519        scanner
520    }
521
522    /// Get next visible entry
523    pub fn next(&mut self) -> Option<VersionedEntry> {
524        loop {
525            // Check limit
526            if let Some(limit) = self.config.limit {
527                if self.count >= limit {
528                    return None;
529                }
530            }
531
532            // Get next entry from tournament
533            let entry = self.tournament.pop(&mut self.sources)?;
534            self.stats.entries_scanned += 1;
535
536            // Check end key
537            if let Some(ref end) = self.config.end_key {
538                if entry.key.as_slice() >= end.as_slice() {
539                    return None;
540                }
541            }
542
543            // Check visibility (MVCC)
544            if entry.timestamp > self.config.snapshot {
545                continue; // Version too new
546            }
547
548            // Handle tombstones
549            if entry.is_tombstone() {
550                self.stats.tombstones_seen += 1;
551                if self.config.skip_tombstones {
552                    // Mark key as seen (for dedup) but don't return
553                    if self.config.latest_only {
554                        self.last_key = Some(entry.key);
555                    }
556                    continue;
557                }
558            }
559
560            // Handle deduplication for latest_only
561            if self.config.latest_only {
562                if let Some(ref last) = self.last_key {
563                    if entry.key == *last {
564                        self.stats.duplicates_skipped += 1;
565                        continue;
566                    }
567                }
568                self.last_key = Some(entry.key.clone());
569            }
570
571            self.count += 1;
572            self.stats.entries_returned += 1;
573            return Some(entry);
574        }
575    }
576
577    /// Get scan statistics
578    pub fn stats(&self) -> &ScanStats {
579        &self.stats
580    }
581
582    /// Collect all results into a Vec
583    pub fn collect(mut self) -> Vec<VersionedEntry> {
584        let mut results = Vec::new();
585        while let Some(entry) = self.next() {
586            results.push(entry);
587        }
588        results
589    }
590}
591
592/// Helper to select files for a range scan
593pub fn select_files_for_range(
594    levels: &[LevelFiles],
595    config: &ScanConfig,
596) -> (Vec<FileRange>, ScanStats) {
597    let mut selected = Vec::new();
598    let mut stats = ScanStats::default();
599
600    let start = config.start_key.as_deref();
601    let end = config.end_key.as_deref();
602
603    for level in levels {
604        for file in level.find_overlapping(start, end) {
605            stats.files_considered += 1;
606
607            // Check timestamp visibility
608            if !file.has_visible_versions(config.snapshot) {
609                stats.files_skipped_timestamp += 1;
610                continue;
611            }
612
613            selected.push(file.clone());
614        }
615
616        // Count files that were skipped by range
617        let total_files = level.files.len();
618        let overlapping = level.find_overlapping(start, end).len();
619        stats.files_skipped_range += total_files - overlapping;
620    }
621
622    (selected, stats)
623}
624
625// =============================================================================
626// Tests
627// =============================================================================
628
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory [`EntrySource`] that walks a `Vec` from front to back.
    struct VecSource {
        entries: Vec<VersionedEntry>,
        pos: usize,
        priority: u32,
    }

    impl VecSource {
        fn new(entries: Vec<VersionedEntry>, priority: u32) -> Self {
            VecSource {
                entries,
                pos: 0,
                priority,
            }
        }
    }

    impl EntrySource for VecSource {
        fn current(&self) -> Option<&VersionedEntry> {
            self.entries.get(self.pos)
        }

        fn next(&mut self) -> bool {
            // Never step past the one-past-the-end position.
            self.pos = (self.pos + 1).min(self.entries.len());
            !self.exhausted()
        }

        fn seek(&mut self, key: &[u8]) {
            let len = self.entries.len();
            self.pos = self
                .entries
                .iter()
                .position(|e| e.key.as_slice() >= key)
                .unwrap_or(len);
        }

        fn exhausted(&self) -> bool {
            self.pos >= self.entries.len()
        }

        fn priority(&self) -> u32 {
            self.priority
        }
    }

    /// Live entry with sequence 0.
    fn make_entry(key: &str, value: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: Some(value.as_bytes().to_vec()),
            timestamp: ts,
            sequence: 0,
        }
    }

    /// Tombstone with sequence 0.
    fn make_tombstone(key: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: None,
            timestamp: ts,
            sequence: 0,
        }
    }

    /// Wraps `entries` in a boxed `VecSource` with the given priority.
    fn boxed(entries: Vec<VersionedEntry>, priority: u32) -> Box<dyn EntrySource> {
        Box::new(VecSource::new(entries, priority))
    }

    /// Runs a scan to completion.
    fn scan(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Vec<VersionedEntry> {
        RangeScanner::new(config, sources).collect()
    }

    /// Asserts that `results` holds exactly the keys in `expected`, in order.
    fn assert_keys(results: &[VersionedEntry], expected: &[&str]) {
        let actual: Vec<&[u8]> = results.iter().map(|e| e.key.as_slice()).collect();
        let wanted: Vec<&[u8]> = expected.iter().map(|k| k.as_bytes()).collect();
        assert_eq!(actual, wanted);
    }

    /// File metadata with the timestamps and entry count used throughout these tests.
    fn file(id: u64, smallest: &str, largest: &str) -> FileRange {
        FileRange {
            id,
            smallest_key: smallest.as_bytes().to_vec(),
            largest_key: largest.as_bytes().to_vec(),
            min_timestamp: 100,
            max_timestamp: 200,
            num_entries: 100,
        }
    }

    #[test]
    fn test_tournament_tree_single_source() {
        let source = boxed(
            vec![
                make_entry("a", "1", 100),
                make_entry("b", "2", 100),
                make_entry("c", "3", 100),
            ],
            0,
        );

        let results = scan(ScanConfig::default(), vec![source]);

        assert_keys(&results, &["a", "b", "c"]);
    }

    #[test]
    fn test_tournament_tree_merge() {
        // Two sources with overlapping keys; the first holds the newer versions.
        let newer = vec![make_entry("a", "new", 200), make_entry("c", "new", 200)];
        let older = vec![
            make_entry("a", "old", 100),
            make_entry("b", "old", 100),
            make_entry("c", "old", 100),
        ];

        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };
        let results = scan(config, vec![boxed(newer, 0), boxed(older, 1)]);

        // The newer versions must win for "a" and "c".
        assert_keys(&results, &["a", "b", "c"]);
        let timestamps: Vec<u64> = results.iter().map(|e| e.timestamp).collect();
        assert_eq!(timestamps, vec![200, 100, 200]);
    }

    #[test]
    fn test_tombstone_handling() {
        let source = boxed(
            vec![
                make_entry("a", "value", 100),
                make_tombstone("b", 200),
                make_entry("c", "value", 100),
            ],
            0,
        );

        let config = ScanConfig {
            skip_tombstones: true,
            ..Default::default()
        };
        let results = scan(config, vec![source]);

        assert_keys(&results, &["a", "c"]);
    }

    #[test]
    fn test_range_bounds() {
        let source = boxed(
            vec![
                make_entry("a", "1", 100),
                make_entry("b", "2", 100),
                make_entry("c", "3", 100),
                make_entry("d", "4", 100),
            ],
            0,
        );

        let config = ScanConfig {
            start_key: Some(b"b".to_vec()),
            end_key: Some(b"d".to_vec()),
            ..Default::default()
        };
        let results = scan(config, vec![source]);

        assert_keys(&results, &["b", "c"]);
    }

    #[test]
    fn test_limit() {
        let entries: Vec<_> = (0..100)
            .map(|i| make_entry(&format!("key{:03}", i), &format!("val{}", i), 100))
            .collect();

        let config = ScanConfig {
            limit: Some(10),
            ..Default::default()
        };
        let results = scan(config, vec![boxed(entries, 0)]);

        assert_eq!(results.len(), 10);
    }

    #[test]
    fn test_mvcc_visibility() {
        let source = boxed(
            vec![
                make_entry("a", "v1", 100),
                make_entry("a", "v2", 200),
                make_entry("a", "v3", 300),
            ],
            0,
        );

        // A snapshot at 150 can only see v1.
        let config = ScanConfig {
            snapshot: 150,
            latest_only: true,
            ..Default::default()
        };
        let results = scan(config, vec![source]);

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 100);
    }

    #[test]
    fn test_file_range_overlap() {
        let file = file(1, "c", "h");

        // Should overlap
        assert!(file.overlaps_range(Some(b"a"), Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), Some(b"z")));
        assert!(file.overlaps_range(Some(b"d"), Some(b"g")));
        assert!(file.overlaps_range(None, Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), None));
        assert!(file.overlaps_range(None, None));

        // Should not overlap
        assert!(!file.overlaps_range(Some(b"a"), Some(b"c")));
        assert!(!file.overlaps_range(Some(b"i"), Some(b"z")));
    }

    #[test]
    fn test_level_binary_search() {
        let level = LevelFiles {
            level: 1,
            files: vec![file(1, "a", "d"), file(2, "e", "h"), file(3, "i", "l")],
        };

        // Query c-g should return files 1 and 2
        let result = level.find_overlapping(Some(b"c"), Some(b"g"));
        let ids: Vec<u64> = result.iter().map(|f| f.id).collect();
        assert_eq!(ids, vec![1, 2]);
    }
}