reifydb_sub_flow/operator/stateful/
raw.rs1use reifydb_core::encoded::{
5 key::{EncodedKey, EncodedKeyRange},
6 row::EncodedRow,
7};
8use reifydb_type::Result;
9
10use super::{StateIterator, utils};
11use crate::{Operator, transaction::FlowTransaction};
12
13pub trait RawStatefulOperator: Operator {
16 fn state_get(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedRow>> {
18 utils::state_get(self.id(), txn, key)
19 }
20
21 fn state_set(&self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedRow) -> Result<()> {
23 utils::state_set(self.id(), txn, key, value)
24 }
25
26 fn state_remove(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
28 utils::state_remove(self.id(), txn, key)
29 }
30
31 fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator> {
33 utils::state_scan(self.id(), txn)
34 }
35
36 fn state_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<StateIterator> {
38 utils::state_range(self.id(), txn, range)
39 }
40
41 fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()> {
43 utils::state_clear(self.id(), txn)
44 }
45}
46
#[cfg(test)]
pub mod tests {
    use std::ops::Bound::{Excluded, Included};

    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
    use reifydb_runtime::context::clock::{Clock, MockClock};
    use reifydb_transaction::interceptor::interceptors::Interceptors;
    use reifydb_type::util::cowvec::CowVec;

    use super::*;
    use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

    // The trait's default methods are exactly what is under test, so the
    // test operator opts in without overriding anything.
    impl RawStatefulOperator for TestOperator {}

    /// A key is absent before it is written and yields the written row afterwards.
    #[test]
    fn test_simple_state_get_set() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));
        let key = test_key("simple_test");
        let value = test_row();

        let before = operator.state_get(&mut txn, &key).unwrap();
        assert!(before.is_none());

        operator.state_set(&mut txn, &key, value.clone()).unwrap();

        let stored = operator.state_get(&mut txn, &key).unwrap();
        assert!(stored.is_some());
        assert_row_eq(&stored.unwrap(), &value);
    }

    /// A stored key is no longer readable once it has been removed.
    #[test]
    fn test_simple_state_remove() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));
        let key = test_key("remove_test");
        let value = test_row();

        operator.state_set(&mut txn, &key, value).unwrap();
        let after_set = operator.state_get(&mut txn, &key).unwrap();
        assert!(after_set.is_some());

        operator.state_remove(&mut txn, &key).unwrap();
        let after_remove = operator.state_get(&mut txn, &key).unwrap();
        assert!(after_remove.is_none());
    }

    /// Scanning returns one entry per stored key.
    #[test]
    fn test_simple_state_scan() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));

        let entries = vec![("key_a", vec![1, 2]), ("key_b", vec![3, 4]), ("key_c", vec![5, 6])];
        for (key_suffix, bytes) in &entries {
            let key = test_key(key_suffix);
            let row = EncodedRow(CowVec::new(bytes.clone()));
            operator.state_set(&mut txn, &key, row).unwrap();
        }

        let scanned: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
        assert_eq!(scanned.len(), 3);
    }

    /// A half-open range `[02, 05)` over keys `00..=09` yields the rows for 2, 3 and 4.
    #[test]
    fn test_simple_state_range() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(2));

        for n in 0..10 {
            // Zero-padded so that the string order matches the numeric order.
            let key = test_key(&format!("{:02}", n));
            let row = EncodedRow(CowVec::new(vec![n as u8]));
            operator.state_set(&mut txn, &key, row).unwrap();
        }

        let lower = Included(test_key("02"));
        let upper = Excluded(test_key("05"));
        let range = EncodedKeyRange::new(lower, upper);
        let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();

        assert_eq!(range_result.len(), 3);
        // The length was checked above, so the zip visits every returned entry.
        for (entry, expected) in range_result.iter().zip([2, 3, 4]) {
            assert_eq!(entry.1.as_slice()[0], expected);
        }
    }

    /// Clearing an operator's state leaves nothing for a subsequent scan.
    #[test]
    fn test_simple_state_clear() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(3));

        for n in 0..5 {
            let key = test_key(&format!("clear_{}", n));
            let row = EncodedRow(CowVec::new(vec![n as u8]));
            operator.state_set(&mut txn, &key, row).unwrap();
        }

        let before_clear = operator.state_scan(&mut txn).unwrap().count();
        assert_eq!(before_clear, 5);

        operator.state_clear(&mut txn).unwrap();

        let after_clear = operator.state_scan(&mut txn).unwrap().count();
        assert_eq!(after_clear, 0);
    }

    /// Two operators with different node ids can use the same key without
    /// seeing each other's rows.
    #[test]
    fn test_operator_isolation() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let first_op = TestOperator::simple(FlowNodeId(10));
        let second_op = TestOperator::simple(FlowNodeId(20));
        let shared_key = test_key("shared");

        let first_row = EncodedRow(CowVec::new(vec![1]));
        let second_row = EncodedRow(CowVec::new(vec![2]));

        first_op.state_set(&mut txn, &shared_key, first_row.clone()).unwrap();
        second_op.state_set(&mut txn, &shared_key, second_row.clone()).unwrap();

        let first_seen = first_op.state_get(&mut txn, &shared_key).unwrap().unwrap();
        let second_seen = second_op.state_get(&mut txn, &shared_key).unwrap().unwrap();

        assert_row_eq(&first_seen, &first_row);
        assert_row_eq(&second_seen, &second_row);
    }

    /// A range that covers none of the stored keys yields no entries.
    #[test]
    fn test_empty_range() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(4));

        for n in 0..5 {
            let key = test_key(&format!("item_{}", n));
            let row = test_row();
            operator.state_set(&mut txn, &key, row).unwrap();
        }

        let lower = Included(test_key("z_aaa"));
        let upper = Excluded(test_key("z_zzz"));
        let range = EncodedKeyRange::new(lower, upper);
        let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();

        assert_eq!(range_result.len(), 0);
    }

    /// Writing the same key twice keeps only the second row.
    #[test]
    fn test_overwrite_existing_key() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(5));
        let key = test_key("overwrite");

        let original = EncodedRow(CowVec::new(vec![1, 1, 1]));
        let replacement = EncodedRow(CowVec::new(vec![2, 2, 2]));

        operator.state_set(&mut txn, &key, original).unwrap();
        operator.state_set(&mut txn, &key, replacement.clone()).unwrap();

        let current = operator.state_get(&mut txn, &key).unwrap().unwrap();
        assert_row_eq(&current, &replacement);
    }

    /// Removing a key that was never written succeeds and the key stays absent.
    #[test]
    fn test_remove_non_existent_key() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(6));
        let key = test_key("non_existent");

        operator.state_remove(&mut txn, &key).unwrap();

        let lookup = operator.state_get(&mut txn, &key).unwrap();
        assert!(lookup.is_none());
    }

    /// After removing two of five keys, a scan reports the remaining three.
    #[test]
    fn test_scan_after_partial_removal() {
        let mut parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(7));

        for n in 0..5 {
            let key = test_key(&format!("partial_{}", n));
            let row = EncodedRow(CowVec::new(vec![n as u8]));
            operator.state_set(&mut txn, &key, row).unwrap();
        }

        let first_removed = test_key("partial_1");
        operator.state_remove(&mut txn, &first_removed).unwrap();
        let second_removed = test_key("partial_3");
        operator.state_remove(&mut txn, &second_removed).unwrap();

        let remaining: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
        assert_eq!(remaining.len(), 3);
    }
}