reifydb_sub_flow/transaction/
commit.rs1use reifydb_core::interface::MultiVersionCommandTransaction;
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_type::{diagnostic, return_error, util::hex};
7
8use super::{FlowTransaction, FlowTransactionMetrics, Pending};
9
10impl FlowTransaction {
11 pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
25 {
28 let parent_pending = parent.pending_writes();
29 for (key, _) in self.pending.iter_sorted() {
30 if parent_pending.contains_key(key) {
32 return_error!(diagnostic::flow::flow_transaction_keyspace_overlap(
33 hex::encode(key.as_ref())
34 ));
35 }
36 }
37 }
38
39 for (key, pending) in self.pending.iter_sorted() {
41 match pending {
42 Pending::Write(value) => {
43 parent.set(key, value.clone())?;
44 }
45 Pending::Remove => {
46 parent.remove(key)?;
47 }
48 }
49 }
50
51 self.pending.clear();
52
53 Ok(self.metrics.clone())
54 }
55}
56
#[cfg(test)]
mod tests {
    use reifydb_core::{CommitVersion, interface::Engine};

    use super::*;
    use crate::{
        operator::stateful::test_utils::test::{create_test_engine, create_test_transaction},
        transaction::test_utils::test::{from_store, make_key, make_value},
    };

    /// Committing with nothing buffered succeeds and reports all-zero metrics.
    #[test]
    fn test_commit_empty_pending() {
        let mut parent = create_test_transaction();
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));

        let report = flow.commit(&mut parent).unwrap();

        assert_eq!((report.reads, report.writes, report.removes), (0, 0, 0));
    }

    /// A single buffered write becomes readable through the parent.
    #[test]
    fn test_commit_single_write() {
        let mut parent = create_test_transaction();
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));

        let key = make_key("key1");
        let value = make_value("value1");
        flow.set(&key, value.clone()).unwrap();

        flow.commit(&mut parent).unwrap();

        let stored = from_store(&mut parent, &key);
        assert_eq!(stored, Some(value));
    }

    /// Several buffered writes are all forwarded to the parent.
    #[test]
    fn test_commit_multiple_writes() {
        let mut parent = create_test_transaction();
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));

        let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];
        for (k, v) in entries {
            flow.set(&make_key(k), make_value(v)).unwrap();
        }

        flow.commit(&mut parent).unwrap();

        for (k, v) in entries {
            assert_eq!(from_store(&mut parent, &make_key(k)), Some(make_value(v)));
        }
    }

    /// Buffered removes delete values that were previously committed.
    #[test]
    fn test_commit_removes() {
        let engine = create_test_engine();

        // Seed the store with two committed values.
        let mut seed = engine.begin_command().unwrap();
        let key1 = make_key("key1");
        let key2 = make_key("key2");
        seed.set(&key1, make_value("value1")).unwrap();
        seed.set(&key2, make_value("value2")).unwrap();
        let commit_version = seed.commit().unwrap();

        // A fresh parent transaction sees the seeded values.
        let mut parent = engine.begin_command().unwrap();
        assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
        assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));

        // Remove both keys through a flow transaction, then commit everything.
        let mut flow = FlowTransaction::new(&parent, commit_version);
        for key in [&key1, &key2] {
            flow.remove(key).unwrap();
        }
        flow.commit(&mut parent).unwrap();
        parent.commit().unwrap();

        // A new transaction no longer finds either key.
        let mut verify = engine.begin_command().unwrap();
        for key in [&key1, &key2] {
            assert_eq!(from_store(&mut verify, key), None);
        }
    }

    /// One commit may carry both a new write and a remove of an existing key.
    #[test]
    fn test_commit_mixed_writes_and_removes() {
        let engine = create_test_engine();

        // Seed the store with one committed value.
        let mut seed = engine.begin_command().unwrap();
        let existing_key = make_key("existing");
        seed.set(&existing_key, make_value("old")).unwrap();
        let commit_version = seed.commit().unwrap();

        let mut parent = engine.begin_command().unwrap();
        assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));

        let mut flow = FlowTransaction::new(&parent, commit_version);

        let new_key = make_key("new");
        flow.set(&new_key, make_value("value")).unwrap();
        flow.remove(&existing_key).unwrap();

        flow.commit(&mut parent).unwrap();
        parent.commit().unwrap();

        let mut verify = engine.begin_command().unwrap();
        let added = from_store(&mut verify, &new_key);
        let removed = from_store(&mut verify, &existing_key);
        assert_eq!(added, Some(make_value("value")));
        assert_eq!(removed, None);
    }

    /// The metrics returned by `commit` count one write, one read and one remove.
    #[test]
    fn test_commit_returns_metrics() {
        let mut parent = create_test_transaction();
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));

        flow.set(&make_key("key1"), make_value("value1")).unwrap();
        flow.get(&make_key("key2")).unwrap();
        flow.remove(&make_key("key3")).unwrap();

        let report = flow.commit(&mut parent).unwrap();

        assert_eq!(report.writes, 1);
        assert_eq!(report.reads, 1);
        assert_eq!(report.removes, 1);
    }

    /// A committed flow write replaces the value already held in storage.
    #[test]
    fn test_commit_overwrites_storage_value() {
        let engine = create_test_engine();

        // Seed the store with the value that will be overwritten.
        let mut seed = engine.begin_command().unwrap();
        let key = make_key("key1");
        seed.set(&key, make_value("old")).unwrap();
        let commit_version = seed.commit().unwrap();

        let mut parent = engine.begin_command().unwrap();
        assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));

        let mut flow = FlowTransaction::new(&parent, commit_version);
        flow.set(&key, make_value("new")).unwrap();
        flow.commit(&mut parent).unwrap();

        let stored = from_store(&mut parent, &key);
        assert_eq!(stored, Some(make_value("new")));
    }

    /// Two flow transactions committed one after another on distinct keys both land.
    #[test]
    fn test_sequential_commits_different_keys() {
        let mut parent = create_test_transaction();

        let mut first = FlowTransaction::new(&parent, CommitVersion(1));
        first.set(&make_key("key1"), make_value("value1")).unwrap();
        first.commit(&mut parent).unwrap();

        let mut second = FlowTransaction::new(&parent, CommitVersion(2));
        second.set(&make_key("key2"), make_value("value2")).unwrap();
        second.commit(&mut parent).unwrap();

        for (k, v) in [("key1", "value1"), ("key2", "value2")] {
            assert_eq!(from_store(&mut parent, &make_key(k)), Some(make_value(v)));
        }
    }

    /// Only the last buffered operation on a key is what gets committed.
    #[test]
    fn test_same_key_multiple_overwrites() {
        let mut parent = create_test_transaction();
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));

        let key = make_key("key1");

        // write -> remove: the key is currently marked as removed.
        flow.set(&key, make_value("first")).unwrap();
        flow.remove(&key).unwrap();
        assert!(flow.pending.is_removed(&key));

        // write -> remove -> write: the final write is what stays buffered.
        flow.set(&key, make_value("second")).unwrap();
        flow.remove(&key).unwrap();
        flow.set(&key, make_value("final")).unwrap();
        assert_eq!(flow.pending.get(&key), Some(&make_value("final")));

        flow.commit(&mut parent).unwrap();

        let stored = from_store(&mut parent, &key);
        assert_eq!(stored, Some(make_value("final")));
    }

    /// A second flow commit touching a key the parent already has pending is rejected.
    #[test]
    fn test_commit_detects_overlapping_writes() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");

        let mut winner = FlowTransaction::new(&parent, CommitVersion(1));
        let mut loser = FlowTransaction::new(&parent, CommitVersion(2));

        winner.set(&key, make_value("value1")).unwrap();
        loser.set(&key, make_value("value2")).unwrap();

        // The first commit puts the key into the parent's pending writes.
        winner.commit(&mut parent).unwrap();

        // The second commit hits the same key and must fail.
        let outcome = loser.commit(&mut parent);
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().code, "FLOW_002");
    }

    /// The first commit of a flow transaction succeeds.
    ///
    /// NOTE(review): despite the name, no second commit is attempted here, so
    /// double-commit behaviour is not actually exercised. Confirm the intended
    /// contract before extending this test.
    #[test]
    fn test_double_commit_prevention() {
        let mut parent = create_test_transaction();

        let mut flow = FlowTransaction::new(&parent, CommitVersion(1));
        flow.set(&make_key("key1"), make_value("value1")).unwrap();

        let first_attempt = flow.commit(&mut parent);
        assert!(first_attempt.is_ok(), "First commit should succeed");
    }

    /// Flow commits on disjoint keys do not trip the overlap check.
    #[test]
    fn test_commit_allows_nonoverlapping_writes() {
        let mut parent = create_test_transaction();

        let mut first = FlowTransaction::new(&parent, CommitVersion(1));
        first.set(&make_key("key1"), make_value("value1")).unwrap();
        first.commit(&mut parent).unwrap();

        let mut second = FlowTransaction::new(&parent, CommitVersion(2));
        second.set(&make_key("key2"), make_value("value2")).unwrap();
        let outcome = second.commit(&mut parent);

        assert!(outcome.is_ok());

        for (k, v) in [("key1", "value1"), ("key2", "value2")] {
            assert_eq!(from_store(&mut parent, &make_key(k)), Some(make_value(v)));
        }
    }
}