reifydb_sub_flow/transaction/
write.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{EncodedKey, interface::ViewDef, value::encoded::EncodedValues};
5use reifydb_type::RowNumber;
6
7use super::{FlowTransaction, ViewPending};
8
9impl FlowTransaction {
10	/// Set a value, buffering it in pending writes
11	pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
12		self.metrics.increment_writes();
13		self.pending.insert(key.clone(), value);
14		Ok(())
15	}
16
17	/// Remove a key, buffering the deletion in pending operations
18	pub fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
19		self.metrics.increment_removes();
20		self.pending.remove(key.clone());
21		Ok(())
22	}
23
24	/// Insert a row into a view, tracking the operation for interceptor calls at commit time.
25	///
26	/// This method buffers the write in pending (for storage) and also tracks the view
27	/// operation separately so that ViewInterceptor::pre_insert/post_insert can be called
28	/// on the parent transaction during commit().
29	pub fn insert_view(
30		&mut self,
31		key: &EncodedKey,
32		view: ViewDef,
33		row_number: RowNumber,
34		row: EncodedValues,
35	) -> crate::Result<()> {
36		self.metrics.increment_writes();
37		self.pending.insert(key.clone(), row.clone());
38		self.view_pending.push(ViewPending::Insert {
39			view,
40			row_number,
41			row,
42		});
43		Ok(())
44	}
45
46	/// Update a row in a view (remove old, insert new), tracking for interceptor calls.
47	///
48	/// This method buffers both the remove and set in pending (for storage) and tracks
49	/// the view operation for ViewInterceptor::pre_update/post_update calls during commit().
50	pub fn update_view(
51		&mut self,
52		old_key: &EncodedKey,
53		new_key: &EncodedKey,
54		view: ViewDef,
55		old_row_number: RowNumber,
56		new_row_number: RowNumber,
57		row: EncodedValues,
58	) -> crate::Result<()> {
59		self.metrics.increment_removes();
60		self.metrics.increment_writes();
61		self.pending.remove(old_key.clone());
62		self.pending.insert(new_key.clone(), row.clone());
63		self.view_pending.push(ViewPending::Update {
64			view,
65			old_row_number,
66			new_row_number,
67			row,
68		});
69		Ok(())
70	}
71
72	/// Remove a row from a view, tracking the operation for interceptor calls at commit time.
73	///
74	/// This method buffers the removal in pending (for storage) and tracks the view
75	/// operation for ViewInterceptor::pre_delete/post_delete calls during commit().
76	pub fn remove_view(&mut self, key: &EncodedKey, view: ViewDef, row_number: RowNumber) -> crate::Result<()> {
77		self.metrics.increment_removes();
78		self.pending.remove(key.clone());
79		self.view_pending.push(ViewPending::Remove {
80			view,
81			row_number,
82		});
83		Ok(())
84	}
85}
86
#[cfg(test)]
mod tests {
	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey,
		interface::{MultiVersionCommandTransaction, MultiVersionQueryTransaction},
		value::encoded::EncodedValues,
	};
	use reifydb_engine::StandardCommandTransaction;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Build an `EncodedKey` from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = Vec::from(s.as_bytes());
		EncodedKey::new(bytes)
	}

	/// Build an `EncodedValues` wrapping the bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		let bytes = Vec::from(s.as_bytes());
		EncodedValues(CowVec::new(bytes))
	}

	/// Read `key` through the parent transaction, keeping only the stored values.
	fn get_values(parent: &mut StandardCommandTransaction, key: &EncodedKey) -> Option<EncodedValues> {
		let lookup = parent.get(key).unwrap();
		lookup.map(|entry| entry.values)
	}

	#[test]
	fn test_set_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let (key, value) = (make_key("key1"), make_value("value1"));
		txn.set(&key, value.clone()).unwrap();

		// Exactly one buffered entry, holding the value that was just written
		assert_eq!(txn.pending.len(), 1);
		assert_eq!(txn.pending.get(&key), Some(&value));
	}

	#[test]
	fn test_set_increments_writes_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Counter starts at zero and grows by one per set
		assert_eq!(txn.metrics().writes, 0);

		let first_key = make_key("key1");
		txn.set(&first_key, make_value("value1")).unwrap();
		assert_eq!(txn.metrics().writes, 1);

		let second_key = make_key("key2");
		txn.set(&second_key, make_value("value2")).unwrap();
		assert_eq!(txn.metrics().writes, 2);
	}

	#[test]
	fn test_set_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];
		for &(k, v) in &entries {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		assert_eq!(txn.pending.len(), 3);
		assert_eq!(txn.metrics().writes, 3);
		for &(k, v) in &entries {
			assert_eq!(txn.pending.get(&make_key(k)), Some(&make_value(v)));
		}
	}

	#[test]
	fn test_set_overwrites_same_key() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		for &v in &["value1", "value2"] {
			txn.set(&key, make_value(v)).unwrap();
		}

		// A single entry remains and it carries the most recent value
		assert_eq!(txn.pending.len(), 1);
		assert_eq!(txn.pending.get(&key), Some(&make_value("value2")));
		// The metric still reflects both set calls
		assert_eq!(txn.metrics().writes, 2);
	}

	#[test]
	fn test_remove_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		txn.remove(&key).unwrap();

		// The pending buffer holds one entry, flagged as a removal
		assert_eq!(txn.pending.len(), 1);
		assert!(txn.pending.is_removed(&key));
	}

	#[test]
	fn test_remove_increments_removes_metric() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Counter starts at zero and grows by one per remove
		assert_eq!(txn.metrics().removes, 0);

		let first_key = make_key("key1");
		txn.remove(&first_key).unwrap();
		assert_eq!(txn.metrics().removes, 1);

		let second_key = make_key("key2");
		txn.remove(&second_key).unwrap();
		assert_eq!(txn.metrics().removes, 2);
	}

	#[test]
	fn test_remove_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let names = ["key1", "key2", "key3"];
		for &name in &names {
			txn.remove(&make_key(name)).unwrap();
		}

		assert_eq!(txn.pending.len(), 3);
		assert_eq!(txn.metrics().removes, 3);
		for &name in &names {
			assert!(txn.pending.is_removed(&make_key(name)));
		}
	}

	#[test]
	fn test_set_then_remove() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		let written = make_value("value1");

		txn.set(&key, written.clone()).unwrap();
		assert_eq!(txn.pending.get(&key), Some(&written));

		txn.remove(&key).unwrap();
		assert!(txn.pending.is_removed(&key));
		assert!(txn.pending.get(&key).is_none());

		// One write and one remove were recorded
		assert_eq!(txn.metrics().writes, 1);
		assert_eq!(txn.metrics().removes, 1);
	}

	#[test]
	fn test_remove_then_set() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let key = make_key("key1");
		let written = make_value("value1");

		txn.remove(&key).unwrap();
		assert!(txn.pending.is_removed(&key));

		txn.set(&key, written.clone()).unwrap();
		assert!(!txn.pending.is_removed(&key));
		assert_eq!(txn.pending.get(&key), Some(&written));

		// One remove and one write were recorded
		assert_eq!(txn.metrics().removes, 1);
		assert_eq!(txn.metrics().writes, 1);
	}

	#[test]
	fn test_writes_not_visible_to_parent() {
		let mut parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Write through the FlowTransaction only
		let key = make_key("key1");
		txn.set(&key, make_value("value1")).unwrap();

		// The parent transaction must not observe the buffered write
		assert!(get_values(&mut parent, &key).is_none());
	}

	#[test]
	fn test_removes_not_visible_to_parent() {
		let mut parent = create_test_transaction();

		// Seed the parent with a value and confirm it reads back
		let key = make_key("key1");
		let seeded = make_value("value1");
		parent.set(&key, seeded.clone()).unwrap();
		assert_eq!(get_values(&mut parent, &key), Some(seeded.clone()));

		// Remove the key through a FlowTransaction opened at the parent's version
		let parent_version = parent.version();
		let mut txn = FlowTransaction::new(&parent, parent_version);
		txn.remove(&key).unwrap();

		// The parent must still return the seeded value
		assert_eq!(get_values(&mut parent, &key), Some(seeded));
	}

	#[test]
	fn test_mixed_writes_and_removes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let (write1, write2) = (make_key("write1"), make_key("write2"));
		let (remove1, remove2) = (make_key("remove1"), make_key("remove2"));

		// Interleave sets and removes on four distinct keys
		txn.set(&write1, make_value("v1")).unwrap();
		txn.remove(&remove1).unwrap();
		txn.set(&write2, make_value("v2")).unwrap();
		txn.remove(&remove2).unwrap();

		assert_eq!(txn.pending.len(), 4);
		assert_eq!(txn.metrics().writes, 2);
		assert_eq!(txn.metrics().removes, 2);

		assert_eq!(txn.pending.get(&write1), Some(&make_value("v1")));
		assert_eq!(txn.pending.get(&write2), Some(&make_value("v2")));
		assert!(txn.pending.is_removed(&remove1));
		assert!(txn.pending.is_removed(&remove2));
	}
}