Skip to main content

reifydb_sub_flow/operator/stateful/
window.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	encoded::{
5		key::{EncodedKey, EncodedKeyRange},
6		row::EncodedRow,
7		schema::RowSchema,
8	},
9	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16/// Window-based state management for time or count-based windowing
17/// Extends TransformOperator directly and uses utility functions for state management
18pub trait WindowStateful: RawStatefulOperator {
19	/// Get or create the layout for state rows
20	fn layout(&self) -> RowSchema;
21
22	/// Create a new state encoded with default values
23	fn create_state(&self) -> EncodedRow {
24		let layout = self.layout();
25		layout.allocate()
26	}
27
28	/// Load state for a window
29	fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
30		utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31	}
32
33	/// Save state for a window
34	fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
35		utils::save_row(self.id(), txn, window_key, row)
36	}
37
38	/// Scan keys within a range without removing them (read-only)
39	fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
40		let prefixed_range = range.clone().with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
41		let mut stream = txn.range(prefixed_range, 1024);
42		let mut keys = Vec::new();
43		while let Some(result) = stream.next() {
44			let multi = result?;
45			keys.push(EncodedKey::new(multi.key.to_vec()));
46		}
47		Ok(keys)
48	}
49
50	/// Expire windows within a given range
51	/// The range should be constructed by the caller based on their window ordering semantics
52	fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
53		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
54
55		let keys_to_remove = {
56			let mut stream = txn.range(prefixed_range, 1024);
57			let mut keys = Vec::new();
58			while let Some(result) = stream.next() {
59				let multi = result?;
60				keys.push(multi.key);
61			}
62			keys
63		};
64
65		let mut count = 0;
66		for key in keys_to_remove {
67			txn.remove(&key)?;
68			count += 1;
69		}
70
71		Ok(count as u32)
72	}
73}
74
#[cfg(test)]
pub mod tests {
	use std::ops::Bound::{Excluded, Unbounded};

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion, interface::catalog::flow::FlowNodeId,
		util::encoding::keycode::serializer::KeySerializer,
	};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

	/// Builds a window key `b"w:"` + `window_id` for testing.
	/// `KeySerializer::extend_u64` orders ids inversely: smaller ids produce larger keys
	/// (asserted by `test_window_key_encoding`).
	fn test_window_key(window_id: u64) -> EncodedKey {
		let mut serializer = KeySerializer::with_capacity(16);
		serializer.extend_bytes(b"w:");
		serializer.extend_u64(window_id);
		EncodedKey::new(serializer.finish())
	}

	/// Saves a state row for `window_id` whose field 0 holds the id itself and returns the
	/// window key it was stored under.
	fn save_window(operator: &TestOperator, txn: &mut FlowTransaction, window_id: u64) -> EncodedKey {
		let window_key = test_window_key(window_id);
		let mut state = operator.create_state();
		operator.layout().set_i64(&mut state, 0, window_id as i64);
		operator.save_state(txn, &window_key, state).unwrap();
		window_key
	}

	/// Loads the state for `window_key` and returns its field 0. A window with no stored
	/// state yields a freshly created row, which these tests expect to read back as 0.
	fn window_value(operator: &TestOperator, txn: &mut FlowTransaction, window_key: &EncodedKey) -> i64 {
		let state = operator.load_state(txn, window_key).unwrap();
		operator.layout().get_i64(&state, 0)
	}

	// Extend TestOperator to implement WindowStateful
	impl WindowStateful for TestOperator {
		fn layout(&self) -> RowSchema {
			self.layout.clone()
		}
	}

	#[test]
	fn test_window_key_encoding() {
		// Test different window IDs
		let key1 = test_window_key(1);
		let key2 = test_window_key(2);
		let key100 = test_window_key(100);

		// Keys should be different
		assert_ne!(key1.as_ref(), key2.as_ref());
		assert_ne!(key1.as_ref(), key100.as_ref());

		// Due to inverted encoding, smaller window IDs produce larger keys
		assert!(key1 > key2);
		assert!(key2 > key100);
	}

	#[test]
	fn test_create_state() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let state = operator.create_state();

		// State should be allocated based on layout
		assert!(state.len() > 0);
	}

	#[test]
	fn test_load_save_window_state() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));
		let window_key = test_window_key(42);

		// Nothing is stored yet, so this yields a freshly created row
		let mut state = operator.load_state(&mut txn, &window_key).unwrap();

		// Modify and save
		operator.layout().set_i64(&mut state, 0, 0xAB);
		operator.save_state(&mut txn, &window_key, state).unwrap();

		// Load should return the modified state
		assert_eq!(window_value(&operator, &mut txn, &window_key), 0xAB);
	}

	#[test]
	fn test_multiple_windows() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Create states for multiple windows, each storing its own id
		let window_keys: Vec<_> = (0..5).map(|id| save_window(&operator, &mut txn, id)).collect();

		// Verify each window has its own state
		for (window_key, id) in window_keys.iter().zip(0_i64..) {
			assert_eq!(window_value(&operator, &mut txn, window_key), id);
		}
	}

	#[test]
	fn test_expire_before() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Create windows 0 through 9
		let window_keys: Vec<_> = (0..10).map(|id| save_window(&operator, &mut txn, id)).collect();

		// Expire windows before 5 (should remove 0-4).
		// Smaller ids have larger keys, so the range is (key(5), end).
		let range = EncodedKeyRange::new(Excluded(test_window_key(5)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// Windows 0-4 are gone: loading yields a freshly created (default) row
		for window_key in &window_keys[..5] {
			assert_eq!(window_value(&operator, &mut txn, window_key), 0);
		}

		// Windows 5-9 still exist
		for (window_key, id) in window_keys[5..].iter().zip(5_i64..) {
			assert_eq!(window_value(&operator, &mut txn, window_key), id);
		}
	}

	#[test]
	fn test_expire_empty_range() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Create windows 5 through 9
		let window_keys: Vec<_> = (5..10).map(|id| save_window(&operator, &mut txn, id)).collect();

		// Expire before 3 (should remove nothing since all windows are >= 5)
		let range = EncodedKeyRange::new(Excluded(test_window_key(3)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 0);

		// All windows should still exist
		for (window_key, id) in window_keys.iter().zip(5_i64..) {
			assert_eq!(window_value(&operator, &mut txn, window_key), id);
		}
	}

	#[test]
	fn test_expire_all() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::simple(FlowNodeId(1));

		// Create windows 0 through 4
		let window_keys: Vec<_> = (0..5).map(|id| save_window(&operator, &mut txn, id)).collect();

		// Expire before 100 (should remove all)
		let range = EncodedKeyRange::new(Excluded(test_window_key(100)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// All windows should be gone: loading yields freshly created (default) rows
		for window_key in &window_keys {
			assert_eq!(window_value(&operator, &mut txn, window_key), 0);
		}
	}

	#[test]
	fn test_sliding_window_simulation() {
		let mut txn = create_test_transaction();
		let mut txn =
			FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let operator = TestOperator::new(FlowNodeId(1));

		// Simulate a sliding window maintaining the last 3 windows
		let window_size = 3;
		let mut all_window_keys = Vec::new();

		for current_window in 0..10 {
			// Add data to the current window
			all_window_keys.push(save_window(&operator, &mut txn, current_window));

			// Expire windows older than the sliding range
			if current_window >= window_size {
				let expire_before = current_window - window_size + 1;
				let range = EncodedKeyRange::new(Excluded(test_window_key(expire_before)), Unbounded);
				operator.expire_range(&mut txn, range).unwrap();
			}
		}

		// Windows 0-6 were expired and load as defaults
		for window_key in &all_window_keys[..7] {
			assert_eq!(window_value(&operator, &mut txn, window_key), 0);
		}

		// Only windows 7, 8, 9 should still hold their saved data
		for (window_key, id) in all_window_keys[7..].iter().zip(7_i64..) {
			assert_eq!(window_value(&operator, &mut txn, window_key), id);
		}
	}
}