1use std::cmp::Ordering;
37use std::collections::BinaryHeap;
38use std::sync::Arc;
39
40use parking_lot::RwLock;
41
/// Logical version timestamp attached to every entry.
pub type Timestamp = u64;

/// One version of a key as produced by an [`EntrySource`].
///
/// A missing `value` marks a deletion (tombstone).
#[derive(Debug, Clone)]
pub struct VersionedEntry {
    /// User key bytes; the merge orders entries by this first.
    pub key: Vec<u8>,
    /// Stored value, or `None` for a tombstone.
    pub value: Option<Vec<u8>>,
    /// Version timestamp; compared against `ScanConfig::snapshot` for visibility.
    pub timestamp: Timestamp,
    /// Secondary version counter; among equal keys and timestamps, the higher one wins.
    pub sequence: u64,
}

impl VersionedEntry {
    /// Returns `true` when this version is a deletion marker (carries no value).
    pub fn is_tombstone(&self) -> bool {
        let has_value = self.value.is_some();
        !has_value
    }
}
64
65pub trait EntrySource: Send + Sync {
67 fn current(&self) -> Option<&VersionedEntry>;
69
70 fn next(&mut self) -> bool;
72
73 fn seek(&mut self, key: &[u8]);
75
76 fn exhausted(&self) -> bool;
78
79 fn priority(&self) -> u32;
81}
82
83struct TournamentNode {
85 source_idx: u32,
87 entry: Option<VersionedEntry>,
89}
90
91impl TournamentNode {
92 fn empty() -> Self {
93 Self {
94 source_idx: u32::MAX,
95 entry: None,
96 }
97 }
98
99 fn with_entry(source_idx: u32, entry: VersionedEntry) -> Self {
100 Self {
101 source_idx,
102 entry: Some(entry),
103 }
104 }
105
106 fn is_valid(&self) -> bool {
107 self.source_idx != u32::MAX && self.entry.is_some()
108 }
109
110 fn beats(&self, other: &TournamentNode, sources: &[Box<dyn EntrySource>]) -> bool {
113 match (&self.entry, &other.entry) {
114 (None, _) => false,
115 (_, None) => true,
116 (Some(a), Some(b)) => {
117 match a.key.cmp(&b.key) {
118 Ordering::Less => true,
119 Ordering::Greater => false,
120 Ordering::Equal => {
121 if a.timestamp != b.timestamp {
123 a.timestamp > b.timestamp
124 } else {
125 if a.sequence != b.sequence {
127 a.sequence > b.sequence
128 } else {
129 let a_priority = sources.get(self.source_idx as usize)
131 .map(|s| s.priority())
132 .unwrap_or(u32::MAX);
133 let b_priority = sources.get(other.source_idx as usize)
134 .map(|s| s.priority())
135 .unwrap_or(u32::MAX);
136 a_priority < b_priority
137 }
138 }
139 }
140 }
141 }
142 }
143 }
144}
145
146pub struct TournamentTree {
151 nodes: Vec<TournamentNode>,
153 num_sources: usize,
155}
156
157impl TournamentTree {
158 pub fn new(num_sources: usize) -> Self {
160 let tree_size = if num_sources == 0 { 0 } else { 2 * num_sources - 1 };
161 Self {
162 nodes: (0..tree_size).map(|_| TournamentNode::empty()).collect(),
163 num_sources,
164 }
165 }
166
167 pub fn initialize(&mut self, sources: &[Box<dyn EntrySource>]) {
169 if self.num_sources == 0 {
170 return;
171 }
172
173 let leaf_start = self.num_sources - 1;
175 for (i, source) in sources.iter().enumerate() {
176 let leaf_idx = leaf_start + i;
177 if leaf_idx < self.nodes.len() {
178 self.nodes[leaf_idx] = match source.current() {
179 Some(entry) => TournamentNode::with_entry(i as u32, entry.clone()),
180 None => TournamentNode::empty(),
181 };
182 }
183 }
184
185 if leaf_start > 0 {
187 for i in (0..leaf_start).rev() {
188 let left = 2 * i + 1;
189 let right = 2 * i + 2;
190 self.nodes[i] = self.compare_nodes(left, right, sources);
191 }
192 }
193 }
194
195 fn compare_nodes(
197 &self,
198 left_idx: usize,
199 right_idx: usize,
200 sources: &[Box<dyn EntrySource>],
201 ) -> TournamentNode {
202 let left = self.nodes.get(left_idx);
203 let right = self.nodes.get(right_idx);
204
205 match (left, right) {
206 (None, None) => TournamentNode::empty(),
207 (Some(l), None) => l.clone(),
208 (None, Some(r)) => r.clone(),
209 (Some(l), Some(r)) => {
210 if l.beats(r, sources) {
211 l.clone()
212 } else {
213 r.clone()
214 }
215 }
216 }
217 }
218
219 pub fn peek(&self) -> Option<&VersionedEntry> {
221 self.nodes.first().and_then(|n| n.entry.as_ref())
222 }
223
224 pub fn winner_source(&self) -> Option<u32> {
226 self.nodes.first().and_then(|n| {
227 if n.is_valid() {
228 Some(n.source_idx)
229 } else {
230 None
231 }
232 })
233 }
234
235 pub fn pop(&mut self, sources: &mut [Box<dyn EntrySource>]) -> Option<VersionedEntry> {
237 if self.num_sources == 0 {
238 return None;
239 }
240
241 let winner_source = self.winner_source()?;
242 let result = self.nodes[0].entry.clone();
243
244 if let Some(source) = sources.get_mut(winner_source as usize) {
246 source.next();
247 }
248
249 let leaf_idx = self.num_sources - 1 + winner_source as usize;
251 if leaf_idx < self.nodes.len() {
252 self.nodes[leaf_idx] = match sources.get(winner_source as usize).and_then(|s| s.current()) {
253 Some(entry) => TournamentNode::with_entry(winner_source, entry.clone()),
254 None => TournamentNode::empty(),
255 };
256 }
257
258 self.rebuild_path(leaf_idx, sources);
260
261 result
262 }
263
264 fn rebuild_path(&mut self, leaf_idx: usize, sources: &[Box<dyn EntrySource>]) {
266 let mut idx = leaf_idx;
267 while idx > 0 {
268 let parent = (idx - 1) / 2;
269 let left = 2 * parent + 1;
270 let right = 2 * parent + 2;
271 self.nodes[parent] = self.compare_nodes(left, right, sources);
272 idx = parent;
273 }
274 }
275}
276
277impl Clone for TournamentNode {
278 fn clone(&self) -> Self {
279 Self {
280 source_idx: self.source_idx,
281 entry: self.entry.clone(),
282 }
283 }
284}
285
286#[derive(Debug, Clone)]
288pub struct ScanConfig {
289 pub start_key: Option<Vec<u8>>,
291 pub end_key: Option<Vec<u8>>,
293 pub snapshot: Timestamp,
295 pub limit: Option<usize>,
297 pub skip_tombstones: bool,
299 pub latest_only: bool,
301}
302
303impl Default for ScanConfig {
304 fn default() -> Self {
305 Self {
306 start_key: None,
307 end_key: None,
308 snapshot: u64::MAX,
309 limit: None,
310 skip_tombstones: true,
311 latest_only: true,
312 }
313 }
314}
315
316impl ScanConfig {
317 pub fn full_scan() -> Self {
319 Self::default()
320 }
321
322 pub fn range(start: Vec<u8>, end: Vec<u8>) -> Self {
324 Self {
325 start_key: Some(start),
326 end_key: Some(end),
327 ..Default::default()
328 }
329 }
330
331 pub fn with_snapshot(mut self, ts: Timestamp) -> Self {
333 self.snapshot = ts;
334 self
335 }
336
337 pub fn with_limit(mut self, limit: usize) -> Self {
339 self.limit = Some(limit);
340 self
341 }
342}
343
344#[derive(Debug, Clone)]
346pub struct FileRange {
347 pub id: u64,
349 pub smallest_key: Vec<u8>,
351 pub largest_key: Vec<u8>,
353 pub min_timestamp: Timestamp,
355 pub max_timestamp: Timestamp,
357 pub num_entries: u64,
359}
360
361impl FileRange {
362 pub fn overlaps_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> bool {
364 let key_overlaps = match (start, end) {
366 (None, None) => true,
367 (Some(s), None) => self.largest_key.as_slice() >= s,
368 (None, Some(e)) => self.smallest_key.as_slice() < e,
369 (Some(s), Some(e)) => {
370 self.largest_key.as_slice() >= s && self.smallest_key.as_slice() < e
371 }
372 };
373
374 key_overlaps
375 }
376
377 pub fn has_visible_versions(&self, snapshot: Timestamp) -> bool {
379 self.min_timestamp <= snapshot
380 }
381}
382
383#[derive(Debug, Clone)]
385pub struct LevelFiles {
386 pub level: u32,
388 pub files: Vec<FileRange>,
390}
391
392impl LevelFiles {
393 pub fn find_overlapping(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
395 if self.level == 0 {
396 self.files
398 .iter()
399 .filter(|f| f.overlaps_range(start, end))
400 .collect()
401 } else {
402 self.binary_search_range(start, end)
405 }
406 }
407
408 fn binary_search_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) -> Vec<&FileRange> {
410 if self.files.is_empty() {
411 return vec![];
412 }
413
414 let start_idx = match start {
416 None => 0,
417 Some(s) => {
418 self.files
419 .binary_search_by(|f| {
420 if f.largest_key.as_slice() < s {
421 Ordering::Less
422 } else {
423 Ordering::Greater
424 }
425 })
426 .unwrap_or_else(|i| i)
427 }
428 };
429
430 let end_idx = match end {
432 None => self.files.len(),
433 Some(e) => {
434 let idx = self.files
435 .binary_search_by(|f| {
436 if f.smallest_key.as_slice() >= e {
437 Ordering::Greater
438 } else {
439 Ordering::Less
440 }
441 })
442 .unwrap_or_else(|i| i);
443 idx.min(self.files.len())
444 }
445 };
446
447 self.files[start_idx..end_idx].iter().collect()
448 }
449}
450
451pub struct RangeScanner {
453 config: ScanConfig,
455 tournament: TournamentTree,
457 sources: Vec<Box<dyn EntrySource>>,
459 last_key: Option<Vec<u8>>,
461 count: usize,
463 stats: ScanStats,
465}
466
/// Counters describing the work done by file selection and scanning.
#[derive(Debug, Default, Clone)]
pub struct ScanStats {
    /// Files whose key span overlapped the scan range (`select_files_for_range`).
    pub files_considered: usize,
    /// Files excluded because their key span missed the scan range.
    pub files_skipped_range: usize,
    /// Overlapping files excluded because every version is newer than the snapshot.
    pub files_skipped_timestamp: usize,
    /// Data blocks read; not updated by any code in this module.
    pub blocks_read: usize,
    /// Entries popped from the merge by `RangeScanner::next`.
    pub entries_scanned: usize,
    /// Entries handed back to the caller.
    pub entries_returned: usize,
    /// Snapshot-visible tombstones encountered.
    pub tombstones_seen: usize,
    /// Older versions suppressed because `latest_only` was set.
    pub duplicates_skipped: usize,
}
487
488impl RangeScanner {
489 pub fn new(config: ScanConfig, sources: Vec<Box<dyn EntrySource>>) -> Self {
491 let num_sources = sources.len();
492 let mut scanner = Self {
493 config,
494 tournament: TournamentTree::new(num_sources),
495 sources,
496 last_key: None,
497 count: 0,
498 stats: ScanStats::default(),
499 };
500
501 if let Some(ref start) = scanner.config.start_key {
503 for source in &mut scanner.sources {
504 source.seek(start);
505 }
506 }
507
508 scanner.tournament.initialize(&scanner.sources);
510
511 scanner
512 }
513
514 pub fn next(&mut self) -> Option<VersionedEntry> {
516 loop {
517 if let Some(limit) = self.config.limit {
519 if self.count >= limit {
520 return None;
521 }
522 }
523
524 let entry = self.tournament.pop(&mut self.sources)?;
526 self.stats.entries_scanned += 1;
527
528 if let Some(ref end) = self.config.end_key {
530 if entry.key.as_slice() >= end.as_slice() {
531 return None;
532 }
533 }
534
535 if entry.timestamp > self.config.snapshot {
537 continue; }
539
540 if entry.is_tombstone() {
542 self.stats.tombstones_seen += 1;
543 if self.config.skip_tombstones {
544 if self.config.latest_only {
546 self.last_key = Some(entry.key);
547 }
548 continue;
549 }
550 }
551
552 if self.config.latest_only {
554 if let Some(ref last) = self.last_key {
555 if entry.key == *last {
556 self.stats.duplicates_skipped += 1;
557 continue;
558 }
559 }
560 self.last_key = Some(entry.key.clone());
561 }
562
563 self.count += 1;
564 self.stats.entries_returned += 1;
565 return Some(entry);
566 }
567 }
568
569 pub fn stats(&self) -> &ScanStats {
571 &self.stats
572 }
573
574 pub fn collect(mut self) -> Vec<VersionedEntry> {
576 let mut results = Vec::new();
577 while let Some(entry) = self.next() {
578 results.push(entry);
579 }
580 results
581 }
582}
583
584pub fn select_files_for_range(
586 levels: &[LevelFiles],
587 config: &ScanConfig,
588) -> (Vec<FileRange>, ScanStats) {
589 let mut selected = Vec::new();
590 let mut stats = ScanStats::default();
591
592 let start = config.start_key.as_deref();
593 let end = config.end_key.as_deref();
594
595 for level in levels {
596 for file in level.find_overlapping(start, end) {
597 stats.files_considered += 1;
598
599 if !file.has_visible_versions(config.snapshot) {
601 stats.files_skipped_timestamp += 1;
602 continue;
603 }
604
605 selected.push(file.clone());
606 }
607
608 let total_files = level.files.len();
610 let overlapping = level.find_overlapping(start, end).len();
611 stats.files_skipped_range += total_files - overlapping;
612 }
613
614 (selected, stats)
615}
616
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory [`EntrySource`] over a vector.
    ///
    /// The entries must already be in merge order: ascending key and, for
    /// equal keys, newest (highest timestamp) first.
    struct VecSource {
        entries: Vec<VersionedEntry>,
        pos: usize,
        priority: u32,
    }

    impl VecSource {
        fn new(entries: Vec<VersionedEntry>, priority: u32) -> Self {
            Self {
                entries,
                pos: 0,
                priority,
            }
        }
    }

    impl EntrySource for VecSource {
        fn current(&self) -> Option<&VersionedEntry> {
            self.entries.get(self.pos)
        }

        fn next(&mut self) -> bool {
            if self.pos < self.entries.len() {
                self.pos += 1;
            }
            self.pos < self.entries.len()
        }

        fn seek(&mut self, key: &[u8]) {
            self.pos = self
                .entries
                .iter()
                .position(|e| e.key.as_slice() >= key)
                .unwrap_or(self.entries.len());
        }

        fn exhausted(&self) -> bool {
            self.pos >= self.entries.len()
        }

        fn priority(&self) -> u32 {
            self.priority
        }
    }

    fn make_entry(key: &str, value: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: Some(value.as_bytes().to_vec()),
            timestamp: ts,
            sequence: 0,
        }
    }

    fn make_tombstone(key: &str, ts: u64) -> VersionedEntry {
        VersionedEntry {
            key: key.as_bytes().to_vec(),
            value: None,
            timestamp: ts,
            sequence: 0,
        }
    }

    /// File covering `[smallest, largest]` whose versions span `min_ts..=min_ts + 100`.
    fn make_file(id: u64, smallest: &str, largest: &str, min_ts: Timestamp) -> FileRange {
        FileRange {
            id,
            smallest_key: smallest.as_bytes().to_vec(),
            largest_key: largest.as_bytes().to_vec(),
            min_timestamp: min_ts,
            max_timestamp: min_ts + 100,
            num_entries: 100,
        }
    }

    #[test]
    fn test_tournament_tree_single_source() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let scanner = RangeScanner::new(ScanConfig::default(), sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[2].key, b"c");
    }

    #[test]
    fn test_tournament_tree_merge() {
        // Newer level (L0) shadows the older level (L1) for keys "a" and "c".
        let l0_entries = vec![make_entry("a", "new", 200), make_entry("c", "new", 200)];
        let l1_entries = vec![
            make_entry("a", "old", 100),
            make_entry("b", "old", 100),
            make_entry("c", "old", 100),
        ];

        let sources: Vec<Box<dyn EntrySource>> = vec![
            Box::new(VecSource::new(l0_entries, 0)),
            Box::new(VecSource::new(l1_entries, 1)),
        ];

        let config = ScanConfig {
            snapshot: 250,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[0].timestamp, 200);
        assert_eq!(results[1].key, b"b");
        assert_eq!(results[1].timestamp, 100);
        assert_eq!(results[2].key, b"c");
        assert_eq!(results[2].timestamp, 200);
    }

    #[test]
    fn test_tombstone_handling() {
        let entries = vec![
            make_entry("a", "value", 100),
            make_tombstone("b", 200),
            make_entry("c", "value", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            skip_tombstones: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"a");
        assert_eq!(results[1].key, b"c");
    }

    #[test]
    fn test_tombstone_shadows_older_level() {
        // A newer delete in L0 must hide the older value of "b" in L1.
        let sources: Vec<Box<dyn EntrySource>> = vec![
            Box::new(VecSource::new(vec![make_tombstone("b", 300)], 0)),
            Box::new(VecSource::new(
                vec![
                    make_entry("a", "1", 100),
                    make_entry("b", "2", 100),
                    make_entry("c", "3", 100),
                ],
                1,
            )),
        ];

        let mut scanner = RangeScanner::new(ScanConfig::default(), sources);
        let mut keys = Vec::new();
        while let Some(entry) = scanner.next() {
            keys.push(entry.key);
        }

        assert_eq!(keys, vec![b"a".to_vec(), b"c".to_vec()]);
        assert_eq!(scanner.stats().tombstones_seen, 1);
        assert_eq!(scanner.stats().duplicates_skipped, 1);
    }

    #[test]
    fn test_priority_breaks_full_ties() {
        // Same key, timestamp and sequence: the lower priority value wins.
        let sources: Vec<Box<dyn EntrySource>> = vec![
            Box::new(VecSource::new(vec![make_entry("k", "low", 100)], 5)),
            Box::new(VecSource::new(vec![make_entry("k", "high", 100)], 1)),
        ];

        let results = RangeScanner::new(ScanConfig::default(), sources).collect();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].value.as_deref(), Some(&b"high"[..]));
    }

    #[test]
    fn test_range_bounds() {
        let entries = vec![
            make_entry("a", "1", 100),
            make_entry("b", "2", 100),
            make_entry("c", "3", 100),
            make_entry("d", "4", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            start_key: Some(b"b".to_vec()),
            end_key: Some(b"d".to_vec()),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].key, b"b");
        assert_eq!(results[1].key, b"c");
    }

    #[test]
    fn test_limit() {
        let entries: Vec<_> = (0..100)
            .map(|i| make_entry(&format!("key{:03}", i), &format!("val{}", i), 100))
            .collect();

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        let config = ScanConfig {
            limit: Some(10),
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 10);
    }

    #[test]
    fn test_mvcc_visibility() {
        // Versions of one key appear newest-first, as the merge expects.
        let entries = vec![
            make_entry("a", "v3", 300),
            make_entry("a", "v2", 200),
            make_entry("a", "v1", 100),
        ];

        let source: Box<dyn EntrySource> = Box::new(VecSource::new(entries, 0));
        let sources = vec![source];

        // Only the version at ts=100 is visible at snapshot 150.
        let config = ScanConfig {
            snapshot: 150,
            latest_only: true,
            ..Default::default()
        };

        let scanner = RangeScanner::new(config, sources);
        let results = scanner.collect();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].timestamp, 100);
    }

    #[test]
    fn test_file_range_overlap() {
        let file = FileRange {
            id: 1,
            smallest_key: b"c".to_vec(),
            largest_key: b"h".to_vec(),
            min_timestamp: 100,
            max_timestamp: 200,
            num_entries: 100,
        };

        assert!(file.overlaps_range(Some(b"a"), Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), Some(b"z")));
        assert!(file.overlaps_range(Some(b"d"), Some(b"g")));
        assert!(file.overlaps_range(None, Some(b"e")));
        assert!(file.overlaps_range(Some(b"f"), None));
        assert!(file.overlaps_range(None, None));

        // The end bound is exclusive.
        assert!(!file.overlaps_range(Some(b"a"), Some(b"c")));
        assert!(!file.overlaps_range(Some(b"i"), Some(b"z")));
    }

    #[test]
    fn test_level_binary_search() {
        let level = LevelFiles {
            level: 1,
            files: vec![
                make_file(1, "a", "d", 100),
                make_file(2, "e", "h", 100),
                make_file(3, "i", "l", 100),
            ],
        };

        // [c, g) touches the first two files.
        let result = level.find_overlapping(Some(b"c"), Some(b"g"));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].id, 1);
        assert_eq!(result[1].id, 2);
    }

    #[test]
    fn test_select_files_for_range_stats() {
        let levels = vec![
            LevelFiles {
                level: 0,
                files: vec![make_file(10, "x", "z", 100)],
            },
            LevelFiles {
                level: 1,
                files: vec![
                    make_file(1, "a", "d", 100),
                    make_file(2, "e", "h", 300),
                    make_file(3, "i", "l", 100),
                ],
            },
        ];
        let config = ScanConfig::range(b"c".to_vec(), b"g".to_vec()).with_snapshot(200);

        let (selected, stats) = select_files_for_range(&levels, &config);
        let ids: Vec<u64> = selected.iter().map(|f| f.id).collect();

        // File 2 overlaps but all its versions are newer than the snapshot.
        assert_eq!(ids, vec![1]);
        assert_eq!(stats.files_considered, 2);
        assert_eq!(stats.files_skipped_range, 2);
        assert_eq!(stats.files_skipped_timestamp, 1);
    }
}
899}