reifydb_sub_flow/transaction/
read.rs1use std::ops::Bound::{Excluded, Included, Unbounded};
5
6use reifydb_core::{
7 EncodedKey, EncodedKeyRange,
8 interface::{Key, MultiVersionBatch, MultiVersionQueryTransaction},
9 key::KeyKind,
10 value::encoded::EncodedValues,
11};
12
13use super::{FlowTransaction, iter_range::collect_batch};
14
15impl FlowTransaction {
16 pub async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
18 self.metrics.increment_reads();
19
20 if self.pending.is_removed(key) {
21 return Ok(None);
22 }
23
24 if let Some(value) = self.pending.get(key) {
25 return Ok(Some(value.clone()));
26 }
27
28 let query = if Self::is_flow_state_key(key) {
29 &mut self.state_query
30 } else {
31 &mut self.source_query
32 };
33
34 match query.get(key).await? {
35 Some(multi) => Ok(Some(multi.values)),
36 None => Ok(None),
37 }
38 }
39
40 pub async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
42 self.metrics.increment_reads();
43
44 if self.pending.is_removed(key) {
45 return Ok(false);
46 }
47
48 if self.pending.get(key).is_some() {
49 return Ok(true);
50 }
51
52 let query = if Self::is_flow_state_key(key) {
53 &mut self.state_query
54 } else {
55 &mut self.source_query
56 };
57
58 query.contains_key(key).await
59 }
60
61 pub async fn range(&mut self, range: EncodedKeyRange) -> crate::Result<MultiVersionBatch> {
63 self.metrics.increment_reads();
64
65 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
66
67 let query = match range.start.as_ref() {
68 Included(start) | Excluded(start) => {
69 if Self::is_flow_state_key(start) {
70 &mut self.state_query
71 } else {
72 &mut self.source_query
73 }
74 }
75 Unbounded => &mut self.source_query,
76 };
77 let committed_batch = query.range_batch(range, 1024).await?;
78
79 Ok(collect_batch(pending, committed_batch, self.version))
80 }
81
82 pub async fn range_batched(
84 &mut self,
85 range: EncodedKeyRange,
86 batch_size: u64,
87 ) -> crate::Result<MultiVersionBatch> {
88 self.metrics.increment_reads();
89
90 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
91
92 let query = match range.start.as_ref() {
93 Included(start) | Excluded(start) => {
94 if Self::is_flow_state_key(start) {
95 &mut self.state_query
96 } else {
97 &mut self.source_query
98 }
99 }
100 Unbounded => &mut self.source_query,
101 };
102 let committed_batch = query.range_batch(range, batch_size).await?;
103
104 Ok(collect_batch(pending, committed_batch, self.version))
105 }
106
107 pub async fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
109 self.metrics.increment_reads();
110
111 let range = EncodedKeyRange::prefix(prefix);
112 let pending = self.pending.range((range.start.as_ref(), range.end.as_ref()));
113
114 let query = if Self::is_flow_state_key(prefix) {
115 &mut self.state_query
116 } else {
117 &mut self.source_query
118 };
119 let committed_batch = query.prefix(prefix).await?;
120
121 Ok(collect_batch(pending, committed_batch, self.version))
122 }
123
124 fn is_flow_state_key(key: &EncodedKey) -> bool {
125 match Key::kind(&key) {
126 None => false,
127 Some(kind) => match kind {
128 KeyKind::FlowNodeState => true,
129 KeyKind::FlowNodeInternalState => true,
130 _ => false,
131 },
132 }
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use reifydb_core::{
139 CommitVersion, CowVec, EncodedKey, EncodedKeyRange,
140 interface::{Engine, MultiVersionCommandTransaction, MultiVersionQueryTransaction},
141 value::encoded::EncodedValues,
142 };
143
144 use super::*;
145 use crate::operator::stateful::test_utils::test::create_test_transaction;
146
147 fn make_key(s: &str) -> EncodedKey {
148 EncodedKey::new(s.as_bytes().to_vec())
149 }
150
151 fn make_value(s: &str) -> EncodedValues {
152 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
153 }
154
155 #[tokio::test]
156 async fn test_get_from_pending() {
157 let parent = create_test_transaction().await;
158 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
159
160 let key = make_key("key1");
161 let value = make_value("value1");
162
163 txn.set(&key, value.clone()).unwrap();
164
165 let result = txn.get(&key).await.unwrap();
167 assert_eq!(result, Some(value));
168 }
169
170 #[tokio::test]
171 async fn test_get_from_committed() {
172 use crate::operator::stateful::test_utils::test::create_test_engine;
173 let engine = create_test_engine().await;
174
175 let key = make_key("key1");
176 let value = make_value("value1");
177
178 {
180 let mut cmd_txn = engine.begin_command().await.unwrap();
181 cmd_txn.set(&key, value.clone()).await.unwrap();
182 cmd_txn.commit().await.unwrap();
183 }
184
185 let parent = engine.begin_command().await.unwrap();
187 let version = parent.version();
188
189 let mut txn = FlowTransaction::new(&parent, version).await;
191
192 let result = txn.get(&key).await.unwrap();
194 assert_eq!(result, Some(value));
195 }
196
197 #[tokio::test]
198 async fn test_get_pending_shadows_committed() {
199 let mut parent = create_test_transaction().await;
200
201 let key = make_key("key1");
202 parent.set(&key, make_value("old")).await.unwrap();
203 let version = parent.version();
204
205 let mut txn = FlowTransaction::new(&parent, version).await;
206
207 let new_value = make_value("new");
209 txn.set(&key, new_value.clone()).unwrap();
210
211 let result = txn.get(&key).await.unwrap();
213 assert_eq!(result, Some(new_value));
214 }
215
216 #[tokio::test]
217 async fn test_get_removed_returns_none() {
218 let mut parent = create_test_transaction().await;
219
220 let key = make_key("key1");
221 parent.set(&key, make_value("value1")).await.unwrap();
222 let version = parent.version();
223
224 let mut txn = FlowTransaction::new(&parent, version).await;
225
226 txn.remove(&key).unwrap();
228
229 let result = txn.get(&key).await.unwrap();
231 assert_eq!(result, None);
232 }
233
234 #[tokio::test]
235 async fn test_get_nonexistent_key() {
236 let parent = create_test_transaction().await;
237 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
238
239 let result = txn.get(&make_key("missing")).await.unwrap();
240 assert_eq!(result, None);
241 }
242
243 #[tokio::test]
244 async fn test_get_increments_reads_metric() {
245 let parent = create_test_transaction().await;
246 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
247
248 assert_eq!(txn.metrics().reads, 0);
249
250 txn.get(&make_key("key1")).await.unwrap();
251 assert_eq!(txn.metrics().reads, 1);
252
253 txn.get(&make_key("key2")).await.unwrap();
254 assert_eq!(txn.metrics().reads, 2);
255 }
256
257 #[tokio::test]
258 async fn test_contains_key_pending() {
259 let parent = create_test_transaction().await;
260 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
261
262 let key = make_key("key1");
263 txn.set(&key, make_value("value1")).unwrap();
264
265 assert!(txn.contains_key(&key).await.unwrap());
266 }
267
268 #[tokio::test]
269 async fn test_contains_key_committed() {
270 use crate::operator::stateful::test_utils::test::create_test_engine;
271 let engine = create_test_engine().await;
272
273 let key = make_key("key1");
274
275 {
277 let mut cmd_txn = engine.begin_command().await.unwrap();
278 cmd_txn.set(&key, make_value("value1")).await.unwrap();
279 cmd_txn.commit().await.unwrap();
280 }
281
282 let parent = engine.begin_command().await.unwrap();
284 let version = parent.version();
285 let mut txn = FlowTransaction::new(&parent, version).await;
286
287 assert!(txn.contains_key(&key).await.unwrap());
288 }
289
290 #[tokio::test]
291 async fn test_contains_key_removed_returns_false() {
292 let mut parent = create_test_transaction().await;
293
294 let key = make_key("key1");
295 parent.set(&key, make_value("value1")).await.unwrap();
296 let version = parent.version();
297
298 let mut txn = FlowTransaction::new(&parent, version).await;
299 txn.remove(&key).unwrap();
300
301 assert!(!txn.contains_key(&key).await.unwrap());
302 }
303
304 #[tokio::test]
305 async fn test_contains_key_nonexistent() {
306 let parent = create_test_transaction().await;
307 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
308
309 assert!(!txn.contains_key(&make_key("missing")).await.unwrap());
310 }
311
312 #[tokio::test]
313 async fn test_contains_key_increments_reads_metric() {
314 let parent = create_test_transaction().await;
315 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
316
317 assert_eq!(txn.metrics().reads, 0);
318
319 txn.contains_key(&make_key("key1")).await.unwrap();
320 assert_eq!(txn.metrics().reads, 1);
321
322 txn.contains_key(&make_key("key2")).await.unwrap();
323 assert_eq!(txn.metrics().reads, 2);
324 }
325
326 #[tokio::test]
327 async fn test_scan_empty() {
328 let parent = create_test_transaction().await;
329 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
330
331 let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
332 assert!(iter.items.into_iter().next().is_none());
333 }
334
335 #[tokio::test]
336 async fn test_scan_only_pending() {
337 let parent = create_test_transaction().await;
338 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
339
340 txn.set(&make_key("b"), make_value("2")).unwrap();
341 txn.set(&make_key("a"), make_value("1")).unwrap();
342 txn.set(&make_key("c"), make_value("3")).unwrap();
343
344 let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
345 let items: Vec<_> = iter.items.into_iter().collect();
346
347 assert_eq!(items.len(), 3);
349 assert_eq!(items[0].key, make_key("a"));
350 assert_eq!(items[1].key, make_key("b"));
351 assert_eq!(items[2].key, make_key("c"));
352 }
353
354 #[tokio::test]
355 async fn test_scan_filters_removes() {
356 let parent = create_test_transaction().await;
357 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
358
359 txn.set(&make_key("a"), make_value("1")).unwrap();
360 txn.remove(&make_key("b")).unwrap();
361 txn.set(&make_key("c"), make_value("3")).unwrap();
362
363 let iter = txn.range(EncodedKeyRange::all()).await.unwrap();
364 let items: Vec<_> = iter.items.into_iter().collect();
365
366 assert_eq!(items.len(), 2);
368 assert_eq!(items[0].key, make_key("a"));
369 assert_eq!(items[1].key, make_key("c"));
370 }
371
372 #[tokio::test]
373 async fn test_scan_increments_reads_metric() {
374 let parent = create_test_transaction().await;
375 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
376
377 assert_eq!(txn.metrics().reads, 0);
378 let _ = txn.range(EncodedKeyRange::all()).await.unwrap();
379 assert_eq!(txn.metrics().reads, 1);
380 }
381
382 #[tokio::test]
383 async fn test_range_empty() {
384 let parent = create_test_transaction().await;
385 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
386
387 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
388 let iter = txn.range(range).await.unwrap();
389 assert!(iter.items.into_iter().next().is_none());
390 }
391
392 #[tokio::test]
393 async fn test_range_only_pending() {
394 let parent = create_test_transaction().await;
395 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
396
397 txn.set(&make_key("a"), make_value("1")).unwrap();
398 txn.set(&make_key("b"), make_value("2")).unwrap();
399 txn.set(&make_key("c"), make_value("3")).unwrap();
400 txn.set(&make_key("d"), make_value("4")).unwrap();
401
402 let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
403 let iter = txn.range(range).await.unwrap();
404 let items: Vec<_> = iter.items.into_iter().collect();
405
406 assert_eq!(items.len(), 2);
408 assert_eq!(items[0].key, make_key("b"));
409 assert_eq!(items[1].key, make_key("c"));
410 }
411
412 #[tokio::test]
413 async fn test_range_increments_reads_metric() {
414 let parent = create_test_transaction().await;
415 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
416
417 assert_eq!(txn.metrics().reads, 0);
418
419 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
420 let _ = txn.range(range).await.unwrap();
421
422 assert_eq!(txn.metrics().reads, 1);
423 }
424
425 #[tokio::test]
426 async fn test_range_batched_increments_reads_metric() {
427 let parent = create_test_transaction().await;
428 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
429
430 assert_eq!(txn.metrics().reads, 0);
431
432 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
433 let _ = txn.range_batched(range, 10).await.unwrap();
434
435 assert_eq!(txn.metrics().reads, 1);
436 }
437
438 #[tokio::test]
439 async fn test_prefix_empty() {
440 let parent = create_test_transaction().await;
441 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
442
443 let prefix = make_key("test_");
444 let iter = txn.prefix(&prefix).await.unwrap();
445 assert!(iter.items.into_iter().next().is_none());
446 }
447
448 #[tokio::test]
449 async fn test_prefix_only_pending() {
450 let parent = create_test_transaction().await;
451 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
452
453 txn.set(&make_key("test_a"), make_value("1")).unwrap();
454 txn.set(&make_key("test_b"), make_value("2")).unwrap();
455 txn.set(&make_key("other_c"), make_value("3")).unwrap();
456
457 let prefix = make_key("test_");
458 let iter = txn.prefix(&prefix).await.unwrap();
459 let items: Vec<_> = iter.items.into_iter().collect();
460
461 assert_eq!(items.len(), 2);
463 assert_eq!(items[0].key, make_key("test_a"));
464 assert_eq!(items[1].key, make_key("test_b"));
465 }
466
467 #[tokio::test]
468 async fn test_prefix_increments_reads_metric() {
469 let parent = create_test_transaction().await;
470 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
471
472 assert_eq!(txn.metrics().reads, 0);
473
474 let prefix = make_key("test_");
475 let _ = txn.prefix(&prefix).await.unwrap();
476
477 assert_eq!(txn.metrics().reads, 1);
478 }
479
480 #[tokio::test]
481 async fn test_multiple_read_operations_accumulate_metrics() {
482 let parent = create_test_transaction().await;
483 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
484
485 txn.get(&make_key("k1")).await.unwrap();
486 txn.contains_key(&make_key("k2")).await.unwrap();
487 let _ = txn.range(EncodedKeyRange::all()).await.unwrap();
488 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
489 let _ = txn.range(range).await.unwrap();
490
491 assert_eq!(txn.metrics().reads, 4);
492 }
493}