reifydb_sub_flow/transaction/
write.rs1use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10 pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()> {
12 self.inner_mut().pending.insert(key.clone(), value);
13 Ok(())
14 }
15
16 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
18 self.inner_mut().pending.remove(key.clone());
19 Ok(())
20 }
21}
22
23#[cfg(test)]
24pub mod tests {
25 use reifydb_catalog::catalog::Catalog;
26 use reifydb_core::{
27 common::CommitVersion,
28 encoded::{key::EncodedKey, row::EncodedRow},
29 };
30 use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
31 use reifydb_type::util::cowvec::CowVec;
32
33 use super::*;
34 use crate::operator::stateful::test_utils::test::create_test_transaction;
35
36 fn make_key(s: &str) -> EncodedKey {
37 EncodedKey::new(s.as_bytes().to_vec())
38 }
39
40 fn make_value(s: &str) -> EncodedRow {
41 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
42 }
43
44 fn get_row(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedRow> {
45 parent.get(key).unwrap().map(|m| m.row.clone())
46 }
47
48 #[test]
49 fn test_set_buffers_to_pending() {
50 let parent = create_test_transaction();
51 let mut txn =
52 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
53
54 let key = make_key("key1");
55 let value = make_value("value1");
56
57 txn.set(&key, value.clone()).unwrap();
58
59 assert_eq!(txn.pending().get(&key), Some(&value));
61 }
62
63 #[test]
64 fn test_set_multiple_keys() {
65 let parent = create_test_transaction();
66 let mut txn =
67 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
68
69 txn.set(&make_key("key1"), make_value("value1")).unwrap();
70 txn.set(&make_key("key2"), make_value("value2")).unwrap();
71 txn.set(&make_key("key3"), make_value("value3")).unwrap();
72
73 assert_eq!(txn.pending().get(&make_key("key1")), Some(&make_value("value1")));
74 assert_eq!(txn.pending().get(&make_key("key2")), Some(&make_value("value2")));
75 assert_eq!(txn.pending().get(&make_key("key3")), Some(&make_value("value3")));
76 }
77
78 #[test]
79 fn test_set_overwrites_same_key() {
80 let parent = create_test_transaction();
81 let mut txn =
82 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
83
84 let key = make_key("key1");
85 txn.set(&key, make_value("value1")).unwrap();
86 txn.set(&key, make_value("value2")).unwrap();
87
88 assert_eq!(txn.pending().get(&key), Some(&make_value("value2")));
90 }
91
92 #[test]
93 fn test_remove_buffers_to_pending() {
94 let parent = create_test_transaction();
95 let mut txn =
96 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
97
98 let key = make_key("key1");
99 txn.remove(&key).unwrap();
100
101 assert!(txn.pending().is_removed(&key));
103 }
104
105 #[test]
106 fn test_remove_multiple_keys() {
107 let parent = create_test_transaction();
108 let mut txn =
109 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
110
111 txn.remove(&make_key("key1")).unwrap();
112 txn.remove(&make_key("key2")).unwrap();
113 txn.remove(&make_key("key3")).unwrap();
114
115 assert!(txn.pending().is_removed(&make_key("key1")));
116 assert!(txn.pending().is_removed(&make_key("key2")));
117 assert!(txn.pending().is_removed(&make_key("key3")));
118 }
119
120 #[test]
121 fn test_set_then_remove() {
122 let parent = create_test_transaction();
123 let mut txn =
124 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
125
126 let key = make_key("key1");
127 txn.set(&key, make_value("value1")).unwrap();
128 assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
129
130 txn.remove(&key).unwrap();
131 assert!(txn.pending().is_removed(&key));
132 assert_eq!(txn.pending().get(&key), None);
133 }
134
135 #[test]
136 fn test_remove_then_set() {
137 let parent = create_test_transaction();
138 let mut txn =
139 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
140
141 let key = make_key("key1");
142 txn.remove(&key).unwrap();
143 assert!(txn.pending().is_removed(&key));
144
145 txn.set(&key, make_value("value1")).unwrap();
146 assert!(!txn.pending().is_removed(&key));
147 assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
148 }
149
150 #[test]
151 fn test_writes_not_visible_to_parent() {
152 let mut parent = create_test_transaction();
153 let mut txn =
154 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
155
156 let key = make_key("key1");
157 let value = make_value("value1");
158
159 txn.set(&key, value.clone()).unwrap();
161
162 assert_eq!(get_row(&mut parent, &key), None);
164 }
165
166 #[test]
167 fn test_removes_not_visible_to_parent() {
168 let mut parent = create_test_transaction();
169
170 let key = make_key("key1");
172 let value = make_value("value1");
173 parent.set(&key, value.clone()).unwrap();
174 assert_eq!(get_row(&mut parent, &key), Some(value.clone()));
175
176 let parent_version = parent.version();
178 let mut txn =
179 FlowTransaction::deferred(&parent, parent_version, Catalog::testing(), Interceptors::new());
180 txn.remove(&key).unwrap();
181
182 assert_eq!(get_row(&mut parent, &key), Some(value));
184 }
185
186 #[test]
187 fn test_mixed_writes_and_removes() {
188 let parent = create_test_transaction();
189 let mut txn =
190 FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
191
192 txn.set(&make_key("write1"), make_value("v1")).unwrap();
193 txn.remove(&make_key("remove1")).unwrap();
194 txn.set(&make_key("write2"), make_value("v2")).unwrap();
195 txn.remove(&make_key("remove2")).unwrap();
196
197 assert_eq!(txn.pending().get(&make_key("write1")), Some(&make_value("v1")));
198 assert_eq!(txn.pending().get(&make_key("write2")), Some(&make_value("v2")));
199 assert!(txn.pending().is_removed(&make_key("remove1")));
200 assert!(txn.pending().is_removed(&make_key("remove2")));
201 }
202}