Skip to main content

reifydb_sub_flow/transaction/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10	/// Set a value, buffering it in pending writes
11	pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()> {
12		self.inner_mut().pending.insert(key.clone(), value);
13		Ok(())
14	}
15
16	/// Remove a key, buffering the deletion in pending operations
17	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
18		self.inner_mut().pending.remove(key.clone());
19		Ok(())
20	}
21}
22
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion,
		encoded::{key::EncodedKey, row::EncodedRow},
	};
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
	use reifydb_type::util::cowvec::CowVec;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an `EncodedKey` from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Builds an `EncodedRow` wrapping the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedRow {
		let bytes = s.as_bytes().to_vec();
		EncodedRow(CowVec::new(bytes))
	}

	/// Reads `key` through the parent transaction and returns a clone of the
	/// stored row, if any. Panics if the read itself fails.
	fn get_row(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedRow> {
		let found = parent.get(key).unwrap();
		found.map(|entry| entry.row.clone())
	}

	/// Creates a deferred `FlowTransaction` over `parent` at `version`, using
	/// `Catalog::testing()`, `Interceptors::new()` and a mock clock created
	/// from 1000 milliseconds.
	fn deferred_txn(parent: &AdminTransaction, version: CommitVersion) -> FlowTransaction {
		FlowTransaction::deferred(
			parent,
			version,
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		)
	}

	#[test]
	fn test_set_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let (key, value) = (make_key("key1"), make_value("value1"));
		txn.set(&key, value.clone()).unwrap();

		// The write must be readable back from the pending buffer
		assert_eq!(txn.pending().get(&key), Some(&value));
	}

	#[test]
	fn test_set_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

		for &(name, payload) in &entries {
			txn.set(&make_key(name), make_value(payload)).unwrap();
		}

		for &(name, payload) in &entries {
			assert_eq!(txn.pending().get(&make_key(name)), Some(&make_value(payload)));
		}
	}

	#[test]
	fn test_set_overwrites_same_key() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let key = make_key("key1");
		for payload in ["value1", "value2"].iter() {
			txn.set(&key, make_value(payload)).unwrap();
		}

		// The pending buffer must report the most recently written value
		assert_eq!(txn.pending().get(&key), Some(&make_value("value2")));
	}

	#[test]
	fn test_remove_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let target = make_key("key1");
		txn.remove(&target).unwrap();

		// The pending buffer must flag the key as removed
		assert!(txn.pending().is_removed(&target));
	}

	#[test]
	fn test_remove_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let names = ["key1", "key2", "key3"];

		for name in names.iter() {
			txn.remove(&make_key(name)).unwrap();
		}

		for name in names.iter() {
			assert!(txn.pending().is_removed(&make_key(name)));
		}
	}

	#[test]
	fn test_set_then_remove() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let target = make_key("key1");

		// First the write is visible in the pending buffer...
		txn.set(&target, make_value("value1")).unwrap();
		assert_eq!(txn.pending().get(&target), Some(&make_value("value1")));

		// ...then the removal replaces it
		txn.remove(&target).unwrap();
		assert!(txn.pending().is_removed(&target));
		assert!(txn.pending().get(&target).is_none());
	}

	#[test]
	fn test_remove_then_set() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let target = make_key("key1");

		// First the key is flagged as removed...
		txn.remove(&target).unwrap();
		assert!(txn.pending().is_removed(&target));

		// ...then a write clears the removal and exposes the new value
		txn.set(&target, make_value("value1")).unwrap();
		assert!(!txn.pending().is_removed(&target));
		assert_eq!(txn.pending().get(&target), Some(&make_value("value1")));
	}

	#[test]
	fn test_writes_not_visible_to_parent() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let key = make_key("key1");

		// Write through the FlowTransaction only
		txn.set(&key, make_value("value1")).unwrap();

		// Reading through the parent must find nothing
		assert!(get_row(&mut parent, &key).is_none());
	}

	#[test]
	fn test_removes_not_visible_to_parent() {
		let mut parent = create_test_transaction();

		// Seed the parent with a value and confirm it reads back
		let key = make_key("key1");
		let value = make_value("value1");
		parent.set(&key, value.clone()).unwrap();
		assert_eq!(get_row(&mut parent, &key), Some(value.clone()));

		// Remove the key through a FlowTransaction created at the parent's version
		let version = parent.version();
		let mut txn = deferred_txn(&parent, version);
		txn.remove(&key).unwrap();

		// Reading through the parent must still return the seeded value
		assert_eq!(get_row(&mut parent, &key), Some(value));
	}

	#[test]
	fn test_mixed_writes_and_removes() {
		let parent = create_test_transaction();
		let mut txn = deferred_txn(&parent, CommitVersion(1));

		let writes = [("write1", "v1"), ("write2", "v2")];
		let removes = ["remove1", "remove2"];

		// Interleave: set write1, remove remove1, set write2, remove remove2
		for (&(name, payload), &gone) in writes.iter().zip(removes.iter()) {
			txn.set(&make_key(name), make_value(payload)).unwrap();
			txn.remove(&make_key(gone)).unwrap();
		}

		for &(name, payload) in &writes {
			assert_eq!(txn.pending().get(&make_key(name)), Some(&make_value(payload)));
		}
		for &gone in &removes {
			assert!(txn.pending().is_removed(&make_key(gone)));
		}
	}
}