Skip to main content

reifydb_sub_flow/transaction/
range.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4// These types are now only used by tests (production code uses streaming),
5// but we keep them for testing the merge algorithm.
6#![allow(dead_code)]
7
8use std::{cmp::Ordering, collections::btree_map::Range as BTreeMapRange};
9
10use reifydb_core::{
11	common::CommitVersion,
12	encoded::key::EncodedKey,
13	interface::store::{MultiVersionBatch, MultiVersionRow},
14};
15
16use super::PendingWrite;
17
18/// Iterator that merges pending writes with committed range query results
19///
20/// This iterator combines two sorted streams within a bounded range:
21/// 1. Pending writes from the local BTreeMap range (sorted by key)
22/// 2. Committed results from the underlying query transaction range
23///
24/// The merge algorithm:
25/// - Compares keys at each step
26/// - Prefers pending writes when keys are equal (overlay semantics)
27/// - Filters out removed keys (PendingWrite::Remove)
28/// - Maintains sorted order
29pub struct FlowRangeIter<'a> {
30	/// Iterator over committed results from query transaction
31	committed: Box<dyn Iterator<Item = MultiVersionRow> + Send + 'a>,
32	/// Range iterator over pending writes in sorted order
33	pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
34	/// Pre-fetched next pending item for lookahead comparison
35	next_pending: Option<(&'a EncodedKey, &'a PendingWrite)>,
36	/// Pre-fetched next committed item for lookahead comparison
37	next_committed: Option<MultiVersionRow>,
38	/// Fixed version for pending writes (CDC version)
39	version: CommitVersion,
40}
41
42impl<'a> FlowRangeIter<'a> {
43	/// Create a new merge iterator for range queries
44	pub fn new(
45		pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
46		committed: Box<dyn Iterator<Item = MultiVersionRow> + Send + 'a>,
47		version: CommitVersion,
48	) -> Self {
49		let mut iterator = Self {
50			pending,
51			committed,
52			next_pending: None,
53			next_committed: None,
54			version,
55		};
56
57		iterator.advance_pending();
58		iterator.advance_committed();
59
60		iterator
61	}
62
63	/// Advance the pending iterator and cache the next item
64	fn advance_pending(&mut self) {
65		self.next_pending = self.pending.next();
66	}
67
68	/// Advance the committed iterator and cache the next item
69	fn advance_committed(&mut self) {
70		self.next_committed = self.committed.next();
71	}
72}
73
74impl<'a> Iterator for FlowRangeIter<'a> {
75	type Item = MultiVersionRow;
76
77	fn next(&mut self) -> Option<Self::Item> {
78		loop {
79			match (&self.next_pending, &self.next_committed) {
80				// Both pending and committed have items
81				(Some((pending_key, _pending_value)), Some(committed)) => {
82					match pending_key.as_ref().cmp(committed.key.as_ref()) {
83						// Pending has smaller key - yield pending if it's a write
84						Ordering::Less => {
85							let (key, value) = self.next_pending.take().unwrap();
86							self.advance_pending();
87
88							match value {
89								PendingWrite::Set(row) => {
90									return Some(MultiVersionRow {
91										key: key.clone(),
92										row: row.clone(),
93										version: self.version,
94									});
95								}
96								PendingWrite::Remove => continue, // Skip removed keys
97							}
98						}
99						// Keys are equal - prefer pending, skip committed
100						Ordering::Equal => {
101							let (key, value) = self.next_pending.take().unwrap();
102							self.advance_pending();
103							self.advance_committed(); // Skip the duplicate committed entry
104
105							match value {
106								PendingWrite::Set(row) => {
107									return Some(MultiVersionRow {
108										key: key.clone(),
109										row: row.clone(),
110										version: self.version,
111									});
112								}
113								PendingWrite::Remove => continue, // Skip removed keys
114							}
115						}
116						// Committed has smaller key - yield committed
117						Ordering::Greater => {
118							let committed = self.next_committed.take().unwrap();
119							self.advance_committed();
120							return Some(committed);
121						}
122					}
123				}
124				// Only pending items left
125				(Some(_), None) => {
126					let (key, value) = self.next_pending.take().unwrap();
127					self.advance_pending();
128
129					match value {
130						PendingWrite::Set(row) => {
131							return Some(MultiVersionRow {
132								key: key.clone(),
133								row: row.clone(),
134								version: self.version,
135							});
136						}
137						PendingWrite::Remove => continue, // Skip removed keys
138					}
139				}
140				// Only committed items left
141				(None, Some(_)) => {
142					let committed = self.next_committed.take().unwrap();
143					self.advance_committed();
144					return Some(committed);
145				}
146				// Both exhausted
147				(None, None) => return None,
148			}
149		}
150	}
151}
152
153/// Collect a merged batch of pending and committed values
154///
155/// This function uses the FlowRangeIter to merge pending writes with committed batch results,
156/// materializing all items into a single batch. The `has_more` field is always false because
157/// pending writes are finite and fully materialized.
158pub fn collect_batch(
159	pending: BTreeMapRange<'_, EncodedKey, PendingWrite>,
160	committed_batch: MultiVersionBatch,
161	version: CommitVersion,
162) -> MultiVersionBatch {
163	// Create iterator with same merge logic
164	let iter = FlowRangeIter::new(pending, Box::new(committed_batch.items.into_iter()), version);
165
166	// Materialize all items
167	let items: Vec<_> = iter.collect();
168
169	MultiVersionBatch {
170		items,
171		has_more: false,
172	}
173}
174
#[cfg(test)]
pub mod tests {
	use std::collections::BTreeMap;

	use reifydb_core::{
		common::CommitVersion,
		encoded::{key::EncodedKey, row::EncodedRow},
		interface::store::MultiVersionRow,
	};
	use reifydb_type::util::cowvec::CowVec;

	use super::*;

	/// Encode a string as a key.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Encode a string as a row payload.
	fn make_value(s: &str) -> EncodedRow {
		let bytes = s.as_bytes().to_vec();
		EncodedRow(CowVec::new(bytes))
	}

	/// Build a single committed row.
	fn make_committed(key: &str, value: &str, version: u64) -> MultiVersionRow {
		MultiVersionRow {
			key: make_key(key),
			row: make_value(value),
			version: CommitVersion(version),
		}
	}

	/// Pending write that sets `value`.
	fn set(value: &str) -> PendingWrite {
		PendingWrite::Set(make_value(value))
	}

	/// Build the pending map from `(key, write)` pairs, inserted in the given order.
	fn pending_from(entries: Vec<(&str, PendingWrite)>) -> BTreeMap<EncodedKey, PendingWrite> {
		entries.into_iter().map(|(key, write)| (make_key(key), write)).collect()
	}

	/// Build committed rows from `(key, value, version)` triples.
	fn committed_from(entries: &[(&str, &str, u64)]) -> Vec<MultiVersionRow> {
		entries.iter().map(|(key, value, version)| make_committed(key, value, *version)).collect()
	}

	/// Run the merge to completion and return every row it yields.
	fn merge(
		pending: BTreeMapRange<'_, EncodedKey, PendingWrite>,
		committed: Vec<MultiVersionRow>,
		version: u64,
	) -> Vec<MultiVersionRow> {
		FlowRangeIter::new(pending, Box::new(committed.into_iter()), CommitVersion(version)).collect()
	}

	/// Keys of the merged rows, in output order.
	fn keys_of(rows: &[MultiVersionRow]) -> Vec<EncodedKey> {
		rows.iter().map(|row| row.key.clone()).collect()
	}

	/// Expected key list built from string names.
	fn keys(names: &[&str]) -> Vec<EncodedKey> {
		names.iter().map(|name| make_key(name)).collect()
	}

	/// Pending map with keys "a".."d", each set to its own name.
	fn pending_a_to_d() -> BTreeMap<EncodedKey, PendingWrite> {
		pending_from(vec![("a", set("a")), ("b", set("b")), ("c", set("c")), ("d", set("d"))])
	}

	#[test]
	fn test_empty_range_both_iterators() {
		let pending = pending_from(vec![]);

		let rows = merge(pending.range(..), Vec::new(), 1);

		assert!(rows.is_empty());
	}

	#[test]
	fn test_range_only_pending() {
		let pending = pending_from(vec![("a", set("1")), ("b", set("2")), ("c", set("3")), ("d", set("4"))]);

		// Range from "b" to "d" (exclusive)
		let rows = merge(pending.range(make_key("b")..make_key("d")), Vec::new(), 10);

		assert_eq!(keys_of(&rows), keys(&["b", "c"]));
	}

	#[test]
	fn test_range_only_committed() {
		let pending = pending_from(vec![]);
		let committed = committed_from(&[("a", "1", 5), ("b", "2", 6), ("c", "3", 7)]);

		let rows = merge(pending.range(..), committed, 10);

		assert_eq!(keys_of(&rows), keys(&["a", "b", "c"]));
	}

	#[test]
	fn test_range_filters_removes() {
		let pending = pending_from(vec![("a", set("1")), ("b", PendingWrite::Remove), ("c", set("3"))]);

		let rows = merge(pending.range(..), Vec::new(), 10);

		assert_eq!(keys_of(&rows), keys(&["a", "c"]));
	}

	#[test]
	fn test_range_pending_shadows_committed() {
		let pending = pending_from(vec![("b", set("new"))]);
		let committed = committed_from(&[("a", "1", 5), ("b", "old", 6), ("c", "3", 7)]);

		let rows = merge(pending.range(..), committed, 10);

		assert_eq!(rows.len(), 3);
		let shadowed = &rows[1];
		assert_eq!(shadowed.key, make_key("b"));
		assert_eq!(shadowed.row, make_value("new"));
		assert_eq!(shadowed.version, CommitVersion(10));
	}

	#[test]
	fn test_range_remove_hides_committed() {
		let pending = pending_from(vec![("b", PendingWrite::Remove)]);
		let committed = committed_from(&[("a", "1", 5), ("b", "2", 6), ("c", "3", 7)]);

		let rows = merge(pending.range(..), committed, 10);

		assert_eq!(keys_of(&rows), keys(&["a", "c"]));
	}

	#[test]
	fn test_range_bounded_query() {
		let pending = pending_from(vec![
			("a", set("a")),
			("b", set("b")),
			("c", set("c")),
			("d", set("d")),
			("e", set("e")),
			("f", set("f")),
		]);
		let committed = committed_from(&[("b", "old_b", 5), ("f", "f_old", 6)]);

		// Range from "b" (inclusive) to "e" (exclusive)
		let rows = merge(pending.range(make_key("b")..make_key("e")), committed, 10);

		assert_eq!(keys_of(&rows), keys(&["b", "c", "d", "f"]));
		assert_eq!(rows[0].row, make_value("b")); // Pending value
		assert_eq!(rows[3].row, make_value("f_old"));
	}

	#[test]
	fn test_range_interleaved_merge() {
		let pending = pending_from(vec![("b", set("pending_b")), ("d", set("pending_d"))]);
		let committed =
			committed_from(&[("a", "committed_a", 5), ("c", "committed_c", 6), ("e", "committed_e", 7)]);

		let rows = merge(pending.range(..), committed, 10);

		assert_eq!(keys_of(&rows), keys(&["a", "b", "c", "d", "e"]));
	}

	#[test]
	fn test_range_sorted_order() {
		// Inserted out of order on purpose; the map range is what sorts them
		let pending = pending_from(vec![("m", set("m")), ("a", set("a")), ("z", set("z"))]);
		let committed = committed_from(&[("d", "d", 5), ("k", "k", 6), ("p", "p", 7)]);

		let rows = merge(pending.range(..), committed, 10);

		assert_eq!(rows.len(), 6);
		assert_eq!(keys_of(&rows), keys(&["a", "d", "k", "m", "p", "z"]));
	}

	#[test]
	fn test_range_with_start_bound() {
		let pending = pending_a_to_d();

		// Start from "b" onwards
		let rows = merge(pending.range(make_key("b")..), Vec::new(), 10);

		assert_eq!(keys_of(&rows), keys(&["b", "c", "d"]));
	}

	#[test]
	fn test_range_with_end_bound() {
		let pending = pending_a_to_d();

		// Up to "c" (exclusive)
		let rows = merge(pending.range(..make_key("c")), Vec::new(), 10);

		assert_eq!(keys_of(&rows), keys(&["a", "b"]));
	}

	#[test]
	fn test_range_inclusive_bounds() {
		let pending = pending_a_to_d();

		// From "b" to "c" inclusive
		let rows = merge(pending.range(make_key("b")..=make_key("c")), Vec::new(), 10);

		assert_eq!(keys_of(&rows), keys(&["b", "c"]));
	}

	#[test]
	fn test_range_complex_scenario() {
		let pending = pending_from(vec![
			("a", set("new_a")),
			("b", PendingWrite::Remove),
			("c", set("new_c")),
			("f", PendingWrite::Remove),
			("g", set("new_g")),
		]);
		let committed = committed_from(&[
			("a", "old_a", 5),
			("b", "old_b", 6),
			("d", "old_d", 7),
			("e", "old_e", 8),
			("f", "old_f", 9),
		]);

		// Range from "a" to "g" (exclusive)
		let rows = merge(pending.range(make_key("a")..make_key("g")), committed, 10);

		// Expected: a(new), c(new), d(old), e(old)
		// Removed: b, f
		assert_eq!(keys_of(&rows), keys(&["a", "c", "d", "e"]));

		let payloads: Vec<EncodedRow> = rows.iter().map(|row| row.row.clone()).collect();
		assert_eq!(
			payloads,
			vec![make_value("new_a"), make_value("new_c"), make_value("old_d"), make_value("old_e")]
		);
	}

	#[test]
	fn test_range_empty_result() {
		let pending = pending_from(vec![("a", set("a")), ("z", set("z"))]);

		// Range with no matching keys
		let rows = merge(pending.range(make_key("m")..make_key("n")), Vec::new(), 10);

		assert!(rows.is_empty());
	}
}