reifydb_sub_flow/transaction/
write.rs1use reifydb_core::{EncodedKey, value::encoded::EncodedValues};
5
6use super::FlowTransaction;
7
8impl FlowTransaction {
9 pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
11 self.metrics.increment_writes();
12 self.pending.insert(key.clone(), value);
13 Ok(())
14 }
15
16 pub fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
18 self.metrics.increment_removes();
19 self.pending.remove(key.clone());
20 Ok(())
21 }
22}
23
#[cfg(test)]
mod tests {
    use reifydb_core::{
        CommitVersion, CowVec, EncodedKey,
        interface::{MultiVersionCommandTransaction, MultiVersionQueryTransaction},
        value::encoded::EncodedValues,
    };
    use reifydb_engine::StandardCommandTransaction;

    use super::*;
    use crate::operator::stateful::test_utils::test::create_test_transaction;

    /// Builds an `EncodedKey` from the bytes of `s`.
    fn make_key(s: &str) -> EncodedKey {
        let bytes = s.as_bytes().to_vec();
        EncodedKey::new(bytes)
    }

    /// Builds an `EncodedValues` wrapping the bytes of `s`.
    fn make_value(s: &str) -> EncodedValues {
        let bytes = s.as_bytes().to_vec();
        EncodedValues(CowVec::new(bytes))
    }

    /// Reads `key` through the parent transaction and returns only the stored values, if any.
    async fn get_values(parent: &mut StandardCommandTransaction, key: &EncodedKey) -> Option<EncodedValues> {
        let found = parent.get(key).await.unwrap();
        found.map(|entry| entry.values)
    }

    /// A single `set` ends up as exactly one entry in `pending`.
    #[tokio::test]
    async fn test_set_buffers_to_pending() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");
        let v = make_value("value1");

        flow.set(&k, v.clone()).unwrap();

        assert_eq!(flow.pending.get(&k), Some(&v));
        assert_eq!(flow.pending.len(), 1);
    }

    /// Each `set` call raises the `writes` metric by one.
    #[tokio::test]
    async fn test_set_increments_writes_metric() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        assert_eq!(flow.metrics().writes, 0);

        // After the i-th write (1-based) the counter must read exactly i.
        let steps = [("key1", "value1", 1), ("key2", "value2", 2)];
        for &(k, v, expected_writes) in &steps {
            flow.set(&make_key(k), make_value(v)).unwrap();
            assert_eq!(flow.metrics().writes, expected_writes);
        }
    }

    /// Distinct keys are buffered independently.
    #[tokio::test]
    async fn test_set_multiple_keys() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];
        for &(k, v) in &entries {
            flow.set(&make_key(k), make_value(v)).unwrap();
        }

        assert_eq!(flow.pending.len(), 3);
        assert_eq!(flow.metrics().writes, 3);
        for &(k, v) in &entries {
            assert_eq!(flow.pending.get(&make_key(k)), Some(&make_value(v)));
        }
    }

    /// Writing the same key twice keeps one pending entry holding the latest value,
    /// while the metric still counts both calls.
    #[tokio::test]
    async fn test_set_overwrites_same_key() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");
        for v in ["value1", "value2"].iter() {
            flow.set(&k, make_value(v)).unwrap();
        }

        assert_eq!(flow.pending.len(), 1);
        assert_eq!(flow.pending.get(&k), Some(&make_value("value2")));
        assert_eq!(flow.metrics().writes, 2);
    }

    /// A `remove` is recorded in `pending` and counts as one entry.
    #[tokio::test]
    async fn test_remove_buffers_to_pending() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");
        flow.remove(&k).unwrap();

        assert!(flow.pending.is_removed(&k));
        assert_eq!(flow.pending.len(), 1);
    }

    /// Each `remove` call raises the `removes` metric by one.
    #[tokio::test]
    async fn test_remove_increments_removes_metric() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        assert_eq!(flow.metrics().removes, 0);

        // After the i-th remove (1-based) the counter must read exactly i.
        let steps = [("key1", 1), ("key2", 2)];
        for &(k, expected_removes) in &steps {
            flow.remove(&make_key(k)).unwrap();
            assert_eq!(flow.metrics().removes, expected_removes);
        }
    }

    /// Removals of distinct keys are buffered independently.
    #[tokio::test]
    async fn test_remove_multiple_keys() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let names = ["key1", "key2", "key3"];
        for &k in &names {
            flow.remove(&make_key(k)).unwrap();
        }

        assert_eq!(flow.pending.len(), 3);
        assert_eq!(flow.metrics().removes, 3);
        for &k in &names {
            assert!(flow.pending.is_removed(&make_key(k)));
        }
    }

    /// A removal issued after a write replaces the pending value for that key.
    #[tokio::test]
    async fn test_set_then_remove() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");

        flow.set(&k, make_value("value1")).unwrap();
        assert_eq!(flow.pending.get(&k), Some(&make_value("value1")));

        flow.remove(&k).unwrap();
        assert!(flow.pending.is_removed(&k));
        assert_eq!(flow.pending.get(&k), None);

        // Both operations are still counted, each under its own metric.
        assert_eq!(flow.metrics().writes, 1);
        assert_eq!(flow.metrics().removes, 1);
    }

    /// A write issued after a removal clears the removed state for that key.
    #[tokio::test]
    async fn test_remove_then_set() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");

        flow.remove(&k).unwrap();
        assert!(flow.pending.is_removed(&k));

        flow.set(&k, make_value("value1")).unwrap();
        assert!(!flow.pending.is_removed(&k));
        assert_eq!(flow.pending.get(&k), Some(&make_value("value1")));

        // Both operations are still counted, each under its own metric.
        assert_eq!(flow.metrics().removes, 1);
        assert_eq!(flow.metrics().writes, 1);
    }

    /// A buffered write cannot be read back through the parent transaction.
    #[tokio::test]
    async fn test_writes_not_visible_to_parent() {
        let mut parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        let k = make_key("key1");
        let v = make_value("value1");

        flow.set(&k, v.clone()).unwrap();

        let seen_by_parent = get_values(&mut parent, &k).await;
        assert_eq!(seen_by_parent, None);
    }

    /// A buffered removal leaves the value readable through the parent transaction.
    #[tokio::test]
    async fn test_removes_not_visible_to_parent() {
        let mut parent = create_test_transaction().await;

        // Seed the parent with a value before the flow transaction is created.
        let k = make_key("key1");
        let v = make_value("value1");
        parent.set(&k, v.clone()).await.unwrap();
        assert_eq!(get_values(&mut parent, &k).await, Some(v.clone()));

        let parent_version = parent.version();
        let mut flow = FlowTransaction::new(&parent, parent_version).await;
        flow.remove(&k).unwrap();

        let seen_by_parent = get_values(&mut parent, &k).await;
        assert_eq!(seen_by_parent, Some(v));
    }

    /// Interleaved writes and removals on distinct keys are all tracked.
    #[tokio::test]
    async fn test_mixed_writes_and_removes() {
        let parent = create_test_transaction().await;
        let mut flow = FlowTransaction::new(&parent, CommitVersion(1)).await;

        // Alternate write / remove so the two kinds of operation are interleaved.
        let pairs = [("write1", "v1", "remove1"), ("write2", "v2", "remove2")];
        for &(written, v, removed) in &pairs {
            flow.set(&make_key(written), make_value(v)).unwrap();
            flow.remove(&make_key(removed)).unwrap();
        }

        assert_eq!(flow.pending.len(), 4);
        assert_eq!(flow.metrics().writes, 2);
        assert_eq!(flow.metrics().removes, 2);

        for &(written, v, _) in &pairs {
            assert_eq!(flow.pending.get(&make_key(written)), Some(&make_value(v)));
        }
        for &(_, _, removed) in &pairs {
            assert!(flow.pending.is_removed(&make_key(removed)));
        }
    }
}