reifydb_sub_flow/transaction/
commit.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::{MultiVersionCommandTransaction, interceptor::ViewInterceptor};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8use tracing::instrument;
9
10use super::{FlowTransaction, FlowTransactionMetrics, Pending, ViewPending};
11
12impl FlowTransaction {
13	/// Commit all pending writes and removes to the parent transaction
14	///
15	/// Takes the parent transaction as a mutable reference to apply buffered operations.
16	/// This allows the FlowTransaction to be reused for subsequent units of work.
17	/// The pending buffer is NOT cleared to maintain read-your-own-writes semantics.
18	///
19	/// Returns the transaction metrics.
20	///
21	/// # Errors
22	///
23	/// Returns an error if any key in this FlowTransaction overlaps with keys already
24	/// written by another FlowTransaction to the same parent. FlowTransactions must
25	/// operate on non-overlapping keyspaces.
26	#[instrument(level = "debug", skip(self, parent), fields(
27		pending_count = self.pending.len(),
28		view_ops_count = self.view_pending.len(),
29		writes,
30		removes
31	))]
32	pub fn commit(&mut self, parent: &mut StandardCommandTransaction) -> crate::Result<FlowTransactionMetrics> {
33		// Check for any overlapping keys with the parent's pending writes.
34		// This enforces that FlowTransactions operate on non-overlapping keyspaces.
35		{
36			let parent_pending = parent.pending_writes();
37			for (key, _) in self.pending.iter_sorted() {
38				// Check if key exists in parent
39				if parent_pending.contains_key(key) {
40					return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
41				}
42			}
43		}
44
45		// Process view operations with interceptor calls
46		// We need to call pre_* before the storage write and post_* after
47		for view_op in self.view_pending.drain(..) {
48			match view_op {
49				ViewPending::Insert {
50					view,
51					row_number,
52					row,
53				} => {
54					// Call pre-insert interceptor
55					ViewInterceptor::pre_insert(parent, &view, row_number, &row)?;
56					// Note: The actual storage write is already in self.pending
57					// and will be applied below
58					// Call post-insert interceptor
59					ViewInterceptor::post_insert(parent, &view, row_number, &row)?;
60				}
61				ViewPending::Update {
62					view,
63					old_row_number,
64					new_row_number,
65					row,
66				} => {
67					// Call pre-update interceptor
68					ViewInterceptor::pre_update(parent, &view, new_row_number, &row)?;
69					// Call pre-delete for the old row
70					ViewInterceptor::pre_delete(parent, &view, old_row_number)?;
71					// Note: The actual storage writes are in self.pending
72					// Call post-delete for the old row (we don't have the old row data)
73					// Call post-update interceptor
74					ViewInterceptor::post_update(parent, &view, new_row_number, &row, &row)?;
75				}
76				ViewPending::Remove {
77					view,
78					row_number,
79				} => {
80					// Call pre-delete interceptor
81					ViewInterceptor::pre_delete(parent, &view, row_number)?;
82					// Note: The actual storage removal is in self.pending
83					// We don't have the deleted row data for post_delete, so we skip it
84					// TODO: Consider storing the deleted row data if post_delete is needed
85				}
86			}
87		}
88
89		// Now apply all writes and removes to storage
90		let mut write_count = 0;
91		let mut remove_count = 0;
92		for (key, pending) in self.pending.iter_sorted() {
93			match pending {
94				Pending::Write(value) => {
95					parent.set(key, value.clone())?;
96					write_count += 1;
97				}
98				Pending::Remove => {
99					parent.remove(key)?;
100					remove_count += 1;
101				}
102			}
103		}
104
105		tracing::Span::current().record("writes", write_count);
106		tracing::Span::current().record("removes", remove_count);
107
108		self.pending.clear();
109		Ok(self.metrics.clone())
110	}
111}
112
#[cfg(test)]
mod tests {
	//! Tests for `FlowTransaction::commit`: applying buffered writes and removes to the
	//! parent, metrics reporting, keyspace-overlap detection and repeated commits.

	use reifydb_core::{CommitVersion, interface::Engine};

	use super::*;
	use crate::{
		operator::stateful::test_utils::test::{create_test_engine, create_test_transaction},
		transaction::utils::test::{from_store, make_key, make_value},
	};

	/// Committing a transaction with nothing buffered succeeds and reports zeroed metrics.
	#[test]
	fn test_commit_empty_pending() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let metrics = txn.commit(&mut parent).unwrap();

		// Metrics should be zero
		assert_eq!(metrics.reads, 0);
		assert_eq!(metrics.writes, 0);
		assert_eq!(metrics.removes, 0);
	}

	/// A single buffered write becomes visible through the parent after commit.
	#[test]
	fn test_commit_single_write() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		let value = make_value("value1");
		txn.set(&key, value.clone()).unwrap();

		txn.commit(&mut parent).unwrap();

		// Parent should now have the value
		assert_eq!(from_store(&mut parent, &key), Some(value));
	}

	/// Several buffered writes all become visible through the parent after commit.
	#[test]
	fn test_commit_multiple_writes() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.set(&make_key("key1"), make_value("value1")).unwrap();
		txn.set(&make_key("key2"), make_value("value2")).unwrap();
		txn.set(&make_key("key3"), make_value("value3")).unwrap();

		txn.commit(&mut parent).unwrap();

		// All values should be in parent
		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
		assert_eq!(from_store(&mut parent, &make_key("key3")), Some(make_value("value3")));
	}

	/// Buffered removes delete previously committed keys once the parent is committed.
	#[test]
	fn test_commit_removes() {
		let engine = create_test_engine();
		let mut parent = engine.begin_command().unwrap();

		// First commit some data to the underlying storage
		let key1 = make_key("key1");
		let key2 = make_key("key2");
		parent.set(&key1, make_value("value1")).unwrap();
		parent.set(&key2, make_value("value2")).unwrap();
		let commit_version = parent.commit().unwrap();

		// Create new parent transaction after commit
		let mut parent = engine.begin_command().unwrap();

		// Verify values exist in storage
		assert_eq!(from_store(&mut parent, &key1), Some(make_value("value1")));
		assert_eq!(from_store(&mut parent, &key2), Some(make_value("value2")));

		// Create FlowTransaction and remove the keys
		let mut txn = FlowTransaction::new(&parent, commit_version);
		txn.remove(&key1).unwrap();
		txn.remove(&key2).unwrap();

		txn.commit(&mut parent).unwrap();

		// Commit parent to persist the removes
		parent.commit().unwrap();

		// Create new transaction to verify removes were persisted
		let mut parent = engine.begin_command().unwrap();
		assert_eq!(from_store(&mut parent, &key1), None);
		assert_eq!(from_store(&mut parent, &key2), None);
	}

	/// A commit containing both a write and a remove applies both to storage.
	#[test]
	fn test_commit_mixed_writes_and_removes() {
		let engine = create_test_engine();
		let mut parent = engine.begin_command().unwrap();

		// First commit some data to the underlying storage
		let existing_key = make_key("existing");
		parent.set(&existing_key, make_value("old")).unwrap();
		let commit_version = parent.commit().unwrap();

		// Create new parent transaction after commit
		let mut parent = engine.begin_command().unwrap();

		// Verify value exists in storage
		assert_eq!(from_store(&mut parent, &existing_key), Some(make_value("old")));

		// Create FlowTransaction
		let mut txn = FlowTransaction::new(&parent, commit_version);

		// Add a new key and remove the existing one
		let new_key = make_key("new");
		txn.set(&new_key, make_value("value")).unwrap();
		txn.remove(&existing_key).unwrap();

		txn.commit(&mut parent).unwrap();

		// Commit parent to persist the changes
		parent.commit().unwrap();

		// Create new transaction to verify changes were persisted
		let mut parent = engine.begin_command().unwrap();
		assert_eq!(from_store(&mut parent, &new_key), Some(make_value("value")));
		assert_eq!(from_store(&mut parent, &existing_key), None);
	}

	/// The metrics returned by commit count the reads, writes and removes issued beforehand.
	#[test]
	fn test_commit_returns_metrics() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.set(&make_key("key1"), make_value("value1")).unwrap();
		txn.get(&make_key("key2")).unwrap();
		txn.remove(&make_key("key3")).unwrap();

		let metrics = txn.commit(&mut parent).unwrap();

		assert_eq!(metrics.writes, 1);
		assert_eq!(metrics.reads, 1);
		assert_eq!(metrics.removes, 1);
	}

	/// A committed write replaces the value that was already in storage for the same key.
	#[test]
	fn test_commit_overwrites_storage_value() {
		let engine = create_test_engine();
		let mut parent = engine.begin_command().unwrap();

		// First commit some data to the underlying storage
		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let commit_version = parent.commit().unwrap();

		// Create new parent transaction after commit
		let mut parent = engine.begin_command().unwrap();

		// Verify old value exists in storage
		assert_eq!(from_store(&mut parent, &key), Some(make_value("old")));

		// Create FlowTransaction and overwrite the value
		let mut txn = FlowTransaction::new(&parent, commit_version);
		txn.set(&key, make_value("new")).unwrap();
		txn.commit(&mut parent).unwrap();

		// Parent should have new value
		assert_eq!(from_store(&mut parent, &key), Some(make_value("new")));
	}

	/// Two FlowTransactions committed one after the other on disjoint keys both succeed.
	#[test]
	fn test_sequential_commits_different_keys() {
		let mut parent = create_test_transaction();

		// First FlowTransaction writes to key1
		// Note: FlowTransactions must operate on non-overlapping keyspaces;
		// commit() rejects any key that is already pending in the parent
		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
		txn1.commit(&mut parent).unwrap();

		// Second FlowTransaction writes to key2 (different keyspace)
		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
		txn2.commit(&mut parent).unwrap();

		// Both values should be in parent
		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
	}

	/// Repeated set/remove calls on one key collapse to the last operation before commit.
	#[test]
	fn test_same_key_multiple_overwrites() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");

		// Pattern 1: set-delete on same key within same transaction
		txn.set(&key, make_value("first")).unwrap();
		txn.remove(&key).unwrap();

		// After set-delete, key should be marked for removal
		assert!(txn.pending.is_removed(&key));

		// Pattern 2: set-delete-set on same key within same transaction
		txn.set(&key, make_value("second")).unwrap();
		txn.remove(&key).unwrap();
		txn.set(&key, make_value("final")).unwrap();

		// After set-delete-set, key should have final value
		assert_eq!(txn.pending.get(&key), Some(&make_value("final")));

		// Commit and verify final state
		txn.commit(&mut parent).unwrap();

		// Only the final value should be in parent
		assert_eq!(from_store(&mut parent, &key), Some(make_value("final")));
	}

	/// Committing a key that another FlowTransaction already committed to the same
	/// parent fails with the keyspace-overlap error.
	#[test]
	fn test_commit_detects_overlapping_writes() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");

		// Create both FlowTransactions before any commits
		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));

		// Both try to write to the same key
		txn1.set(&key, make_value("value1")).unwrap();
		txn2.set(&key, make_value("value2")).unwrap();

		// First commit succeeds
		txn1.commit(&mut parent).unwrap();

		// Second commit should fail with keyspace overlap error
		// because txn1 already wrote to key1
		let err = txn2.commit(&mut parent).expect_err("Second commit should report a keyspace overlap");

		// Verify it's the expected error code
		assert_eq!(err.code, "FLOW_002");
	}

	/// Committing the same FlowTransaction twice must not re-apply or reject its writes.
	#[test]
	fn test_double_commit_prevention() {
		let mut parent = create_test_transaction();

		let key = make_key("key1");
		let value = make_value("value1");

		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		txn.set(&key, value.clone()).unwrap();

		// First commit should succeed and hand the write over to the parent
		txn.commit(&mut parent).expect("First commit should succeed");
		assert_eq!(txn.pending.len(), 0);

		// commit() borrows the transaction mutably instead of consuming it, so a second
		// call compiles. The pending buffer was cleared by the first commit, which means
		// there is nothing left to overlap with the parent or to apply again.
		txn.commit(&mut parent).expect("Second commit should be a no-op");

		// The value written by the first commit is still in the parent
		assert_eq!(from_store(&mut parent, &key), Some(value));
	}

	/// Commits on disjoint keys pass the overlap check and both values reach the parent.
	#[test]
	fn test_commit_allows_nonoverlapping_writes() {
		let mut parent = create_test_transaction();

		// First FlowTransaction writes to key1
		let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1));
		txn1.set(&make_key("key1"), make_value("value1")).unwrap();
		txn1.commit(&mut parent).unwrap();

		// Second FlowTransaction writes to key2 (different keyspace)
		// This should succeed because keyspaces don't overlap
		let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2));
		txn2.set(&make_key("key2"), make_value("value2")).unwrap();
		let result = txn2.commit(&mut parent);

		// Should succeed
		assert!(result.is_ok());

		// Both values should be in parent
		assert_eq!(from_store(&mut parent, &make_key("key1")), Some(make_value("value1")));
		assert_eq!(from_store(&mut parent, &make_key("key2")), Some(make_value("value2")));
	}
}