reifydb_sub_flow/transaction/
commit.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::MultiVersionCommandTransaction;
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8use tracing::instrument;
9
10use super::{FlowTransaction, FlowTransactionMetrics, Pending};
11
12impl FlowTransaction {
13	/// Commit all pending writes and removes to the parent transaction
14	///
15	/// Takes the parent transaction as a mutable reference to apply buffered operations.
16	/// This allows the FlowTransaction to be reused for subsequent units of work.
17	/// The pending buffer is NOT cleared to maintain read-your-own-writes semantics.
18	///
19	/// Returns the transaction metrics.
20	///
21	/// # Errors
22	///
23	/// Returns an error if any key in this FlowTransaction overlaps with keys already
24	/// written by another FlowTransaction to the same parent. FlowTransactions must
25	/// operate on non-overlapping keyspaces.
26
27	#[instrument(name = "flow::transaction::commit", level = "debug", skip(self, parent), fields(
28		pending_count = self.pending.len(),
29		writes,
30		removes
31	))]
32	pub async fn commit(
33		&mut self,
34		parent: &mut StandardCommandTransaction,
35	) -> crate::Result<FlowTransactionMetrics> {
36		// Check for any overlapping keys with the parent's pending writes.
37		// This enforces that FlowTransactions operate on non-overlapping keyspaces.
38		{
39			let parent_pending = parent.pending_writes();
40			for (key, _) in self.pending.iter_sorted() {
41				// Check if key exists in parent
42				if parent_pending.contains_key(key) {
43					return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
44				}
45			}
46		}
47
48		let mut set_count = 0;
49		let mut remove_count = 0;
50		for (key, pending) in self.pending.iter_sorted() {
51			match pending {
52				Pending::Set(value) => {
53					parent.set(key, value.clone()).await?;
54					set_count += 1;
55				}
56				Pending::Remove => {
57					parent.remove(key).await?;
58					remove_count += 1;
59				}
60			}
61		}
62
63		tracing::Span::current().record("sets", set_count);
64		tracing::Span::current().record("removes", remove_count);
65
66		self.pending.clear();
67		Ok(self.metrics.clone())
68	}
69}
70
71#[cfg(test)]
72mod tests {
73	use reifydb_core::CommitVersion;
74
75	use super::*;
76	use crate::{
77		operator::stateful::test_utils::test::create_test_transaction,
78		transaction::utils::test::{from_store, make_key, make_value},
79	};
80
81	#[tokio::test]
82	async fn test_commit_empty_pending() {
83		let mut parent = create_test_transaction().await;
84		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
85
86		let metrics = txn.commit(&mut parent).await.unwrap();
87
88		// Metrics should be zero
89		assert_eq!(metrics.reads, 0);
90		assert_eq!(metrics.writes, 0);
91		assert_eq!(metrics.removes, 0);
92	}
93
94	#[tokio::test]
95	async fn test_commit_single_write() {
96		let mut parent = create_test_transaction().await;
97		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
98
99		let key = make_key("key1");
100		let value = make_value("value1");
101		txn.set(&key, value.clone()).unwrap();
102
103		txn.commit(&mut parent).await.unwrap();
104
105		// Parent should now have the value
106		assert_eq!(from_store(&mut parent, &key).await, Some(value));
107	}
108
109	#[tokio::test]
110	async fn test_commit_multiple_writes() {
111		let mut parent = create_test_transaction().await;
112		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
113
114		txn.set(&make_key("key1"), make_value("value1")).unwrap();
115		txn.set(&make_key("key2"), make_value("value2")).unwrap();
116		txn.set(&make_key("key3"), make_value("value3")).unwrap();
117
118		txn.commit(&mut parent).await.unwrap();
119
120		// All values should be in parent
121		assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
122		assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
123		assert_eq!(from_store(&mut parent, &make_key("key3")).await, Some(make_value("value3")));
124	}
125
126	#[tokio::test]
127	async fn test_commit_removes() {
128		use reifydb_core::interface::Engine;
129
130		use crate::operator::stateful::test_utils::test::create_test_engine;
131
132		let engine = create_test_engine().await;
133		let mut parent = engine.begin_command().await.unwrap();
134
135		// First commit some data to the underlying storage
136		let key1 = make_key("key1");
137		let key2 = make_key("key2");
138		parent.set(&key1, make_value("value1")).await.unwrap();
139		parent.set(&key2, make_value("value2")).await.unwrap();
140		let commit_version = parent.commit().await.unwrap();
141
142		// Create new parent transaction after commit
143		let mut parent = engine.begin_command().await.unwrap();
144
145		// Verify values exist in storage
146		assert_eq!(from_store(&mut parent, &key1).await, Some(make_value("value1")));
147		assert_eq!(from_store(&mut parent, &key2).await, Some(make_value("value2")));
148
149		// Create FlowTransaction and remove the keys
150		let mut txn = FlowTransaction::new(&parent, commit_version).await;
151		txn.remove(&key1).unwrap();
152		txn.remove(&key2).unwrap();
153
154		txn.commit(&mut parent).await.unwrap();
155
156		// Commit parent to persist the removes
157		parent.commit().await.unwrap();
158
159		// Create new transaction to verify removes were persisted
160		let mut parent = engine.begin_command().await.unwrap();
161		assert_eq!(from_store(&mut parent, &key1).await, None);
162		assert_eq!(from_store(&mut parent, &key2).await, None);
163	}
164
165	#[tokio::test]
166	async fn test_commit_mixed_writes_and_removes() {
167		use reifydb_core::interface::Engine;
168
169		use crate::operator::stateful::test_utils::test::create_test_engine;
170
171		let engine = create_test_engine().await;
172		let mut parent = engine.begin_command().await.unwrap();
173
174		// First commit some data to the underlying storage
175		let existing_key = make_key("existing");
176		parent.set(&existing_key, make_value("old")).await.unwrap();
177		let commit_version = parent.commit().await.unwrap();
178
179		// Create new parent transaction after commit
180		let mut parent = engine.begin_command().await.unwrap();
181
182		// Verify value exists in storage
183		assert_eq!(from_store(&mut parent, &existing_key).await, Some(make_value("old")));
184
185		// Create FlowTransaction
186		let mut txn = FlowTransaction::new(&parent, commit_version).await;
187
188		// Add a new key and remove the existing one
189		let new_key = make_key("new");
190		txn.set(&new_key, make_value("value")).unwrap();
191		txn.remove(&existing_key).unwrap();
192
193		txn.commit(&mut parent).await.unwrap();
194
195		// Commit parent to persist the changes
196		parent.commit().await.unwrap();
197
198		// Create new transaction to verify changes were persisted
199		let mut parent = engine.begin_command().await.unwrap();
200		assert_eq!(from_store(&mut parent, &new_key).await, Some(make_value("value")));
201		assert_eq!(from_store(&mut parent, &existing_key).await, None);
202	}
203
204	#[tokio::test]
205	async fn test_commit_returns_metrics() {
206		let mut parent = create_test_transaction().await;
207		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
208
209		txn.set(&make_key("key1"), make_value("value1")).unwrap();
210		txn.get(&make_key("key2")).await.unwrap();
211		txn.remove(&make_key("key3")).unwrap();
212
213		let metrics = txn.commit(&mut parent).await.unwrap();
214
215		assert_eq!(metrics.writes, 1);
216		assert_eq!(metrics.reads, 1);
217		assert_eq!(metrics.removes, 1);
218	}
219
220	#[tokio::test]
221	async fn test_commit_overwrites_storage_value() {
222		use reifydb_core::interface::Engine;
223
224		use crate::operator::stateful::test_utils::test::create_test_engine;
225
226		let engine = create_test_engine().await;
227		let mut parent = engine.begin_command().await.unwrap();
228
229		// First commit some data to the underlying storage
230		let key = make_key("key1");
231		parent.set(&key, make_value("old")).await.unwrap();
232		let commit_version = parent.commit().await.unwrap();
233
234		// Create new parent transaction after commit
235		let mut parent = engine.begin_command().await.unwrap();
236
237		// Verify old value exists in storage
238		assert_eq!(from_store(&mut parent, &key).await, Some(make_value("old")));
239
240		// Create FlowTransaction and overwrite the value
241		let mut txn = FlowTransaction::new(&parent, commit_version).await;
242		txn.set(&key, make_value("new")).unwrap();
243		txn.commit(&mut parent).await.unwrap();
244
245		// Parent should have new value
246		assert_eq!(from_store(&mut parent, &key).await, Some(make_value("new")));
247	}
248
249	#[tokio::test]
250	async fn test_sequential_commits_different_keys() {
251		let mut parent = create_test_transaction().await;
252
253		// First FlowTransaction writes to key1
254		// Note: FlowTransactions must operate on non-overlapping keyspaces
255		// This is enforced at the flow scheduler level, not the transaction level
256		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
257		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
258		txn1.commit(&mut parent).await.unwrap();
259
260		// Second FlowTransaction writes to key2 (different keyspace)
261		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
262		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
263		txn2.commit(&mut parent).await.unwrap();
264
265		// Both values should be in parent
266		assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
267		assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
268	}
269
270	#[tokio::test]
271	async fn test_same_key_multiple_overwrites() {
272		let mut parent = create_test_transaction().await;
273		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
274
275		let key = make_key("key1");
276
277		// Pattern 1: set-delete on same key within same transaction
278		txn.set(&key, make_value("first")).unwrap();
279		txn.remove(&key).unwrap();
280
281		// After set-delete, key should be marked for removal
282		assert!(txn.pending.is_removed(&key));
283
284		// Pattern 2: set-delete-set on same key within same transaction
285		txn.set(&key, make_value("second")).unwrap();
286		txn.remove(&key).unwrap();
287		txn.set(&key, make_value("final")).unwrap();
288
289		// After set-delete-set, key should have final value
290		assert_eq!(txn.pending.get(&key), Some(&make_value("final")));
291
292		// Commit and verify final state
293		txn.commit(&mut parent).await.unwrap();
294
295		// Only the final value should be in parent
296		assert_eq!(from_store(&mut parent, &key).await, Some(make_value("final")));
297	}
298
299	#[tokio::test]
300	async fn test_commit_detects_overlapping_writes() {
301		let mut parent = create_test_transaction().await;
302
303		let key = make_key("key1");
304
305		// Create both FlowTransactions before any commits
306		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
307		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
308
309		// Both try to write to the same key
310		txn1.set(&key, make_value("value1")).unwrap();
311		txn2.set(&key, make_value("value2")).unwrap();
312
313		// First commit succeeds
314		txn1.commit(&mut parent).await.unwrap();
315
316		// Second commit should fail with keyspace overlap error
317		// because txn1 already wrote to key1
318		let result = txn2.commit(&mut parent).await;
319		assert!(result.is_err());
320
321		// Verify it's the expected error code
322		let err = result.unwrap_err();
323		assert_eq!(err.code, "FLOW_002");
324	}
325
326	#[tokio::test]
327	async fn test_double_commit_prevention() {
328		let mut parent = create_test_transaction().await;
329
330		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
331		txn.set(&make_key("key1"), make_value("value1")).unwrap();
332
333		// First commit should succeed
334		let metrics = txn.commit(&mut parent).await;
335		assert!(metrics.is_ok(), "First commit should succeed");
336
337		// Transaction is consumed after commit, can't commit again
338		// This test verifies at compile-time that txn is moved
339		// If we could access txn here, it would be a bug
340		// The following line should not compile:
341		// txn.commit(&mut parent).await;  // ERROR: use of moved value
342	}
343
344	#[tokio::test]
345	async fn test_commit_allows_nonoverlapping_writes() {
346		let mut parent = create_test_transaction().await;
347
348		// First FlowTransaction writes to key1
349		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
350		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
351		txn1.commit(&mut parent).await.unwrap();
352
353		// Second FlowTransaction writes to key2 (different keyspace)
354		// This should succeed because keyspaces don't overlap
355		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
356		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
357		let result = txn2.commit(&mut parent).await;
358
359		// Should succeed
360		assert!(result.is_ok());
361
362		// Both values should be in parent
363		assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
364		assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
365	}
366}