reifydb_sub_flow/transaction/
read.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::ops::Bound::{Excluded, Included, Unbounded};
5
6use reifydb_core::{
7	EncodedKey, EncodedKeyRange,
8	interface::{Key, MultiVersionBatch, MultiVersionQueryTransaction},
9	key::KeyKind,
10	value::encoded::EncodedValues,
11};
12
13use super::{FlowTransaction, iter_range::collect_batch};
14
15impl FlowTransaction {
16	/// Get a value by key, checking pending writes first, then querying multi-version store
17	pub async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
18		self.metrics.increment_reads();
19
20		if self.pending.is_removed(key) {
21			return Ok(None);
22		}
23
24		if let Some(value) = self.pending.get(key) {
25			return Ok(Some(value.clone()));
26		}
27
28		let query = if Self::is_flow_state_key(key) {
29			&mut self.state_query
30		} else {
31			&mut self.source_query
32		};
33
34		match query.get(key).await? {
35			Some(multi) => Ok(Some(multi.values)),
36			None => Ok(None),
37		}
38	}
39
40	/// Check if a key exists
41	pub async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
42		self.metrics.increment_reads();
43
44		if self.pending.is_removed(key) {
45			return Ok(false);
46		}
47
48		if self.pending.get(key).is_some() {
49			return Ok(true);
50		}
51
52		let query = if Self::is_flow_state_key(key) {
53			&mut self.state_query
54		} else {
55			&mut self.source_query
56		};
57
58		query.contains_key(key).await
59	}
60
61	/// Range query
62	pub async fn range(&mut self, range: EncodedKeyRange) -> crate::Result<MultiVersionBatch> {
63		self.metrics.increment_reads();
64
65		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
66
67		let query = match range.start.as_ref() {
68			Included(start) | Excluded(start) => {
69				if Self::is_flow_state_key(start) {
70					&mut self.state_query
71				} else {
72					&mut self.source_query
73				}
74			}
75			Unbounded => &mut self.source_query,
76		};
77		let committed_batch = query.range_batch(range, 1024).await?;
78
79		Ok(collect_batch(pending, committed_batch, self.version))
80	}
81
82	/// Range query with batching
83	pub async fn range_batched(
84		&mut self,
85		range: EncodedKeyRange,
86		batch_size: u64,
87	) -> crate::Result<MultiVersionBatch> {
88		self.metrics.increment_reads();
89
90		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
91
92		let query = match range.start.as_ref() {
93			Included(start) | Excluded(start) => {
94				if Self::is_flow_state_key(start) {
95					&mut self.state_query
96				} else {
97					&mut self.source_query
98				}
99			}
100			Unbounded => &mut self.source_query,
101		};
102		let committed_batch = query.range_batch(range, batch_size).await?;
103
104		Ok(collect_batch(pending, committed_batch, self.version))
105	}
106
107	/// Prefix scan
108	pub async fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
109		self.metrics.increment_reads();
110
111		let range = EncodedKeyRange::prefix(prefix);
112		let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
113
114		let query = if Self::is_flow_state_key(prefix) {
115			&mut self.state_query
116		} else {
117			&mut self.source_query
118		};
119		let committed_batch = query.prefix(prefix).await?;
120
121		Ok(collect_batch(pending, committed_batch, self.version))
122	}
123
124	fn is_flow_state_key(key: &EncodedKey) -> bool {
125		match Key::kind(&key) {
126			None => false,
127			Some(kind) => match kind {
128				KeyKind::FlowNodeState => true,
129				KeyKind::FlowNodeInternalState => true,
130				_ => false,
131			},
132		}
133	}
134}
135
136#[cfg(test)]
137mod tests {
138	use reifydb_core::{
139		CommitVersion, CowVec, EncodedKey, EncodedKeyRange,
140		interface::{Engine, MultiVersionCommandTransaction, MultiVersionQueryTransaction},
141		value::encoded::EncodedValues,
142	};
143
144	use super::*;
145	use crate::operator::stateful::test_utils::test::create_test_transaction;
146
147	fn make_key(s: &str) -> EncodedKey {
148		EncodedKey::new(s.as_bytes().to_vec())
149	}
150
151	fn make_value(s: &str) -> EncodedValues {
152		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
153	}
154
155	#[tokio::test]
156	async fn test_get_from_pending() {
157		let parent = create_test_transaction().await;
158		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
159
160		let key = make_key("key1");
161		let value = make_value("value1");
162
163		txn.set(&key, value.clone()).unwrap();
164
165		// Should get value from pending buffer
166		let result = txn.get(&key).await.unwrap();
167		assert_eq!(result, Some(value));
168	}
169
170	#[tokio::test]
171	async fn test_get_from_committed() {
172		use crate::operator::stateful::test_utils::test::create_test_engine;
173		let engine = create_test_engine().await;
174
175		let key = make_key("key1");
176		let value = make_value("value1");
177
178		// Set value in first transaction and commit
179		{
180			let mut cmd_txn = engine.begin_command().await.unwrap();
181			cmd_txn.set(&key, value.clone()).await.unwrap();
182			cmd_txn.commit().await.unwrap();
183		}
184
185		// Create new command transaction to read committed data
186		let parent = engine.begin_command().await.unwrap();
187		let version = parent.version();
188
189		// Create FlowTransaction - should see committed value
190		let mut txn = FlowTransaction::new(&parent, version).await;
191
192		// Should get value from query transaction
193		let result = txn.get(&key).await.unwrap();
194		assert_eq!(result, Some(value));
195	}
196
197	#[tokio::test]
198	async fn test_get_pending_shadows_committed() {
199		let mut parent = create_test_transaction().await;
200
201		let key = make_key("key1");
202		parent.set(&key, make_value("old")).await.unwrap();
203		let version = parent.version();
204
205		let mut txn = FlowTransaction::new(&parent, version).await;
206
207		// Override with new value in pending
208		let new_value = make_value("new");
209		txn.set(&key, new_value.clone()).unwrap();
210
211		// Should get new value from pending, not old value from committed
212		let result = txn.get(&key).await.unwrap();
213		assert_eq!(result, Some(new_value));
214	}
215
216	#[tokio::test]
217	async fn test_get_removed_returns_none() {
218		let mut parent = create_test_transaction().await;
219
220		let key = make_key("key1");
221		parent.set(&key, make_value("value1")).await.unwrap();
222		let version = parent.version();
223
224		let mut txn = FlowTransaction::new(&parent, version).await;
225
226		// Remove in pending
227		txn.remove(&key).unwrap();
228
229		// Should return None even though it exists in committed
230		let result = txn.get(&key).await.unwrap();
231		assert_eq!(result, None);
232	}
233
234	#[tokio::test]
235	async fn test_get_nonexistent_key() {
236		let parent = create_test_transaction().await;
237		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
238
239		let result = txn.get(&make_key("missing")).await.unwrap();
240		assert_eq!(result, None);
241	}
242
243	#[tokio::test]
244	async fn test_get_increments_reads_metric() {
245		let parent = create_test_transaction().await;
246		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
247
248		assert_eq!(txn.metrics().reads, 0);
249
250		txn.get(&make_key("key1")).await.unwrap();
251		assert_eq!(txn.metrics().reads, 1);
252
253		txn.get(&make_key("key2")).await.unwrap();
254		assert_eq!(txn.metrics().reads, 2);
255	}
256
257	#[tokio::test]
258	async fn test_contains_key_pending() {
259		let parent = create_test_transaction().await;
260		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
261
262		let key = make_key("key1");
263		txn.set(&key, make_value("value1")).unwrap();
264
265		assert!(txn.contains_key(&key).await.unwrap());
266	}
267
268	#[tokio::test]
269	async fn test_contains_key_committed() {
270		use crate::operator::stateful::test_utils::test::create_test_engine;
271		let engine = create_test_engine().await;
272
273		let key = make_key("key1");
274
275		// Set value in first transaction and commit
276		{
277			let mut cmd_txn = engine.begin_command().await.unwrap();
278			cmd_txn.set(&key, make_value("value1")).await.unwrap();
279			cmd_txn.commit().await.unwrap();
280		}
281
282		// Create new command transaction
283		let parent = engine.begin_command().await.unwrap();
284		let version = parent.version();
285		let mut txn = FlowTransaction::new(&parent, version).await;
286
287		assert!(txn.contains_key(&key).await.unwrap());
288	}
289
290	#[tokio::test]
291	async fn test_contains_key_removed_returns_false() {
292		let mut parent = create_test_transaction().await;
293
294		let key = make_key("key1");
295		parent.set(&key, make_value("value1")).await.unwrap();
296		let version = parent.version();
297
298		let mut txn = FlowTransaction::new(&parent, version).await;
299		txn.remove(&key).unwrap();
300
301		assert!(!txn.contains_key(&key).await.unwrap());
302	}
303
304	#[tokio::test]
305	async fn test_contains_key_nonexistent() {
306		let parent = create_test_transaction().await;
307		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
308
309		assert!(!txn.contains_key(&make_key("missing")).await.unwrap());
310	}
311
312	#[tokio::test]
313	async fn test_contains_key_increments_reads_metric() {
314		let parent = create_test_transaction().await;
315		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
316
317		assert_eq!(txn.metrics().reads, 0);
318
319		txn.contains_key(&make_key("key1")).await.unwrap();
320		assert_eq!(txn.metrics().reads, 1);
321
322		txn.contains_key(&make_key("key2")).await.unwrap();
323		assert_eq!(txn.metrics().reads, 2);
324	}
325
326	#[tokio::test]
327	async fn test_scan_empty() {
328		let parent = create_test_transaction().await;
329		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
330
331		let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
332		assert!(iter.items.into_iter().next().is_none());
333	}
334
335	#[tokio::test]
336	async fn test_scan_only_pending() {
337		let parent = create_test_transaction().await;
338		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
339
340		txn.set(&make_key("b"), make_value("2")).unwrap();
341		txn.set(&make_key("a"), make_value("1")).unwrap();
342		txn.set(&make_key("c"), make_value("3")).unwrap();
343
344		let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
345		let items: Vec<_> = iter.items.into_iter().collect();
346
347		// Should be in sorted order
348		assert_eq!(items.len(), 3);
349		assert_eq!(items[0].key, make_key("a"));
350		assert_eq!(items[1].key, make_key("b"));
351		assert_eq!(items[2].key, make_key("c"));
352	}
353
354	#[tokio::test]
355	async fn test_scan_filters_removes() {
356		let parent = create_test_transaction().await;
357		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
358
359		txn.set(&make_key("a"), make_value("1")).unwrap();
360		txn.remove(&make_key("b")).unwrap();
361		txn.set(&make_key("c"), make_value("3")).unwrap();
362
363		let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
364		let items: Vec<_> = iter.items.into_iter().collect();
365
366		// Should only have 2 items (remove filtered out)
367		assert_eq!(items.len(), 2);
368		assert_eq!(items[0].key, make_key("a"));
369		assert_eq!(items[1].key, make_key("c"));
370	}
371
372	#[tokio::test]
373	async fn test_scan_increments_reads_metric() {
374		let parent = create_test_transaction().await;
375		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
376
377		assert_eq!(txn.metrics().reads, 0);
378		let _ = txn.range(EncodedKeyRange::all()).await.unwrap();
379		assert_eq!(txn.metrics().reads, 1);
380	}
381
382	#[tokio::test]
383	async fn test_range_empty() {
384		let parent = create_test_transaction().await;
385		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
386
387		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
388		let iter = txn.range(range).await.unwrap();
389		assert!(iter.items.into_iter().next().is_none());
390	}
391
392	#[tokio::test]
393	async fn test_range_only_pending() {
394		let parent = create_test_transaction().await;
395		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
396
397		txn.set(&make_key("a"), make_value("1")).unwrap();
398		txn.set(&make_key("b"), make_value("2")).unwrap();
399		txn.set(&make_key("c"), make_value("3")).unwrap();
400		txn.set(&make_key("d"), make_value("4")).unwrap();
401
402		let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
403		let iter = txn.range(range).await.unwrap();
404		let items: Vec<_> = iter.items.into_iter().collect();
405
406		// Should only include b and c (not d, exclusive end)
407		assert_eq!(items.len(), 2);
408		assert_eq!(items[0].key, make_key("b"));
409		assert_eq!(items[1].key, make_key("c"));
410	}
411
412	#[tokio::test]
413	async fn test_range_increments_reads_metric() {
414		let parent = create_test_transaction().await;
415		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
416
417		assert_eq!(txn.metrics().reads, 0);
418
419		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
420		let _ = txn.range(range).await.unwrap();
421
422		assert_eq!(txn.metrics().reads, 1);
423	}
424
425	#[tokio::test]
426	async fn test_range_batched_increments_reads_metric() {
427		let parent = create_test_transaction().await;
428		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
429
430		assert_eq!(txn.metrics().reads, 0);
431
432		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
433		let _ = txn.range_batched(range, 10).await.unwrap();
434
435		assert_eq!(txn.metrics().reads, 1);
436	}
437
438	#[tokio::test]
439	async fn test_prefix_empty() {
440		let parent = create_test_transaction().await;
441		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
442
443		let prefix = make_key("test_");
444		let iter = txn.prefix(&prefix).await.unwrap();
445		assert!(iter.items.into_iter().next().is_none());
446	}
447
448	#[tokio::test]
449	async fn test_prefix_only_pending() {
450		let parent = create_test_transaction().await;
451		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
452
453		txn.set(&make_key("test_a"), make_value("1")).unwrap();
454		txn.set(&make_key("test_b"), make_value("2")).unwrap();
455		txn.set(&make_key("other_c"), make_value("3")).unwrap();
456
457		let prefix = make_key("test_");
458		let iter = txn.prefix(&prefix).await.unwrap();
459		let items: Vec<_> = iter.items.into_iter().collect();
460
461		// Should only include keys with prefix "test_"
462		assert_eq!(items.len(), 2);
463		assert_eq!(items[0].key, make_key("test_a"));
464		assert_eq!(items[1].key, make_key("test_b"));
465	}
466
467	#[tokio::test]
468	async fn test_prefix_increments_reads_metric() {
469		let parent = create_test_transaction().await;
470		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
471
472		assert_eq!(txn.metrics().reads, 0);
473
474		let prefix = make_key("test_");
475		let _ = txn.prefix(&prefix).await.unwrap();
476
477		assert_eq!(txn.metrics().reads, 1);
478	}
479
480	#[tokio::test]
481	async fn test_multiple_read_operations_accumulate_metrics() {
482		let parent = create_test_transaction().await;
483		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
484
485		txn.get(&make_key("k1")).await.unwrap();
486		txn.contains_key(&make_key("k2")).await.unwrap();
487		let _ = txn.range(EncodedKeyRange::all()).await.unwrap();
488		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
489		let _ = txn.range(range).await.unwrap();
490
491		assert_eq!(txn.metrics().reads, 4);
492	}
493}