1#![allow(dead_code)]
7
8use std::{cmp::Ordering, collections::btree_map::Range as BTreeMapRange};
9
10use reifydb_core::{
11 common::CommitVersion,
12 encoded::key::EncodedKey,
13 interface::store::{MultiVersionBatch, MultiVersionValues},
14};
15
16use super::PendingWrite;
17
18pub struct FlowRangeIter<'a> {
30 committed: Box<dyn Iterator<Item = MultiVersionValues> + Send + 'a>,
32 pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
34 next_pending: Option<(&'a EncodedKey, &'a PendingWrite)>,
36 next_committed: Option<MultiVersionValues>,
38 version: CommitVersion,
40}
41
42impl<'a> FlowRangeIter<'a> {
43 pub fn new(
45 pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
46 committed: Box<dyn Iterator<Item = MultiVersionValues> + Send + 'a>,
47 version: CommitVersion,
48 ) -> Self {
49 let mut iterator = Self {
50 pending,
51 committed,
52 next_pending: None,
53 next_committed: None,
54 version,
55 };
56
57 iterator.advance_pending();
58 iterator.advance_committed();
59
60 iterator
61 }
62
63 fn advance_pending(&mut self) {
65 self.next_pending = self.pending.next();
66 }
67
68 fn advance_committed(&mut self) {
70 self.next_committed = self.committed.next();
71 }
72}
73
74impl<'a> Iterator for FlowRangeIter<'a> {
75 type Item = MultiVersionValues;
76
77 fn next(&mut self) -> Option<Self::Item> {
78 loop {
79 match (&self.next_pending, &self.next_committed) {
80 (Some((pending_key, _pending_value)), Some(committed)) => {
82 match pending_key.as_ref().cmp(committed.key.as_ref()) {
83 Ordering::Less => {
85 let (key, value) = self.next_pending.take().unwrap();
86 self.advance_pending();
87
88 match value {
89 PendingWrite::Set(values) => {
90 return Some(MultiVersionValues {
91 key: key.clone(),
92 values: values.clone(),
93 version: self.version,
94 });
95 }
96 PendingWrite::Remove => continue, }
98 }
99 Ordering::Equal => {
101 let (key, value) = self.next_pending.take().unwrap();
102 self.advance_pending();
103 self.advance_committed(); match value {
106 PendingWrite::Set(values) => {
107 return Some(MultiVersionValues {
108 key: key.clone(),
109 values: values.clone(),
110 version: self.version,
111 });
112 }
113 PendingWrite::Remove => continue, }
115 }
116 Ordering::Greater => {
118 let committed = self.next_committed.take().unwrap();
119 self.advance_committed();
120 return Some(committed);
121 }
122 }
123 }
124 (Some(_), None) => {
126 let (key, value) = self.next_pending.take().unwrap();
127 self.advance_pending();
128
129 match value {
130 PendingWrite::Set(values) => {
131 return Some(MultiVersionValues {
132 key: key.clone(),
133 values: values.clone(),
134 version: self.version,
135 });
136 }
137 PendingWrite::Remove => continue, }
139 }
140 (None, Some(_)) => {
142 let committed = self.next_committed.take().unwrap();
143 self.advance_committed();
144 return Some(committed);
145 }
146 (None, None) => return None,
148 }
149 }
150 }
151}
152
153pub fn collect_batch(
159 pending: BTreeMapRange<'_, EncodedKey, PendingWrite>,
160 committed_batch: MultiVersionBatch,
161 version: CommitVersion,
162) -> MultiVersionBatch {
163 let iter = FlowRangeIter::new(pending, Box::new(committed_batch.items.into_iter()), version);
165
166 let items: Vec<_> = iter.collect();
168
169 MultiVersionBatch {
170 items,
171 has_more: false,
172 }
173}
174
#[cfg(test)]
pub mod tests {
	use std::collections::BTreeMap;

	use reifydb_core::{
		common::CommitVersion,
		encoded::{encoded::EncodedValues, key::EncodedKey},
		interface::store::MultiVersionValues,
	};
	use reifydb_type::util::cowvec::CowVec;

	use super::*;

	/// Version handed to the iterator in every test that goes through `merge`.
	const PENDING_VERSION: CommitVersion = CommitVersion(10);

	/// Encodes the UTF-8 bytes of `s` as a key.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Encodes the UTF-8 bytes of `s` as a value payload.
	fn make_value(s: &str) -> EncodedValues {
		let bytes = s.as_bytes().to_vec();
		EncodedValues(CowVec::new(bytes))
	}

	/// Builds a committed entry for `key` holding `value` at `version`.
	fn make_committed(key: &str, value: &str, version: u64) -> MultiVersionValues {
		MultiVersionValues {
			key: make_key(key),
			values: make_value(value),
			version: CommitVersion(version),
		}
	}

	/// Builds the pending-write map: `Some(v)` becomes a `Set` of `v`,
	/// `None` becomes a `Remove`.
	fn pending_writes(writes: &[(&str, Option<&str>)]) -> BTreeMap<EncodedKey, PendingWrite> {
		writes
			.iter()
			.map(|&(key, value)| {
				let write = match value {
					Some(bytes) => PendingWrite::Set(make_value(bytes)),
					None => PendingWrite::Remove,
				};
				(make_key(key), write)
			})
			.collect()
	}

	/// Builds committed entries from `(key, value, version)` rows, keeping their order.
	fn committed_rows(rows: &[(&str, &str, u64)]) -> Vec<MultiVersionValues> {
		rows.iter().map(|&(key, value, version)| make_committed(key, value, version)).collect()
	}

	/// Encodes every name as a key.
	fn keys(names: &[&str]) -> Vec<EncodedKey> {
		names.iter().map(|&name| make_key(name)).collect()
	}

	/// Extracts the keys of `items`, in order.
	fn keys_of(items: &[MultiVersionValues]) -> Vec<EncodedKey> {
		items.iter().map(|item| item.key.clone()).collect()
	}

	/// Runs the merge at `PENDING_VERSION` and collects everything it yields.
	fn merge(
		pending: BTreeMapRange<'_, EncodedKey, PendingWrite>,
		committed: Vec<MultiVersionValues>,
	) -> Vec<MultiVersionValues> {
		FlowRangeIter::new(pending, Box::new(committed.into_iter()), PENDING_VERSION).collect()
	}

	#[test]
	fn test_empty_range_both_iterators() {
		let pending: BTreeMap<EncodedKey, PendingWrite> = BTreeMap::new();
		let committed: Vec<MultiVersionValues> = Vec::new();

		let mut iter = FlowRangeIter::new(pending.range(..), Box::new(committed.into_iter()), CommitVersion(1));

		assert!(iter.next().is_none());
	}

	#[test]
	fn test_range_only_pending() {
		let pending = pending_writes(&[("a", Some("1")), ("b", Some("2")), ("c", Some("3")), ("d", Some("4"))]);

		// Half-open range: "d" is excluded.
		let items = merge(pending.range(make_key("b")..make_key("d")), Vec::new());

		assert_eq!(keys_of(&items), keys(&["b", "c"]));
	}

	#[test]
	fn test_range_only_committed() {
		let pending: BTreeMap<EncodedKey, PendingWrite> = BTreeMap::new();
		let committed = committed_rows(&[("a", "1", 5), ("b", "2", 6), ("c", "3", 7)]);

		let items = merge(pending.range(..), committed);

		assert_eq!(keys_of(&items), keys(&["a", "b", "c"]));
	}

	#[test]
	fn test_range_filters_removes() {
		let pending = pending_writes(&[("a", Some("1")), ("b", None), ("c", Some("3"))]);

		let items = merge(pending.range(..), Vec::new());

		assert_eq!(keys_of(&items), keys(&["a", "c"]));
	}

	#[test]
	fn test_range_pending_shadows_committed() {
		let pending = pending_writes(&[("b", Some("new"))]);
		let committed = committed_rows(&[("a", "1", 5), ("b", "old", 6), ("c", "3", 7)]);

		let items = merge(pending.range(..), committed);

		assert_eq!(items.len(), 3);
		// The pending write replaces the committed "b" and carries the iterator's version.
		let shadowed = &items[1];
		assert_eq!(shadowed.key, make_key("b"));
		assert_eq!(shadowed.values, make_value("new"));
		assert_eq!(shadowed.version, CommitVersion(10));
	}

	#[test]
	fn test_range_remove_hides_committed() {
		let pending = pending_writes(&[("b", None)]);
		let committed = committed_rows(&[("a", "1", 5), ("b", "2", 6), ("c", "3", 7)]);

		let items = merge(pending.range(..), committed);

		assert_eq!(keys_of(&items), keys(&["a", "c"]));
	}

	#[test]
	fn test_range_bounded_query() {
		let pending = pending_writes(&[
			("a", Some("a")),
			("b", Some("b")),
			("c", Some("c")),
			("d", Some("d")),
			("e", Some("e")),
			("f", Some("f")),
		]);
		let committed = committed_rows(&[("b", "old_b", 5), ("f", "f_old", 6)]);

		// Only the pending side is bounded; the committed "f" is passed through as-is.
		let items = merge(pending.range(make_key("b")..make_key("e")), committed);

		assert_eq!(keys_of(&items), keys(&["b", "c", "d", "f"]));
		assert_eq!(items[0].values, make_value("b"));
		assert_eq!(items[3].values, make_value("f_old"));
	}

	#[test]
	fn test_range_interleaved_merge() {
		let pending = pending_writes(&[("b", Some("pending_b")), ("d", Some("pending_d"))]);
		let committed = committed_rows(&[("a", "committed_a", 5), ("c", "committed_c", 6), ("e", "committed_e", 7)]);

		let items = merge(pending.range(..), committed);

		assert_eq!(keys_of(&items), keys(&["a", "b", "c", "d", "e"]));
	}

	#[test]
	fn test_range_sorted_order() {
		// Inserted out of order on purpose; the map range yields them sorted.
		let pending = pending_writes(&[("m", Some("m")), ("a", Some("a")), ("z", Some("z"))]);
		let committed = committed_rows(&[("d", "d", 5), ("k", "k", 6), ("p", "p", 7)]);

		let items = merge(pending.range(..), committed);

		assert_eq!(items.len(), 6);
		assert_eq!(keys_of(&items), keys(&["a", "d", "k", "m", "p", "z"]));
	}

	#[test]
	fn test_range_with_start_bound() {
		let pending = pending_writes(&[("a", Some("a")), ("b", Some("b")), ("c", Some("c")), ("d", Some("d"))]);

		let items = merge(pending.range(make_key("b")..), Vec::new());

		assert_eq!(keys_of(&items), keys(&["b", "c", "d"]));
	}

	#[test]
	fn test_range_with_end_bound() {
		let pending = pending_writes(&[("a", Some("a")), ("b", Some("b")), ("c", Some("c")), ("d", Some("d"))]);

		let items = merge(pending.range(..make_key("c")), Vec::new());

		assert_eq!(keys_of(&items), keys(&["a", "b"]));
	}

	#[test]
	fn test_range_inclusive_bounds() {
		let pending = pending_writes(&[("a", Some("a")), ("b", Some("b")), ("c", Some("c")), ("d", Some("d"))]);

		let items = merge(pending.range(make_key("b")..=make_key("c")), Vec::new());

		assert_eq!(keys_of(&items), keys(&["b", "c"]));
	}

	#[test]
	fn test_range_complex_scenario() {
		let pending = pending_writes(&[
			("a", Some("new_a")),
			("b", None),
			("c", Some("new_c")),
			("f", None),
			("g", Some("new_g")),
		]);
		let committed = committed_rows(&[
			("a", "old_a", 5),
			("b", "old_b", 6),
			("d", "old_d", 7),
			("e", "old_e", 8),
			("f", "old_f", 9),
		]);

		let items = merge(pending.range(make_key("a")..make_key("g")), committed);

		// "b" and "f" are removed, "g" lies outside the pending range.
		let expected = [("a", "new_a"), ("c", "new_c"), ("d", "old_d"), ("e", "old_e")];
		assert_eq!(items.len(), expected.len());
		for (item, &(key, value)) in items.iter().zip(expected.iter()) {
			assert_eq!(item.key, make_key(key));
			assert_eq!(item.values, make_value(value));
		}
	}

	#[test]
	fn test_range_empty_result() {
		let pending = pending_writes(&[("a", Some("a")), ("z", Some("z"))]);

		let items = merge(pending.range(make_key("m")..make_key("n")), Vec::new());

		assert!(items.is_empty());
	}
}