1use std::cmp::Ordering;
40
/// Version timestamp attached to every entry.
///
/// Scans compare it against [`ScanConfig::snapshot`] to decide visibility.
pub type Timestamp = u64;

/// A single version of a key, as handed out by an [`EntrySource`].
#[derive(Debug, Clone)]
pub struct VersionedEntry {
    /// Raw key bytes; merging orders entries by byte-wise comparison of this field.
    pub key: Vec<u8>,
    /// Payload of this version, or `None` when the version is a tombstone.
    pub value: Option<Vec<u8>>,
    /// Timestamp of this version; for equal keys the larger timestamp is merged first.
    pub timestamp: Timestamp,
    /// Secondary ordering used when key and timestamp are equal (larger merges first).
    pub sequence: u64,
}

impl VersionedEntry {
    /// Returns `true` when this version carries no value, i.e. it is a tombstone.
    pub fn is_tombstone(&self) -> bool {
        matches!(self.value, None)
    }
}
63
/// A cursor over versioned entries that can be fed into the merge performed by
/// [`TournamentTree`] / [`RangeScanner`].
///
/// NOTE(review): the merge and the scanner's end-key cut-off only make sense if every
/// source yields its entries in ascending key order — nothing here enforces that;
/// confirm with implementors.
pub trait EntrySource: Send + Sync {
    /// Returns the entry the cursor is currently positioned on, or `None` when there is none.
    fn current(&self) -> Option<&VersionedEntry>;

    /// Advances the cursor by one entry.
    ///
    /// Callers in this file ignore the returned flag. The in-file test source returns
    /// whether the cursor still points at an entry after advancing.
    fn next(&mut self) -> bool;

    /// Repositions the cursor relative to `key`.
    ///
    /// The scanner calls this with the scan's start key before reading anything; the
    /// in-file test source lands on the first entry whose key is `>= key`.
    fn seek(&mut self, key: &[u8]);

    /// Reports whether the cursor has run out of entries.
    fn exhausted(&self) -> bool;

    /// Tie-break rank of this source: when two entries agree on key, timestamp and
    /// sequence, the entry from the source with the *lower* priority value wins.
    fn priority(&self) -> u32;
}
81
82struct TournamentNode {
84 source_idx: u32,
86 entry: Option<VersionedEntry>,
88}
89
90impl TournamentNode {
91 fn empty() -> Self {
92 Self {
93 source_idx: u32::MAX,
94 entry: None,
95 }
96 }
97
98 fn with_entry(source_idx: u32, entry: VersionedEntry) -> Self {
99 Self {
100 source_idx,
101 entry: Some(entry),
102 }
103 }
104
105 fn is_valid(&self) -> bool {
106 self.source_idx != u32::MAX && self.entry.is_some()
107 }
108
109 fn beats(&self, other: &TournamentNode, sources: &[Box<dyn EntrySource>]) -> bool {
112 match (&self.entry, &other.entry) {
113 (None, _) => false,
114 (_, None) => true,
115 (Some(a), Some(b)) => {
116 match a.key.cmp(&b.key) {
117 Ordering::Less => true,
118 Ordering::Greater => false,
119 Ordering::Equal => {
120 if a.timestamp != b.timestamp {
122 a.timestamp > b.timestamp
123 } else {
124 if a.sequence != b.sequence {
126 a.sequence > b.sequence
127 } else {
128 let a_priority = sources
130 .get(self.source_idx as usize)
131 .map(|s| s.priority())
132 .unwrap_or(u32::MAX);
133 let b_priority = sources
134 .get(other.source_idx as usize)
135 .map(|s| s.priority())
136 .unwrap_or(u32::MAX);
137 a_priority < b_priority
138 }
139 }
140 }
141 }
142 }
143 }
144 }
145}
146
147pub struct TournamentTree {
152 nodes: Vec<TournamentNode>,
154 num_sources: usize,
156}
157
158impl TournamentTree {
159 pub fn new(num_sources: usize) -> Self {
161 let tree_size = if num_sources == 0 {
162 0
163 } else {
164 2 * num_sources - 1
165 };
166 Self {
167 nodes: (0..tree_size).map(|_| TournamentNode::empty()).collect(),
168 num_sources,
169 }
170 }
171
172 pub fn initialize(&mut self, sources: &[Box<dyn EntrySource>]) {
174 if self.num_sources == 0 {
175 return;
176 }
177
178 let leaf_start = self.num_sources - 1;
180 for (i, source) in sources.iter().enumerate() {
181 let leaf_idx = leaf_start + i;
182 if leaf_idx < self.nodes.len() {
183 self.nodes[leaf_idx] = match source.current() {
184 Some(entry) => TournamentNode::with_entry(i as u32, entry.clone()),
185 None => TournamentNode::empty(),
186 };
187 }
188 }
189
190 if leaf_start > 0 {
192 for i in (0..leaf_start).rev() {
193 let left = 2 * i + 1;
194 let right = 2 * i + 2;
195 self.nodes[i] = self.compare_nodes(left, right, sources);
196 }
197 }
198 }
199
200 fn compare_nodes(
202 &self,
203 left_idx: usize,
204 right_idx: usize,
205 sources: &[Box<dyn EntrySource>],
206 ) -> TournamentNode {
207 let left = self.nodes.get(left_idx);
208 let right = self.nodes.get(right_idx);
209
210 match (left, right) {
211 (None, None) => TournamentNode::empty(),
212 (Some(l), None) => l.clone(),
213 (None, Some(r)) => r.clone(),
214 (Some(l), Some(r)) => {
215 if l.beats(r, sources) {
216 l.clone()
217 } else {
218 r.clone()
219 }
220 }
221 }
222 }
223
224 pub fn peek(&self) -> Option<&VersionedEntry> {
226 self.nodes.first().and_then(|n| n.entry.as_ref())
227 }
228
229 pub fn winner_source(&self) -> Option<u32> {
231 self.nodes.first().and_then(|n| {
232 if n.is_valid() {
233 Some(n.source_idx)
234 } else {
235 None
236 }
237 })
238 }
239
240 pub fn pop(&mut self, sources: &mut [Box<dyn EntrySource>]) -> Option<VersionedEntry> {
242 if self.num_sources == 0 {
243 return None;
244 }
245
246 let winner_source = self.winner_source()?;
247 let result = self.nodes[0].entry.clone();
248
249 if let Some(source) = sources.get_mut(winner_source as usize) {
251 source.next();
252 }
253
254 let leaf_idx = self.num_sources - 1 + winner_source as usize;
256 if leaf_idx < self.nodes.len() {
257 self.nodes[leaf_idx] = match sources
258 .get(winner_source as usize)
259 .and_then(|s| s.current())
260 {
261 Some(entry) => TournamentNode::with_entry(winner_source, entry.clone()),
262 None => TournamentNode::empty(),
263 };
264 }
265
266 self.rebuild_path(leaf_idx, sources);
268
269 result
270 }
271
272 fn rebuild_path(&mut self, leaf_idx: usize, sources: &[Box<dyn EntrySource>]) {
274 let mut idx = leaf_idx;
275 while idx > 0 {
276 let parent = (idx - 1) / 2;
277 let left = 2 * parent + 1;
278 let right = 2 * parent + 2;
279 self.nodes[parent] = self.compare_nodes(left, right, sources);
280 idx = parent;
281 }
282 }
283}
284
285impl Clone for TournamentNode {
286 fn clone(&self) -> Self {
287 Self {
288 source_idx: self.source_idx,
289 entry: self.entry.clone(),
290 }
291 }
292}
293
294#[derive(Debug, Clone)]
296pub struct ScanConfig {
297 pub start_key: Option<Vec<u8>>,
299 pub end_key: Option<Vec<u8>>,
301 pub snapshot: Timestamp,
303 pub limit: Option<usize>,
305 pub skip_tombstones: bool,
307 pub latest_only: bool,
309}
310
311impl Default for ScanConfig {
312 fn default() -> Self {
313 Self {
314 start_key: None,
315 end_key: None,
316 snapshot: u64::MAX,
317 limit: None,
318 skip_tombstones: true,
319 latest_only: true,
320 }
321 }
322}
323
324impl ScanConfig {
325 pub fn full_scan() -> Self {
327 Self::default()
328 }
329
330 pub fn range(start: Vec<u8>, end: Vec<u8>) -> Self {
332 Self {
333 start_key: Some(start),
334 end_key: Some(end),
335 ..Default::default()
336 }
337 }
338
339 pub fn with_snapshot(mut self, ts: Timestamp) -> Self {
341 self.snapshot = ts;
342 self
343 }
344
345 pub fn with_limit(mut self, limit: usize) -> Self {
347 self.limit = Some(limit);
348 self
349 }
350}
351
352#[derive(Debug, Clone)]
354pub struct FileRange {
355 pub id: u64,
357 pub smallest_key: Vec<u8>,
359 pub largest_key: Vec<u8>,
361 pub min_timestamp: Timestamp,
363 pub max_timestamp: Timestamp,
365 pub num_entries: u64,
367}
368
369impl FileRange {
370 pub fn overlaps_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> bool {
372 let key_overlaps = match (start, end) {
374 (None, None) => true,
375 (Some(s), None) => self.largest_key.as_slice() >= s,
376 (None, Some(e)) => self.smallest_key.as_slice() < e,
377 (Some(s), Some(e)) => {
378 self.largest_key.as_slice() >= s && self.smallest_key.as_slice() < e
379 }
380 };
381
382 key_overlaps
383 }
384
385 pub fn has_visible_versions(&self, snapshot: Timestamp) -> bool {
387 self.min_timestamp <= snapshot
388 }
389}
390
391#[derive(Debug, Clone)]
393pub struct LevelFiles {
394 pub level: u32,
396 pub files: Vec<FileRange>,
398}
399
400impl LevelFiles {
401 pub fn find_overlapping(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
403 if self.level == 0 {
404 self.files
406 .iter()
407 .filter(|f| f.overlaps_range(start, end))
408 .collect()
409 } else {
410 self.binary_search_range(start, end)
413 }
414 }
415
416 fn binary_search_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
418 if self.files.is_empty() {
419 return vec![];
420 }
421
422 let start_idx = match start {
424 None => 0,
425 Some(s) => self
426 .files
427 .binary_search_by(|f| {
428 if f.largest_key.as_slice() < s {
429 Ordering::Less
430 } else {
431 Ordering::Greater
432 }
433 })
434 .unwrap_or_else(|i| i),
435 };
436
437 let end_idx = match end {
439 None => self.files.len(),
440 Some(e) => {
441 let idx = self
442 .files
443 .binary_search_by(|f| {
444 if f.smallest_key.as_slice() >= e {
445 Ordering::Greater
446 } else {
447 Ordering::Less
448 }
449 })
450 .unwrap_or_else(|i| i);
451 idx.min(self.files.len())
452 }
453 };
454
455 self.files[start_idx..end_idx].iter().collect()
456 }
457}
458
459pub struct RangeScanner {
461 config: ScanConfig,
463 tournament: TournamentTree,
465 sources: Vec<Box<dyn EntrySource>>,
467 last_key: Option<Vec<u8>>,
469 count: usize,
471 stats: ScanStats,
473}
474
475#[derive(Debug, Default, Clone)]
477pub struct ScanStats {
478 pub files_considered: usize,
480 pub files_skipped_range: usize,
482 pub files_skipped_timestamp: usize,
484 pub blocks_read: usize,
486 pub entries_scanned: usize,
488 pub entries_returned: usize,
490 pub tombstones_seen: usize,
492 pub duplicates_skipped: usize,
494}
495
496impl RangeScanner {
497 pub fn new(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Self {
499 let num_sources = sources.len();
500 let mut scanner = Self {
501 config,
502 tournament: TournamentTree::new(num_sources),
503 sources,
504 last_key: None,
505 count: 0,
506 stats: ScanStats::default(),
507 };
508
509 if let Some(ref start) = scanner.config.start_key {
511 for source in &mut scanner.sources {
512 source.seek(start);
513 }
514 }
515
516 scanner.tournament.initialize(&scanner.sources);
518
519 scanner
520 }
521
522 pub fn next(&mut self) -> Option<VersionedEntry> {
524 loop {
525 if let Some(limit) = self.config.limit {
527 if self.count >= limit {
528 return None;
529 }
530 }
531
532 let entry = self.tournament.pop(&mut self.sources)?;
534 self.stats.entries_scanned += 1;
535
536 if let Some(ref end) = self.config.end_key {
538 if entry.key.as_slice() >= end.as_slice() {
539 return None;
540 }
541 }
542
543 if entry.timestamp > self.config.snapshot {
545 continue; }
547
548 if entry.is_tombstone() {
550 self.stats.tombstones_seen += 1;
551 if self.config.skip_tombstones {
552 if self.config.latest_only {
554 self.last_key = Some(entry.key);
555 }
556 continue;
557 }
558 }
559
560 if self.config.latest_only {
562 if let Some(ref last) = self.last_key {
563 if entry.key == *last {
564 self.stats.duplicates_skipped += 1;
565 continue;
566 }
567 }
568 self.last_key = Some(entry.key.clone());
569 }
570
571 self.count += 1;
572 self.stats.entries_returned += 1;
573 return Some(entry);
574 }
575 }
576
577 pub fn stats(&self) -> &ScanStats {
579 &self.stats
580 }
581
582 pub fn collect(mut self) -> Vec<VersionedEntry> {
584 let mut results = Vec::new();
585 while let Some(entry) = self.next() {
586 results.push(entry);
587 }
588 results
589 }
590}
591
592pub fn select_files_for_range(
594 levels: &[LevelFiles],
595 config: &ScanConfig,
596) -> (Vec<FileRange>, ScanStats) {
597 let mut selected = Vec::new();
598 let mut stats = ScanStats::default();
599
600 let start = config.start_key.as_deref();
601 let end = config.end_key.as_deref();
602
603 for level in levels {
604 for file in level.find_overlapping(start, end) {
605 stats.files_considered += 1;
606
607 if !file.has_visible_versions(config.snapshot) {
609 stats.files_skipped_timestamp += 1;
610 continue;
611 }
612
613 selected.push(file.clone());
614 }
615
616 let total_files = level.files.len();
618 let overlapping = level.find_overlapping(start, end).len();
619 stats.files_skipped_range += total_files - overlapping;
620 }
621
622 (selected, stats)
623}
624
// Unit tests for the merge scanner and the file-selection helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory `EntrySource` backed by a vector; `pos` is the cursor index.
    struct VecSource {
        entries: Vec<VersionedEntry>,
        pos: usize,
        priority: u32,
    }

    impl VecSource {
        /// Creates a source positioned on the first entry.
        fn new(entries: Vec<VersionedEntry>, priority: u32) -> Self {
            Self {
                entries,
                pos: 0,
                priority,
            }
        }
    }

    impl EntrySource for VecSource {
        fn current(&self) -> Option<&VersionedEntry> {
            self.entries.get(self.pos)
        }

        // Advances at most one past the last entry; returns whether an entry is still current.
        fn next(&mut self) -> bool {
            if self.pos < self.entries.len() {
                self.pos += 1;
            }
            self.pos < self.entries.len()
        }

        // Linear search for the first entry with key >= `key`; past-the-end if none.
        fn seek(&mut self, key: &[u8]) {
            self.pos = self
                .entries
                .iter()
                .position(|e| e.key.as_slice() >= key)
                .unwrap_or(self.entries.len());
        }

        fn exhausted(&self) -> bool {
            self.pos >= self.entries.len()
        }

        fn priority(&self) -> u32 {
            self.priority
        }
    }

    /// Builds a live entry with sequence 0.
    fn make_entry(key: &str, value: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: Some(value.as_bytes().to_vec()),
            timestamp: ts,
            sequence: 0,
        }
    }

    /// Builds a tombstone with sequence 0.
    fn make_tombstone(key: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: None,
            timestamp: ts,
            sequence: 0,
        }
    }

    // A single source is passed through unchanged and in order.
    #[test]
    fn test_tournament_tree_single_source() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let scanner = RangeScanner::new(ScanConfig::default(), sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[2].key, b"c");
    }

    // Two sources hold versions of the same keys; with `latest_only` the scan must
    // return each key once, carrying the newest timestamp available for it.
    #[test]
    fn test_tournament_tree_merge() {
        let l0_entries = vec![make_entry("a", "new", 200), make_entry("c", "new", 200)];
        let l1_entries = vec![
            make_entry("a", "old", 100),
            make_entry("b", "old", 100),
            make_entry("c", "old", 100),
        ];

        let sources: Vec<Box<dyn EntrySource>> = vec![
            Box::new(VecSource::new(l0_entries, 0)),
            Box::new(VecSource::new(l1_entries, 1)),
        ];

        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[0].timestamp, 200);
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[1].timestamp, 100);
        assert_eq!(results[2].key, b"c");
        assert_eq!(results[2].timestamp, 200);
    }

    // With `skip_tombstones`, the tombstone for "b" is not returned.
    #[test]
    fn test_tombstone_handling() {
        let entries = vec![
            make_entry("a", "value", 100),
            make_tombstone("b", 200),
            make_entry("c", "value", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            skip_tombstones: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"c");
    }

    // The start key is inclusive and the end key exclusive: ["b", "d") yields "b" and "c".
    #[test]
    fn test_range_bounds() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
            make_entry("d", "4", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            start_key: Some(b"b".to_vec()),
            end_key: Some(b"d".to_vec()),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"b");
        assert_eq!(results[1].key, b"c");
    }

    // The scan stops after `limit` returned entries even though 100 are available.
    #[test]
    fn test_limit() {
        let entries: Vec<_> = (0..100)
            .map(|i| make_entry(&format!("key{:03}", i), &format!("val{}", i), 100))
            .collect();

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            limit: Some(10),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 10);
    }

    // At snapshot 150 only the version written at timestamp 100 is visible.
    #[test]
    fn test_mvcc_visibility() {
        let entries = vec![
            make_entry("a", "v1", 100),
            make_entry("a", "v2", 200),
            make_entry("a", "v3", 300),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            snapshot: 150,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 100);
    }

    // Overlap checks for a file spanning ["c", "h"] against various scan ranges.
    #[test]
    fn test_file_range_overlap() {
        let file = FileRange {
            id: 1,
            smallest_key: b"c".to_vec(),
            largest_key: b"h".to_vec(),
            min_timestamp: 100,
            max_timestamp: 200,
            num_entries: 100,
        };

        // Ranges that intersect the file, including half-open and unbounded ones.
        assert!(file.overlaps_range(Some(b"a"), Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), Some(b"z")));
        assert!(file.overlaps_range(Some(b"d"), Some(b"g")));
        assert!(file.overlaps_range(None, Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), None));
        assert!(file.overlaps_range(None, None));

        // The end bound is exclusive, so ["a", "c") misses a file starting at "c".
        assert!(!file.overlaps_range(Some(b"a"), Some(b"c")));
        assert!(!file.overlaps_range(Some(b"i"), Some(b"z")));
    }

    // On a non-zero level the binary search returns exactly the files touching ["c", "g").
    #[test]
    fn test_level_binary_search() {
        let level = LevelFiles {
            level: 1,
            files: vec![
                FileRange {
                    id: 1,
                    smallest_key: b"a".to_vec(),
                    largest_key: b"d".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
                FileRange {
                    id: 2,
                    smallest_key: b"e".to_vec(),
                    largest_key: b"h".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
                FileRange {
                    id: 3,
                    smallest_key: b"i".to_vec(),
                    largest_key: b"l".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
            ],
        };

        let result = level.find_overlapping(Some(b"c"), Some(b"g"));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].id, 1);
        assert_eq!(result[1].id, 2);
    }
}