1use std::borrow::Cow;
36use std::cmp::Ordering;
37use std::collections::BinaryHeap;
38
39#[derive(Debug, Clone)]
41pub struct Entry<'a> {
42 pub key: Cow<'a, [u8]>,
43 pub value: Cow<'a, [u8]>,
44 pub timestamp: u64,
45 pub is_tombstone: bool,
46}
47
48impl<'a> Entry<'a> {
49 pub fn new(
51 key: impl Into<Cow<'a, [u8]>>,
52 value: impl Into<Cow<'a, [u8]>>,
53 timestamp: u64,
54 ) -> Self {
55 let value = value.into();
56 let is_tombstone = value.is_empty();
57 Self {
58 key: key.into(),
59 value,
60 timestamp,
61 is_tombstone,
62 }
63 }
64
65 pub fn tombstone(key: impl Into<Cow<'a, [u8]>>, timestamp: u64) -> Self {
67 Self {
68 key: key.into(),
69 value: Cow::Borrowed(&[]),
70 timestamp,
71 is_tombstone: true,
72 }
73 }
74
75 pub fn into_owned(self) -> Entry<'static> {
77 Entry {
78 key: Cow::Owned(self.key.into_owned()),
79 value: Cow::Owned(self.value.into_owned()),
80 timestamp: self.timestamp,
81 is_tombstone: self.is_tombstone,
82 }
83 }
84}
85
86pub trait EntryIterator<'a>: Send {
88 fn peek(&self) -> Option<&Entry<'a>>;
90
91 fn advance(&mut self);
93
94 fn is_exhausted(&self) -> bool;
96
97 fn source_priority(&self) -> u8;
99}
100
101struct PeekableSource<'a> {
103 source: Box<dyn EntryIterator<'a> + 'a>,
104 priority: u8,
105}
106
107impl<'a> PeekableSource<'a> {
108 fn new(source: Box<dyn EntryIterator<'a> + 'a>) -> Self {
109 let priority = source.source_priority();
110 Self { source, priority }
111 }
112}
113
114impl<'a> PartialEq for PeekableSource<'a> {
115 fn eq(&self, other: &Self) -> bool {
116 match (self.source.peek(), other.source.peek()) {
117 (Some(a), Some(b)) => a.key == b.key && self.priority == other.priority,
118 (None, None) => true,
119 _ => false,
120 }
121 }
122}
123
124impl<'a> Eq for PeekableSource<'a> {}
125
126impl<'a> PartialOrd for PeekableSource<'a> {
127 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
128 Some(self.cmp(other))
129 }
130}
131
132impl<'a> Ord for PeekableSource<'a> {
133 fn cmp(&self, other: &Self) -> Ordering {
134 match (self.source.peek(), other.source.peek()) {
135 (Some(a), Some(b)) => {
136 match b.key.cmp(&a.key) {
139 Ordering::Equal => other.priority.cmp(&self.priority),
140 ord => ord,
141 }
142 }
143 (Some(_), None) => Ordering::Greater, (None, Some(_)) => Ordering::Less,
145 (None, None) => Ordering::Equal,
146 }
147 }
148}
149
150pub struct MergeIterator<'a> {
152 heap: BinaryHeap<PeekableSource<'a>>,
154 current_key: Option<Vec<u8>>,
156 stats: ScanStats,
158}
159
160impl<'a> MergeIterator<'a> {
161 pub fn new(sources: Vec<Box<dyn EntryIterator<'a> + 'a>>) -> Self {
163 let mut heap = BinaryHeap::with_capacity(sources.len());
164
165 for source in sources {
166 let peekable = PeekableSource::new(source);
167 if !peekable.source.is_exhausted() {
168 heap.push(peekable);
169 }
170 }
171
172 Self {
173 heap,
174 current_key: None,
175 stats: ScanStats::default(),
176 }
177 }
178
179 pub fn stats(&self) -> &ScanStats {
181 &self.stats
182 }
183}
184
185impl<'a> Iterator for MergeIterator<'a> {
186 type Item = Entry<'a>;
187
188 fn next(&mut self) -> Option<Self::Item> {
189 loop {
190 let mut source = self.heap.pop()?;
192
193 let entry = source.source.peek()?.clone();
195
196 source.source.advance();
198
199 if !source.source.is_exhausted() {
201 self.heap.push(source);
202 }
203
204 self.stats.entries_scanned += 1;
205
206 if let Some(ref current) = self.current_key
208 && current.as_slice() == entry.key.as_ref()
209 {
210 self.stats.duplicates_skipped += 1;
211 continue;
212 }
213
214 self.current_key = Some(entry.key.to_vec());
215
216 if entry.is_tombstone {
218 self.stats.tombstones_skipped += 1;
219 continue;
220 }
221
222 self.stats.entries_returned += 1;
223 return Some(entry);
224 }
225 }
226}
227
228#[derive(Debug, Default, Clone)]
230pub struct ScanStats {
231 pub entries_scanned: u64,
232 pub entries_returned: u64,
233 pub duplicates_skipped: u64,
234 pub tombstones_skipped: u64,
235}
236
237pub struct VecIterator<'a> {
239 entries: Vec<Entry<'a>>,
240 position: usize,
241 priority: u8,
242}
243
244impl<'a> VecIterator<'a> {
245 pub fn new(entries: Vec<Entry<'a>>, priority: u8) -> Self {
246 Self {
247 entries,
248 position: 0,
249 priority,
250 }
251 }
252}
253
254impl<'a> EntryIterator<'a> for VecIterator<'a> {
255 fn peek(&self) -> Option<&Entry<'a>> {
256 self.entries.get(self.position)
257 }
258
259 fn advance(&mut self) {
260 self.position += 1;
261 }
262
263 fn is_exhausted(&self) -> bool {
264 self.position >= self.entries.len()
265 }
266
267 fn source_priority(&self) -> u8 {
268 self.priority
269 }
270}
271
272#[allow(dead_code)]
274pub struct RangeIterator<'a, I: EntryIterator<'a>> {
275 inner: I,
276 start_key: Vec<u8>,
277 end_key: Vec<u8>,
278 started: bool,
279 ended: bool,
280 _marker: std::marker::PhantomData<&'a ()>,
281}
282
283impl<'a, I: EntryIterator<'a>> RangeIterator<'a, I> {
284 pub fn new(inner: I, start_key: Vec<u8>, end_key: Vec<u8>) -> Self {
285 Self {
286 inner,
287 start_key,
288 end_key,
289 started: false,
290 ended: false,
291 _marker: std::marker::PhantomData,
292 }
293 }
294}
295
296impl<'a, I: EntryIterator<'a> + Send> EntryIterator<'a> for RangeIterator<'a, I> {
297 fn peek(&self) -> Option<&Entry<'a>> {
298 if self.ended {
299 return None;
300 }
301
302 let entry = self.inner.peek()?;
303
304 if entry.key.as_ref() > self.end_key.as_slice() {
306 return None;
307 }
308
309 Some(entry)
310 }
311
312 fn advance(&mut self) {
313 if self.ended {
314 return;
315 }
316
317 self.inner.advance();
318
319 if let Some(entry) = self.inner.peek()
321 && entry.key.as_ref() > self.end_key.as_slice()
322 {
323 self.ended = true;
324 }
325 }
326
327 fn is_exhausted(&self) -> bool {
328 self.ended || self.inner.is_exhausted()
329 }
330
331 fn source_priority(&self) -> u8 {
332 self.inner.source_priority()
333 }
334}
335
336pub struct LimitIterator<'a, I: Iterator<Item = Entry<'a>>> {
338 inner: I,
339 limit: usize,
340 count: usize,
341}
342
343impl<'a, I: Iterator<Item = Entry<'a>>> LimitIterator<'a, I> {
344 pub fn new(inner: I, limit: usize) -> Self {
345 Self {
346 inner,
347 limit,
348 count: 0,
349 }
350 }
351}
352
353impl<'a, I: Iterator<Item = Entry<'a>>> Iterator for LimitIterator<'a, I> {
354 type Item = Entry<'a>;
355
356 fn next(&mut self) -> Option<Self::Item> {
357 if self.count >= self.limit {
358 return None;
359 }
360 self.count += 1;
361 self.inner.next()
362 }
363}
364
365pub struct FilterIterator<'a, I, F>
367where
368 I: Iterator<Item = Entry<'a>>,
369 F: Fn(&Entry<'a>) -> bool,
370{
371 inner: I,
372 predicate: F,
373}
374
375impl<'a, I, F> FilterIterator<'a, I, F>
376where
377 I: Iterator<Item = Entry<'a>>,
378 F: Fn(&Entry<'a>) -> bool,
379{
380 pub fn new(inner: I, predicate: F) -> Self {
381 Self { inner, predicate }
382 }
383}
384
385impl<'a, I, F> Iterator for FilterIterator<'a, I, F>
386where
387 I: Iterator<Item = Entry<'a>>,
388 F: Fn(&Entry<'a>) -> bool,
389{
390 type Item = Entry<'a>;
391
392 fn next(&mut self) -> Option<Self::Item> {
393 loop {
394 let entry = self.inner.next()?;
395 if (self.predicate)(&entry) {
396 return Some(entry);
397 }
398 }
399 }
400}
401
402pub struct MemtableIterator<'a> {
404 entries: Vec<Entry<'a>>,
405 position: usize,
406}
407
408impl<'a> MemtableIterator<'a> {
409 pub fn new(entries: Vec<Entry<'a>>) -> Self {
410 Self {
411 entries,
412 position: 0,
413 }
414 }
415
416 pub fn from_btree<K, V>(tree: &'a std::collections::BTreeMap<K, V>, timestamp: u64) -> Self
417 where
418 K: AsRef<[u8]>,
419 V: AsRef<[u8]>,
420 {
421 let entries: Vec<_> = tree
422 .iter()
423 .map(|(k, v)| {
424 Entry::new(
425 Cow::Borrowed(k.as_ref()),
426 Cow::Borrowed(v.as_ref()),
427 timestamp,
428 )
429 })
430 .collect();
431
432 Self::new(entries)
433 }
434}
435
436impl<'a> EntryIterator<'a> for MemtableIterator<'a> {
437 fn peek(&self) -> Option<&Entry<'a>> {
438 self.entries.get(self.position)
439 }
440
441 fn advance(&mut self) {
442 self.position += 1;
443 }
444
445 fn is_exhausted(&self) -> bool {
446 self.position >= self.entries.len()
447 }
448
449 fn source_priority(&self) -> u8 {
450 0 }
452}
453
454pub struct SstIterator<'a> {
456 data: &'a [u8],
458 position: usize,
460 current: Option<Entry<'a>>,
462 end_key: Option<Vec<u8>>,
464 level: u8,
466}
467
468impl<'a> SstIterator<'a> {
469 pub fn new(data: &'a [u8], level: u8) -> Self {
470 let mut iter = Self {
471 data,
472 position: 0,
473 current: None,
474 end_key: None,
475 level,
476 };
477 iter.read_next();
478 iter
479 }
480
481 pub fn with_end_key(mut self, end_key: Vec<u8>) -> Self {
482 self.end_key = Some(end_key);
483 self
484 }
485
486 fn read_next(&mut self) {
487 if self.position >= self.data.len() {
488 self.current = None;
489 return;
490 }
491
492 if self.position + 12 > self.data.len() {
494 self.current = None;
495 return;
496 }
497
498 let key_len =
499 u16::from_le_bytes([self.data[self.position], self.data[self.position + 1]]) as usize;
500 let value_len =
501 u16::from_le_bytes([self.data[self.position + 2], self.data[self.position + 3]])
502 as usize;
503 let timestamp = u64::from_le_bytes([
504 self.data[self.position + 4],
505 self.data[self.position + 5],
506 self.data[self.position + 6],
507 self.data[self.position + 7],
508 self.data[self.position + 8],
509 self.data[self.position + 9],
510 self.data[self.position + 10],
511 self.data[self.position + 11],
512 ]);
513
514 let key_start = self.position + 12;
515 let key_end = key_start + key_len;
516 let value_end = key_end + value_len;
517
518 if value_end > self.data.len() {
519 self.current = None;
520 return;
521 }
522
523 let key = &self.data[key_start..key_end];
524 let value = &self.data[key_end..value_end];
525
526 if let Some(ref end_key) = self.end_key
528 && key > end_key.as_slice()
529 {
530 self.current = None;
531 return;
532 }
533
534 self.current = Some(Entry::new(
535 Cow::Borrowed(key),
536 Cow::Borrowed(value),
537 timestamp,
538 ));
539 self.position = value_end;
540 }
541}
542
543impl<'a> EntryIterator<'a> for SstIterator<'a> {
544 fn peek(&self) -> Option<&Entry<'a>> {
545 self.current.as_ref()
546 }
547
548 fn advance(&mut self) {
549 self.read_next();
550 }
551
552 fn is_exhausted(&self) -> bool {
553 self.current.is_none()
554 }
555
556 fn source_priority(&self) -> u8 {
557 self.level + 1
560 }
561}
562
563pub struct SstBuilder {
565 data: Vec<u8>,
566}
567
568impl Default for SstBuilder {
569 fn default() -> Self {
570 Self::new()
571 }
572}
573
574impl SstBuilder {
575 pub fn new() -> Self {
576 Self { data: Vec::new() }
577 }
578
579 pub fn add(&mut self, key: &[u8], value: &[u8], timestamp: u64) -> &mut Self {
580 self.data
581 .extend_from_slice(&(key.len() as u16).to_le_bytes());
582 self.data
583 .extend_from_slice(&(value.len() as u16).to_le_bytes());
584 self.data.extend_from_slice(×tamp.to_le_bytes());
585 self.data.extend_from_slice(key);
586 self.data.extend_from_slice(value);
587 self
588 }
589
590 pub fn build(self) -> Vec<u8> {
591 self.data
592 }
593}
594
595pub trait IteratorExt<'a>: Iterator<Item = Entry<'a>> + Sized {
597 fn limit(self, n: usize) -> LimitIterator<'a, Self> {
598 LimitIterator::new(self, n)
599 }
600
601 fn filter_entries<F>(self, predicate: F) -> FilterIterator<'a, Self, F>
602 where
603 F: Fn(&Entry<'a>) -> bool,
604 {
605 FilterIterator::new(self, predicate)
606 }
607}
608
609impl<'a, I: Iterator<Item = Entry<'a>>> IteratorExt<'a> for I {}
610
611#[cfg(test)]
612mod tests {
613 use super::*;
614
615 fn make_entry(key: &[u8], value: &[u8], ts: u64) -> Entry<'static> {
616 Entry {
617 key: Cow::Owned(key.to_vec()),
618 value: Cow::Owned(value.to_vec()),
619 timestamp: ts,
620 is_tombstone: value.is_empty(),
621 }
622 }
623
624 #[test]
625 fn test_vec_iterator() {
626 let entries = vec![
627 make_entry(b"a", b"1", 100),
628 make_entry(b"b", b"2", 100),
629 make_entry(b"c", b"3", 100),
630 ];
631
632 let iter = VecIterator::new(entries, 0);
633 assert!(!iter.is_exhausted());
634 assert_eq!(iter.peek().unwrap().key.as_ref(), b"a");
635 }
636
637 #[test]
638 fn test_merge_iterator_single_source() {
639 let entries = vec![
640 make_entry(b"a", b"1", 100),
641 make_entry(b"b", b"2", 100),
642 make_entry(b"c", b"3", 100),
643 ];
644
645 let source: Box<dyn EntryIterator<'static> + 'static> =
646 Box::new(VecIterator::new(entries, 0));
647 let mut merge = MergeIterator::new(vec![source]);
648
649 let result: Vec<_> = merge.by_ref().collect();
650 assert_eq!(result.len(), 3);
651 assert_eq!(result[0].key.as_ref(), b"a");
652 assert_eq!(result[1].key.as_ref(), b"b");
653 assert_eq!(result[2].key.as_ref(), b"c");
654 }
655
656 #[test]
657 fn test_merge_iterator_multiple_sources() {
658 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
659 vec![
660 make_entry(b"a", b"1", 100),
661 make_entry(b"c", b"3", 100),
662 make_entry(b"e", b"5", 100),
663 ],
664 0,
665 ));
666
667 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
668 vec![
669 make_entry(b"b", b"2", 100),
670 make_entry(b"d", b"4", 100),
671 make_entry(b"f", b"6", 100),
672 ],
673 1,
674 ));
675
676 let mut merge = MergeIterator::new(vec![source1, source2]);
677 let result: Vec<_> = merge.by_ref().collect();
678
679 assert_eq!(result.len(), 6);
680 assert_eq!(result[0].key.as_ref(), b"a");
681 assert_eq!(result[1].key.as_ref(), b"b");
682 assert_eq!(result[2].key.as_ref(), b"c");
683 assert_eq!(result[3].key.as_ref(), b"d");
684 assert_eq!(result[4].key.as_ref(), b"e");
685 assert_eq!(result[5].key.as_ref(), b"f");
686 }
687
688 #[test]
689 fn test_merge_iterator_deduplication() {
690 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
692 vec![
693 make_entry(b"a", b"new_value", 200), ],
695 0,
696 ));
697
698 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
700 vec![
701 make_entry(b"a", b"old_value", 100), make_entry(b"b", b"2", 100),
703 ],
704 1,
705 ));
706
707 let mut merge = MergeIterator::new(vec![source1, source2]);
708 let result: Vec<_> = merge.by_ref().collect();
709
710 assert_eq!(result.len(), 2);
712 assert_eq!(result[0].key.as_ref(), b"a");
713 assert_eq!(result[0].value.as_ref(), b"new_value");
714 assert_eq!(result[1].key.as_ref(), b"b");
715
716 assert_eq!(merge.stats().duplicates_skipped, 1);
718 }
719
720 #[test]
721 fn test_merge_iterator_tombstones() {
722 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
723 vec![
724 Entry::tombstone(Cow::Owned(b"a".to_vec()), 200), ],
726 0,
727 ));
728
729 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
730 vec![
731 make_entry(b"a", b"old_value", 100),
732 make_entry(b"b", b"2", 100),
733 ],
734 1,
735 ));
736
737 let mut merge = MergeIterator::new(vec![source1, source2]);
738 let result: Vec<_> = merge.by_ref().collect();
739
740 assert_eq!(result.len(), 1);
742 assert_eq!(result[0].key.as_ref(), b"b");
743
744 assert_eq!(merge.stats().tombstones_skipped, 1);
745 }
746
747 #[test]
748 fn test_limit_iterator() {
749 let entries = vec![
750 make_entry(b"a", b"1", 100),
751 make_entry(b"b", b"2", 100),
752 make_entry(b"c", b"3", 100),
753 make_entry(b"d", b"4", 100),
754 make_entry(b"e", b"5", 100),
755 ];
756
757 let source: Box<dyn EntryIterator<'static> + 'static> =
758 Box::new(VecIterator::new(entries, 0));
759 let merge = MergeIterator::new(vec![source]);
760
761 let result: Vec<_> = merge.limit(3).collect();
762
763 assert_eq!(result.len(), 3);
764 assert_eq!(result[0].key.as_ref(), b"a");
765 assert_eq!(result[2].key.as_ref(), b"c");
766 }
767
768 #[test]
769 fn test_filter_iterator() {
770 let entries = vec![
771 make_entry(b"a", b"1", 100),
772 make_entry(b"b", b"2", 100),
773 make_entry(b"c", b"3", 100),
774 make_entry(b"d", b"4", 100),
775 ];
776
777 let source: Box<dyn EntryIterator<'static> + 'static> =
778 Box::new(VecIterator::new(entries, 0));
779 let merge = MergeIterator::new(vec![source]);
780
781 let result: Vec<_> = merge
783 .filter_entries(|e| e.key.as_ref() < b"c".as_slice())
784 .collect();
785
786 assert_eq!(result.len(), 2);
787 assert_eq!(result[0].key.as_ref(), b"a");
788 assert_eq!(result[1].key.as_ref(), b"b");
789 }
790
791 #[test]
792 fn test_sst_builder_and_iterator() {
793 let mut builder = SstBuilder::new();
794 builder
795 .add(b"apple", b"red", 100)
796 .add(b"banana", b"yellow", 100)
797 .add(b"cherry", b"red", 100);
798
799 let data = builder.build();
800 let iter = SstIterator::new(&data, 0);
801
802 assert!(!iter.is_exhausted());
803 assert_eq!(iter.peek().unwrap().key.as_ref(), b"apple");
804 }
805
806 #[test]
807 fn test_sst_iterator_full() {
808 let mut builder = SstBuilder::new();
809 builder
810 .add(b"a", b"1", 100)
811 .add(b"b", b"2", 200)
812 .add(b"c", b"3", 300);
813
814 let data = builder.build();
815 let mut iter = SstIterator::new(&data, 0);
816
817 let mut results = Vec::new();
818 while !iter.is_exhausted() {
819 results.push(iter.peek().unwrap().clone());
820 iter.advance();
821 }
822
823 assert_eq!(results.len(), 3);
824 assert_eq!(results[0].key.as_ref(), b"a");
825 assert_eq!(results[0].timestamp, 100);
826 assert_eq!(results[1].key.as_ref(), b"b");
827 assert_eq!(results[1].timestamp, 200);
828 assert_eq!(results[2].key.as_ref(), b"c");
829 assert_eq!(results[2].timestamp, 300);
830 }
831
832 #[test]
833 fn test_sst_with_merge() {
834 let mut builder1 = SstBuilder::new();
835 builder1.add(b"a", b"1", 100).add(b"c", b"3", 100);
836 let data1 = builder1.build();
837
838 let mut builder2 = SstBuilder::new();
839 builder2.add(b"b", b"2", 100).add(b"d", b"4", 100);
840 let data2 = builder2.build();
841
842 let data1_static: &'static [u8] = Box::leak(data1.into_boxed_slice());
845 let data2_static: &'static [u8] = Box::leak(data2.into_boxed_slice());
846
847 let source1: Box<dyn EntryIterator<'static> + 'static> =
848 Box::new(SstIterator::new(data1_static, 0));
849 let source2: Box<dyn EntryIterator<'static> + 'static> =
850 Box::new(SstIterator::new(data2_static, 1));
851
852 let mut merge = MergeIterator::new(vec![source1, source2]);
853 let result: Vec<_> = merge.by_ref().collect();
854
855 assert_eq!(result.len(), 4);
856 assert_eq!(result[0].key.as_ref(), b"a");
857 assert_eq!(result[1].key.as_ref(), b"b");
858 assert_eq!(result[2].key.as_ref(), b"c");
859 assert_eq!(result[3].key.as_ref(), b"d");
860 }
861
862 #[test]
863 fn test_memory_efficiency() {
864 let entries1: Vec<Entry<'static>> = (0..100)
867 .map(|i| make_entry(format!("key{:05}", i * 2).as_bytes(), b"value", 100))
868 .collect();
869 let entries2: Vec<Entry<'static>> = (0..100)
870 .map(|i| make_entry(format!("key{:05}", i * 2 + 1).as_bytes(), b"value", 100))
871 .collect();
872
873 let source1: Box<dyn EntryIterator<'static> + 'static> =
874 Box::new(VecIterator::new(entries1, 0));
875 let source2: Box<dyn EntryIterator<'static> + 'static> =
876 Box::new(VecIterator::new(entries2, 1));
877
878 let merge = MergeIterator::new(vec![source1, source2]);
879
880 let result: Vec<_> = merge.limit(10).collect();
882
883 assert_eq!(result.len(), 10);
884 assert_eq!(result[0].key.as_ref(), b"key00000");
885 assert_eq!(result[9].key.as_ref(), b"key00009");
886 }
887
888 #[test]
889 fn test_empty_sources() {
890 let sources: Vec<Box<dyn EntryIterator<'static> + 'static>> = vec![
891 Box::new(VecIterator::new(vec![], 0)),
892 Box::new(VecIterator::new(vec![], 1)),
893 ];
894
895 let mut merge = MergeIterator::new(sources);
896 let result: Vec<_> = merge.by_ref().collect();
897
898 assert_eq!(result.len(), 0);
899 }
900
901 #[test]
902 fn test_scan_stats() {
903 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
904 vec![
905 make_entry(b"a", b"1", 200),
906 make_entry(b"b", b"", 200), ],
908 0,
909 ));
910
911 let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
912 vec![
913 make_entry(b"a", b"old", 100), make_entry(b"c", b"3", 100),
915 ],
916 1,
917 ));
918
919 let mut merge = MergeIterator::new(vec![source1, source2]);
920 let _: Vec<_> = merge.by_ref().collect();
921
922 let stats = merge.stats();
923 assert_eq!(stats.entries_scanned, 4);
924 assert_eq!(stats.entries_returned, 2); assert_eq!(stats.duplicates_skipped, 1); assert_eq!(stats.tombstones_skipped, 1); }
928
929 #[test]
930 fn test_priority_ordering() {
931 let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
933 vec![make_entry(b"key", b"memtable", 300)],
934 0,
935 )); let source2: Box<dyn EntryIterator<'static> + 'static> =
938 Box::new(VecIterator::new(vec![make_entry(b"key", b"l0", 200)], 1));
939
940 let source3: Box<dyn EntryIterator<'static> + 'static> =
941 Box::new(VecIterator::new(vec![make_entry(b"key", b"l1", 100)], 2)); let mut merge = MergeIterator::new(vec![source1, source2, source3]);
944 let result: Vec<_> = merge.by_ref().collect();
945
946 assert_eq!(result.len(), 1);
947 assert_eq!(result[0].value.as_ref(), b"memtable");
948 }
949}