reifydb_sub_flow/operator/stateful/
window.rs1use reifydb_core::{
4 encoded::{
5 key::{EncodedKey, EncodedKeyRange},
6 row::EncodedRow,
7 shape::RowShape,
8 },
9 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16pub trait WindowStateful: RawStatefulOperator {
19 fn layout(&self) -> RowShape;
21
22 fn create_state(&self) -> EncodedRow {
24 let layout = self.layout();
25 layout.allocate()
26 }
27
28 fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
30 utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31 }
32
33 fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
35 utils::save_row(self.id(), txn, window_key, row)
36 }
37
38 fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
40 let prefixed_range = range.clone().with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
41 let stream = txn.range(prefixed_range, 1024);
42 let mut keys = Vec::new();
43 for result in stream {
44 let multi = result?;
45 keys.push(EncodedKey::new(multi.key.to_vec()));
46 }
47 Ok(keys)
48 }
49
50 fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
53 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
54
55 let keys_to_remove = {
56 let stream = txn.range(prefixed_range, 1024);
57 let mut keys = Vec::new();
58 for result in stream {
59 let multi = result?;
60 keys.push(multi.key);
61 }
62 keys
63 };
64
65 let mut count = 0;
66 for key in keys_to_remove {
67 txn.remove(&key)?;
68 count += 1;
69 }
70
71 Ok(count as u32)
72 }
73}
74
75#[cfg(test)]
76pub mod tests {
77 use std::ops::Bound::{Excluded, Unbounded};
78
79 use reifydb_catalog::catalog::Catalog;
80 use reifydb_core::{
81 common::CommitVersion, interface::catalog::flow::FlowNodeId,
82 util::encoding::keycode::serializer::KeySerializer,
83 };
84 use reifydb_runtime::context::clock::{Clock, MockClock};
85 use reifydb_transaction::interceptor::interceptors::Interceptors;
86
87 use super::*;
88 use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
89
90 fn test_window_key(window_id: u64) -> EncodedKey {
93 let mut serializer = KeySerializer::with_capacity(16);
94 serializer.extend_bytes(b"w:");
95 serializer.extend_u64(window_id);
96 EncodedKey::new(serializer.finish())
97 }
98
99 impl WindowStateful for TestOperator {
101 fn layout(&self) -> RowShape {
102 self.layout.clone()
103 }
104 }
105
106 #[test]
107 fn test_window_key_encoding() {
108 let key1 = test_window_key(1);
110 let key2 = test_window_key(2);
111 let key100 = test_window_key(100);
112
113 assert_ne!(key1.as_ref(), key2.as_ref());
115 assert_ne!(key1.as_ref(), key100.as_ref());
116
117 assert!(key1 > key2);
119 assert!(key2 > key100);
120 }
121
122 #[test]
123 fn test_create_state() {
124 let operator = TestOperator::simple(FlowNodeId(1));
125 let state = operator.create_state();
126
127 assert!(state.len() > 0);
129 }
130
131 #[test]
132 fn test_load_save_window_state() {
133 let mut txn = create_test_transaction();
134 let mut txn = FlowTransaction::deferred(
135 &mut txn,
136 CommitVersion(1),
137 Catalog::testing(),
138 Interceptors::new(),
139 Clock::Mock(MockClock::from_millis(1000)),
140 );
141 let operator = TestOperator::simple(FlowNodeId(1));
142 let window_key = test_window_key(42);
143
144 let state1 = operator.load_state(&mut txn, &window_key).unwrap();
146
147 let mut modified = state1.clone();
149 let layout = operator.layout();
150 layout.set_i64(&mut modified, 0, 0xAB);
151 operator.save_state(&mut txn, &window_key, modified.clone()).unwrap();
152
153 let state2 = operator.load_state(&mut txn, &window_key).unwrap();
155 assert_eq!(layout.get_i64(&state2, 0), 0xAB);
156 }
157
158 #[test]
159 fn test_multiple_windows() {
160 let mut txn = create_test_transaction();
161 let mut txn = FlowTransaction::deferred(
162 &mut txn,
163 CommitVersion(1),
164 Catalog::testing(),
165 Interceptors::new(),
166 Clock::Mock(MockClock::from_millis(1000)),
167 );
168 let operator = TestOperator::simple(FlowNodeId(1));
169
170 let window_keys: Vec<_> = (0..5).map(|i| test_window_key(i)).collect();
172 let layout = operator.layout();
173 for (i, window_key) in window_keys.iter().enumerate() {
174 let mut state = operator.create_state();
175 layout.set_i64(&mut state, 0, i as i64);
176 operator.save_state(&mut txn, window_key, state).unwrap();
177 }
178
179 for (i, window_key) in window_keys.iter().enumerate() {
181 let state = operator.load_state(&mut txn, window_key).unwrap();
182 assert_eq!(layout.get_i64(&state, 0), i as i64);
183 }
184 }
185
186 #[test]
187 fn test_expire_before() {
188 let mut txn = create_test_transaction();
189 let mut txn = FlowTransaction::deferred(
190 &mut txn,
191 CommitVersion(1),
192 Catalog::testing(),
193 Interceptors::new(),
194 Clock::Mock(MockClock::from_millis(1000)),
195 );
196 let operator = TestOperator::simple(FlowNodeId(1));
197
198 let window_keys: Vec<_> = (0..10).map(|i| test_window_key(i)).collect();
200 let layout = operator.layout();
201 for (i, window_key) in window_keys.iter().enumerate() {
202 let mut state = operator.create_state();
203 layout.set_i64(&mut state, 0, i as i64);
204 operator.save_state(&mut txn, window_key, state).unwrap();
205 }
206
207 let before_key = test_window_key(5);
211 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
212 let expired = operator.expire_range(&mut txn, range).unwrap();
213 assert_eq!(expired, 5);
214
215 for i in 0..5 {
217 let state = operator.load_state(&mut txn, &window_keys[i]).unwrap();
218 assert_eq!(layout.get_i64(&state, 0), 0); }
220
221 for i in 5..10 {
223 let state = operator.load_state(&mut txn, &window_keys[i]).unwrap();
224 assert_eq!(layout.get_i64(&state, 0), i as i64);
225 }
226 }
227
228 #[test]
229 fn test_expire_empty_range() {
230 let mut txn = create_test_transaction();
231 let mut txn = FlowTransaction::deferred(
232 &mut txn,
233 CommitVersion(1),
234 Catalog::testing(),
235 Interceptors::new(),
236 Clock::Mock(MockClock::from_millis(1000)),
237 );
238 let operator = TestOperator::simple(FlowNodeId(1));
239
240 let window_keys: Vec<_> = (5..10).map(|i| test_window_key(i)).collect();
242 let layout = operator.layout();
243 for (idx, window_key) in window_keys.iter().enumerate() {
244 let mut state = operator.create_state();
245 layout.set_i64(&mut state, 0, (idx + 5) as i64);
246 operator.save_state(&mut txn, window_key, state).unwrap();
247 }
248
249 let before_key = test_window_key(3);
251 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
252 let expired = operator.expire_range(&mut txn, range).unwrap();
253 assert_eq!(expired, 0);
254
255 for (idx, window_key) in window_keys.iter().enumerate() {
257 let state = operator.load_state(&mut txn, window_key).unwrap();
258 assert_eq!(layout.get_i64(&state, 0), (idx + 5) as i64);
259 }
260 }
261
262 #[test]
263 fn test_expire_all() {
264 let mut txn = create_test_transaction();
265 let mut txn = FlowTransaction::deferred(
266 &mut txn,
267 CommitVersion(1),
268 Catalog::testing(),
269 Interceptors::new(),
270 Clock::Mock(MockClock::from_millis(1000)),
271 );
272 let operator = TestOperator::simple(FlowNodeId(1));
273
274 let window_keys: Vec<_> = (0..5).map(|i| test_window_key(i)).collect();
276 let layout = operator.layout();
277 for (i, window_key) in window_keys.iter().enumerate() {
278 let mut state = operator.create_state();
279 layout.set_i64(&mut state, 0, i as i64);
280 operator.save_state(&mut txn, window_key, state).unwrap();
281 }
282
283 let before_key = test_window_key(100);
285 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
286 let expired = operator.expire_range(&mut txn, range).unwrap();
287 assert_eq!(expired, 5);
288
289 for window_key in &window_keys {
291 let state = operator.load_state(&mut txn, window_key).unwrap();
292 assert_eq!(layout.get_i64(&state, 0), 0); }
294 }
295
296 #[test]
297 fn test_sliding_window_simulation() {
298 let mut txn = create_test_transaction();
299 let mut txn = FlowTransaction::deferred(
300 &mut txn,
301 CommitVersion(1),
302 Catalog::testing(),
303 Interceptors::new(),
304 Clock::Mock(MockClock::from_millis(1000)),
305 );
306 let operator = TestOperator::new(FlowNodeId(1));
307
308 let window_size = 3;
310 let mut all_window_keys = Vec::new();
311 let layout = operator.layout();
312
313 for current_window in 0..10 {
314 let window_key = test_window_key(current_window);
316 all_window_keys.push(window_key.clone());
317 let mut state = operator.create_state();
318 layout.set_i64(&mut state, 0, current_window as i64);
319 operator.save_state(&mut txn, &window_key, state).unwrap();
320
321 if current_window >= window_size {
323 let expire_before = current_window - window_size + 1;
324 let before_key = test_window_key(expire_before);
325 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
326 operator.expire_range(&mut txn, range).unwrap();
327 }
328 }
329
330 for i in 0..7 {
332 let state = operator.load_state(&mut txn, &all_window_keys[i]).unwrap();
333 assert_eq!(layout.get_i64(&state, 0), 0); }
335
336 for i in 7..10 {
337 let state = operator.load_state(&mut txn, &all_window_keys[i]).unwrap();
338 assert_eq!(layout.get_i64(&state, 0), i as i64); }
340 }
341}