reifydb_sub_flow/operator/stateful/
single.rs1use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape};
4use reifydb_type::Result;
5
6use super::utils;
7use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
8
9pub trait SingleStateful: RawStatefulOperator {
12 fn layout(&self) -> RowShape;
14
15 fn key(&self) -> EncodedKey {
17 utils::empty_key()
18 }
19
20 fn create_state(&self) -> EncodedRow {
22 let layout = self.layout();
23 layout.allocate()
24 }
25
26 fn load_state(&self, txn: &mut FlowTransaction) -> Result<EncodedRow> {
28 let key = self.key();
29 utils::load_or_create_row(self.id(), txn, &key, &self.layout())
30 }
31
32 fn save_state(&self, txn: &mut FlowTransaction, row: EncodedRow) -> Result<()> {
34 let key = self.key();
35 utils::save_row(self.id(), txn, &key, row)
36 }
37
38 fn update_state<F>(&self, txn: &mut FlowTransaction, f: F) -> Result<EncodedRow>
40 where
41 F: FnOnce(&RowShape, &mut EncodedRow) -> Result<()>,
42 {
43 let shape = self.layout();
44 let mut row = self.load_state(txn)?;
45 f(&shape, &mut row)?;
46 self.save_state(txn, row.clone())?;
47 Ok(row)
48 }
49
50 fn clear_state(&self, txn: &mut FlowTransaction) -> Result<()> {
52 let key = self.key();
53 utils::state_remove(self.id(), txn, &key)
54 }
55}
56
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
    use reifydb_runtime::context::clock::{Clock, MockClock};
    use reifydb_transaction::interceptor::interceptors::Interceptors;

    use super::*;
    use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

    // The test operator only supplies its layout; every other method is the trait default.
    impl SingleStateful for TestOperator {
        fn layout(&self) -> RowShape {
            self.layout.clone()
        }
    }

    /// The default `key()` implementation yields a zero-length key.
    #[test]
    fn test_default_key() {
        let operator = TestOperator::simple(FlowNodeId(1));
        let key = operator.key();

        assert_eq!(key.len(), 0);
    }

    /// `create_state` allocates a non-empty row from the operator's layout.
    #[test]
    fn test_create_state() {
        let operator = TestOperator::simple(FlowNodeId(1));
        let state = operator.create_state();

        assert!(state.len() > 0);
    }

    /// A value written with `save_state` is visible to a later `load_state`.
    #[test]
    fn test_load_save_state() {
        let mut txn = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut txn,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));

        // Start from whatever the first load returns, then modify and persist it.
        let mut modified = operator.load_state(&mut txn).unwrap();
        let layout = operator.layout();
        layout.set_i64(&mut modified, 0, 0x33);
        operator.save_state(&mut txn, modified).unwrap();

        let reloaded = operator.load_state(&mut txn).unwrap();
        assert_eq!(layout.get_i64(&reloaded, 0), 0x33);
    }

    /// `update_state` returns the mutated row and persists the same value.
    #[test]
    fn test_update_state() {
        let mut txn = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut txn,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));

        let result = operator
            .update_state(&mut txn, |shape, row| {
                shape.set_i64(row, 0, 0x77);
                Ok(())
            })
            .unwrap();

        let layout = operator.layout();
        assert_eq!(layout.get_i64(&result, 0), 0x77);

        let loaded = operator.load_state(&mut txn).unwrap();
        assert_eq!(layout.get_i64(&loaded, 0), 0x77);
    }

    /// After `clear_state`, loading yields a row whose first field reads 0 again.
    #[test]
    fn test_clear_state() {
        let mut txn = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut txn,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::simple(FlowNodeId(1));

        operator
            .update_state(&mut txn, |shape, row| {
                shape.set_i64(row, 0, 0x99);
                Ok(())
            })
            .unwrap();

        operator.clear_state(&mut txn).unwrap();

        let new_state = operator.load_state(&mut txn).unwrap();
        let layout = operator.layout();
        assert_eq!(layout.get_i64(&new_state, 0), 0);
    }

    /// Operators with different node ids do not see each other's state.
    #[test]
    fn test_multiple_operators_isolated() {
        let mut txn = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut txn,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator1 = TestOperator::simple(FlowNodeId(1));
        let operator2 = TestOperator::simple(FlowNodeId(2));

        operator1
            .update_state(&mut txn, |shape, row| {
                shape.set_i64(row, 0, 0x11);
                Ok(())
            })
            .unwrap();

        operator2
            .update_state(&mut txn, |shape, row| {
                shape.set_i64(row, 0, 0x22);
                Ok(())
            })
            .unwrap();

        let state1 = operator1.load_state(&mut txn).unwrap();
        let state2 = operator2.load_state(&mut txn).unwrap();

        let layout1 = operator1.layout();
        let layout2 = operator2.layout();
        assert_eq!(layout1.get_i64(&state1, 0), 0x11);
        assert_eq!(layout2.get_i64(&state2, 0), 0x22);
    }

    /// Repeated read-modify-write updates accumulate like a counter.
    #[test]
    fn test_counter_simulation() {
        let mut txn = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &mut txn,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        let operator = TestOperator::new(FlowNodeId(1));

        for i in 1..=5 {
            operator
                .update_state(&mut txn, |shape, row| {
                    let current = shape.get_i64(row, 0);
                    shape.set_i64(row, 0, current + 1);
                    Ok(())
                })
                .unwrap();

            // The persisted counter must equal the number of updates applied so far.
            let state = operator.load_state(&mut txn).unwrap();
            let layout = operator.layout();
            assert_eq!(layout.get_i64(&state, 0), i);
        }
    }
}