reifydb_sub_flow/operator/stateful/
single.rs1use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, schema::RowSchema};
4use reifydb_type::Result;
5
6use super::utils;
7use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
8
9pub trait SingleStateful: RawStatefulOperator {
12 fn layout(&self) -> RowSchema;
14
15 fn key(&self) -> EncodedKey {
17 utils::empty_key()
18 }
19
20 fn create_state(&self) -> EncodedRow {
22 let layout = self.layout();
23 layout.allocate()
24 }
25
26 fn load_state(&self, txn: &mut FlowTransaction) -> Result<EncodedRow> {
28 let key = self.key();
29 utils::load_or_create_row(self.id(), txn, &key, &self.layout())
30 }
31
32 fn save_state(&self, txn: &mut FlowTransaction, row: EncodedRow) -> Result<()> {
34 let key = self.key();
35 utils::save_row(self.id(), txn, &key, row)
36 }
37
38 fn update_state<F>(&self, txn: &mut FlowTransaction, f: F) -> Result<EncodedRow>
40 where
41 F: FnOnce(&RowSchema, &mut EncodedRow) -> Result<()>,
42 {
43 let schema = self.layout();
44 let mut row = self.load_state(txn)?;
45 f(&schema, &mut row)?;
46 self.save_state(txn, row.clone())?;
47 Ok(row)
48 }
49
50 fn clear_state(&self, txn: &mut FlowTransaction) -> Result<()> {
52 let key = self.key();
53 utils::state_remove(self.id(), txn, &key)
54 }
55}
56
57#[cfg(test)]
58pub mod tests {
59 use reifydb_catalog::catalog::Catalog;
60 use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
61 use reifydb_transaction::interceptor::interceptors::Interceptors;
62
63 use super::*;
64 use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
65
66 impl SingleStateful for TestOperator {
68 fn layout(&self) -> RowSchema {
69 self.layout.clone()
70 }
71 }
72
73 #[test]
74 fn testault_key() {
75 let operator = TestOperator::simple(FlowNodeId(1));
76 let key = operator.key();
77
78 assert_eq!(key.len(), 0);
80 }
81
82 #[test]
83 fn test_create_state() {
84 let operator = TestOperator::simple(FlowNodeId(1));
85 let state = operator.create_state();
86
87 assert!(state.len() > 0);
89 }
90
91 #[test]
92 fn test_load_save_state() {
93 let mut txn = create_test_transaction();
94 let mut txn =
95 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
96 let operator = TestOperator::simple(FlowNodeId(1));
97
98 let state1 = operator.load_state(&mut txn).unwrap();
100
101 let mut modified = state1.clone();
103 let layout = operator.layout();
104 layout.set_i64(&mut modified, 0, 0x33);
105 operator.save_state(&mut txn, modified.clone()).unwrap();
106
107 let state2 = operator.load_state(&mut txn).unwrap();
109 assert_eq!(layout.get_i64(&state2, 0), 0x33);
110 }
111
112 #[test]
113 fn test_update_state() {
114 let mut txn = create_test_transaction();
115 let mut txn =
116 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
117 let operator = TestOperator::simple(FlowNodeId(1));
118
119 let result = operator
121 .update_state(&mut txn, |schema, row| {
122 schema.set_i64(row, 0, 0x77);
123 Ok(())
124 })
125 .unwrap();
126
127 let layout = operator.layout();
128 assert_eq!(layout.get_i64(&result, 0), 0x77);
129
130 let loaded = operator.load_state(&mut txn).unwrap();
132 assert_eq!(layout.get_i64(&loaded, 0), 0x77);
133 }
134
135 #[test]
136 fn test_clear_state() {
137 let mut txn = create_test_transaction();
138 let mut txn =
139 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
140 let operator = TestOperator::simple(FlowNodeId(1));
141
142 operator.update_state(&mut txn, |schema, row| {
144 schema.set_i64(row, 0, 0x99);
145 Ok(())
146 })
147 .unwrap();
148
149 operator.clear_state(&mut txn).unwrap();
151
152 let new_state = operator.load_state(&mut txn).unwrap();
154 let layout = operator.layout();
155 assert_eq!(layout.get_i64(&new_state, 0), 0); }
157
158 #[test]
159 fn test_multiple_operators_isolated() {
160 let mut txn = create_test_transaction();
161 let mut txn =
162 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
163 let operator1 = TestOperator::simple(FlowNodeId(1));
164 let operator2 = TestOperator::simple(FlowNodeId(2));
165
166 operator1
168 .update_state(&mut txn, |schema, row| {
169 schema.set_i64(row, 0, 0x11);
170 Ok(())
171 })
172 .unwrap();
173
174 operator2
175 .update_state(&mut txn, |schema, row| {
176 schema.set_i64(row, 0, 0x22);
177 Ok(())
178 })
179 .unwrap();
180
181 let state1 = operator1.load_state(&mut txn).unwrap();
183 let state2 = operator2.load_state(&mut txn).unwrap();
184
185 let layout1 = operator1.layout();
186 let layout2 = operator2.layout();
187 assert_eq!(layout1.get_i64(&state1, 0), 0x11);
188 assert_eq!(layout2.get_i64(&state2, 0), 0x22);
189 }
190
191 #[test]
192 fn test_counter_simulation() {
193 let mut txn = create_test_transaction();
194 let mut txn =
195 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
196 let operator = TestOperator::new(FlowNodeId(1));
197
198 for i in 1..=5 {
200 operator.update_state(&mut txn, |schema, row| {
201 let current = schema.get_i64(row, 0);
203 schema.set_i64(row, 0, current + 1);
204 Ok(())
205 })
206 .unwrap();
207
208 let state = operator.load_state(&mut txn).unwrap();
209 let layout = operator.layout();
210 assert_eq!(layout.get_i64(&state, 0), i);
211 }
212 }
213}