sochdb_storage/
streaming_iterator.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Streaming Iterator Architecture for Scans
16//!
17//! This module implements memory-efficient streaming iterators that:
18//! - Use O(k) memory for LIMIT k queries regardless of table size
19//! - Enable zero-copy SSTable reads via mmap
20//! - Support merge iteration over multiple sorted sources (LSM-tree style)
21//! - Provide backpressure to prevent memory exhaustion
22//!
23//! ## Memory Complexity
24//!
25//! Current scan: M = O(N) where N = rows in scan range
//! Streaming:    M = O(k + S) where k = LIMIT, S = number of sources (heap operations cost O(log S) time each)
27//!
28//! For 100M rows with LIMIT 10:
29//! - Current: 10GB allocation (100M × 100 bytes)
30//! - Streaming: ~10KB (10 rows + heap overhead)
31
32use std::borrow::Cow;
33use std::cmp::Ordering;
34use std::collections::BinaryHeap;
35
/// A single key/value record tagged with a timestamp for MVCC.
///
/// `key` and `value` are [`Cow`]s, so an entry can either borrow its bytes
/// from a backing buffer or own them.
#[derive(Debug, Clone)]
pub struct Entry<'a> {
    /// Key bytes.
    pub key: Cow<'a, [u8]>,
    /// Value bytes.
    pub value: Cow<'a, [u8]>,
    /// Version timestamp supplied by whoever built the entry.
    pub timestamp: u64,
    /// `true` when the entry marks a deletion rather than a live value.
    pub is_tombstone: bool,
}

impl<'a> Entry<'a> {
    /// Builds an entry from its parts.
    ///
    /// An empty `value` is interpreted as a deletion marker: the resulting
    /// entry has `is_tombstone == value.is_empty()`.
    pub fn new(
        key: impl Into<Cow<'a, [u8]>>,
        value: impl Into<Cow<'a, [u8]>>,
        timestamp: u64,
    ) -> Self {
        let value = value.into();
        let key = key.into();
        Entry {
            // Evaluated before `value` is moved into its field below.
            is_tombstone: value.is_empty(),
            key,
            value,
            timestamp,
        }
    }

    /// Builds a deletion marker for `key`: empty value, `is_tombstone` set.
    pub fn tombstone(key: impl Into<Cow<'a, [u8]>>, timestamp: u64) -> Self {
        const NO_VALUE: &[u8] = &[];
        Entry {
            key: key.into(),
            value: Cow::Borrowed(NO_VALUE),
            timestamp,
            is_tombstone: true,
        }
    }

    /// Copies any borrowed bytes so the entry no longer depends on `'a`.
    pub fn into_owned(self) -> Entry<'static> {
        let Entry {
            key,
            value,
            timestamp,
            is_tombstone,
        } = self;
        Entry {
            key: Cow::Owned(key.into_owned()),
            value: Cow::Owned(value.into_owned()),
            timestamp,
            is_tombstone,
        }
    }
}
82
/// Trait for a streaming source of entries
///
/// Implementors expose a cursor over a sequence of [`Entry`] values: `peek`
/// inspects the entry under the cursor and `advance` moves past it. The
/// `Send` supertrait requires every implementor to be sendable across threads.
pub trait EntryIterator<'a>: Send {
    /// Peek at the next entry without consuming it
    ///
    /// Returns `None` when there is no entry under the cursor.
    fn peek(&self) -> Option<&Entry<'a>>;

    /// Advance to the next entry
    fn advance(&mut self);

    /// Check if the iterator is exhausted
    fn is_exhausted(&self) -> bool;

    /// Get the source priority (lower = higher priority for same key)
    ///
    /// `MergeIterator` reads this once, when the source is handed to it, and
    /// uses it to break ties between sources that expose the same key.
    fn source_priority(&self) -> u8;
}
97
98/// Peekable wrapper around an entry iterator
99struct PeekableSource<'a> {
100    source: Box<dyn EntryIterator<'a> + 'a>,
101    priority: u8,
102}
103
104impl<'a> PeekableSource<'a> {
105    fn new(source: Box<dyn EntryIterator<'a> + 'a>) -> Self {
106        let priority = source.source_priority();
107        Self { source, priority }
108    }
109}
110
111impl<'a> PartialEq for PeekableSource<'a> {
112    fn eq(&self, other: &Self) -> bool {
113        match (self.source.peek(), other.source.peek()) {
114            (Some(a), Some(b)) => a.key == b.key && self.priority == other.priority,
115            (None, None) => true,
116            _ => false,
117        }
118    }
119}
120
121impl<'a> Eq for PeekableSource<'a> {}
122
123impl<'a> PartialOrd for PeekableSource<'a> {
124    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
125        Some(self.cmp(other))
126    }
127}
128
129impl<'a> Ord for PeekableSource<'a> {
130    fn cmp(&self, other: &Self) -> Ordering {
131        match (self.source.peek(), other.source.peek()) {
132            (Some(a), Some(b)) => {
133                // Min-heap: reverse ordering
134                // First by key (ascending), then by priority (ascending = higher priority)
135                match b.key.cmp(&a.key) {
136                    Ordering::Equal => other.priority.cmp(&self.priority),
137                    ord => ord,
138                }
139            }
140            (Some(_), None) => Ordering::Greater, // None goes last
141            (None, Some(_)) => Ordering::Less,
142            (None, None) => Ordering::Equal,
143        }
144    }
145}
146
147/// Merge iterator over multiple sorted sources (LSM-tree style)
148pub struct MergeIterator<'a> {
149    /// Min-heap of source iterators
150    heap: BinaryHeap<PeekableSource<'a>>,
151    /// Current key for deduplication
152    current_key: Option<Vec<u8>>,
153    /// Statistics
154    stats: ScanStats,
155}
156
157impl<'a> MergeIterator<'a> {
158    /// Create a new merge iterator from multiple sources
159    pub fn new(sources: Vec<Box<dyn EntryIterator<'a> + 'a>>) -> Self {
160        let mut heap = BinaryHeap::with_capacity(sources.len());
161
162        for source in sources {
163            let peekable = PeekableSource::new(source);
164            if !peekable.source.is_exhausted() {
165                heap.push(peekable);
166            }
167        }
168
169        Self {
170            heap,
171            current_key: None,
172            stats: ScanStats::default(),
173        }
174    }
175
176    /// Get scan statistics
177    pub fn stats(&self) -> &ScanStats {
178        &self.stats
179    }
180}
181
182impl<'a> Iterator for MergeIterator<'a> {
183    type Item = Entry<'a>;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        loop {
187            // Pop the source with smallest key
188            let mut source = self.heap.pop()?;
189
190            // Get the entry (must exist since we only push non-exhausted sources)
191            let entry = source.source.peek()?.clone();
192
193            // Advance this source
194            source.source.advance();
195
196            // Re-insert if not exhausted
197            if !source.source.is_exhausted() {
198                self.heap.push(source);
199            }
200
201            self.stats.entries_scanned += 1;
202
203            // Skip duplicate keys (keep only newest version)
204            if let Some(ref current) = self.current_key
205                && current.as_slice() == entry.key.as_ref()
206            {
207                self.stats.duplicates_skipped += 1;
208                continue;
209            }
210
211            self.current_key = Some(entry.key.to_vec());
212
213            // Skip tombstones
214            if entry.is_tombstone {
215                self.stats.tombstones_skipped += 1;
216                continue;
217            }
218
219            self.stats.entries_returned += 1;
220            return Some(entry);
221        }
222    }
223}
224
/// Scan statistics
///
/// Counters maintained by `MergeIterator` while it is being iterated.
#[derive(Debug, Default, Clone)]
pub struct ScanStats {
    /// Entries taken from the underlying sources, including ones later skipped
    pub entries_scanned: u64,
    /// Entries actually yielded to the caller
    pub entries_returned: u64,
    /// Entries dropped because their key matched the previously processed key
    pub duplicates_skipped: u64,
    /// Tombstone entries dropped instead of being yielded (a tombstone that is
    /// itself a duplicate is counted under `duplicates_skipped` instead)
    pub tombstones_skipped: u64,
}
233
234/// Vector-based entry iterator for testing and simple cases
235pub struct VecIterator<'a> {
236    entries: Vec<Entry<'a>>,
237    position: usize,
238    priority: u8,
239}
240
241impl<'a> VecIterator<'a> {
242    pub fn new(entries: Vec<Entry<'a>>, priority: u8) -> Self {
243        Self {
244            entries,
245            position: 0,
246            priority,
247        }
248    }
249}
250
251impl<'a> EntryIterator<'a> for VecIterator<'a> {
252    fn peek(&self) -> Option<&Entry<'a>> {
253        self.entries.get(self.position)
254    }
255
256    fn advance(&mut self) {
257        self.position += 1;
258    }
259
260    fn is_exhausted(&self) -> bool {
261        self.position >= self.entries.len()
262    }
263
264    fn source_priority(&self) -> u8 {
265        self.priority
266    }
267}
268
269/// Range-bounded iterator wrapper
270#[allow(dead_code)]
271pub struct RangeIterator<'a, I: EntryIterator<'a>> {
272    inner: I,
273    start_key: Vec<u8>,
274    end_key: Vec<u8>,
275    started: bool,
276    ended: bool,
277    _marker: std::marker::PhantomData<&'a ()>,
278}
279
280impl<'a, I: EntryIterator<'a>> RangeIterator<'a, I> {
281    pub fn new(inner: I, start_key: Vec<u8>, end_key: Vec<u8>) -> Self {
282        Self {
283            inner,
284            start_key,
285            end_key,
286            started: false,
287            ended: false,
288            _marker: std::marker::PhantomData,
289        }
290    }
291}
292
293impl<'a, I: EntryIterator<'a> + Send> EntryIterator<'a> for RangeIterator<'a, I> {
294    fn peek(&self) -> Option<&Entry<'a>> {
295        if self.ended {
296            return None;
297        }
298
299        let entry = self.inner.peek()?;
300
301        // Check end bound
302        if entry.key.as_ref() > self.end_key.as_slice() {
303            return None;
304        }
305
306        Some(entry)
307    }
308
309    fn advance(&mut self) {
310        if self.ended {
311            return;
312        }
313
314        self.inner.advance();
315
316        // Check if we've passed the end
317        if let Some(entry) = self.inner.peek()
318            && entry.key.as_ref() > self.end_key.as_slice()
319        {
320            self.ended = true;
321        }
322    }
323
324    fn is_exhausted(&self) -> bool {
325        self.ended || self.inner.is_exhausted()
326    }
327
328    fn source_priority(&self) -> u8 {
329        self.inner.source_priority()
330    }
331}
332
333/// Limit iterator wrapper
334pub struct LimitIterator<'a, I: Iterator<Item = Entry<'a>>> {
335    inner: I,
336    limit: usize,
337    count: usize,
338}
339
340impl<'a, I: Iterator<Item = Entry<'a>>> LimitIterator<'a, I> {
341    pub fn new(inner: I, limit: usize) -> Self {
342        Self {
343            inner,
344            limit,
345            count: 0,
346        }
347    }
348}
349
350impl<'a, I: Iterator<Item = Entry<'a>>> Iterator for LimitIterator<'a, I> {
351    type Item = Entry<'a>;
352
353    fn next(&mut self) -> Option<Self::Item> {
354        if self.count >= self.limit {
355            return None;
356        }
357        self.count += 1;
358        self.inner.next()
359    }
360}
361
362/// Filter iterator for predicate pushdown
363pub struct FilterIterator<'a, I, F>
364where
365    I: Iterator<Item = Entry<'a>>,
366    F: Fn(&Entry<'a>) -> bool,
367{
368    inner: I,
369    predicate: F,
370}
371
372impl<'a, I, F> FilterIterator<'a, I, F>
373where
374    I: Iterator<Item = Entry<'a>>,
375    F: Fn(&Entry<'a>) -> bool,
376{
377    pub fn new(inner: I, predicate: F) -> Self {
378        Self { inner, predicate }
379    }
380}
381
382impl<'a, I, F> Iterator for FilterIterator<'a, I, F>
383where
384    I: Iterator<Item = Entry<'a>>,
385    F: Fn(&Entry<'a>) -> bool,
386{
387    type Item = Entry<'a>;
388
389    fn next(&mut self) -> Option<Self::Item> {
390        loop {
391            let entry = self.inner.next()?;
392            if (self.predicate)(&entry) {
393                return Some(entry);
394            }
395        }
396    }
397}
398
399/// Memtable iterator (in-memory sorted data)
400pub struct MemtableIterator<'a> {
401    entries: Vec<Entry<'a>>,
402    position: usize,
403}
404
405impl<'a> MemtableIterator<'a> {
406    pub fn new(entries: Vec<Entry<'a>>) -> Self {
407        Self {
408            entries,
409            position: 0,
410        }
411    }
412
413    pub fn from_btree<K, V>(tree: &'a std::collections::BTreeMap<K, V>, timestamp: u64) -> Self
414    where
415        K: AsRef<[u8]>,
416        V: AsRef<[u8]>,
417    {
418        let entries: Vec<_> = tree
419            .iter()
420            .map(|(k, v)| {
421                Entry::new(
422                    Cow::Borrowed(k.as_ref()),
423                    Cow::Borrowed(v.as_ref()),
424                    timestamp,
425                )
426            })
427            .collect();
428
429        Self::new(entries)
430    }
431}
432
433impl<'a> EntryIterator<'a> for MemtableIterator<'a> {
434    fn peek(&self) -> Option<&Entry<'a>> {
435        self.entries.get(self.position)
436    }
437
438    fn advance(&mut self) {
439        self.position += 1;
440    }
441
442    fn is_exhausted(&self) -> bool {
443        self.position >= self.entries.len()
444    }
445
446    fn source_priority(&self) -> u8 {
447        0 // Memtable has highest priority (newest data)
448    }
449}
450
451/// SSTable iterator (simulated - real impl would use mmap)
452pub struct SstIterator<'a> {
453    /// Block data (simulating mmap region)
454    data: &'a [u8],
455    /// Current position in data
456    position: usize,
457    /// Cached current entry
458    current: Option<Entry<'a>>,
459    /// End key for range bound
460    end_key: Option<Vec<u8>>,
461    /// SSTable level (higher = lower priority)
462    level: u8,
463}
464
465impl<'a> SstIterator<'a> {
466    pub fn new(data: &'a [u8], level: u8) -> Self {
467        let mut iter = Self {
468            data,
469            position: 0,
470            current: None,
471            end_key: None,
472            level,
473        };
474        iter.read_next();
475        iter
476    }
477
478    pub fn with_end_key(mut self, end_key: Vec<u8>) -> Self {
479        self.end_key = Some(end_key);
480        self
481    }
482
483    fn read_next(&mut self) {
484        if self.position >= self.data.len() {
485            self.current = None;
486            return;
487        }
488
489        // Simple format: [key_len: u16][value_len: u16][timestamp: u64][key][value]
490        if self.position + 12 > self.data.len() {
491            self.current = None;
492            return;
493        }
494
495        let key_len =
496            u16::from_le_bytes([self.data[self.position], self.data[self.position + 1]]) as usize;
497        let value_len =
498            u16::from_le_bytes([self.data[self.position + 2], self.data[self.position + 3]])
499                as usize;
500        let timestamp = u64::from_le_bytes([
501            self.data[self.position + 4],
502            self.data[self.position + 5],
503            self.data[self.position + 6],
504            self.data[self.position + 7],
505            self.data[self.position + 8],
506            self.data[self.position + 9],
507            self.data[self.position + 10],
508            self.data[self.position + 11],
509        ]);
510
511        let key_start = self.position + 12;
512        let key_end = key_start + key_len;
513        let value_end = key_end + value_len;
514
515        if value_end > self.data.len() {
516            self.current = None;
517            return;
518        }
519
520        let key = &self.data[key_start..key_end];
521        let value = &self.data[key_end..value_end];
522
523        // Check end bound
524        if let Some(ref end_key) = self.end_key
525            && key > end_key.as_slice()
526        {
527            self.current = None;
528            return;
529        }
530
531        self.current = Some(Entry::new(
532            Cow::Borrowed(key),
533            Cow::Borrowed(value),
534            timestamp,
535        ));
536        self.position = value_end;
537    }
538}
539
540impl<'a> EntryIterator<'a> for SstIterator<'a> {
541    fn peek(&self) -> Option<&Entry<'a>> {
542        self.current.as_ref()
543    }
544
545    fn advance(&mut self) {
546        self.read_next();
547    }
548
549    fn is_exhausted(&self) -> bool {
550        self.current.is_none()
551    }
552
553    fn source_priority(&self) -> u8 {
554        // Lower level = higher priority (L0 > L1 > L2...)
555        // Memtable is priority 0, so SST levels start at 1
556        self.level + 1
557    }
558}
559
/// Builder for SSTable data (for testing)
///
/// Appends records in the layout
/// `[key_len: u16][value_len: u16][timestamp: u64][key][value]`, with all
/// integers little-endian. Records are written in the order they are added;
/// the builder does not sort them.
pub struct SstBuilder {
    /// Encoded records accumulated so far
    data: Vec<u8>,
}

impl Default for SstBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl SstBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self { data: Vec::new() }
    }

    /// Appends one record and returns the builder for chaining.
    ///
    /// # Panics
    ///
    /// Panics if `key` or `value` is longer than `u16::MAX` bytes. The format
    /// stores lengths in 16 bits, so a longer slice would be written with a
    /// wrapped length and corrupt every record after it.
    pub fn add(&mut self, key: &[u8], value: &[u8], timestamp: u64) -> &mut Self {
        // Validate both lengths before writing so a rejected record leaves
        // the buffer untouched.
        let key_len = Self::length_field("key", key.len());
        let value_len = Self::length_field("value", value.len());

        self.data.reserve(12 + key.len() + value.len());
        self.data.extend_from_slice(&key_len.to_le_bytes());
        self.data.extend_from_slice(&value_len.to_le_bytes());
        self.data.extend_from_slice(&timestamp.to_le_bytes());
        self.data.extend_from_slice(key);
        self.data.extend_from_slice(value);
        self
    }

    /// Returns the encoded bytes.
    pub fn build(self) -> Vec<u8> {
        self.data
    }

    /// Converts `len` to the 16-bit on-disk length, panicking if it is too big.
    fn length_field(what: &str, len: usize) -> u16 {
        assert!(
            len <= usize::from(u16::MAX),
            "SstBuilder::add: {} length {} exceeds the u16 length field",
            what,
            len
        );
        // Lossless: checked against `u16::MAX` just above.
        len as u16
    }
}
591
592/// Extension trait for iterators
593pub trait IteratorExt<'a>: Iterator<Item = Entry<'a>> + Sized {
594    fn limit(self, n: usize) -> LimitIterator<'a, Self> {
595        LimitIterator::new(self, n)
596    }
597
598    fn filter_entries<F>(self, predicate: F) -> FilterIterator<'a, Self, F>
599    where
600        F: Fn(&Entry<'a>) -> bool,
601    {
602        FilterIterator::new(self, predicate)
603    }
604}
605
606impl<'a, I: Iterator<Item = Entry<'a>>> IteratorExt<'a> for I {}
607
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned entry; an empty `value` is flagged as a tombstone.
    fn make_entry(key: &[u8], value: &[u8], ts: u64) -> Entry<'static> {
        Entry {
            key: Cow::Owned(key.to_vec()),
            value: Cow::Owned(value.to_vec()),
            timestamp: ts,
            is_tombstone: value.is_empty(),
        }
    }

    /// A fresh `VecIterator` exposes its first entry.
    #[test]
    fn test_vec_iterator() {
        let entries = vec![
            make_entry(b"a", b"1", 100),
            make_entry(b"b", b"2", 100),
            make_entry(b"c", b"3", 100),
        ];

        let iter = VecIterator::new(entries, 0);
        assert!(!iter.is_exhausted());
        assert_eq!(iter.peek().unwrap().key.as_ref(), b"a");
    }

    /// Merging a single source yields its entries unchanged.
    #[test]
    fn test_merge_iterator_single_source() {
        let entries = vec![
            make_entry(b"a", b"1", 100),
            make_entry(b"b", b"2", 100),
            make_entry(b"c", b"3", 100),
        ];

        let source: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(entries, 0));
        let mut merge = MergeIterator::new(vec![source]);

        let result: Vec<_> = merge.by_ref().collect();
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[1].key.as_ref(), b"b");
        assert_eq!(result[2].key.as_ref(), b"c");
    }

    /// Two sources with interleaved keys come out in global key order.
    #[test]
    fn test_merge_iterator_multiple_sources() {
        let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"1", 100),
                make_entry(b"c", b"3", 100),
                make_entry(b"e", b"5", 100),
            ],
            0,
        ));

        let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"b", b"2", 100),
                make_entry(b"d", b"4", 100),
                make_entry(b"f", b"6", 100),
            ],
            1,
        ));

        let mut merge = MergeIterator::new(vec![source1, source2]);
        let result: Vec<_> = merge.by_ref().collect();

        assert_eq!(result.len(), 6);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[1].key.as_ref(), b"b");
        assert_eq!(result[2].key.as_ref(), b"c");
        assert_eq!(result[3].key.as_ref(), b"d");
        assert_eq!(result[4].key.as_ref(), b"e");
        assert_eq!(result[5].key.as_ref(), b"f");
    }

    /// For a key present in two sources, the higher-priority version wins.
    #[test]
    fn test_merge_iterator_deduplication() {
        // Source 0 has higher priority (memtable)
        let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"new_value", 200), // Newer version
            ],
            0,
        ));

        // Source 1 has lower priority (SSTable)
        let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"old_value", 100), // Older version
                make_entry(b"b", b"2", 100),
            ],
            1,
        ));

        let mut merge = MergeIterator::new(vec![source1, source2]);
        let result: Vec<_> = merge.by_ref().collect();

        // Should have 2 entries, with 'a' having the new value
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[0].value.as_ref(), b"new_value");
        assert_eq!(result[1].key.as_ref(), b"b");

        // Check stats
        assert_eq!(merge.stats().duplicates_skipped, 1);
    }

    /// A tombstone in the higher-priority source hides the key entirely.
    #[test]
    fn test_merge_iterator_tombstones() {
        let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                Entry::tombstone(Cow::Owned(b"a".to_vec()), 200), // Delete
            ],
            0,
        ));

        let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"old_value", 100),
                make_entry(b"b", b"2", 100),
            ],
            1,
        ));

        let mut merge = MergeIterator::new(vec![source1, source2]);
        let result: Vec<_> = merge.by_ref().collect();

        // 'a' should be skipped (tombstone), only 'b' remains
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].key.as_ref(), b"b");

        assert_eq!(merge.stats().tombstones_skipped, 1);
    }

    /// `limit(3)` yields only the first three entries.
    #[test]
    fn test_limit_iterator() {
        let entries = vec![
            make_entry(b"a", b"1", 100),
            make_entry(b"b", b"2", 100),
            make_entry(b"c", b"3", 100),
            make_entry(b"d", b"4", 100),
            make_entry(b"e", b"5", 100),
        ];

        let source: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(entries, 0));
        let merge = MergeIterator::new(vec![source]);

        let result: Vec<_> = merge.limit(3).collect();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[2].key.as_ref(), b"c");
    }

    /// `filter_entries` keeps only entries accepted by the predicate.
    #[test]
    fn test_filter_iterator() {
        let entries = vec![
            make_entry(b"a", b"1", 100),
            make_entry(b"b", b"2", 100),
            make_entry(b"c", b"3", 100),
            make_entry(b"d", b"4", 100),
        ];

        let source: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(entries, 0));
        let merge = MergeIterator::new(vec![source]);

        // Filter to only keys < "c"
        let result: Vec<_> = merge
            .filter_entries(|e| e.key.as_ref() < b"c".as_slice())
            .collect();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[1].key.as_ref(), b"b");
    }

    /// Data written by `SstBuilder` is readable by `SstIterator`.
    #[test]
    fn test_sst_builder_and_iterator() {
        let mut builder = SstBuilder::new();
        builder
            .add(b"apple", b"red", 100)
            .add(b"banana", b"yellow", 100)
            .add(b"cherry", b"red", 100);

        let data = builder.build();
        let iter = SstIterator::new(&data, 0);

        assert!(!iter.is_exhausted());
        assert_eq!(iter.peek().unwrap().key.as_ref(), b"apple");
    }

    /// Walking an `SstIterator` to the end returns every record with its timestamp.
    #[test]
    fn test_sst_iterator_full() {
        let mut builder = SstBuilder::new();
        builder
            .add(b"a", b"1", 100)
            .add(b"b", b"2", 200)
            .add(b"c", b"3", 300);

        let data = builder.build();
        let mut iter = SstIterator::new(&data, 0);

        let mut results = Vec::new();
        while !iter.is_exhausted() {
            results.push(iter.peek().unwrap().clone());
            iter.advance();
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key.as_ref(), b"a");
        assert_eq!(results[0].timestamp, 100);
        assert_eq!(results[1].key.as_ref(), b"b");
        assert_eq!(results[1].timestamp, 200);
        assert_eq!(results[2].key.as_ref(), b"c");
        assert_eq!(results[2].timestamp, 300);
    }

    /// Two SSTable sources merge into one key-ordered stream.
    #[test]
    fn test_sst_with_merge() {
        let mut builder1 = SstBuilder::new();
        builder1.add(b"a", b"1", 100).add(b"c", b"3", 100);
        let data1 = builder1.build();

        let mut builder2 = SstBuilder::new();
        builder2.add(b"b", b"2", 100).add(b"d", b"4", 100);
        let data2 = builder2.build();

        // The iterators borrow `data1`/`data2` directly. Both buffers are
        // declared before the merge iterator and therefore outlive it, so no
        // `'static` data (and no leaked allocation) is needed.
        let source1: Box<dyn EntryIterator<'_> + '_> =
            Box::new(SstIterator::new(data1.as_slice(), 0));
        let source2: Box<dyn EntryIterator<'_> + '_> =
            Box::new(SstIterator::new(data2.as_slice(), 1));

        let mut merge = MergeIterator::new(vec![source1, source2]);
        let result: Vec<_> = merge.by_ref().collect();

        assert_eq!(result.len(), 4);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[1].key.as_ref(), b"b");
        assert_eq!(result[2].key.as_ref(), b"c");
        assert_eq!(result[3].key.as_ref(), b"d");
    }

    /// `limit(10)` over two 100-entry sources yields the ten smallest keys.
    #[test]
    fn test_memory_efficiency() {
        // This test demonstrates O(k) memory for LIMIT k
        // Create "large" sources (simulated)
        let entries1: Vec<Entry<'static>> = (0..100)
            .map(|i| make_entry(format!("key{:05}", i * 2).as_bytes(), b"value", 100))
            .collect();
        let entries2: Vec<Entry<'static>> = (0..100)
            .map(|i| make_entry(format!("key{:05}", i * 2 + 1).as_bytes(), b"value", 100))
            .collect();

        let source1: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(entries1, 0));
        let source2: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(entries2, 1));

        let merge = MergeIterator::new(vec![source1, source2]);

        // Only take 10 - should use O(10) output memory, not O(200)
        let result: Vec<_> = merge.limit(10).collect();

        assert_eq!(result.len(), 10);
        assert_eq!(result[0].key.as_ref(), b"key00000");
        assert_eq!(result[9].key.as_ref(), b"key00009");
    }

    /// Merging only empty sources yields nothing.
    #[test]
    fn test_empty_sources() {
        let sources: Vec<Box<dyn EntryIterator<'static> + 'static>> = vec![
            Box::new(VecIterator::new(vec![], 0)),
            Box::new(VecIterator::new(vec![], 1)),
        ];

        let mut merge = MergeIterator::new(sources);
        let result: Vec<_> = merge.by_ref().collect();

        assert_eq!(result.len(), 0);
    }

    /// Every `ScanStats` counter reflects what the merge did.
    #[test]
    fn test_scan_stats() {
        let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"1", 200),
                make_entry(b"b", b"", 200), // tombstone
            ],
            0,
        ));

        let source2: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![
                make_entry(b"a", b"old", 100), // duplicate
                make_entry(b"c", b"3", 100),
            ],
            1,
        ));

        let mut merge = MergeIterator::new(vec![source1, source2]);
        let _: Vec<_> = merge.by_ref().collect();

        let stats = merge.stats();
        assert_eq!(stats.entries_scanned, 4);
        assert_eq!(stats.entries_returned, 2); // a, c
        assert_eq!(stats.duplicates_skipped, 1); // old 'a'
        assert_eq!(stats.tombstones_skipped, 1); // 'b'
    }

    /// With one key in three sources, the lowest priority value wins.
    #[test]
    fn test_priority_ordering() {
        // Same key in multiple sources - highest priority wins
        let source1: Box<dyn EntryIterator<'static> + 'static> = Box::new(VecIterator::new(
            vec![make_entry(b"key", b"memtable", 300)],
            0,
        )); // Highest priority

        let source2: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(vec![make_entry(b"key", b"l0", 200)], 1));

        let source3: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(VecIterator::new(vec![make_entry(b"key", b"l1", 100)], 2)); // Lowest priority

        let mut merge = MergeIterator::new(vec![source1, source2, source3]);
        let result: Vec<_> = merge.by_ref().collect();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].value.as_ref(), b"memtable");
    }
}
944        assert_eq!(result[0].value.as_ref(), b"memtable");
945    }
946}