reifydb_sub_flow/transaction/
write.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{EncodedKey, value::encoded::EncodedValues};
5
6use super::FlowTransaction;
7
8impl FlowTransaction {
9	/// Set a value, buffering it in pending writes
10	pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
11		self.metrics.increment_writes();
12		self.pending.insert(key.clone(), value);
13		Ok(())
14	}
15
16	/// Remove a key, buffering the deletion in pending operations
17	pub fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
18		self.metrics.increment_removes();
19		self.pending.remove(key.clone());
20		Ok(())
21	}
22}
23
#[cfg(test)]
mod tests {
	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey,
		interface::{MultiVersionCommandTransaction, MultiVersionQueryTransaction},
		value::encoded::EncodedValues,
	};
	use reifydb_engine::StandardCommandTransaction;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an `EncodedKey` from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Builds an `EncodedValues` wrapping the bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		let payload = CowVec::new(s.as_bytes().to_vec());
		EncodedValues(payload)
	}

	/// Reads `key` through the parent transaction and returns only the stored
	/// values, or `None` when the parent has no entry for the key.
	async fn get_values(parent: &mut StandardCommandTransaction, key: &EncodedKey) -> Option<EncodedValues> {
		let found = parent.get(key).await.unwrap();
		found.map(|multi| multi.values)
	}

	#[tokio::test]
	async fn test_set_buffers_to_pending() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let (key, value) = (make_key("key1"), make_value("value1"));
		txn.set(&key, value.clone()).unwrap();

		// The write must be the single entry held by the pending buffer
		let buffered = txn.pending.get(&key);
		assert_eq!(buffered, Some(&value));
		assert_eq!(txn.pending.len(), 1);
	}

	#[tokio::test]
	async fn test_set_increments_writes_metric() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		// Nothing written yet
		assert_eq!(txn.metrics().writes, 0);

		let first_key = make_key("key1");
		txn.set(&first_key, make_value("value1")).unwrap();
		assert_eq!(txn.metrics().writes, 1);

		let second_key = make_key("key2");
		txn.set(&second_key, make_value("value2")).unwrap();
		assert_eq!(txn.metrics().writes, 2);
	}

	#[tokio::test]
	async fn test_set_multiple_keys() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];
		for (k, v) in entries {
			txn.set(&make_key(k), make_value(v)).unwrap();
		}

		assert_eq!(txn.pending.len(), 3);
		assert_eq!(txn.metrics().writes, 3);
		for (k, v) in entries {
			assert_eq!(txn.pending.get(&make_key(k)), Some(&make_value(v)));
		}
	}

	#[tokio::test]
	async fn test_set_overwrites_same_key() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let key = make_key("key1");
		for v in ["value1", "value2"] {
			txn.set(&key, make_value(v)).unwrap();
		}

		// Only the most recent value survives, under a single entry
		assert_eq!(txn.pending.len(), 1);
		assert_eq!(txn.pending.get(&key), Some(&make_value("value2")));
		// The metric still reflects every call to `set`
		assert_eq!(txn.metrics().writes, 2);
	}

	#[tokio::test]
	async fn test_remove_buffers_to_pending() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let key = make_key("key1");
		txn.remove(&key).unwrap();

		// The pending buffer must record the key as removed
		let marked_removed = txn.pending.is_removed(&key);
		assert!(marked_removed);
		assert_eq!(txn.pending.len(), 1);
	}

	#[tokio::test]
	async fn test_remove_increments_removes_metric() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		// Nothing removed yet
		assert_eq!(txn.metrics().removes, 0);

		let first_key = make_key("key1");
		txn.remove(&first_key).unwrap();
		assert_eq!(txn.metrics().removes, 1);

		let second_key = make_key("key2");
		txn.remove(&second_key).unwrap();
		assert_eq!(txn.metrics().removes, 2);
	}

	#[tokio::test]
	async fn test_remove_multiple_keys() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let names = ["key1", "key2", "key3"];
		for name in names {
			txn.remove(&make_key(name)).unwrap();
		}

		assert_eq!(txn.pending.len(), 3);
		assert_eq!(txn.metrics().removes, 3);
		for name in names {
			assert!(txn.pending.is_removed(&make_key(name)));
		}
	}

	#[tokio::test]
	async fn test_set_then_remove() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let key = make_key("key1");

		// Step 1: the write is visible in the pending buffer
		txn.set(&key, make_value("value1")).unwrap();
		let expected = make_value("value1");
		assert_eq!(txn.pending.get(&key), Some(&expected));

		// Step 2: the removal replaces the buffered write
		txn.remove(&key).unwrap();
		assert!(txn.pending.is_removed(&key));
		assert_eq!(txn.pending.get(&key), None);

		// Each operation is counted once in its own metric
		assert_eq!(txn.metrics().writes, 1);
		assert_eq!(txn.metrics().removes, 1);
	}

	#[tokio::test]
	async fn test_remove_then_set() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let key = make_key("key1");

		// Step 1: the key is flagged as removed
		txn.remove(&key).unwrap();
		assert!(txn.pending.is_removed(&key));

		// Step 2: a later write clears the removal flag and stores the value
		txn.set(&key, make_value("value1")).unwrap();
		assert!(!txn.pending.is_removed(&key));
		let expected = make_value("value1");
		assert_eq!(txn.pending.get(&key), Some(&expected));

		// Each operation is counted once in its own metric
		assert_eq!(txn.metrics().removes, 1);
		assert_eq!(txn.metrics().writes, 1);
	}

	#[tokio::test]
	async fn test_writes_not_visible_to_parent() {
		let mut parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		let key = make_key("key1");

		// Write through the FlowTransaction only
		txn.set(&key, make_value("value1")).unwrap();

		// The parent transaction must have no entry for the key
		let seen_by_parent = get_values(&mut parent, &key).await;
		assert_eq!(seen_by_parent, None);
	}

	#[tokio::test]
	async fn test_removes_not_visible_to_parent() {
		let mut parent = create_test_transaction().await;

		// Seed the parent with a value and confirm it can be read back
		let key = make_key("key1");
		let value = make_value("value1");
		parent.set(&key, value.clone()).await.unwrap();
		let before = get_values(&mut parent, &key).await;
		assert_eq!(before, Some(value.clone()));

		// Remove the key through a FlowTransaction opened at the parent's version
		let snapshot_version = parent.version();
		let mut txn = FlowTransaction::new(&parent, snapshot_version).await;
		txn.remove(&key).unwrap();

		// The parent's copy of the value must be untouched
		let after = get_values(&mut parent, &key).await;
		assert_eq!(after, Some(value));
	}

	#[tokio::test]
	async fn test_mixed_writes_and_removes() {
		let parent = create_test_transaction().await;
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;

		// Interleave: write, remove, write, remove
		let written = [("write1", "v1"), ("write2", "v2")];
		let removed = ["remove1", "remove2"];
		for ((w_key, w_val), r_key) in written.into_iter().zip(removed) {
			txn.set(&make_key(w_key), make_value(w_val)).unwrap();
			txn.remove(&make_key(r_key)).unwrap();
		}

		assert_eq!(txn.pending.len(), 4);
		assert_eq!(txn.metrics().writes, 2);
		assert_eq!(txn.metrics().removes, 2);

		for (w_key, w_val) in written {
			assert_eq!(txn.pending.get(&make_key(w_key)), Some(&make_value(w_val)));
		}
		for r_key in removed {
			assert!(txn.pending.is_removed(&make_key(r_key)));
		}
	}
}