reifydb_sub_flow/operator/stateful/
raw.rs1use reifydb_core::encoded::{
5 encoded::EncodedValues,
6 key::{EncodedKey, EncodedKeyRange},
7};
8use reifydb_type::Result;
9
10use super::{StateIterator, utils};
11use crate::{Operator, transaction::FlowTransaction};
12
13pub trait RawStatefulOperator: Operator {
16 fn state_get(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedValues>> {
18 utils::state_get(self.id(), txn, key)
19 }
20
21 fn state_set(&self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedValues) -> Result<()> {
23 utils::state_set(self.id(), txn, key, value)
24 }
25
26 fn state_remove(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
28 utils::state_remove(self.id(), txn, key)
29 }
30
31 fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator> {
33 utils::state_scan(self.id(), txn)
34 }
35
36 fn state_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<StateIterator> {
38 utils::state_range(self.id(), txn, range)
39 }
40
41 fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()> {
43 utils::state_clear(self.id(), txn)
44 }
45}
46
47#[cfg(test)]
48pub mod tests {
49 use std::ops::Bound::{Excluded, Included};
50
51 use reifydb_catalog::catalog::Catalog;
52 use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
53 use reifydb_transaction::interceptor::interceptors::Interceptors;
54 use reifydb_type::util::cowvec::CowVec;
55
56 use super::*;
57 use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
58
59 impl RawStatefulOperator for TestOperator {}
60
61 #[test]
62 fn test_simple_state_get_set() {
63 let mut txn = create_test_transaction();
64 let mut txn =
65 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
66 let operator = TestOperator::simple(FlowNodeId(1));
67 let key = test_key("simple_test");
68 let value = test_row();
69
70 assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
72
73 operator.state_set(&mut txn, &key, value.clone()).unwrap();
75 let result = operator.state_get(&mut txn, &key).unwrap();
76 assert!(result.is_some());
77 assert_row_eq(&result.unwrap(), &value);
78 }
79
80 #[test]
81 fn test_simple_state_remove() {
82 let mut txn = create_test_transaction();
83 let mut txn =
84 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
85 let operator = TestOperator::simple(FlowNodeId(1));
86 let key = test_key("remove_test");
87 let value = test_row();
88
89 operator.state_set(&mut txn, &key, value).unwrap();
91 assert!(operator.state_get(&mut txn, &key).unwrap().is_some());
92
93 operator.state_remove(&mut txn, &key).unwrap();
94 assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
95 }
96
97 #[test]
98 fn test_simple_state_scan() {
99 let mut txn = create_test_transaction();
100 let mut txn =
101 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
102 let operator = TestOperator::simple(FlowNodeId(1));
103
104 let entries = vec![("key_a", vec![1, 2]), ("key_b", vec![3, 4]), ("key_c", vec![5, 6])];
106 for (key_suffix, data) in &entries {
107 let key = test_key(key_suffix);
108 let value = EncodedValues(CowVec::new(data.clone()));
109 operator.state_set(&mut txn, &key, value).unwrap();
110 }
111
112 let scanned: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
114 assert_eq!(scanned.len(), 3);
115 }
116
117 #[test]
118 fn test_simple_state_range() {
119 let mut txn = create_test_transaction();
120 let mut txn =
121 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
122 let operator = TestOperator::simple(FlowNodeId(2));
123
124 for i in 0..10 {
126 let key = test_key(&format!("{:02}", i)); let value = EncodedValues(CowVec::new(vec![i as u8]));
128 operator.state_set(&mut txn, &key, value).unwrap();
129 }
130
131 let range = EncodedKeyRange::new(Included(test_key("02")), Excluded(test_key("05")));
132 let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
133
134 assert_eq!(range_result.len(), 3);
136 assert_eq!(range_result[0].1.as_ref()[0], 2);
137 assert_eq!(range_result[1].1.as_ref()[0], 3);
138 assert_eq!(range_result[2].1.as_ref()[0], 4);
139 }
140
141 #[test]
142 fn test_simple_state_clear() {
143 let mut txn = create_test_transaction();
144 let mut txn =
145 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
146 let operator = TestOperator::simple(FlowNodeId(3));
147
148 for i in 0..5 {
150 let key = test_key(&format!("clear_{}", i));
151 let value = EncodedValues(CowVec::new(vec![i as u8]));
152 operator.state_set(&mut txn, &key, value).unwrap();
153 }
154
155 let count = operator.state_scan(&mut txn).unwrap().count();
157 assert_eq!(count, 5);
158
159 operator.state_clear(&mut txn).unwrap();
161
162 let count = operator.state_scan(&mut txn).unwrap().count();
164 assert_eq!(count, 0);
165 }
166
167 #[test]
168 fn test_operator_isolation() {
169 let mut txn = create_test_transaction();
170 let mut txn =
171 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
172 let operator1 = TestOperator::simple(FlowNodeId(10));
173 let operator2 = TestOperator::simple(FlowNodeId(20));
174 let shared_key = test_key("shared");
175
176 let value1 = EncodedValues(CowVec::new(vec![1]));
177 let value2 = EncodedValues(CowVec::new(vec![2]));
178
179 operator1.state_set(&mut txn, &shared_key, value1.clone()).unwrap();
181 operator2.state_set(&mut txn, &shared_key, value2.clone()).unwrap();
182
183 let result1 = operator1.state_get(&mut txn, &shared_key).unwrap().unwrap();
185 let result2 = operator2.state_get(&mut txn, &shared_key).unwrap().unwrap();
186
187 assert_row_eq(&result1, &value1);
188 assert_row_eq(&result2, &value2);
189 }
190
191 #[test]
192 fn test_empty_range() {
193 let mut txn = create_test_transaction();
194 let mut txn =
195 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
196 let operator = TestOperator::simple(FlowNodeId(4));
197
198 for i in 0..5 {
200 let key = test_key(&format!("item_{}", i));
201 let value = test_row();
202 operator.state_set(&mut txn, &key, value).unwrap();
203 }
204
205 let range = EncodedKeyRange::new(Included(test_key("z_aaa")), Excluded(test_key("z_zzz")));
207 let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
208
209 assert_eq!(range_result.len(), 0);
210 }
211
212 #[test]
213 fn test_overwrite_existing_key() {
214 let mut txn = create_test_transaction();
215 let mut txn =
216 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
217 let operator = TestOperator::simple(FlowNodeId(5));
218 let key = test_key("overwrite");
219
220 let value1 = EncodedValues(CowVec::new(vec![1, 1, 1]));
221 let value2 = EncodedValues(CowVec::new(vec![2, 2, 2]));
222
223 operator.state_set(&mut txn, &key, value1).unwrap();
225
226 operator.state_set(&mut txn, &key, value2.clone()).unwrap();
228
229 let result = operator.state_get(&mut txn, &key).unwrap().unwrap();
231 assert_row_eq(&result, &value2);
232 }
233
234 #[test]
235 fn test_remove_non_existent_key() {
236 let mut txn = create_test_transaction();
237 let mut txn =
238 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
239 let operator = TestOperator::simple(FlowNodeId(6));
240 let key = test_key("non_existent");
241
242 operator.state_remove(&mut txn, &key).unwrap();
244
245 assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
247 }
248
249 #[test]
250 fn test_scan_after_partial_removal() {
251 let mut txn = create_test_transaction();
252 let mut txn =
253 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
254 let operator = TestOperator::simple(FlowNodeId(7));
255
256 for i in 0..5 {
258 let key = test_key(&format!("partial_{}", i));
259 let value = EncodedValues(CowVec::new(vec![i as u8]));
260 operator.state_set(&mut txn, &key, value).unwrap();
261 }
262
263 operator.state_remove(&mut txn, &test_key("partial_1")).unwrap();
265 operator.state_remove(&mut txn, &test_key("partial_3")).unwrap();
266
267 let remaining: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
269 assert_eq!(remaining.len(), 3);
270 }
271}