// reifydb_sub_flow/transaction/read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	cmp::Ordering,
6	collections, iter,
7	ops::Bound::{Excluded, Included, Unbounded},
8	vec,
9};
10
11use collections::BTreeMap;
12use iter::Peekable;
13use reifydb_core::{
14	common::CommitVersion,
15	encoded::{
16		key::{EncodedKey, EncodedKeyRange},
17		row::EncodedRow,
18	},
19	interface::store::{MultiVersionBatch, MultiVersionRow},
20	key::{Key, kind::KeyKind},
21};
22use reifydb_type::Result;
23use vec::IntoIter;
24
25use super::{FlowTransaction, PendingWrite};
26
27impl FlowTransaction {
28	/// Get a value by key, checking pending writes first, then (if transactional) base_pending, then querying
29	/// multi-version store
30	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>> {
31		// 1. Check flow-generated pending writes
32		let inner = self.inner();
33		if inner.pending.is_removed(key) {
34			return Ok(None);
35		}
36		if let Some(value) = inner.pending.get(key) {
37			return Ok(Some(value.clone()));
38		}
39
40		// 2. Check transaction's base writes (only for Transactional variant)
41		if let Self::Transactional {
42			base_pending,
43			..
44		} = self
45		{
46			if base_pending.is_removed(key) {
47				return Ok(None);
48			}
49			if let Some(value) = base_pending.get(key) {
50				return Ok(Some(value.clone()));
51			}
52		}
53
54		// 3. Fall through to committed storage
55		let inner = self.inner_mut();
56		let query = if Self::is_flow_state_key(key) {
57			&inner.state_query
58		} else {
59			&inner.primitive_query
60		};
61		match query.get(key)? {
62			Some(multi) => Ok(Some(multi.row().clone())),
63			None => Ok(None),
64		}
65	}
66
67	/// Check if a key exists
68	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
69		let inner = self.inner();
70		if inner.pending.is_removed(key) {
71			return Ok(false);
72		}
73		if inner.pending.get(key).is_some() {
74			return Ok(true);
75		}
76
77		if let Self::Transactional {
78			base_pending,
79			..
80		} = self
81		{
82			if base_pending.is_removed(key) {
83				return Ok(false);
84			}
85			if base_pending.get(key).is_some() {
86				return Ok(true);
87			}
88		}
89
90		let inner = self.inner_mut();
91		let query = if Self::is_flow_state_key(key) {
92			&inner.state_query
93		} else {
94			&inner.primitive_query
95		};
96		query.contains_key(key)
97	}
98
99	/// Prefix scan
100	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
101		let range = EncodedKeyRange::prefix(prefix);
102		let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
103		Ok(MultiVersionBatch {
104			items,
105			has_more: false,
106		})
107	}
108
109	fn is_flow_state_key(key: &EncodedKey) -> bool {
110		match Key::kind(&key) {
111			None => false,
112			Some(kind) => match kind {
113				KeyKind::FlowNodeState => true,
114				KeyKind::FlowNodeInternalState => true,
115				_ => false,
116			},
117		}
118	}
119
120	/// Create an iterator for forward range queries.
121	///
122	/// This properly handles high version density by scanning until batch_size
123	/// unique logical keys are collected. The iterator yields individual entries
124	/// and maintains cursor state internally. Pending writes are merged with
125	/// committed storage data.
126	pub fn range(
127		&mut self,
128		range: EncodedKeyRange,
129		batch_size: usize,
130	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
131		match self {
132			Self::Deferred {
133				inner,
134				..
135			} => {
136				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
137					.pending
138					.range((range.start.as_ref(), range.end.as_ref()))
139					.map(|(k, v)| (k.clone(), v.clone()))
140					.collect();
141				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
142
143				let query = match range.start.as_ref() {
144					Included(start) | Excluded(start) => {
145						if Self::is_flow_state_key(start) {
146							&inner.state_query
147						} else {
148							&inner.primitive_query
149						}
150					}
151					Unbounded => &inner.primitive_query,
152				};
153
154				let storage_iter = query.range(range, batch_size);
155				let v = inner.version;
156				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
157			}
158			Self::Transactional {
159				inner,
160				base_pending,
161				..
162			} => {
163				// Collect base layer entries in range, then let flow pending shadow base for same keys
164				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
165					.range((range.start.as_ref(), range.end.as_ref()))
166					.map(|(k, v)| (k.clone(), v.clone()))
167					.collect();
168				for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
169					merged.insert(k.clone(), v.clone());
170				}
171				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
172
173				let query = match range.start.as_ref() {
174					Included(start) | Excluded(start) => {
175						if Self::is_flow_state_key(start) {
176							&inner.state_query
177						} else {
178							&inner.primitive_query
179						}
180					}
181					Unbounded => &inner.primitive_query,
182				};
183
184				let storage_iter = query.range(range, batch_size);
185				let v = inner.version;
186				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
187			}
188		}
189	}
190
191	/// Create an iterator for reverse range queries.
192	///
193	/// This properly handles high version density by scanning until batch_size
194	/// unique logical keys are collected. The iterator yields individual entries
195	/// in reverse key order and maintains cursor state internally.
196	pub fn range_rev(
197		&mut self,
198		range: EncodedKeyRange,
199		batch_size: usize,
200	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
201		match self {
202			Self::Deferred {
203				inner,
204				..
205			} => {
206				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
207					.pending
208					.range((range.start.as_ref(), range.end.as_ref()))
209					.map(|(k, v)| (k.clone(), v.clone()))
210					.collect();
211				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
212
213				let query = match range.start.as_ref() {
214					Included(start) | Excluded(start) => {
215						if Self::is_flow_state_key(start) {
216							&inner.state_query
217						} else {
218							&inner.primitive_query
219						}
220					}
221					Unbounded => &inner.primitive_query,
222				};
223
224				let storage_iter = query.range_rev(range, batch_size);
225				let v = inner.version;
226				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
227			}
228			Self::Transactional {
229				inner,
230				base_pending,
231				..
232			} => {
233				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
234					.range((range.start.as_ref(), range.end.as_ref()))
235					.map(|(k, v)| (k.clone(), v.clone()))
236					.collect();
237				for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
238					merged.insert(k.clone(), v.clone());
239				}
240				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
241
242				let query = match range.start.as_ref() {
243					Included(start) | Excluded(start) => {
244						if Self::is_flow_state_key(start) {
245							&inner.state_query
246						} else {
247							&inner.primitive_query
248						}
249					}
250					Unbounded => &inner.primitive_query,
251				};
252
253				let storage_iter = query.range_rev(range, batch_size);
254				let v = inner.version;
255				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
256			}
257		}
258	}
259}
260
/// Iterator that merges pending writes with storage data (forward order).
///
/// Both inputs must be sorted ascending by key. Pending entries shadow
/// storage entries with the same key, and `PendingWrite::Remove` acts as a
/// tombstone that suppresses the key entirely.
struct FlowMergePendingIterator<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	/// Committed rows, ascending by key.
	storage_iter: Peekable<I>,
	/// Pending writes, ascending by key (collected from a `BTreeMap`).
	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
	/// Version stamped onto rows materialized from pending writes.
	version: CommitVersion,
}
270
271impl<I> Iterator for FlowMergePendingIterator<I>
272where
273	I: Iterator<Item = Result<MultiVersionRow>>,
274{
275	type Item = Result<MultiVersionRow>;
276
277	fn next(&mut self) -> Option<Self::Item> {
278		loop {
279			let next_storage = self.storage_iter.peek();
280
281			match (self.pending_iter.peek(), next_storage) {
282				(Some((pending_key, _)), Some(storage_result)) => {
283					let storage_val = match storage_result {
284						Ok(v) => v,
285						Err(_) => {
286							// Consume the error from the iterator and propagate it
287							let err = self.storage_iter.next().unwrap();
288							return Some(err.map_err(|e| e.into()));
289						}
290					};
291					let cmp = pending_key.cmp(&storage_val.key);
292
293					if matches!(cmp, Ordering::Less) {
294						// Pending key comes first
295						let (key, value) = self.pending_iter.next().unwrap();
296						if let PendingWrite::Set(row) = value {
297							return Some(Ok(MultiVersionRow {
298								key,
299								row,
300								version: self.version,
301							}));
302						}
303						// PendingWrite::Remove = skip (tombstone), continue loop
304					} else if matches!(cmp, Ordering::Equal) {
305						// Same key - pending shadows storage
306						let (key, value) = self.pending_iter.next().unwrap();
307						self.storage_iter.next(); // Consume storage entry
308						if let PendingWrite::Set(row) = value {
309							return Some(Ok(MultiVersionRow {
310								key,
311								row,
312								version: self.version,
313							}));
314						}
315						// PendingWrite::Remove = skip (tombstone), continue loop
316					} else {
317						// Storage key comes first
318						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
319					}
320				}
321				(Some(_), None) => {
322					// Only pending left
323					let (key, value) = self.pending_iter.next().unwrap();
324					if let PendingWrite::Set(row) = value {
325						return Some(Ok(MultiVersionRow {
326							key,
327							row,
328							version: self.version,
329						}));
330					}
331					// PendingWrite::Remove = skip (tombstone), continue loop
332				}
333				(None, Some(_)) => {
334					// Only storage left
335					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
336				}
337				(None, None) => return None,
338			}
339		}
340	}
341}
342
343/// Create an iterator that merges pending writes with storage data (forward order).
344fn flow_merge_pending_iterator<I>(
345	pending: Vec<(EncodedKey, PendingWrite)>,
346	storage_iter: I,
347	version: CommitVersion,
348) -> FlowMergePendingIterator<I>
349where
350	I: Iterator<Item = Result<MultiVersionRow>>,
351{
352	FlowMergePendingIterator {
353		storage_iter: storage_iter.peekable(),
354		pending_iter: pending.into_iter().peekable(),
355		version,
356	}
357}
358
/// Iterator that merges pending writes with storage data (reverse order).
///
/// Both inputs must be sorted descending by key. Pending entries shadow
/// storage entries with the same key, and `PendingWrite::Remove` acts as a
/// tombstone that suppresses the key entirely.
struct FlowMergePendingIteratorRev<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	/// Committed rows, descending by key.
	storage_iter: Peekable<I>,
	/// Pending writes, descending by key.
	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
	/// Version stamped onto rows materialized from pending writes.
	version: CommitVersion,
}
368
369impl<I> Iterator for FlowMergePendingIteratorRev<I>
370where
371	I: Iterator<Item = Result<MultiVersionRow>>,
372{
373	type Item = Result<MultiVersionRow>;
374
375	fn next(&mut self) -> Option<Self::Item> {
376		loop {
377			let next_storage = self.storage_iter.peek();
378
379			match (self.pending_iter.peek(), next_storage) {
380				(Some((pending_key, _)), Some(storage_result)) => {
381					let storage_val = match storage_result {
382						Ok(v) => v,
383						Err(_) => {
384							// Consume the error from the iterator and propagate it
385							let err = self.storage_iter.next().unwrap();
386							return Some(err.map_err(|e| e.into()));
387						}
388					};
389					let cmp = pending_key.cmp(&storage_val.key);
390
391					if matches!(cmp, Ordering::Greater) {
392						// Reverse: Pending key is larger (comes first in reverse)
393						let (key, value) = self.pending_iter.next().unwrap();
394						if let PendingWrite::Set(row) = value {
395							return Some(Ok(MultiVersionRow {
396								key,
397								row,
398								version: self.version,
399							}));
400						}
401						// PendingWrite::Remove = skip (tombstone), continue loop
402					} else if matches!(cmp, Ordering::Equal) {
403						// Same key - pending shadows storage
404						let (key, value) = self.pending_iter.next().unwrap();
405						self.storage_iter.next(); // Consume storage entry
406						if let PendingWrite::Set(row) = value {
407							return Some(Ok(MultiVersionRow {
408								key,
409								row,
410								version: self.version,
411							}));
412						}
413						// PendingWrite::Remove = skip (tombstone), continue loop
414					} else {
415						// Storage key comes first in reverse order
416						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
417					}
418				}
419				(Some(_), None) => {
420					// Only pending left
421					let (key, value) = self.pending_iter.next().unwrap();
422					if let PendingWrite::Set(row) = value {
423						return Some(Ok(MultiVersionRow {
424							key,
425							row,
426							version: self.version,
427						}));
428					}
429					// PendingWrite::Remove = skip (tombstone), continue loop
430				}
431				(None, Some(_)) => {
432					// Only storage left
433					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
434				}
435				(None, None) => return None,
436			}
437		}
438	}
439}
440
441/// Create an iterator that merges pending writes with storage data (reverse order).
442fn flow_merge_pending_iterator_rev<I>(
443	pending: Vec<(EncodedKey, PendingWrite)>,
444	storage_iter: I,
445	version: CommitVersion,
446) -> FlowMergePendingIteratorRev<I>
447where
448	I: Iterator<Item = Result<MultiVersionRow>>,
449{
450	FlowMergePendingIteratorRev {
451		storage_iter: storage_iter.peekable(),
452		pending_iter: pending.into_iter().peekable(),
453		version,
454	}
455}
456
/// Unit tests for the FlowTransaction read path: point lookups, existence
/// checks, range scans, and prefix scans over merged pending + committed data.
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::encoded::{
		key::{EncodedKey, EncodedKeyRange},
		row::EncodedRow,
	};
	use reifydb_engine::test_harness::TestEngine;
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::{util::cowvec::CowVec, value::identity::IdentityId};

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	// Build an EncodedKey from a string literal's bytes.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	// Build an EncodedRow from a string literal's bytes.
	fn make_value(s: &str) -> EncodedRow {
		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
	}

	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		let value = make_value("value1");

		txn.set(&key, value.clone()).unwrap();

		// Should get value from pending buffer
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_from_committed() {
		let t = TestEngine::new();

		let key = make_key("key1");
		let value = make_value("value1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
			cmd_txn.set(&key, value.clone()).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction to read committed data
		let parent = t.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();

		// Create FlowTransaction - should see committed value
		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Should get value from query transaction
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_pending_shadows_committed() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Override with new value in pending
		let new_value = make_value("new");
		txn.set(&key, new_value.clone()).unwrap();

		// Should get new value from pending, not old value from committed
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(new_value));
	}

	#[test]
	fn test_get_removed_returns_none() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Remove in pending
		txn.remove(&key).unwrap();

		// Should return None even though it exists in committed
		let result = txn.get(&key).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let result = txn.get(&make_key("missing")).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_committed() {
		let t = TestEngine::new();

		let key = make_key("key1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
			cmd_txn.set(&key, make_value("value1")).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction
		let parent = t.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_removed_returns_false() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		// A pending removal must hide the committed value.
		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
		txn.remove(&key).unwrap();

		assert!(!txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert!(!txn.contains_key(&make_key("missing")).unwrap());
	}

	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let mut iter = txn.range(EncodedKeyRange::all(), 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		// Inserted out of order on purpose: the scan must sort by key.
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should be in sorted order
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("c"));
	}

	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should only have 2 items (remove filtered out)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		let mut iter = txn.range(range, 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
		let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should only include b and c (not d, exclusive end)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("b"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let prefix = make_key("test_");
		let iter = txn.prefix(&prefix).unwrap();
		assert!(iter.items.into_iter().next().is_none());
	}

	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("test_a"), make_value("1")).unwrap();
		txn.set(&make_key("test_b"), make_value("2")).unwrap();
		txn.set(&make_key("other_c"), make_value("3")).unwrap();

		let prefix = make_key("test_");
		let iter = txn.prefix(&prefix).unwrap();
		let items: Vec<_> = iter.items.into_iter().collect();

		// Should only include keys with prefix "test_"
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("test_a"));
		assert_eq!(items[1].key, make_key("test_b"));
	}
}