reifydb_sub_flow/operator/stateful/
window.rs1use reifydb_core::{
4 encoded::{
5 key::{EncodedKey, EncodedKeyRange},
6 row::EncodedRow,
7 schema::RowSchema,
8 },
9 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16pub trait WindowStateful: RawStatefulOperator {
19 fn layout(&self) -> RowSchema;
21
22 fn create_state(&self) -> EncodedRow {
24 let layout = self.layout();
25 layout.allocate()
26 }
27
28 fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
30 utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31 }
32
33 fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
35 utils::save_row(self.id(), txn, window_key, row)
36 }
37
38 fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
40 let prefixed_range = range.clone().with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
41 let mut stream = txn.range(prefixed_range, 1024);
42 let mut keys = Vec::new();
43 while let Some(result) = stream.next() {
44 let multi = result?;
45 keys.push(EncodedKey::new(multi.key.to_vec()));
46 }
47 Ok(keys)
48 }
49
50 fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
53 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
54
55 let keys_to_remove = {
56 let mut stream = txn.range(prefixed_range, 1024);
57 let mut keys = Vec::new();
58 while let Some(result) = stream.next() {
59 let multi = result?;
60 keys.push(multi.key);
61 }
62 keys
63 };
64
65 let mut count = 0;
66 for key in keys_to_remove {
67 txn.remove(&key)?;
68 count += 1;
69 }
70
71 Ok(count as u32)
72 }
73}
74
75#[cfg(test)]
76pub mod tests {
77 use std::ops::Bound::{Excluded, Unbounded};
78
79 use reifydb_catalog::catalog::Catalog;
80 use reifydb_core::{
81 common::CommitVersion, interface::catalog::flow::FlowNodeId,
82 util::encoding::keycode::serializer::KeySerializer,
83 };
84 use reifydb_transaction::interceptor::interceptors::Interceptors;
85
86 use super::*;
87 use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
88
89 fn test_window_key(window_id: u64) -> EncodedKey {
92 let mut serializer = KeySerializer::with_capacity(16);
93 serializer.extend_bytes(b"w:");
94 serializer.extend_u64(window_id);
95 EncodedKey::new(serializer.finish())
96 }
97
98 impl WindowStateful for TestOperator {
100 fn layout(&self) -> RowSchema {
101 self.layout.clone()
102 }
103 }
104
105 #[test]
106 fn test_window_key_encoding() {
107 let key1 = test_window_key(1);
109 let key2 = test_window_key(2);
110 let key100 = test_window_key(100);
111
112 assert_ne!(key1.as_ref(), key2.as_ref());
114 assert_ne!(key1.as_ref(), key100.as_ref());
115
116 assert!(key1 > key2);
118 assert!(key2 > key100);
119 }
120
121 #[test]
122 fn test_create_state() {
123 let operator = TestOperator::simple(FlowNodeId(1));
124 let state = operator.create_state();
125
126 assert!(state.len() > 0);
128 }
129
130 #[test]
131 fn test_load_save_window_state() {
132 let mut txn = create_test_transaction();
133 let mut txn =
134 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
135 let operator = TestOperator::simple(FlowNodeId(1));
136 let window_key = test_window_key(42);
137
138 let state1 = operator.load_state(&mut txn, &window_key).unwrap();
140
141 let mut modified = state1.clone();
143 let layout = operator.layout();
144 layout.set_i64(&mut modified, 0, 0xAB);
145 operator.save_state(&mut txn, &window_key, modified.clone()).unwrap();
146
147 let state2 = operator.load_state(&mut txn, &window_key).unwrap();
149 assert_eq!(layout.get_i64(&state2, 0), 0xAB);
150 }
151
152 #[test]
153 fn test_multiple_windows() {
154 let mut txn = create_test_transaction();
155 let mut txn =
156 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
157 let operator = TestOperator::simple(FlowNodeId(1));
158
159 let window_keys: Vec<_> = (0..5).map(|i| test_window_key(i)).collect();
161 let layout = operator.layout();
162 for (i, window_key) in window_keys.iter().enumerate() {
163 let mut state = operator.create_state();
164 layout.set_i64(&mut state, 0, i as i64);
165 operator.save_state(&mut txn, window_key, state).unwrap();
166 }
167
168 for (i, window_key) in window_keys.iter().enumerate() {
170 let state = operator.load_state(&mut txn, window_key).unwrap();
171 assert_eq!(layout.get_i64(&state, 0), i as i64);
172 }
173 }
174
175 #[test]
176 fn test_expire_before() {
177 let mut txn = create_test_transaction();
178 let mut txn =
179 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
180 let operator = TestOperator::simple(FlowNodeId(1));
181
182 let window_keys: Vec<_> = (0..10).map(|i| test_window_key(i)).collect();
184 let layout = operator.layout();
185 for (i, window_key) in window_keys.iter().enumerate() {
186 let mut state = operator.create_state();
187 layout.set_i64(&mut state, 0, i as i64);
188 operator.save_state(&mut txn, window_key, state).unwrap();
189 }
190
191 let before_key = test_window_key(5);
195 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
196 let expired = operator.expire_range(&mut txn, range).unwrap();
197 assert_eq!(expired, 5);
198
199 for i in 0..5 {
201 let state = operator.load_state(&mut txn, &window_keys[i]).unwrap();
202 assert_eq!(layout.get_i64(&state, 0), 0); }
204
205 for i in 5..10 {
207 let state = operator.load_state(&mut txn, &window_keys[i]).unwrap();
208 assert_eq!(layout.get_i64(&state, 0), i as i64);
209 }
210 }
211
212 #[test]
213 fn test_expire_empty_range() {
214 let mut txn = create_test_transaction();
215 let mut txn =
216 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
217 let operator = TestOperator::simple(FlowNodeId(1));
218
219 let window_keys: Vec<_> = (5..10).map(|i| test_window_key(i)).collect();
221 let layout = operator.layout();
222 for (idx, window_key) in window_keys.iter().enumerate() {
223 let mut state = operator.create_state();
224 layout.set_i64(&mut state, 0, (idx + 5) as i64);
225 operator.save_state(&mut txn, window_key, state).unwrap();
226 }
227
228 let before_key = test_window_key(3);
230 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
231 let expired = operator.expire_range(&mut txn, range).unwrap();
232 assert_eq!(expired, 0);
233
234 for (idx, window_key) in window_keys.iter().enumerate() {
236 let state = operator.load_state(&mut txn, window_key).unwrap();
237 assert_eq!(layout.get_i64(&state, 0), (idx + 5) as i64);
238 }
239 }
240
241 #[test]
242 fn test_expire_all() {
243 let mut txn = create_test_transaction();
244 let mut txn =
245 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
246 let operator = TestOperator::simple(FlowNodeId(1));
247
248 let window_keys: Vec<_> = (0..5).map(|i| test_window_key(i)).collect();
250 let layout = operator.layout();
251 for (i, window_key) in window_keys.iter().enumerate() {
252 let mut state = operator.create_state();
253 layout.set_i64(&mut state, 0, i as i64);
254 operator.save_state(&mut txn, window_key, state).unwrap();
255 }
256
257 let before_key = test_window_key(100);
259 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
260 let expired = operator.expire_range(&mut txn, range).unwrap();
261 assert_eq!(expired, 5);
262
263 for window_key in &window_keys {
265 let state = operator.load_state(&mut txn, window_key).unwrap();
266 assert_eq!(layout.get_i64(&state, 0), 0); }
268 }
269
270 #[test]
271 fn test_sliding_window_simulation() {
272 let mut txn = create_test_transaction();
273 let mut txn =
274 FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
275 let operator = TestOperator::new(FlowNodeId(1));
276
277 let window_size = 3;
279 let mut all_window_keys = Vec::new();
280 let layout = operator.layout();
281
282 for current_window in 0..10 {
283 let window_key = test_window_key(current_window);
285 all_window_keys.push(window_key.clone());
286 let mut state = operator.create_state();
287 layout.set_i64(&mut state, 0, current_window as i64);
288 operator.save_state(&mut txn, &window_key, state).unwrap();
289
290 if current_window >= window_size {
292 let expire_before = current_window - window_size + 1;
293 let before_key = test_window_key(expire_before);
294 let range = EncodedKeyRange::new(Excluded(before_key), Unbounded);
295 operator.expire_range(&mut txn, range).unwrap();
296 }
297 }
298
299 for i in 0..7 {
301 let state = operator.load_state(&mut txn, &all_window_keys[i]).unwrap();
302 assert_eq!(layout.get_i64(&state, 0), 0); }
304
305 for i in 7..10 {
306 let state = operator.load_state(&mut txn, &all_window_keys[i]).unwrap();
307 assert_eq!(layout.get_i64(&state, 0), i as i64); }
309 }
310}