1use std::{
5 cmp::Ordering,
6 collections, iter,
7 ops::Bound::{Excluded, Included, Unbounded},
8 vec,
9};
10
11use collections::BTreeMap;
12use iter::Peekable;
13use reifydb_core::{
14 common::CommitVersion,
15 encoded::{
16 key::{EncodedKey, EncodedKeyRange},
17 row::EncodedRow,
18 },
19 interface::store::{MultiVersionBatch, MultiVersionRow},
20 key::{Key, kind::KeyKind},
21};
22use reifydb_type::Result;
23use vec::IntoIter;
24
25use super::{FlowTransaction, PendingWrite};
26
27impl FlowTransaction {
28 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>> {
31 let inner = self.inner();
33 if inner.pending.is_removed(key) {
34 return Ok(None);
35 }
36 if let Some(value) = inner.pending.get(key) {
37 return Ok(Some(value.clone()));
38 }
39
40 if let Self::Transactional {
42 base_pending,
43 ..
44 } = self
45 {
46 if base_pending.is_removed(key) {
47 return Ok(None);
48 }
49 if let Some(value) = base_pending.get(key) {
50 return Ok(Some(value.clone()));
51 }
52 }
53
54 let inner = self.inner_mut();
56 let query = if Self::is_flow_state_key(key) {
57 &inner.state_query
58 } else {
59 &inner.primitive_query
60 };
61 match query.get(key)? {
62 Some(multi) => Ok(Some(multi.row().clone())),
63 None => Ok(None),
64 }
65 }
66
67 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
69 let inner = self.inner();
70 if inner.pending.is_removed(key) {
71 return Ok(false);
72 }
73 if inner.pending.get(key).is_some() {
74 return Ok(true);
75 }
76
77 if let Self::Transactional {
78 base_pending,
79 ..
80 } = self
81 {
82 if base_pending.is_removed(key) {
83 return Ok(false);
84 }
85 if base_pending.get(key).is_some() {
86 return Ok(true);
87 }
88 }
89
90 let inner = self.inner_mut();
91 let query = if Self::is_flow_state_key(key) {
92 &inner.state_query
93 } else {
94 &inner.primitive_query
95 };
96 query.contains_key(key)
97 }
98
99 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
101 let range = EncodedKeyRange::prefix(prefix);
102 let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
103 Ok(MultiVersionBatch {
104 items,
105 has_more: false,
106 })
107 }
108
109 fn is_flow_state_key(key: &EncodedKey) -> bool {
110 match Key::kind(&key) {
111 None => false,
112 Some(kind) => match kind {
113 KeyKind::FlowNodeState => true,
114 KeyKind::FlowNodeInternalState => true,
115 _ => false,
116 },
117 }
118 }
119
120 pub fn range(
127 &mut self,
128 range: EncodedKeyRange,
129 batch_size: usize,
130 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
131 match self {
132 Self::Deferred {
133 inner,
134 ..
135 } => {
136 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
137 .pending
138 .range((range.start.as_ref(), range.end.as_ref()))
139 .map(|(k, v)| (k.clone(), v.clone()))
140 .collect();
141 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
142
143 let query = match range.start.as_ref() {
144 Included(start) | Excluded(start) => {
145 if Self::is_flow_state_key(start) {
146 &inner.state_query
147 } else {
148 &inner.primitive_query
149 }
150 }
151 Unbounded => &inner.primitive_query,
152 };
153
154 let storage_iter = query.range(range, batch_size);
155 let v = inner.version;
156 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
157 }
158 Self::Transactional {
159 inner,
160 base_pending,
161 ..
162 } => {
163 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
165 .range((range.start.as_ref(), range.end.as_ref()))
166 .map(|(k, v)| (k.clone(), v.clone()))
167 .collect();
168 for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
169 merged.insert(k.clone(), v.clone());
170 }
171 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
172
173 let query = match range.start.as_ref() {
174 Included(start) | Excluded(start) => {
175 if Self::is_flow_state_key(start) {
176 &inner.state_query
177 } else {
178 &inner.primitive_query
179 }
180 }
181 Unbounded => &inner.primitive_query,
182 };
183
184 let storage_iter = query.range(range, batch_size);
185 let v = inner.version;
186 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
187 }
188 }
189 }
190
191 pub fn range_rev(
197 &mut self,
198 range: EncodedKeyRange,
199 batch_size: usize,
200 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
201 match self {
202 Self::Deferred {
203 inner,
204 ..
205 } => {
206 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
207 .pending
208 .range((range.start.as_ref(), range.end.as_ref()))
209 .map(|(k, v)| (k.clone(), v.clone()))
210 .collect();
211 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
212
213 let query = match range.start.as_ref() {
214 Included(start) | Excluded(start) => {
215 if Self::is_flow_state_key(start) {
216 &inner.state_query
217 } else {
218 &inner.primitive_query
219 }
220 }
221 Unbounded => &inner.primitive_query,
222 };
223
224 let storage_iter = query.range_rev(range, batch_size);
225 let v = inner.version;
226 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
227 }
228 Self::Transactional {
229 inner,
230 base_pending,
231 ..
232 } => {
233 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
234 .range((range.start.as_ref(), range.end.as_ref()))
235 .map(|(k, v)| (k.clone(), v.clone()))
236 .collect();
237 for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
238 merged.insert(k.clone(), v.clone());
239 }
240 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
241
242 let query = match range.start.as_ref() {
243 Included(start) | Excluded(start) => {
244 if Self::is_flow_state_key(start) {
245 &inner.state_query
246 } else {
247 &inner.primitive_query
248 }
249 }
250 Unbounded => &inner.primitive_query,
251 };
252
253 let storage_iter = query.range_rev(range, batch_size);
254 let v = inner.version;
255 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
256 }
257 }
258 }
259}
260
261struct FlowMergePendingIterator<I>
263where
264 I: Iterator<Item = Result<MultiVersionRow>>,
265{
266 storage_iter: Peekable<I>,
267 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
268 version: CommitVersion,
269}
270
271impl<I> Iterator for FlowMergePendingIterator<I>
272where
273 I: Iterator<Item = Result<MultiVersionRow>>,
274{
275 type Item = Result<MultiVersionRow>;
276
277 fn next(&mut self) -> Option<Self::Item> {
278 loop {
279 let next_storage = self.storage_iter.peek();
280
281 match (self.pending_iter.peek(), next_storage) {
282 (Some((pending_key, _)), Some(storage_result)) => {
283 let storage_val = match storage_result {
284 Ok(v) => v,
285 Err(_) => {
286 let err = self.storage_iter.next().unwrap();
288 return Some(err.map_err(|e| e.into()));
289 }
290 };
291 let cmp = pending_key.cmp(&storage_val.key);
292
293 if matches!(cmp, Ordering::Less) {
294 let (key, value) = self.pending_iter.next().unwrap();
296 if let PendingWrite::Set(row) = value {
297 return Some(Ok(MultiVersionRow {
298 key,
299 row,
300 version: self.version,
301 }));
302 }
303 } else if matches!(cmp, Ordering::Equal) {
305 let (key, value) = self.pending_iter.next().unwrap();
307 self.storage_iter.next(); if let PendingWrite::Set(row) = value {
309 return Some(Ok(MultiVersionRow {
310 key,
311 row,
312 version: self.version,
313 }));
314 }
315 } else {
317 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
319 }
320 }
321 (Some(_), None) => {
322 let (key, value) = self.pending_iter.next().unwrap();
324 if let PendingWrite::Set(row) = value {
325 return Some(Ok(MultiVersionRow {
326 key,
327 row,
328 version: self.version,
329 }));
330 }
331 }
333 (None, Some(_)) => {
334 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
336 }
337 (None, None) => return None,
338 }
339 }
340 }
341}
342
343fn flow_merge_pending_iterator<I>(
345 pending: Vec<(EncodedKey, PendingWrite)>,
346 storage_iter: I,
347 version: CommitVersion,
348) -> FlowMergePendingIterator<I>
349where
350 I: Iterator<Item = Result<MultiVersionRow>>,
351{
352 FlowMergePendingIterator {
353 storage_iter: storage_iter.peekable(),
354 pending_iter: pending.into_iter().peekable(),
355 version,
356 }
357}
358
359struct FlowMergePendingIteratorRev<I>
361where
362 I: Iterator<Item = Result<MultiVersionRow>>,
363{
364 storage_iter: Peekable<I>,
365 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
366 version: CommitVersion,
367}
368
369impl<I> Iterator for FlowMergePendingIteratorRev<I>
370where
371 I: Iterator<Item = Result<MultiVersionRow>>,
372{
373 type Item = Result<MultiVersionRow>;
374
375 fn next(&mut self) -> Option<Self::Item> {
376 loop {
377 let next_storage = self.storage_iter.peek();
378
379 match (self.pending_iter.peek(), next_storage) {
380 (Some((pending_key, _)), Some(storage_result)) => {
381 let storage_val = match storage_result {
382 Ok(v) => v,
383 Err(_) => {
384 let err = self.storage_iter.next().unwrap();
386 return Some(err.map_err(|e| e.into()));
387 }
388 };
389 let cmp = pending_key.cmp(&storage_val.key);
390
391 if matches!(cmp, Ordering::Greater) {
392 let (key, value) = self.pending_iter.next().unwrap();
394 if let PendingWrite::Set(row) = value {
395 return Some(Ok(MultiVersionRow {
396 key,
397 row,
398 version: self.version,
399 }));
400 }
401 } else if matches!(cmp, Ordering::Equal) {
403 let (key, value) = self.pending_iter.next().unwrap();
405 self.storage_iter.next(); if let PendingWrite::Set(row) = value {
407 return Some(Ok(MultiVersionRow {
408 key,
409 row,
410 version: self.version,
411 }));
412 }
413 } else {
415 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
417 }
418 }
419 (Some(_), None) => {
420 let (key, value) = self.pending_iter.next().unwrap();
422 if let PendingWrite::Set(row) = value {
423 return Some(Ok(MultiVersionRow {
424 key,
425 row,
426 version: self.version,
427 }));
428 }
429 }
431 (None, Some(_)) => {
432 return Some(self.storage_iter.next().unwrap().map_err(|e| e.into()));
434 }
435 (None, None) => return None,
436 }
437 }
438 }
439}
440
441fn flow_merge_pending_iterator_rev<I>(
443 pending: Vec<(EncodedKey, PendingWrite)>,
444 storage_iter: I,
445 version: CommitVersion,
446) -> FlowMergePendingIteratorRev<I>
447where
448 I: Iterator<Item = Result<MultiVersionRow>>,
449{
450 FlowMergePendingIteratorRev {
451 storage_iter: storage_iter.peekable(),
452 pending_iter: pending.into_iter().peekable(),
453 version,
454 }
455}
456
457#[cfg(test)]
458pub mod tests {
459 use reifydb_catalog::catalog::Catalog;
460 use reifydb_core::encoded::{
461 key::{EncodedKey, EncodedKeyRange},
462 row::EncodedRow,
463 };
464 use reifydb_engine::test_harness::TestEngine;
465 use reifydb_transaction::interceptor::interceptors::Interceptors;
466 use reifydb_type::{util::cowvec::CowVec, value::identity::IdentityId};
467
468 use super::*;
469 use crate::operator::stateful::test_utils::test::create_test_transaction;
470
471 fn make_key(s: &str) -> EncodedKey {
472 EncodedKey::new(s.as_bytes().to_vec())
473 }
474
475 fn make_value(s: &str) -> EncodedRow {
476 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
477 }
478
479 #[test]
480 fn test_get_from_pending() {
481 let parent = create_test_transaction();
482 let mut txn =
483 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
484
485 let key = make_key("key1");
486 let value = make_value("value1");
487
488 txn.set(&key, value.clone()).unwrap();
489
490 let result = txn.get(&key).unwrap();
492 assert_eq!(result, Some(value));
493 }
494
495 #[test]
496 fn test_get_from_committed() {
497 let t = TestEngine::new();
498
499 let key = make_key("key1");
500 let value = make_value("value1");
501
502 {
504 let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
505 cmd_txn.set(&key, value.clone()).unwrap();
506 cmd_txn.commit().unwrap();
507 }
508
509 let parent = t.begin_admin(IdentityId::system()).unwrap();
511 let version = parent.version();
512
513 let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
515
516 let result = txn.get(&key).unwrap();
518 assert_eq!(result, Some(value));
519 }
520
521 #[test]
522 fn test_get_pending_shadows_committed() {
523 let mut parent = create_test_transaction();
524
525 let key = make_key("key1");
526 parent.set(&key, make_value("old")).unwrap();
527 let version = parent.version();
528
529 let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
530
531 let new_value = make_value("new");
533 txn.set(&key, new_value.clone()).unwrap();
534
535 let result = txn.get(&key).unwrap();
537 assert_eq!(result, Some(new_value));
538 }
539
540 #[test]
541 fn test_get_removed_returns_none() {
542 let mut parent = create_test_transaction();
543
544 let key = make_key("key1");
545 parent.set(&key, make_value("value1")).unwrap();
546 let version = parent.version();
547
548 let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
549
550 txn.remove(&key).unwrap();
552
553 let result = txn.get(&key).unwrap();
555 assert_eq!(result, None);
556 }
557
558 #[test]
559 fn test_get_nonexistent_key() {
560 let parent = create_test_transaction();
561 let mut txn =
562 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
563
564 let result = txn.get(&make_key("missing")).unwrap();
565 assert_eq!(result, None);
566 }
567
568 #[test]
569 fn test_contains_key_pending() {
570 let parent = create_test_transaction();
571 let mut txn =
572 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
573
574 let key = make_key("key1");
575 txn.set(&key, make_value("value1")).unwrap();
576
577 assert!(txn.contains_key(&key).unwrap());
578 }
579
580 #[test]
581 fn test_contains_key_committed() {
582 let t = TestEngine::new();
583
584 let key = make_key("key1");
585
586 {
588 let mut cmd_txn = t.begin_admin(IdentityId::system()).unwrap();
589 cmd_txn.set(&key, make_value("value1")).unwrap();
590 cmd_txn.commit().unwrap();
591 }
592
593 let parent = t.begin_admin(IdentityId::system()).unwrap();
595 let version = parent.version();
596 let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
597
598 assert!(txn.contains_key(&key).unwrap());
599 }
600
601 #[test]
602 fn test_contains_key_removed_returns_false() {
603 let mut parent = create_test_transaction();
604
605 let key = make_key("key1");
606 parent.set(&key, make_value("value1")).unwrap();
607 let version = parent.version();
608
609 let mut txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
610 txn.remove(&key).unwrap();
611
612 assert!(!txn.contains_key(&key).unwrap());
613 }
614
615 #[test]
616 fn test_contains_key_nonexistent() {
617 let parent = create_test_transaction();
618 let mut txn =
619 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
620
621 assert!(!txn.contains_key(&make_key("missing")).unwrap());
622 }
623
624 #[test]
625 fn test_scan_empty() {
626 let parent = create_test_transaction();
627 let mut txn =
628 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
629
630 let mut iter = txn.range(EncodedKeyRange::all(), 1024);
631 assert!(iter.next().is_none());
632 }
633
634 #[test]
635 fn test_scan_only_pending() {
636 let parent = create_test_transaction();
637 let mut txn =
638 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
639
640 txn.set(&make_key("b"), make_value("2")).unwrap();
641 txn.set(&make_key("a"), make_value("1")).unwrap();
642 txn.set(&make_key("c"), make_value("3")).unwrap();
643
644 let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
645
646 assert_eq!(items.len(), 3);
648 assert_eq!(items[0].key, make_key("a"));
649 assert_eq!(items[1].key, make_key("b"));
650 assert_eq!(items[2].key, make_key("c"));
651 }
652
653 #[test]
654 fn test_scan_filters_removes() {
655 let parent = create_test_transaction();
656 let mut txn =
657 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
658
659 txn.set(&make_key("a"), make_value("1")).unwrap();
660 txn.remove(&make_key("b")).unwrap();
661 txn.set(&make_key("c"), make_value("3")).unwrap();
662
663 let items: Vec<_> = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
664
665 assert_eq!(items.len(), 2);
667 assert_eq!(items[0].key, make_key("a"));
668 assert_eq!(items[1].key, make_key("c"));
669 }
670
671 #[test]
672 fn test_range_empty() {
673 let parent = create_test_transaction();
674 let mut txn =
675 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
676
677 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
678 let mut iter = txn.range(range, 1024);
679 assert!(iter.next().is_none());
680 }
681
682 #[test]
683 fn test_range_only_pending() {
684 let parent = create_test_transaction();
685 let mut txn =
686 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
687
688 txn.set(&make_key("a"), make_value("1")).unwrap();
689 txn.set(&make_key("b"), make_value("2")).unwrap();
690 txn.set(&make_key("c"), make_value("3")).unwrap();
691 txn.set(&make_key("d"), make_value("4")).unwrap();
692
693 let range = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
694 let items: Vec<_> = txn.range(range, 1024).collect::<Result<Vec<_>>>().unwrap();
695
696 assert_eq!(items.len(), 2);
698 assert_eq!(items[0].key, make_key("b"));
699 assert_eq!(items[1].key, make_key("c"));
700 }
701
702 #[test]
703 fn test_prefix_empty() {
704 let parent = create_test_transaction();
705 let mut txn =
706 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
707
708 let prefix = make_key("test_");
709 let iter = txn.prefix(&prefix).unwrap();
710 assert!(iter.items.into_iter().next().is_none());
711 }
712
713 #[test]
714 fn test_prefix_only_pending() {
715 let parent = create_test_transaction();
716 let mut txn =
717 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
718
719 txn.set(&make_key("test_a"), make_value("1")).unwrap();
720 txn.set(&make_key("test_b"), make_value("2")).unwrap();
721 txn.set(&make_key("other_c"), make_value("3")).unwrap();
722
723 let prefix = make_key("test_");
724 let iter = txn.prefix(&prefix).unwrap();
725 let items: Vec<_> = iter.items.into_iter().collect();
726
727 assert_eq!(items.len(), 2);
729 assert_eq!(items[0].key, make_key("test_a"));
730 assert_eq!(items[1].key, make_key("test_b"));
731 }
732}