reifydb_sub_flow/transaction/
read.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	EncodedKey, EncodedKeyRange,
6	interface::{BoxedMultiVersionIter, MultiVersionQueryTransaction},
7	value::encoded::EncodedValues,
8};
9
10use super::{FlowTransaction, iter_range::FlowRangeIter};
11
12impl FlowTransaction {
13	/// Get a value by key, checking pending writes first, then querying multi-version store
14	pub fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
15		self.metrics.increment_reads();
16
17		// Check if key is marked for removal
18		if self.pending.is_removed(key) {
19			return Ok(None);
20		}
21
22		// Check pending writes
23		if let Some(value) = self.pending.get(key) {
24			return Ok(Some(value.clone()));
25		}
26
27		match self.query.get(key)? {
28			Some(multi) => Ok(Some(multi.values)),
29			None => Ok(None),
30		}
31	}
32
33	/// Check if a key exists
34	pub fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
35		self.metrics.increment_reads();
36
37		// Check if key is marked for removal
38		if self.pending.is_removed(key) {
39			return Ok(false);
40		}
41
42		// Check pending writes
43		if self.pending.get(key).is_some() {
44			return Ok(true);
45		}
46
47		self.query.contains_key(key)
48	}
49
50	/// Scan all keys in the transaction
51	pub fn scan(&mut self) -> crate::Result<BoxedMultiVersionIter> {
52		self.range(EncodedKeyRange::all())
53	}
54
55	/// Range query
56	pub fn range(&mut self, range: EncodedKeyRange) -> crate::Result<BoxedMultiVersionIter> {
57		self.metrics.increment_reads();
58
59		// Merge pending writes with committed results for the range
60		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
61		let committed = self.query.range(range)?;
62
63		Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
64	}
65
66	/// Range query with batching
67	pub fn range_batched(
68		&mut self,
69		range: EncodedKeyRange,
70		batch_size: u64,
71	) -> crate::Result<BoxedMultiVersionIter> {
72		self.metrics.increment_reads();
73
74		// Merge pending writes with committed results for the range
75		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
76		let committed = self.query.range_batched(range, batch_size)?;
77
78		Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
79	}
80
81	/// Prefix scan
82	pub fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter> {
83		self.metrics.increment_reads();
84
85		// Merge pending writes with committed results for the prefix
86		let range = EncodedKeyRange::prefix(prefix);
87		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
88		let committed = self.query.prefix(prefix)?;
89
90		Ok(Box::new(FlowRangeIter::new(pending, committed, self.version)))
91	}
92}
93
#[cfg(test)]
mod tests {
	use std::collections::Bound;

	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey, EncodedKeyRange,
		interface::{Engine, MultiVersionCommandTransaction, MultiVersionQueryTransaction},
		value::encoded::EncodedValues,
	};

	use super::*;
	use crate::operator::stateful::test_utils::test::{create_test_engine, create_test_transaction};

	/// Builds an `EncodedKey` from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Builds an `EncodedValues` wrapping the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
	}

	/// Range from key "a" to key "z", shared by the range-oriented tests.
	fn a_to_z() -> EncodedKeyRange {
		EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")))
	}

	// ---- get ----------------------------------------------------------------

	/// A value written through the flow transaction is readable from it.
	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		let value = make_value("value1");
		txn.set(&key, value.clone()).unwrap();

		// Served from the pending buffer.
		assert_eq!(txn.get(&key).unwrap(), Some(value));
	}

	/// A value committed by an earlier command transaction is readable.
	#[test]
	fn test_get_from_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");
		let value = make_value("value1");

		// Write and commit through a first command transaction.
		{
			let mut writer = engine.begin_command().unwrap();
			writer.set(&key, value.clone()).unwrap();
			writer.commit().unwrap();
		}

		// A fresh command transaction serves as the parent for the read.
		let parent = engine.begin_command().unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::new(&parent, version);

		// Nothing is pending, so the value comes from the query transaction.
		assert_eq!(txn.get(&key).unwrap(), Some(value));
	}

	/// A pending write wins over a value written through the parent.
	#[test]
	fn test_get_pending_shadows_committed() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version);

		// Overwrite the key inside the flow transaction.
		let new_value = make_value("new");
		txn.set(&key, new_value.clone()).unwrap();

		// The pending value is returned, not the "old" one written through the parent.
		assert_eq!(txn.get(&key).unwrap(), Some(new_value));
	}

	/// A pending removal hides the key from `get`.
	#[test]
	fn test_get_removed_returns_none() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version);
		txn.remove(&key).unwrap();

		// `None` even though the parent transaction wrote a value for this key.
		assert_eq!(txn.get(&key).unwrap(), None);
	}

	/// A key that was never written yields `None`.
	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.get(&make_key("missing")).unwrap(), None);
	}

	/// Every `get` call adds one to the read metric.
	#[test]
	fn test_get_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);

		txn.get(&make_key("key1")).unwrap();
		assert_eq!(txn.metrics().reads, 1);

		txn.get(&make_key("key2")).unwrap();
		assert_eq!(txn.metrics().reads, 2);
	}

	// ---- contains_key -------------------------------------------------------

	/// A pending write makes the key visible.
	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		assert!(txn.contains_key(&key).unwrap());
	}

	/// A key committed by an earlier command transaction is visible.
	#[test]
	fn test_contains_key_committed() {
		let engine = create_test_engine();

		let key = make_key("key1");

		// Write and commit through a first command transaction.
		{
			let mut writer = engine.begin_command().unwrap();
			writer.set(&key, make_value("value1")).unwrap();
			writer.commit().unwrap();
		}

		// A fresh command transaction serves as the parent for the read.
		let parent = engine.begin_command().unwrap();
		let version = parent.version();
		let mut txn = FlowTransaction::new(&parent, version);

		assert!(txn.contains_key(&key).unwrap());
	}

	/// A pending removal hides the key from `contains_key`.
	#[test]
	fn test_contains_key_removed_returns_false() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = FlowTransaction::new(&parent, version);
		txn.remove(&key).unwrap();

		assert!(!txn.contains_key(&key).unwrap());
	}

	/// A key that was never written is reported as absent.
	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert!(!txn.contains_key(&make_key("missing")).unwrap());
	}

	/// Every `contains_key` call adds one to the read metric.
	#[test]
	fn test_contains_key_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);

		txn.contains_key(&make_key("key1")).unwrap();
		assert_eq!(txn.metrics().reads, 1);

		txn.contains_key(&make_key("key2")).unwrap();
		assert_eq!(txn.metrics().reads, 2);
	}

	// ---- scan ---------------------------------------------------------------
	// These tests call `scan()` directly so the method itself is exercised.

	/// Scanning a transaction without any data yields nothing.
	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let mut iter = txn.scan().unwrap();
		assert!(iter.next().is_none());
	}

	/// Pending writes come back from a scan ordered by key, not by insertion.
	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Deliberately inserted out of order.
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.scan().unwrap().collect();

		// Sorted by key.
		assert_eq!(items.len(), 3);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("b"));
		assert_eq!(items[2].key, make_key("c"));
	}

	/// Keys with a pending removal do not show up in a scan.
	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let items: Vec<_> = txn.scan().unwrap().collect();

		// Only the two writes remain; the removal is filtered out.
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("a"));
		assert_eq!(items[1].key, make_key("c"));
	}

	/// A scan counts as exactly one read.
	#[test]
	fn test_scan_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);
		let _ = txn.scan().unwrap();
		assert_eq!(txn.metrics().reads, 1);
	}

	// ---- range --------------------------------------------------------------

	/// A range over a transaction without any data yields nothing.
	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let mut iter = txn.range(a_to_z()).unwrap();
		assert!(iter.next().is_none());
	}

	/// Range bounds are honoured for pending writes.
	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.set(&make_key("b"), make_value("2")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();
		txn.set(&make_key("d"), make_value("4")).unwrap();

		let lower = Bound::Included(make_key("b"));
		let upper = Bound::Excluded(make_key("d"));
		let items: Vec<_> = txn.range(EncodedKeyRange::new(lower, upper)).unwrap().collect();

		// "b" and "c" only: "a" is below the range and "d" is the excluded upper bound.
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("b"));
		assert_eq!(items[1].key, make_key("c"));
	}

	/// A range query counts as exactly one read.
	#[test]
	fn test_range_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);

		let _ = txn.range(a_to_z()).unwrap();

		assert_eq!(txn.metrics().reads, 1);
	}

	/// A batched range query counts as exactly one read.
	#[test]
	fn test_range_batched_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);

		let _ = txn.range_batched(a_to_z(), 10).unwrap();

		assert_eq!(txn.metrics().reads, 1);
	}

	// ---- prefix -------------------------------------------------------------

	/// A prefix scan over a transaction without any data yields nothing.
	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let prefix = make_key("test_");
		let mut iter = txn.prefix(&prefix).unwrap();
		assert!(iter.next().is_none());
	}

	/// Only pending keys that start with the prefix are returned.
	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.set(&make_key("test_a"), make_value("1")).unwrap();
		txn.set(&make_key("test_b"), make_value("2")).unwrap();
		txn.set(&make_key("other_c"), make_value("3")).unwrap();

		let prefix = make_key("test_");
		let items: Vec<_> = txn.prefix(&prefix).unwrap().collect();

		// "other_c" does not carry the "test_" prefix and is left out.
		assert_eq!(items.len(), 2);
		assert_eq!(items[0].key, make_key("test_a"));
		assert_eq!(items[1].key, make_key("test_b"));
	}

	/// A prefix scan counts as exactly one read.
	#[test]
	fn test_prefix_increments_reads_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		assert_eq!(txn.metrics().reads, 0);

		let prefix = make_key("test_");
		let _ = txn.prefix(&prefix).unwrap();

		assert_eq!(txn.metrics().reads, 1);
	}

	// ---- combined -----------------------------------------------------------

	/// Reads of different kinds all feed the same counter.
	#[test]
	fn test_multiple_read_operations_accumulate_metrics() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.get(&make_key("k1")).unwrap();
		txn.contains_key(&make_key("k2")).unwrap();
		let _ = txn.range(EncodedKeyRange::all()).unwrap();
		let _ = txn.range(a_to_z()).unwrap();

		// One increment per call above.
		assert_eq!(txn.metrics().reads, 4);
	}
}