Skip to main content

reifydb_sub_flow/transaction/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10	/// Set a value, buffering it in pending writes
11	pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()> {
12		self.inner_mut().pending.insert(key.clone(), value);
13		Ok(())
14	}
15
16	/// Remove a key, buffering the deletion in pending operations
17	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
18		self.inner_mut().pending.remove(key.clone());
19		Ok(())
20	}
21}
22
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion,
		encoded::{key::EncodedKey, row::EncodedRow},
	};
	use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
	use reifydb_type::util::cowvec::CowVec;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an `EncodedKey` from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Builds an `EncodedRow` wrapping the bytes of `s`.
	fn make_value(s: &str) -> EncodedRow {
		let bytes = s.as_bytes().to_vec();
		EncodedRow(CowVec::new(bytes))
	}

	/// Looks `key` up through the parent transaction and returns a clone of the row, if any.
	fn get_row(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedRow> {
		let found = parent.get(key).unwrap();
		found.map(|entry| entry.row.clone())
	}

	/// Creates a deferred `FlowTransaction` over `parent` at `CommitVersion(1)` with a
	/// testing catalog and fresh interceptors.
	fn deferred_v1(parent: &AdminTransaction) -> FlowTransaction {
		FlowTransaction::deferred(parent, CommitVersion(1), Catalog::testing(), Interceptors::new())
	}

	/// A single `set` is readable back from the pending buffer.
	#[test]
	fn test_set_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");
		let expected = make_value("value1");

		txn.set(&k, expected.clone()).unwrap();

		// The pending buffer must now hold the value we just wrote
		assert_eq!(txn.pending().get(&k), Some(&expected));
	}

	/// Several distinct keys can be buffered side by side.
	#[test]
	fn test_set_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

		for &(k, v) in &entries {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		for &(k, v) in &entries {
			assert_eq!(txn.pending().get(&make_key(k)), Some(&make_value(v)));
		}
	}

	/// Writing the same key twice keeps the most recent value.
	#[test]
	fn test_set_overwrites_same_key() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");
		for v in ["value1", "value2"].iter() {
			txn.set(&k, make_value(v)).unwrap();
		}

		// The later write wins
		assert_eq!(txn.pending().get(&k), Some(&make_value("value2")));
	}

	/// A `remove` is recorded as a removal in the pending buffer.
	#[test]
	fn test_remove_buffers_to_pending() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");
		txn.remove(&k).unwrap();

		// The pending buffer must report the key as removed
		assert!(txn.pending().is_removed(&k));
	}

	/// Several distinct keys can be marked as removed side by side.
	#[test]
	fn test_remove_multiple_keys() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let names = ["key1", "key2", "key3"];

		for name in names.iter() {
			txn.remove(&make_key(name)).unwrap();
		}

		for name in names.iter() {
			assert!(txn.pending().is_removed(&make_key(name)));
		}
	}

	/// A removal after a write replaces the buffered value with a removal marker.
	#[test]
	fn test_set_then_remove() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");

		txn.set(&k, make_value("value1")).unwrap();
		assert_eq!(txn.pending().get(&k), Some(&make_value("value1")));

		txn.remove(&k).unwrap();
		assert!(txn.pending().is_removed(&k));
		assert_eq!(txn.pending().get(&k), None);
	}

	/// A write after a removal clears the removal marker and buffers the value.
	#[test]
	fn test_remove_then_set() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");

		txn.remove(&k).unwrap();
		assert!(txn.pending().is_removed(&k));

		txn.set(&k, make_value("value1")).unwrap();
		assert!(!txn.pending().is_removed(&k));
		assert_eq!(txn.pending().get(&k), Some(&make_value("value1")));
	}

	/// A buffered write is not observable through the parent transaction.
	#[test]
	fn test_writes_not_visible_to_parent() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		let k = make_key("key1");

		// Write through the FlowTransaction only
		txn.set(&k, make_value("value1")).unwrap();

		// The parent must not observe it
		assert_eq!(get_row(&mut parent, &k), None);
	}

	/// A buffered removal does not hide a value that the parent already holds.
	#[test]
	fn test_removes_not_visible_to_parent() {
		let mut parent = create_test_transaction();

		// Seed the parent with a value and confirm it reads back
		let k = make_key("key1");
		let seeded = make_value("value1");
		parent.set(&k, seeded.clone()).unwrap();
		assert_eq!(get_row(&mut parent, &k), Some(seeded.clone()));

		// Open a FlowTransaction at the parent's version and remove the key there
		let at_version = parent.version();
		let mut txn = FlowTransaction::deferred(&parent, at_version, Catalog::testing(), Interceptors::new());
		txn.remove(&k).unwrap();

		// The parent must still return the seeded value
		assert_eq!(get_row(&mut parent, &k), Some(seeded));
	}

	/// Interleaved writes and removals on different keys are all tracked.
	#[test]
	fn test_mixed_writes_and_removes() {
		let parent = create_test_transaction();
		let mut txn = deferred_v1(&parent);

		// Same interleaving as always: write, remove, write, remove
		txn.set(&make_key("write1"), make_value("v1")).unwrap();
		txn.remove(&make_key("remove1")).unwrap();
		txn.set(&make_key("write2"), make_value("v2")).unwrap();
		txn.remove(&make_key("remove2")).unwrap();

		for &(k, v) in &[("write1", "v1"), ("write2", "v2")] {
			assert_eq!(txn.pending().get(&make_key(k)), Some(&make_value(v)));
		}
		for name in ["remove1", "remove2"].iter() {
			assert!(txn.pending().is_removed(&make_key(name)));
		}
	}
}