reifydb_sub_flow/transaction/
commit.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::MultiVersionCommandTransaction;
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_type::{diagnostic, return_error, util::hex};
7
8use super::{FlowTransaction, FlowTransactionMetrics, Pending};
9
10impl FlowTransaction {
11	/// Commit all pending writes and removes to the parent transaction
12	///
13	/// Takes the parent transaction as a mutable reference to apply buffered operations.
14	/// This allows the FlowTransaction to be reused for subsequent units of work.
15	/// The pending buffer is NOT cleared to maintain read-your-own-writes semantics.
16	///
17	/// Returns the transaction metrics.
18	///
19	/// # Errors
20	///
21	/// Returns an error if any key in this FlowTransaction overlaps with keys already
22	/// written by another FlowTransaction to the same parent. FlowTransactions must
23	/// operate on non-overlapping keyspaces.
24	pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
25		// Check for any overlapping keys with the parent's pending writes.
26		// This enforces that FlowTransactions operate on non-overlapping keyspaces.
27		{
28			let parent_pending = parent.pending_writes();
29			for (key, _) in self.pending.iter_sorted() {
30				// Check if key exists in parent
31				if parent_pending.contains_key(key) {
32					return_error!(diagnostic::flow::flow_transaction_keyspace_overlap(
33						hex::encode(key.as_ref())
34					));
35				}
36			}
37		}
38
39		// Now apply all writes and removes
40		for (key, pending) in self.pending.iter_sorted() {
41			match pending {
42				Pending::Write(value) => {
43					parent.set(key, value.clone())?;
44				}
45				Pending::Remove => {
46					parent.remove(key)?;
47				}
48			}
49		}
50
51		self.pending.clear();
52
53		Ok(self.metrics.clone())
54	}
55}
56
57#[cfg(test)]
58mod tests {
59	use reifydb_core::CommitVersion;
60
61	use super::*;
62	use crate::{
63		operator::stateful::test_utils::test::create_test_transaction,
64		transaction::test_utils::test::{from_store, make_key, make_value},
65	};
66
67	#[test]
68	fn test_commit_empty_pending() {
69		let mut parent = create_test_transaction();
70		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
71
72		let metrics = txn.commit(&mut parent).unwrap();
73
74		// Metrics should be zero
75		assert_eq!(metrics.reads, 0);
76		assert_eq!(metrics.writes, 0);
77		assert_eq!(metrics.removes, 0);
78	}
79
80	#[test]
81	fn test_commit_single_write() {
82		let mut parent = create_test_transaction();
83		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
84
85		let key = make_key("key1");
86		let value = make_value("value1");
87		txn.set(&key, value.clone()).unwrap();
88
89		txn.commit(&mut parent).unwrap();
90
91		// Parent should now have the value
92		assert_eq!(from_store(&mut parent, &key), Some(value));
93	}
94
95	#[test]
96	fn test_commit_multiple_writes() {
97		let mut parent = create_test_transaction();
98		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
99
100		txn.set(&make_key("key1"), make_value("value1")).unwrap();
101		txn.set(&make_key("key2"), make_value("value2")).unwrap();
102		txn.set(&make_key("key3"), make_value("value3")).unwrap();
103
104		txn.commit(&mut parent).unwrap();
105
106		// All values should be in parent
107		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
108		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
109		assert_eq!(from_store(&mut parent, &make_key("key3")), Some(make_value("value3")));
110	}
111
112	#[test]
113	fn test_commit_removes() {
114		use reifydb_core::interface::Engine;
115
116		use crate::operator::stateful::test_utils::test::create_test_engine;
117
118		let engine = create_test_engine();
119		let mut parent = engine.begin_command().unwrap();
120
121		// First commit some data to the underlying storage
122		let key1 = make_key("key1");
123		let key2 = make_key("key2");
124		parent.set(&key1, make_value("value1")).unwrap();
125		parent.set(&key2, make_value("value2")).unwrap();
126		let commit_version = parent.commit().unwrap();
127
128		// Create new parent transaction after commit
129		let mut parent = engine.begin_command().unwrap();
130
131		// Verify values exist in storage
132		assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
133		assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));
134
135		// Create FlowTransaction and remove the keys
136		let mut txn = FlowTransaction::new(&parent, commit_version);
137		txn.remove(&key1).unwrap();
138		txn.remove(&key2).unwrap();
139
140		txn.commit(&mut parent).unwrap();
141
142		// Commit parent to persist the removes
143		parent.commit().unwrap();
144
145		// Create new transaction to verify removes were persisted
146		let mut parent = engine.begin_command().unwrap();
147		assert_eq!(from_store(&mut parent, &key1), None);
148		assert_eq!(from_store(&mut parent, &key2), None);
149	}
150
151	#[test]
152	fn test_commit_mixed_writes_and_removes() {
153		use reifydb_core::interface::Engine;
154
155		use crate::operator::stateful::test_utils::test::create_test_engine;
156
157		let engine = create_test_engine();
158		let mut parent = engine.begin_command().unwrap();
159
160		// First commit some data to the underlying storage
161		let existing_key = make_key("existing");
162		parent.set(&existing_key, make_value("old")).unwrap();
163		let commit_version = parent.commit().unwrap();
164
165		// Create new parent transaction after commit
166		let mut parent = engine.begin_command().unwrap();
167
168		// Verify value exists in storage
169		assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));
170
171		// Create FlowTransaction
172		let mut txn = FlowTransaction::new(&parent, commit_version);
173
174		// Add a new key and remove the existing one
175		let new_key = make_key("new");
176		txn.set(&new_key, make_value("value")).unwrap();
177		txn.remove(&existing_key).unwrap();
178
179		txn.commit(&mut parent).unwrap();
180
181		// Commit parent to persist the changes
182		parent.commit().unwrap();
183
184		// Create new transaction to verify changes were persisted
185		let mut parent = engine.begin_command().unwrap();
186		assert_eq!(from_store(&mut parent, &new_key), Some(make_value("value")));
187		assert_eq!(from_store(&mut parent, &existing_key), None);
188	}
189
190	#[test]
191	fn test_commit_returns_metrics() {
192		let mut parent = create_test_transaction();
193		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
194
195		txn.set(&make_key("key1"), make_value("value1")).unwrap();
196		txn.get(&make_key("key2")).unwrap();
197		txn.remove(&make_key("key3")).unwrap();
198
199		let metrics = txn.commit(&mut parent).unwrap();
200
201		assert_eq!(metrics.writes, 1);
202		assert_eq!(metrics.reads, 1);
203		assert_eq!(metrics.removes, 1);
204	}
205
206	#[test]
207	fn test_commit_overwrites_storage_value() {
208		use reifydb_core::interface::Engine;
209
210		use crate::operator::stateful::test_utils::test::create_test_engine;
211
212		let engine = create_test_engine();
213		let mut parent = engine.begin_command().unwrap();
214
215		// First commit some data to the underlying storage
216		let key = make_key("key1");
217		parent.set(&key, make_value("old")).unwrap();
218		let commit_version = parent.commit().unwrap();
219
220		// Create new parent transaction after commit
221		let mut parent = engine.begin_command().unwrap();
222
223		// Verify old value exists in storage
224		assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));
225
226		// Create FlowTransaction and overwrite the value
227		let mut txn = FlowTransaction::new(&parent, commit_version);
228		txn.set(&key, make_value("new")).unwrap();
229		txn.commit(&mut parent).unwrap();
230
231		// Parent should have new value
232		assert_eq!(from_store(&mut parent, &key), Some(make_value("new")));
233	}
234
235	#[test]
236	fn test_sequential_commits_different_keys() {
237		let mut parent = create_test_transaction();
238
239		// First FlowTransaction writes to key1
240		// Note: FlowTransactions must operate on non-overlapping keyspaces
241		// This is enforced at the flow scheduler level, not the transaction level
242		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
243		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
244		txn1.commit(&mut parent).unwrap();
245
246		// Second FlowTransaction writes to key2 (different keyspace)
247		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
248		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
249		txn2.commit(&mut parent).unwrap();
250
251		// Both values should be in parent
252		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
253		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
254	}
255
256	#[test]
257	fn test_same_key_multiple_overwrites() {
258		let mut parent = create_test_transaction();
259		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
260
261		let key = make_key("key1");
262
263		// Pattern 1: set-delete on same key within same transaction
264		txn.set(&key, make_value("first")).unwrap();
265		txn.remove(&key).unwrap();
266
267		// After set-delete, key should be marked for removal
268		assert!(txn.pending.is_removed(&key));
269
270		// Pattern 2: set-delete-set on same key within same transaction
271		txn.set(&key, make_value("second")).unwrap();
272		txn.remove(&key).unwrap();
273		txn.set(&key, make_value("final")).unwrap();
274
275		// After set-delete-set, key should have final value
276		assert_eq!(txn.pending.get(&key), Some(&make_value("final")));
277
278		// Commit and verify final state
279		txn.commit(&mut parent).unwrap();
280
281		// Only the final value should be in parent
282		assert_eq!(from_store(&mut parent, &key), Some(make_value("final")));
283	}
284
285	#[test]
286	fn test_commit_detects_overlapping_writes() {
287		let mut parent = create_test_transaction();
288
289		let key = make_key("key1");
290
291		// Create both FlowTransactions before any commits
292		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
293		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
294
295		// Both try to write to the same key
296		txn1.set(&key, make_value("value1")).unwrap();
297		txn2.set(&key, make_value("value2")).unwrap();
298
299		// First commit succeeds
300		txn1.commit(&mut parent).unwrap();
301
302		// Second commit should fail with keyspace overlap error
303		// because txn1 already wrote to key1
304		let result = txn2.commit(&mut parent);
305		assert!(result.is_err());
306
307		// Verify it's the expected error code
308		let err = result.unwrap_err();
309		assert_eq!(err.code, "FLOW_002");
310	}
311
312	#[test]
313	fn test_double_commit_prevention() {
314		let mut parent = create_test_transaction();
315
316		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
317		txn.set(&make_key("key1"), make_value("value1")).unwrap();
318
319		// First commit should succeed
320		let metrics = txn.commit(&mut parent);
321		assert!(metrics.is_ok(), "First commit should succeed");
322
323		// Transaction is consumed after commit, can't commit again
324		// This test verifies at compile-time that txn is moved
325		// If we could access txn here, it would be a bug
326		// The following line should not compile:
327		// txn.commit(&mut parent);  // ERROR: use of moved value
328	}
329
330	#[test]
331	fn test_commit_allows_nonoverlapping_writes() {
332		let mut parent = create_test_transaction();
333
334		// First FlowTransaction writes to key1
335		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
336		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
337		txn1.commit(&mut parent).unwrap();
338
339		// Second FlowTransaction writes to key2 (different keyspace)
340		// This should succeed because keyspaces don't overlap
341		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
342		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
343		let result = txn2.commit(&mut parent);
344
345		// Should succeed
346		assert!(result.is_ok());
347
348		// Both values should be in parent
349		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
350		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
351	}
352}