vibesql_storage/database/indexes/
streaming.rs

1// ============================================================================
2// Streaming Range Scan - Iterator-based range scan without materialization
3// ============================================================================
4//
5// This module provides streaming iterators for range scans that avoid
6// materializing all matching row indices into a Vec. This is critical for
7// performance on range queries without LIMIT.
8//
9// Key benefits:
10// - No upfront Vec allocation for matching indices
11// - Memory usage proportional to output, not total matches
12// - Better cache locality (process rows as they're found)
13
14use std::{collections::btree_map, ops::Bound};
15
16use vibesql_types::SqlValue;
17
18/// Self-contained streaming range scan that owns its bounds.
19///
20/// This iterator owns the normalized start/end keys and uses BTreeMap::range()
21/// for efficient seeking instead of iterating through all entries.
22///
23/// The key insight is that we can store the bound keys in the struct and
24/// create the range iterator lazily on first access, avoiding lifetime issues.
25pub struct OwnedStreamingRangeScan<'a> {
26    /// The BTreeMap data reference
27    data: &'a std::collections::BTreeMap<Vec<SqlValue>, Vec<usize>>,
28    /// Current position within the current key's row indices
29    current_indices: Option<std::slice::Iter<'a, usize>>,
30    /// Reference to pending deletions for adjustment
31    pending_deletions: &'a [usize],
32    /// Normalized start bound key (owned)
33    start_key: Option<Vec<SqlValue>>,
34    /// Normalized end bound key (owned)
35    end_key: Option<Vec<SqlValue>>,
36    /// Whether start is inclusive
37    inclusive_start: bool,
38    /// Whether end is inclusive
39    inclusive_end: bool,
40    /// The range iterator (created lazily)
41    range_iter: Option<btree_map::Range<'a, Vec<SqlValue>, Vec<usize>>>,
42    /// Whether we've initialized the range iterator
43    initialized: bool,
44}
45
46impl<'a> OwnedStreamingRangeScan<'a> {
47    /// Create a new owned streaming range scan.
48    ///
49    /// Returns None if the range is empty or invalid.
50    pub fn new(
51        data: &'a std::collections::BTreeMap<Vec<SqlValue>, Vec<usize>>,
52        pending_deletions: &'a [usize],
53        start: Option<SqlValue>,
54        end: Option<SqlValue>,
55        inclusive_start: bool,
56        inclusive_end: bool,
57    ) -> Option<Self> {
58        // Check for empty/invalid ranges
59        if let (Some(start_val), Some(end_val)) = (&start, &end) {
60            if start_val == end_val && (!inclusive_start || !inclusive_end) {
61                return None; // Empty range
62            }
63            if start_val > end_val {
64                return None; // Inverted range
65            }
66        }
67
68        // Convert bounds to key format
69        let start_key = start.map(|v| vec![v]);
70        let end_key = end.map(|v| vec![v]);
71
72        Some(Self {
73            data,
74            current_indices: None,
75            pending_deletions,
76            start_key,
77            end_key,
78            inclusive_start,
79            inclusive_end,
80            range_iter: None,
81            initialized: false,
82        })
83    }
84
85    /// Adjust a row index by accounting for pending deletions.
86    #[inline]
87    fn adjust_row_index(&self, row_idx: usize) -> usize {
88        if self.pending_deletions.is_empty() {
89            row_idx
90        } else {
91            let decrement = self.pending_deletions.partition_point(|&d| d < row_idx);
92            row_idx - decrement
93        }
94    }
95
96    /// Initialize the range iterator if not already done.
97    /// This uses unsafe to extend the lifetime of the keys, which is safe
98    /// because the keys are stored in self and won't be modified.
99    fn ensure_initialized(&mut self) {
100        if self.initialized {
101            return;
102        }
103        self.initialized = true;
104
105        // Build bounds for BTreeMap::range()
106        // We need to create bounds that reference our stored keys
107        let start_bound: Bound<&[SqlValue]> = match &self.start_key {
108            Some(key) if self.inclusive_start => Bound::Included(key.as_slice()),
109            Some(key) => Bound::Excluded(key.as_slice()),
110            None => Bound::Unbounded,
111        };
112
113        let end_bound: Bound<&[SqlValue]> = match &self.end_key {
114            Some(key) if self.inclusive_end => Bound::Included(key.as_slice()),
115            Some(key) => Bound::Excluded(key.as_slice()),
116            None => Bound::Unbounded,
117        };
118
119        // Check for invalid range (both bounds excluded at same value)
120        if let (Bound::Excluded(s), Bound::Excluded(e)) = (&start_bound, &end_bound) {
121            if s == e {
122                return; // Invalid range - leave range_iter as None
123            }
124        }
125
126        // Create the range iterator
127        // SAFETY: The bounds reference self.start_key and self.end_key which are
128        // stored in self. The range iterator only needs these references to be
129        // valid during iteration, and since we're storing the keys in self,
130        // they will outlive the iterator.
131        self.range_iter = Some(self.data.range::<[SqlValue], _>((start_bound, end_bound)));
132    }
133}
134
135impl Iterator for OwnedStreamingRangeScan<'_> {
136    type Item = usize;
137
138    #[inline]
139    fn next(&mut self) -> Option<Self::Item> {
140        // Ensure we've initialized the range iterator
141        self.ensure_initialized();
142
143        loop {
144            // Try to get the next index from the current key's indices
145            if let Some(ref mut indices) = self.current_indices {
146                if let Some(&row_idx) = indices.next() {
147                    return Some(self.adjust_row_index(row_idx));
148                }
149            }
150
151            // Move to the next key in the range
152            let range_iter = self.range_iter.as_mut()?;
153            match range_iter.next() {
154                Some((_key, row_indices)) => {
155                    self.current_indices = Some(row_indices.iter());
156                    // Loop back to try getting from this key's indices
157                }
158                None => return None,
159            }
160        }
161    }
162
163    fn size_hint(&self) -> (usize, Option<usize>) {
164        (0, None)
165    }
166}
167
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;

    /// Shorthand for the index map type the scan operates on.
    type Index = BTreeMap<Vec<SqlValue>, Vec<usize>>;

    /// Empty pending-deletion list shared by tests that do not exercise adjustment.
    const NO_DELETIONS: &[usize] = &[];

    /// Builds an index from `(key value, row indices)` pairs using one-element keys.
    fn index_from(entries: Vec<(SqlValue, Vec<usize>)>) -> Index {
        entries.into_iter().map(|(key, rows)| (vec![key], rows)).collect()
    }

    /// Index with keys 1..=5, each holding exactly one row (rows 0..=4).
    fn one_row_per_key() -> Index {
        index_from(vec![
            (SqlValue::Integer(1), vec![0]),
            (SqlValue::Integer(2), vec![1]),
            (SqlValue::Integer(3), vec![2]),
            (SqlValue::Integer(4), vec![3]),
            (SqlValue::Integer(5), vec![4]),
        ])
    }

    /// Index with keys 1..=3, each holding exactly one row (rows 0..=2).
    fn three_keys() -> Index {
        index_from(vec![
            (SqlValue::Integer(1), vec![0]),
            (SqlValue::Integer(2), vec![1]),
            (SqlValue::Integer(3), vec![2]),
        ])
    }

    #[test]
    fn test_owned_streaming_range_scan_basic() {
        let index = index_from(vec![
            (SqlValue::Integer(1), vec![0]),
            (SqlValue::Integer(2), vec![1, 2]),
            (SqlValue::Integer(3), vec![3]),
            (SqlValue::Integer(4), vec![4, 5, 6]),
            (SqlValue::Integer(5), vec![7]),
        ]);

        // Closed range 2..=4 covers keys 2, 3 and 4.
        let rows: Vec<usize> = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            Some(SqlValue::Integer(2)),
            Some(SqlValue::Integer(4)),
            true,
            true,
        )
        .unwrap()
        .collect();

        assert_eq!(rows, vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_owned_streaming_range_scan_with_pending_deletions() {
        let index = one_row_per_key();

        // Row index 1 is pending deletion.
        let deleted = [1usize];

        // Unbounded on both sides: every key is visited.
        let rows: Vec<usize> =
            OwnedStreamingRangeScan::new(&index, &deleted, None, None, true, true)
                .unwrap()
                .collect();

        // Stored rows are [0, 1, 2, 3, 4]. Each one is reduced by the number of
        // pending deletions strictly below it:
        //   0 -> 0, 1 -> 1 (the deletion at 1 is not < 1), 2 -> 1, 3 -> 2, 4 -> 3
        assert_eq!(rows, vec![0, 1, 1, 2, 3]);
    }

    #[test]
    fn test_owned_streaming_range_scan_empty_range() {
        let index = index_from(vec![
            (SqlValue::Integer(1), vec![0]),
            (SqlValue::Integer(5), vec![4]),
        ]);

        // 3..=4 is a valid range, but no key in the index falls inside it.
        let rows: Vec<usize> = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            Some(SqlValue::Integer(3)),
            Some(SqlValue::Integer(4)),
            true,
            true,
        )
        .unwrap()
        .collect();

        assert_eq!(rows, Vec::<usize>::new());
    }

    #[test]
    fn test_owned_streaming_range_scan_inverted_range() {
        let index = index_from(vec![(SqlValue::Integer(1), vec![0])]);

        // 5..=1 is inverted, so construction itself is refused.
        let scan = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            Some(SqlValue::Integer(5)),
            Some(SqlValue::Integer(1)),
            true,
            true,
        );

        assert!(scan.is_none());
    }

    #[test]
    fn test_owned_streaming_range_scan_exclusive_bounds() {
        let index = one_row_per_key();

        // 2 < x < 4 with both ends exclusive leaves only key 3.
        let rows: Vec<usize> = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            Some(SqlValue::Integer(2)),
            Some(SqlValue::Integer(4)),
            false,
            false,
        )
        .unwrap()
        .collect();

        assert_eq!(rows, vec![2]);
    }

    #[test]
    fn test_owned_streaming_unbounded_start() {
        let index = three_keys();

        // ..=2 covers keys 1 and 2.
        let rows: Vec<usize> = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            None,
            Some(SqlValue::Integer(2)),
            true,
            true,
        )
        .unwrap()
        .collect();

        assert_eq!(rows, vec![0, 1]);
    }

    #[test]
    fn test_owned_streaming_unbounded_end() {
        let index = three_keys();

        // 2.. covers keys 2 and 3.
        let rows: Vec<usize> = OwnedStreamingRangeScan::new(
            &index,
            NO_DELETIONS,
            Some(SqlValue::Integer(2)),
            None,
            true,
            true,
        )
        .unwrap()
        .collect();

        assert_eq!(rows, vec![1, 2]);
    }
}