reifydb_sub_flow/transaction/
commit.rs1use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::{MultiVersionCommandTransaction, interceptor::ViewInterceptor};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8
9use super::{FlowTransaction, FlowTransactionMetrics, Pending, ViewPending};
10
11impl FlowTransaction {
12 pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
26 {
29 let parent_pending = parent.pending_writes();
30 for (key, _) in self.pending.iter_sorted() {
31 if parent_pending.contains_key(key) {
33 return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
34 }
35 }
36 }
37
38 for view_op in self.view_pending.drain(..) {
41 match view_op {
42 ViewPending::Insert {
43 view,
44 row_number,
45 row,
46 } => {
47 ViewInterceptor::pre_insert(parent, &view, row_number, &row)?;
49 ViewInterceptor::post_insert(parent, &view, row_number, &row)?;
53 }
54 ViewPending::Update {
55 view,
56 old_row_number,
57 new_row_number,
58 row,
59 } => {
60 ViewInterceptor::pre_update(parent, &view, new_row_number, &row)?;
62 ViewInterceptor::pre_delete(parent, &view, old_row_number)?;
64 ViewInterceptor::post_update(parent, &view, new_row_number, &row, &row)?;
68 }
69 ViewPending::Remove {
70 view,
71 row_number,
72 } => {
73 ViewInterceptor::pre_delete(parent, &view, row_number)?;
75 }
79 }
80 }
81
82 for (key, pending) in self.pending.iter_sorted() {
84 match pending {
85 Pending::Write(value) => {
86 parent.set(key, value.clone())?;
87 }
88 Pending::Remove => {
89 parent.remove(key)?;
90 }
91 }
92 }
93
94 self.pending.clear();
95 Ok(self.metrics.clone())
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use reifydb_core::CommitVersion;
102
103 use super::*;
104 use crate::{
105 operator::stateful::test_utils::test::create_test_transaction,
106 transaction::utils::test::{from_store, make_key, make_value},
107 };
108
/// Committing a flow transaction with nothing buffered reports zero reads,
/// writes and removes.
#[test]
fn test_commit_empty_pending() {
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    let metrics = flow_txn.commit(&mut parent).unwrap();

    assert_eq!((metrics.reads, metrics.writes, metrics.removes), (0, 0, 0));
}
121
/// A single buffered write is readable from the parent after commit.
#[test]
fn test_commit_single_write() {
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    let key = make_key("key1");
    let expected = make_value("value1");
    flow_txn.set(&key, expected.clone()).unwrap();

    flow_txn.commit(&mut parent).unwrap();

    let stored = from_store(&mut parent, &key);
    assert_eq!(stored, Some(expected));
}
136
/// Several buffered writes all reach the parent on commit.
#[test]
fn test_commit_multiple_writes() {
    let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    for (key, value) in entries {
        flow_txn.set(&make_key(key), make_value(value)).unwrap();
    }

    flow_txn.commit(&mut parent).unwrap();

    for (key, value) in entries {
        assert_eq!(from_store(&mut parent, &make_key(key)), Some(make_value(value)));
    }
}
153
/// Removes buffered in a flow transaction delete previously committed keys
/// once the parent itself is committed.
#[test]
fn test_commit_removes() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();

    // Seed the engine with two committed keys.
    let mut seed = engine.begin_command().unwrap();
    let first_key = make_key("key1");
    let second_key = make_key("key2");
    seed.set(&first_key, make_value("value1")).unwrap();
    seed.set(&second_key, make_value("value2")).unwrap();
    let commit_version = seed.commit().unwrap();

    // A fresh parent sees the seeded values.
    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &first_key), Some(make_value("value1")));
    assert_eq!(from_store(&mut parent, &second_key), Some(make_value("value2")));

    // Buffer both removes, push them into the parent, then commit the parent.
    let mut flow_txn = FlowTransaction::new(&parent, commit_version);
    flow_txn.remove(&first_key).unwrap();
    flow_txn.remove(&second_key).unwrap();
    flow_txn.commit(&mut parent).unwrap();
    parent.commit().unwrap();

    // A new transaction no longer finds either key.
    let mut verify = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut verify, &first_key), None);
    assert_eq!(from_store(&mut verify, &second_key), None);
}
192
/// A flow transaction that both writes a new key and removes an existing one
/// applies both changes through the parent.
#[test]
fn test_commit_mixed_writes_and_removes() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();

    // Seed the engine with one committed key.
    let mut seed = engine.begin_command().unwrap();
    let existing_key = make_key("existing");
    seed.set(&existing_key, make_value("old")).unwrap();
    let commit_version = seed.commit().unwrap();

    // A fresh parent sees the seeded value.
    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));

    // Buffer one write and one remove, then flush them via the parent.
    let mut flow_txn = FlowTransaction::new(&parent, commit_version);
    let new_key = make_key("new");
    flow_txn.set(&new_key, make_value("value")).unwrap();
    flow_txn.remove(&existing_key).unwrap();
    flow_txn.commit(&mut parent).unwrap();
    parent.commit().unwrap();

    // A new transaction observes the write and no longer finds the removed key.
    let mut verify = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut verify, &new_key), Some(make_value("value")));
    assert_eq!(from_store(&mut verify, &existing_key), None);
}
231
/// The metrics returned by `commit` count one write, one read and one remove
/// after exactly one `set`, one `get` and one `remove`.
#[test]
fn test_commit_returns_metrics() {
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    flow_txn.set(&make_key("key1"), make_value("value1")).unwrap();
    flow_txn.get(&make_key("key2")).unwrap();
    flow_txn.remove(&make_key("key3")).unwrap();

    let metrics = flow_txn.commit(&mut parent).unwrap();

    assert_eq!((metrics.writes, metrics.reads, metrics.removes), (1, 1, 1));
}
247
/// A buffered write replaces the value that was previously committed for the
/// same key, as seen through the parent transaction.
#[test]
fn test_commit_overwrites_storage_value() {
    use reifydb_core::interface::Engine;

    use crate::operator::stateful::test_utils::test::create_test_engine;

    let engine = create_test_engine();

    // Seed the engine with the old value.
    let mut seed = engine.begin_command().unwrap();
    let key = make_key("key1");
    seed.set(&key, make_value("old")).unwrap();
    let commit_version = seed.commit().unwrap();

    // A fresh parent starts out seeing the old value.
    let mut parent = engine.begin_command().unwrap();
    assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));

    // Overwrite through a flow transaction and read back via the parent.
    let mut flow_txn = FlowTransaction::new(&parent, commit_version);
    flow_txn.set(&key, make_value("new")).unwrap();
    flow_txn.commit(&mut parent).unwrap();

    let stored = from_store(&mut parent, &key);
    assert_eq!(stored, Some(make_value("new")));
}
276
/// Two flow transactions committed one after the other into the same parent
/// both land when they touch different keys.
#[test]
fn test_sequential_commits_different_keys() {
    let mut parent = create_test_transaction();

    let mut first = FlowTransaction::new(&parent, CommitVersion(1));
    first.set(&make_key("key1"), make_value("value1")).unwrap();
    first.commit(&mut parent).unwrap();

    let mut second = FlowTransaction::new(&parent, CommitVersion(2));
    second.set(&make_key("key2"), make_value("value2")).unwrap();
    second.commit(&mut parent).unwrap();

    let stored_first = from_store(&mut parent, &make_key("key1"));
    assert_eq!(stored_first, Some(make_value("value1")));
    let stored_second = from_store(&mut parent, &make_key("key2"));
    assert_eq!(stored_second, Some(make_value("value2")));
}
297
/// Repeated set / remove calls on one key leave only the last operation in the
/// buffer, and that is what gets committed.
#[test]
fn test_same_key_multiple_overwrites() {
    let mut parent = create_test_transaction();
    let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(1));

    let key = make_key("key1");
    let last_value = make_value("final");

    // A set followed by a remove leaves the key marked as removed.
    flow_txn.set(&key, make_value("first")).unwrap();
    flow_txn.remove(&key).unwrap();
    assert!(flow_txn.pending.is_removed(&key));

    // Further set / remove / set: the final set is what stays buffered.
    flow_txn.set(&key, make_value("second")).unwrap();
    flow_txn.remove(&key).unwrap();
    flow_txn.set(&key, last_value.clone()).unwrap();
    assert_eq!(flow_txn.pending.get(&key), Some(&last_value));

    flow_txn.commit(&mut parent).unwrap();

    assert_eq!(from_store(&mut parent, &key), Some(last_value));
}
326
/// Committing a second flow transaction that wrote a key the parent already
/// has pending fails with error code `FLOW_002`.
#[test]
fn test_commit_detects_overlapping_writes() {
    let mut parent = create_test_transaction();

    let shared_key = make_key("key1");

    let mut first = FlowTransaction::new(&parent, CommitVersion(1));
    let mut second = FlowTransaction::new(&parent, CommitVersion(2));

    // Both transactions buffer a write to the same key.
    first.set(&shared_key, make_value("value1")).unwrap();
    second.set(&shared_key, make_value("value2")).unwrap();

    // The first commit puts the key into the parent's pending writes.
    first.commit(&mut parent).unwrap();

    // The second commit must be rejected; `unwrap_err` panics if it is not.
    let err = second.commit(&mut parent).unwrap_err();
    assert_eq!(err.code, "FLOW_002");
}
353
/// Committing the same flow transaction twice must not re-apply its writes or
/// trip the keyspace-overlap check against its own, already committed keys.
#[test]
fn test_double_commit_prevention() {
    let mut parent = create_test_transaction();

    let key = make_key("key1");
    let value = make_value("value1");

    let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
    txn.set(&key, value.clone()).unwrap();

    let first = txn.commit(&mut parent);
    assert!(first.is_ok(), "First commit should succeed");

    // `commit` clears the pending buffer, so a repeated commit has nothing
    // left to replay. If the buffer were not cleared, "key1" would now collide
    // with the parent's pending writes and this call would fail.
    let second = txn.commit(&mut parent);
    assert!(second.is_ok(), "Second commit should be a no-op after the buffer is cleared");

    // The value written by the first commit is still what the parent sees.
    assert_eq!(from_store(&mut parent, &key), Some(value));
}
371
/// A second flow transaction that writes a different key commits successfully
/// after the first, and both values are visible through the parent.
#[test]
fn test_commit_allows_nonoverlapping_writes() {
    let mut parent = create_test_transaction();

    let mut first = FlowTransaction::new(&parent, CommitVersion(1));
    first.set(&make_key("key1"), make_value("value1")).unwrap();
    first.commit(&mut parent).unwrap();

    let mut second = FlowTransaction::new(&parent, CommitVersion(2));
    second.set(&make_key("key2"), make_value("value2")).unwrap();
    let outcome = second.commit(&mut parent);

    assert!(outcome.is_ok());

    let stored_first = from_store(&mut parent, &make_key("key1"));
    assert_eq!(stored_first, Some(make_value("value1")));
    let stored_second = from_store(&mut parent, &make_key("key2"));
    assert_eq!(stored_second, Some(make_value("value2")));
}
394}