reifydb_sub_flow/transaction/
write.rs1use reifydb_core::encoded::{encoded::EncodedValues, key::EncodedKey};
5use reifydb_type::Result;
6
7use super::FlowTransaction;
8
9impl FlowTransaction {
10 pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()> {
12 match self {
13 Self::Deferred {
14 pending,
15 ..
16 } => pending.insert(key.clone(), value),
17 Self::Transactional {
18 pending,
19 ..
20 } => pending.insert(key.clone(), value),
21 }
22 Ok(())
23 }
24
25 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
27 match self {
28 Self::Deferred {
29 pending,
30 ..
31 } => pending.remove(key.clone()),
32 Self::Transactional {
33 pending,
34 ..
35 } => pending.remove(key.clone()),
36 }
37 Ok(())
38 }
39}
40
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::{
        common::CommitVersion,
        encoded::{encoded::EncodedValues, key::EncodedKey},
    };
    use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
    use reifydb_type::util::cowvec::CowVec;

    use super::*;
    use crate::operator::stateful::test_utils::test::create_test_transaction;

    /// Builds an `EncodedKey` from the UTF-8 bytes of `text`.
    fn make_key(text: &str) -> EncodedKey {
        let bytes = text.as_bytes().to_vec();
        EncodedKey::new(bytes)
    }

    /// Builds an `EncodedValues` wrapping the UTF-8 bytes of `text`.
    fn make_value(text: &str) -> EncodedValues {
        let bytes = text.as_bytes().to_vec();
        EncodedValues(CowVec::new(bytes))
    }

    /// Reads `key` through the parent transaction and returns a copy of the stored values, if any.
    fn get_values(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedValues> {
        let lookup = parent.get(key).unwrap();
        lookup.map(|entry| entry.values.clone())
    }

    #[test]
    fn test_set_buffers_to_pending() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let (key, value) = (make_key("key1"), make_value("value1"));
        txn.set(&key, value.clone()).unwrap();

        // The write must be observable through the pending buffer.
        assert_eq!(txn.pending().get(&key), Some(&value));
    }

    #[test]
    fn test_set_multiple_keys() {
        const ENTRIES: [(&str, &str); 3] = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];

        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        for &(k, v) in &ENTRIES {
            txn.set(&make_key(k), make_value(v)).unwrap();
        }

        for &(k, v) in &ENTRIES {
            assert_eq!(txn.pending().get(&make_key(k)), Some(&make_value(v)));
        }
    }

    #[test]
    fn test_set_overwrites_same_key() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");
        for v in &["value1", "value2"] {
            txn.set(&key, make_value(v)).unwrap();
        }

        // Only the most recent write for the key is expected to remain.
        assert_eq!(txn.pending().get(&key), Some(&make_value("value2")));
    }

    #[test]
    fn test_remove_buffers_to_pending() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");
        txn.remove(&key).unwrap();

        // The removal must be observable through the pending buffer.
        assert!(txn.pending().is_removed(&key));
    }

    #[test]
    fn test_remove_multiple_keys() {
        const KEYS: [&str; 3] = ["key1", "key2", "key3"];

        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        for &k in &KEYS {
            txn.remove(&make_key(k)).unwrap();
        }

        for &k in &KEYS {
            assert!(txn.pending().is_removed(&make_key(k)));
        }
    }

    #[test]
    fn test_set_then_remove() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");

        // Phase 1: the write is staged.
        txn.set(&key, make_value("value1")).unwrap();
        assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));

        // Phase 2: the removal replaces the staged write.
        txn.remove(&key).unwrap();
        assert!(txn.pending().is_removed(&key));
        assert!(txn.pending().get(&key).is_none());
    }

    #[test]
    fn test_remove_then_set() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");

        // Phase 1: the removal is staged.
        txn.remove(&key).unwrap();
        assert!(txn.pending().is_removed(&key));

        // Phase 2: a later write cancels the staged removal.
        txn.set(&key, make_value("value1")).unwrap();
        assert!(!txn.pending().is_removed(&key));
        assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
    }

    #[test]
    fn test_writes_not_visible_to_parent() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        let key = make_key("key1");
        let staged = make_value("value1");
        txn.set(&key, staged.clone()).unwrap();

        // The parent must not observe the staged write.
        assert!(get_values(&mut parent, &key).is_none());
    }

    #[test]
    fn test_removes_not_visible_to_parent() {
        let mut parent = create_test_transaction();

        // Seed the parent with a value so there is something to remove.
        let key = make_key("key1");
        let seeded = make_value("value1");
        parent.set(&key, seeded.clone()).unwrap();
        assert_eq!(get_values(&mut parent, &key), Some(seeded.clone()));

        // Open the flow transaction at the parent's current version and stage a removal.
        let snapshot = parent.version();
        let mut txn = FlowTransaction::deferred(&parent, snapshot, Catalog::testing(), Interceptors::new());
        txn.remove(&key).unwrap();

        // The parent must still see the seeded value.
        assert_eq!(get_values(&mut parent, &key), Some(seeded));
    }

    #[test]
    fn test_mixed_writes_and_removes() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

        // Interleave writes and removals on distinct keys.
        txn.set(&make_key("write1"), make_value("v1")).unwrap();
        txn.remove(&make_key("remove1")).unwrap();
        txn.set(&make_key("write2"), make_value("v2")).unwrap();
        txn.remove(&make_key("remove2")).unwrap();

        for &(k, v) in &[("write1", "v1"), ("write2", "v2")] {
            assert_eq!(txn.pending().get(&make_key(k)), Some(&make_value(v)));
        }
        for &k in &["remove1", "remove2"] {
            assert!(txn.pending().is_removed(&make_key(k)));
        }
    }
}