1use std::cmp::Ordering;
40use std::collections::BinaryHeap;
41use std::sync::Arc;
42
43use parking_lot::RwLock;
44
/// Version timestamp carried by every entry; a plain alias for `u64`.
///
/// Scans compare it against `ScanConfig::snapshot`: entries whose timestamp
/// is greater than the snapshot are skipped.
pub type Timestamp = u64;
47
48#[derive(Debug, Clone)]
50pub struct VersionedEntry {
51 pub key: Vec<u8>,
53 pub value: Option<Vec<u8>>,
55 pub timestamp: Timestamp,
57 pub sequence: u64,
59}
60
61impl VersionedEntry {
62 pub fn is_tombstone(&self) -> bool {
64 self.value.is_none()
65 }
66}
67
/// A cursor over versioned entries that the merge reads from.
///
/// Every implementor must be `Send + Sync`.
pub trait EntrySource: Send + Sync {
    /// Returns the entry the source is currently positioned on, or `None`
    /// when it has no current entry.
    fn current(&self) -> Option<&VersionedEntry>;

    /// Advances the source by one entry.
    ///
    /// NOTE(review): the merge code in this file ignores the returned flag;
    /// presumably it reports whether a current entry exists after the step.
    /// Confirm against implementors.
    fn next(&mut self) -> bool;

    /// Repositions the source relative to `key`.
    ///
    /// NOTE(review): the scanner passes the scan's start key here, so this is
    /// presumably "first entry with key >= `key`". Confirm against implementors.
    fn seek(&mut self, key: &[u8]);

    /// Presumably reports that the source has no entries left; the merge code
    /// in this file never calls it.
    fn exhausted(&self) -> bool;

    /// Rank used as the last tie-breaker when two sources offer entries with
    /// equal key, timestamp and sequence: the numerically smaller value wins.
    fn priority(&self) -> u32;
}
85
/// One slot of the tournament tree: an entry plus the index of the source it
/// was read from.
struct TournamentNode {
    /// Index of the originating source; `u32::MAX` marks an empty slot.
    source_idx: u32,
    /// Copy of the source's current entry, or `None` for an empty slot.
    entry: Option<VersionedEntry>,
}
93
94impl TournamentNode {
95 fn empty() -> Self {
96 Self {
97 source_idx: u32::MAX,
98 entry: None,
99 }
100 }
101
102 fn with_entry(source_idx: u32, entry: VersionedEntry) -> Self {
103 Self {
104 source_idx,
105 entry: Some(entry),
106 }
107 }
108
109 fn is_valid(&self) -> bool {
110 self.source_idx != u32::MAX && self.entry.is_some()
111 }
112
113 fn beats(&self, other: &TournamentNode, sources: &[Box<dyn EntrySource>]) -> bool {
116 match (&self.entry, &other.entry) {
117 (None, _) => false,
118 (_, None) => true,
119 (Some(a), Some(b)) => {
120 match a.key.cmp(&b.key) {
121 Ordering::Less => true,
122 Ordering::Greater => false,
123 Ordering::Equal => {
124 if a.timestamp != b.timestamp {
126 a.timestamp > b.timestamp
127 } else {
128 if a.sequence != b.sequence {
130 a.sequence > b.sequence
131 } else {
132 let a_priority = sources.get(self.source_idx as usize)
134 .map(|s| s.priority())
135 .unwrap_or(u32::MAX);
136 let b_priority = sources.get(other.source_idx as usize)
137 .map(|s| s.priority())
138 .unwrap_or(u32::MAX);
139 a_priority < b_priority
140 }
141 }
142 }
143 }
144 }
145 }
146 }
147}
148
/// Array-backed binary tournament over the current entries of a set of sources.
///
/// The overall winner is kept in slot 0.
pub struct TournamentTree {
    /// Slots in heap layout: the children of slot `i` are `2 * i + 1` and
    /// `2 * i + 2`, and the leaf for source `s` is slot `num_sources - 1 + s`.
    nodes: Vec<TournamentNode>,
    /// Number of sources (leaves) the tree was sized for.
    num_sources: usize,
}
159
160impl TournamentTree {
161 pub fn new(num_sources: usize) -> Self {
163 let tree_size = if num_sources == 0 { 0 } else { 2 * num_sources - 1 };
164 Self {
165 nodes: (0..tree_size).map(|_| TournamentNode::empty()).collect(),
166 num_sources,
167 }
168 }
169
170 pub fn initialize(&mut self, sources: &[Box<dyn EntrySource>]) {
172 if self.num_sources == 0 {
173 return;
174 }
175
176 let leaf_start = self.num_sources - 1;
178 for (i, source) in sources.iter().enumerate() {
179 let leaf_idx = leaf_start + i;
180 if leaf_idx < self.nodes.len() {
181 self.nodes[leaf_idx] = match source.current() {
182 Some(entry) => TournamentNode::with_entry(i as u32, entry.clone()),
183 None => TournamentNode::empty(),
184 };
185 }
186 }
187
188 if leaf_start > 0 {
190 for i in (0..leaf_start).rev() {
191 let left = 2 * i + 1;
192 let right = 2 * i + 2;
193 self.nodes[i] = self.compare_nodes(left, right, sources);
194 }
195 }
196 }
197
198 fn compare_nodes(
200 &self,
201 left_idx: usize,
202 right_idx: usize,
203 sources: &[Box<dyn EntrySource>],
204 ) -> TournamentNode {
205 let left = self.nodes.get(left_idx);
206 let right = self.nodes.get(right_idx);
207
208 match (left, right) {
209 (None, None) => TournamentNode::empty(),
210 (Some(l), None) => l.clone(),
211 (None, Some(r)) => r.clone(),
212 (Some(l), Some(r)) => {
213 if l.beats(r, sources) {
214 l.clone()
215 } else {
216 r.clone()
217 }
218 }
219 }
220 }
221
222 pub fn peek(&self) -> Option<&VersionedEntry> {
224 self.nodes.first().and_then(|n| n.entry.as_ref())
225 }
226
227 pub fn winner_source(&self) -> Option<u32> {
229 self.nodes.first().and_then(|n| {
230 if n.is_valid() {
231 Some(n.source_idx)
232 } else {
233 None
234 }
235 })
236 }
237
238 pub fn pop(&mut self, sources: &mut [Box<dyn EntrySource>]) -> Option<VersionedEntry> {
240 if self.num_sources == 0 {
241 return None;
242 }
243
244 let winner_source = self.winner_source()?;
245 let result = self.nodes[0].entry.clone();
246
247 if let Some(source) = sources.get_mut(winner_source as usize) {
249 source.next();
250 }
251
252 let leaf_idx = self.num_sources - 1 + winner_source as usize;
254 if leaf_idx < self.nodes.len() {
255 self.nodes[leaf_idx] = match sources.get(winner_source as usize).and_then(|s| s.current()) {
256 Some(entry) => TournamentNode::with_entry(winner_source, entry.clone()),
257 None => TournamentNode::empty(),
258 };
259 }
260
261 self.rebuild_path(leaf_idx, sources);
263
264 result
265 }
266
267 fn rebuild_path(&mut self, leaf_idx: usize, sources: &[Box<dyn EntrySource>]) {
269 let mut idx = leaf_idx;
270 while idx > 0 {
271 let parent = (idx - 1) / 2;
272 let left = 2 * parent + 1;
273 let right = 2 * parent + 2;
274 self.nodes[parent] = self.compare_nodes(left, right, sources);
275 idx = parent;
276 }
277 }
278}
279
280impl Clone for TournamentNode {
281 fn clone(&self) -> Self {
282 Self {
283 source_idx: self.source_idx,
284 entry: self.entry.clone(),
285 }
286 }
287}
288
289#[derive(Debug, Clone)]
291pub struct ScanConfig {
292 pub start_key: Option<Vec<u8>>,
294 pub end_key: Option<Vec<u8>>,
296 pub snapshot: Timestamp,
298 pub limit: Option<usize>,
300 pub skip_tombstones: bool,
302 pub latest_only: bool,
304}
305
306impl Default for ScanConfig {
307 fn default() -> Self {
308 Self {
309 start_key: None,
310 end_key: None,
311 snapshot: u64::MAX,
312 limit: None,
313 skip_tombstones: true,
314 latest_only: true,
315 }
316 }
317}
318
319impl ScanConfig {
320 pub fn full_scan() -> Self {
322 Self::default()
323 }
324
325 pub fn range(start: Vec<u8>, end: Vec<u8>) -> Self {
327 Self {
328 start_key: Some(start),
329 end_key: Some(end),
330 ..Default::default()
331 }
332 }
333
334 pub fn with_snapshot(mut self, ts: Timestamp) -> Self {
336 self.snapshot = ts;
337 self
338 }
339
340 pub fn with_limit(mut self, limit: usize) -> Self {
342 self.limit = Some(limit);
343 self
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct FileRange {
350 pub id: u64,
352 pub smallest_key: Vec<u8>,
354 pub largest_key: Vec<u8>,
356 pub min_timestamp: Timestamp,
358 pub max_timestamp: Timestamp,
360 pub num_entries: u64,
362}
363
364impl FileRange {
365 pub fn overlaps_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> bool {
367 let key_overlaps = match (start, end) {
369 (None, None) => true,
370 (Some(s), None) => self.largest_key.as_slice() >= s,
371 (None, Some(e)) => self.smallest_key.as_slice() < e,
372 (Some(s), Some(e)) => {
373 self.largest_key.as_slice() >= s && self.smallest_key.as_slice() < e
374 }
375 };
376
377 key_overlaps
378 }
379
380 pub fn has_visible_versions(&self, snapshot: Timestamp) -> bool {
382 self.min_timestamp <= snapshot
383 }
384}
385
386#[derive(Debug, Clone)]
388pub struct LevelFiles {
389 pub level: u32,
391 pub files: Vec<FileRange>,
393}
394
395impl LevelFiles {
396 pub fn find_overlapping(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
398 if self.level == 0 {
399 self.files
401 .iter()
402 .filter(|f| f.overlaps_range(start, end))
403 .collect()
404 } else {
405 self.binary_search_range(start, end)
408 }
409 }
410
411 fn binary_search_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
413 if self.files.is_empty() {
414 return vec![];
415 }
416
417 let start_idx = match start {
419 None => 0,
420 Some(s) => {
421 self.files
422 .binary_search_by(|f| {
423 if f.largest_key.as_slice() < s {
424 Ordering::Less
425 } else {
426 Ordering::Greater
427 }
428 })
429 .unwrap_or_else(|i| i)
430 }
431 };
432
433 let end_idx = match end {
435 None => self.files.len(),
436 Some(e) => {
437 let idx = self.files
438 .binary_search_by(|f| {
439 if f.smallest_key.as_slice() >= e {
440 Ordering::Greater
441 } else {
442 Ordering::Less
443 }
444 })
445 .unwrap_or_else(|i| i);
446 idx.min(self.files.len())
447 }
448 };
449
450 self.files[start_idx..end_idx].iter().collect()
451 }
452}
453
/// Merging scan over several entry sources, filtered according to a `ScanConfig`.
pub struct RangeScanner {
    /// Key bounds, snapshot, limit and filtering flags for this scan.
    config: ScanConfig,
    /// Tournament tree that picks the next entry across all sources.
    tournament: TournamentTree,
    /// The sources being merged; the tournament refers to them by index.
    sources: Vec<Box<dyn EntrySource>>,
    /// Key most recently recorded by `next` when `latest_only` is set; later
    /// entries with the same key are dropped.
    last_key: Option<Vec<u8>>,
    /// Number of entries returned so far, checked against `config.limit`.
    count: usize,
    /// Counters accumulated while scanning.
    stats: ScanStats,
}
469
/// Counters describing the work done by file selection and scanning.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Default, Clone)]
pub struct ScanStats {
    /// Files that overlapped the requested key range during file selection.
    pub files_considered: usize,
    /// Files left out during file selection because they did not overlap the key range.
    pub files_skipped_range: usize,
    /// Overlapping files left out because their lowest timestamp is above the snapshot.
    pub files_skipped_timestamp: usize,
    /// NOTE(review): never written by the code in this file; presumably
    /// maintained by block-level readers elsewhere.
    pub blocks_read: usize,
    /// Entries popped from the merge by the scanner.
    pub entries_scanned: usize,
    /// Entries handed back to the caller by the scanner.
    pub entries_returned: usize,
    /// Tombstones encountered at or below the snapshot.
    pub tombstones_seen: usize,
    /// Entries dropped because their key matched the previously recorded key.
    pub duplicates_skipped: usize,
}
490
491impl RangeScanner {
492 pub fn new(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Self {
494 let num_sources = sources.len();
495 let mut scanner = Self {
496 config,
497 tournament: TournamentTree::new(num_sources),
498 sources,
499 last_key: None,
500 count: 0,
501 stats: ScanStats::default(),
502 };
503
504 if let Some(ref start) = scanner.config.start_key {
506 for source in &mut scanner.sources {
507 source.seek(start);
508 }
509 }
510
511 scanner.tournament.initialize(&scanner.sources);
513
514 scanner
515 }
516
517 pub fn next(&mut self) -> Option<VersionedEntry> {
519 loop {
520 if let Some(limit) = self.config.limit {
522 if self.count >= limit {
523 return None;
524 }
525 }
526
527 let entry = self.tournament.pop(&mut self.sources)?;
529 self.stats.entries_scanned += 1;
530
531 if let Some(ref end) = self.config.end_key {
533 if entry.key.as_slice() >= end.as_slice() {
534 return None;
535 }
536 }
537
538 if entry.timestamp > self.config.snapshot {
540 continue; }
542
543 if entry.is_tombstone() {
545 self.stats.tombstones_seen += 1;
546 if self.config.skip_tombstones {
547 if self.config.latest_only {
549 self.last_key = Some(entry.key);
550 }
551 continue;
552 }
553 }
554
555 if self.config.latest_only {
557 if let Some(ref last) = self.last_key {
558 if entry.key == *last {
559 self.stats.duplicates_skipped += 1;
560 continue;
561 }
562 }
563 self.last_key = Some(entry.key.clone());
564 }
565
566 self.count += 1;
567 self.stats.entries_returned += 1;
568 return Some(entry);
569 }
570 }
571
572 pub fn stats(&self) -> &ScanStats {
574 &self.stats
575 }
576
577 pub fn collect(mut self) -> Vec<VersionedEntry> {
579 let mut results = Vec::new();
580 while let Some(entry) = self.next() {
581 results.push(entry);
582 }
583 results
584 }
585}
586
587pub fn select_files_for_range(
589 levels: &[LevelFiles],
590 config: &ScanConfig,
591) -> (Vec<FileRange>, ScanStats) {
592 let mut selected = Vec::new();
593 let mut stats = ScanStats::default();
594
595 let start = config.start_key.as_deref();
596 let end = config.end_key.as_deref();
597
598 for level in levels {
599 for file in level.find_overlapping(start, end) {
600 stats.files_considered += 1;
601
602 if !file.has_visible_versions(config.snapshot) {
604 stats.files_skipped_timestamp += 1;
605 continue;
606 }
607
608 selected.push(file.clone());
609 }
610
611 let total_files = level.files.len();
613 let overlapping = level.find_overlapping(start, end).len();
614 stats.files_skipped_range += total_files - overlapping;
615 }
616
617 (selected, stats)
618}
619
// Unit tests for the merge scanner, file-range overlap checks and the
// per-level file lookup.
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory `EntrySource` backed by a vector; `pos` is the cursor into
    /// `entries`.
    struct VecSource {
        entries: Vec<VersionedEntry>,
        pos: usize,
        priority: u32,
    }

    impl VecSource {
        /// Wraps `entries` with the cursor on the first element.
        fn new(entries: Vec<VersionedEntry>, priority: u32) -> Self {
            Self {
                entries,
                pos: 0,
                priority,
            }
        }
    }

    impl EntrySource for VecSource {
        fn current(&self) -> Option<&VersionedEntry> {
            self.entries.get(self.pos)
        }

        // Steps the cursor (never past the end) and reports whether it still
        // points at an entry.
        fn next(&mut self) -> bool {
            if self.pos < self.entries.len() {
                self.pos += 1;
            }
            self.pos < self.entries.len()
        }

        // Linear search for the first entry whose key is >= `key`; the cursor
        // moves to the end when there is none.
        fn seek(&mut self, key: &[u8]) {
            self.pos = self.entries.iter().position(|e| e.key.as_slice() >= key).unwrap_or(self.entries.len());
        }

        fn exhausted(&self) -> bool {
            self.pos >= self.entries.len()
        }

        fn priority(&self) -> u32 {
            self.priority
        }
    }

    /// Builds a live entry with sequence 0.
    fn make_entry(key: &str, value: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: Some(value.as_bytes().to_vec()),
            timestamp: ts,
            sequence: 0,
        }
    }

    /// Builds a tombstone (no value) with sequence 0.
    fn make_tombstone(key: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: None,
            timestamp: ts,
            sequence: 0,
        }
    }

    // A single source is returned unchanged and in order.
    #[test]
    fn test_tournament_tree_single_source() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let scanner = RangeScanner::new(ScanConfig::default(), sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[2].key, b"c");
    }

    // Two sources with overlapping keys: with `latest_only`, the version with
    // the higher timestamp is the one returned for each key.
    #[test]
    fn test_tournament_tree_merge() {
        let l0_entries = vec![
            make_entry("a", "new", 200),
            make_entry("c", "new", 200),
        ];
        let l1_entries = vec![
            make_entry("a", "old", 100),
            make_entry("b", "old", 100),
            make_entry("c", "old", 100),
        ];

        let sources: Vec<Box<dyn EntrySource>> = vec![
            Box::new(VecSource::new(l0_entries, 0)), Box::new(VecSource::new(l1_entries, 1)), ];

        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[0].timestamp, 200);
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[1].timestamp, 100);
        assert_eq!(results[2].key, b"c");
        assert_eq!(results[2].timestamp, 200);
    }

    // With `skip_tombstones`, the deleted key "b" is absent from the output.
    #[test]
    fn test_tombstone_handling() {
        let entries = vec![
            make_entry("a", "value", 100),
            make_tombstone("b", 200),
            make_entry("c", "value", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            skip_tombstones: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"c");
    }

    // The start key is inclusive ("b" is returned) and the end key is
    // exclusive ("d" is not).
    #[test]
    fn test_range_bounds() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
            make_entry("d", "4", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            start_key: Some(b"b".to_vec()),
            end_key: Some(b"d".to_vec()),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"b");
        assert_eq!(results[1].key, b"c");
    }

    // Only the first `limit` entries of 100 are returned.
    #[test]
    fn test_limit() {
        let entries: Vec<_> = (0..100)
            .map(|i| make_entry(&format!("key{:03}", i), &format!("val{}", i), 100))
            .collect();

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            limit: Some(10),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 10);
    }

    // Only the version at or below the snapshot (ts 100) is returned.
    // NOTE(review): this single source lists versions oldest-first, whereas
    // the merge orders equal keys newest-first; a single source is popped in
    // its own order, so ts 100 is returned first and the newer versions are
    // filtered out by the snapshot.
    #[test]
    fn test_mvcc_visibility() {
        let entries = vec![
            make_entry("a", "v1", 100),
            make_entry("a", "v2", 200),
            make_entry("a", "v3", 300),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            snapshot: 150,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 100);
    }

    // Overlap checks against a file spanning "c".."h".
    #[test]
    fn test_file_range_overlap() {
        let file = FileRange {
            id: 1,
            smallest_key: b"c".to_vec(),
            largest_key: b"h".to_vec(),
            min_timestamp: 100,
            max_timestamp: 200,
            num_entries: 100,
        };

        // Ranges that intersect the file, including open-ended ones.
        assert!(file.overlaps_range(Some(b"a"), Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), Some(b"z")));
        assert!(file.overlaps_range(Some(b"d"), Some(b"g")));
        assert!(file.overlaps_range(None, Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), None));
        assert!(file.overlaps_range(None, None));

        // The end bound is exclusive, so a range ending at "c" misses the
        // file; a range starting after "h" misses it too.
        assert!(!file.overlaps_range(Some(b"a"), Some(b"c")));
        assert!(!file.overlaps_range(Some(b"i"), Some(b"z")));
    }

    // On a level above 0, the bisecting lookup returns the two files that
    // intersect "c".."g".
    #[test]
    fn test_level_binary_search() {
        let level = LevelFiles {
            level: 1,
            files: vec![
                FileRange {
                    id: 1,
                    smallest_key: b"a".to_vec(),
                    largest_key: b"d".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
                FileRange {
                    id: 2,
                    smallest_key: b"e".to_vec(),
                    largest_key: b"h".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
                FileRange {
                    id: 3,
                    smallest_key: b"i".to_vec(),
                    largest_key: b"l".to_vec(),
                    min_timestamp: 100,
                    max_timestamp: 200,
                    num_entries: 100,
                },
            ],
        };

        let result = level.find_overlapping(Some(b"c"), Some(b"g"));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].id, 1);
        assert_eq!(result[1].id, 2);
    }
}