reifydb_sub_flow/transaction/
commit.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::{MultiVersionCommandTransaction, interceptor::ViewInterceptor};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8
9use super::{FlowTransaction, FlowTransactionMetrics, Pending, ViewPending};
10
11impl FlowTransaction {
12	/// Commit all pending writes and removes to the parent transaction
13	///
14	/// Takes the parent transaction as a mutable reference to apply buffered operations.
15	/// This allows the FlowTransaction to be reused for subsequent units of work.
16	/// The pending buffer is NOT cleared to maintain read-your-own-writes semantics.
17	///
18	/// Returns the transaction metrics.
19	///
20	/// # Errors
21	///
22	/// Returns an error if any key in this FlowTransaction overlaps with keys already
23	/// written by another FlowTransaction to the same parent. FlowTransactions must
24	/// operate on non-overlapping keyspaces.
25	pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
26		// Check for any overlapping keys with the parent's pending writes.
27		// This enforces that FlowTransactions operate on non-overlapping keyspaces.
28		{
29			let parent_pending = parent.pending_writes();
30			for (key, _) in self.pending.iter_sorted() {
31				// Check if key exists in parent
32				if parent_pending.contains_key(key) {
33					return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
34				}
35			}
36		}
37
38		// Process view operations with interceptor calls
39		// We need to call pre_* before the storage write and post_* after
40		for view_op in self.view_pending.drain(..) {
41			match view_op {
42				ViewPending::Insert {
43					view,
44					row_number,
45					row,
46				} => {
47					// Call pre-insert interceptor
48					ViewInterceptor::pre_insert(parent, &view, row_number, &row)?;
49					// Note: The actual storage write is already in self.pending
50					// and will be applied below
51					// Call post-insert interceptor
52					ViewInterceptor::post_insert(parent, &view, row_number, &row)?;
53				}
54				ViewPending::Update {
55					view,
56					old_row_number,
57					new_row_number,
58					row,
59				} => {
60					// Call pre-update interceptor
61					ViewInterceptor::pre_update(parent, &view, new_row_number, &row)?;
62					// Call pre-delete for the old row
63					ViewInterceptor::pre_delete(parent, &view, old_row_number)?;
64					// Note: The actual storage writes are in self.pending
65					// Call post-delete for the old row (we don't have the old row data)
66					// Call post-update interceptor
67					ViewInterceptor::post_update(parent, &view, new_row_number, &row, &row)?;
68				}
69				ViewPending::Remove {
70					view,
71					row_number,
72				} => {
73					// Call pre-delete interceptor
74					ViewInterceptor::pre_delete(parent, &view, row_number)?;
75					// Note: The actual storage removal is in self.pending
76					// We don't have the deleted row data for post_delete, so we skip it
77					// TODO: Consider storing the deleted row data if post_delete is needed
78				}
79			}
80		}
81
82		// Now apply all writes and removes to storage
83		for (key, pending) in self.pending.iter_sorted() {
84			match pending {
85				Pending::Write(value) => {
86					parent.set(key, value.clone())?;
87				}
88				Pending::Remove => {
89					parent.remove(key)?;
90				}
91			}
92		}
93
94		self.pending.clear();
95		Ok(self.metrics.clone())
96	}
97}
98
99#[cfg(test)]
100mod tests {
101	use reifydb_core::CommitVersion;
102
103	use super::*;
104	use crate::{
105		operator::stateful::test_utils::test::create_test_transaction,
106		transaction::utils::test::{from_store, make_key, make_value},
107	};
108
109	#[test]
110	fn test_commit_empty_pending() {
111		let mut parent = create_test_transaction();
112		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
113
114		let metrics = txn.commit(&mut parent).unwrap();
115
116		// Metrics should be zero
117		assert_eq!(metrics.reads, 0);
118		assert_eq!(metrics.writes, 0);
119		assert_eq!(metrics.removes, 0);
120	}
121
122	#[test]
123	fn test_commit_single_write() {
124		let mut parent = create_test_transaction();
125		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
126
127		let key = make_key("key1");
128		let value = make_value("value1");
129		txn.set(&key, value.clone()).unwrap();
130
131		txn.commit(&mut parent).unwrap();
132
133		// Parent should now have the value
134		assert_eq!(from_store(&mut parent, &key), Some(value));
135	}
136
137	#[test]
138	fn test_commit_multiple_writes() {
139		let mut parent = create_test_transaction();
140		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
141
142		txn.set(&make_key("key1"), make_value("value1")).unwrap();
143		txn.set(&make_key("key2"), make_value("value2")).unwrap();
144		txn.set(&make_key("key3"), make_value("value3")).unwrap();
145
146		txn.commit(&mut parent).unwrap();
147
148		// All values should be in parent
149		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
150		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
151		assert_eq!(from_store(&mut parent, &make_key("key3")), Some(make_value("value3")));
152	}
153
154	#[test]
155	fn test_commit_removes() {
156		use reifydb_core::interface::Engine;
157
158		use crate::operator::stateful::test_utils::test::create_test_engine;
159
160		let engine = create_test_engine();
161		let mut parent = engine.begin_command().unwrap();
162
163		// First commit some data to the underlying storage
164		let key1 = make_key("key1");
165		let key2 = make_key("key2");
166		parent.set(&key1, make_value("value1")).unwrap();
167		parent.set(&key2, make_value("value2")).unwrap();
168		let commit_version = parent.commit().unwrap();
169
170		// Create new parent transaction after commit
171		let mut parent = engine.begin_command().unwrap();
172
173		// Verify values exist in storage
174		assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
175		assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));
176
177		// Create FlowTransaction and remove the keys
178		let mut txn = FlowTransaction::new(&parent, commit_version);
179		txn.remove(&key1).unwrap();
180		txn.remove(&key2).unwrap();
181
182		txn.commit(&mut parent).unwrap();
183
184		// Commit parent to persist the removes
185		parent.commit().unwrap();
186
187		// Create new transaction to verify removes were persisted
188		let mut parent = engine.begin_command().unwrap();
189		assert_eq!(from_store(&mut parent, &key1), None);
190		assert_eq!(from_store(&mut parent, &key2), None);
191	}
192
193	#[test]
194	fn test_commit_mixed_writes_and_removes() {
195		use reifydb_core::interface::Engine;
196
197		use crate::operator::stateful::test_utils::test::create_test_engine;
198
199		let engine = create_test_engine();
200		let mut parent = engine.begin_command().unwrap();
201
202		// First commit some data to the underlying storage
203		let existing_key = make_key("existing");
204		parent.set(&existing_key, make_value("old")).unwrap();
205		let commit_version = parent.commit().unwrap();
206
207		// Create new parent transaction after commit
208		let mut parent = engine.begin_command().unwrap();
209
210		// Verify value exists in storage
211		assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));
212
213		// Create FlowTransaction
214		let mut txn = FlowTransaction::new(&parent, commit_version);
215
216		// Add a new key and remove the existing one
217		let new_key = make_key("new");
218		txn.set(&new_key, make_value("value")).unwrap();
219		txn.remove(&existing_key).unwrap();
220
221		txn.commit(&mut parent).unwrap();
222
223		// Commit parent to persist the changes
224		parent.commit().unwrap();
225
226		// Create new transaction to verify changes were persisted
227		let mut parent = engine.begin_command().unwrap();
228		assert_eq!(from_store(&mut parent, &new_key), Some(make_value("value")));
229		assert_eq!(from_store(&mut parent, &existing_key), None);
230	}
231
232	#[test]
233	fn test_commit_returns_metrics() {
234		let mut parent = create_test_transaction();
235		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
236
237		txn.set(&make_key("key1"), make_value("value1")).unwrap();
238		txn.get(&make_key("key2")).unwrap();
239		txn.remove(&make_key("key3")).unwrap();
240
241		let metrics = txn.commit(&mut parent).unwrap();
242
243		assert_eq!(metrics.writes, 1);
244		assert_eq!(metrics.reads, 1);
245		assert_eq!(metrics.removes, 1);
246	}
247
248	#[test]
249	fn test_commit_overwrites_storage_value() {
250		use reifydb_core::interface::Engine;
251
252		use crate::operator::stateful::test_utils::test::create_test_engine;
253
254		let engine = create_test_engine();
255		let mut parent = engine.begin_command().unwrap();
256
257		// First commit some data to the underlying storage
258		let key = make_key("key1");
259		parent.set(&key, make_value("old")).unwrap();
260		let commit_version = parent.commit().unwrap();
261
262		// Create new parent transaction after commit
263		let mut parent = engine.begin_command().unwrap();
264
265		// Verify old value exists in storage
266		assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));
267
268		// Create FlowTransaction and overwrite the value
269		let mut txn = FlowTransaction::new(&parent, commit_version);
270		txn.set(&key, make_value("new")).unwrap();
271		txn.commit(&mut parent).unwrap();
272
273		// Parent should have new value
274		assert_eq!(from_store(&mut parent, &key), Some(make_value("new")));
275	}
276
277	#[test]
278	fn test_sequential_commits_different_keys() {
279		let mut parent = create_test_transaction();
280
281		// First FlowTransaction writes to key1
282		// Note: FlowTransactions must operate on non-overlapping keyspaces
283		// This is enforced at the flow scheduler level, not the transaction level
284		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
285		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
286		txn1.commit(&mut parent).unwrap();
287
288		// Second FlowTransaction writes to key2 (different keyspace)
289		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
290		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
291		txn2.commit(&mut parent).unwrap();
292
293		// Both values should be in parent
294		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
295		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
296	}
297
298	#[test]
299	fn test_same_key_multiple_overwrites() {
300		let mut parent = create_test_transaction();
301		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
302
303		let key = make_key("key1");
304
305		// Pattern 1: set-delete on same key within same transaction
306		txn.set(&key, make_value("first")).unwrap();
307		txn.remove(&key).unwrap();
308
309		// After set-delete, key should be marked for removal
310		assert!(txn.pending.is_removed(&key));
311
312		// Pattern 2: set-delete-set on same key within same transaction
313		txn.set(&key, make_value("second")).unwrap();
314		txn.remove(&key).unwrap();
315		txn.set(&key, make_value("final")).unwrap();
316
317		// After set-delete-set, key should have final value
318		assert_eq!(txn.pending.get(&key), Some(&make_value("final")));
319
320		// Commit and verify final state
321		txn.commit(&mut parent).unwrap();
322
323		// Only the final value should be in parent
324		assert_eq!(from_store(&mut parent, &key), Some(make_value("final")));
325	}
326
327	#[test]
328	fn test_commit_detects_overlapping_writes() {
329		let mut parent = create_test_transaction();
330
331		let key = make_key("key1");
332
333		// Create both FlowTransactions before any commits
334		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
335		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
336
337		// Both try to write to the same key
338		txn1.set(&key, make_value("value1")).unwrap();
339		txn2.set(&key, make_value("value2")).unwrap();
340
341		// First commit succeeds
342		txn1.commit(&mut parent).unwrap();
343
344		// Second commit should fail with keyspace overlap error
345		// because txn1 already wrote to key1
346		let result = txn2.commit(&mut parent);
347		assert!(result.is_err());
348
349		// Verify it's the expected error code
350		let err = result.unwrap_err();
351		assert_eq!(err.code, "FLOW_002");
352	}
353
354	#[test]
355	fn test_double_commit_prevention() {
356		let mut parent = create_test_transaction();
357
358		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
359		txn.set(&make_key("key1"), make_value("value1")).unwrap();
360
361		// First commit should succeed
362		let metrics = txn.commit(&mut parent);
363		assert!(metrics.is_ok(), "First commit should succeed");
364
365		// Transaction is consumed after commit, can't commit again
366		// This test verifies at compile-time that txn is moved
367		// If we could access txn here, it would be a bug
368		// The following line should not compile:
369		// txn.commit(&mut parent);  // ERROR: use of moved value
370	}
371
372	#[test]
373	fn test_commit_allows_nonoverlapping_writes() {
374		let mut parent = create_test_transaction();
375
376		// First FlowTransaction writes to key1
377		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
378		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
379		txn1.commit(&mut parent).unwrap();
380
381		// Second FlowTransaction writes to key2 (different keyspace)
382		// This should succeed because keyspaces don't overlap
383		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
384		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
385		let result = txn2.commit(&mut parent);
386
387		// Should succeed
388		assert!(result.is_ok());
389
390		// Both values should be in parent
391		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
392		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
393	}
394}