Skip to main content

reifydb_sub_flow/operator/stateful/
single.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape};
4use reifydb_type::Result;
5
6use super::utils;
7use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
8
9/// Operator with a single state value (like counters, running sums, etc.)
10/// Extends TransformOperator directly and uses utility functions for state management
11pub trait SingleStateful: RawStatefulOperator {
12	/// Get or create the layout for state rows
13	fn layout(&self) -> RowShape;
14
15	/// Key for the single state - default is empty
16	fn key(&self) -> EncodedKey {
17		utils::empty_key()
18	}
19
20	/// Create a new state encoded with default values
21	fn create_state(&self) -> EncodedRow {
22		let layout = self.layout();
23		layout.allocate()
24	}
25
26	/// Load the operator's single state encoded
27	fn load_state(&self, txn: &mut FlowTransaction) -> Result<EncodedRow> {
28		let key = self.key();
29		utils::load_or_create_row(self.id(), txn, &key, &self.layout())
30	}
31
32	/// Save the operator's single state encoded
33	fn save_state(&self, txn: &mut FlowTransaction, row: EncodedRow) -> Result<()> {
34		let key = self.key();
35		utils::save_row(self.id(), txn, &key, row)
36	}
37
38	/// Update state with a function
39	fn update_state<F>(&self, txn: &mut FlowTransaction, f: F) -> Result<EncodedRow>
40	where
41		F: FnOnce(&RowShape, &mut EncodedRow) -> Result<()>,
42	{
43		let shape = self.layout();
44		let mut row = self.load_state(txn)?;
45		f(&shape, &mut row)?;
46		self.save_state(txn, row.clone())?;
47		Ok(row)
48	}
49
50	/// Clear state
51	fn clear_state(&self, txn: &mut FlowTransaction) -> Result<()> {
52		let key = self.key();
53		utils::state_remove(self.id(), txn, &key)
54	}
55}
56
57#[cfg(test)]
58pub mod tests {
59	use reifydb_catalog::catalog::Catalog;
60	use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
61	use reifydb_runtime::context::clock::{Clock, MockClock};
62	use reifydb_transaction::interceptor::interceptors::Interceptors;
63
64	use super::*;
65	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
66
67	// Extend TestOperator to implement SingleStateful
68	impl SingleStateful for TestOperator {
69		fn layout(&self) -> RowShape {
70			self.layout.clone()
71		}
72	}
73
74	#[test]
75	fn testault_key() {
76		let operator = TestOperator::simple(FlowNodeId(1));
77		let key = operator.key();
78
79		// Default key should be empty
80		assert_eq!(key.len(), 0);
81	}
82
83	#[test]
84	fn test_create_state() {
85		let operator = TestOperator::simple(FlowNodeId(1));
86		let state = operator.create_state();
87
88		// State should be allocated based on layout
89		assert!(state.len() > 0);
90	}
91
92	#[test]
93	fn test_load_save_state() {
94		let mut txn = create_test_transaction();
95		let mut txn = FlowTransaction::deferred(
96			&mut txn,
97			CommitVersion(1),
98			Catalog::testing(),
99			Interceptors::new(),
100			Clock::Mock(MockClock::from_millis(1000)),
101		);
102		let operator = TestOperator::simple(FlowNodeId(1));
103
104		// Initially should create new state
105		let state1 = operator.load_state(&mut txn).unwrap();
106
107		// Modify and save
108		let mut modified = state1.clone();
109		let layout = operator.layout();
110		layout.set_i64(&mut modified, 0, 0x33);
111		operator.save_state(&mut txn, modified.clone()).unwrap();
112
113		// Load should return modified state
114		let state2 = operator.load_state(&mut txn).unwrap();
115		assert_eq!(layout.get_i64(&state2, 0), 0x33);
116	}
117
118	#[test]
119	fn test_update_state() {
120		let mut txn = create_test_transaction();
121		let mut txn = FlowTransaction::deferred(
122			&mut txn,
123			CommitVersion(1),
124			Catalog::testing(),
125			Interceptors::new(),
126			Clock::Mock(MockClock::from_millis(1000)),
127		);
128		let operator = TestOperator::simple(FlowNodeId(1));
129
130		// Update state with a function
131		let result = operator
132			.update_state(&mut txn, |shape, row| {
133				shape.set_i64(row, 0, 0x77);
134				Ok(())
135			})
136			.unwrap();
137
138		let layout = operator.layout();
139		assert_eq!(layout.get_i64(&result, 0), 0x77);
140
141		// Verify persistence
142		let loaded = operator.load_state(&mut txn).unwrap();
143		assert_eq!(layout.get_i64(&loaded, 0), 0x77);
144	}
145
146	#[test]
147	fn test_clear_state() {
148		let mut txn = create_test_transaction();
149		let mut txn = FlowTransaction::deferred(
150			&mut txn,
151			CommitVersion(1),
152			Catalog::testing(),
153			Interceptors::new(),
154			Clock::Mock(MockClock::from_millis(1000)),
155		);
156		let operator = TestOperator::simple(FlowNodeId(1));
157
158		// Create and modify state
159		operator.update_state(&mut txn, |shape, row| {
160			shape.set_i64(row, 0, 0x99);
161			Ok(())
162		})
163		.unwrap();
164
165		// Clear state
166		operator.clear_state(&mut txn).unwrap();
167
168		// Loading should create new default state
169		let new_state = operator.load_state(&mut txn).unwrap();
170		let layout = operator.layout();
171		assert_eq!(layout.get_i64(&new_state, 0), 0); // Should be default initialized
172	}
173
174	#[test]
175	fn test_multiple_operators_isolated() {
176		let mut txn = create_test_transaction();
177		let mut txn = FlowTransaction::deferred(
178			&mut txn,
179			CommitVersion(1),
180			Catalog::testing(),
181			Interceptors::new(),
182			Clock::Mock(MockClock::from_millis(1000)),
183		);
184		let operator1 = TestOperator::simple(FlowNodeId(1));
185		let operator2 = TestOperator::simple(FlowNodeId(2));
186
187		// Set different states for each operator
188		operator1
189			.update_state(&mut txn, |shape, row| {
190				shape.set_i64(row, 0, 0x11);
191				Ok(())
192			})
193			.unwrap();
194
195		operator2
196			.update_state(&mut txn, |shape, row| {
197				shape.set_i64(row, 0, 0x22);
198				Ok(())
199			})
200			.unwrap();
201
202		// Verify each operator has its own state
203		let state1 = operator1.load_state(&mut txn).unwrap();
204		let state2 = operator2.load_state(&mut txn).unwrap();
205
206		let layout1 = operator1.layout();
207		let layout2 = operator2.layout();
208		assert_eq!(layout1.get_i64(&state1, 0), 0x11);
209		assert_eq!(layout2.get_i64(&state2, 0), 0x22);
210	}
211
212	#[test]
213	fn test_counter_simulation() {
214		let mut txn = create_test_transaction();
215		let mut txn = FlowTransaction::deferred(
216			&mut txn,
217			CommitVersion(1),
218			Catalog::testing(),
219			Interceptors::new(),
220			Clock::Mock(MockClock::from_millis(1000)),
221		);
222		let operator = TestOperator::new(FlowNodeId(1));
223
224		// Simulate a counter incrementing
225		for i in 1..=5 {
226			operator.update_state(&mut txn, |shape, row| {
227				// Assuming first field is an int8 counter
228				let current = shape.get_i64(row, 0);
229				shape.set_i64(row, 0, current + 1);
230				Ok(())
231			})
232			.unwrap();
233
234			let state = operator.load_state(&mut txn).unwrap();
235			let layout = operator.layout();
236			assert_eq!(layout.get_i64(&state, 0), i);
237		}
238	}
239}