Skip to main content

reifydb_sub_flow/operator/stateful/
raw.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::encoded::{
5	encoded::EncodedValues,
6	key::{EncodedKey, EncodedKeyRange},
7};
8use reifydb_type::Result;
9
10use super::{StateIterator, utils};
11use crate::{Operator, transaction::FlowTransaction};
12
13/// Raw Stateful operations - provides raw key-value access
14/// This is the foundation for operators that need state management
15pub trait RawStatefulOperator: Operator {
16	/// Get raw bytes for a key
17	fn state_get(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedValues>> {
18		utils::state_get(self.id(), txn, key)
19	}
20
21	/// Set raw bytes for a key
22	fn state_set(&self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedValues) -> Result<()> {
23		utils::state_set(self.id(), txn, key, value)
24	}
25
26	/// Remove a key
27	fn state_remove(&self, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
28		utils::state_remove(self.id(), txn, key)
29	}
30
31	/// Scan all keys for this operator
32	fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator> {
33		utils::state_scan(self.id(), txn)
34	}
35
36	/// Range query between keys
37	fn state_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<StateIterator> {
38		utils::state_range(self.id(), txn, range)
39	}
40
41	/// Clear all state for this operator
42	fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()> {
43		utils::state_clear(self.id(), txn)
44	}
45}
46
47#[cfg(test)]
48pub mod tests {
49	use std::ops::Bound::{Excluded, Included};
50
51	use reifydb_catalog::catalog::Catalog;
52	use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
53	use reifydb_transaction::interceptor::interceptors::Interceptors;
54	use reifydb_type::util::cowvec::CowVec;
55
56	use super::*;
57	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
58
59	impl RawStatefulOperator for TestOperator {}
60
61	#[test]
62	fn test_simple_state_get_set() {
63		let mut txn = create_test_transaction();
64		let mut txn =
65			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
66		let operator = TestOperator::simple(FlowNodeId(1));
67		let key = test_key("simple_test");
68		let value = test_row();
69
70		// Initially should be None
71		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
72
73		// Set and verify
74		operator.state_set(&mut txn, &key, value.clone()).unwrap();
75		let result = operator.state_get(&mut txn, &key).unwrap();
76		assert!(result.is_some());
77		assert_row_eq(&result.unwrap(), &value);
78	}
79
80	#[test]
81	fn test_simple_state_remove() {
82		let mut txn = create_test_transaction();
83		let mut txn =
84			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
85		let operator = TestOperator::simple(FlowNodeId(1));
86		let key = test_key("remove_test");
87		let value = test_row();
88
89		// Set, verify, remove, verify
90		operator.state_set(&mut txn, &key, value).unwrap();
91		assert!(operator.state_get(&mut txn, &key).unwrap().is_some());
92
93		operator.state_remove(&mut txn, &key).unwrap();
94		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
95	}
96
97	#[test]
98	fn test_simple_state_scan() {
99		let mut txn = create_test_transaction();
100		let mut txn =
101			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
102		let operator = TestOperator::simple(FlowNodeId(1));
103
104		// Add multiple entries
105		let entries = vec![("key_a", vec![1, 2]), ("key_b", vec![3, 4]), ("key_c", vec![5, 6])];
106		for (key_suffix, data) in &entries {
107			let key = test_key(key_suffix);
108			let value = EncodedValues(CowVec::new(data.clone()));
109			operator.state_set(&mut txn, &key, value).unwrap();
110		}
111
112		// Scan and verify count
113		let scanned: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
114		assert_eq!(scanned.len(), 3);
115	}
116
117	#[test]
118	fn test_simple_state_range() {
119		let mut txn = create_test_transaction();
120		let mut txn =
121			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
122		let operator = TestOperator::simple(FlowNodeId(2));
123
124		// Add ordered entries
125		for i in 0..10 {
126			let key = test_key(&format!("{:02}", i)); // Ensures lexical ordering
127			let value = EncodedValues(CowVec::new(vec![i as u8]));
128			operator.state_set(&mut txn, &key, value).unwrap();
129		}
130
131		let range = EncodedKeyRange::new(Included(test_key("02")), Excluded(test_key("05")));
132		let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
133
134		// Should get keys 02, 03, 04 (not 05 as end is exclusive)
135		assert_eq!(range_result.len(), 3);
136		assert_eq!(range_result[0].1.as_ref()[0], 2);
137		assert_eq!(range_result[1].1.as_ref()[0], 3);
138		assert_eq!(range_result[2].1.as_ref()[0], 4);
139	}
140
141	#[test]
142	fn test_simple_state_clear() {
143		let mut txn = create_test_transaction();
144		let mut txn =
145			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
146		let operator = TestOperator::simple(FlowNodeId(3));
147
148		// Add multiple entries
149		for i in 0..5 {
150			let key = test_key(&format!("clear_{}", i));
151			let value = EncodedValues(CowVec::new(vec![i as u8]));
152			operator.state_set(&mut txn, &key, value).unwrap();
153		}
154
155		// Verify entries exist
156		let count = operator.state_scan(&mut txn).unwrap().count();
157		assert_eq!(count, 5);
158
159		// Clear all
160		operator.state_clear(&mut txn).unwrap();
161
162		// Verify all cleared
163		let count = operator.state_scan(&mut txn).unwrap().count();
164		assert_eq!(count, 0);
165	}
166
167	#[test]
168	fn test_operator_isolation() {
169		let mut txn = create_test_transaction();
170		let mut txn =
171			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
172		let operator1 = TestOperator::simple(FlowNodeId(10));
173		let operator2 = TestOperator::simple(FlowNodeId(20));
174		let shared_key = test_key("shared");
175
176		let value1 = EncodedValues(CowVec::new(vec![1]));
177		let value2 = EncodedValues(CowVec::new(vec![2]));
178
179		// Set different values for same key in different operators
180		operator1.state_set(&mut txn, &shared_key, value1.clone()).unwrap();
181		operator2.state_set(&mut txn, &shared_key, value2.clone()).unwrap();
182
183		// Each operator should have its own value
184		let result1 = operator1.state_get(&mut txn, &shared_key).unwrap().unwrap();
185		let result2 = operator2.state_get(&mut txn, &shared_key).unwrap().unwrap();
186
187		assert_row_eq(&result1, &value1);
188		assert_row_eq(&result2, &value2);
189	}
190
191	#[test]
192	fn test_empty_range() {
193		let mut txn = create_test_transaction();
194		let mut txn =
195			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
196		let operator = TestOperator::simple(FlowNodeId(4));
197
198		// Add some entries
199		for i in 0..5 {
200			let key = test_key(&format!("item_{}", i));
201			let value = test_row();
202			operator.state_set(&mut txn, &key, value).unwrap();
203		}
204
205		// Query range that doesn't exist (after all "item_*" entries)
206		let range = EncodedKeyRange::new(Included(test_key("z_aaa")), Excluded(test_key("z_zzz")));
207		let range_result: Vec<_> = operator.state_range(&mut txn, range).unwrap().collect();
208
209		assert_eq!(range_result.len(), 0);
210	}
211
212	#[test]
213	fn test_overwrite_existing_key() {
214		let mut txn = create_test_transaction();
215		let mut txn =
216			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
217		let operator = TestOperator::simple(FlowNodeId(5));
218		let key = test_key("overwrite");
219
220		let value1 = EncodedValues(CowVec::new(vec![1, 1, 1]));
221		let value2 = EncodedValues(CowVec::new(vec![2, 2, 2]));
222
223		// Set initial value
224		operator.state_set(&mut txn, &key, value1).unwrap();
225
226		// Overwrite with new value
227		operator.state_set(&mut txn, &key, value2.clone()).unwrap();
228
229		// Should have the new value
230		let result = operator.state_get(&mut txn, &key).unwrap().unwrap();
231		assert_row_eq(&result, &value2);
232	}
233
234	#[test]
235	fn test_remove_non_existent_key() {
236		let mut txn = create_test_transaction();
237		let mut txn =
238			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
239		let operator = TestOperator::simple(FlowNodeId(6));
240		let key = test_key("non_existent");
241
242		// Remove non-existent key should not error
243		operator.state_remove(&mut txn, &key).unwrap();
244
245		// Should still be None
246		assert!(operator.state_get(&mut txn, &key).unwrap().is_none());
247	}
248
249	#[test]
250	fn test_scan_after_partial_removal() {
251		let mut txn = create_test_transaction();
252		let mut txn =
253			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
254		let operator = TestOperator::simple(FlowNodeId(7));
255
256		// Add 5 entries
257		for i in 0..5 {
258			let key = test_key(&format!("partial_{}", i));
259			let value = EncodedValues(CowVec::new(vec![i as u8]));
260			operator.state_set(&mut txn, &key, value).unwrap();
261		}
262
263		// Remove some entries
264		operator.state_remove(&mut txn, &test_key("partial_1")).unwrap();
265		operator.state_remove(&mut txn, &test_key("partial_3")).unwrap();
266
267		// Should have 3 entries left (0, 2, 4)
268		let remaining: Vec<_> = operator.state_scan(&mut txn).unwrap().collect();
269		assert_eq!(remaining.len(), 3);
270	}
271}