Skip to main content

reifydb_sub_flow/transaction/
read.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	cmp::Ordering,
6	collections, iter,
7	ops::Bound::{Excluded, Included, Unbounded},
8	vec,
9};
10
11use collections::BTreeMap;
12use iter::Peekable;
13use reifydb_core::{
14	common::CommitVersion,
15	encoded::{
16		encoded::EncodedValues,
17		key::{EncodedKey, EncodedKeyRange},
18	},
19	interface::store::{MultiVersionBatch, MultiVersionValues},
20	key::{Key, kind::KeyKind},
21};
22use reifydb_type::Result;
23use vec::IntoIter;
24
25use super::{FlowTransaction, PendingWrite};
26
27impl FlowTransaction {
28	/// Get a value by key, checking pending writes first, then (if transactional) base_pending, then querying
29	/// multi-version store
30	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
31		match self {
32			Self::Deferred {
33				pending,
34				primitive_query,
35				state_query,
36				..
37			} => {
38				// 1. Check flow-generated pending writes
39				if pending.is_removed(key) {
40					return Ok(None);
41				}
42				if let Some(value) = pending.get(key) {
43					return Ok(Some(value.clone()));
44				}
45
46				// 2. Fall through to committed storage
47				let query = if Self::is_flow_state_key(key) {
48					state_query
49				} else {
50					primitive_query
51				};
52				match query.get(key)? {
53					Some(multi) => Ok(Some(multi.values().clone())),
54					None => Ok(None),
55				}
56			}
57			Self::Transactional {
58				pending,
59				base_pending,
60				primitive_query,
61				state_query,
62				..
63			} => {
64				// 1. Check flow-generated pending writes
65				if pending.is_removed(key) {
66					return Ok(None);
67				}
68				if let Some(value) = pending.get(key) {
69					return Ok(Some(value.clone()));
70				}
71
72				// 2. Check transaction's base writes (uncommitted row data)
73				if base_pending.is_removed(key) {
74					return Ok(None);
75				}
76				if let Some(value) = base_pending.get(key) {
77					return Ok(Some(value.clone()));
78				}
79
80				// 3. Fall through to committed storage
81				let query = if Self::is_flow_state_key(key) {
82					state_query
83				} else {
84					primitive_query
85				};
86				match query.get(key)? {
87					Some(multi) => Ok(Some(multi.values().clone())),
88					None => Ok(None),
89				}
90			}
91		}
92	}
93
94	/// Check if a key exists
95	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
96		match self {
97			Self::Deferred {
98				pending,
99				primitive_query,
100				state_query,
101				..
102			} => {
103				if pending.is_removed(key) {
104					return Ok(false);
105				}
106				if pending.get(key).is_some() {
107					return Ok(true);
108				}
109
110				let query = if Self::is_flow_state_key(key) {
111					state_query
112				} else {
113					primitive_query
114				};
115				query.contains_key(key)
116			}
117			Self::Transactional {
118				pending,
119				base_pending,
120				primitive_query,
121				state_query,
122				..
123			} => {
124				if pending.is_removed(key) {
125					return Ok(false);
126				}
127				if pending.get(key).is_some() {
128					return Ok(true);
129				}
130
131				if base_pending.is_removed(key) {
132					return Ok(false);
133				}
134				if base_pending.get(key).is_some() {
135					return Ok(true);
136				}
137
138				let query = if Self::is_flow_state_key(key) {
139					state_query
140				} else {
141					primitive_query
142				};
143				query.contains_key(key)
144			}
145		}
146	}
147
148	/// Prefix scan
149	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
150		let range = EncodedKeyRange::prefix(prefix);
151		let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
152		Ok(MultiVersionBatch {
153			items,
154			has_more: false,
155		})
156	}
157
158	fn is_flow_state_key(key: &EncodedKey) -> bool {
159		match Key::kind(&key) {
160			None => false,
161			Some(kind) => match kind {
162				KeyKind::FlowNodeState => true,
163				KeyKind::FlowNodeInternalState => true,
164				_ => false,
165			},
166		}
167	}
168
169	/// Create an iterator for forward range queries.
170	///
171	/// This properly handles high version density by scanning until batch_size
172	/// unique logical keys are collected. The iterator yields individual entries
173	/// and maintains cursor state internally. Pending writes are merged with
174	/// committed storage data.
175	pub fn range(
176		&mut self,
177		range: EncodedKeyRange,
178		batch_size: usize,
179	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
180		match self {
181			Self::Deferred {
182				pending,
183				version,
184				primitive_query,
185				state_query,
186				..
187			} => {
188				let merged: BTreeMap<EncodedKey, PendingWrite> = pending
189					.range((range.start.as_ref(), range.end.as_ref()))
190					.map(|(k, v)| (k.clone(), v.clone()))
191					.collect();
192				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
193
194				let query = match range.start.as_ref() {
195					Included(start) | Excluded(start) => {
196						if Self::is_flow_state_key(start) {
197							&*state_query
198						} else {
199							&*primitive_query
200						}
201					}
202					Unbounded => &*primitive_query,
203				};
204
205				let storage_iter = query.range(range, batch_size);
206				let v = *version;
207				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
208			}
209			Self::Transactional {
210				pending,
211				base_pending,
212				version,
213				primitive_query,
214				state_query,
215				..
216			} => {
217				// Collect base layer entries in range, then let flow pending shadow base for same keys
218				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
219					.range((range.start.as_ref(), range.end.as_ref()))
220					.map(|(k, v)| (k.clone(), v.clone()))
221					.collect();
222				for (k, v) in pending.range((range.start.as_ref(), range.end.as_ref())) {
223					merged.insert(k.clone(), v.clone());
224				}
225				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
226
227				let query = match range.start.as_ref() {
228					Included(start) | Excluded(start) => {
229						if Self::is_flow_state_key(start) {
230							&*state_query
231						} else {
232							&*primitive_query
233						}
234					}
235					Unbounded => &*primitive_query,
236				};
237
238				let storage_iter = query.range(range, batch_size);
239				let v = *version;
240				Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
241			}
242		}
243	}
244
245	/// Create an iterator for reverse range queries.
246	///
247	/// This properly handles high version density by scanning until batch_size
248	/// unique logical keys are collected. The iterator yields individual entries
249	/// in reverse key order and maintains cursor state internally.
250	pub fn range_rev(
251		&mut self,
252		range: EncodedKeyRange,
253		batch_size: usize,
254	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
255		match self {
256			Self::Deferred {
257				pending,
258				version,
259				primitive_query,
260				state_query,
261				..
262			} => {
263				let merged: BTreeMap<EncodedKey, PendingWrite> = pending
264					.range((range.start.as_ref(), range.end.as_ref()))
265					.map(|(k, v)| (k.clone(), v.clone()))
266					.collect();
267				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
268
269				let query = match range.start.as_ref() {
270					Included(start) | Excluded(start) => {
271						if Self::is_flow_state_key(start) {
272							&*state_query
273						} else {
274							&*primitive_query
275						}
276					}
277					Unbounded => &*primitive_query,
278				};
279
280				let storage_iter = query.range_rev(range, batch_size);
281				let v = *version;
282				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
283			}
284			Self::Transactional {
285				pending,
286				base_pending,
287				version,
288				primitive_query,
289				state_query,
290				..
291			} => {
292				let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
293					.range((range.start.as_ref(), range.end.as_ref()))
294					.map(|(k, v)| (k.clone(), v.clone()))
295					.collect();
296				for (k, v) in pending.range((range.start.as_ref(), range.end.as_ref())) {
297					merged.insert(k.clone(), v.clone());
298				}
299				let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
300
301				let query = match range.start.as_ref() {
302					Included(start) | Excluded(start) => {
303						if Self::is_flow_state_key(start) {
304							&*state_query
305						} else {
306							&*primitive_query
307						}
308					}
309					Unbounded => &*primitive_query,
310				};
311
312				let storage_iter = query.range_rev(range, batch_size);
313				let v = *version;
314				Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
315			}
316		}
317	}
318}
319
320/// Iterator that merges pending writes with storage data (forward order).
321struct FlowMergePendingIterator<I>
322where
323	I: Iterator<Item = Result<MultiVersionValues>>,
324{
325	storage_iter: Peekable<I>,
326	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
327	version: CommitVersion,
328}
329
330impl<I> Iterator for FlowMergePendingIterator<I>
331where
332	I: Iterator<Item = Result<MultiVersionValues>>,
333{
334	type Item = Result<MultiVersionValues>;
335
336	fn next(&mut self) -> Option<Self::Item> {
337		loop {
338			let next_storage = self.storage_iter.peek();
339
340			match (self.pending_iter.peek(), next_storage) {
341				(Some((pending_key, _)), Some(storage_result)) => {
342					let storage_val = match storage_result {
343						Ok(v) => v,
344						Err(_) => {
345							// Consume the error from the iterator and propagate it
346							let err = self.storage_iter.next().unwrap();
347							return Some(err.map_err(|e| e.into()));
348						}
349					};
350					let cmp = pending_key.cmp(&storage_val.key);
351
352					if matches!(cmp, Ordering::Less) {
353						// Pending key comes first
354						let (key, value) = self.pending_iter.next().unwrap();
355						if let PendingWrite::Set(values) = value {
356							return Some(Ok(MultiVersionValues {
357								key,
358								values,
359								version: self.version,
360							}));
361						}
362						// PendingWrite::Remove = skip (tombstone), continue loop
363					} else if matches!(cmp, Ordering::Equal) {
364						// Same key - pending shadows storage
365						let (key, value) = self.pending_iter.next().unwrap();
366						self.storage_iter.next(); // Consume storage entry
367						if let PendingWrite::Set(values) = value {
368							return Some(Ok(MultiVersionValues {
369								key,
370								values,
371								version: self.version,
372							}));
373						}
374						// PendingWrite::Remove = skip (tombstone), continue loop
375					} else {
376						// Storage key comes first
377						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
378					}
379				}
380				(Some(_), None) => {
381					// Only pending left
382					let (key, value) = self.pending_iter.next().unwrap();
383					if let PendingWrite::Set(values) = value {
384						return Some(Ok(MultiVersionValues {
385							key,
386							values,
387							version: self.version,
388						}));
389					}
390					// PendingWrite::Remove = skip (tombstone), continue loop
391				}
392				(None, Some(_)) => {
393					// Only storage left
394					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
395				}
396				(None, None) => return None,
397			}
398		}
399	}
400}
401
402/// Create an iterator that merges pending writes with storage data (forward order).
403fn flow_merge_pending_iterator<I>(
404	pending: Vec<(EncodedKey, PendingWrite)>,
405	storage_iter: I,
406	version: CommitVersion,
407) -> FlowMergePendingIterator<I>
408where
409	I: Iterator<Item = Result<MultiVersionValues>>,
410{
411	FlowMergePendingIterator {
412		storage_iter: storage_iter.peekable(),
413		pending_iter: pending.into_iter().peekable(),
414		version,
415	}
416}
417
418/// Iterator that merges pending writes with storage data (reverse order).
419struct FlowMergePendingIteratorRev<I>
420where
421	I: Iterator<Item = Result<MultiVersionValues>>,
422{
423	storage_iter: Peekable<I>,
424	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
425	version: CommitVersion,
426}
427
428impl<I> Iterator for FlowMergePendingIteratorRev<I>
429where
430	I: Iterator<Item = Result<MultiVersionValues>>,
431{
432	type Item = Result<MultiVersionValues>;
433
434	fn next(&mut self) -> Option<Self::Item> {
435		loop {
436			let next_storage = self.storage_iter.peek();
437
438			match (self.pending_iter.peek(), next_storage) {
439				(Some((pending_key, _)), Some(storage_result)) => {
440					let storage_val = match storage_result {
441						Ok(v) => v,
442						Err(_) => {
443							// Consume the error from the iterator and propagate it
444							let err = self.storage_iter.next().unwrap();
445							return Some(err.map_err(|e| e.into()));
446						}
447					};
448					let cmp = pending_key.cmp(&storage_val.key);
449
450					if matches!(cmp, Ordering::Greater) {
451						// Reverse: Pending key is larger (comes first in reverse)
452						let (key, value) = self.pending_iter.next().unwrap();
453						if let PendingWrite::Set(values) = value {
454							return Some(Ok(MultiVersionValues {
455								key,
456								values,
457								version: self.version,
458							}));
459						}
460						// PendingWrite::Remove = skip (tombstone), continue loop
461					} else if matches!(cmp, Ordering::Equal) {
462						// Same key - pending shadows storage
463						let (key, value) = self.pending_iter.next().unwrap();
464						self.storage_iter.next(); // Consume storage entry
465						if let PendingWrite::Set(values) = value {
466							return Some(Ok(MultiVersionValues {
467								key,
468								values,
469								version: self.version,
470							}));
471						}
472						// PendingWrite::Remove = skip (tombstone), continue loop
473					} else {
474						// Storage key comes first in reverse order
475						return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
476					}
477				}
478				(Some(_), None) => {
479					// Only pending left
480					let (key, value) = self.pending_iter.next().unwrap();
481					if let PendingWrite::Set(values) = value {
482						return Some(Ok(MultiVersionValues {
483							key,
484							values,
485							version: self.version,
486						}));
487					}
488					// PendingWrite::Remove = skip (tombstone), continue loop
489				}
490				(None, Some(_)) => {
491					// Only storage left
492					return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
493				}
494				(None, None) => return None,
495			}
496		}
497	}
498}
499
500/// Create an iterator that merges pending writes with storage data (reverse order).
501fn flow_merge_pending_iterator_rev<I>(
502	pending: Vec<(EncodedKey, PendingWrite)>,
503	storage_iter: I,
504	version: CommitVersion,
505) -> FlowMergePendingIteratorRev<I>
506where
507	I: Iterator<Item = Result<MultiVersionValues>>,
508{
509	FlowMergePendingIteratorRev {
510		storage_iter: storage_iter.peekable(),
511		pending_iter: pending.into_iter().peekable(),
512		version,
513	}
514}
515
/// Tests for the read path of `FlowTransaction` (`get`, `contains_key`, `range`, `range_rev`,
/// `prefix`), exercised through the `Deferred` variant.
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::encoded::{
		encoded::EncodedValues,
		key::{EncodedKey, EncodedKeyRange},
	};
	use reifydb_engine::test_utils::create_test_engine;
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::util::cowvec::CowVec;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Build an `EncodedKey` from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Build an `EncodedValues` from the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
	}

	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		let value = make_value("value1");

		txn.set(&key, value.clone()).unwrap();

		// Should get value from pending buffer
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_from_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");
		let value = make_value("value1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = engine.begin_admin().unwrap();
			cmd_txn.set(&key, value.clone()).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction to read committed data
		let parent = engine.begin_admin().unwrap();
		let version = parent.version();

		// Create FlowTransaction - should see committed value
		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Should get value from query transaction
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_get_pending_shadows_committed() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Override with new value in pending
		let new_value = make_value("new");
		txn.set(&key, new_value.clone()).unwrap();

		// Should get the new value from pending, not the old value written through the parent
		let result = txn.get(&key).unwrap();
		assert_eq!(result, Some(new_value));
	}

	#[test]
	fn test_get_removed_returns_none() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		// Remove in pending
		txn.remove(&key).unwrap();

		// The pending removal hides the key regardless of what the parent wrote
		let result = txn.get(&key).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let result = txn.get(&make_key("missing")).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");

		// Set value in first transaction and commit
		{
			let mut cmd_txn = engine.begin_admin().unwrap();
			cmd_txn.set(&key, make_value("value1")).unwrap();
			cmd_txn.commit().unwrap();
		}

		// Create new command transaction
		let parent = engine.begin_admin().unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_removed_returns_false() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
		txn.remove(&key).unwrap();

		assert!(!txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert!(!txn.contains_key(&make_key("missing")).unwrap());
	}

	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let mut iter = txn.range(EncodedKeyRange::all(), 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should be in sorted order
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("c"));
	}

	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should only have 2 items (remove filtered out)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		let mut iter = txn.range(range, 1024);
		assert!(iter.next().is_none());
	}

	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
		let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>>>().unwrap();

		// Should only include b and c (not d, exclusive end)
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("b"));
		assert_eq!(items[1].key, make_key("c"));
	}

	#[test]
	fn test_range_rev_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("c")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let items: Vec<_> =
			txn.range_rev(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		// Descending key order, with the removed key filtered out
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("d"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("a"));
	}

	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let prefix = make_key("test_");
		let batch = txn.prefix(&prefix).unwrap();
		assert!(batch.items.is_empty());
		assert!(!batch.has_more);
	}

	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		txn.set(&make_key("test_a"), make_value("1")).unwrap();
		txn.set(&make_key("test_b"), make_value("2")).unwrap();
		txn.set(&make_key("other_c"), make_value("3")).unwrap();

		let prefix = make_key("test_");
		let items = txn.prefix(&prefix).unwrap().items;

		// Should only include keys with prefix "test_"
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("test_a"));
		assert_eq!(items[1].key, make_key("test_b"));
	}
}
791}