1use std::ops::Bound::{Excluded, Included, Unbounded};
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::{
9 encoded::EncodedValues,
10 key::{EncodedKey, EncodedKeyRange},
11 },
12 interface::store::{MultiVersionBatch, MultiVersionValues},
13 key::{Key, kind::KeyKind},
14};
15
16use super::{FlowTransaction, Pending};
17
18impl FlowTransaction {
19 pub fn get(&mut self, key: &EncodedKey) -> reifydb_type::Result<Option<EncodedValues>> {
21 if self.pending.is_removed(key) {
22 return Ok(None);
23 }
24
25 if let Some(value) = self.pending.get(key) {
26 return Ok(Some(value.clone()));
27 }
28
29 let query = if Self::is_flow_state_key(key) {
30 &mut self.state_query
31 } else {
32 &mut self.primitive_query
33 };
34
35 match query.get(key)? {
36 Some(multi) => Ok(Some(multi.values().clone())),
37 None => Ok(None),
38 }
39 }
40
41 pub fn contains_key(&mut self, key: &EncodedKey) -> reifydb_type::Result<bool> {
43 if self.pending.is_removed(key) {
44 return Ok(false);
45 }
46
47 if self.pending.get(key).is_some() {
48 return Ok(true);
49 }
50
51 let query = if Self::is_flow_state_key(key) {
52 &mut self.state_query
53 } else {
54 &mut self.primitive_query
55 };
56
57 query.contains_key(key)
58 }
59
60 pub fn prefix(&mut self, prefix: &EncodedKey) -> reifydb_type::Result<MultiVersionBatch> {
62 let range = EncodedKeyRange::prefix(prefix);
63 let items = self.range(range, 1024).collect::<Result<Vec<_>, _>>()?;
64 Ok(MultiVersionBatch {
65 items,
66 has_more: false,
67 })
68 }
69
70 fn is_flow_state_key(key: &EncodedKey) -> bool {
71 match Key::kind(&key) {
72 None => false,
73 Some(kind) => match kind {
74 KeyKind::FlowNodeState => true,
75 KeyKind::FlowNodeInternalState => true,
76 _ => false,
77 },
78 }
79 }
80
81 pub fn range(
88 &mut self,
89 range: EncodedKeyRange,
90 batch_size: usize,
91 ) -> Box<dyn Iterator<Item = reifydb_type::Result<MultiVersionValues>> + Send + '_> {
92 let pending: Vec<(EncodedKey, Pending)> = self
94 .pending
95 .range((range.start.as_ref(), range.end.as_ref()))
96 .map(|(k, v)| (k.clone(), v.clone()))
97 .collect();
98
99 let query = match range.start.as_ref() {
100 Included(start) | Excluded(start) => {
101 if Self::is_flow_state_key(start) {
102 &self.state_query
103 } else {
104 &self.primitive_query
105 }
106 }
107 Unbounded => &self.primitive_query,
108 };
109
110 let storage_iter = query.range(range, batch_size);
111 let version = self.version;
112
113 Box::new(flow_merge_pending_iterator(pending, storage_iter, version))
114 }
115
116 pub fn range_rev(
122 &mut self,
123 range: EncodedKeyRange,
124 batch_size: usize,
125 ) -> Box<dyn Iterator<Item = reifydb_type::Result<MultiVersionValues>> + Send + '_> {
126 let pending: Vec<(EncodedKey, Pending)> = self
128 .pending
129 .range((range.start.as_ref(), range.end.as_ref()))
130 .rev()
131 .map(|(k, v)| (k.clone(), v.clone()))
132 .collect();
133
134 let query = match range.start.as_ref() {
135 Included(start) | Excluded(start) => {
136 if Self::is_flow_state_key(start) {
137 &self.state_query
138 } else {
139 &self.primitive_query
140 }
141 }
142 Unbounded => &self.primitive_query,
143 };
144
145 let storage_iter = query.range_rev(range, batch_size);
146 let version = self.version;
147
148 Box::new(flow_merge_pending_iterator_rev(pending, storage_iter, version))
149 }
150}
151
152struct FlowMergePendingIterator<I>
154where
155 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
156{
157 storage_iter: std::iter::Peekable<I>,
158 pending_iter: std::iter::Peekable<std::vec::IntoIter<(EncodedKey, Pending)>>,
159 version: CommitVersion,
160}
161
162impl<I> Iterator for FlowMergePendingIterator<I>
163where
164 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
165{
166 type Item = reifydb_type::Result<MultiVersionValues>;
167
168 fn next(&mut self) -> Option<Self::Item> {
169 use std::cmp::Ordering;
170
171 loop {
172 let next_storage = self.storage_iter.peek();
173
174 match (self.pending_iter.peek(), next_storage) {
175 (Some((pending_key, _)), Some(storage_result)) => {
176 let storage_val = match storage_result {
177 Ok(v) => v,
178 Err(_) => {
179 let err = self.storage_iter.next().unwrap();
181 return Some(err.map_err(|e| e.into()));
182 }
183 };
184 let cmp = pending_key.cmp(&storage_val.key);
185
186 if matches!(cmp, Ordering::Less) {
187 let (key, value) = self.pending_iter.next().unwrap();
189 if let Pending::Set(values) = value {
190 return Some(Ok(MultiVersionValues {
191 key,
192 values,
193 version: self.version,
194 }));
195 }
196 } else if matches!(cmp, Ordering::Equal) {
198 let (key, value) = self.pending_iter.next().unwrap();
200 self.storage_iter.next(); if let Pending::Set(values) = value {
202 return Some(Ok(MultiVersionValues {
203 key,
204 values,
205 version: self.version,
206 }));
207 }
208 } else {
210 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
212 }
213 }
214 (Some(_), None) => {
215 let (key, value) = self.pending_iter.next().unwrap();
217 if let Pending::Set(values) = value {
218 return Some(Ok(MultiVersionValues {
219 key,
220 values,
221 version: self.version,
222 }));
223 }
224 }
226 (None, Some(_)) => {
227 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
229 }
230 (None, None) => return None,
231 }
232 }
233 }
234}
235
236fn flow_merge_pending_iterator<I>(
238 pending: Vec<(EncodedKey, Pending)>,
239 storage_iter: I,
240 version: CommitVersion,
241) -> FlowMergePendingIterator<I>
242where
243 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
244{
245 FlowMergePendingIterator {
246 storage_iter: storage_iter.peekable(),
247 pending_iter: pending.into_iter().peekable(),
248 version,
249 }
250}
251
252struct FlowMergePendingIteratorRev<I>
254where
255 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
256{
257 storage_iter: std::iter::Peekable<I>,
258 pending_iter: std::iter::Peekable<std::vec::IntoIter<(EncodedKey, Pending)>>,
259 version: CommitVersion,
260}
261
262impl<I> Iterator for FlowMergePendingIteratorRev<I>
263where
264 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
265{
266 type Item = reifydb_type::Result<MultiVersionValues>;
267
268 fn next(&mut self) -> Option<Self::Item> {
269 use std::cmp::Ordering;
270
271 loop {
272 let next_storage = self.storage_iter.peek();
273
274 match (self.pending_iter.peek(), next_storage) {
275 (Some((pending_key, _)), Some(storage_result)) => {
276 let storage_val = match storage_result {
277 Ok(v) => v,
278 Err(_) => {
279 let err = self.storage_iter.next().unwrap();
281 return Some(err.map_err(|e| e.into()));
282 }
283 };
284 let cmp = pending_key.cmp(&storage_val.key);
285
286 if matches!(cmp, Ordering::Greater) {
287 let (key, value) = self.pending_iter.next().unwrap();
289 if let Pending::Set(values) = value {
290 return Some(Ok(MultiVersionValues {
291 key,
292 values,
293 version: self.version,
294 }));
295 }
296 } else if matches!(cmp, Ordering::Equal) {
298 let (key, value) = self.pending_iter.next().unwrap();
300 self.storage_iter.next(); if let Pending::Set(values) = value {
302 return Some(Ok(MultiVersionValues {
303 key,
304 values,
305 version: self.version,
306 }));
307 }
308 } else {
310 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
312 }
313 }
314 (Some(_), None) => {
315 let (key, value) = self.pending_iter.next().unwrap();
317 if let Pending::Set(values) = value {
318 return Some(Ok(MultiVersionValues {
319 key,
320 values,
321 version: self.version,
322 }));
323 }
324 }
326 (None, Some(_)) => {
327 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
329 }
330 (None, None) => return None,
331 }
332 }
333 }
334}
335
336fn flow_merge_pending_iterator_rev<I>(
338 pending: Vec<(EncodedKey, Pending)>,
339 storage_iter: I,
340 version: CommitVersion,
341) -> FlowMergePendingIteratorRev<I>
342where
343 I: Iterator<Item = reifydb_type::Result<MultiVersionValues>>,
344{
345 FlowMergePendingIteratorRev {
346 storage_iter: storage_iter.peekable(),
347 pending_iter: pending.into_iter().peekable(),
348 version,
349 }
350}
351
352#[cfg(test)]
353pub mod tests {
354 use reifydb_catalog::catalog::Catalog;
355 use reifydb_core::encoded::{
356 encoded::EncodedValues,
357 key::{EncodedKey, EncodedKeyRange},
358 };
359 use reifydb_engine::test_utils::create_test_engine;
360 use reifydb_transaction::interceptor::interceptors::Interceptors;
361 use reifydb_type::util::cowvec::CowVec;
362
363 use super::*;
364 use crate::operator::stateful::test_utils::test::create_test_transaction;
365
366 fn make_key(s: &str) -> EncodedKey {
367 EncodedKey::new(s.as_bytes().to_vec())
368 }
369
370 fn make_value(s: &str) -> EncodedValues {
371 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
372 }
373
374 #[test]
375 fn test_get_from_pending() {
376 let parent = create_test_transaction();
377 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
378
379 let key = make_key("key1");
380 let value = make_value("value1");
381
382 txn.set(&key, value.clone()).unwrap();
383
384 let result = txn.get(&key).unwrap();
386 assert_eq!(result, Some(value));
387 }
388
389 #[test]
390 fn test_get_from_committed() {
391 let engine = create_test_engine();
392
393 let key = make_key("key1");
394 let value = make_value("value1");
395
396 {
398 let mut cmd_txn = engine.begin_admin().unwrap();
399 cmd_txn.set(&key, value.clone()).unwrap();
400 cmd_txn.commit().unwrap();
401 }
402
403 let parent = engine.begin_admin().unwrap();
405 let version = parent.version();
406
407 let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
409
410 let result = txn.get(&key).unwrap();
412 assert_eq!(result, Some(value));
413 }
414
415 #[test]
416 fn test_get_pending_shadows_committed() {
417 let mut parent = create_test_transaction();
418
419 let key = make_key("key1");
420 parent.set(&key, make_value("old")).unwrap();
421 let version = parent.version();
422
423 let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
424
425 let new_value = make_value("new");
427 txn.set(&key, new_value.clone()).unwrap();
428
429 let result = txn.get(&key).unwrap();
431 assert_eq!(result, Some(new_value));
432 }
433
434 #[test]
435 fn test_get_removed_returns_none() {
436 let mut parent = create_test_transaction();
437
438 let key = make_key("key1");
439 parent.set(&key, make_value("value1")).unwrap();
440 let version = parent.version();
441
442 let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
443
444 txn.remove(&key).unwrap();
446
447 let result = txn.get(&key).unwrap();
449 assert_eq!(result, None);
450 }
451
452 #[test]
453 fn test_get_nonexistent_key() {
454 let parent = create_test_transaction();
455 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
456
457 let result = txn.get(&make_key("missing")).unwrap();
458 assert_eq!(result, None);
459 }
460
461 #[test]
462 fn test_contains_key_pending() {
463 let parent = create_test_transaction();
464 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
465
466 let key = make_key("key1");
467 txn.set(&key, make_value("value1")).unwrap();
468
469 assert!(txn.contains_key(&key).unwrap());
470 }
471
472 #[test]
473 fn test_contains_key_committed() {
474 let engine = create_test_engine();
475
476 let key = make_key("key1");
477
478 {
480 let mut cmd_txn = engine.begin_admin().unwrap();
481 cmd_txn.set(&key, make_value("value1")).unwrap();
482 cmd_txn.commit().unwrap();
483 }
484
485 let parent = engine.begin_admin().unwrap();
487 let version = parent.version();
488 let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
489
490 assert!(txn.contains_key(&key).unwrap());
491 }
492
493 #[test]
494 fn test_contains_key_removed_returns_false() {
495 let mut parent = create_test_transaction();
496
497 let key = make_key("key1");
498 parent.set(&key, make_value("value1")).unwrap();
499 let version = parent.version();
500
501 let mut txn = FlowTransaction::new(&parent, version, Catalog::testing(), Interceptors::new());
502 txn.remove(&key).unwrap();
503
504 assert!(!txn.contains_key(&key).unwrap());
505 }
506
507 #[test]
508 fn test_contains_key_nonexistent() {
509 let parent = create_test_transaction();
510 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
511
512 assert!(!txn.contains_key(&make_key("missing")).unwrap());
513 }
514
515 #[test]
516 fn test_scan_empty() {
517 let parent = create_test_transaction();
518 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
519
520 let mut iter = txn.range(EncodedKeyRange::all(), 1024);
521 assert!(iter.next().is_none());
522 }
523
524 #[test]
525 fn test_scan_only_pending() {
526 let parent = create_test_transaction();
527 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
528
529 txn.set(&make_key("b"), make_value("2")).unwrap();
530 txn.set(&make_key("a"), make_value("1")).unwrap();
531 txn.set(&make_key("c"), make_value("3")).unwrap();
532
533 let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();
534
535 assert_eq!(items.len(), 3);
537 assert_eq!(items[0].key, make_key("a"));
538 assert_eq!(items[1].key, make_key("b"));
539 assert_eq!(items[2].key, make_key("c"));
540 }
541
542 #[test]
543 fn test_scan_filters_removes() {
544 let parent = create_test_transaction();
545 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
546
547 txn.set(&make_key("a"), make_value("1")).unwrap();
548 txn.remove(&make_key("b")).unwrap();
549 txn.set(&make_key("c"), make_value("3")).unwrap();
550
551 let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>, _>>().unwrap();
552
553 assert_eq!(items.len(), 2);
555 assert_eq!(items[0].key, make_key("a"));
556 assert_eq!(items[1].key, make_key("c"));
557 }
558
559 #[test]
560 fn test_range_empty() {
561 let parent = create_test_transaction();
562 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
563
564 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
565 let mut iter = txn.range(range, 1024);
566 assert!(iter.next().is_none());
567 }
568
569 #[test]
570 fn test_range_only_pending() {
571 let parent = create_test_transaction();
572 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
573
574 txn.set(&make_key("a"), make_value("1")).unwrap();
575 txn.set(&make_key("b"), make_value("2")).unwrap();
576 txn.set(&make_key("c"), make_value("3")).unwrap();
577 txn.set(&make_key("d"), make_value("4")).unwrap();
578
579 let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
580 let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>, _>>().unwrap();
581
582 assert_eq!(items.len(), 2);
584 assert_eq!(items[0].key, make_key("b"));
585 assert_eq!(items[1].key, make_key("c"));
586 }
587
588 #[test]
589 fn test_prefix_empty() {
590 let parent = create_test_transaction();
591 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
592
593 let prefix = make_key("test_");
594 let iter = txn.prefix(&prefix).unwrap();
595 assert!(iter.items.into_iter().next().is_none());
596 }
597
598 #[test]
599 fn test_prefix_only_pending() {
600 let parent = create_test_transaction();
601 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
602
603 txn.set(&make_key("test_a"), make_value("1")).unwrap();
604 txn.set(&make_key("test_b"), make_value("2")).unwrap();
605 txn.set(&make_key("other_c"), make_value("3")).unwrap();
606
607 let prefix = make_key("test_");
608 let iter = txn.prefix(&prefix).unwrap();
609 let items: Vec<_> = iter.items.into_iter().collect();
610
611 assert_eq!(items.len(), 2);
613 assert_eq!(items[0].key, make_key("test_a"));
614 assert_eq!(items[1].key, make_key("test_b"));
615 }
616}