Skip to main content

reifydb_sub_flow/transaction/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	cmp::Ordering,
6	collections, iter,
7	ops::{
8		Bound::{Excluded, Included, Unbounded},
9		RangeBounds,
10	},
11	vec,
12};
13
14use collections::BTreeMap;
15use iter::Peekable;
16use reifydb_core::{
17	actors::pending::PendingWrite,
18	common::CommitVersion,
19	encoded::{
20		key::{EncodedKey, EncodedKeyRange},
21		row::EncodedRow,
22	},
23	interface::store::{MultiVersionBatch, MultiVersionRow},
24	key::{Key, kind::KeyKind},
25};
26use reifydb_type::Result;
27use vec::IntoIter;
28
29use super::FlowTransaction;
30
31/// Determines which query snapshot to read from inside a FlowTransaction.
32pub(crate) enum ReadFrom {
33	/// Snapshots at the latest committed version. Use for keys whose values
34	/// are produced by flow processing and must reflect writes committed by
35	/// previous CDC batches (operator state, ringbuffer metadata, series metadata).
36	StateQuery,
37	/// Snapshots at the CDC event version. Use for keys whose values are the
38	/// source data being consumed - the original table rows that triggered
39	/// the CDC event, plus catalog/schema metadata the flow reads but never writes.
40	Query,
41}
42
43impl FlowTransaction {
44	/// Get a value by key, checking pending writes first, then (if transactional) base_pending, then querying
45	/// multi-version store
46	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>> {
47		// 1. Check flow-generated pending writes
48		let inner = self.inner();
49		if inner.pending.is_removed(key) {
50			return Ok(None);
51		}
52		if let Some(value) = inner.pending.get(key) {
53			return Ok(Some(value.clone()));
54		}
55
56		// 2. Check transaction's base writes (only for Transactional variant)
57		if let Self::Transactional {
58			base_pending,
59			..
60		} = self
61		{
62			if base_pending.is_removed(key) {
63				return Ok(None);
64			}
65			if let Some(value) = base_pending.get(key) {
66				return Ok(Some(value.clone()));
67			}
68		}
69
70		// 3. Ephemeral: state keys from HashMap, source keys from query
71		if let Self::Ephemeral {
72			inner,
73			state,
74		} = self
75		{
76			return match Self::read_from(key) {
77				ReadFrom::StateQuery => Ok(state.get(key).cloned()),
78				ReadFrom::Query => match inner.query.get(key)? {
79					Some(multi) => Ok(Some(multi.row().clone())),
80					None => Ok(None),
81				},
82			};
83		}
84
85		// 4. Fall through to committed storage
86		let inner = self.inner_mut();
87		let query = match Self::read_from(key) {
88			ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
89			ReadFrom::Query => &inner.query,
90		};
91		match query.get(key)? {
92			Some(multi) => Ok(Some(multi.row().clone())),
93			None => Ok(None),
94		}
95	}
96
97	/// Check if a key exists
98	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
99		let inner = self.inner();
100		if inner.pending.is_removed(key) {
101			return Ok(false);
102		}
103		if inner.pending.get(key).is_some() {
104			return Ok(true);
105		}
106
107		if let Self::Transactional {
108			base_pending,
109			..
110		} = self
111		{
112			if base_pending.is_removed(key) {
113				return Ok(false);
114			}
115			if base_pending.get(key).is_some() {
116				return Ok(true);
117			}
118		}
119
120		// Ephemeral: state keys from HashMap, source keys from query
121		if let Self::Ephemeral {
122			inner,
123			state,
124		} = self
125		{
126			return match Self::read_from(key) {
127				ReadFrom::StateQuery => Ok(state.contains_key(key)),
128				ReadFrom::Query => inner.query.contains_key(key),
129			};
130		}
131
132		let inner = self.inner_mut();
133		let query = match Self::read_from(key) {
134			ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
135			ReadFrom::Query => &inner.query,
136		};
137		query.contains_key(key)
138	}
139
140	/// Prefix scan
141	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
142		let range = EncodedKeyRange::prefix(prefix);
143		let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
144		Ok(MultiVersionBatch {
145			items,
146			has_more: false,
147		})
148	}
149
150	pub(crate) fn read_from(key: &EncodedKey) -> ReadFrom {
151		match Key::kind(key) {
152			None => ReadFrom::Query,
153			Some(kind) => match kind {
154				// Flow-produced state - read from state_query so subsequent
155				// CDC batches see the prior batch's committed writes.
156				KeyKind::FlowNodeState => ReadFrom::StateQuery,
157				KeyKind::FlowNodeInternalState => ReadFrom::StateQuery,
158				KeyKind::RingBufferMetadata => ReadFrom::StateQuery,
159				KeyKind::SeriesMetadata => ReadFrom::StateQuery,
160
161				// Row keys are used for both source tables and flow-produced views.
162				// Sink operators write view/ringbuffer rows but never read them
163				// back via get() - they only read metadata and operator state.
164				// Source table rows must come from query (CDC event version).
165				// If a future operator needs to read back its own row output
166				// across CDC batches, this will need finer-grained routing
167				// based on the ShapeId embedded in the RowKey.
168				KeyKind::Row => ReadFrom::Query,
169
170				// Source data & catalog metadata - read from query
171				KeyKind::Namespace => ReadFrom::Query,
172				KeyKind::Table => ReadFrom::Query,
173				KeyKind::NamespaceTable => ReadFrom::Query,
174				KeyKind::SystemSequence => ReadFrom::Query,
175				KeyKind::Columns => ReadFrom::Query,
176				KeyKind::Column => ReadFrom::Query,
177				KeyKind::RowSequence => ReadFrom::Query,
178				KeyKind::ColumnProperty => ReadFrom::Query,
179				KeyKind::SystemVersion => ReadFrom::Query,
180				KeyKind::TransactionVersion => ReadFrom::Query,
181				KeyKind::Index => ReadFrom::Query,
182				KeyKind::IndexEntry => ReadFrom::Query,
183				KeyKind::ColumnSequence => ReadFrom::Query,
184				KeyKind::CdcConsumer => ReadFrom::Query,
185				KeyKind::View => ReadFrom::Query,
186				KeyKind::NamespaceView => ReadFrom::Query,
187				KeyKind::PrimaryKey => ReadFrom::Query,
188				KeyKind::RingBuffer => ReadFrom::Query,
189				KeyKind::NamespaceRingBuffer => ReadFrom::Query,
190				KeyKind::ShapeRetentionStrategy => ReadFrom::Query,
191				KeyKind::OperatorRetentionStrategy => ReadFrom::Query,
192				KeyKind::Flow => ReadFrom::Query,
193				KeyKind::NamespaceFlow => ReadFrom::Query,
194				KeyKind::FlowNode => ReadFrom::Query,
195				KeyKind::FlowNodeByFlow => ReadFrom::Query,
196				KeyKind::FlowEdge => ReadFrom::Query,
197				KeyKind::FlowEdgeByFlow => ReadFrom::Query,
198				KeyKind::Dictionary => ReadFrom::Query,
199				KeyKind::DictionaryEntry => ReadFrom::Query,
200				KeyKind::DictionaryEntryIndex => ReadFrom::Query,
201				KeyKind::NamespaceDictionary => ReadFrom::Query,
202				KeyKind::DictionarySequence => ReadFrom::Query,
203				KeyKind::Metric => ReadFrom::Query,
204				KeyKind::FlowVersion => ReadFrom::Query,
205				KeyKind::Subscription => ReadFrom::Query,
206				KeyKind::SubscriptionRow => ReadFrom::Query,
207				KeyKind::SubscriptionColumn => ReadFrom::Query,
208				KeyKind::Shape => ReadFrom::Query,
209				KeyKind::RowShapeField => ReadFrom::Query,
210				KeyKind::SumType => ReadFrom::Query,
211				KeyKind::NamespaceSumType => ReadFrom::Query,
212				KeyKind::Handler => ReadFrom::Query,
213				KeyKind::NamespaceHandler => ReadFrom::Query,
214				KeyKind::VariantHandler => ReadFrom::Query,
215				KeyKind::Series => ReadFrom::Query,
216				KeyKind::NamespaceSeries => ReadFrom::Query,
217				KeyKind::Identity => ReadFrom::Query,
218				KeyKind::Role => ReadFrom::Query,
219				KeyKind::GrantedRole => ReadFrom::Query,
220				KeyKind::Policy => ReadFrom::Query,
221				KeyKind::PolicyOp => ReadFrom::Query,
222				KeyKind::Migration => ReadFrom::Query,
223				KeyKind::MigrationEvent => ReadFrom::Query,
224				KeyKind::Authentication => ReadFrom::Query,
225				KeyKind::ConfigStorage => ReadFrom::Query,
226				KeyKind::Token => ReadFrom::Query,
227				KeyKind::Source => ReadFrom::Query,
228				KeyKind::NamespaceSource => ReadFrom::Query,
229				KeyKind::Sink => ReadFrom::Query,
230				KeyKind::NamespaceSink => ReadFrom::Query,
231				KeyKind::SourceCheckpoint => ReadFrom::Query,
232				KeyKind::RowTtl => ReadFrom::Query,
233				KeyKind::Procedure => ReadFrom::Query,
234				KeyKind::NamespaceProcedure => ReadFrom::Query,
235				KeyKind::ProcedureParam => ReadFrom::Query,
236				KeyKind::Binding => ReadFrom::Query,
237				KeyKind::NamespaceBinding => ReadFrom::Query,
238			},
239		}
240	}
241
242	/// Create an iterator for forward range queries.
243	///
244	/// This properly handles high version density by scanning until batch_size
245	/// unique logical keys are collected. The iterator yields individual entries
246	/// and maintains cursor state internally. Pending writes are merged with
247	/// committed storage data.
248	pub fn range(
249		&mut self,
250		range: EncodedKeyRange,
251		batch_size: usize,
252	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
253		match self {
254			Self::Deferred {
255				inner,
256				..
257			} => {
258				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
259					.pending
260					.range((range.start.as_ref(), range.end.as_ref()))
261					.map(|(k, v)| (k.clone(), v.clone()))
262					.collect();
263				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
264
265				let query = match range.start.as_ref() {
266					Included(start) | Excluded(start) => match Self::read_from(start) {
267						ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
268						ReadFrom::Query => &inner.query,
269					},
270					Unbounded => &inner.query,
271				};
272
273				let storage_iter = query.range(range, batch_size);
274				let v = inner.version;
275				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
276			}
277			Self::Transactional {
278				inner,
279				base_pending,
280				..
281			} => {
282				// Collect base layer entries in range, then let flow pending shadow base for same keys
283				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
284					.range((range.start.as_ref(), range.end.as_ref()))
285					.map(|(k, v)| (k.clone(), v.clone()))
286					.collect();
287				for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
288					merged.insert(k.clone(), v.clone());
289				}
290				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
291
292				let query = match range.start.as_ref() {
293					Included(start) | Excluded(start) => match Self::read_from(start) {
294						ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
295						ReadFrom::Query => &inner.query,
296					},
297					Unbounded => &inner.query,
298				};
299
300				let storage_iter = query.range(range, batch_size);
301				let v = inner.version;
302				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
303			}
304			Self::Ephemeral {
305				inner,
306				state,
307			} => {
308				// Merge pending writes with ephemeral state or query
309				let is_state_range = match range.start.as_ref() {
310					Included(start) | Excluded(start) => {
311						matches!(Self::read_from(start), ReadFrom::StateQuery)
312					}
313					Unbounded => false,
314				};
315
316				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
317					.pending
318					.range((range.start.as_ref(), range.end.as_ref()))
319					.map(|(k, v)| (k.clone(), v.clone()))
320					.collect();
321				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
322
323				if is_state_range {
324					// State keys: build iterator from state HashMap
325					let state_items: Vec<Result<MultiVersionRow>> = state
326						.iter()
327						.filter(|(k, _)| range.contains(k))
328						.map(|(k, v)| {
329							Ok(MultiVersionRow {
330								key: k.clone(),
331								row: v.clone(),
332								version: inner.version,
333							})
334						})
335						.collect();
336					let v = inner.version;
337					// Sort state items by key for merge iterator
338					let mut sorted_items = state_items;
339					sorted_items.sort_by(|a, b| match (a, b) {
340						(Ok(a), Ok(b)) => a.key.cmp(&b.key),
341						_ => Ordering::Equal,
342					});
343					Box::new(flow_merge_pending_iterator(pending_vec, sorted_items.into_iter(), v))
344				} else {
345					// Source data: delegate to query
346					let storage_iter = inner.query.range(range, batch_size);
347					let v = inner.version;
348					Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
349				}
350			}
351		}
352	}
353
354	/// Create an iterator for reverse range queries.
355	///
356	/// This properly handles high version density by scanning until batch_size
357	/// unique logical keys are collected. The iterator yields individual entries
358	/// in reverse key order and maintains cursor state internally.
359	pub fn range_rev(
360		&mut self,
361		range: EncodedKeyRange,
362		batch_size: usize,
363	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
364		match self {
365			Self::Deferred {
366				inner,
367				..
368			} => {
369				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
370					.pending
371					.range((range.start.as_ref(), range.end.as_ref()))
372					.map(|(k, v)| (k.clone(), v.clone()))
373					.collect();
374				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
375
376				let query = match range.start.as_ref() {
377					Included(start) | Excluded(start) => match Self::read_from(start) {
378						ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
379						ReadFrom::Query => &inner.query,
380					},
381					Unbounded => &inner.query,
382				};
383
384				let storage_iter = query.range_rev(range, batch_size);
385				let v = inner.version;
386				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
387			}
388			Self::Transactional {
389				inner,
390				base_pending,
391				..
392			} => {
393				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
394					.range((range.start.as_ref(), range.end.as_ref()))
395					.map(|(k, v)| (k.clone(), v.clone()))
396					.collect();
397				for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
398					merged.insert(k.clone(), v.clone());
399				}
400				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
401
402				let query = match range.start.as_ref() {
403					Included(start) | Excluded(start) => match Self::read_from(start) {
404						ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
405						ReadFrom::Query => &inner.query,
406					},
407					Unbounded => &inner.query,
408				};
409
410				let storage_iter = query.range_rev(range, batch_size);
411				let v = inner.version;
412				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
413			}
414			Self::Ephemeral {
415				inner,
416				state,
417			} => {
418				let is_state_range = match range.start.as_ref() {
419					Included(start) | Excluded(start) => {
420						matches!(Self::read_from(start), ReadFrom::StateQuery)
421					}
422					Unbounded => false,
423				};
424
425				let merged: BTreeMap<EncodedKey, PendingWrite> = inner
426					.pending
427					.range((range.start.as_ref(), range.end.as_ref()))
428					.map(|(k, v)| (k.clone(), v.clone()))
429					.collect();
430				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
431
432				if is_state_range {
433					// State keys: build reverse iterator from state HashMap
434					let mut state_items: Vec<Result<MultiVersionRow>> = state
435						.iter()
436						.filter(|(k, _)| range.contains(k))
437						.map(|(k, v)| {
438							Ok(MultiVersionRow {
439								key: k.clone(),
440								row: v.clone(),
441								version: inner.version,
442							})
443						})
444						.collect();
445					let v = inner.version;
446					// Sort in reverse order for reverse merge iterator
447					state_items.sort_by(|a, b| match (a, b) {
448						(Ok(a), Ok(b)) => b.key.cmp(&a.key),
449						_ => Ordering::Equal,
450					});
451					Box::new(flow_merge_pending_iterator_rev(
452						pending_vec,
453						state_items.into_iter(),
454						v,
455					))
456				} else {
457					// Source data: delegate to query
458					let storage_iter = inner.query.range_rev(range, batch_size);
459					let v = inner.version;
460					Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
461				}
462			}
463		}
464	}
465}
466
467/// Iterator that merges pending writes with storage data (forward order).
468struct FlowMergePendingIterator<I>
469where
470	I: Iterator<Item = Result<MultiVersionRow>>,
471{
472	storage_iter: Peekable<I>,
473	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
474	version: CommitVersion,
475}
476
477impl<I> Iterator for FlowMergePendingIterator<I>
478where
479	I: Iterator<Item = Result<MultiVersionRow>>,
480{
481	type Item = Result<MultiVersionRow>;
482
483	fn next(&mut self) -> Option<Self::Item> {
484		loop {
485			let next_storage = self.storage_iter.peek();
486
487			match (self.pending_iter.peek(), next_storage) {
488				(Some((pending_key, _)), Some(storage_result)) => {
489					let storage_val = match storage_result {
490						Ok(v) => v,
491						Err(_) => {
492							// Consume the error from the iterator and propagate it
493							let err = self.storage_iter.next().unwrap();
494							return Some(err);
495						}
496					};
497					let cmp = pending_key.cmp(&storage_val.key);
498
499					if matches!(cmp, Ordering::Less) {
500						// Pending key comes first
501						let (key, value) = self.pending_iter.next().unwrap();
502						if let PendingWrite::Set(row) = value {
503							return Some(Ok(MultiVersionRow {
504								key,
505								row,
506								version: self.version,
507							}));
508						}
509						// PendingWrite::Remove = skip (tombstone), continue loop
510					} else if matches!(cmp, Ordering::Equal) {
511						// Same key - pending shadows storage
512						let (key, value) = self.pending_iter.next().unwrap();
513						self.storage_iter.next(); // Consume storage entry
514						if let PendingWrite::Set(row) = value {
515							return Some(Ok(MultiVersionRow {
516								key,
517								row,
518								version: self.version,
519							}));
520						}
521						// PendingWrite::Remove = skip (tombstone), continue loop
522					} else {
523						// Storage key comes first
524						return Some(self.storage_iter.next().unwrap());
525					}
526				}
527				(Some(_), None) => {
528					// Only pending left
529					let (key, value) = self.pending_iter.next().unwrap();
530					if let PendingWrite::Set(row) = value {
531						return Some(Ok(MultiVersionRow {
532							key,
533							row,
534							version: self.version,
535						}));
536					}
537					// PendingWrite::Remove = skip (tombstone), continue loop
538				}
539				(None, Some(_)) => {
540					// Only storage left
541					return Some(self.storage_iter.next().unwrap());
542				}
543				(None, None) => return None,
544			}
545		}
546	}
547}
548
549/// Create an iterator that merges pending writes with storage data (forward order).
550fn flow_merge_pending_iterator<I>(
551	pending: Vec<(EncodedKey, PendingWrite)>,
552	storage_iter: I,
553	version: CommitVersion,
554) -> FlowMergePendingIterator<I>
555where
556	I: Iterator<Item = Result<MultiVersionRow>>,
557{
558	FlowMergePendingIterator {
559		storage_iter: storage_iter.peekable(),
560		pending_iter: pending.into_iter().peekable(),
561		version,
562	}
563}
564
565/// Iterator that merges pending writes with storage data (reverse order).
566struct FlowMergePendingIteratorRev<I>
567where
568	I: Iterator<Item = Result<MultiVersionRow>>,
569{
570	storage_iter: Peekable<I>,
571	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
572	version: CommitVersion,
573}
574
575impl<I> Iterator for FlowMergePendingIteratorRev<I>
576where
577	I: Iterator<Item = Result<MultiVersionRow>>,
578{
579	type Item = Result<MultiVersionRow>;
580
581	fn next(&mut self) -> Option<Self::Item> {
582		loop {
583			let next_storage = self.storage_iter.peek();
584
585			match (self.pending_iter.peek(), next_storage) {
586				(Some((pending_key, _)), Some(storage_result)) => {
587					let storage_val = match storage_result {
588						Ok(v) => v,
589						Err(_) => {
590							// Consume the error from the iterator and propagate it
591							let err = self.storage_iter.next().unwrap();
592							return Some(err);
593						}
594					};
595					let cmp = pending_key.cmp(&storage_val.key);
596
597					if matches!(cmp, Ordering::Greater) {
598						// Reverse: Pending key is larger (comes first in reverse)
599						let (key, value) = self.pending_iter.next().unwrap();
600						if let PendingWrite::Set(row) = value {
601							return Some(Ok(MultiVersionRow {
602								key,
603								row,
604								version: self.version,
605							}));
606						}
607						// PendingWrite::Remove = skip (tombstone), continue loop
608					} else if matches!(cmp, Ordering::Equal) {
609						// Same key - pending shadows storage
610						let (key, value) = self.pending_iter.next().unwrap();
611						self.storage_iter.next(); // Consume storage entry
612						if let PendingWrite::Set(row) = value {
613							return Some(Ok(MultiVersionRow {
614								key,
615								row,
616								version: self.version,
617							}));
618						}
619						// PendingWrite::Remove = skip (tombstone), continue loop
620					} else {
621						// Storage key comes first in reverse order
622						return Some(self.storage_iter.next().unwrap());
623					}
624				}
625				(Some(_), None) => {
626					// Only pending left
627					let (key, value) = self.pending_iter.next().unwrap();
628					if let PendingWrite::Set(row) = value {
629						return Some(Ok(MultiVersionRow {
630							key,
631							row,
632							version: self.version,
633						}));
634					}
635					// PendingWrite::Remove = skip (tombstone), continue loop
636				}
637				(None, Some(_)) => {
638					// Only storage left
639					return Some(self.storage_iter.next().unwrap());
640				}
641				(None, None) => return None,
642			}
643		}
644	}
645}
646
647/// Create an iterator that merges pending writes with storage data (reverse order).
648fn flow_merge_pending_iterator_rev<I>(
649	pending: Vec<(EncodedKey, PendingWrite)>,
650	storage_iter: I,
651	version: CommitVersion,
652) -> FlowMergePendingIteratorRev<I>
653where
654	I: Iterator<Item = Result<MultiVersionRow>>,
655{
656	FlowMergePendingIteratorRev {
657		storage_iter: storage_iter.peekable(),
658		pending_iter: pending.into_iter().peekable(),
659		version,
660	}
661}
662
663#[cfg(test)]
664pub mod tests {
665	use reifydb_catalog::catalog::Catalog;
666	use reifydb_core::encoded::{
667		key::{EncodedKey, EncodedKeyRange},
668		row::EncodedRow,
669	};
670	use reifydb_engine::test_harness::TestEngine;
671	use reifydb_runtime::context::clock::{Clock, MockClock};
672	use reifydb_transaction::interceptor::interceptors::Interceptors;
673	use reifydb_type::{util::cowvec::CowVec, value::identity::IdentityId};
674
675	use super::*;
676	use crate::operator::stateful::test_utils::test::create_test_transaction;
677
678	fn make_key(s: &str) -> EncodedKey {
679		EncodedKey::new(s.as_bytes().to_vec())
680	}
681
682	fn make_value(s: &str) -> EncodedRow {
683		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
684	}
685
686	#[test]
687	fn test_get_from_pending() {
688		let parent = create_test_transaction();
689		let mut txn = FlowTransaction::deferred(
690			&parent,
691			CommitVersion(1),
692			Catalog::testing(),
693			Interceptors::new(),
694			Clock::Mock(MockClock::from_millis(1000)),
695		);
696
697		let key = make_key("key1");
698		let value = make_value("value1");
699
700		txn.set(&key, value.clone()).unwrap();
701
702		// Should get value from pending buffer
703		let result = txn.get(&key).unwrap();
704		assert_eq!(result, Some(value));
705	}
706
707	#[test]
708	fn test_get_from_committed() {
709		let t = TestEngine::new();
710
711		let key = make_key("key1");
712		let value = make_value("value1");
713
714		// Set value in first transaction and commit
715		{
716			let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
717			cmd_txn.set(&key, value.clone()).unwrap();
718			cmd_txn.commit().unwrap();
719		}
720
721		// Create new command transaction to read committed data
722		let parent = t.begin_admin(IdentityId::system()).unwrap();
723		let version = parent.version();
724
725		// Create FlowTransaction - should see committed value
726		let mut txn = FlowTransaction::deferred(
727			&parent,
728			version,
729			Catalog::testing(),
730			Interceptors::new(),
731			Clock::Mock(MockClock::from_millis(1000)),
732		);
733
734		// Should get value from query transaction
735		let result = txn.get(&key).unwrap();
736		assert_eq!(result, Some(value));
737	}
738
739	#[test]
740	fn test_get_pending_shadows_committed() {
741		let mut parent = create_test_transaction();
742
743		let key = make_key("key1");
744		parent.set(&key, make_value("old")).unwrap();
745		let version = parent.version();
746
747		let mut txn = FlowTransaction::deferred(
748			&parent,
749			version,
750			Catalog::testing(),
751			Interceptors::new(),
752			Clock::Mock(MockClock::from_millis(1000)),
753		);
754
755		// Override with new value in pending
756		let new_value = make_value("new");
757		txn.set(&key, new_value.clone()).unwrap();
758
759		// Should get new value from pending, not old value from committed
760		let result = txn.get(&key).unwrap();
761		assert_eq!(result, Some(new_value));
762	}
763
764	#[test]
765	fn test_get_removed_returns_none() {
766		let mut parent = create_test_transaction();
767
768		let key = make_key("key1");
769		parent.set(&key, make_value("value1")).unwrap();
770		let version = parent.version();
771
772		let mut txn = FlowTransaction::deferred(
773			&parent,
774			version,
775			Catalog::testing(),
776			Interceptors::new(),
777			Clock::Mock(MockClock::from_millis(1000)),
778		);
779
780		// Remove in pending
781		txn.remove(&key).unwrap();
782
783		// Should return None even though it exists in committed
784		let result = txn.get(&key).unwrap();
785		assert_eq!(result, None);
786	}
787
788	#[test]
789	fn test_get_nonexistent_key() {
790		let parent = create_test_transaction();
791		let mut txn = FlowTransaction::deferred(
792			&parent,
793			CommitVersion(1),
794			Catalog::testing(),
795			Interceptors::new(),
796			Clock::Mock(MockClock::from_millis(1000)),
797		);
798
799		let result = txn.get(&make_key("missing")).unwrap();
800		assert_eq!(result, None);
801	}
802
803	#[test]
804	fn test_contains_key_pending() {
805		let parent = create_test_transaction();
806		let mut txn = FlowTransaction::deferred(
807			&parent,
808			CommitVersion(1),
809			Catalog::testing(),
810			Interceptors::new(),
811			Clock::Mock(MockClock::from_millis(1000)),
812		);
813
814		let key = make_key("key1");
815		txn.set(&key, make_value("value1")).unwrap();
816
817		assert!(txn.contains_key(&key).unwrap());
818	}
819
820	#[test]
821	fn test_contains_key_committed() {
822		let t = TestEngine::new();
823
824		let key = make_key("key1");
825
826		// Set value in first transaction and commit
827		{
828			let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
829			cmd_txn.set(&key, make_value("value1")).unwrap();
830			cmd_txn.commit().unwrap();
831		}
832
833		// Create new command transaction
834		let parent = t.begin_admin(IdentityId::system()).unwrap();
835		let version = parent.version();
836		let mut txn = FlowTransaction::deferred(
837			&parent,
838			version,
839			Catalog::testing(),
840			Interceptors::new(),
841			Clock::Mock(MockClock::from_millis(1000)),
842		);
843
844		assert!(txn.contains_key(&key).unwrap());
845	}
846
847	#[test]
848	fn test_contains_key_removed_returns_false() {
849		let mut parent = create_test_transaction();
850
851		let key = make_key("key1");
852		parent.set(&key, make_value("value1")).unwrap();
853		let version = parent.version();
854
855		let mut txn = FlowTransaction::deferred(
856			&parent,
857			version,
858			Catalog::testing(),
859			Interceptors::new(),
860			Clock::Mock(MockClock::from_millis(1000)),
861		);
862		txn.remove(&key).unwrap();
863
864		assert!(!txn.contains_key(&key).unwrap());
865	}
866
867	#[test]
868	fn test_contains_key_nonexistent() {
869		let parent = create_test_transaction();
870		let mut txn = FlowTransaction::deferred(
871			&parent,
872			CommitVersion(1),
873			Catalog::testing(),
874			Interceptors::new(),
875			Clock::Mock(MockClock::from_millis(1000)),
876		);
877
878		assert!(!txn.contains_key(&make_key("missing")).unwrap());
879	}
880
881	#[test]
882	fn test_scan_empty() {
883		let parent = create_test_transaction();
884		let mut txn = FlowTransaction::deferred(
885			&parent,
886			CommitVersion(1),
887			Catalog::testing(),
888			Interceptors::new(),
889			Clock::Mock(MockClock::from_millis(1000)),
890		);
891
892		let mut iter = txn.range(EncodedKeyRange::all(), 1024);
893		assert!(iter.next().is_none());
894	}
895
896	#[test]
897	fn test_scan_only_pending() {
898		let parent = create_test_transaction();
899		let mut txn = FlowTransaction::deferred(
900			&parent,
901			CommitVersion(1),
902			Catalog::testing(),
903			Interceptors::new(),
904			Clock::Mock(MockClock::from_millis(1000)),
905		);
906
907		txn.set(&make_key("b"), make_value("2")).unwrap();
908		txn.set(&make_key("a"), make_value("1")).unwrap();
909		txn.set(&make_key("c"), make_value("3")).unwrap();
910
911		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
912
913		// Should be in sorted order
914		assert_eq!(items.len(), 3);
915		assert_eq!(items[0].key, make_key("a"));
916		assert_eq!(items[1].key, make_key("b"));
917		assert_eq!(items[2].key, make_key("c"));
918	}
919
920	#[test]
921	fn test_scan_filters_removes() {
922		let parent = create_test_transaction();
923		let mut txn = FlowTransaction::deferred(
924			&parent,
925			CommitVersion(1),
926			Catalog::testing(),
927			Interceptors::new(),
928			Clock::Mock(MockClock::from_millis(1000)),
929		);
930
931		txn.set(&make_key("a"), make_value("1")).unwrap();
932		txn.remove(&make_key("b")).unwrap();
933		txn.set(&make_key("c"), make_value("3")).unwrap();
934
935		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
936
937		// Should only have 2 items (remove filtered out)
938		assert_eq!(items.len(), 2);
939		assert_eq!(items[0].key, make_key("a"));
940		assert_eq!(items[1].key, make_key("c"));
941	}
942
943	#[test]
944	fn test_range_empty() {
945		let parent = create_test_transaction();
946		let mut txn = FlowTransaction::deferred(
947			&parent,
948			CommitVersion(1),
949			Catalog::testing(),
950			Interceptors::new(),
951			Clock::Mock(MockClock::from_millis(1000)),
952		);
953
954		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
955		let mut iter = txn.range(range, 1024);
956		assert!(iter.next().is_none());
957	}
958
959	#[test]
960	fn test_range_only_pending() {
961		let parent = create_test_transaction();
962		let mut txn = FlowTransaction::deferred(
963			&parent,
964			CommitVersion(1),
965			Catalog::testing(),
966			Interceptors::new(),
967			Clock::Mock(MockClock::from_millis(1000)),
968		);
969
970		txn.set(&make_key("a"), make_value("1")).unwrap();
971		txn.set(&make_key("b"), make_value("2")).unwrap();
972		txn.set(&make_key("c"), make_value("3")).unwrap();
973		txn.set(&make_key("d"), make_value("4")).unwrap();
974
975		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
976		let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>>>().unwrap();
977
978		// Should only include b and c (not d, exclusive end)
979		assert_eq!(items.len(), 2);
980		assert_eq!(items[0].key, make_key("b"));
981		assert_eq!(items[1].key, make_key("c"));
982	}
983
984	#[test]
985	fn test_prefix_empty() {
986		let parent = create_test_transaction();
987		let mut txn = FlowTransaction::deferred(
988			&parent,
989			CommitVersion(1),
990			Catalog::testing(),
991			Interceptors::new(),
992			Clock::Mock(MockClock::from_millis(1000)),
993		);
994
995		let prefix = make_key("test_");
996		let iter = txn.prefix(&prefix).unwrap();
997		assert!(iter.items.into_iter().next().is_none());
998	}
999
1000	#[test]
1001	fn test_prefix_only_pending() {
1002		let parent = create_test_transaction();
1003		let mut txn = FlowTransaction::deferred(
1004			&parent,
1005			CommitVersion(1),
1006			Catalog::testing(),
1007			Interceptors::new(),
1008			Clock::Mock(MockClock::from_millis(1000)),
1009		);
1010
1011		txn.set(&make_key("test_a"), make_value("1")).unwrap();
1012		txn.set(&make_key("test_b"), make_value("2")).unwrap();
1013		txn.set(&make_key("other_c"), make_value("3")).unwrap();
1014
1015		let prefix = make_key("test_");
1016		let iter = txn.prefix(&prefix).unwrap();
1017		let items: Vec<_> = iter.items.into_iter().collect();
1018
1019		// Should only include keys with prefix "test_"
1020		assert_eq!(items.len(), 2);
1021		assert_eq!(items[0].key, make_key("test_a"));
1022		assert_eq!(items[1].key, make_key("test_b"));
1023	}
1024}