// reifydb_sub_flow/transaction/write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::encoded::{encoded::EncodedValues, key::EncodedKey};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10	/// Set a value, buffering it in pending writes
11	pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()> {
12		match self {
13			Self::Deferred {
14				pending,
15				..
16			} => pending.insert(key.clone(), value),
17			Self::Transactional {
18				pending,
19				..
20			} => pending.insert(key.clone(), value),
21		}
22		Ok(())
23	}
24
25	/// Remove a key, buffering the deletion in pending operations
26	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
27		match self {
28			Self::Deferred {
29				pending,
30				..
31			} => pending.remove(key.clone()),
32			Self::Transactional {
33				pending,
34				..
35			} => pending.remove(key.clone()),
36		}
37		Ok(())
38	}
39}
40
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion,
		encoded::{encoded::EncodedValues, key::EncodedKey},
	};
	use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
	use reifydb_type::util::cowvec::CowVec;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Build an `EncodedKey` from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Build an `EncodedValues` wrapping the bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		let bytes = s.as_bytes().to_vec();
		EncodedValues(CowVec::new(bytes))
	}

	/// Look `key` up through `parent` and return a clone of the stored
	/// values, if any. Panics when the lookup itself returns an error.
	fn get_values(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedValues> {
		let found = parent.get(key).unwrap();
		found.map(|entry| entry.values.clone())
	}

	/// Open a deferred flow transaction on `parent` at `CommitVersion(1)`
	/// with the testing catalog and a fresh set of interceptors.
	fn deferred_v1(parent: &AdminTransaction) -> FlowTransaction {
		FlowTransaction::deferred(parent, CommitVersion(1), Catalog::testing(), Interceptors::new())
	}

	/// A single `set` is readable back from the pending buffer.
	#[test]
	fn test_set_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let (k, v) = (make_key("key1"), make_value("value1"));
		flow.set(&k, v.clone()).unwrap();

		// The write must be present in the pending buffer.
		assert_eq!(flow.pending().get(&k), Some(&v));
	}

	/// Writes to distinct keys are all kept in the pending buffer.
	#[test]
	fn test_set_multiple_keys() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

		for (k, v) in entries {
			flow.set(&make_key(k), make_value(v)).unwrap();
		}

		for (k, v) in entries {
			assert_eq!(flow.pending().get(&make_key(k)), Some(&make_value(v)));
		}
	}

	/// A second `set` on the same key replaces the first value.
	#[test]
	fn test_set_overwrites_same_key() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let k = make_key("key1");
		for v in ["value1", "value2"] {
			flow.set(&k, make_value(v)).unwrap();
		}

		// Only the most recent value is returned for the key.
		assert_eq!(flow.pending().get(&k), Some(&make_value("value2")));
	}

	/// A single `remove` marks the key as removed in the pending buffer.
	#[test]
	fn test_remove_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let k = make_key("key1");
		flow.remove(&k).unwrap();

		// The pending buffer must report the key as removed.
		assert!(flow.pending().is_removed(&k));
	}

	/// Removals of distinct keys are all recorded in the pending buffer.
	#[test]
	fn test_remove_multiple_keys() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let names = ["key1", "key2", "key3"];

		for name in names {
			flow.remove(&make_key(name)).unwrap();
		}

		for name in names {
			assert!(flow.pending().is_removed(&make_key(name)));
		}
	}

	/// Removing a key after writing it hides the earlier write.
	#[test]
	fn test_set_then_remove() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let k = make_key("key1");

		flow.set(&k, make_value("value1")).unwrap();
		assert_eq!(flow.pending().get(&k), Some(&make_value("value1")));

		flow.remove(&k).unwrap();
		assert!(flow.pending().is_removed(&k));
		assert_eq!(flow.pending().get(&k), None);
	}

	/// Writing a key after removing it clears the removal marker.
	#[test]
	fn test_remove_then_set() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let k = make_key("key1");

		flow.remove(&k).unwrap();
		assert!(flow.pending().is_removed(&k));

		flow.set(&k, make_value("value1")).unwrap();
		assert!(!flow.pending().is_removed(&k));
		assert_eq!(flow.pending().get(&k), Some(&make_value("value1")));
	}

	/// A buffered write is not observable through the parent transaction.
	#[test]
	fn test_writes_not_visible_to_parent() {
		let mut parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		let (k, v) = (make_key("key1"), make_value("value1"));

		// Write through the flow transaction only.
		flow.set(&k, v.clone()).unwrap();

		// The parent must not observe that write.
		assert_eq!(get_values(&mut parent, &k), None);
	}

	/// A buffered removal is not observable through the parent transaction.
	#[test]
	fn test_removes_not_visible_to_parent() {
		let mut parent = create_test_transaction();

		// Seed the parent with a value and confirm it reads back.
		let (k, v) = (make_key("key1"), make_value("value1"));
		parent.set(&k, v.clone()).unwrap();
		assert_eq!(get_values(&mut parent, &k), Some(v.clone()));

		// Open the flow transaction at the parent's version and remove the key there.
		let at_version = parent.version();
		let mut flow =
			FlowTransaction::deferred(&parent, at_version, Catalog::testing(), Interceptors::new());
		flow.remove(&k).unwrap();

		// The parent must still return the seeded value.
		assert_eq!(get_values(&mut parent, &k), Some(v));
	}

	/// Interleaved writes and removals on different keys are all recorded.
	#[test]
	fn test_mixed_writes_and_removes() {
		let parent = create_test_transaction();
		let mut flow = deferred_v1(&parent);

		// Each row is (written key, written value, removed key); operations
		// are issued as write, remove, write, remove.
		let plan = [("write1", "v1", "remove1"), ("write2", "v2", "remove2")];

		for (written, v, removed) in plan {
			flow.set(&make_key(written), make_value(v)).unwrap();
			flow.remove(&make_key(removed)).unwrap();
		}

		for (written, v, _) in plan {
			assert_eq!(flow.pending().get(&make_key(written)), Some(&make_value(v)));
		}
		for (_, _, removed) in plan {
			assert!(flow.pending().is_removed(&make_key(removed)));
		}
	}
}