Skip to main content

reifydb_sub_flow/operator/stateful/
raw.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::encoded::{
5	key::{EncodedKey, EncodedKeyRange},
6	row::EncodedRow,
7};
8use reifydb_type::Result;
9
10use super::{StateIterator, utils};
11use crate::{Operator, transaction::FlowTransaction};
12
13/// Raw Stateful operations - provides raw key-value access
14/// This is the foundation for operators that need state management
15pub trait RawStatefulOperator: Operator {
16	/// Get raw bytes for a key
17	fn state_get(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedRow>> {
18		utils::state_get(self.id(), txn, key)
19	}
20
21	/// Set raw bytes for a key
22	fn state_set(&self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedRow) -> Result<()> {
23		utils::state_set(self.id(), txn, key, value)
24	}
25
26	/// Remove a key
27	fn state_remove(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
28		utils::state_remove(self.id(), txn, key)
29	}
30
31	/// Scan all keys for this operator
32	fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator> {
33		utils::state_scan(self.id(), txn)
34	}
35
36	/// Range query between keys
37	fn state_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<StateIterator> {
38		utils::state_range(self.id(), txn, range)
39	}
40
41	/// Clear all state for this operator
42	fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()> {
43		utils::state_clear(self.id(), txn)
44	}
45}
46
47#[cfg(test)]
48pub mod tests {
49	use std::ops::Bound::{Excluded, Included};
50
51	use reifydb_catalog::catalog::Catalog;
52	use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
53	use reifydb_runtime::context::clock::{Clock, MockClock};
54	use reifydb_transaction::interceptor::interceptors::Interceptors;
55	use reifydb_type::util::cowvec::CowVec;
56
57	use super::*;
58	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
59
	// Empty impl: `TestOperator` takes every default method of `RawStatefulOperator` unchanged.
	impl RawStatefulOperator for TestOperator {}
61
62	#[test]
63	fn test_simple_state_get_set() {
64		let mut txn = create_test_transaction();
65		let mut txn = FlowTransaction::deferred(
66			&mut txn,
67			CommitVersion(1),
68			Catalog::testing(),
69			Interceptors::new(),
70			Clock::Mock(MockClock::from_millis(1000)),
71		);
72		let operator = TestOperator::simple(FlowNodeId(1));
73		let key = test_key("simple_test");
74		let value = test_row();
75
76		// Initially should be None
77		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
78
79		// Set and verify
80		operator.state_set(&mut txn, &key, value.clone()).unwrap();
81		let result = operator.state_get(&mut txn, &key).unwrap();
82		assert!(result.is_some());
83		assert_row_eq(&result.unwrap(), &value);
84	}
85
86	#[test]
87	fn test_simple_state_remove() {
88		let mut txn = create_test_transaction();
89		let mut txn = FlowTransaction::deferred(
90			&mut txn,
91			CommitVersion(1),
92			Catalog::testing(),
93			Interceptors::new(),
94			Clock::Mock(MockClock::from_millis(1000)),
95		);
96		let operator = TestOperator::simple(FlowNodeId(1));
97		let key = test_key("remove_test");
98		let value = test_row();
99
100		// Set, verify, remove, verify
101		operator.state_set(&mut txn, &key, value).unwrap();
102		assert!(operator.state_get(&mut txn, &key).unwrap().is_some());
103
104		operator.state_remove(&mut txn, &key).unwrap();
105		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
106	}
107
108	#[test]
109	fn test_simple_state_scan() {
110		let mut txn = create_test_transaction();
111		let mut txn = FlowTransaction::deferred(
112			&mut txn,
113			CommitVersion(1),
114			Catalog::testing(),
115			Interceptors::new(),
116			Clock::Mock(MockClock::from_millis(1000)),
117		);
118		let operator = TestOperator::simple(FlowNodeId(1));
119
120		// Add multiple entries
121		let entries = vec![("key_a", vec![1, 2]), ("key_b", vec![3, 4]), ("key_c", vec![5, 6])];
122		for (key_suffix, data) in &entries {
123			let key = test_key(key_suffix);
124			let value = EncodedRow(CowVec::new(data.clone()));
125			operator.state_set(&mut txn, &key, value).unwrap();
126		}
127
128		// Scan and verify count
129		let scanned: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
130		assert_eq!(scanned.len(), 3);
131	}
132
133	#[test]
134	fn test_simple_state_range() {
135		let mut txn = create_test_transaction();
136		let mut txn = FlowTransaction::deferred(
137			&mut txn,
138			CommitVersion(1),
139			Catalog::testing(),
140			Interceptors::new(),
141			Clock::Mock(MockClock::from_millis(1000)),
142		);
143		let operator = TestOperator::simple(FlowNodeId(2));
144
145		// Add ordered entries
146		for i in 0..10 {
147			let key = test_key(&format!("{:02}", i)); // Ensures lexical ordering
148			let value = EncodedRow(CowVec::new(vec![i as u8]));
149			operator.state_set(&mut txn, &key, value).unwrap();
150		}
151
152		let range = EncodedKeyRange::new(Included(test_key("02")), Excluded(test_key("05")));
153		let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
154
155		// Should get keys 02, 03, 04 (not 05 as end is exclusive)
156		assert_eq!(range_result.len(), 3);
157		assert_eq!(range_result[0].1.as_slice()[0], 2);
158		assert_eq!(range_result[1].1.as_slice()[0], 3);
159		assert_eq!(range_result[2].1.as_slice()[0], 4);
160	}
161
162	#[test]
163	fn test_simple_state_clear() {
164		let mut txn = create_test_transaction();
165		let mut txn = FlowTransaction::deferred(
166			&mut txn,
167			CommitVersion(1),
168			Catalog::testing(),
169			Interceptors::new(),
170			Clock::Mock(MockClock::from_millis(1000)),
171		);
172		let operator = TestOperator::simple(FlowNodeId(3));
173
174		// Add multiple entries
175		for i in 0..5 {
176			let key = test_key(&format!("clear_{}", i));
177			let value = EncodedRow(CowVec::new(vec![i as u8]));
178			operator.state_set(&mut txn, &key, value).unwrap();
179		}
180
181		// Verify entries exist
182		let count = operator.state_scan(&mut txn).unwrap().count();
183		assert_eq!(count, 5);
184
185		// Clear all
186		operator.state_clear(&mut txn).unwrap();
187
188		// Verify all cleared
189		let count = operator.state_scan(&mut txn).unwrap().count();
190		assert_eq!(count, 0);
191	}
192
193	#[test]
194	fn test_operator_isolation() {
195		let mut txn = create_test_transaction();
196		let mut txn = FlowTransaction::deferred(
197			&mut txn,
198			CommitVersion(1),
199			Catalog::testing(),
200			Interceptors::new(),
201			Clock::Mock(MockClock::from_millis(1000)),
202		);
203		let operator1 = TestOperator::simple(FlowNodeId(10));
204		let operator2 = TestOperator::simple(FlowNodeId(20));
205		let shared_key = test_key("shared");
206
207		let value1 = EncodedRow(CowVec::new(vec![1]));
208		let value2 = EncodedRow(CowVec::new(vec![2]));
209
210		// Set different values for same key in different operators
211		operator1.state_set(&mut txn, &shared_key, value1.clone()).unwrap();
212		operator2.state_set(&mut txn, &shared_key, value2.clone()).unwrap();
213
214		// Each operator should have its own value
215		let result1 = operator1.state_get(&mut txn, &shared_key).unwrap().unwrap();
216		let result2 = operator2.state_get(&mut txn, &shared_key).unwrap().unwrap();
217
218		assert_row_eq(&result1, &value1);
219		assert_row_eq(&result2, &value2);
220	}
221
222	#[test]
223	fn test_empty_range() {
224		let mut txn = create_test_transaction();
225		let mut txn = FlowTransaction::deferred(
226			&mut txn,
227			CommitVersion(1),
228			Catalog::testing(),
229			Interceptors::new(),
230			Clock::Mock(MockClock::from_millis(1000)),
231		);
232		let operator = TestOperator::simple(FlowNodeId(4));
233
234		// Add some entries
235		for i in 0..5 {
236			let key = test_key(&format!("item_{}", i));
237			let value = test_row();
238			operator.state_set(&mut txn, &key, value).unwrap();
239		}
240
241		// Query range that doesn't exist (after all "item_*" entries)
242		let range = EncodedKeyRange::new(Included(test_key("z_aaa")), Excluded(test_key("z_zzz")));
243		let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
244
245		assert_eq!(range_result.len(), 0);
246	}
247
248	#[test]
249	fn test_overwrite_existing_key() {
250		let mut txn = create_test_transaction();
251		let mut txn = FlowTransaction::deferred(
252			&mut txn,
253			CommitVersion(1),
254			Catalog::testing(),
255			Interceptors::new(),
256			Clock::Mock(MockClock::from_millis(1000)),
257		);
258		let operator = TestOperator::simple(FlowNodeId(5));
259		let key = test_key("overwrite");
260
261		let value1 = EncodedRow(CowVec::new(vec![1, 1, 1]));
262		let value2 = EncodedRow(CowVec::new(vec![2, 2, 2]));
263
264		// Set initial value
265		operator.state_set(&mut txn, &key, value1).unwrap();
266
267		// Overwrite with new value
268		operator.state_set(&mut txn, &key, value2.clone()).unwrap();
269
270		// Should have the new value
271		let result = operator.state_get(&mut txn, &key).unwrap().unwrap();
272		assert_row_eq(&result, &value2);
273	}
274
275	#[test]
276	fn test_remove_non_existent_key() {
277		let mut txn = create_test_transaction();
278		let mut txn = FlowTransaction::deferred(
279			&mut txn,
280			CommitVersion(1),
281			Catalog::testing(),
282			Interceptors::new(),
283			Clock::Mock(MockClock::from_millis(1000)),
284		);
285		let operator = TestOperator::simple(FlowNodeId(6));
286		let key = test_key("non_existent");
287
288		// Remove non-existent key should not error
289		operator.state_remove(&mut txn, &key).unwrap();
290
291		// Should still be None
292		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
293	}
294
295	#[test]
296	fn test_scan_after_partial_removal() {
297		let mut txn = create_test_transaction();
298		let mut txn = FlowTransaction::deferred(
299			&mut txn,
300			CommitVersion(1),
301			Catalog::testing(),
302			Interceptors::new(),
303			Clock::Mock(MockClock::from_millis(1000)),
304		);
305		let operator = TestOperator::simple(FlowNodeId(7));
306
307		// Add 5 entries
308		for i in 0..5 {
309			let key = test_key(&format!("partial_{}", i));
310			let value = EncodedRow(CowVec::new(vec![i as u8]));
311			operator.state_set(&mut txn, &key, value).unwrap();
312		}
313
314		// Remove some entries
315		operator.state_remove(&mut txn, &test_key("partial_1")).unwrap();
316		operator.state_remove(&mut txn, &test_key("partial_3")).unwrap();
317
318		// Should have 3 entries left (0, 2, 4)
319		let remaining: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
320		assert_eq!(remaining.len(), 3);
321	}
322}