Skip to main content

reifydb_sub_flow/transaction/
read.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::ops::Bound::{Excluded, Included, Unbounded};
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::{
9		encoded::EncodedValues,
10		key::{EncodedKey, EncodedKeyRange},
11	},
12	interface::store::{MultiVersionBatch, MultiVersionValues},
13	key::{Key, kind::KeyKind},
14};
15
16use super::{FlowTransaction, Pending};
17
18impl FlowTransaction {
19	/// Get a value by key, checking pending writes first, then querying multi-version store
20	pub fn get(&mut self, key: &EncodedKey) -> reifydb_type::Result<Option<EncodedValues>> {
21		if self.pending.is_removed(key) {
22			return Ok(None);
23		}
24
25		if let Some(value) = self.pending.get(key) {
26			return Ok(Some(value.clone()));
27		}
28
29		let query = if Self::is_flow_state_key(key) {
30			&mut self.state_query
31		} else {
32			&mut self.primitive_query
33		};
34
35		match query.get(key)? {
36			Some(multi) => Ok(Some(multi.values().clone())),
37			None => Ok(None),
38		}
39	}
40
41	/// Check if a key exists
42	pub fn contains_key(&mut self, key: &EncodedKey) -> reifydb_type::Result<bool> {
43		if self.pending.is_removed(key) {
44			return Ok(false);
45		}
46
47		if self.pending.get(key).is_some() {
48			return Ok(true);
49		}
50
51		let query = if Self::is_flow_state_key(key) {
52			&mut self.state_query
53		} else {
54			&mut self.primitive_query
55		};
56
57		query.contains_key(key)
58	}
59
60	/// Prefix scan
61	pub fn prefix(&mut self, prefix: &EncodedKey) -> reifydb_type::Result<MultiVersionBatch> {
62		let range = EncodedKeyRange::prefix(prefix);
63		let items = self.range(range, 1024).collect::<Result<Vec<_>, _>>()?;
64		Ok(MultiVersionBatch {
65			items,
66			has_more: false,
67		})
68	}
69
70	fn is_flow_state_key(key: &EncodedKey) -> bool {
71		match Key::kind(&key) {
72			None => false,
73			Some(kind) => match kind {
74				KeyKind::FlowNodeState => true,
75				KeyKind::FlowNodeInternalState => true,
76				_ => false,
77			},
78		}
79	}
80
81	/// Create an iterator for forward range queries.
82	///
83	/// This properly handles high version density by scanning until batch_size
84	/// unique logical keys are collected. The iterator yields individual entries
85	/// and maintains cursor state internally. Pending writes are merged with
86	/// committed storage data.
87	pub fn range(
88		&mut self,
89		range: EncodedKeyRange,
90		batch_size: usize,
91	) -> Box<dyn Iterator<Item = reifydb_type::Result<MultiVersionValues>> + Send + '_> {
92		// Collect pending writes in range as owned data
93		let pending: Vec<(EncodedKey, Pending)> = self
94			.pending
95			.range((range.start.as_ref(), range.end.as_ref()))
96			.map(|(k, v)| (k.clone(), v.clone()))
97			.collect();
98
99		let query = match range.start.as_ref() {
100			Included(start) | Excluded(start) => {
101				if Self::is_flow_state_key(start) {
102					&self.state_query
103				} else {
104					&self.primitive_query
105				}
106			}
107			Unbounded => &self.primitive_query,
108		};
109
110		let storage_iter = query.range(range, batch_size);
111		let version = self.version;
112
113		Box::new(flow_merge_pending_iterator(pending, storage_iter, version))
114	}
115
116	/// Create an iterator for reverse range queries.
117	///
118	/// This properly handles high version density by scanning until batch_size
119	/// unique logical keys are collected. The iterator yields individual entries
120	/// in reverse key order and maintains cursor state internally.
121	pub fn range_rev(
122		&mut self,
123		range: EncodedKeyRange,
124		batch_size: usize,
125	) -> Box<dyn Iterator<Item = reifydb_type::Result<MultiVersionValues>> + Send + '_> {
126		// Collect pending writes in range as owned data (reversed)
127		let pending: Vec<(EncodedKey, Pending)> = self
128			.pending
129			.range((range.start.as_ref(), range.end.as_ref()))
130			.rev()
131			.map(|(k, v)| (k.clone(), v.clone()))
132			.collect();
133
134		let query = match range.start.as_ref() {
135			Included(start) | Excluded(start) => {
136				if Self::is_flow_state_key(start) {
137					&self.state_query
138				} else {
139					&self.primitive_query
140				}
141			}
142			Unbounded => &self.primitive_query,
143		};
144
145		let storage_iter = query.range_rev(range, batch_size);
146		let version = self.version;
147
148		Box::new(flow_merge_pending_iterator_rev(pending, storage_iter, version))
149	}
150}
151
152/// Iterator that merges pending writes with storage data (forward order).
153struct FlowMergePendingIterator<I>
154where
155	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
156{
157	storage_iter: std::iter::Peekable<I>,
158	pending_iter: std::iter::Peekable<std::vec::IntoIter<(EncodedKey, Pending)>>,
159	version: CommitVersion,
160}
161
162impl<I> Iterator for FlowMergePendingIterator<I>
163where
164	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
165{
166	type Item = reifydb_type::Result<MultiVersionValues>;
167
168	fn next(&mut self) -> Option<Self::Item> {
169		use std::cmp::Ordering;
170
171		loop {
172			let next_storage = self.storage_iter.peek();
173
174			match (self.pending_iter.peek(), next_storage) {
175				(Some((pending_key, _)), Some(storage_result)) => {
176					let storage_val = match storage_result {
177						Ok(v) => v,
178						Err(_) => {
179							// Consume the error from the iterator and propagate it
180							let err = self.storage_iter.next().unwrap();
181							return Some(err.map_err(|e| e.into()));
182						}
183					};
184					let cmp = pending_key.cmp(&storage_val.key);
185
186					if matches!(cmp, Ordering::Less) {
187						// Pending key comes first
188						let (key, value) = self.pending_iter.next().unwrap();
189						if let Pending::Set(values) = value {
190							return Some(Ok(MultiVersionValues {
191								key,
192								values,
193								version: self.version,
194							}));
195						}
196						// Pending::Remove = skip (tombstone), continue loop
197					} else if matches!(cmp, Ordering::Equal) {
198						// Same key - pending shadows storage
199						let (key, value) = self.pending_iter.next().unwrap();
200						self.storage_iter.next(); // Consume storage entry
201						if let Pending::Set(values) = value {
202							return Some(Ok(MultiVersionValues {
203								key,
204								values,
205								version: self.version,
206							}));
207						}
208						// Pending::Remove = skip (tombstone), continue loop
209					} else {
210						// Storage key comes first
211						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
212					}
213				}
214				(Some(_), None) => {
215					// Only pending left
216					let (key, value) = self.pending_iter.next().unwrap();
217					if let Pending::Set(values) = value {
218						return Some(Ok(MultiVersionValues {
219							key,
220							values,
221							version: self.version,
222						}));
223					}
224					// Pending::Remove = skip (tombstone), continue loop
225				}
226				(None, Some(_)) => {
227					// Only storage left
228					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
229				}
230				(None, None) => return None,
231			}
232		}
233	}
234}
235
236/// Create an iterator that merges pending writes with storage data (forward order).
237fn flow_merge_pending_iterator<I>(
238	pending: Vec<(EncodedKey, Pending)>,
239	storage_iter: I,
240	version: CommitVersion,
241) -> FlowMergePendingIterator<I>
242where
243	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
244{
245	FlowMergePendingIterator {
246		storage_iter: storage_iter.peekable(),
247		pending_iter: pending.into_iter().peekable(),
248		version,
249	}
250}
251
252/// Iterator that merges pending writes with storage data (reverse order).
253struct FlowMergePendingIteratorRev<I>
254where
255	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
256{
257	storage_iter: std::iter::Peekable<I>,
258	pending_iter: std::iter::Peekable<std::vec::IntoIter<(EncodedKey, Pending)>>,
259	version: CommitVersion,
260}
261
262impl<I> Iterator for FlowMergePendingIteratorRev<I>
263where
264	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
265{
266	type Item = reifydb_type::Result<MultiVersionValues>;
267
268	fn next(&mut self) -> Option<Self::Item> {
269		use std::cmp::Ordering;
270
271		loop {
272			let next_storage = self.storage_iter.peek();
273
274			match (self.pending_iter.peek(), next_storage) {
275				(Some((pending_key, _)), Some(storage_result)) => {
276					let storage_val = match storage_result {
277						Ok(v) => v,
278						Err(_) => {
279							// Consume the error from the iterator and propagate it
280							let err = self.storage_iter.next().unwrap();
281							return Some(err.map_err(|e| e.into()));
282						}
283					};
284					let cmp = pending_key.cmp(&storage_val.key);
285
286					if matches!(cmp, Ordering::Greater) {
287						// Reverse: Pending key is larger (comes first in reverse)
288						let (key, value) = self.pending_iter.next().unwrap();
289						if let Pending::Set(values) = value {
290							return Some(Ok(MultiVersionValues {
291								key,
292								values,
293								version: self.version,
294							}));
295						}
296						// Pending::Remove = skip (tombstone), continue loop
297					} else if matches!(cmp, Ordering::Equal) {
298						// Same key - pending shadows storage
299						let (key, value) = self.pending_iter.next().unwrap();
300						self.storage_iter.next(); // Consume storage entry
301						if let Pending::Set(values) = value {
302							return Some(Ok(MultiVersionValues {
303								key,
304								values,
305								version: self.version,
306							}));
307						}
308						// Pending::Remove = skip (tombstone), continue loop
309					} else {
310						// Storage key comes first in reverse order
311						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
312					}
313				}
314				(Some(_), None) => {
315					// Only pending left
316					let (key, value) = self.pending_iter.next().unwrap();
317					if let Pending::Set(values) = value {
318						return Some(Ok(MultiVersionValues {
319							key,
320							values,
321							version: self.version,
322						}));
323					}
324					// Pending::Remove = skip (tombstone), continue loop
325				}
326				(None, Some(_)) => {
327					// Only storage left
328					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
329				}
330				(None, None) => return None,
331			}
332		}
333	}
334}
335
336/// Create an iterator that merges pending writes with storage data (reverse order).
337fn flow_merge_pending_iterator_rev<I>(
338	pending: Vec<(EncodedKey, Pending)>,
339	storage_iter: I,
340	version: CommitVersion,
341) -> FlowMergePendingIteratorRev<I>
342where
343	I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
344{
345	FlowMergePendingIteratorRev {
346		storage_iter: storage_iter.peekable(),
347		pending_iter: pending.into_iter().peekable(),
348		version,
349	}
350}
351
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::encoded::{
		encoded::EncodedValues,
		key::{EncodedKey, EncodedKeyRange},
	};
	use reifydb_engine::test_utils::create_test_engine;
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::util::cowvec::CowVec;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	fn make_value(s: &str) -> EncodedValues {
		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
	}

	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		let value = make_value("value1");

		txn.set(&key, value.clone()).unwrap();

		// Should get value from pending buffer
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_from_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");
		let value = make_value("value1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = engine.begin_admin().unwrap();
			cmd_txn.set(&key, value.clone()).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction to read committed data
		let parent = engine.begin_admin().unwrap();
		let version = parent.version();

		// Create FlowTransaction - should see committed value
		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());

		// Should get value from query transaction
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_pending_shadows_committed() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());

		// Override with new value in pending
		let new_value = make_value("new");
		txn.set(&key, new_value.clone()).unwrap();

		// Should get new value from pending, not old value from committed
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(new_value));
	}

	#[test]
	fn test_get_removed_returns_none() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());

		// Remove in pending
		txn.remove(&key).unwrap();

		// Should return None even though it exists in committed
		let result = txn.get(&key).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let result = txn.get(&make_key("missing")).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = engine.begin_admin().unwrap();
			cmd_txn.set(&key, make_value("value1")).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction
		let parent = engine.begin_admin().unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_removed_returns_false() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
		txn.remove(&key).unwrap();

		assert!(!txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert!(!txn.contains_key(&make_key("missing")).unwrap());
	}

	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let mut iter = txn.range(EncodedKeyRange::all(), 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Should be in sorted order
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("c"));
	}

	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Should only have 2 items (remove filtered out)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		let mut iter = txn.range(range, 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
		let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Should only include b and c (not d, exclusive end)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("b"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_range_pending_overrides_committed() {
		let engine = create_test_engine();

		// Commit three keys that the flow transaction will partially override.
		{
			let mut cmd_txn = engine.begin_admin().unwrap();
			cmd_txn.set(&make_key("key1"), make_value("old")).unwrap();
			cmd_txn.set(&make_key("key2"), make_value("kept")).unwrap();
			cmd_txn.set(&make_key("key3"), make_value("gone")).unwrap();
			cmd_txn.commit().unwrap();
		}

		let parent = engine.begin_admin().unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());

		// Pending set shadows key1, pending remove hides key3, key2 comes from storage.
		txn.set(&make_key("key1"), make_value("new")).unwrap();
		txn.remove(&make_key("key3")).unwrap();

		let range = EncodedKeyRange::new(Included(make_key("key0")), Excluded(make_key("key9")));
		let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>, _>>().unwrap();

		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("key1"));
		assert_eq!(items[0].values, make_value("new"));
		assert_eq!(items[1].key, make_key("key2"));
		assert_eq!(items[1].values, make_value("kept"));
	}

	#[test]
	fn test_range_rev_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let mut iter = txn.range_rev(EncodedKeyRange::all(), 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_range_rev_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> =
			txn.range_rev(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Should be in descending key order, carrying the pending values
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("c"));
		assert_eq!(items[0].values, make_value("3"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("a"));
	}

	#[test]
	fn test_range_rev_filters_removes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> =
			txn.range_rev(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Tombstone for b is skipped
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("c"));
		assert_eq!(items[1].key, make_key("a"));
	}

	#[test]
	fn test_range_rev_bounded() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
		let items: Vec<_> = txn.range_rev(range, 1024).collect::<Result<Vec<_>, _>>().unwrap();

		// Only c and b, in reverse order (d excluded, a out of range)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("c"));
		assert_eq!(items[1].key, make_key("b"));
	}

	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let prefix = make_key("test_");
		let iter = txn.prefix(&prefix).unwrap();
		assert!(iter.items.into_iter().next().is_none());
	}

	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("test_a"), make_value("1")).unwrap();
		txn.set(&make_key("test_b"), make_value("2")).unwrap();
		txn.set(&make_key("other_c"), make_value("3")).unwrap();

		let prefix = make_key("test_");
		let iter = txn.prefix(&prefix).unwrap();
		let items: Vec<_> = iter.items.into_iter().collect();

		// Should only include keys with prefix "test_"
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("test_a"));
		assert_eq!(items[1].key, make_key("test_b"));
	}
}