reifydb_sub_flow/transaction/
commit.rs1use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::{MultiVersionCommandTransaction, interceptor::ViewInterceptor};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8use tracing::instrument;
9
10use super::{FlowTransaction, FlowTransactionMetrics, Pending, ViewPending};
11
12impl FlowTransaction {
13 #[instrument(level = "debug", skip(self, parent), fields(
27 pending_count = self.pending.len(),
28 view_ops_count = self.view_pending.len(),
29 writes,
30 removes
31 ))]
32 pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
33 {
36 let parent_pending = parent.pending_writes();
37 for (key, _) in self.pending.iter_sorted() {
38 if parent_pending.contains_key(key) {
40 return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
41 }
42 }
43 }
44
45 for view_op in self.view_pending.drain(..) {
48 match view_op {
49 ViewPending::Insert {
50 view,
51 row_number,
52 row,
53 } => {
54 ViewInterceptor::pre_insert(parent, &view, row_number, &row)?;
56 ViewInterceptor::post_insert(parent, &view, row_number, &row)?;
60 }
61 ViewPending::Update {
62 view,
63 old_row_number,
64 new_row_number,
65 row,
66 } => {
67 ViewInterceptor::pre_update(parent, &view, new_row_number, &row)?;
69 ViewInterceptor::pre_delete(parent, &view, old_row_number)?;
71 ViewInterceptor::post_update(parent, &view, new_row_number, &row, &row)?;
75 }
76 ViewPending::Remove {
77 view,
78 row_number,
79 } => {
80 ViewInterceptor::pre_delete(parent, &view, row_number)?;
82 }
86 }
87 }
88
89 let mut write_count = 0;
91 let mut remove_count = 0;
92 for (key, pending) in self.pending.iter_sorted() {
93 match pending {
94 Pending::Write(value) => {
95 parent.set(key, value.clone())?;
96 write_count += 1;
97 }
98 Pending::Remove => {
99 parent.remove(key)?;
100 remove_count += 1;
101 }
102 }
103 }
104
105 tracing::Span::current().record("writes", write_count);
106 tracing::Span::current().record("removes", remove_count);
107
108 self.pending.clear();
109 Ok(self.metrics.clone())
110 }
111}
112
#[cfg(test)]
mod tests {
    //! Tests for `FlowTransaction::commit`: applying buffered writes and
    //! removes to a parent transaction, the returned metrics, keyspace-overlap
    //! detection, and repeated commits.

    use reifydb_core::CommitVersion;

    use super::*;
    use crate::{
        operator::stateful::test_utils::test::create_test_transaction,
        transaction::utils::test::{from_store, make_key, make_value},
    };

    /// Committing a transaction with nothing buffered reports zeroed metrics.
    #[test]
    fn test_commit_empty_pending() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let metrics = txn.commit(&mut parent).unwrap();

        assert_eq!(metrics.reads, 0);
        assert_eq!(metrics.writes, 0);
        assert_eq!(metrics.removes, 0);
    }

    /// A single buffered write becomes readable through the parent after commit.
    #[test]
    fn test_commit_single_write() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let key = make_key("key1");
        let value = make_value("value1");
        txn.set(&key, value.clone()).unwrap();

        txn.commit(&mut parent).unwrap();

        assert_eq!(from_store(&mut parent, &key), Some(value));
    }

    /// Several buffered writes all reach the parent.
    #[test]
    fn test_commit_multiple_writes() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        txn.set(&make_key("key1"), make_value("value1")).unwrap();
        txn.set(&make_key("key2"), make_value("value2")).unwrap();
        txn.set(&make_key("key3"), make_value("value3")).unwrap();

        txn.commit(&mut parent).unwrap();

        assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
        assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
        assert_eq!(from_store(&mut parent, &make_key("key3")), Some(make_value("value3")));
    }

    /// Buffered removes delete values that were previously committed to storage.
    #[test]
    fn test_commit_removes() {
        use reifydb_core::interface::Engine;

        use crate::operator::stateful::test_utils::test::create_test_engine;

        let engine = create_test_engine();
        let mut parent = engine.begin_command().unwrap();

        // Seed storage with two committed values.
        let key1 = make_key("key1");
        let key2 = make_key("key2");
        parent.set(&key1, make_value("value1")).unwrap();
        parent.set(&key2, make_value("value2")).unwrap();
        let commit_version = parent.commit().unwrap();

        // Start a fresh parent transaction on top of the seeded state.
        let mut parent = engine.begin_command().unwrap();

        // Sanity check: the seeded values are visible.
        assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
        assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));

        let mut txn = FlowTransaction::new(&parent, commit_version);
        txn.remove(&key1).unwrap();
        txn.remove(&key2).unwrap();

        txn.commit(&mut parent).unwrap();

        // Persist the parent so the removals reach storage.
        parent.commit().unwrap();

        // A brand-new transaction must no longer see either key.
        let mut parent = engine.begin_command().unwrap();
        assert_eq!(from_store(&mut parent, &key1), None);
        assert_eq!(from_store(&mut parent, &key2), None);
    }

    /// A write and a remove buffered in the same flow transaction are both applied.
    #[test]
    fn test_commit_mixed_writes_and_removes() {
        use reifydb_core::interface::Engine;

        use crate::operator::stateful::test_utils::test::create_test_engine;

        let engine = create_test_engine();
        let mut parent = engine.begin_command().unwrap();

        // Seed storage with one committed value.
        let existing_key = make_key("existing");
        parent.set(&existing_key, make_value("old")).unwrap();
        let commit_version = parent.commit().unwrap();

        // Start a fresh parent transaction on top of the seeded state.
        let mut parent = engine.begin_command().unwrap();

        // Sanity check: the seeded value is visible.
        assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));

        let mut txn = FlowTransaction::new(&parent, commit_version);

        // Add one new key and remove the existing one.
        let new_key = make_key("new");
        txn.set(&new_key, make_value("value")).unwrap();
        txn.remove(&existing_key).unwrap();

        txn.commit(&mut parent).unwrap();

        // Persist the parent so both changes reach storage.
        parent.commit().unwrap();

        // Verify through a brand-new transaction.
        let mut parent = engine.begin_command().unwrap();
        assert_eq!(from_store(&mut parent, &new_key), Some(make_value("value")));
        assert_eq!(from_store(&mut parent, &existing_key), None);
    }

    /// The metrics returned by `commit` reflect the operations performed on the
    /// flow transaction beforehand.
    #[test]
    fn test_commit_returns_metrics() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        txn.set(&make_key("key1"), make_value("value1")).unwrap();
        txn.get(&make_key("key2")).unwrap();
        txn.remove(&make_key("key3")).unwrap();

        let metrics = txn.commit(&mut parent).unwrap();

        assert_eq!(metrics.writes, 1);
        assert_eq!(metrics.reads, 1);
        assert_eq!(metrics.removes, 1);
    }

    /// A buffered write replaces a value that already exists in storage.
    #[test]
    fn test_commit_overwrites_storage_value() {
        use reifydb_core::interface::Engine;

        use crate::operator::stateful::test_utils::test::create_test_engine;

        let engine = create_test_engine();
        let mut parent = engine.begin_command().unwrap();

        // Seed storage with the old value.
        let key = make_key("key1");
        parent.set(&key, make_value("old")).unwrap();
        let commit_version = parent.commit().unwrap();

        // Start a fresh parent transaction on top of the seeded state.
        let mut parent = engine.begin_command().unwrap();

        // Sanity check: the old value is visible.
        assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));

        let mut txn = FlowTransaction::new(&parent, commit_version);
        txn.set(&key, make_value("new")).unwrap();
        txn.commit(&mut parent).unwrap();

        // The parent now observes the overwritten value.
        assert_eq!(from_store(&mut parent, &key), Some(make_value("new")));
    }

    /// Two flow transactions committed one after the other on distinct keys
    /// both land in the parent.
    #[test]
    fn test_sequential_commits_different_keys() {
        let mut parent = create_test_transaction();

        let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
        txn1.set(&make_key("key1"), make_value("value1")).unwrap();
        txn1.commit(&mut parent).unwrap();

        let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
        txn2.set(&make_key("key2"), make_value("value2")).unwrap();
        txn2.commit(&mut parent).unwrap();

        assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
        assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
    }

    /// Repeated set/remove on one key inside a single flow transaction keeps
    /// only the last operation, and that is what gets committed.
    #[test]
    fn test_same_key_multiple_overwrites() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let key = make_key("key1");

        txn.set(&key, make_value("first")).unwrap();
        txn.remove(&key).unwrap();

        // After set + remove the buffer records the key as removed.
        assert!(txn.pending.is_removed(&key));

        txn.set(&key, make_value("second")).unwrap();
        txn.remove(&key).unwrap();
        txn.set(&key, make_value("final")).unwrap();

        // The last write wins inside the buffer.
        assert_eq!(txn.pending.get(&key), Some(&make_value("final")));

        txn.commit(&mut parent).unwrap();

        // Only the final value reaches the parent.
        assert_eq!(from_store(&mut parent, &key), Some(make_value("final")));
    }

    /// Committing a key that the parent already holds as a pending write is
    /// rejected with the keyspace-overlap diagnostic.
    #[test]
    fn test_commit_detects_overlapping_writes() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");

        // Two flow transactions that both target the same key.
        let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
        let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));

        txn1.set(&key, make_value("value1")).unwrap();
        txn2.set(&key, make_value("value2")).unwrap();

        // The first commit puts the key into the parent's pending writes.
        txn1.commit(&mut parent).unwrap();

        // The second commit must now be refused.
        let result = txn2.commit(&mut parent);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.code, "FLOW_002");
    }

    /// Committing the same flow transaction twice must not re-apply its
    /// changes: the first commit empties the buffer, so the second one is a
    /// no-op instead of colliding with the keys the first commit just handed
    /// to the parent.
    #[test]
    fn test_double_commit_prevention() {
        let mut parent = create_test_transaction();

        let key = make_key("key1");
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
        txn.set(&key, make_value("value1")).unwrap();

        let first = txn.commit(&mut parent);
        assert!(first.is_ok(), "First commit should succeed");

        // Nothing may be left behind that a later commit could replay.
        assert_eq!(txn.pending.len(), 0, "Commit should drain the pending buffer");

        // With an empty buffer the overlap check has nothing to trip over,
        // even though the parent now holds `key` as a pending write.
        let second = txn.commit(&mut parent);
        assert!(second.is_ok(), "Second commit should be a no-op");

        // The value written by the first commit is still in place.
        assert_eq!(from_store(&mut parent, &key), Some(make_value("value1")));
    }

    /// Flow transactions touching different keys can be committed into the
    /// same parent without triggering the overlap check.
    #[test]
    fn test_commit_allows_nonoverlapping_writes() {
        let mut parent = create_test_transaction();

        let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
        txn1.set(&make_key("key1"), make_value("value1")).unwrap();
        txn1.commit(&mut parent).unwrap();

        // A different key, so no overlap with what txn1 left in the parent.
        let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
        txn2.set(&make_key("key2"), make_value("value2")).unwrap();
        let result = txn2.commit(&mut parent);

        assert!(result.is_ok());

        // Both values are visible through the parent.
        assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
        assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
    }
}