reifydb_sub_flow/transaction/
commit.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::MultiVersionCommandTransaction;
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8
9use super::{FlowTransaction, FlowTransactionMetrics, Pending};
10
11impl FlowTransaction {
12	/// Commit all pending writes and removes to the parent transaction
13	///
14	/// Takes the parent transaction as a mutable reference to apply buffered operations.
15	/// This allows the FlowTransaction to be reused for subsequent units of work.
16	/// The pending buffer is NOT cleared to maintain read-your-own-writes semantics.
17	///
18	/// Returns the transaction metrics.
19	///
20	/// # Errors
21	///
22	/// Returns an error if any key in this FlowTransaction overlaps with keys already
23	/// written by another FlowTransaction to the same parent. FlowTransactions must
24	/// operate on non-overlapping keyspaces.
25	pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
26		// Check for any overlapping keys with the parent's pending writes.
27		// This enforces that FlowTransactions operate on non-overlapping keyspaces.
28		{
29			let parent_pending = parent.pending_writes();
30			for (key, _) in self.pending.iter_sorted() {
31				// Check if key exists in parent
32				if parent_pending.contains_key(key) {
33					return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
34				}
35			}
36		}
37
38		// Now apply all writes and removes
39		for (key, pending) in self.pending.iter_sorted() {
40			match pending {
41				Pending::Write(value) => {
42					parent.set(key, value.clone())?;
43				}
44				Pending::Remove => {
45					parent.remove(key)?;
46				}
47			}
48		}
49
50		self.pending.clear();
51		Ok(self.metrics.clone())
52	}
53}
54
55#[cfg(test)]
56mod tests {
57	use reifydb_core::CommitVersion;
58
59	use super::*;
60	use crate::{
61		operator::stateful::test_utils::test::create_test_transaction,
62		transaction::utils::test::{from_store, make_key, make_value},
63	};
64
65	#[test]
66	fn test_commit_empty_pending() {
67		let mut parent = create_test_transaction();
68		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
69
70		let metrics = txn.commit(&mut parent).unwrap();
71
72		// Metrics should be zero
73		assert_eq!(metrics.reads, 0);
74		assert_eq!(metrics.writes, 0);
75		assert_eq!(metrics.removes, 0);
76	}
77
78	#[test]
79	fn test_commit_single_write() {
80		let mut parent = create_test_transaction();
81		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
82
83		let key = make_key("key1");
84		let value = make_value("value1");
85		txn.set(&key, value.clone()).unwrap();
86
87		txn.commit(&mut parent).unwrap();
88
89		// Parent should now have the value
90		assert_eq!(from_store(&mut parent, &key), Some(value));
91	}
92
93	#[test]
94	fn test_commit_multiple_writes() {
95		let mut parent = create_test_transaction();
96		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
97
98		txn.set(&make_key("key1"), make_value("value1")).unwrap();
99		txn.set(&make_key("key2"), make_value("value2")).unwrap();
100		txn.set(&make_key("key3"), make_value("value3")).unwrap();
101
102		txn.commit(&mut parent).unwrap();
103
104		// All values should be in parent
105		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
106		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
107		assert_eq!(from_store(&mut parent, &make_key("key3")), Some(make_value("value3")));
108	}
109
110	#[test]
111	fn test_commit_removes() {
112		use reifydb_core::interface::Engine;
113
114		use crate::operator::stateful::test_utils::test::create_test_engine;
115
116		let engine = create_test_engine();
117		let mut parent = engine.begin_command().unwrap();
118
119		// First commit some data to the underlying storage
120		let key1 = make_key("key1");
121		let key2 = make_key("key2");
122		parent.set(&key1, make_value("value1")).unwrap();
123		parent.set(&key2, make_value("value2")).unwrap();
124		let commit_version = parent.commit().unwrap();
125
126		// Create new parent transaction after commit
127		let mut parent = engine.begin_command().unwrap();
128
129		// Verify values exist in storage
130		assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
131		assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));
132
133		// Create FlowTransaction and remove the keys
134		let mut txn = FlowTransaction::new(&parent, commit_version);
135		txn.remove(&key1).unwrap();
136		txn.remove(&key2).unwrap();
137
138		txn.commit(&mut parent).unwrap();
139
140		// Commit parent to persist the removes
141		parent.commit().unwrap();
142
143		// Create new transaction to verify removes were persisted
144		let mut parent = engine.begin_command().unwrap();
145		assert_eq!(from_store(&mut parent, &key1), None);
146		assert_eq!(from_store(&mut parent, &key2), None);
147	}
148
149	#[test]
150	fn test_commit_mixed_writes_and_removes() {
151		use reifydb_core::interface::Engine;
152
153		use crate::operator::stateful::test_utils::test::create_test_engine;
154
155		let engine = create_test_engine();
156		let mut parent = engine.begin_command().unwrap();
157
158		// First commit some data to the underlying storage
159		let existing_key = make_key("existing");
160		parent.set(&existing_key, make_value("old")).unwrap();
161		let commit_version = parent.commit().unwrap();
162
163		// Create new parent transaction after commit
164		let mut parent = engine.begin_command().unwrap();
165
166		// Verify value exists in storage
167		assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));
168
169		// Create FlowTransaction
170		let mut txn = FlowTransaction::new(&parent, commit_version);
171
172		// Add a new key and remove the existing one
173		let new_key = make_key("new");
174		txn.set(&new_key, make_value("value")).unwrap();
175		txn.remove(&existing_key).unwrap();
176
177		txn.commit(&mut parent).unwrap();
178
179		// Commit parent to persist the changes
180		parent.commit().unwrap();
181
182		// Create new transaction to verify changes were persisted
183		let mut parent = engine.begin_command().unwrap();
184		assert_eq!(from_store(&mut parent, &new_key), Some(make_value("value")));
185		assert_eq!(from_store(&mut parent, &existing_key), None);
186	}
187
188	#[test]
189	fn test_commit_returns_metrics() {
190		let mut parent = create_test_transaction();
191		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
192
193		txn.set(&make_key("key1"), make_value("value1")).unwrap();
194		txn.get(&make_key("key2")).unwrap();
195		txn.remove(&make_key("key3")).unwrap();
196
197		let metrics = txn.commit(&mut parent).unwrap();
198
199		assert_eq!(metrics.writes, 1);
200		assert_eq!(metrics.reads, 1);
201		assert_eq!(metrics.removes, 1);
202	}
203
204	#[test]
205	fn test_commit_overwrites_storage_value() {
206		use reifydb_core::interface::Engine;
207
208		use crate::operator::stateful::test_utils::test::create_test_engine;
209
210		let engine = create_test_engine();
211		let mut parent = engine.begin_command().unwrap();
212
213		// First commit some data to the underlying storage
214		let key = make_key("key1");
215		parent.set(&key, make_value("old")).unwrap();
216		let commit_version = parent.commit().unwrap();
217
218		// Create new parent transaction after commit
219		let mut parent = engine.begin_command().unwrap();
220
221		// Verify old value exists in storage
222		assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));
223
224		// Create FlowTransaction and overwrite the value
225		let mut txn = FlowTransaction::new(&parent, commit_version);
226		txn.set(&key, make_value("new")).unwrap();
227		txn.commit(&mut parent).unwrap();
228
229		// Parent should have new value
230		assert_eq!(from_store(&mut parent, &key), Some(make_value("new")));
231	}
232
233	#[test]
234	fn test_sequential_commits_different_keys() {
235		let mut parent = create_test_transaction();
236
237		// First FlowTransaction writes to key1
238		// Note: FlowTransactions must operate on non-overlapping keyspaces
239		// This is enforced at the flow scheduler level, not the transaction level
240		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
241		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
242		txn1.commit(&mut parent).unwrap();
243
244		// Second FlowTransaction writes to key2 (different keyspace)
245		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
246		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
247		txn2.commit(&mut parent).unwrap();
248
249		// Both values should be in parent
250		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
251		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
252	}
253
254	#[test]
255	fn test_same_key_multiple_overwrites() {
256		let mut parent = create_test_transaction();
257		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
258
259		let key = make_key("key1");
260
261		// Pattern 1: set-delete on same key within same transaction
262		txn.set(&key, make_value("first")).unwrap();
263		txn.remove(&key).unwrap();
264
265		// After set-delete, key should be marked for removal
266		assert!(txn.pending.is_removed(&key));
267
268		// Pattern 2: set-delete-set on same key within same transaction
269		txn.set(&key, make_value("second")).unwrap();
270		txn.remove(&key).unwrap();
271		txn.set(&key, make_value("final")).unwrap();
272
273		// After set-delete-set, key should have final value
274		assert_eq!(txn.pending.get(&key), Some(&make_value("final")));
275
276		// Commit and verify final state
277		txn.commit(&mut parent).unwrap();
278
279		// Only the final value should be in parent
280		assert_eq!(from_store(&mut parent, &key), Some(make_value("final")));
281	}
282
283	#[test]
284	fn test_commit_detects_overlapping_writes() {
285		let mut parent = create_test_transaction();
286
287		let key = make_key("key1");
288
289		// Create both FlowTransactions before any commits
290		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
291		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
292
293		// Both try to write to the same key
294		txn1.set(&key, make_value("value1")).unwrap();
295		txn2.set(&key, make_value("value2")).unwrap();
296
297		// First commit succeeds
298		txn1.commit(&mut parent).unwrap();
299
300		// Second commit should fail with keyspace overlap error
301		// because txn1 already wrote to key1
302		let result = txn2.commit(&mut parent);
303		assert!(result.is_err());
304
305		// Verify it's the expected error code
306		let err = result.unwrap_err();
307		assert_eq!(err.code, "FLOW_002");
308	}
309
310	#[test]
311	fn test_double_commit_prevention() {
312		let mut parent = create_test_transaction();
313
314		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
315		txn.set(&make_key("key1"), make_value("value1")).unwrap();
316
317		// First commit should succeed
318		let metrics = txn.commit(&mut parent);
319		assert!(metrics.is_ok(), "First commit should succeed");
320
321		// Transaction is consumed after commit, can't commit again
322		// This test verifies at compile-time that txn is moved
323		// If we could access txn here, it would be a bug
324		// The following line should not compile:
325		// txn.commit(&mut parent);  // ERROR: use of moved value
326	}
327
328	#[test]
329	fn test_commit_allows_nonoverlapping_writes() {
330		let mut parent = create_test_transaction();
331
332		// First FlowTransaction writes to key1
333		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
334		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
335		txn1.commit(&mut parent).unwrap();
336
337		// Second FlowTransaction writes to key2 (different keyspace)
338		// This should succeed because keyspaces don't overlap
339		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
340		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
341		let result = txn2.commit(&mut parent);
342
343		// Should succeed
344		assert!(result.is_ok());
345
346		// Both values should be in parent
347		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
348		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
349	}
350}