1use std::{
5 cmp::Ordering,
6 collections, iter,
7 ops::Bound::{Excluded, Included, Unbounded},
8 vec,
9};
10
11use collections::BTreeMap;
12use iter::Peekable;
13use reifydb_core::{
14 common::CommitVersion,
15 encoded::{
16 encoded::EncodedValues,
17 key::{EncodedKey, EncodedKeyRange},
18 },
19 interface::store::{MultiVersionBatch, MultiVersionValues},
20 key::{Key, kind::KeyKind},
21};
22use reifydb_type::Result;
23use vec::IntoIter;
24
25use super::{FlowTransaction, PendingWrite};
26
27impl FlowTransaction {
28 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
31 match self {
32 Self::Deferred {
33 pending,
34 primitive_query,
35 state_query,
36 ..
37 } => {
38 if pending.is_removed(key) {
40 return Ok(None);
41 }
42 if let Some(value) = pending.get(key) {
43 return Ok(Some(value.clone()));
44 }
45
46 let query = if Self::is_flow_state_key(key) {
48 state_query
49 } else {
50 primitive_query
51 };
52 match query.get(key)? {
53 Some(multi) => Ok(Some(multi.values().clone())),
54 None => Ok(None),
55 }
56 }
57 Self::Transactional {
58 pending,
59 base_pending,
60 primitive_query,
61 state_query,
62 ..
63 } => {
64 if pending.is_removed(key) {
66 return Ok(None);
67 }
68 if let Some(value) = pending.get(key) {
69 return Ok(Some(value.clone()));
70 }
71
72 if base_pending.is_removed(key) {
74 return Ok(None);
75 }
76 if let Some(value) = base_pending.get(key) {
77 return Ok(Some(value.clone()));
78 }
79
80 let query = if Self::is_flow_state_key(key) {
82 state_query
83 } else {
84 primitive_query
85 };
86 match query.get(key)? {
87 Some(multi) => Ok(Some(multi.values().clone())),
88 None => Ok(None),
89 }
90 }
91 }
92 }
93
94 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
96 match self {
97 Self::Deferred {
98 pending,
99 primitive_query,
100 state_query,
101 ..
102 } => {
103 if pending.is_removed(key) {
104 return Ok(false);
105 }
106 if pending.get(key).is_some() {
107 return Ok(true);
108 }
109
110 let query = if Self::is_flow_state_key(key) {
111 state_query
112 } else {
113 primitive_query
114 };
115 query.contains_key(key)
116 }
117 Self::Transactional {
118 pending,
119 base_pending,
120 primitive_query,
121 state_query,
122 ..
123 } => {
124 if pending.is_removed(key) {
125 return Ok(false);
126 }
127 if pending.get(key).is_some() {
128 return Ok(true);
129 }
130
131 if base_pending.is_removed(key) {
132 return Ok(false);
133 }
134 if base_pending.get(key).is_some() {
135 return Ok(true);
136 }
137
138 let query = if Self::is_flow_state_key(key) {
139 state_query
140 } else {
141 primitive_query
142 };
143 query.contains_key(key)
144 }
145 }
146 }
147
148 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
150 let range = EncodedKeyRange::prefix(prefix);
151 let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
152 Ok(MultiVersionBatch {
153 items,
154 has_more: false,
155 })
156 }
157
158 fn is_flow_state_key(key: &EncodedKey) -> bool {
159 match Key::kind(&key) {
160 None => false,
161 Some(kind) => match kind {
162 KeyKind::FlowNodeState => true,
163 KeyKind::FlowNodeInternalState => true,
164 _ => false,
165 },
166 }
167 }
168
169 pub fn range(
176 &mut self,
177 range: EncodedKeyRange,
178 batch_size: usize,
179 ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
180 match self {
181 Self::Deferred {
182 pending,
183 version,
184 primitive_query,
185 state_query,
186 ..
187 } => {
188 let merged: BTreeMap<EncodedKey, PendingWrite> = pending
189 .range((range.start.as_ref(), range.end.as_ref()))
190 .map(|(k, v)| (k.clone(), v.clone()))
191 .collect();
192 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
193
194 let query = match range.start.as_ref() {
195 Included(start) | Excluded(start) => {
196 if Self::is_flow_state_key(start) {
197 &*state_query
198 } else {
199 &*primitive_query
200 }
201 }
202 Unbounded => &*primitive_query,
203 };
204
205 let storage_iter = query.range(range, batch_size);
206 let v = *version;
207 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
208 }
209 Self::Transactional {
210 pending,
211 base_pending,
212 version,
213 primitive_query,
214 state_query,
215 ..
216 } => {
217 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
219 .range((range.start.as_ref(), range.end.as_ref()))
220 .map(|(k, v)| (k.clone(), v.clone()))
221 .collect();
222 for (k, v) in pending.range((range.start.as_ref(), range.end.as_ref())) {
223 merged.insert(k.clone(), v.clone());
224 }
225 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
226
227 let query = match range.start.as_ref() {
228 Included(start) | Excluded(start) => {
229 if Self::is_flow_state_key(start) {
230 &*state_query
231 } else {
232 &*primitive_query
233 }
234 }
235 Unbounded => &*primitive_query,
236 };
237
238 let storage_iter = query.range(range, batch_size);
239 let v = *version;
240 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
241 }
242 }
243 }
244
245 pub fn range_rev(
251 &mut self,
252 range: EncodedKeyRange,
253 batch_size: usize,
254 ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
255 match self {
256 Self::Deferred {
257 pending,
258 version,
259 primitive_query,
260 state_query,
261 ..
262 } => {
263 let merged: BTreeMap<EncodedKey, PendingWrite> = pending
264 .range((range.start.as_ref(), range.end.as_ref()))
265 .map(|(k, v)| (k.clone(), v.clone()))
266 .collect();
267 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
268
269 let query = match range.start.as_ref() {
270 Included(start) | Excluded(start) => {
271 if Self::is_flow_state_key(start) {
272 &*state_query
273 } else {
274 &*primitive_query
275 }
276 }
277 Unbounded => &*primitive_query,
278 };
279
280 let storage_iter = query.range_rev(range, batch_size);
281 let v = *version;
282 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
283 }
284 Self::Transactional {
285 pending,
286 base_pending,
287 version,
288 primitive_query,
289 state_query,
290 ..
291 } => {
292 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
293 .range((range.start.as_ref(), range.end.as_ref()))
294 .map(|(k, v)| (k.clone(), v.clone()))
295 .collect();
296 for (k, v) in pending.range((range.start.as_ref(), range.end.as_ref())) {
297 merged.insert(k.clone(), v.clone());
298 }
299 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
300
301 let query = match range.start.as_ref() {
302 Included(start) | Excluded(start) => {
303 if Self::is_flow_state_key(start) {
304 &*state_query
305 } else {
306 &*primitive_query
307 }
308 }
309 Unbounded => &*primitive_query,
310 };
311
312 let storage_iter = query.range_rev(range, batch_size);
313 let v = *version;
314 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
315 }
316 }
317 }
318}
319
320struct FlowMergePendingIterator<I>
322where
323 I: Iterator<Item = Result<MultiVersionValues>>,
324{
325 storage_iter: Peekable<I>,
326 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
327 version: CommitVersion,
328}
329
330impl<I> Iterator for FlowMergePendingIterator<I>
331where
332 I: Iterator<Item = Result<MultiVersionValues>>,
333{
334 type Item = Result<MultiVersionValues>;
335
336 fn next(&mut self) -> Option<Self::Item> {
337 loop {
338 let next_storage = self.storage_iter.peek();
339
340 match (self.pending_iter.peek(), next_storage) {
341 (Some((pending_key, _)), Some(storage_result)) => {
342 let storage_val = match storage_result {
343 Ok(v) => v,
344 Err(_) => {
345 let err = self.storage_iter.next().unwrap();
347 return Some(err.map_err(|e| e.into()));
348 }
349 };
350 let cmp = pending_key.cmp(&storage_val.key);
351
352 if matches!(cmp, Ordering::Less) {
353 let (key, value) = self.pending_iter.next().unwrap();
355 if let PendingWrite::Set(values) = value {
356 return Some(Ok(MultiVersionValues {
357 key,
358 values,
359 version: self.version,
360 }));
361 }
362 } else if matches!(cmp, Ordering::Equal) {
364 let (key, value) = self.pending_iter.next().unwrap();
366 self.storage_iter.next(); if let PendingWrite::Set(values) = value {
368 return Some(Ok(MultiVersionValues {
369 key,
370 values,
371 version: self.version,
372 }));
373 }
374 } else {
376 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
378 }
379 }
380 (Some(_), None) => {
381 let (key, value) = self.pending_iter.next().unwrap();
383 if let PendingWrite::Set(values) = value {
384 return Some(Ok(MultiVersionValues {
385 key,
386 values,
387 version: self.version,
388 }));
389 }
390 }
392 (None, Some(_)) => {
393 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
395 }
396 (None, None) => return None,
397 }
398 }
399 }
400}
401
402fn flow_merge_pending_iterator<I>(
404 pending: Vec<(EncodedKey, PendingWrite)>,
405 storage_iter: I,
406 version: CommitVersion,
407) -> FlowMergePendingIterator<I>
408where
409 I: Iterator<Item = Result<MultiVersionValues>>,
410{
411 FlowMergePendingIterator {
412 storage_iter: storage_iter.peekable(),
413 pending_iter: pending.into_iter().peekable(),
414 version,
415 }
416}
417
418struct FlowMergePendingIteratorRev<I>
420where
421 I: Iterator<Item = Result<MultiVersionValues>>,
422{
423 storage_iter: Peekable<I>,
424 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
425 version: CommitVersion,
426}
427
428impl<I> Iterator for FlowMergePendingIteratorRev<I>
429where
430 I: Iterator<Item = Result<MultiVersionValues>>,
431{
432 type Item = Result<MultiVersionValues>;
433
434 fn next(&mut self) -> Option<Self::Item> {
435 loop {
436 let next_storage = self.storage_iter.peek();
437
438 match (self.pending_iter.peek(), next_storage) {
439 (Some((pending_key, _)), Some(storage_result)) => {
440 let storage_val = match storage_result {
441 Ok(v) => v,
442 Err(_) => {
443 let err = self.storage_iter.next().unwrap();
445 return Some(err.map_err(|e| e.into()));
446 }
447 };
448 let cmp = pending_key.cmp(&storage_val.key);
449
450 if matches!(cmp, Ordering::Greater) {
451 let (key, value) = self.pending_iter.next().unwrap();
453 if let PendingWrite::Set(values) = value {
454 return Some(Ok(MultiVersionValues {
455 key,
456 values,
457 version: self.version,
458 }));
459 }
460 } else if matches!(cmp, Ordering::Equal) {
462 let (key, value) = self.pending_iter.next().unwrap();
464 self.storage_iter.next(); if let PendingWrite::Set(values) = value {
466 return Some(Ok(MultiVersionValues {
467 key,
468 values,
469 version: self.version,
470 }));
471 }
472 } else {
474 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
476 }
477 }
478 (Some(_), None) => {
479 let (key, value) = self.pending_iter.next().unwrap();
481 if let PendingWrite::Set(values) = value {
482 return Some(Ok(MultiVersionValues {
483 key,
484 values,
485 version: self.version,
486 }));
487 }
488 }
490 (None, Some(_)) => {
491 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
493 }
494 (None, None) => return None,
495 }
496 }
497 }
498}
499
500fn flow_merge_pending_iterator_rev<I>(
502 pending: Vec<(EncodedKey, PendingWrite)>,
503 storage_iter: I,
504 version: CommitVersion,
505) -> FlowMergePendingIteratorRev<I>
506where
507 I: Iterator<Item = Result<MultiVersionValues>>,
508{
509 FlowMergePendingIteratorRev {
510 storage_iter: storage_iter.peekable(),
511 pending_iter: pending.into_iter().peekable(),
512 version,
513 }
514}
515
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::encoded::{
        encoded::EncodedValues,
        key::{EncodedKey, EncodedKeyRange},
    };
    use reifydb_engine::test_utils::create_test_engine;
    use reifydb_transaction::interceptor::interceptors::Interceptors;
    use reifydb_type::util::cowvec::CowVec;

    use super::*;
    use crate::operator::stateful::test_utils::test::create_test_transaction;

    /// Builds an `EncodedKey` from the UTF-8 bytes of `s`.
    fn make_key(s: &str) -> EncodedKey {
        EncodedKey::new(Vec::from(s.as_bytes()))
    }

    /// Builds an `EncodedValues` from the UTF-8 bytes of `s`.
    fn make_value(s: &str) -> EncodedValues {
        EncodedValues(CowVec::new(Vec::from(s.as_bytes())))
    }

    /// Clones the keys out of scanned entries, preserving their order.
    fn keys_of(entries: &[MultiVersionValues]) -> Vec<EncodedKey> {
        entries.iter().map(|entry| entry.key.clone()).collect()
    }

    /// A value staged with `set` is returned by `get`.
    #[test]
    fn test_get_from_pending() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");
        let expected = make_value("value1");
        txn.set(&key, expected.clone()).unwrap();

        assert_eq!(txn.get(&key).unwrap(), Some(expected));
    }

    /// A value committed before the flow transaction was created is returned by `get`.
    #[test]
    fn test_get_from_committed() {
        let engine = create_test_engine();

        let key = make_key("key1");
        let expected = make_value("value1");

        {
            let mut writer = engine.begin_admin().unwrap();
            writer.set(&key, expected.clone()).unwrap();
            writer.commit().unwrap();
        }

        let parent = engine.begin_admin().unwrap();
        let version = parent.version();
        let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

        assert_eq!(txn.get(&key).unwrap(), Some(expected));
    }

    /// A value staged in the flow transaction takes precedence over the one set on the parent.
    #[test]
    fn test_get_pending_shadows_committed() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");
        parent.set(&key, make_value("old")).unwrap();
        let version = parent.version();

        let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

        let replacement = make_value("new");
        txn.set(&key, replacement.clone()).unwrap();

        assert_eq!(txn.get(&key).unwrap(), Some(replacement));
    }

    /// After `remove`, `get` reports the key as absent.
    #[test]
    fn test_get_removed_returns_none() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");
        parent.set(&key, make_value("value1")).unwrap();
        let version = parent.version();

        let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
        txn.remove(&key).unwrap();

        assert!(txn.get(&key).unwrap().is_none());
    }

    /// `get` on a key that was never written returns `None`.
    #[test]
    fn test_get_nonexistent_key() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        assert!(txn.get(&make_key("missing")).unwrap().is_none());
    }

    /// A key staged with `set` is reported by `contains_key`.
    #[test]
    fn test_contains_key_pending() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");
        txn.set(&key, make_value("value1")).unwrap();

        let present = txn.contains_key(&key).unwrap();
        assert!(present);
    }

    /// A key committed before the flow transaction was created is reported by `contains_key`.
    #[test]
    fn test_contains_key_committed() {
        let engine = create_test_engine();

        let key = make_key("key1");

        {
            let mut writer = engine.begin_admin().unwrap();
            writer.set(&key, make_value("value1")).unwrap();
            writer.commit().unwrap();
        }

        let parent = engine.begin_admin().unwrap();
        let version = parent.version();
        let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

        let present = txn.contains_key(&key).unwrap();
        assert!(present);
    }

    /// After `remove`, `contains_key` answers `false`.
    #[test]
    fn test_contains_key_removed_returns_false() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");
        parent.set(&key, make_value("value1")).unwrap();
        let version = parent.version();

        let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
        txn.remove(&key).unwrap();

        let present = txn.contains_key(&key).unwrap();
        assert!(!present);
    }

    /// `contains_key` on a key that was never written answers `false`.
    #[test]
    fn test_contains_key_nonexistent() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let present = txn.contains_key(&make_key("missing")).unwrap();
        assert!(!present);
    }

    /// A full-range scan with nothing written yields no entries.
    #[test]
    fn test_scan_empty() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        assert!(txn.range(EncodedKeyRange::all(), 1024).next().is_none());
    }

    /// Pending writes inserted out of order come back in key order.
    #[test]
    fn test_scan_only_pending() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        for (key, value) in [("b", "2"), ("a", "1"), ("c", "3")] {
            txn.set(&make_key(key), make_value(value)).unwrap();
        }

        let scanned: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(keys_of(&scanned), vec![make_key("a"), make_key("b"), make_key("c")]);
    }

    /// A pending removal produces no entry in the scan output.
    #[test]
    fn test_scan_filters_removes() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        txn.set(&make_key("a"), make_value("1")).unwrap();
        txn.remove(&make_key("b")).unwrap();
        txn.set(&make_key("c"), make_value("3")).unwrap();

        let scanned: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(keys_of(&scanned), vec![make_key("a"), make_key("c")]);
    }

    /// A bounded range with nothing written yields no entries.
    #[test]
    fn test_range_empty() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
        assert!(txn.range(bounds, 1024).next().is_none());
    }

    /// Only pending keys inside `[b, d)` are returned.
    #[test]
    fn test_range_only_pending() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        for (key, value) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
            txn.set(&make_key(key), make_value(value)).unwrap();
        }

        let bounds = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
        let scanned: Vec<_> = txn.range(bounds, 1024).collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(keys_of(&scanned), vec![make_key("b"), make_key("c")]);
    }

    /// A prefix scan with nothing written returns an empty batch.
    #[test]
    fn test_prefix_empty() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let batch = txn.prefix(&make_key("test_")).unwrap();
        assert!(batch.items.is_empty());
    }

    /// A prefix scan returns only the pending keys that start with the prefix.
    #[test]
    fn test_prefix_only_pending() {
        let parent = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        for (key, value) in [("test_a", "1"), ("test_b", "2"), ("other_c", "3")] {
            txn.set(&make_key(key), make_value(value)).unwrap();
        }

        let batch = txn.prefix(&make_key("test_")).unwrap();

        assert_eq!(keys_of(&batch.items), vec![make_key("test_a"), make_key("test_b")]);
    }
}