reifydb_sub_flow/transaction/
write.rs1use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10 pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()> {
12 self.inner_mut().pending.insert(key.clone(), value);
13 Ok(())
14 }
15
16 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
18 self.inner_mut().pending.remove(key.clone());
19 Ok(())
20 }
21}
22
23#[cfg(test)]
24pub mod tests {
25 use reifydb_catalog::catalog::Catalog;
26 use reifydb_core::{
27 common::CommitVersion,
28 encoded::{key::EncodedKey, row::EncodedRow},
29 };
30 use reifydb_runtime::context::clock::{Clock, MockClock};
31 use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
32 use reifydb_type::util::cowvec::CowVec;
33
34 use super::*;
35 use crate::operator::stateful::test_utils::test::create_test_transaction;
36
37 fn make_key(s: &str) -> EncodedKey {
38 EncodedKey::new(s.as_bytes().to_vec())
39 }
40
41 fn make_value(s: &str) -> EncodedRow {
42 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
43 }
44
45 fn get_row(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedRow> {
46 parent.get(key).unwrap().map(|m| m.row.clone())
47 }
48
49 #[test]
50 fn test_set_buffers_to_pending() {
51 let parent = create_test_transaction();
52 let mut txn = FlowTransaction::deferred(
53 &parent,
54 CommitVersion(1),
55 Catalog::testing(),
56 Interceptors::new(),
57 Clock::Mock(MockClock::from_millis(1000)),
58 );
59
60 let key = make_key("key1");
61 let value = make_value("value1");
62
63 txn.set(&key, value.clone()).unwrap();
64
65 assert_eq!(txn.pending().get(&key), Some(&value));
67 }
68
69 #[test]
70 fn test_set_multiple_keys() {
71 let parent = create_test_transaction();
72 let mut txn = FlowTransaction::deferred(
73 &parent,
74 CommitVersion(1),
75 Catalog::testing(),
76 Interceptors::new(),
77 Clock::Mock(MockClock::from_millis(1000)),
78 );
79
80 txn.set(&make_key("key1"), make_value("value1")).unwrap();
81 txn.set(&make_key("key2"), make_value("value2")).unwrap();
82 txn.set(&make_key("key3"), make_value("value3")).unwrap();
83
84 assert_eq!(txn.pending().get(&make_key("key1")), Some(&make_value("value1")));
85 assert_eq!(txn.pending().get(&make_key("key2")), Some(&make_value("value2")));
86 assert_eq!(txn.pending().get(&make_key("key3")), Some(&make_value("value3")));
87 }
88
89 #[test]
90 fn test_set_overwrites_same_key() {
91 let parent = create_test_transaction();
92 let mut txn = FlowTransaction::deferred(
93 &parent,
94 CommitVersion(1),
95 Catalog::testing(),
96 Interceptors::new(),
97 Clock::Mock(MockClock::from_millis(1000)),
98 );
99
100 let key = make_key("key1");
101 txn.set(&key, make_value("value1")).unwrap();
102 txn.set(&key, make_value("value2")).unwrap();
103
104 assert_eq!(txn.pending().get(&key), Some(&make_value("value2")));
106 }
107
108 #[test]
109 fn test_remove_buffers_to_pending() {
110 let parent = create_test_transaction();
111 let mut txn = FlowTransaction::deferred(
112 &parent,
113 CommitVersion(1),
114 Catalog::testing(),
115 Interceptors::new(),
116 Clock::Mock(MockClock::from_millis(1000)),
117 );
118
119 let key = make_key("key1");
120 txn.remove(&key).unwrap();
121
122 assert!(txn.pending().is_removed(&key));
124 }
125
126 #[test]
127 fn test_remove_multiple_keys() {
128 let parent = create_test_transaction();
129 let mut txn = FlowTransaction::deferred(
130 &parent,
131 CommitVersion(1),
132 Catalog::testing(),
133 Interceptors::new(),
134 Clock::Mock(MockClock::from_millis(1000)),
135 );
136
137 txn.remove(&make_key("key1")).unwrap();
138 txn.remove(&make_key("key2")).unwrap();
139 txn.remove(&make_key("key3")).unwrap();
140
141 assert!(txn.pending().is_removed(&make_key("key1")));
142 assert!(txn.pending().is_removed(&make_key("key2")));
143 assert!(txn.pending().is_removed(&make_key("key3")));
144 }
145
146 #[test]
147 fn test_set_then_remove() {
148 let parent = create_test_transaction();
149 let mut txn = FlowTransaction::deferred(
150 &parent,
151 CommitVersion(1),
152 Catalog::testing(),
153 Interceptors::new(),
154 Clock::Mock(MockClock::from_millis(1000)),
155 );
156
157 let key = make_key("key1");
158 txn.set(&key, make_value("value1")).unwrap();
159 assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
160
161 txn.remove(&key).unwrap();
162 assert!(txn.pending().is_removed(&key));
163 assert_eq!(txn.pending().get(&key), None);
164 }
165
166 #[test]
167 fn test_remove_then_set() {
168 let parent = create_test_transaction();
169 let mut txn = FlowTransaction::deferred(
170 &parent,
171 CommitVersion(1),
172 Catalog::testing(),
173 Interceptors::new(),
174 Clock::Mock(MockClock::from_millis(1000)),
175 );
176
177 let key = make_key("key1");
178 txn.remove(&key).unwrap();
179 assert!(txn.pending().is_removed(&key));
180
181 txn.set(&key, make_value("value1")).unwrap();
182 assert!(!txn.pending().is_removed(&key));
183 assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
184 }
185
186 #[test]
187 fn test_writes_not_visible_to_parent() {
188 let mut parent = create_test_transaction();
189 let mut txn = FlowTransaction::deferred(
190 &parent,
191 CommitVersion(1),
192 Catalog::testing(),
193 Interceptors::new(),
194 Clock::Mock(MockClock::from_millis(1000)),
195 );
196
197 let key = make_key("key1");
198 let value = make_value("value1");
199
200 txn.set(&key, value.clone()).unwrap();
202
203 assert_eq!(get_row(&mut parent, &key), None);
205 }
206
207 #[test]
208 fn test_removes_not_visible_to_parent() {
209 let mut parent = create_test_transaction();
210
211 let key = make_key("key1");
213 let value = make_value("value1");
214 parent.set(&key, value.clone()).unwrap();
215 assert_eq!(get_row(&mut parent, &key), Some(value.clone()));
216
217 let parent_version = parent.version();
219 let mut txn = FlowTransaction::deferred(
220 &parent,
221 parent_version,
222 Catalog::testing(),
223 Interceptors::new(),
224 Clock::Mock(MockClock::from_millis(1000)),
225 );
226 txn.remove(&key).unwrap();
227
228 assert_eq!(get_row(&mut parent, &key), Some(value));
230 }
231
232 #[test]
233 fn test_mixed_writes_and_removes() {
234 let parent = create_test_transaction();
235 let mut txn = FlowTransaction::deferred(
236 &parent,
237 CommitVersion(1),
238 Catalog::testing(),
239 Interceptors::new(),
240 Clock::Mock(MockClock::from_millis(1000)),
241 );
242
243 txn.set(&make_key("write1"), make_value("v1")).unwrap();
244 txn.remove(&make_key("remove1")).unwrap();
245 txn.set(&make_key("write2"), make_value("v2")).unwrap();
246 txn.remove(&make_key("remove2")).unwrap();
247
248 assert_eq!(txn.pending().get(&make_key("write1")), Some(&make_value("v1")));
249 assert_eq!(txn.pending().get(&make_key("write2")), Some(&make_value("v2")));
250 assert!(txn.pending().is_removed(&make_key("remove1")));
251 assert!(txn.pending().is_removed(&make_key("remove2")));
252 }
253}