vibesql_storage/database/indexes/
streaming.rs1use std::{collections::btree_map, ops::Bound};
15
16use vibesql_types::SqlValue;
17
18pub struct OwnedStreamingRangeScan<'a> {
26 data: &'a std::collections::BTreeMap<Vec<SqlValue>, Vec<usize>>,
28 current_indices: Option<std::slice::Iter<'a, usize>>,
30 pending_deletions: &'a [usize],
32 start_key: Option<Vec<SqlValue>>,
34 end_key: Option<Vec<SqlValue>>,
36 inclusive_start: bool,
38 inclusive_end: bool,
40 range_iter: Option<btree_map::Range<'a, Vec<SqlValue>, Vec<usize>>>,
42 initialized: bool,
44}
45
46impl<'a> OwnedStreamingRangeScan<'a> {
47 pub fn new(
51 data: &'a std::collections::BTreeMap<Vec<SqlValue>, Vec<usize>>,
52 pending_deletions: &'a [usize],
53 start: Option<SqlValue>,
54 end: Option<SqlValue>,
55 inclusive_start: bool,
56 inclusive_end: bool,
57 ) -> Option<Self> {
58 if let (Some(start_val), Some(end_val)) = (&start, &end) {
60 if start_val == end_val && (!inclusive_start || !inclusive_end) {
61 return None; }
63 if start_val > end_val {
64 return None; }
66 }
67
68 let start_key = start.map(|v| vec![v]);
70 let end_key = end.map(|v| vec![v]);
71
72 Some(Self {
73 data,
74 current_indices: None,
75 pending_deletions,
76 start_key,
77 end_key,
78 inclusive_start,
79 inclusive_end,
80 range_iter: None,
81 initialized: false,
82 })
83 }
84
85 #[inline]
87 fn adjust_row_index(&self, row_idx: usize) -> usize {
88 if self.pending_deletions.is_empty() {
89 row_idx
90 } else {
91 let decrement = self.pending_deletions.partition_point(|&d| d < row_idx);
92 row_idx - decrement
93 }
94 }
95
96 fn ensure_initialized(&mut self) {
100 if self.initialized {
101 return;
102 }
103 self.initialized = true;
104
105 let start_bound: Bound<&[SqlValue]> = match &self.start_key {
108 Some(key) if self.inclusive_start => Bound::Included(key.as_slice()),
109 Some(key) => Bound::Excluded(key.as_slice()),
110 None => Bound::Unbounded,
111 };
112
113 let end_bound: Bound<&[SqlValue]> = match &self.end_key {
114 Some(key) if self.inclusive_end => Bound::Included(key.as_slice()),
115 Some(key) => Bound::Excluded(key.as_slice()),
116 None => Bound::Unbounded,
117 };
118
119 if let (Bound::Excluded(s), Bound::Excluded(e)) = (&start_bound, &end_bound) {
121 if s == e {
122 return; }
124 }
125
126 self.range_iter = Some(self.data.range::<[SqlValue], _>((start_bound, end_bound)));
132 }
133}
134
135impl Iterator for OwnedStreamingRangeScan<'_> {
136 type Item = usize;
137
138 #[inline]
139 fn next(&mut self) -> Option<Self::Item> {
140 self.ensure_initialized();
142
143 loop {
144 if let Some(ref mut indices) = self.current_indices {
146 if let Some(&row_idx) = indices.next() {
147 return Some(self.adjust_row_index(row_idx));
148 }
149 }
150
151 let range_iter = self.range_iter.as_mut()?;
153 match range_iter.next() {
154 Some((_key, row_indices)) => {
155 self.current_indices = Some(row_indices.iter());
156 }
158 None => return None,
159 }
160 }
161 }
162
163 fn size_hint(&self) -> (usize, Option<usize>) {
164 (0, None)
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use std::collections::BTreeMap;
171
172 use super::*;
173
174 #[test]
175 fn test_owned_streaming_range_scan_basic() {
176 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
177 data.insert(vec![SqlValue::Integer(1)], vec![0]);
178 data.insert(vec![SqlValue::Integer(2)], vec![1, 2]);
179 data.insert(vec![SqlValue::Integer(3)], vec![3]);
180 data.insert(vec![SqlValue::Integer(4)], vec![4, 5, 6]);
181 data.insert(vec![SqlValue::Integer(5)], vec![7]);
182
183 let pending_deletions: Vec<usize> = vec![];
184
185 let iter = OwnedStreamingRangeScan::new(
187 &data,
188 &pending_deletions,
189 Some(SqlValue::Integer(2)),
190 Some(SqlValue::Integer(4)),
191 true,
192 true,
193 )
194 .unwrap();
195
196 let results: Vec<usize> = iter.collect();
197 assert_eq!(results, vec![1, 2, 3, 4, 5, 6]);
198 }
199
200 #[test]
201 fn test_owned_streaming_range_scan_with_pending_deletions() {
202 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
203 data.insert(vec![SqlValue::Integer(1)], vec![0]);
204 data.insert(vec![SqlValue::Integer(2)], vec![1]);
205 data.insert(vec![SqlValue::Integer(3)], vec![2]);
206 data.insert(vec![SqlValue::Integer(4)], vec![3]);
207 data.insert(vec![SqlValue::Integer(5)], vec![4]);
208
209 let pending_deletions: Vec<usize> = vec![1];
211
212 let iter = OwnedStreamingRangeScan::new(&data, &pending_deletions, None, None, true, true)
214 .unwrap();
215
216 let results: Vec<usize> = iter.collect();
217 assert_eq!(results, vec![0, 1, 1, 2, 3]);
225 }
226
227 #[test]
228 fn test_owned_streaming_range_scan_empty_range() {
229 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
230 data.insert(vec![SqlValue::Integer(1)], vec![0]);
231 data.insert(vec![SqlValue::Integer(5)], vec![4]);
232
233 let pending_deletions: Vec<usize> = vec![];
234
235 let iter = OwnedStreamingRangeScan::new(
237 &data,
238 &pending_deletions,
239 Some(SqlValue::Integer(3)),
240 Some(SqlValue::Integer(4)),
241 true,
242 true,
243 )
244 .unwrap();
245
246 let results: Vec<usize> = iter.collect();
247 let expected: Vec<usize> = vec![];
248 assert_eq!(results, expected);
249 }
250
251 #[test]
252 fn test_owned_streaming_range_scan_inverted_range() {
253 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
254 data.insert(vec![SqlValue::Integer(1)], vec![0]);
255
256 let pending_deletions: Vec<usize> = vec![];
257
258 let result = OwnedStreamingRangeScan::new(
260 &data,
261 &pending_deletions,
262 Some(SqlValue::Integer(5)),
263 Some(SqlValue::Integer(1)),
264 true,
265 true,
266 );
267 assert!(result.is_none());
268 }
269
270 #[test]
271 fn test_owned_streaming_range_scan_exclusive_bounds() {
272 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
273 data.insert(vec![SqlValue::Integer(1)], vec![0]);
274 data.insert(vec![SqlValue::Integer(2)], vec![1]);
275 data.insert(vec![SqlValue::Integer(3)], vec![2]);
276 data.insert(vec![SqlValue::Integer(4)], vec![3]);
277 data.insert(vec![SqlValue::Integer(5)], vec![4]);
278
279 let pending_deletions: Vec<usize> = vec![];
280
281 let iter = OwnedStreamingRangeScan::new(
283 &data,
284 &pending_deletions,
285 Some(SqlValue::Integer(2)),
286 Some(SqlValue::Integer(4)),
287 false,
288 false,
289 )
290 .unwrap();
291
292 let results: Vec<usize> = iter.collect();
293 assert_eq!(results, vec![2]); }
295
296 #[test]
297 fn test_owned_streaming_unbounded_start() {
298 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
299 data.insert(vec![SqlValue::Integer(1)], vec![0]);
300 data.insert(vec![SqlValue::Integer(2)], vec![1]);
301 data.insert(vec![SqlValue::Integer(3)], vec![2]);
302
303 let pending_deletions: Vec<usize> = vec![];
304
305 let iter = OwnedStreamingRangeScan::new(
307 &data,
308 &pending_deletions,
309 None,
310 Some(SqlValue::Integer(2)),
311 true,
312 true,
313 )
314 .unwrap();
315
316 let results: Vec<usize> = iter.collect();
317 assert_eq!(results, vec![0, 1]); }
319
320 #[test]
321 fn test_owned_streaming_unbounded_end() {
322 let mut data: BTreeMap<Vec<SqlValue>, Vec<usize>> = BTreeMap::new();
323 data.insert(vec![SqlValue::Integer(1)], vec![0]);
324 data.insert(vec![SqlValue::Integer(2)], vec![1]);
325 data.insert(vec![SqlValue::Integer(3)], vec![2]);
326
327 let pending_deletions: Vec<usize> = vec![];
328
329 let iter = OwnedStreamingRangeScan::new(
331 &data,
332 &pending_deletions,
333 Some(SqlValue::Integer(2)),
334 None,
335 true,
336 true,
337 )
338 .unwrap();
339
340 let results: Vec<usize> = iter.collect();
341 assert_eq!(results, vec![1, 2]); }
343}