reifydb_sub_flow/operator/stateful/
window.rs1use reifydb_core::{
4 encoded::{
5 encoded::EncodedValues,
6 key::{EncodedKey, EncodedKeyRange},
7 schema::Schema,
8 },
9 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
10};
11use reifydb_type::Result;
12
13use super::utils;
14use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
15
16pub trait WindowStateful: RawStatefulOperator {
19 fn layout(&self) -> Schema;
21
22 fn create_state(&self) -> EncodedValues {
24 let layout = self.layout();
25 layout.allocate()
26 }
27
28 fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedValues> {
30 utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
31 }
32
33 fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedValues) -> Result<()> {
35 utils::save_row(self.id(), txn, window_key, row)
36 }
37
38 fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
41 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(self.id(), vec![]).encode());
43
44 let keys_to_remove = {
46 let mut stream = txn.range(prefixed_range, 1024);
47 let mut keys = Vec::new();
48 while let Some(result) = stream.next() {
49 let multi = result?;
50 keys.push(multi.key);
51 }
52 keys
53 };
54
55 let mut count = 0;
56 for key in keys_to_remove {
57 txn.remove(&key)?;
58 count += 1;
59 }
60
61 Ok(count as u32)
62 }
63}
64
65#[cfg(test)]
66pub mod tests {
67 use std::ops::Bound::{Excluded, Unbounded};
68
69 use reifydb_catalog::catalog::Catalog;
70 use reifydb_core::{
71 common::CommitVersion, interface::catalog::flow::FlowNodeId,
72 util::encoding::keycode::serializer::KeySerializer,
73 };
74 use reifydb_transaction::interceptor::interceptors::Interceptors;
75
76 use super::*;
77 use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
78
79 fn test_window_key(window_id: u64) -> EncodedKey {
82 let mut serializer = KeySerializer::with_capacity(16);
83 serializer.extend_bytes(b"w:");
84 serializer.extend_u64(window_id);
85 EncodedKey::new(serializer.finish())
86 }
87
88 impl WindowStateful for TestOperator {
90 fn layout(&self) -> Schema {
91 self.layout.clone()
92 }
93 }
94
/// Different window ids must encode to different keys, and the encoded keys
/// are expected to sort in the reverse order of the ids.
#[test]
fn test_window_key_encoding() {
    let first = test_window_key(1);
    let second = test_window_key(2);
    let hundredth = test_window_key(100);

    // No two ids may share an encoding.
    assert_ne!(first.as_ref(), second.as_ref());
    assert_ne!(first.as_ref(), hundredth.as_ref());

    // A smaller id has to produce the larger key.
    assert!(first > second);
    assert!(second > hundredth);
}
110
/// A freshly created state row must not be empty.
#[test]
fn test_create_state() {
    let state = TestOperator::simple(FlowNodeId(1)).create_state();

    assert!(state.len() > 0);
}
119
/// A value written with `save_state` must be returned by the next
/// `load_state` for the same window key.
#[test]
fn test_load_save_window_state() {
    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_key = test_window_key(42);

    // Nothing has been saved for this key yet.
    let initial = operator.load_state(&mut txn, &window_key).unwrap();

    // Change field 0 on a copy and persist it.
    let mut updated = initial.clone();
    layout.set_i64(&mut updated, 0, 0xAB);
    operator.save_state(&mut txn, &window_key, updated).unwrap();

    // Reading it back must show the stored value.
    let reloaded = operator.load_state(&mut txn, &window_key).unwrap();
    assert_eq!(layout.get_i64(&reloaded, 0), 0xAB);
}
141
/// State saved under several window keys must be kept separately per key.
#[test]
fn test_multiple_windows() {
    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();

    let window_keys: Vec<_> = (0..5).map(test_window_key).collect();

    // Store the position of each window as its field-0 value.
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // Every window must hand back exactly the value stored for it.
    for (window_key, expected) in window_keys.iter().zip(0i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
164
/// Expiring with the key of window 5 as exclusive lower bound must remove
/// windows 0..5 and leave windows 5..10 untouched.
#[test]
fn test_expire_before() {
    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();

    // Ten windows, each storing its own id in field 0.
    let window_keys: Vec<_> = (0..10).map(test_window_key).collect();
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    let range = EncodedKeyRange::new(Excluded(test_window_key(5)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 5);

    let (expired_keys, kept_keys) = window_keys.split_at(5);

    // Expired windows are expected to read back as 0 in field 0.
    for window_key in expired_keys {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }

    // The remaining windows still carry the value that was saved.
    for (window_key, expected) in kept_keys.iter().zip(5i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
201
/// Expiring a range that matches none of the stored windows must remove
/// nothing and report a count of 0.
#[test]
fn test_expire_empty_range() {
    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();

    // Only windows 5..10 exist; each stores its own id in field 0.
    let window_keys: Vec<_> = (5..10).map(test_window_key).collect();
    for (window_key, value) in window_keys.iter().zip(5i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // The bound is built from window 3, which was never stored.
    let range = EncodedKeyRange::new(Excluded(test_window_key(3)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 0);

    // All stored windows must still hold their values.
    for (window_key, expected) in window_keys.iter().zip(5i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
230
/// Expiring with a bound built from window 100 must remove all five stored
/// windows (ids 0..5).
#[test]
fn test_expire_all() {
    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();

    let window_keys: Vec<_> = (0..5).map(test_window_key).collect();
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    let range = EncodedKeyRange::new(Excluded(test_window_key(100)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 5);

    // Every window is expected to read back as 0 in field 0 afterwards.
    for window_key in window_keys.iter() {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }
}
259
/// Simulates a sliding window of size 3 over ten windows: after each save,
/// older windows are expired, so only the last three keep their values.
#[test]
fn test_sliding_window_simulation() {
    const WINDOW_SIZE: u64 = 3;

    let mut parent = create_test_transaction();
    let mut txn = FlowTransaction::deferred(
        &mut parent,
        CommitVersion(1),
        Catalog::testing(),
        Interceptors::new(),
    );
    let operator = TestOperator::new(FlowNodeId(1));
    let layout = operator.layout();

    let all_window_keys: Vec<_> = (0..10).map(test_window_key).collect();

    for (current_window, window_key) in (0u64..).zip(&all_window_keys) {
        // Each window stores its own id in field 0.
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, current_window as i64);
        operator.save_state(&mut txn, window_key, state).unwrap();

        // Nothing to expire until a full window's worth has been written.
        if current_window < WINDOW_SIZE {
            continue;
        }

        let expire_before = current_window - WINDOW_SIZE + 1;
        let range = EncodedKeyRange::new(Excluded(test_window_key(expire_before)), Unbounded);
        operator.expire_range(&mut txn, range).unwrap();
    }

    let (expired_keys, kept_keys) = all_window_keys.split_at(7);

    // Windows 0..7 are expected to read back as 0 in field 0.
    for window_key in expired_keys {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }

    // Windows 7..10 must still hold the id that was saved.
    for (window_key, expected) in kept_keys.iter().zip(7i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
300}