Skip to main content

sochdb_storage/
streaming_iterator.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Streaming Iterator Architecture for Scans
19//!
20//! This module implements memory-efficient streaming iterators that:
21//! - Use O(k) memory for LIMIT k queries regardless of table size
22//! - Enable zero-copy SSTable reads via mmap
23//! - Support merge iteration over multiple sorted sources (LSM-tree style)
24//! - Provide backpressure to prevent memory exhaustion
25//!
26//! ## Memory Complexity
27//!
28//! Current scan: M = O(N) where N = rows in scan range
//! Streaming:    M = O(k + S) where k = LIMIT, S = number of sources (one heap slot per source)
30//!
31//! For 100M rows with LIMIT 10:
32//! - Current: 10GB allocation (100M × 100 bytes)
33//! - Streaming: ~10KB (10 rows + heap overhead)
34
35use std::borrow::Cow;
36use std::cmp::Ordering;
37use std::collections::BinaryHeap;
38
/// One versioned key/value record flowing through the scan pipeline.
///
/// `key` and `value` are copy-on-write: sources backed by a byte buffer can
/// lend slices of it, while in-memory sources may hand out owned data.
#[derive(Debug, Clone)]
pub struct Entry<'a> {
    /// Row key; merge iteration expects each source to yield these in ascending order.
    pub key: Cow<'a, [u8]>,
    /// Row payload; empty for tombstones.
    pub value: Cow<'a, [u8]>,
    /// MVCC timestamp of this version.
    pub timestamp: u64,
    /// `true` when this entry records a deletion.
    pub is_tombstone: bool,
}

impl<'a> Entry<'a> {
    /// Builds an entry; an empty `value` is classified as a tombstone.
    pub fn new(
        key: impl Into<Cow<'a, [u8]>>,
        value: impl Into<Cow<'a, [u8]>>,
        timestamp: u64,
    ) -> Self {
        let value: Cow<'a, [u8]> = value.into();
        // Field expressions run in source order, so the emptiness check
        // happens before `value` is moved into the struct.
        Entry {
            key: key.into(),
            is_tombstone: value.is_empty(),
            value,
            timestamp,
        }
    }

    /// Builds a deletion marker for `key` with an empty, borrowed value.
    pub fn tombstone(key: impl Into<Cow<'a, [u8]>>, timestamp: u64) -> Self {
        let empty: &'a [u8] = &[];
        Entry {
            key: key.into(),
            value: Cow::Borrowed(empty),
            timestamp,
            is_tombstone: true,
        }
    }

    /// Detaches the entry from any borrowed buffer by copying key and value.
    pub fn into_owned(self) -> Entry<'static> {
        let Entry {
            key,
            value,
            timestamp,
            is_tombstone,
        } = self;
        Entry {
            key: Cow::Owned(key.into_owned()),
            value: Cow::Owned(value.into_owned()),
            timestamp,
            is_tombstone,
        }
    }
}
85
86/// Trait for a streaming source of entries
87pub trait EntryIterator<'a>: Send {
88    /// Peek at the next entry without consuming it
89    fn peek(&self) -> Option<&Entry<'a>>;
90
91    /// Advance to the next entry
92    fn advance(&mut self);
93
94    /// Check if the iterator is exhausted
95    fn is_exhausted(&self) -> bool;
96
97    /// Get the source priority (lower = higher priority for same key)
98    fn source_priority(&self) -> u8;
99}
100
101/// Peekable wrapper around an entry iterator
102struct PeekableSource<'a> {
103    source: Box<dyn EntryIterator<'a> + 'a>,
104    priority: u8,
105}
106
107impl<'a> PeekableSource<'a> {
108    fn new(source: Box<dyn EntryIterator<'a> + 'a>) -> Self {
109        let priority = source.source_priority();
110        Self { source, priority }
111    }
112}
113
114impl<'a> PartialEq for PeekableSource<'a> {
115    fn eq(&self, other: &Self) -> bool {
116        match (self.source.peek(), other.source.peek()) {
117            (Some(a), Some(b)) => a.key == b.key && self.priority == other.priority,
118            (None, None) => true,
119            _ => false,
120        }
121    }
122}
123
124impl<'a> Eq for PeekableSource<'a> {}
125
126impl<'a> PartialOrd for PeekableSource<'a> {
127    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
128        Some(self.cmp(other))
129    }
130}
131
132impl<'a> Ord for PeekableSource<'a> {
133    fn cmp(&self, other: &Self) -> Ordering {
134        match (self.source.peek(), other.source.peek()) {
135            (Some(a), Some(b)) => {
136                // Min-heap: reverse ordering
137                // First by key (ascending), then by priority (ascending = higher priority)
138                match b.key.cmp(&a.key) {
139                    Ordering::Equal => other.priority.cmp(&self.priority),
140                    ord => ord,
141                }
142            }
143            (Some(_), None) => Ordering::Greater, // None goes last
144            (None, Some(_)) => Ordering::Less,
145            (None, None) => Ordering::Equal,
146        }
147    }
148}
149
150/// Merge iterator over multiple sorted sources (LSM-tree style)
151pub struct MergeIterator<'a> {
152    /// Min-heap of source iterators
153    heap: BinaryHeap<PeekableSource<'a>>,
154    /// Current key for deduplication
155    current_key: Option<Vec<u8>>,
156    /// Statistics
157    stats: ScanStats,
158}
159
160impl<'a> MergeIterator<'a> {
161    /// Create a new merge iterator from multiple sources
162    pub fn new(sources: Vec<Box<dyn EntryIterator<'a> + 'a>>) -> Self {
163        let mut heap = BinaryHeap::with_capacity(sources.len());
164
165        for source in sources {
166            let peekable = PeekableSource::new(source);
167            if !peekable.source.is_exhausted() {
168                heap.push(peekable);
169            }
170        }
171
172        Self {
173            heap,
174            current_key: None,
175            stats: ScanStats::default(),
176        }
177    }
178
179    /// Get scan statistics
180    pub fn stats(&self) -> &ScanStats {
181        &self.stats
182    }
183}
184
185impl<'a> Iterator for MergeIterator<'a> {
186    type Item = Entry<'a>;
187
188    fn next(&mut self) -> Option<Self::Item> {
189        loop {
190            // Pop the source with smallest key
191            let mut source = self.heap.pop()?;
192
193            // Get the entry (must exist since we only push non-exhausted sources)
194            let entry = source.source.peek()?.clone();
195
196            // Advance this source
197            source.source.advance();
198
199            // Re-insert if not exhausted
200            if !source.source.is_exhausted() {
201                self.heap.push(source);
202            }
203
204            self.stats.entries_scanned += 1;
205
206            // Skip duplicate keys (keep only newest version)
207            if let Some(ref current) = self.current_key
208                && current.as_slice() == entry.key.as_ref()
209            {
210                self.stats.duplicates_skipped += 1;
211                continue;
212            }
213
214            self.current_key = Some(entry.key.to_vec());
215
216            // Skip tombstones
217            if entry.is_tombstone {
218                self.stats.tombstones_skipped += 1;
219                continue;
220            }
221
222            self.stats.entries_returned += 1;
223            return Some(entry);
224        }
225    }
226}
227
/// Counters collected by the merge iterator while scanning.
#[derive(Debug, Default, Clone)]
pub struct ScanStats {
    /// Entries popped from any source, whether or not they were yielded.
    pub entries_scanned: u64,
    /// Entries actually handed to the caller.
    pub entries_returned: u64,
    /// Shadowed copies of a key that had already been handled.
    pub duplicates_skipped: u64,
    /// Tombstones consumed without being yielded.
    pub tombstones_skipped: u64,
}
236
237/// Vector-based entry iterator for testing and simple cases
238pub struct VecIterator<'a> {
239    entries: Vec<Entry<'a>>,
240    position: usize,
241    priority: u8,
242}
243
244impl<'a> VecIterator<'a> {
245    pub fn new(entries: Vec<Entry<'a>>, priority: u8) -> Self {
246        Self {
247            entries,
248            position: 0,
249            priority,
250        }
251    }
252}
253
254impl<'a> EntryIterator<'a> for VecIterator<'a> {
255    fn peek(&self) -> Option<&Entry<'a>> {
256        self.entries.get(self.position)
257    }
258
259    fn advance(&mut self) {
260        self.position += 1;
261    }
262
263    fn is_exhausted(&self) -> bool {
264        self.position >= self.entries.len()
265    }
266
267    fn source_priority(&self) -> u8 {
268        self.priority
269    }
270}
271
272/// Range-bounded iterator wrapper
273#[allow(dead_code)]
274pub struct RangeIterator<'a, I: EntryIterator<'a>> {
275    inner: I,
276    start_key: Vec<u8>,
277    end_key: Vec<u8>,
278    started: bool,
279    ended: bool,
280    _marker: std::marker::PhantomData<&'a ()>,
281}
282
283impl<'a, I: EntryIterator<'a>> RangeIterator<'a, I> {
284    pub fn new(inner: I, start_key: Vec<u8>, end_key: Vec<u8>) -> Self {
285        Self {
286            inner,
287            start_key,
288            end_key,
289            started: false,
290            ended: false,
291            _marker: std::marker::PhantomData,
292        }
293    }
294}
295
296impl<'a, I: EntryIterator<'a> + Send> EntryIterator<'a> for RangeIterator<'a, I> {
297    fn peek(&self) -> Option<&Entry<'a>> {
298        if self.ended {
299            return None;
300        }
301
302        let entry = self.inner.peek()?;
303
304        // Check end bound
305        if entry.key.as_ref() > self.end_key.as_slice() {
306            return None;
307        }
308
309        Some(entry)
310    }
311
312    fn advance(&mut self) {
313        if self.ended {
314            return;
315        }
316
317        self.inner.advance();
318
319        // Check if we've passed the end
320        if let Some(entry) = self.inner.peek()
321            && entry.key.as_ref() > self.end_key.as_slice()
322        {
323            self.ended = true;
324        }
325    }
326
327    fn is_exhausted(&self) -> bool {
328        self.ended || self.inner.is_exhausted()
329    }
330
331    fn source_priority(&self) -> u8 {
332        self.inner.source_priority()
333    }
334}
335
336/// Limit iterator wrapper
337pub struct LimitIterator<'a, I: Iterator<Item = Entry<'a>>> {
338    inner: I,
339    limit: usize,
340    count: usize,
341}
342
343impl<'a, I: Iterator<Item = Entry<'a>>> LimitIterator<'a, I> {
344    pub fn new(inner: I, limit: usize) -> Self {
345        Self {
346            inner,
347            limit,
348            count: 0,
349        }
350    }
351}
352
353impl<'a, I: Iterator<Item = Entry<'a>>> Iterator for LimitIterator<'a, I> {
354    type Item = Entry<'a>;
355
356    fn next(&mut self) -> Option<Self::Item> {
357        if self.count >= self.limit {
358            return None;
359        }
360        self.count += 1;
361        self.inner.next()
362    }
363}
364
365/// Filter iterator for predicate pushdown
366pub struct FilterIterator<'a, I, F>
367where
368    I: Iterator<Item = Entry<'a>>,
369    F: Fn(&Entry<'a>) -> bool,
370{
371    inner: I,
372    predicate: F,
373}
374
375impl<'a, I, F> FilterIterator<'a, I, F>
376where
377    I: Iterator<Item = Entry<'a>>,
378    F: Fn(&Entry<'a>) -> bool,
379{
380    pub fn new(inner: I, predicate: F) -> Self {
381        Self { inner, predicate }
382    }
383}
384
385impl<'a, I, F> Iterator for FilterIterator<'a, I, F>
386where
387    I: Iterator<Item = Entry<'a>>,
388    F: Fn(&Entry<'a>) -> bool,
389{
390    type Item = Entry<'a>;
391
392    fn next(&mut self) -> Option<Self::Item> {
393        loop {
394            let entry = self.inner.next()?;
395            if (self.predicate)(&entry) {
396                return Some(entry);
397            }
398        }
399    }
400}
401
402/// Memtable iterator (in-memory sorted data)
403pub struct MemtableIterator<'a> {
404    entries: Vec<Entry<'a>>,
405    position: usize,
406}
407
408impl<'a> MemtableIterator<'a> {
409    pub fn new(entries: Vec<Entry<'a>>) -> Self {
410        Self {
411            entries,
412            position: 0,
413        }
414    }
415
416    pub fn from_btree<K, V>(tree: &'a std::collections::BTreeMap<K, V>, timestamp: u64) -> Self
417    where
418        K: AsRef<[u8]>,
419        V: AsRef<[u8]>,
420    {
421        let entries: Vec<_> = tree
422            .iter()
423            .map(|(k, v)| {
424                Entry::new(
425                    Cow::Borrowed(k.as_ref()),
426                    Cow::Borrowed(v.as_ref()),
427                    timestamp,
428                )
429            })
430            .collect();
431
432        Self::new(entries)
433    }
434}
435
436impl<'a> EntryIterator<'a> for MemtableIterator<'a> {
437    fn peek(&self) -> Option<&Entry<'a>> {
438        self.entries.get(self.position)
439    }
440
441    fn advance(&mut self) {
442        self.position += 1;
443    }
444
445    fn is_exhausted(&self) -> bool {
446        self.position >= self.entries.len()
447    }
448
449    fn source_priority(&self) -> u8 {
450        0 // Memtable has highest priority (newest data)
451    }
452}
453
454/// SSTable iterator (simulated - real impl would use mmap)
455pub struct SstIterator<'a> {
456    /// Block data (simulating mmap region)
457    data: &'a [u8],
458    /// Current position in data
459    position: usize,
460    /// Cached current entry
461    current: Option<Entry<'a>>,
462    /// End key for range bound
463    end_key: Option<Vec<u8>>,
464    /// SSTable level (higher = lower priority)
465    level: u8,
466}
467
468impl<'a> SstIterator<'a> {
469    pub fn new(data: &'a [u8], level: u8) -> Self {
470        let mut iter = Self {
471            data,
472            position: 0,
473            current: None,
474            end_key: None,
475            level,
476        };
477        iter.read_next();
478        iter
479    }
480
481    pub fn with_end_key(mut self, end_key: Vec<u8>) -> Self {
482        self.end_key = Some(end_key);
483        self
484    }
485
486    fn read_next(&mut self) {
487        if self.position >= self.data.len() {
488            self.current = None;
489            return;
490        }
491
492        // Simple format: [key_len: u16][value_len: u16][timestamp: u64][key][value]
493        if self.position + 12 > self.data.len() {
494            self.current = None;
495            return;
496        }
497
498        let key_len =
499            u16::from_le_bytes([self.data[self.position], self.data[self.position + 1]]) as usize;
500        let value_len =
501            u16::from_le_bytes([self.data[self.position + 2], self.data[self.position + 3]])
502                as usize;
503        let timestamp = u64::from_le_bytes([
504            self.data[self.position + 4],
505            self.data[self.position + 5],
506            self.data[self.position + 6],
507            self.data[self.position + 7],
508            self.data[self.position + 8],
509            self.data[self.position + 9],
510            self.data[self.position + 10],
511            self.data[self.position + 11],
512        ]);
513
514        let key_start = self.position + 12;
515        let key_end = key_start + key_len;
516        let value_end = key_end + value_len;
517
518        if value_end > self.data.len() {
519            self.current = None;
520            return;
521        }
522
523        let key = &self.data[key_start..key_end];
524        let value = &self.data[key_end..value_end];
525
526        // Check end bound
527        if let Some(ref end_key) = self.end_key
528            && key > end_key.as_slice()
529        {
530            self.current = None;
531            return;
532        }
533
534        self.current = Some(Entry::new(
535            Cow::Borrowed(key),
536            Cow::Borrowed(value),
537            timestamp,
538        ));
539        self.position = value_end;
540    }
541}
542
543impl<'a> EntryIterator<'a> for SstIterator<'a> {
544    fn peek(&self) -> Option<&Entry<'a>> {
545        self.current.as_ref()
546    }
547
548    fn advance(&mut self) {
549        self.read_next();
550    }
551
552    fn is_exhausted(&self) -> bool {
553        self.current.is_none()
554    }
555
556    fn source_priority(&self) -> u8 {
557        // Lower level = higher priority (L0 > L1 > L2...)
558        // Memtable is priority 0, so SST levels start at 1
559        self.level + 1
560    }
561}
562
/// Encodes records in the layout read by `SstIterator` (for testing).
///
/// Records are written in call order; the builder does not check that keys
/// ascend, so callers must add them sorted.
pub struct SstBuilder {
    data: Vec<u8>,
}

impl Default for SstBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl SstBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self { data: Vec::new() }
    }

    /// Appends one `[key_len: u16][value_len: u16][timestamp: u64][key][value]`
    /// record, with all integers little-endian.
    ///
    /// # Panics
    ///
    /// Panics if `key` or `value` is longer than `u16::MAX` bytes. Such a
    /// length cannot be represented in the header and would otherwise be
    /// silently truncated, corrupting every following record.
    pub fn add(&mut self, key: &[u8], value: &[u8], timestamp: u64) -> &mut Self {
        let key_len = Self::encode_len(key.len(), "key");
        let value_len = Self::encode_len(value.len(), "value");
        // 12-byte header followed by the raw key and value bytes.
        self.data.reserve(12 + key.len() + value.len());
        self.data.extend_from_slice(&key_len.to_le_bytes());
        self.data.extend_from_slice(&value_len.to_le_bytes());
        self.data.extend_from_slice(&timestamp.to_le_bytes());
        self.data.extend_from_slice(key);
        self.data.extend_from_slice(value);
        self
    }

    /// Returns the encoded bytes.
    pub fn build(self) -> Vec<u8> {
        self.data
    }

    /// Narrows a length to the on-disk `u16` field, panicking instead of truncating.
    fn encode_len(len: usize, what: &str) -> u16 {
        assert!(
            len <= usize::from(u16::MAX),
            "SstBuilder: {} length {} exceeds u16::MAX",
            what,
            len
        );
        len as u16
    }
}
594
595/// Extension trait for iterators
596pub trait IteratorExt<'a>: Iterator<Item = Entry<'a>> + Sized {
597    fn limit(self, n: usize) -> LimitIterator<'a, Self> {
598        LimitIterator::new(self, n)
599    }
600
601    fn filter_entries<F>(self, predicate: F) -> FilterIterator<'a, Self, F>
602    where
603        F: Fn(&Entry<'a>) -> bool,
604    {
605        FilterIterator::new(self, predicate)
606    }
607}
608
609impl<'a, I: Iterator<Item = Entry<'a>>> IteratorExt<'a> for I {}
610
#[cfg(test)]
mod tests {
    use super::*;

    /// Owned entry; an empty `value` marks a tombstone (mirroring `Entry::new`).
    fn make_entry(key: &[u8], value: &[u8], ts: u64) -> Entry<'static> {
        Entry {
            key: Cow::Owned(key.to_vec()),
            value: Cow::Owned(value.to_vec()),
            timestamp: ts,
            is_tombstone: value.is_empty(),
        }
    }

    /// Entries `keys[i] -> "i + 1"` at timestamp 100, in the given order.
    fn numbered(keys: &[&str]) -> Vec<Entry<'static>> {
        keys.iter()
            .enumerate()
            .map(|(i, k)| make_entry(k.as_bytes(), (i + 1).to_string().as_bytes(), 100))
            .collect()
    }

    /// Boxes a `VecIterator` as a merge source with the given priority.
    fn vec_source(
        entries: Vec<Entry<'static>>,
        priority: u8,
    ) -> Box<dyn EntryIterator<'static> + 'static> {
        Box::new(VecIterator::new(entries, priority))
    }

    /// Extracts keys as strings for compact ordering assertions.
    fn keys(entries: &[Entry]) -> Vec<String> {
        entries
            .iter()
            .map(|e| String::from_utf8_lossy(&e.key).into_owned())
            .collect()
    }

    #[test]
    fn test_vec_iterator() {
        let iter = VecIterator::new(numbered(&["a", "b", "c"]), 0);
        assert!(!iter.is_exhausted());
        assert_eq!(iter.peek().map(|e| e.key.as_ref()), Some(&b"a"[..]));
    }

    #[test]
    fn test_merge_iterator_single_source() {
        let source = vec_source(numbered(&["a", "b", "c"]), 0);
        let result: Vec<_> = MergeIterator::new(vec![source]).collect();
        assert_eq!(keys(&result), ["a", "b", "c"]);
    }

    #[test]
    fn test_merge_iterator_multiple_sources() {
        let left = vec_source(
            vec![
                make_entry(b"a", b"1", 100),
                make_entry(b"c", b"3", 100),
                make_entry(b"e", b"5", 100),
            ],
            0,
        );
        let right = vec_source(
            vec![
                make_entry(b"b", b"2", 100),
                make_entry(b"d", b"4", 100),
                make_entry(b"f", b"6", 100),
            ],
            1,
        );

        let result: Vec<_> = MergeIterator::new(vec![left, right]).collect();
        assert_eq!(keys(&result), ["a", "b", "c", "d", "e", "f"]);
    }

    #[test]
    fn test_merge_iterator_deduplication() {
        // Priority 0 (memtable) holds the newer version of "a".
        let memtable = vec_source(vec![make_entry(b"a", b"new_value", 200)], 0);
        // Priority 1 (SSTable) holds the older version plus "b".
        let sstable = vec_source(
            vec![
                make_entry(b"a", b"old_value", 100),
                make_entry(b"b", b"2", 100),
            ],
            1,
        );

        let mut merge = MergeIterator::new(vec![memtable, sstable]);
        let result: Vec<_> = merge.by_ref().collect();

        assert_eq!(keys(&result), ["a", "b"]);
        assert_eq!(result[0].value.as_ref(), b"new_value");
        assert_eq!(merge.stats().duplicates_skipped, 1);
    }

    #[test]
    fn test_merge_iterator_tombstones() {
        let deletes = vec_source(vec![Entry::tombstone(Cow::Owned(b"a".to_vec()), 200)], 0);
        let older = vec_source(
            vec![
                make_entry(b"a", b"old_value", 100),
                make_entry(b"b", b"2", 100),
            ],
            1,
        );

        let mut merge = MergeIterator::new(vec![deletes, older]);
        let result: Vec<_> = merge.by_ref().collect();

        // The tombstone hides "a"; only "b" survives.
        assert_eq!(keys(&result), ["b"]);
        assert_eq!(merge.stats().tombstones_skipped, 1);
    }

    #[test]
    fn test_limit_iterator() {
        let source = vec_source(numbered(&["a", "b", "c", "d", "e"]), 0);
        let result: Vec<_> = MergeIterator::new(vec![source]).limit(3).collect();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].key.as_ref(), b"a");
        assert_eq!(result[2].key.as_ref(), b"c");
    }

    #[test]
    fn test_filter_iterator() {
        let source = vec_source(numbered(&["a", "b", "c", "d"]), 0);
        // Keep only keys sorting before "c".
        let result: Vec<_> = MergeIterator::new(vec![source])
            .filter_entries(|e| e.key.as_ref() < b"c".as_slice())
            .collect();

        assert_eq!(keys(&result), ["a", "b"]);
    }

    #[test]
    fn test_sst_builder_and_iterator() {
        let mut builder = SstBuilder::new();
        builder
            .add(b"apple", b"red", 100)
            .add(b"banana", b"yellow", 100)
            .add(b"cherry", b"red", 100);
        let data = builder.build();

        let iter = SstIterator::new(&data, 0);
        assert!(!iter.is_exhausted());
        assert_eq!(iter.peek().unwrap().key.as_ref(), b"apple");
    }

    #[test]
    fn test_sst_iterator_full() {
        let mut builder = SstBuilder::new();
        builder
            .add(b"a", b"1", 100)
            .add(b"b", b"2", 200)
            .add(b"c", b"3", 300);
        let data = builder.build();

        let mut iter = SstIterator::new(&data, 0);
        let mut seen = Vec::new();
        while let Some(entry) = iter.peek() {
            seen.push((entry.key.to_vec(), entry.timestamp));
            iter.advance();
        }

        assert_eq!(
            seen,
            vec![
                (b"a".to_vec(), 100),
                (b"b".to_vec(), 200),
                (b"c".to_vec(), 300),
            ]
        );
    }

    #[test]
    fn test_sst_with_merge() {
        // Leak the buffers to obtain `'static` slices; in real usage the data
        // would be mmap'd and outlive the iterator.
        fn leaked_sst(records: &[(&str, &str)]) -> &'static [u8] {
            let mut builder = SstBuilder::new();
            for &(key, value) in records {
                builder.add(key.as_bytes(), value.as_bytes(), 100);
            }
            Box::leak(builder.build().into_boxed_slice())
        }

        let first: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(SstIterator::new(leaked_sst(&[("a", "1"), ("c", "3")]), 0));
        let second: Box<dyn EntryIterator<'static> + 'static> =
            Box::new(SstIterator::new(leaked_sst(&[("b", "2"), ("d", "4")]), 1));

        let result: Vec<_> = MergeIterator::new(vec![first, second]).collect();
        assert_eq!(keys(&result), ["a", "b", "c", "d"]);
    }

    #[test]
    fn test_memory_efficiency() {
        // Two interleaved "large" sources; LIMIT 10 should only ever
        // materialise 10 output rows, not 200.
        let evens: Vec<Entry<'static>> = (0..100)
            .map(|i| make_entry(format!("key{:05}", i * 2).as_bytes(), b"value", 100))
            .collect();
        let odds: Vec<Entry<'static>> = (0..100)
            .map(|i| make_entry(format!("key{:05}", i * 2 + 1).as_bytes(), b"value", 100))
            .collect();

        let merge = MergeIterator::new(vec![vec_source(evens, 0), vec_source(odds, 1)]);
        let result: Vec<_> = merge.limit(10).collect();

        assert_eq!(result.len(), 10);
        assert_eq!(result[0].key.as_ref(), b"key00000");
        assert_eq!(result[9].key.as_ref(), b"key00009");
    }

    #[test]
    fn test_empty_sources() {
        let mut merge = MergeIterator::new(vec![vec_source(vec![], 0), vec_source(vec![], 1)]);
        assert_eq!(merge.by_ref().count(), 0);
    }

    #[test]
    fn test_scan_stats() {
        let newer = vec_source(
            vec![
                make_entry(b"a", b"1", 200),
                make_entry(b"b", b"", 200), // tombstone
            ],
            0,
        );
        let older = vec_source(
            vec![
                make_entry(b"a", b"old", 100), // shadowed duplicate
                make_entry(b"c", b"3", 100),
            ],
            1,
        );

        let mut merge = MergeIterator::new(vec![newer, older]);
        merge.by_ref().for_each(drop);

        let stats = merge.stats();
        assert_eq!(stats.entries_scanned, 4);
        assert_eq!(stats.entries_returned, 2); // a, c
        assert_eq!(stats.duplicates_skipped, 1); // old 'a'
        assert_eq!(stats.tombstones_skipped, 1); // 'b'
    }

    #[test]
    fn test_priority_ordering() {
        // The same key in three sources: the lowest priority number wins.
        let sources = vec![
            vec_source(vec![make_entry(b"key", b"memtable", 300)], 0),
            vec_source(vec![make_entry(b"key", b"l0", 200)], 1),
            vec_source(vec![make_entry(b"key", b"l1", 100)], 2),
        ];

        let result: Vec<_> = MergeIterator::new(sources).collect();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].value.as_ref(), b"memtable");
    }
}