1use std::borrow::Cow;
33use std::cmp::Ordering;
34use std::collections::BinaryHeap;
35
36#[derive(Debug, Clone)]
38pub struct Entry<'a> {
39 pub key: Cow<'a, [u8]>,
40 pub value: Cow<'a, [u8]>,
41 pub timestamp: u64,
42 pub is_tombstone: bool,
43}
44
45impl<'a> Entry<'a> {
46 pub fn new(
48 key: impl Into<Cow<'a, [u8]>>,
49 value: impl Into<Cow<'a, [u8]>>,
50 timestamp: u64,
51 ) -> Self {
52 let value = value.into();
53 let is_tombstone = value.is_empty();
54 Self {
55 key: key.into(),
56 value,
57 timestamp,
58 is_tombstone,
59 }
60 }
61
62 pub fn tombstone(key: impl Into<Cow<'a, [u8]>>, timestamp: u64) -> Self {
64 Self {
65 key: key.into(),
66 value: Cow::Borrowed(&[]),
67 timestamp,
68 is_tombstone: true,
69 }
70 }
71
72 pub fn into_owned(self) -> Entry<'static> {
74 Entry {
75 key: Cow::Owned(self.key.into_owned()),
76 value: Cow::Owned(self.value.into_owned()),
77 timestamp: self.timestamp,
78 is_tombstone: self.is_tombstone,
79 }
80 }
81}
82
83pub trait EntryIterator<'a>: Send {
85 fn peek(&self) -> Option<&Entry<'a>>;
87
88 fn advance(&mut self);
90
91 fn is_exhausted(&self) -> bool;
93
94 fn source_priority(&self) -> u8;
96}
97
98struct PeekableSource<'a> {
100 source: Box<dyn EntryIterator<'a> + 'a>,
101 priority: u8,
102}
103
104impl<'a> PeekableSource<'a> {
105 fn new(source: Box<dyn EntryIterator<'a> + 'a>) -> Self {
106 let priority = source.source_priority();
107 Self { source, priority }
108 }
109}
110
111impl<'a> PartialEq for PeekableSource<'a> {
112 fn eq(&self, other: &Self) -> bool {
113 match (self.source.peek(), other.source.peek()) {
114 (Some(a), Some(b)) => a.key == b.key && self.priority == other.priority,
115 (None, None) => true,
116 _ => false,
117 }
118 }
119}
120
121impl<'a> Eq for PeekableSource<'a> {}
122
123impl<'a> PartialOrd for PeekableSource<'a> {
124 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
125 Some(self.cmp(other))
126 }
127}
128
129impl<'a> Ord for PeekableSource<'a> {
130 fn cmp(&self, other: &Self) -> Ordering {
131 match (self.source.peek(), other.source.peek()) {
132 (Some(a), Some(b)) => {
133 match b.key.cmp(&a.key) {
136 Ordering::Equal => other.priority.cmp(&self.priority),
137 ord => ord,
138 }
139 }
140 (Some(_), None) => Ordering::Greater, (None, Some(_)) => Ordering::Less,
142 (None, None) => Ordering::Equal,
143 }
144 }
145}
146
147pub struct MergeIterator<'a> {
149 heap: BinaryHeap<PeekableSource<'a>>,
151 current_key: Option<Vec<u8>>,
153 stats: ScanStats,
155}
156
157impl<'a> MergeIterator<'a> {
158 pub fn new(sources: Vec<Box<dyn EntryIterator<'a> + 'a>>) -> Self {
160 let mut heap = BinaryHeap::with_capacity(sources.len());
161
162 for source in sources {
163 let peekable = PeekableSource::new(source);
164 if !peekable.source.is_exhausted() {
165 heap.push(peekable);
166 }
167 }
168
169 Self {
170 heap,
171 current_key: None,
172 stats: ScanStats::default(),
173 }
174 }
175
176 pub fn stats(&self) -> &ScanStats {
178 &self.stats
179 }
180}
181
182impl<'a> Iterator for MergeIterator<'a> {
183 type Item = Entry<'a>;
184
185 fn next(&mut self) -> Option<Self::Item> {
186 loop {
187 let mut source = self.heap.pop()?;
189
190 let entry = source.source.peek()?.clone();
192
193 source.source.advance();
195
196 if !source.source.is_exhausted() {
198 self.heap.push(source);
199 }
200
201 self.stats.entries_scanned += 1;
202
203 if let Some(ref current) = self.current_key
205 && current.as_slice() == entry.key.as_ref()
206 {
207 self.stats.duplicates_skipped += 1;
208 continue;
209 }
210
211 self.current_key = Some(entry.key.to_vec());
212
213 if entry.is_tombstone {
215 self.stats.tombstones_skipped += 1;
216 continue;
217 }
218
219 self.stats.entries_returned += 1;
220 return Some(entry);
221 }
222 }
223}
224
225#[derive(Debug, Default, Clone)]
227pub struct ScanStats {
228 pub entries_scanned: u64,
229 pub entries_returned: u64,
230 pub duplicates_skipped: u64,
231 pub tombstones_skipped: u64,
232}
233
234pub struct VecIterator<'a> {
236 entries: Vec<Entry<'a>>,
237 position: usize,
238 priority: u8,
239}
240
241impl<'a> VecIterator<'a> {
242 pub fn new(entries: Vec<Entry<'a>>, priority: u8) -> Self {
243 Self {
244 entries,
245 position: 0,
246 priority,
247 }
248 }
249}
250
251impl<'a> EntryIterator<'a> for VecIterator<'a> {
252 fn peek(&self) -> Option<&Entry<'a>> {
253 self.entries.get(self.position)
254 }
255
256 fn advance(&mut self) {
257 self.position += 1;
258 }
259
260 fn is_exhausted(&self) -> bool {
261 self.position >= self.entries.len()
262 }
263
264 fn source_priority(&self) -> u8 {
265 self.priority
266 }
267}
268
269#[allow(dead_code)]
271pub struct RangeIterator<'a, I: EntryIterator<'a>> {
272 inner: I,
273 start_key: Vec<u8>,
274 end_key: Vec<u8>,
275 started: bool,
276 ended: bool,
277 _marker: std::marker::PhantomData<&'a ()>,
278}
279
280impl<'a, I: EntryIterator<'a>> RangeIterator<'a, I> {
281 pub fn new(inner: I, start_key: Vec<u8>, end_key: Vec<u8>) -> Self {
282 Self {
283 inner,
284 start_key,
285 end_key,
286 started: false,
287 ended: false,
288 _marker: std::marker::PhantomData,
289 }
290 }
291}
292
293impl<'a, I: EntryIterator<'a> + Send> EntryIterator<'a> for RangeIterator<'a, I> {
294 fn peek(&self) -> Option<&Entry<'a>> {
295 if self.ended {
296 return None;
297 }
298
299 let entry = self.inner.peek()?;
300
301 if entry.key.as_ref() > self.end_key.as_slice() {
303 return None;
304 }
305
306 Some(entry)
307 }
308
309 fn advance(&mut self) {
310 if self.ended {
311 return;
312 }
313
314 self.inner.advance();
315
316 if let Some(entry) = self.inner.peek()
318 && entry.key.as_ref() > self.end_key.as_slice()
319 {
320 self.ended = true;
321 }
322 }
323
324 fn is_exhausted(&self) -> bool {
325 self.ended || self.inner.is_exhausted()
326 }
327
328 fn source_priority(&self) -> u8 {
329 self.inner.source_priority()
330 }
331}
332
333pub struct LimitIterator<'a, I: Iterator<Item = Entry<'a>>> {
335 inner: I,
336 limit: usize,
337 count: usize,
338}
339
340impl<'a, I: Iterator<Item = Entry<'a>>> LimitIterator<'a, I> {
341 pub fn new(inner: I, limit: usize) -> Self {
342 Self {
343 inner,
344 limit,
345 count: 0,
346 }
347 }
348}
349
350impl<'a, I: Iterator<Item = Entry<'a>>> Iterator for LimitIterator<'a, I> {
351 type Item = Entry<'a>;
352
353 fn next(&mut self) -> Option<Self::Item> {
354 if self.count >= self.limit {
355 return None;
356 }
357 self.count += 1;
358 self.inner.next()
359 }
360}
361
362pub struct FilterIterator<'a, I, F>
364where
365 I: Iterator<Item = Entry<'a>>,
366 F: Fn(&Entry<'a>) -> bool,
367{
368 inner: I,
369 predicate: F,
370}
371
372impl<'a, I, F> FilterIterator<'a, I, F>
373where
374 I: Iterator<Item = Entry<'a>>,
375 F: Fn(&Entry<'a>) -> bool,
376{
377 pub fn new(inner: I, predicate: F) -> Self {
378 Self { inner, predicate }
379 }
380}
381
382impl<'a, I, F> Iterator for FilterIterator<'a, I, F>
383where
384 I: Iterator<Item = Entry<'a>>,
385 F: Fn(&Entry<'a>) -> bool,
386{
387 type Item = Entry<'a>;
388
389 fn next(&mut self) -> Option<Self::Item> {
390 loop {
391 let entry = self.inner.next()?;
392 if (self.predicate)(&entry) {
393 return Some(entry);
394 }
395 }
396 }
397}
398
399pub struct MemtableIterator<'a> {
401 entries: Vec<Entry<'a>>,
402 position: usize,
403}
404
405impl<'a> MemtableIterator<'a> {
406 pub fn new(entries: Vec<Entry<'a>>) -> Self {
407 Self {
408 entries,
409 position: 0,
410 }
411 }
412
413 pub fn from_btree<K, V>(tree: &'a std::collections::BTreeMap<K, V>, timestamp: u64) -> Self
414 where
415 K: AsRef<[u8]>,
416 V: AsRef<[u8]>,
417 {
418 let entries: Vec<_> = tree
419 .iter()
420 .map(|(k, v)| {
421 Entry::new(
422 Cow::Borrowed(k.as_ref()),
423 Cow::Borrowed(v.as_ref()),
424 timestamp,
425 )
426 })
427 .collect();
428
429 Self::new(entries)
430 }
431}
432
433impl<'a> EntryIterator<'a> for MemtableIterator<'a> {
434 fn peek(&self) -> Option<&Entry<'a>> {
435 self.entries.get(self.position)
436 }
437
438 fn advance(&mut self) {
439 self.position += 1;
440 }
441
442 fn is_exhausted(&self) -> bool {
443 self.position >= self.entries.len()
444 }
445
446 fn source_priority(&self) -> u8 {
447 0 }
449}
450
451pub struct SstIterator<'a> {
453 data: &'a [u8],
455 position: usize,
457 current: Option<Entry<'a>>,
459 end_key: Option<Vec<u8>>,
461 level: u8,
463}
464
465impl<'a> SstIterator<'a> {
466 pub fn new(data: &'a [u8], level: u8) -> Self {
467 let mut iter = Self {
468 data,
469 position: 0,
470 current: None,
471 end_key: None,
472 level,
473 };
474 iter.read_next();
475 iter
476 }
477
478 pub fn with_end_key(mut self, end_key: Vec<u8>) -> Self {
479 self.end_key = Some(end_key);
480 self
481 }
482
483 fn read_next(&mut self) {
484 if self.position >= self.data.len() {
485 self.current = None;
486 return;
487 }
488
489 if self.position + 12 > self.data.len() {
491 self.current = None;
492 return;
493 }
494
495 let key_len =
496 u16::from_le_bytes([self.data[self.position], self.data[self.position + 1]]) as usize;
497 let value_len =
498 u16::from_le_bytes([self.data[self.position + 2], self.data[self.position + 3]])
499 as usize;
500 let timestamp = u64::from_le_bytes([
501 self.data[self.position + 4],
502 self.data[self.position + 5],
503 self.data[self.position + 6],
504 self.data[self.position + 7],
505 self.data[self.position + 8],
506 self.data[self.position + 9],
507 self.data[self.position + 10],
508 self.data[self.position + 11],
509 ]);
510
511 let key_start = self.position + 12;
512 let key_end = key_start + key_len;
513 let value_end = key_end + value_len;
514
515 if value_end > self.data.len() {
516 self.current = None;
517 return;
518 }
519
520 let key = &self.data[key_start..key_end];
521 let value = &self.data[key_end..value_end];
522
523 if let Some(ref end_key) = self.end_key
525 && key > end_key.as_slice()
526 {
527 self.current = None;
528 return;
529 }
530
531 self.current = Some(Entry::new(
532 Cow::Borrowed(key),
533 Cow::Borrowed(value),
534 timestamp,
535 ));
536 self.position = value_end;
537 }
538}
539
540impl<'a> EntryIterator<'a> for SstIterator<'a> {
541 fn peek(&self) -> Option<&Entry<'a>> {
542 self.current.as_ref()
543 }
544
545 fn advance(&mut self) {
546 self.read_next();
547 }
548
549 fn is_exhausted(&self) -> bool {
550 self.current.is_none()
551 }
552
553 fn source_priority(&self) -> u8 {
554 self.level + 1
557 }
558}
559
560pub struct SstBuilder {
562 data: Vec<u8>,
563}
564
565impl Default for SstBuilder {
566 fn default() -> Self {
567 Self::new()
568 }
569}
570
571impl SstBuilder {
572 pub fn new() -> Self {
573 Self { data: Vec::new() }
574 }
575
576 pub fn add(&mut self, key: &[u8], value: &[u8], timestamp: u64) -> &mut Self {
577 self.data
578 .extend_from_slice(&(key.len() as u16).to_le_bytes());
579 self.data
580 .extend_from_slice(&(value.len() as u16).to_le_bytes());
581 self.data.extend_from_slice(×tamp.to_le_bytes());
582 self.data.extend_from_slice(key);
583 self.data.extend_from_slice(value);
584 self
585 }
586
587 pub fn build(self) -> Vec<u8> {
588 self.data
589 }
590}
591
592pub trait IteratorExt<'a>: Iterator<Item = Entry<'a>> + Sized {
594 fn limit(self, n: usize) -> LimitIterator<'a, Self> {
595 LimitIterator::new(self, n)
596 }
597
598 fn filter_entries<F>(self, predicate: F) -> FilterIterator<'a, Self, F>
599 where
600 F: Fn(&Entry<'a>) -> bool,
601 {
602 FilterIterator::new(self, predicate)
603 }
604}
605
606impl<'a, I: Iterator<Item = Entry<'a>>> IteratorExt<'a> for I {}
607
608#[cfg(test)]
609mod tests {
610 use super::*;
611
612 fn make_entry(key: &[u8], value: &[u8], ts: u64) -> Entry<'static> {
613 Entry {
614 key: Cow::Owned(key.to_vec()),
615 value: Cow::Owned(value.to_vec()),
616 timestamp: ts,
617 is_tombstone: value.is_empty(),
618 }
619 }
620
621 #[test]
622 fn test_vec_iterator() {
623 let entries = vec![
624 make_entry(b"a", b"1", 100),
625 make_entry(b"b", b"2", 100),
626 make_entry(b"c", b"3", 100),
627 ];
628
629 let iter = VecIterator::new(entries, 0);
630 assert!(!iter.is_exhausted());
631 assert_eq!(iter.peek().unwrap().key.as_ref(), b"a");
632 }
633
634 #[test]
635 fn test_merge_iterator_single_source() {
636 let entries = vec![
637 make_entry(b"a", b"1", 100),
638 make_entry(b"b", b"2", 100),
639 make_entry(b"c", b"3", 100),
640 ];
641
642 let source: Box<dyn EntryIterator<'static> + 'static> =
643 Box::new(VecIterator::new(entries, 0));
644 let mut merge = MergeIterator::new(vec![source]);
645
646 let result: Vec<_> = merge.by_ref().collect();
647 assert_eq!(result.len(), 3);
648 assert_eq!(result[0].key.as_ref(), b"a");
649 assert_eq!(result[1].key.as_ref(), b"b");
650 assert_eq!(result[2].key.as_ref(), b"c");
651 }
652
653 #[test]
654 fn test_merge_iterator_multiple_sources() {
655 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
656 vec![
657 make_entry(b"a", b"1", 100),
658 make_entry(b"c", b"3", 100),
659 make_entry(b"e", b"5", 100),
660 ],
661 0,
662 ));
663
664 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
665 vec![
666 make_entry(b"b", b"2", 100),
667 make_entry(b"d", b"4", 100),
668 make_entry(b"f", b"6", 100),
669 ],
670 1,
671 ));
672
673 let mut merge = MergeIterator::new(vec![source1, source2]);
674 let result: Vec<_> = merge.by_ref().collect();
675
676 assert_eq!(result.len(), 6);
677 assert_eq!(result[0].key.as_ref(), b"a");
678 assert_eq!(result[1].key.as_ref(), b"b");
679 assert_eq!(result[2].key.as_ref(), b"c");
680 assert_eq!(result[3].key.as_ref(), b"d");
681 assert_eq!(result[4].key.as_ref(), b"e");
682 assert_eq!(result[5].key.as_ref(), b"f");
683 }
684
685 #[test]
686 fn test_merge_iterator_deduplication() {
687 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
689 vec![
690 make_entry(b"a", b"new_value", 200), ],
692 0,
693 ));
694
695 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
697 vec![
698 make_entry(b"a", b"old_value", 100), make_entry(b"b", b"2", 100),
700 ],
701 1,
702 ));
703
704 let mut merge = MergeIterator::new(vec![source1, source2]);
705 let result: Vec<_> = merge.by_ref().collect();
706
707 assert_eq!(result.len(), 2);
709 assert_eq!(result[0].key.as_ref(), b"a");
710 assert_eq!(result[0].value.as_ref(), b"new_value");
711 assert_eq!(result[1].key.as_ref(), b"b");
712
713 assert_eq!(merge.stats().duplicates_skipped, 1);
715 }
716
717 #[test]
718 fn test_merge_iterator_tombstones() {
719 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
720 vec![
721 Entry::tombstone(Cow::Owned(b"a".to_vec()), 200), ],
723 0,
724 ));
725
726 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
727 vec![
728 make_entry(b"a", b"old_value", 100),
729 make_entry(b"b", b"2", 100),
730 ],
731 1,
732 ));
733
734 let mut merge = MergeIterator::new(vec![source1, source2]);
735 let result: Vec<_> = merge.by_ref().collect();
736
737 assert_eq!(result.len(), 1);
739 assert_eq!(result[0].key.as_ref(), b"b");
740
741 assert_eq!(merge.stats().tombstones_skipped, 1);
742 }
743
744 #[test]
745 fn test_limit_iterator() {
746 let entries = vec![
747 make_entry(b"a", b"1", 100),
748 make_entry(b"b", b"2", 100),
749 make_entry(b"c", b"3", 100),
750 make_entry(b"d", b"4", 100),
751 make_entry(b"e", b"5", 100),
752 ];
753
754 let source: Box<dyn EntryIterator<'static> + 'static> =
755 Box::new(VecIterator::new(entries, 0));
756 let merge = MergeIterator::new(vec![source]);
757
758 let result: Vec<_> = merge.limit(3).collect();
759
760 assert_eq!(result.len(), 3);
761 assert_eq!(result[0].key.as_ref(), b"a");
762 assert_eq!(result[2].key.as_ref(), b"c");
763 }
764
765 #[test]
766 fn test_filter_iterator() {
767 let entries = vec![
768 make_entry(b"a", b"1", 100),
769 make_entry(b"b", b"2", 100),
770 make_entry(b"c", b"3", 100),
771 make_entry(b"d", b"4", 100),
772 ];
773
774 let source: Box<dyn EntryIterator<'static> + 'static> =
775 Box::new(VecIterator::new(entries, 0));
776 let merge = MergeIterator::new(vec![source]);
777
778 let result: Vec<_> = merge
780 .filter_entries(|e| e.key.as_ref() < b"c".as_slice())
781 .collect();
782
783 assert_eq!(result.len(), 2);
784 assert_eq!(result[0].key.as_ref(), b"a");
785 assert_eq!(result[1].key.as_ref(), b"b");
786 }
787
788 #[test]
789 fn test_sst_builder_and_iterator() {
790 let mut builder = SstBuilder::new();
791 builder
792 .add(b"apple", b"red", 100)
793 .add(b"banana", b"yellow", 100)
794 .add(b"cherry", b"red", 100);
795
796 let data = builder.build();
797 let iter = SstIterator::new(&data, 0);
798
799 assert!(!iter.is_exhausted());
800 assert_eq!(iter.peek().unwrap().key.as_ref(), b"apple");
801 }
802
803 #[test]
804 fn test_sst_iterator_full() {
805 let mut builder = SstBuilder::new();
806 builder
807 .add(b"a", b"1", 100)
808 .add(b"b", b"2", 200)
809 .add(b"c", b"3", 300);
810
811 let data = builder.build();
812 let mut iter = SstIterator::new(&data, 0);
813
814 let mut results = Vec::new();
815 while !iter.is_exhausted() {
816 results.push(iter.peek().unwrap().clone());
817 iter.advance();
818 }
819
820 assert_eq!(results.len(), 3);
821 assert_eq!(results[0].key.as_ref(), b"a");
822 assert_eq!(results[0].timestamp, 100);
823 assert_eq!(results[1].key.as_ref(), b"b");
824 assert_eq!(results[1].timestamp, 200);
825 assert_eq!(results[2].key.as_ref(), b"c");
826 assert_eq!(results[2].timestamp, 300);
827 }
828
829 #[test]
830 fn test_sst_with_merge() {
831 let mut builder1 = SstBuilder::new();
832 builder1.add(b"a", b"1", 100).add(b"c", b"3", 100);
833 let data1 = builder1.build();
834
835 let mut builder2 = SstBuilder::new();
836 builder2.add(b"b", b"2", 100).add(b"d", b"4", 100);
837 let data2 = builder2.build();
838
839 let data1_static: &'static [u8] = Box::leak(data1.into_boxed_slice());
842 let data2_static: &'static [u8] = Box::leak(data2.into_boxed_slice());
843
844 let source1: Box<dyn EntryIterator<'static> + 'static> =
845 Box::new(SstIterator::new(data1_static, 0));
846 let source2: Box<dyn EntryIterator<'static> + 'static> =
847 Box::new(SstIterator::new(data2_static, 1));
848
849 let mut merge = MergeIterator::new(vec![source1, source2]);
850 let result: Vec<_> = merge.by_ref().collect();
851
852 assert_eq!(result.len(), 4);
853 assert_eq!(result[0].key.as_ref(), b"a");
854 assert_eq!(result[1].key.as_ref(), b"b");
855 assert_eq!(result[2].key.as_ref(), b"c");
856 assert_eq!(result[3].key.as_ref(), b"d");
857 }
858
859 #[test]
860 fn test_memory_efficiency() {
861 let entries1: Vec<Entry<'static>> = (0..100)
864 .map(|i| make_entry(format!("key{:05}", i * 2).as_bytes(), b"value", 100))
865 .collect();
866 let entries2: Vec<Entry<'static>> = (0..100)
867 .map(|i| make_entry(format!("key{:05}", i * 2 + 1).as_bytes(), b"value", 100))
868 .collect();
869
870 let source1: Box<dyn EntryIterator<'static> + 'static> =
871 Box::new(VecIterator::new(entries1, 0));
872 let source2: Box<dyn EntryIterator<'static> + 'static> =
873 Box::new(VecIterator::new(entries2, 1));
874
875 let merge = MergeIterator::new(vec![source1, source2]);
876
877 let result: Vec<_> = merge.limit(10).collect();
879
880 assert_eq!(result.len(), 10);
881 assert_eq!(result[0].key.as_ref(), b"key00000");
882 assert_eq!(result[9].key.as_ref(), b"key00009");
883 }
884
885 #[test]
886 fn test_empty_sources() {
887 let sources: Vec<Box<dyn EntryIterator<'static> + 'static>> = vec![
888 Box::new(VecIterator::new(vec![], 0)),
889 Box::new(VecIterator::new(vec![], 1)),
890 ];
891
892 let mut merge = MergeIterator::new(sources);
893 let result: Vec<_> = merge.by_ref().collect();
894
895 assert_eq!(result.len(), 0);
896 }
897
898 #[test]
899 fn test_scan_stats() {
900 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
901 vec![
902 make_entry(b"a", b"1", 200),
903 make_entry(b"b", b"", 200), ],
905 0,
906 ));
907
908 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
909 vec![
910 make_entry(b"a", b"old", 100), make_entry(b"c", b"3", 100),
912 ],
913 1,
914 ));
915
916 let mut merge = MergeIterator::new(vec![source1, source2]);
917 let _: Vec<_> = merge.by_ref().collect();
918
919 let stats = merge.stats();
920 assert_eq!(stats.entries_scanned, 4);
921 assert_eq!(stats.entries_returned, 2); assert_eq!(stats.duplicates_skipped, 1); assert_eq!(stats.tombstones_skipped, 1); }
925
926 #[test]
927 fn test_priority_ordering() {
928 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
930 vec![make_entry(b"key", b"memtable", 300)],
931 0,
932 )); let source2: Box<dyn EntryIterator<'static> + 'static> =
935 Box::new(VecIterator::new(vec![make_entry(b"key", b"l0", 200)], 1));
936
937 let source3: Box<dyn EntryIterator<'static> + 'static> =
938 Box::new(VecIterator::new(vec![make_entry(b"key", b"l1", 100)], 2)); let mut merge = MergeIterator::new(vec![source1, source2, source3]);
941 let result: Vec<_> = merge.by_ref().collect();
942
943 assert_eq!(result.len(), 1);
944 assert_eq!(result[0].value.as_ref(), b"memtable");
945 }
946}