reifydb_flow_operator_sdk/testing/
stateful.rs1use std::collections::HashMap;
5
6use reifydb_core::{
7 CowVec,
8 value::encoded::{EncodedKey, EncodedValues, EncodedValuesLayout, IntoEncodedKey},
9};
10use reifydb_type::{Type, Value};
11
12pub struct SingleStatefulTestHelper {
14 layout: EncodedValuesLayout,
15 state: Option<Vec<u8>>,
16}
17
18impl SingleStatefulTestHelper {
19 pub fn new(layout: EncodedValuesLayout) -> Self {
21 Self {
22 layout,
23 state: None,
24 }
25 }
26
27 pub fn counter() -> Self {
29 Self::new(EncodedValuesLayout::new(&[Type::Int8]))
30 }
31
32 pub fn set_state(&mut self, values: &[Value]) {
34 let mut encoded = self.layout.allocate();
35 self.layout.set_values(&mut encoded, values);
36 self.state = Some(encoded.0.to_vec());
37 }
38
39 pub fn get_state(&self) -> Option<Vec<Value>> {
41 self.state.as_ref().map(|bytes| {
42 let encoded = EncodedValues(CowVec::new(bytes.clone()));
43 super::helpers::get_values(&self.layout, &encoded)
44 })
45 }
46
47 pub fn assert_state(&self, expected: &[Value]) {
49 let actual = self.get_state().expect("No state set");
50 assert_eq!(actual, expected, "State mismatch");
51 }
52
53 pub fn clear(&mut self) {
55 self.state = None;
56 }
57
58 pub fn has_state(&self) -> bool {
60 self.state.is_some()
61 }
62}
63
64pub struct KeyedStatefulTestHelper {
66 layout: EncodedValuesLayout,
67 states: HashMap<EncodedKey, EncodedValues>,
68}
69
70impl KeyedStatefulTestHelper {
71 pub fn new(layout: EncodedValuesLayout) -> Self {
73 Self {
74 layout,
75 states: HashMap::new(),
76 }
77 }
78
79 pub fn counter() -> Self {
81 Self::new(EncodedValuesLayout::new(&[Type::Int8]))
82 }
83
84 pub fn sum() -> Self {
86 Self::new(EncodedValuesLayout::new(&[Type::Int4]))
87 }
88
89 pub fn set_state<K>(&mut self, key: K, values: &[Value])
91 where
92 K: IntoEncodedKey,
93 {
94 let mut encoded = self.layout.allocate();
95 self.layout.set_values(&mut encoded, values);
96 self.states.insert(key.into_encoded_key(), encoded);
97 }
98
99 pub fn get_state<K>(&self, key: K) -> Option<Vec<Value>>
101 where
102 K: IntoEncodedKey,
103 {
104 self.states
105 .get(&key.into_encoded_key())
106 .map(|encoded| super::helpers::get_values(&self.layout, encoded))
107 }
108
109 pub fn assert_state<K>(&self, key: K, expected: &[Value])
111 where
112 K: IntoEncodedKey,
113 {
114 let key_encoded = key.into_encoded_key();
115 let actual = self
116 .states
117 .get(&key_encoded)
118 .map(|encoded| super::helpers::get_values(&self.layout, encoded))
119 .expect("No state for key");
120 assert_eq!(actual, expected, "State mismatch for key");
121 }
122
123 pub fn remove_state<K>(&mut self, key: K) -> Option<Vec<Value>>
125 where
126 K: IntoEncodedKey,
127 {
128 self.states
129 .remove(&key.into_encoded_key())
130 .map(|encoded| super::helpers::get_values(&self.layout, &encoded))
131 }
132
133 pub fn has_state<K>(&self, key: K) -> bool
135 where
136 K: IntoEncodedKey,
137 {
138 self.states.contains_key(&key.into_encoded_key())
139 }
140
141 pub fn state_count(&self) -> usize {
143 self.states.len()
144 }
145
146 pub fn clear(&mut self) {
148 self.states.clear();
149 }
150
151 pub fn keys(&self) -> Vec<&EncodedKey> {
153 self.states.keys().collect()
154 }
155
156 pub fn assert_count(&self, expected: usize) {
158 assert_eq!(self.state_count(), expected, "Expected {} states, found {}", expected, self.state_count());
159 }
160}
161
162pub struct WindowStatefulTestHelper {
164 layout: EncodedValuesLayout,
165 windows: HashMap<i64, HashMap<EncodedKey, EncodedValues>>, window_size: i64,
167}
168
169impl WindowStatefulTestHelper {
170 pub fn new(layout: EncodedValuesLayout, window_size: i64) -> Self {
172 Self {
173 layout,
174 windows: HashMap::new(),
175 window_size,
176 }
177 }
178
179 pub fn time_window_counter(window_size_seconds: i64) -> Self {
181 Self::new(EncodedValuesLayout::new(&[Type::Int8]), window_size_seconds)
182 }
183
184 pub fn count_window_sum(window_size_count: i64) -> Self {
186 Self::new(EncodedValuesLayout::new(&[Type::Int4]), window_size_count)
187 }
188
189 pub fn set_window_state<K>(&mut self, window_id: i64, key: K, values: &[Value])
191 where
192 K: IntoEncodedKey,
193 {
194 let mut encoded = self.layout.allocate();
195 self.layout.set_values(&mut encoded, values);
196
197 self.windows.entry(window_id).or_insert_with(HashMap::new).insert(key.into_encoded_key(), encoded);
198 }
199
200 pub fn get_window_state<K>(&self, window_id: i64, key: K) -> Option<Vec<Value>>
202 where
203 K: IntoEncodedKey,
204 {
205 self.windows
206 .get(&window_id)
207 .and_then(|window| window.get(&key.into_encoded_key()))
208 .map(|encoded| super::helpers::get_values(&self.layout, encoded))
209 }
210
211 pub fn assert_window_state<K>(&self, window_id: i64, key: K, expected: &[Value])
213 where
214 K: IntoEncodedKey,
215 {
216 let key_encoded = key.into_encoded_key();
217 let actual = self
218 .windows
219 .get(&window_id)
220 .and_then(|window| window.get(&key_encoded))
221 .map(|encoded| super::helpers::get_values(&self.layout, encoded))
222 .expect("No state for window and key");
223 assert_eq!(actual, expected, "State mismatch for window {} and key", window_id);
224 }
225
226 pub fn get_window(&self, window_id: i64) -> Option<&HashMap<EncodedKey, EncodedValues>> {
228 self.windows.get(&window_id)
229 }
230
231 pub fn remove_window(&mut self, window_id: i64) -> Option<HashMap<EncodedKey, EncodedValues>> {
233 self.windows.remove(&window_id)
234 }
235
236 pub fn has_window(&self, window_id: i64) -> bool {
238 self.windows.contains_key(&window_id)
239 }
240
241 pub fn window_count(&self) -> usize {
243 self.windows.len()
244 }
245
246 pub fn window_key_count(&self, window_id: i64) -> usize {
248 self.windows.get(&window_id).map(|w| w.len()).unwrap_or(0)
249 }
250
251 pub fn clear(&mut self) {
253 self.windows.clear();
254 }
255
256 pub fn window_ids(&self) -> Vec<i64> {
258 let mut ids: Vec<_> = self.windows.keys().copied().collect();
259 ids.sort();
260 ids
261 }
262
263 pub fn assert_window_count(&self, expected: usize) {
265 assert_eq!(
266 self.window_count(),
267 expected,
268 "Expected {} windows, found {}",
269 expected,
270 self.window_count()
271 );
272 }
273
274 pub fn window_for_timestamp(&self, timestamp: i64) -> i64 {
276 timestamp / self.window_size
277 }
278}
279
280pub mod scenarios {
282 use super::*;
283 use crate::testing::builders::TestFlowChangeBuilder;
284
285 pub fn counter_inserts(count: usize) -> Vec<crate::FlowChange> {
287 use reifydb_type::RowNumber;
288 (0..count)
289 .map(|i| {
290 TestFlowChangeBuilder::new()
291 .insert_row(RowNumber(i as u64), vec![Value::Int8(1i64)])
292 .build()
293 })
294 .collect()
295 }
296
297 pub fn grouped_inserts(groups: &[(&str, i32)]) -> crate::FlowChange {
299 use reifydb_type::RowNumber;
300 let mut builder = TestFlowChangeBuilder::new();
301 for (i, (key, value)) in groups.iter().enumerate() {
302 builder = builder
303 .insert_row(RowNumber(i as u64), vec![Value::Utf8((*key).into()), Value::Int4(*value)]);
304 }
305 builder.build()
306 }
307
308 pub fn state_updates(row_number: i64, old_value: i8, new_value: i8) -> crate::FlowChange {
310 use reifydb_type::RowNumber;
311 TestFlowChangeBuilder::new()
312 .update_row(
313 RowNumber(row_number as u64),
314 vec![Value::Int8(old_value as i64)],
315 vec![Value::Int8(new_value as i64)],
316 )
317 .build()
318 }
319
320 pub fn windowed_events(
322 window_size: i64,
323 events_per_window: usize,
324 windows: usize,
325 ) -> Vec<(i64, crate::FlowChange)> {
326 use reifydb_type::RowNumber;
327 let mut result = Vec::new();
328
329 for window in 0..windows {
330 let base_time = window as i64 * window_size;
331
332 for event in 0..events_per_window {
333 let timestamp = base_time + (event as i64 * (window_size / events_per_window as i64));
334 let change = TestFlowChangeBuilder::new()
335 .insert_row(
336 RowNumber(timestamp as u64),
337 vec![Value::Int8(1i64), Value::Int8(timestamp as i64)],
338 )
339 .build();
340 result.push((timestamp, change));
341 }
342 }
343
344 result
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::{scenarios::*, *};

    /// A single-state helper starts empty, round-trips one value, and can be
    /// cleared again.
    #[test]
    fn test_single_stateful_helper() {
        let mut single = SingleStatefulTestHelper::counter();
        assert!(!single.has_state());

        let stored = [Value::Int8(42i64)];
        single.set_state(&stored);
        assert!(single.has_state());
        single.assert_state(&stored);

        single.clear();
        assert!(!single.has_state());
    }

    /// A keyed helper keeps independent states per key and supports removal.
    #[test]
    fn test_keyed_stateful_helper() {
        let mut keyed = KeyedStatefulTestHelper::sum();
        let entries = [("key1", 100), ("key2", 200)];

        for (key, value) in entries {
            keyed.set_state(key, &[Value::Int4(value)]);
        }

        keyed.assert_count(2);
        for (key, value) in entries {
            keyed.assert_state(key, &[Value::Int4(value)]);
        }

        assert!(keyed.has_state("key1"));
        assert!(!keyed.has_state("key3"));

        let removed = keyed.remove_state("key1");
        assert_eq!(removed, Some(vec![Value::Int4(100)]));
        keyed.assert_count(1);
    }

    /// A window helper separates states by window id derived from timestamps.
    #[test]
    fn test_window_stateful_helper() {
        let mut windowed = WindowStatefulTestHelper::time_window_counter(60);

        let first = windowed.window_for_timestamp(30);
        let second = windowed.window_for_timestamp(90);

        windowed.set_window_state(first, "key1", &[Value::Int8(10i64)]);
        windowed.set_window_state(second, "key1", &[Value::Int8(20i64)]);

        windowed.assert_window_count(2);
        windowed.assert_window_state(first, "key1", &[Value::Int8(10i64)]);
        windowed.assert_window_state(second, "key1", &[Value::Int8(20i64)]);

        assert_eq!(windowed.window_ids(), vec![first, second]);
        assert_eq!(windowed.window_key_count(first), 1);
    }

    /// Each scenario builder produces the expected number of changes or diffs.
    #[test]
    fn test_scenarios() {
        let inserts = counter_inserts(3);
        assert_eq!(inserts.len(), 3);

        let grouped = grouped_inserts(&[("a", 10), ("b", 20), ("a", 30)]);
        assert_eq!(grouped.diffs.len(), 3);

        let update = state_updates(1, 10, 20);
        assert_eq!(update.diffs.len(), 1);

        // 2 windows x 2 events per window.
        let events = windowed_events(60, 2, 2);
        assert_eq!(events.len(), 4);
    }

    /// String slices, owned strings and integers are all accepted as keys.
    #[test]
    fn test_into_encoded_key_with_strings() {
        let mut keyed = KeyedStatefulTestHelper::sum();

        // Borrowed string keys.
        keyed.set_state("string_key_1", &[Value::Int4(42)]);
        keyed.set_state("string_key_2", &[Value::Int4(100)]);

        // Owned string key.
        let owned_key = String::from("dynamic_key");
        keyed.set_state(owned_key.clone(), &[Value::Int4(200)]);

        // Integer keys of different widths.
        keyed.set_state(123u32, &[Value::Int4(300)]);
        keyed.set_state(456u64, &[Value::Int4(400)]);

        assert_eq!(keyed.get_state("string_key_1"), Some(vec![Value::Int4(42)]));
        assert_eq!(keyed.get_state("string_key_2"), Some(vec![Value::Int4(100)]));
        assert_eq!(keyed.get_state(owned_key), Some(vec![Value::Int4(200)]));
        assert_eq!(keyed.get_state(123u32), Some(vec![Value::Int4(300)]));
        assert_eq!(keyed.get_state(456u64), Some(vec![Value::Int4(400)]));

        assert_eq!(keyed.state_count(), 5);
    }
}