reifydb_sub_flow/transaction/
write.rs1use reifydb_core::{EncodedKey, interface::ViewDef, value::encoded::EncodedValues};
5use reifydb_type::RowNumber;
6
7use super::{FlowTransaction, ViewPending};
8
9impl FlowTransaction {
10 pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
12 self.metrics.increment_writes();
13 self.pending.insert(key.clone(), value);
14 Ok(())
15 }
16
17 pub fn remove(&mut self, key: &EncodedKey) -> crate::Result<()> {
19 self.metrics.increment_removes();
20 self.pending.remove(key.clone());
21 Ok(())
22 }
23
24 pub fn insert_view(
30 &mut self,
31 key: &EncodedKey,
32 view: ViewDef,
33 row_number: RowNumber,
34 row: EncodedValues,
35 ) -> crate::Result<()> {
36 self.metrics.increment_writes();
37 self.pending.insert(key.clone(), row.clone());
38 self.view_pending.push(ViewPending::Insert {
39 view,
40 row_number,
41 row,
42 });
43 Ok(())
44 }
45
46 pub fn update_view(
51 &mut self,
52 old_key: &EncodedKey,
53 new_key: &EncodedKey,
54 view: ViewDef,
55 old_row_number: RowNumber,
56 new_row_number: RowNumber,
57 row: EncodedValues,
58 ) -> crate::Result<()> {
59 self.metrics.increment_removes();
60 self.metrics.increment_writes();
61 self.pending.remove(old_key.clone());
62 self.pending.insert(new_key.clone(), row.clone());
63 self.view_pending.push(ViewPending::Update {
64 view,
65 old_row_number,
66 new_row_number,
67 row,
68 });
69 Ok(())
70 }
71
72 pub fn remove_view(&mut self, key: &EncodedKey, view: ViewDef, row_number: RowNumber) -> crate::Result<()> {
77 self.metrics.increment_removes();
78 self.pending.remove(key.clone());
79 self.view_pending.push(ViewPending::Remove {
80 view,
81 row_number,
82 });
83 Ok(())
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use reifydb_core::{
90 CommitVersion, CowVec, EncodedKey,
91 interface::{MultiVersionCommandTransaction, MultiVersionQueryTransaction},
92 value::encoded::EncodedValues,
93 };
94 use reifydb_engine::StandardCommandTransaction;
95
96 use super::*;
97 use crate::operator::stateful::test_utils::test::create_test_transaction;
98
99 fn make_key(s: &str) -> EncodedKey {
100 EncodedKey::new(s.as_bytes().to_vec())
101 }
102
103 fn make_value(s: &str) -> EncodedValues {
104 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
105 }
106
107 fn get_values(parent: &mut StandardCommandTransaction, key: &EncodedKey) -> Option<EncodedValues> {
108 parent.get(key).unwrap().map(|m| m.values)
109 }
110
111 #[test]
112 fn test_set_buffers_to_pending() {
113 let parent = create_test_transaction();
114 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
115
116 let key = make_key("key1");
117 let value = make_value("value1");
118
119 txn.set(&key, value.clone()).unwrap();
120
121 assert_eq!(txn.pending.get(&key), Some(&value));
123 assert_eq!(txn.pending.len(), 1);
124 }
125
126 #[test]
127 fn test_set_increments_writes_metric() {
128 let parent = create_test_transaction();
129 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
130
131 assert_eq!(txn.metrics().writes, 0);
132
133 txn.set(&make_key("key1"), make_value("value1")).unwrap();
134 assert_eq!(txn.metrics().writes, 1);
135
136 txn.set(&make_key("key2"), make_value("value2")).unwrap();
137 assert_eq!(txn.metrics().writes, 2);
138 }
139
140 #[test]
141 fn test_set_multiple_keys() {
142 let parent = create_test_transaction();
143 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
144
145 txn.set(&make_key("key1"), make_value("value1")).unwrap();
146 txn.set(&make_key("key2"), make_value("value2")).unwrap();
147 txn.set(&make_key("key3"), make_value("value3")).unwrap();
148
149 assert_eq!(txn.pending.len(), 3);
150 assert_eq!(txn.metrics().writes, 3);
151 assert_eq!(txn.pending.get(&make_key("key1")), Some(&make_value("value1")));
152 assert_eq!(txn.pending.get(&make_key("key2")), Some(&make_value("value2")));
153 assert_eq!(txn.pending.get(&make_key("key3")), Some(&make_value("value3")));
154 }
155
156 #[test]
157 fn test_set_overwrites_same_key() {
158 let parent = create_test_transaction();
159 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
160
161 let key = make_key("key1");
162 txn.set(&key, make_value("value1")).unwrap();
163 txn.set(&key, make_value("value2")).unwrap();
164
165 assert_eq!(txn.pending.len(), 1);
167 assert_eq!(txn.pending.get(&key), Some(&make_value("value2")));
168 assert_eq!(txn.metrics().writes, 2);
170 }
171
172 #[test]
173 fn test_remove_buffers_to_pending() {
174 let parent = create_test_transaction();
175 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
176
177 let key = make_key("key1");
178 txn.remove(&key).unwrap();
179
180 assert!(txn.pending.is_removed(&key));
182 assert_eq!(txn.pending.len(), 1);
183 }
184
185 #[test]
186 fn test_remove_increments_removes_metric() {
187 let parent = create_test_transaction();
188 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
189
190 assert_eq!(txn.metrics().removes, 0);
191
192 txn.remove(&make_key("key1")).unwrap();
193 assert_eq!(txn.metrics().removes, 1);
194
195 txn.remove(&make_key("key2")).unwrap();
196 assert_eq!(txn.metrics().removes, 2);
197 }
198
199 #[test]
200 fn test_remove_multiple_keys() {
201 let parent = create_test_transaction();
202 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
203
204 txn.remove(&make_key("key1")).unwrap();
205 txn.remove(&make_key("key2")).unwrap();
206 txn.remove(&make_key("key3")).unwrap();
207
208 assert_eq!(txn.pending.len(), 3);
209 assert_eq!(txn.metrics().removes, 3);
210 assert!(txn.pending.is_removed(&make_key("key1")));
211 assert!(txn.pending.is_removed(&make_key("key2")));
212 assert!(txn.pending.is_removed(&make_key("key3")));
213 }
214
215 #[test]
216 fn test_set_then_remove() {
217 let parent = create_test_transaction();
218 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
219
220 let key = make_key("key1");
221 txn.set(&key, make_value("value1")).unwrap();
222 assert_eq!(txn.pending.get(&key), Some(&make_value("value1")));
223
224 txn.remove(&key).unwrap();
225 assert!(txn.pending.is_removed(&key));
226 assert_eq!(txn.pending.get(&key), None);
227
228 assert_eq!(txn.metrics().writes, 1);
230 assert_eq!(txn.metrics().removes, 1);
231 }
232
233 #[test]
234 fn test_remove_then_set() {
235 let parent = create_test_transaction();
236 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
237
238 let key = make_key("key1");
239 txn.remove(&key).unwrap();
240 assert!(txn.pending.is_removed(&key));
241
242 txn.set(&key, make_value("value1")).unwrap();
243 assert!(!txn.pending.is_removed(&key));
244 assert_eq!(txn.pending.get(&key), Some(&make_value("value1")));
245
246 assert_eq!(txn.metrics().removes, 1);
248 assert_eq!(txn.metrics().writes, 1);
249 }
250
251 #[test]
252 fn test_writes_not_visible_to_parent() {
253 let mut parent = create_test_transaction();
254 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
255
256 let key = make_key("key1");
257 let value = make_value("value1");
258
259 txn.set(&key, value.clone()).unwrap();
261
262 assert_eq!(get_values(&mut parent, &key), None);
264 }
265
266 #[test]
267 fn test_removes_not_visible_to_parent() {
268 let mut parent = create_test_transaction();
269
270 let key = make_key("key1");
272 let value = make_value("value1");
273 parent.set(&key, value.clone()).unwrap();
274 assert_eq!(get_values(&mut parent, &key), Some(value.clone()));
275
276 let parent_version = parent.version();
278 let mut txn = FlowTransaction::new(&parent, parent_version);
279 txn.remove(&key).unwrap();
280
281 assert_eq!(get_values(&mut parent, &key), Some(value));
283 }
284
285 #[test]
286 fn test_mixed_writes_and_removes() {
287 let parent = create_test_transaction();
288 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
289
290 txn.set(&make_key("write1"), make_value("v1")).unwrap();
291 txn.remove(&make_key("remove1")).unwrap();
292 txn.set(&make_key("write2"), make_value("v2")).unwrap();
293 txn.remove(&make_key("remove2")).unwrap();
294
295 assert_eq!(txn.pending.len(), 4);
296 assert_eq!(txn.metrics().writes, 2);
297 assert_eq!(txn.metrics().removes, 2);
298
299 assert_eq!(txn.pending.get(&make_key("write1")), Some(&make_value("v1")));
300 assert_eq!(txn.pending.get(&make_key("write2")), Some(&make_value("v2")));
301 assert!(txn.pending.is_removed(&make_key("remove1")));
302 assert!(txn.pending.is_removed(&make_key("remove2")));
303 }
304}