Skip to main content

reifydb_sub_flow/operator/stateful/
window.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	encoded::{
5		encoded::EncodedValues,
6		key::{EncodedKey, EncodedKeyRange},
7		schema::Schema,
8	},
9	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16/// Window-based state management for time or count-based windowing
17/// Extends TransformOperator directly and uses utility functions for state management
18pub trait WindowStateful: RawStatefulOperator {
19	/// Get or create the layout for state rows
20	fn layout(&self) -> Schema;
21
22	/// Create a new state encoded with default values
23	fn create_state(&self) -> EncodedValues {
24		let layout = self.layout();
25		layout.allocate()
26	}
27
28	/// Load state for a window
29	fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedValues> {
30		utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31	}
32
33	/// Save state for a window
34	fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedValues) -> Result<()> {
35		utils::save_row(self.id(), txn, window_key, row)
36	}
37
38	/// Expire windows within a given range
39	/// The range should be constructed by the caller based on their window ordering semantics
40	fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
41		// Add the operator state prefix to the range
42		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
43
44		// Collect keys to remove (similar pattern to state_clear in utils.rs)
45		let keys_to_remove = {
46			let mut stream = txn.range(prefixed_range, 1024);
47			let mut keys = Vec::new();
48			while let Some(result) = stream.next() {
49				let multi = result?;
50				keys.push(multi.key);
51			}
52			keys
53		};
54
55		let mut count = 0;
56		for key in keys_to_remove {
57			txn.remove(&key)?;
58			count += 1;
59		}
60
61		Ok(count as u32)
62	}
63}
64
#[cfg(test)]
pub mod tests {
	use std::ops::Bound::{Excluded, Unbounded};

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion, interface::catalog::flow::FlowNodeId,
		util::encoding::keycode::serializer::KeySerializer,
	};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

	/// Builds a window key from a numeric window id for testing.
	///
	/// The key is the literal prefix `w:` followed by the serialized id. The
	/// serializer's integer encoding is inverted, so smaller ids produce larger
	/// keys; `test_window_key_encoding` pins that ordering.
	fn test_window_key(window_id: u64) -> EncodedKey {
		let mut serializer = KeySerializer::with_capacity(16);
		serializer.extend_bytes(b"w:");
		serializer.extend_u64(window_id);
		EncodedKey::new(serializer.finish())
	}

	// Let the shared TestOperator act as a WindowStateful operator.
	impl WindowStateful for TestOperator {
		fn layout(&self) -> Schema {
			self.layout.clone()
		}
	}

	/// Distinct window ids yield distinct keys, ordered inversely to the ids.
	#[test]
	fn test_window_key_encoding() {
		let key1 = test_window_key(1);
		let key2 = test_window_key(2);
		let key100 = test_window_key(100);

		// Keys must differ per window id.
		assert_ne!(key1.as_ref(), key2.as_ref());
		assert_ne!(key1.as_ref(), key100.as_ref());

		// Inverted encoding: the smaller the window id, the larger the key.
		assert!(key1 > key2);
		assert!(key2 > key100);
	}

	/// `create_state` allocates a non-empty row from the operator's layout.
	#[test]
	fn test_create_state() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let state = operator.create_state();

		assert!(state.len() > 0);
	}

	/// A saved state is returned by a subsequent load of the same window.
	#[test]
	fn test_load_save_window_state() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();
		let window_key = test_window_key(42);

		// Nothing is stored yet, so this load yields a newly created state.
		let mut state = operator.load_state(&mut txn, &window_key).unwrap();

		// Modify and save.
		layout.set_i64(&mut state, 0, 0xAB);
		operator.save_state(&mut txn, &window_key, state).unwrap();

		// Loading again must return the modified state.
		let reloaded = operator.load_state(&mut txn, &window_key).unwrap();
		assert_eq!(layout.get_i64(&reloaded, 0), 0xAB);
	}

	/// Each window key addresses its own independent state row.
	#[test]
	fn test_multiple_windows() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create states for windows 0 through 4, each holding its own index.
		let window_keys: Vec<_> = (0..5).map(test_window_key).collect();
		for (i, window_key) in window_keys.iter().enumerate() {
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, i as i64);
			operator.save_state(&mut txn, window_key, state).unwrap();
		}

		// Every window must read back the value written for it.
		for (i, window_key) in window_keys.iter().enumerate() {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), i as i64);
		}
	}

	/// Expiring "before window 5" removes windows 0-4 and keeps windows 5-9.
	#[test]
	fn test_expire_before() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 0 through 9. Each stores `id + 1` so that no saved row
		// equals the default value 0 of a freshly created row; otherwise the
		// "expired" check below could not tell window 0 apart from a kept row.
		let window_keys: Vec<_> = (0..10).map(test_window_key).collect();
		for (i, window_key) in window_keys.iter().enumerate() {
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, i as i64 + 1);
			operator.save_state(&mut txn, window_key, state).unwrap();
		}

		// Expire windows before 5 (should remove 0-4).
		// Due to inverted encoding, windows with smaller ids have larger keys,
		// so "ids < 5" is the key range from key(5), exclusive, to the end.
		let before_key = test_window_key(5);
		let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// Windows 0-4 are gone: loading them yields a newly created (default) row.
		for window_key in &window_keys[..5] {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0);
		}

		// Windows 5-9 still hold their saved values.
		for (i, window_key) in window_keys.iter().enumerate().skip(5) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), i as i64 + 1);
		}
	}

	/// Expiring a range that matches no stored window removes nothing.
	#[test]
	fn test_expire_empty_range() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 5 through 9, each storing its own window id.
		let window_keys: Vec<_> = (5..10).map(test_window_key).collect();
		for (idx, window_key) in window_keys.iter().enumerate() {
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, (idx + 5) as i64);
			operator.save_state(&mut txn, window_key, state).unwrap();
		}

		// Expire before 3 (should remove nothing since all windows are >= 5).
		let before_key = test_window_key(3);
		let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 0);

		// All windows must still hold their saved values.
		for (idx, window_key) in window_keys.iter().enumerate() {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), (idx + 5) as i64);
		}
	}

	/// Expiring before an id larger than every stored window removes them all.
	#[test]
	fn test_expire_all() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();

		// Create windows 0 through 4. Each stores `id + 1` so that every saved
		// row differs from the default value 0 of a freshly created row.
		let window_keys: Vec<_> = (0..5).map(test_window_key).collect();
		for (i, window_key) in window_keys.iter().enumerate() {
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, i as i64 + 1);
			operator.save_state(&mut txn, window_key, state).unwrap();
		}

		// Expire before 100 (should remove all).
		let before_key = test_window_key(100);
		let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// All windows are gone: loading them yields a newly created (default) row.
		for window_key in &window_keys {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0);
		}
	}

	/// Simulates a sliding window that retains only the three newest windows.
	#[test]
	fn test_sliding_window_simulation() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		// NOTE(review): this test builds the operator with `TestOperator::new` while
		// the other tests use `TestOperator::simple` - confirm that is intentional.
		let operator = TestOperator::new(FlowNodeId(1));
		let layout = operator.layout();

		// Maintain the last 3 windows.
		let window_size = 3;
		let mut all_window_keys = Vec::new();

		for current_window in 0..10 {
			// Add data to the current window. `id + 1` keeps every saved row
			// distinguishable from the default value 0 of a freshly created row.
			let window_key = test_window_key(current_window);
			all_window_keys.push(window_key.clone());
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, current_window as i64 + 1);
			operator.save_state(&mut txn, &window_key, state).unwrap();

			// Expire everything older than the retained windows.
			if current_window >= window_size {
				let expire_before = current_window - window_size + 1;
				let before_key = test_window_key(expire_before);
				let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
				operator.expire_range(&mut txn, range).unwrap();
			}
		}

		// Windows 0-6 were expired: loading them yields a default row.
		for window_key in &all_window_keys[..7] {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), 0);
		}

		// Only windows 7, 8 and 9 still hold their saved data.
		for (i, window_key) in all_window_keys.iter().enumerate().skip(7) {
			let state = operator.load_state(&mut txn, window_key).unwrap();
			assert_eq!(layout.get_i64(&state, 0), i as i64 + 1);
		}
	}
}