reifydb_sub_flow/transaction/
commit.rs1use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::MultiVersionCommandTransaction;
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8
9use super::{FlowTransaction, FlowTransactionMetrics, Pending};
10
11impl FlowTransaction {
12 pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
26 {
29 let parent_pending = parent.pending_writes();
30 for (key, _) in self.pending.iter_sorted() {
31 if parent_pending.contains_key(key) {
33 return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
34 }
35 }
36 }
37
38 for (key, pending) in self.pending.iter_sorted() {
40 match pending {
41 Pending::Write(value) => {
42 parent.set(key, value.clone())?;
43 }
44 Pending::Remove => {
45 parent.remove(key)?;
46 }
47 }
48 }
49
50 self.pending.clear();
51 Ok(self.metrics.clone())
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use reifydb_core::CommitVersion;
58
59 use super::*;
60 use crate::{
61 operator::stateful::test_utils::test::create_test_transaction,
62 transaction::utils::test::{from_store, make_key, make_value},
63 };
64
#[test]
fn test_commit_empty_pending() {
    // Committing a transaction that never buffered anything must succeed
    // and report all-zero counters.
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    let reported = flow_txn.commit(&mut parent).unwrap();

    assert_eq!(reported.reads, 0);
    assert_eq!(reported.writes, 0);
    assert_eq!(reported.removes, 0);
}
77
#[test]
fn test_commit_single_write() {
    // One buffered write must be readable through the parent after commit.
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    let (key, value) = (make_key("key1"), make_value("value1"));
    flow_txn.set(&key, value.clone()).unwrap();

    flow_txn.commit(&mut parent).unwrap();

    let stored = from_store(&mut parent, &key);
    assert_eq!(stored, Some(value));
}
92
#[test]
fn test_commit_multiple_writes() {
    // Every one of several buffered writes must reach the parent.
    let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    for (key, value) in entries {
        flow_txn.set(&make_key(key), make_value(value)).unwrap();
    }

    flow_txn.commit(&mut parent).unwrap();

    for (key, value) in entries {
        assert_eq!(from_store(&mut parent, &make_key(key)), Some(make_value(value)));
    }
}
109
#[test]
fn test_commit_removes() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();
    let first_key = make_key("key1");
    let second_key = make_key("key2");

    // Seed both keys through an ordinary command transaction and commit it.
    let mut seed = engine.begin_command().unwrap();
    seed.set(&first_key, make_value("value1")).unwrap();
    seed.set(&second_key, make_value("value2")).unwrap();
    let seeded_version = seed.commit().unwrap();

    // A fresh parent must observe the seeded values before anything is removed.
    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &first_key), Some(make_value("value1")));
    assert_eq!(from_store(&mut parent, &second_key), Some(make_value("value2")));

    // Buffer both removals in a flow transaction and push them into the parent.
    let mut flow_txn = FlowTransaction::new(&parent, seeded_version);
    flow_txn.remove(&first_key).unwrap();
    flow_txn.remove(&second_key).unwrap();
    flow_txn.commit(&mut parent).unwrap();

    parent.commit().unwrap();

    // After the parent itself commits, a new transaction no longer sees the keys.
    let mut reader = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut reader, &first_key), None);
    assert_eq!(from_store(&mut reader, &second_key), None);
}
148
#[test]
fn test_commit_mixed_writes_and_removes() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();
    let existing_key = make_key("existing");
    let new_key = make_key("new");

    // Seed one key so that there is something to remove later.
    let mut seed = engine.begin_command().unwrap();
    seed.set(&existing_key, make_value("old")).unwrap();
    let seeded_version = seed.commit().unwrap();

    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));

    // One flow transaction carrying both a new write and a removal.
    let mut flow_txn = FlowTransaction::new(&parent, seeded_version);
    flow_txn.set(&new_key, make_value("value")).unwrap();
    flow_txn.remove(&existing_key).unwrap();
    flow_txn.commit(&mut parent).unwrap();

    parent.commit().unwrap();

    // Both effects must be visible to a transaction started afterwards.
    let mut reader = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut reader, &new_key), Some(make_value("value")));
    assert_eq!(from_store(&mut reader, &existing_key), None);
}
187
#[test]
fn test_commit_returns_metrics() {
    // One set, one get and one remove must each be counted exactly once
    // in the metrics handed back by `commit`.
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    flow_txn.set(&make_key("key1"), make_value("value1")).unwrap();
    flow_txn.get(&make_key("key2")).unwrap();
    flow_txn.remove(&make_key("key3")).unwrap();

    let reported = flow_txn.commit(&mut parent).unwrap();

    assert_eq!(reported.writes, 1);
    assert_eq!(reported.reads, 1);
    assert_eq!(reported.removes, 1);
}
203
#[test]
fn test_commit_overwrites_storage_value() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();
    let key = make_key("key1");

    // Put an initial value into storage via a committed command transaction.
    let mut seed = engine.begin_command().unwrap();
    seed.set(&key, make_value("old")).unwrap();
    let seeded_version = seed.commit().unwrap();

    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));

    // Overwrite the same key through a flow transaction.
    let mut flow_txn = FlowTransaction::new(&parent, seeded_version);
    flow_txn.set(&key, make_value("new")).unwrap();
    flow_txn.commit(&mut parent).unwrap();

    // The parent now yields the new value for that key.
    let current = from_store(&mut parent, &key);
    assert_eq!(current, Some(make_value("new")));
}
232
#[test]
fn test_sequential_commits_different_keys() {
    // Two flow transactions, created and committed one after the other on
    // distinct keys, must both land in the same parent.
    let mut parent = create_test_transaction();

    let mut first = FlowTransaction::new(&parent, CommitVersion(1));
    first.set(&make_key("key1"), make_value("value1")).unwrap();
    first.commit(&mut parent).unwrap();

    let mut second = FlowTransaction::new(&parent, CommitVersion(2));
    second.set(&make_key("key2"), make_value("value2")).unwrap();
    second.commit(&mut parent).unwrap();

    let stored_first = from_store(&mut parent, &make_key("key1"));
    assert_eq!(stored_first, Some(make_value("value1")));
    let stored_second = from_store(&mut parent, &make_key("key2"));
    assert_eq!(stored_second, Some(make_value("value2")));
}
253
#[test]
fn test_same_key_multiple_overwrites() {
    // Repeated set/remove on one key: only the last buffered operation
    // may survive in the pending buffer and reach the parent.
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));
    let key = make_key("key1");

    // set -> remove leaves the key marked as removed.
    flow_txn.set(&key, make_value("first")).unwrap();
    flow_txn.remove(&key).unwrap();
    assert!(flow_txn.pending.is_removed(&key));

    // set -> remove -> set leaves the final value pending.
    flow_txn.set(&key, make_value("second")).unwrap();
    flow_txn.remove(&key).unwrap();
    flow_txn.set(&key, make_value("final")).unwrap();
    assert_eq!(flow_txn.pending.get(&key), Some(&make_value("final")));

    flow_txn.commit(&mut parent).unwrap();

    let stored = from_store(&mut parent, &key);
    assert_eq!(stored, Some(make_value("final")));
}
282
#[test]
fn test_commit_detects_overlapping_writes() {
    // Two flow transactions buffer a write to the same key. The one that
    // commits second must be rejected with error code FLOW_002.
    let mut parent = create_test_transaction();
    let key = make_key("key1");

    // Both transactions are created before either of them commits.
    let mut winner = FlowTransaction::new(&parent, CommitVersion(1));
    let mut loser = FlowTransaction::new(&parent, CommitVersion(2));

    winner.set(&key, make_value("value1")).unwrap();
    loser.set(&key, make_value("value2")).unwrap();

    winner.commit(&mut parent).unwrap();

    let outcome = loser.commit(&mut parent);
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code, "FLOW_002");
}
309
#[test]
fn test_double_commit_prevention() {
    // NOTE(review): despite its name, this test only checks that the first
    // commit succeeds; no second commit is attempted here.
    let mut parent = create_test_transaction();

    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));
    flow_txn.set(&make_key("key1"), make_value("value1")).unwrap();

    let first_commit = flow_txn.commit(&mut parent);
    assert!(first_commit.is_ok(), "First commit should succeed");
}
327
#[test]
fn test_commit_allows_nonoverlapping_writes() {
    // A second flow transaction touching a different key than the first
    // must commit without an overlap error.
    let mut parent = create_test_transaction();

    let mut first = FlowTransaction::new(&parent, CommitVersion(1));
    first.set(&make_key("key1"), make_value("value1")).unwrap();
    first.commit(&mut parent).unwrap();

    let mut second = FlowTransaction::new(&parent, CommitVersion(2));
    second.set(&make_key("key2"), make_value("value2")).unwrap();
    let outcome = second.commit(&mut parent);

    assert!(outcome.is_ok());

    // Both values are readable through the parent.
    let stored_first = from_store(&mut parent, &make_key("key1"));
    assert_eq!(stored_first, Some(make_value("value1")));
    let stored_second = from_store(&mut parent, &make_key("key2"));
    assert_eq!(stored_second, Some(make_value("value2")));
}
350}