Skip to main content

reifydb_sub_flow/operator/stateful/
single.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, schema::RowSchema};
4use reifydb_type::Result;
5
6use super::utils;
7use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
8
9/// Operator with a single state value (like counters, running sums, etc.)
10/// Extends TransformOperator directly and uses utility functions for state management
11pub trait SingleStateful: RawStatefulOperator {
12	/// Get or create the layout for state rows
13	fn layout(&self) -> RowSchema;
14
15	/// Key for the single state - default is empty
16	fn key(&self) -> EncodedKey {
17		utils::empty_key()
18	}
19
20	/// Create a new state encoded with default values
21	fn create_state(&self) -> EncodedRow {
22		let layout = self.layout();
23		layout.allocate()
24	}
25
26	/// Load the operator's single state encoded
27	fn load_state(&self, txn: &mut FlowTransaction) -> Result<EncodedRow> {
28		let key = self.key();
29		utils::load_or_create_row(self.id(), txn, &key, &self.layout())
30	}
31
32	/// Save the operator's single state encoded
33	fn save_state(&self, txn: &mut FlowTransaction, row: EncodedRow) -> Result<()> {
34		let key = self.key();
35		utils::save_row(self.id(), txn, &key, row)
36	}
37
38	/// Update state with a function
39	fn update_state<F>(&self, txn: &mut FlowTransaction, f: F) -> Result<EncodedRow>
40	where
41		F: FnOnce(&RowSchema, &mut EncodedRow) -> Result<()>,
42	{
43		let schema = self.layout();
44		let mut row = self.load_state(txn)?;
45		f(&schema, &mut row)?;
46		self.save_state(txn, row.clone())?;
47		Ok(row)
48	}
49
50	/// Clear state
51	fn clear_state(&self, txn: &mut FlowTransaction) -> Result<()> {
52		let key = self.key();
53		utils::state_remove(self.id(), txn, &key)
54	}
55}
56
#[cfg(test)]
pub mod tests {
	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

	// Give the shared `TestOperator` fixture single-state behaviour, using its own
	// `layout` field as the state schema.
	impl SingleStateful for TestOperator {
		fn layout(&self) -> RowSchema {
			self.layout.clone()
		}
	}

	/// The default `key()` implementation yields an empty key.
	#[test]
	fn test_default_key() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let key = operator.key();

		// Default key should be empty
		assert_eq!(key.len(), 0);
	}

	/// `create_state` allocates a non-empty row from the layout.
	#[test]
	fn test_create_state() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let state = operator.create_state();

		// State should be allocated based on layout
		assert!(state.len() > 0);
	}

	/// A row written with `save_state` is what `load_state` returns afterwards.
	#[test]
	fn test_load_save_state() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Initially should create new state
		let mut modified = operator.load_state(&mut txn).unwrap();

		// Modify and save; the row is not needed afterwards, so it is moved rather than cloned
		let layout = operator.layout();
		layout.set_i64(&mut modified, 0, 0x33);
		operator.save_state(&mut txn, modified).unwrap();

		// Load should return modified state
		let state2 = operator.load_state(&mut txn).unwrap();
		assert_eq!(layout.get_i64(&state2, 0), 0x33);
	}

	/// `update_state` returns the mutated row and persists it.
	#[test]
	fn test_update_state() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Update state with a function
		let result = operator
			.update_state(&mut txn, |schema, row| {
				schema.set_i64(row, 0, 0x77);
				Ok(())
			})
			.unwrap();

		let layout = operator.layout();
		assert_eq!(layout.get_i64(&result, 0), 0x77);

		// Verify persistence
		let loaded = operator.load_state(&mut txn).unwrap();
		assert_eq!(layout.get_i64(&loaded, 0), 0x77);
	}

	/// After `clear_state`, loading yields a row whose first field reads as 0 again.
	#[test]
	fn test_clear_state() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Create and modify state
		operator.update_state(&mut txn, |schema, row| {
			schema.set_i64(row, 0, 0x99);
			Ok(())
		})
		.unwrap();

		// Clear state
		operator.clear_state(&mut txn).unwrap();

		// Loading should create new default state
		let new_state = operator.load_state(&mut txn).unwrap();
		let layout = operator.layout();
		assert_eq!(layout.get_i64(&new_state, 0), 0); // Should be default initialized
	}

	/// Operators with different node ids do not see each other's state.
	#[test]
	fn test_multiple_operators_isolated() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator1 = TestOperator::simple(FlowNodeId(1));
		let operator2 = TestOperator::simple(FlowNodeId(2));

		// Set different states for each operator
		operator1
			.update_state(&mut txn, |schema, row| {
				schema.set_i64(row, 0, 0x11);
				Ok(())
			})
			.unwrap();

		operator2
			.update_state(&mut txn, |schema, row| {
				schema.set_i64(row, 0, 0x22);
				Ok(())
			})
			.unwrap();

		// Verify each operator has its own state
		let state1 = operator1.load_state(&mut txn).unwrap();
		let state2 = operator2.load_state(&mut txn).unwrap();

		let layout1 = operator1.layout();
		let layout2 = operator2.layout();
		assert_eq!(layout1.get_i64(&state1, 0), 0x11);
		assert_eq!(layout2.get_i64(&state2, 0), 0x22);
	}

	/// Repeated `update_state` calls accumulate, like a counter.
	#[test]
	fn test_counter_simulation() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::new(FlowNodeId(1));

		// The layout is the same on every iteration, so fetch it once.
		let layout = operator.layout();

		// Simulate a counter incrementing
		for i in 1..=5 {
			operator.update_state(&mut txn, |schema, row| {
				// Assuming first field is an int8 counter
				let current = schema.get_i64(row, 0);
				schema.set_i64(row, 0, current + 1);
				Ok(())
			})
			.unwrap();

			let state = operator.load_state(&mut txn).unwrap();
			assert_eq!(layout.get_i64(&state, 0), i);
		}
	}
}