Skip to main content

reifydb_sub_flow/operator/stateful/
window.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	encoded::{
5		key::{EncodedKey, EncodedKeyRange},
6		row::EncodedRow,
7		shape::RowShape,
8	},
9	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16/// Window-based state management for time or count-based windowing
17/// Extends TransformOperator directly and uses utility functions for state management
18pub trait WindowStateful: RawStatefulOperator {
19	/// Get or create the layout for state rows
20	fn layout(&self) -> RowShape;
21
22	/// Create a new state encoded with default values
23	fn create_state(&self) -> EncodedRow {
24		let layout = self.layout();
25		layout.allocate()
26	}
27
28	/// Load state for a window
29	fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
30		utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31	}
32
33	/// Save state for a window
34	fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
35		utils::save_row(self.id(), txn, window_key, row)
36	}
37
38	/// Scan keys within a range without removing them (read-only)
39	fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
40		let prefixed_range = range.clone().with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
41		let stream = txn.range(prefixed_range, 1024);
42		let mut keys = Vec::new();
43		for result in stream {
44			let multi = result?;
45			keys.push(EncodedKey::new(multi.key.to_vec()));
46		}
47		Ok(keys)
48	}
49
50	/// Expire windows within a given range
51	/// The range should be constructed by the caller based on their window ordering semantics
52	fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
53		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
54
55		let keys_to_remove = {
56			let stream = txn.range(prefixed_range, 1024);
57			let mut keys = Vec::new();
58			for result in stream {
59				let multi = result?;
60				keys.push(multi.key);
61			}
62			keys
63		};
64
65		let mut count = 0;
66		for key in keys_to_remove {
67			txn.remove(&key)?;
68			count += 1;
69		}
70
71		Ok(count as u32)
72	}
73}
74
#[cfg(test)]
pub mod tests {
	use std::ops::Bound::{Excluded, Unbounded};

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion, interface::catalog::flow::FlowNodeId,
		util::encoding::keycode::serializer::KeySerializer,
	};
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;

	use super::*;
	use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

	/// Wraps `$parent` (a transaction from `create_test_transaction`) in a deferred
	/// `FlowTransaction` at commit version 1, with a mock clock fixed at 1000 ms.
	macro_rules! deferred_flow_txn {
		($parent:ident) => {
			FlowTransaction::deferred(
				&mut $parent,
				CommitVersion(1),
				Catalog::testing(),
				Interceptors::new(),
				Clock::Mock(MockClock::from_millis(1000)),
			)
		};
	}

	/// Builds a test window key: the bytes `w:` followed by the serialized `window_id`.
	///
	/// The `u64` encoding is inverted, so smaller IDs produce larger keys (checked
	/// by `test_window_key_encoding`). The expiry tests rely on this ordering.
	fn test_window_key(window_id: u64) -> EncodedKey {
		let mut serializer = KeySerializer::with_capacity(16);
		serializer.extend_bytes(b"w:");
		serializer.extend_u64(window_id);
		EncodedKey::new(serializer.finish())
	}

	/// Value stored in field 0 of window `window_id`'s state.
	///
	/// It is offset by one so that it never equals the 0 read back from a freshly
	/// created (never saved or expired) row. Otherwise checks on window 0 would be vacuous.
	fn marker(window_id: u64) -> i64 {
		window_id as i64 + 1
	}

	/// Saves one state row per window ID, storing `marker(id)` in field 0.
	/// Returns the window keys in iteration order.
	fn seed_windows(
		operator: &TestOperator,
		txn: &mut FlowTransaction,
		ids: impl IntoIterator<Item = u64>,
	) -> Vec<EncodedKey> {
		let layout = operator.layout();
		ids.into_iter()
			.map(|id| {
				let key = test_window_key(id);
				let mut state = operator.create_state();
				layout.set_i64(&mut state, 0, marker(id));
				operator.save_state(txn, &key, state).unwrap();
				key
			})
			.collect()
	}

	/// Loads the state for `key` and returns field 0.
	/// A freshly created row reads back as 0.
	fn load_marker(operator: &TestOperator, txn: &mut FlowTransaction, key: &EncodedKey) -> i64 {
		let state = operator.load_state(txn, key).unwrap();
		operator.layout().get_i64(&state, 0)
	}

	// Extend TestOperator to implement WindowStateful
	impl WindowStateful for TestOperator {
		fn layout(&self) -> RowShape {
			self.layout.clone()
		}
	}

	#[test]
	fn test_window_key_encoding() {
		let key1 = test_window_key(1);
		let key2 = test_window_key(2);
		let key100 = test_window_key(100);

		// Distinct IDs must produce distinct keys.
		assert_ne!(key1.as_ref(), key2.as_ref());
		assert_ne!(key1.as_ref(), key100.as_ref());

		// Inverted encoding: smaller window IDs produce larger keys.
		assert!(key1 > key2);
		assert!(key2 > key100);
	}

	#[test]
	fn test_create_state() {
		let operator = TestOperator::simple(FlowNodeId(1));
		let state = operator.create_state();

		// State should be allocated based on layout
		assert!(state.len() > 0);
	}

	#[test]
	fn test_load_save_window_state() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::simple(FlowNodeId(1));
		let layout = operator.layout();
		let window_key = test_window_key(42);

		// Nothing stored yet, so this hands back a newly created row.
		let mut state = operator.load_state(&mut txn, &window_key).unwrap();

		layout.set_i64(&mut state, 0, 0xAB);
		operator.save_state(&mut txn, &window_key, state).unwrap();

		// A subsequent load must observe the saved value.
		let reloaded = operator.load_state(&mut txn, &window_key).unwrap();
		assert_eq!(layout.get_i64(&reloaded, 0), 0xAB);
	}

	#[test]
	fn test_multiple_windows() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::simple(FlowNodeId(1));

		let window_keys = seed_windows(&operator, &mut txn, 0..5);

		// Each window keeps its own state.
		for (id, window_key) in (0..5).zip(&window_keys) {
			assert_eq!(load_marker(&operator, &mut txn, window_key), marker(id));
		}
	}

	#[test]
	fn test_expire_before() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::simple(FlowNodeId(1));

		let window_keys = seed_windows(&operator, &mut txn, 0..10);

		// Expire windows before 5 (should remove 0-4). Smaller IDs have larger
		// keys, so "every window < 5" is everything strictly after key(5).
		let range = EncodedKeyRange::new(Excluded(test_window_key(5)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// Windows 0-4 read back as fresh rows; windows 5-9 keep their markers.
		for (id, window_key) in (0..10).zip(&window_keys) {
			let expected = if id < 5 { 0 } else { marker(id) };
			assert_eq!(load_marker(&operator, &mut txn, window_key), expected);
		}
	}

	#[test]
	fn test_expire_empty_range() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::simple(FlowNodeId(1));

		let window_keys = seed_windows(&operator, &mut txn, 5..10);

		// Expire before 3: nothing to remove, since every window is >= 5.
		let range = EncodedKeyRange::new(Excluded(test_window_key(3)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 0);

		// All windows should still exist
		for (id, window_key) in (5..10).zip(&window_keys) {
			assert_eq!(load_marker(&operator, &mut txn, window_key), marker(id));
		}
	}

	#[test]
	fn test_expire_all() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::simple(FlowNodeId(1));

		let window_keys = seed_windows(&operator, &mut txn, 0..5);

		// Expire before 100 (should remove all)
		let range = EncodedKeyRange::new(Excluded(test_window_key(100)), Unbounded);
		let expired = operator.expire_range(&mut txn, range).unwrap();
		assert_eq!(expired, 5);

		// Every window reads back as a freshly created row.
		for window_key in &window_keys {
			assert_eq!(load_marker(&operator, &mut txn, window_key), 0);
		}
	}

	#[test]
	fn test_sliding_window_simulation() {
		let mut parent = create_test_transaction();
		let mut txn = deferred_flow_txn!(parent);
		let operator = TestOperator::new(FlowNodeId(1));
		let layout = operator.layout();

		// Simulate a sliding window maintaining the last 3 windows.
		let window_size = 3;
		let mut window_keys = Vec::new();

		for current in 0..10u64 {
			// Add data to the current window.
			let window_key = test_window_key(current);
			let mut state = operator.create_state();
			layout.set_i64(&mut state, 0, marker(current));
			operator.save_state(&mut txn, &window_key, state).unwrap();
			window_keys.push(window_key);

			// Drop every window older than the oldest one still inside the slide.
			if current >= window_size {
				let oldest_kept = current - window_size + 1;
				let range = EncodedKeyRange::new(Excluded(test_window_key(oldest_kept)), Unbounded);
				let expired = operator.expire_range(&mut txn, range).unwrap();
				// Exactly one window falls out of the slide per step.
				assert_eq!(expired, 1);
			}
		}

		// Only windows 7, 8, 9 should exist; older ones read back as fresh rows.
		for (id, window_key) in (0..10).zip(&window_keys) {
			let expected = if id < 7 { 0 } else { marker(id) };
			assert_eq!(load_marker(&operator, &mut txn, window_key), expected);
		}
	}
}
337			let state = operator.load_state(&mut txn, &all_window_keys[i]).unwrap();
338			assert_eq!(layout.get_i64(&state, 0), i as i64); // Should have saved data
339		}
340	}
341}