1use std::{
5 cmp::Ordering,
6 collections, iter,
7 ops::{
8 Bound::{Excluded, Included, Unbounded},
9 RangeBounds,
10 },
11 vec,
12};
13
14use collections::BTreeMap;
15use iter::Peekable;
16use reifydb_core::{
17 actors::pending::PendingWrite,
18 common::CommitVersion,
19 encoded::{
20 key::{EncodedKey, EncodedKeyRange},
21 row::EncodedRow,
22 },
23 interface::store::{MultiVersionBatch, MultiVersionRow},
24 key::{Key, kind::KeyKind},
25};
26use reifydb_type::Result;
27use vec::IntoIter;
28
29use super::FlowTransaction;
30
/// Selects which backing store a key is read from.
///
/// `FlowTransaction::read_from` maps a key to one of these variants and the
/// read paths (`get`, `contains_key`, `range`, `range_rev`) dispatch on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReadFrom {
	/// Flow operator state: read through `state_query` for deferred and
	/// transactional transactions, or through the `state` collection for
	/// ephemeral ones.
	StateQuery,
	/// Everything else: read through the primary `query`.
	Query,
}
42
43impl FlowTransaction {
44 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>> {
47 let inner = self.inner();
49 if inner.pending.is_removed(key) {
50 return Ok(None);
51 }
52 if let Some(value) = inner.pending.get(key) {
53 return Ok(Some(value.clone()));
54 }
55
56 if let Self::Transactional {
58 base_pending,
59 ..
60 } = self
61 {
62 if base_pending.is_removed(key) {
63 return Ok(None);
64 }
65 if let Some(value) = base_pending.get(key) {
66 return Ok(Some(value.clone()));
67 }
68 }
69
70 if let Self::Ephemeral {
72 inner,
73 state,
74 } = self
75 {
76 return match Self::read_from(key) {
77 ReadFrom::StateQuery => Ok(state.get(key).cloned()),
78 ReadFrom::Query => match inner.query.get(key)? {
79 Some(multi) => Ok(Some(multi.row().clone())),
80 None => Ok(None),
81 },
82 };
83 }
84
85 let inner = self.inner_mut();
87 let query = match Self::read_from(key) {
88 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
89 ReadFrom::Query => &inner.query,
90 };
91 match query.get(key)? {
92 Some(multi) => Ok(Some(multi.row().clone())),
93 None => Ok(None),
94 }
95 }
96
97 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
99 let inner = self.inner();
100 if inner.pending.is_removed(key) {
101 return Ok(false);
102 }
103 if inner.pending.get(key).is_some() {
104 return Ok(true);
105 }
106
107 if let Self::Transactional {
108 base_pending,
109 ..
110 } = self
111 {
112 if base_pending.is_removed(key) {
113 return Ok(false);
114 }
115 if base_pending.get(key).is_some() {
116 return Ok(true);
117 }
118 }
119
120 if let Self::Ephemeral {
122 inner,
123 state,
124 } = self
125 {
126 return match Self::read_from(key) {
127 ReadFrom::StateQuery => Ok(state.contains_key(key)),
128 ReadFrom::Query => inner.query.contains_key(key),
129 };
130 }
131
132 let inner = self.inner_mut();
133 let query = match Self::read_from(key) {
134 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
135 ReadFrom::Query => &inner.query,
136 };
137 query.contains_key(key)
138 }
139
140 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
142 let range = EncodedKeyRange::prefix(prefix);
143 let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
144 Ok(MultiVersionBatch {
145 items,
146 has_more: false,
147 })
148 }
149
150 pub(crate) fn read_from(key: &EncodedKey) -> ReadFrom {
151 match Key::kind(key) {
152 None => ReadFrom::Query,
153 Some(kind) => match kind {
154 KeyKind::FlowNodeState => ReadFrom::StateQuery,
157 KeyKind::FlowNodeInternalState => ReadFrom::StateQuery,
158 KeyKind::RingBufferMetadata => ReadFrom::StateQuery,
159 KeyKind::SeriesMetadata => ReadFrom::StateQuery,
160
161 KeyKind::Row => ReadFrom::Query,
169
170 KeyKind::Namespace => ReadFrom::Query,
172 KeyKind::Table => ReadFrom::Query,
173 KeyKind::NamespaceTable => ReadFrom::Query,
174 KeyKind::SystemSequence => ReadFrom::Query,
175 KeyKind::Columns => ReadFrom::Query,
176 KeyKind::Column => ReadFrom::Query,
177 KeyKind::RowSequence => ReadFrom::Query,
178 KeyKind::ColumnProperty => ReadFrom::Query,
179 KeyKind::SystemVersion => ReadFrom::Query,
180 KeyKind::TransactionVersion => ReadFrom::Query,
181 KeyKind::Index => ReadFrom::Query,
182 KeyKind::IndexEntry => ReadFrom::Query,
183 KeyKind::ColumnSequence => ReadFrom::Query,
184 KeyKind::CdcConsumer => ReadFrom::Query,
185 KeyKind::View => ReadFrom::Query,
186 KeyKind::NamespaceView => ReadFrom::Query,
187 KeyKind::PrimaryKey => ReadFrom::Query,
188 KeyKind::RingBuffer => ReadFrom::Query,
189 KeyKind::NamespaceRingBuffer => ReadFrom::Query,
190 KeyKind::ShapeRetentionStrategy => ReadFrom::Query,
191 KeyKind::OperatorRetentionStrategy => ReadFrom::Query,
192 KeyKind::Flow => ReadFrom::Query,
193 KeyKind::NamespaceFlow => ReadFrom::Query,
194 KeyKind::FlowNode => ReadFrom::Query,
195 KeyKind::FlowNodeByFlow => ReadFrom::Query,
196 KeyKind::FlowEdge => ReadFrom::Query,
197 KeyKind::FlowEdgeByFlow => ReadFrom::Query,
198 KeyKind::Dictionary => ReadFrom::Query,
199 KeyKind::DictionaryEntry => ReadFrom::Query,
200 KeyKind::DictionaryEntryIndex => ReadFrom::Query,
201 KeyKind::NamespaceDictionary => ReadFrom::Query,
202 KeyKind::DictionarySequence => ReadFrom::Query,
203 KeyKind::Metric => ReadFrom::Query,
204 KeyKind::FlowVersion => ReadFrom::Query,
205 KeyKind::Subscription => ReadFrom::Query,
206 KeyKind::SubscriptionRow => ReadFrom::Query,
207 KeyKind::SubscriptionColumn => ReadFrom::Query,
208 KeyKind::Shape => ReadFrom::Query,
209 KeyKind::RowShapeField => ReadFrom::Query,
210 KeyKind::SumType => ReadFrom::Query,
211 KeyKind::NamespaceSumType => ReadFrom::Query,
212 KeyKind::Handler => ReadFrom::Query,
213 KeyKind::NamespaceHandler => ReadFrom::Query,
214 KeyKind::VariantHandler => ReadFrom::Query,
215 KeyKind::Series => ReadFrom::Query,
216 KeyKind::NamespaceSeries => ReadFrom::Query,
217 KeyKind::Identity => ReadFrom::Query,
218 KeyKind::Role => ReadFrom::Query,
219 KeyKind::GrantedRole => ReadFrom::Query,
220 KeyKind::Policy => ReadFrom::Query,
221 KeyKind::PolicyOp => ReadFrom::Query,
222 KeyKind::Migration => ReadFrom::Query,
223 KeyKind::MigrationEvent => ReadFrom::Query,
224 KeyKind::Authentication => ReadFrom::Query,
225 KeyKind::ConfigStorage => ReadFrom::Query,
226 KeyKind::Token => ReadFrom::Query,
227 KeyKind::Source => ReadFrom::Query,
228 KeyKind::NamespaceSource => ReadFrom::Query,
229 KeyKind::Sink => ReadFrom::Query,
230 KeyKind::NamespaceSink => ReadFrom::Query,
231 KeyKind::SourceCheckpoint => ReadFrom::Query,
232 KeyKind::RowTtl => ReadFrom::Query,
233 KeyKind::Procedure => ReadFrom::Query,
234 KeyKind::NamespaceProcedure => ReadFrom::Query,
235 KeyKind::ProcedureParam => ReadFrom::Query,
236 KeyKind::Binding => ReadFrom::Query,
237 KeyKind::NamespaceBinding => ReadFrom::Query,
238 },
239 }
240 }
241
242 pub fn range(
249 &mut self,
250 range: EncodedKeyRange,
251 batch_size: usize,
252 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
253 match self {
254 Self::Deferred {
255 inner,
256 ..
257 } => {
258 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
259 .pending
260 .range((range.start.as_ref(), range.end.as_ref()))
261 .map(|(k, v)| (k.clone(), v.clone()))
262 .collect();
263 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
264
265 let query = match range.start.as_ref() {
266 Included(start) | Excluded(start) => match Self::read_from(start) {
267 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
268 ReadFrom::Query => &inner.query,
269 },
270 Unbounded => &inner.query,
271 };
272
273 let storage_iter = query.range(range, batch_size);
274 let v = inner.version;
275 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
276 }
277 Self::Transactional {
278 inner,
279 base_pending,
280 ..
281 } => {
282 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
284 .range((range.start.as_ref(), range.end.as_ref()))
285 .map(|(k, v)| (k.clone(), v.clone()))
286 .collect();
287 for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
288 merged.insert(k.clone(), v.clone());
289 }
290 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
291
292 let query = match range.start.as_ref() {
293 Included(start) | Excluded(start) => match Self::read_from(start) {
294 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
295 ReadFrom::Query => &inner.query,
296 },
297 Unbounded => &inner.query,
298 };
299
300 let storage_iter = query.range(range, batch_size);
301 let v = inner.version;
302 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
303 }
304 Self::Ephemeral {
305 inner,
306 state,
307 } => {
308 let is_state_range = match range.start.as_ref() {
310 Included(start) | Excluded(start) => {
311 matches!(Self::read_from(start), ReadFrom::StateQuery)
312 }
313 Unbounded => false,
314 };
315
316 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
317 .pending
318 .range((range.start.as_ref(), range.end.as_ref()))
319 .map(|(k, v)| (k.clone(), v.clone()))
320 .collect();
321 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
322
323 if is_state_range {
324 let state_items: Vec<Result<MultiVersionRow>> = state
326 .iter()
327 .filter(|(k, _)| range.contains(k))
328 .map(|(k, v)| {
329 Ok(MultiVersionRow {
330 key: k.clone(),
331 row: v.clone(),
332 version: inner.version,
333 })
334 })
335 .collect();
336 let v = inner.version;
337 let mut sorted_items = state_items;
339 sorted_items.sort_by(|a, b| match (a, b) {
340 (Ok(a), Ok(b)) => a.key.cmp(&b.key),
341 _ => Ordering::Equal,
342 });
343 Box::new(flow_merge_pending_iterator(pending_vec, sorted_items.into_iter(), v))
344 } else {
345 let storage_iter = inner.query.range(range, batch_size);
347 let v = inner.version;
348 Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
349 }
350 }
351 }
352 }
353
354 pub fn range_rev(
360 &mut self,
361 range: EncodedKeyRange,
362 batch_size: usize,
363 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
364 match self {
365 Self::Deferred {
366 inner,
367 ..
368 } => {
369 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
370 .pending
371 .range((range.start.as_ref(), range.end.as_ref()))
372 .map(|(k, v)| (k.clone(), v.clone()))
373 .collect();
374 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
375
376 let query = match range.start.as_ref() {
377 Included(start) | Excluded(start) => match Self::read_from(start) {
378 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
379 ReadFrom::Query => &inner.query,
380 },
381 Unbounded => &inner.query,
382 };
383
384 let storage_iter = query.range_rev(range, batch_size);
385 let v = inner.version;
386 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
387 }
388 Self::Transactional {
389 inner,
390 base_pending,
391 ..
392 } => {
393 let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
394 .range((range.start.as_ref(), range.end.as_ref()))
395 .map(|(k, v)| (k.clone(), v.clone()))
396 .collect();
397 for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
398 merged.insert(k.clone(), v.clone());
399 }
400 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
401
402 let query = match range.start.as_ref() {
403 Included(start) | Excluded(start) => match Self::read_from(start) {
404 ReadFrom::StateQuery => inner.state_query.as_ref().unwrap(),
405 ReadFrom::Query => &inner.query,
406 },
407 Unbounded => &inner.query,
408 };
409
410 let storage_iter = query.range_rev(range, batch_size);
411 let v = inner.version;
412 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
413 }
414 Self::Ephemeral {
415 inner,
416 state,
417 } => {
418 let is_state_range = match range.start.as_ref() {
419 Included(start) | Excluded(start) => {
420 matches!(Self::read_from(start), ReadFrom::StateQuery)
421 }
422 Unbounded => false,
423 };
424
425 let merged: BTreeMap<EncodedKey, PendingWrite> = inner
426 .pending
427 .range((range.start.as_ref(), range.end.as_ref()))
428 .map(|(k, v)| (k.clone(), v.clone()))
429 .collect();
430 let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
431
432 if is_state_range {
433 let mut state_items: Vec<Result<MultiVersionRow>> = state
435 .iter()
436 .filter(|(k, _)| range.contains(k))
437 .map(|(k, v)| {
438 Ok(MultiVersionRow {
439 key: k.clone(),
440 row: v.clone(),
441 version: inner.version,
442 })
443 })
444 .collect();
445 let v = inner.version;
446 state_items.sort_by(|a, b| match (a, b) {
448 (Ok(a), Ok(b)) => b.key.cmp(&a.key),
449 _ => Ordering::Equal,
450 });
451 Box::new(flow_merge_pending_iterator_rev(
452 pending_vec,
453 state_items.into_iter(),
454 v,
455 ))
456 } else {
457 let storage_iter = inner.query.range_rev(range, batch_size);
459 let v = inner.version;
460 Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
461 }
462 }
463 }
464 }
465}
466
467struct FlowMergePendingIterator<I>
469where
470 I: Iterator<Item = Result<MultiVersionRow>>,
471{
472 storage_iter: Peekable<I>,
473 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
474 version: CommitVersion,
475}
476
477impl<I> Iterator for FlowMergePendingIterator<I>
478where
479 I: Iterator<Item = Result<MultiVersionRow>>,
480{
481 type Item = Result<MultiVersionRow>;
482
483 fn next(&mut self) -> Option<Self::Item> {
484 loop {
485 let next_storage = self.storage_iter.peek();
486
487 match (self.pending_iter.peek(), next_storage) {
488 (Some((pending_key, _)), Some(storage_result)) => {
489 let storage_val = match storage_result {
490 Ok(v) => v,
491 Err(_) => {
492 let err = self.storage_iter.next().unwrap();
494 return Some(err);
495 }
496 };
497 let cmp = pending_key.cmp(&storage_val.key);
498
499 if matches!(cmp, Ordering::Less) {
500 let (key, value) = self.pending_iter.next().unwrap();
502 if let PendingWrite::Set(row) = value {
503 return Some(Ok(MultiVersionRow {
504 key,
505 row,
506 version: self.version,
507 }));
508 }
509 } else if matches!(cmp, Ordering::Equal) {
511 let (key, value) = self.pending_iter.next().unwrap();
513 self.storage_iter.next(); if let PendingWrite::Set(row) = value {
515 return Some(Ok(MultiVersionRow {
516 key,
517 row,
518 version: self.version,
519 }));
520 }
521 } else {
523 return Some(self.storage_iter.next().unwrap());
525 }
526 }
527 (Some(_), None) => {
528 let (key, value) = self.pending_iter.next().unwrap();
530 if let PendingWrite::Set(row) = value {
531 return Some(Ok(MultiVersionRow {
532 key,
533 row,
534 version: self.version,
535 }));
536 }
537 }
539 (None, Some(_)) => {
540 return Some(self.storage_iter.next().unwrap());
542 }
543 (None, None) => return None,
544 }
545 }
546 }
547}
548
549fn flow_merge_pending_iterator<I>(
551 pending: Vec<(EncodedKey, PendingWrite)>,
552 storage_iter: I,
553 version: CommitVersion,
554) -> FlowMergePendingIterator<I>
555where
556 I: Iterator<Item = Result<MultiVersionRow>>,
557{
558 FlowMergePendingIterator {
559 storage_iter: storage_iter.peekable(),
560 pending_iter: pending.into_iter().peekable(),
561 version,
562 }
563}
564
565struct FlowMergePendingIteratorRev<I>
567where
568 I: Iterator<Item = Result<MultiVersionRow>>,
569{
570 storage_iter: Peekable<I>,
571 pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
572 version: CommitVersion,
573}
574
575impl<I> Iterator for FlowMergePendingIteratorRev<I>
576where
577 I: Iterator<Item = Result<MultiVersionRow>>,
578{
579 type Item = Result<MultiVersionRow>;
580
581 fn next(&mut self) -> Option<Self::Item> {
582 loop {
583 let next_storage = self.storage_iter.peek();
584
585 match (self.pending_iter.peek(), next_storage) {
586 (Some((pending_key, _)), Some(storage_result)) => {
587 let storage_val = match storage_result {
588 Ok(v) => v,
589 Err(_) => {
590 let err = self.storage_iter.next().unwrap();
592 return Some(err);
593 }
594 };
595 let cmp = pending_key.cmp(&storage_val.key);
596
597 if matches!(cmp, Ordering::Greater) {
598 let (key, value) = self.pending_iter.next().unwrap();
600 if let PendingWrite::Set(row) = value {
601 return Some(Ok(MultiVersionRow {
602 key,
603 row,
604 version: self.version,
605 }));
606 }
607 } else if matches!(cmp, Ordering::Equal) {
609 let (key, value) = self.pending_iter.next().unwrap();
611 self.storage_iter.next(); if let PendingWrite::Set(row) = value {
613 return Some(Ok(MultiVersionRow {
614 key,
615 row,
616 version: self.version,
617 }));
618 }
619 } else {
621 return Some(self.storage_iter.next().unwrap());
623 }
624 }
625 (Some(_), None) => {
626 let (key, value) = self.pending_iter.next().unwrap();
628 if let PendingWrite::Set(row) = value {
629 return Some(Ok(MultiVersionRow {
630 key,
631 row,
632 version: self.version,
633 }));
634 }
635 }
637 (None, Some(_)) => {
638 return Some(self.storage_iter.next().unwrap());
640 }
641 (None, None) => return None,
642 }
643 }
644 }
645}
646
647fn flow_merge_pending_iterator_rev<I>(
649 pending: Vec<(EncodedKey, PendingWrite)>,
650 storage_iter: I,
651 version: CommitVersion,
652) -> FlowMergePendingIteratorRev<I>
653where
654 I: Iterator<Item = Result<MultiVersionRow>>,
655{
656 FlowMergePendingIteratorRev {
657 storage_iter: storage_iter.peekable(),
658 pending_iter: pending.into_iter().peekable(),
659 version,
660 }
661}
662
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::encoded::{
		key::{EncodedKey, EncodedKeyRange},
		row::EncodedRow,
	};
	use reifydb_engine::test_harness::TestEngine;
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::{util::cowvec::CowVec, value::identity::IdentityId};

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Encodes `s` as a raw key.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_owned();
		EncodedKey::new(bytes)
	}

	/// Encodes `s` as a raw row payload.
	fn make_value(s: &str) -> EncodedRow {
		let bytes = s.as_bytes().to_owned();
		EncodedRow(CowVec::new(bytes))
	}

	/// Creates a deferred `FlowTransaction` over `$parent` at `$version`, using
	/// `Catalog::testing()`, `Interceptors::new()` and a mock clock built from
	/// 1000 ms. A macro is used so the helper does not have to name the parent
	/// transaction type.
	macro_rules! deferred_txn {
		($parent:expr, $version:expr) => {
			FlowTransaction::deferred(
				&$parent,
				$version,
				Catalog::testing(),
				Interceptors::new(),
				Clock::Mock(MockClock::from_millis(1000)),
			)
		};
	}

	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		let (key, row) = (make_key("key1"), make_value("value1"));
		txn.set(&key, row.clone()).unwrap();

		assert_eq!(txn.get(&key).unwrap(), Some(row));
	}

	#[test]
	fn test_get_from_committed() {
		let engine = TestEngine::new();
		let (key, row) = (make_key("key1"), make_value("value1"));

		// Commit the row through a separate admin transaction first.
		{
			let mut writer = engine.begin_admin(IdentityId::system()).unwrap();
			writer.set(&key, row.clone()).unwrap();
			writer.commit().unwrap();
		}

		let parent = engine.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();
		let mut txn = deferred_txn!(parent, version);

		assert_eq!(txn.get(&key).unwrap(), Some(row));
	}

	#[test]
	fn test_get_pending_shadows_committed() {
		let key = make_key("key1");

		let mut parent = create_test_transaction();
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();

		let mut txn = deferred_txn!(parent, version);
		let replacement = make_value("new");
		txn.set(&key, replacement.clone()).unwrap();

		assert_eq!(txn.get(&key).unwrap(), Some(replacement));
	}

	#[test]
	fn test_get_removed_returns_none() {
		let key = make_key("key1");

		let mut parent = create_test_transaction();
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = deferred_txn!(parent, version);
		txn.remove(&key).unwrap();

		assert_eq!(txn.get(&key).unwrap(), None);
	}

	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		assert_eq!(txn.get(&make_key("missing")).unwrap(), None);
	}

	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_committed() {
		let engine = TestEngine::new();
		let key = make_key("key1");

		// Commit the row through a separate admin transaction first.
		{
			let mut writer = engine.begin_admin(IdentityId::system()).unwrap();
			writer.set(&key, make_value("value1")).unwrap();
			writer.commit().unwrap();
		}

		let parent = engine.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();
		let mut txn = deferred_txn!(parent, version);

		assert!(txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_removed_returns_false() {
		let key = make_key("key1");

		let mut parent = create_test_transaction();
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();

		let mut txn = deferred_txn!(parent, version);
		txn.remove(&key).unwrap();

		assert!(!txn.contains_key(&key).unwrap());
	}

	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		assert!(!txn.contains_key(&make_key("missing")).unwrap());
	}

	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		assert!(txn.range(EncodedKeyRange::all(), 1024).next().is_none());
	}

	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		// Inserted out of order on purpose; the scan must return them sorted.
		for (k, v) in [("b", "2"), ("a", "1"), ("c", "3")] {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		let rows = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
		let keys: Vec<_> = rows.iter().map(|item| item.key.clone()).collect();

		assert_eq!(keys, vec![make_key("a"), make_key("b"), make_key("c")]);
	}

	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		txn.set(&make_key("a"), make_value("1")).unwrap();
		txn.remove(&make_key("b")).unwrap();
		txn.set(&make_key("c"), make_value("3")).unwrap();

		let rows = txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();
		let keys: Vec<_> = rows.iter().map(|item| item.key.clone()).collect();

		assert_eq!(keys, vec![make_key("a"), make_key("c")]);
	}

	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));

		assert!(txn.range(bounds, 1024).next().is_none());
	}

	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		// Half-open range [b, d).
		let bounds = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));
		let rows = txn.range(bounds, 1024).collect::<Result<Vec<_>>>().unwrap();
		let keys: Vec<_> = rows.iter().map(|item| item.key.clone()).collect();

		assert_eq!(keys, vec![make_key("b"), make_key("c")]);
	}

	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		let batch = txn.prefix(&make_key("test_")).unwrap();

		assert!(batch.items.is_empty());
	}

	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn!(parent, CommitVersion(1));

		for (k, v) in [("test_a", "1"), ("test_b", "2"), ("other_c", "3")] {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		let batch = txn.prefix(&make_key("test_")).unwrap();
		let keys: Vec<_> = batch.items.iter().map(|item| item.key.clone()).collect();

		assert_eq!(keys, vec![make_key("test_a"), make_key("test_b")]);
	}
}