reifydb_flow_operator_sdk/testing/
stateful.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::HashMap;
5
6use reifydb_core::{
7	CowVec,
8	value::encoded::{EncodedKey, EncodedValues, EncodedValuesLayout, IntoEncodedKey},
9};
10use reifydb_type::{Type, Value};
11
12/// Test helper for FFISingleStateful operators
13pub struct SingleStatefulTestHelper {
14	layout: EncodedValuesLayout,
15	state: Option<Vec<u8>>,
16}
17
18impl SingleStatefulTestHelper {
19	/// Create a new single stateful test helper
20	pub fn new(layout: EncodedValuesLayout) -> Self {
21		Self {
22			layout,
23			state: None,
24		}
25	}
26
27	/// Create with a simple counter layout (single int8)
28	pub fn counter() -> Self {
29		Self::new(EncodedValuesLayout::new(&[Type::Int8]))
30	}
31
32	/// Set the current state
33	pub fn set_state(&mut self, values: &[Value]) {
34		let mut encoded = self.layout.allocate();
35		self.layout.set_values(&mut encoded, values);
36		self.state = Some(encoded.0.to_vec());
37	}
38
39	/// Get the current state
40	pub fn get_state(&self) -> Option<Vec<Value>> {
41		self.state.as_ref().map(|bytes| {
42			let encoded = EncodedValues(CowVec::new(bytes.clone()));
43			super::helpers::get_values(&self.layout, &encoded)
44		})
45	}
46
47	/// Assert the state matches expected values
48	pub fn assert_state(&self, expected: &[Value]) {
49		let actual = self.get_state().expect("No state set");
50		assert_eq!(actual, expected, "State mismatch");
51	}
52
53	/// Clear the state
54	pub fn clear(&mut self) {
55		self.state = None;
56	}
57
58	/// Check if state exists
59	pub fn has_state(&self) -> bool {
60		self.state.is_some()
61	}
62}
63
64/// Test helper for FFIKeyedStateful operators
65pub struct KeyedStatefulTestHelper {
66	layout: EncodedValuesLayout,
67	states: HashMap<EncodedKey, EncodedValues>,
68}
69
70impl KeyedStatefulTestHelper {
71	/// Create a new keyed stateful test helper
72	pub fn new(layout: EncodedValuesLayout) -> Self {
73		Self {
74			layout,
75			states: HashMap::new(),
76		}
77	}
78
79	/// Create with a simple counter layout (single int8)
80	pub fn counter() -> Self {
81		Self::new(EncodedValuesLayout::new(&[Type::Int8]))
82	}
83
84	/// Create with a sum layout (single int8 or int4)
85	pub fn sum() -> Self {
86		Self::new(EncodedValuesLayout::new(&[Type::Int4]))
87	}
88
89	/// Set state for a key
90	pub fn set_state<K>(&mut self, key: K, values: &[Value])
91	where
92		K: IntoEncodedKey,
93	{
94		let mut encoded = self.layout.allocate();
95		self.layout.set_values(&mut encoded, values);
96		self.states.insert(key.into_encoded_key(), encoded);
97	}
98
99	/// Get state for a key
100	pub fn get_state<K>(&self, key: K) -> Option<Vec<Value>>
101	where
102		K: IntoEncodedKey,
103	{
104		self.states
105			.get(&key.into_encoded_key())
106			.map(|encoded| super::helpers::get_values(&self.layout, encoded))
107	}
108
109	/// Assert state for a key matches expected values
110	pub fn assert_state<K>(&self, key: K, expected: &[Value])
111	where
112		K: IntoEncodedKey,
113	{
114		let key_encoded = key.into_encoded_key();
115		let actual = self
116			.states
117			.get(&key_encoded)
118			.map(|encoded| super::helpers::get_values(&self.layout, encoded))
119			.expect("No state for key");
120		assert_eq!(actual, expected, "State mismatch for key");
121	}
122
123	/// Remove state for a key
124	pub fn remove_state<K>(&mut self, key: K) -> Option<Vec<Value>>
125	where
126		K: IntoEncodedKey,
127	{
128		self.states
129			.remove(&key.into_encoded_key())
130			.map(|encoded| super::helpers::get_values(&self.layout, &encoded))
131	}
132
133	/// Check if a key has state
134	pub fn has_state<K>(&self, key: K) -> bool
135	where
136		K: IntoEncodedKey,
137	{
138		self.states.contains_key(&key.into_encoded_key())
139	}
140
141	/// Get the number of keys with state
142	pub fn state_count(&self) -> usize {
143		self.states.len()
144	}
145
146	/// Clear all states
147	pub fn clear(&mut self) {
148		self.states.clear();
149	}
150
151	/// Get all keys
152	pub fn keys(&self) -> Vec<&EncodedKey> {
153		self.states.keys().collect()
154	}
155
156	/// Assert the number of states
157	pub fn assert_count(&self, expected: usize) {
158		assert_eq!(self.state_count(), expected, "Expected {} states, found {}", expected, self.state_count());
159	}
160}
161
162/// Test helper for FFIWindowStateful operators
163pub struct WindowStatefulTestHelper {
164	layout: EncodedValuesLayout,
165	windows: HashMap<i64, HashMap<EncodedKey, EncodedValues>>, // window_id -> key -> state
166	window_size: i64,
167}
168
169impl WindowStatefulTestHelper {
170	/// Create a new window stateful test helper
171	pub fn new(layout: EncodedValuesLayout, window_size: i64) -> Self {
172		Self {
173			layout,
174			windows: HashMap::new(),
175			window_size,
176		}
177	}
178
179	/// Create with a counter layout for time windows
180	pub fn time_window_counter(window_size_seconds: i64) -> Self {
181		Self::new(EncodedValuesLayout::new(&[Type::Int8]), window_size_seconds)
182	}
183
184	/// Create with a sum layout for count windows
185	pub fn count_window_sum(window_size_count: i64) -> Self {
186		Self::new(EncodedValuesLayout::new(&[Type::Int4]), window_size_count)
187	}
188
189	/// Set state for a window and key
190	pub fn set_window_state<K>(&mut self, window_id: i64, key: K, values: &[Value])
191	where
192		K: IntoEncodedKey,
193	{
194		let mut encoded = self.layout.allocate();
195		self.layout.set_values(&mut encoded, values);
196
197		self.windows.entry(window_id).or_insert_with(HashMap::new).insert(key.into_encoded_key(), encoded);
198	}
199
200	/// Get state for a window and key
201	pub fn get_window_state<K>(&self, window_id: i64, key: K) -> Option<Vec<Value>>
202	where
203		K: IntoEncodedKey,
204	{
205		self.windows
206			.get(&window_id)
207			.and_then(|window| window.get(&key.into_encoded_key()))
208			.map(|encoded| super::helpers::get_values(&self.layout, encoded))
209	}
210
211	/// Assert state for a window and key
212	pub fn assert_window_state<K>(&self, window_id: i64, key: K, expected: &[Value])
213	where
214		K: IntoEncodedKey,
215	{
216		let key_encoded = key.into_encoded_key();
217		let actual = self
218			.windows
219			.get(&window_id)
220			.and_then(|window| window.get(&key_encoded))
221			.map(|encoded| super::helpers::get_values(&self.layout, encoded))
222			.expect("No state for window and key");
223		assert_eq!(actual, expected, "State mismatch for window {} and key", window_id);
224	}
225
226	/// Get all states for a window
227	pub fn get_window(&self, window_id: i64) -> Option<&HashMap<EncodedKey, EncodedValues>> {
228		self.windows.get(&window_id)
229	}
230
231	/// Remove a window
232	pub fn remove_window(&mut self, window_id: i64) -> Option<HashMap<EncodedKey, EncodedValues>> {
233		self.windows.remove(&window_id)
234	}
235
236	/// Check if a window exists
237	pub fn has_window(&self, window_id: i64) -> bool {
238		self.windows.contains_key(&window_id)
239	}
240
241	/// Get the number of windows
242	pub fn window_count(&self) -> usize {
243		self.windows.len()
244	}
245
246	/// Get the number of keys in a window
247	pub fn window_key_count(&self, window_id: i64) -> usize {
248		self.windows.get(&window_id).map(|w| w.len()).unwrap_or(0)
249	}
250
251	/// Clear all windows
252	pub fn clear(&mut self) {
253		self.windows.clear();
254	}
255
256	/// Get all window IDs
257	pub fn window_ids(&self) -> Vec<i64> {
258		let mut ids: Vec<_> = self.windows.keys().copied().collect();
259		ids.sort();
260		ids
261	}
262
263	/// Assert the number of windows
264	pub fn assert_window_count(&self, expected: usize) {
265		assert_eq!(
266			self.window_count(),
267			expected,
268			"Expected {} windows, found {}",
269			expected,
270			self.window_count()
271		);
272	}
273
274	/// Calculate the window ID for a timestamp
275	pub fn window_for_timestamp(&self, timestamp: i64) -> i64 {
276		timestamp / self.window_size
277	}
278}
279
280/// Common test scenarios for stateful operators
281pub mod scenarios {
282	use super::*;
283	use crate::testing::builders::TestFlowChangeBuilder;
284
285	/// Create a sequence of inserts for testing counters
286	pub fn counter_inserts(count: usize) -> Vec<crate::FlowChange> {
287		use reifydb_type::RowNumber;
288		(0..count)
289			.map(|i| {
290				TestFlowChangeBuilder::new()
291					.insert_row(RowNumber(i as u64), vec![Value::Int8(1i64)])
292					.build()
293			})
294			.collect()
295	}
296
297	/// Create a sequence of keyed inserts for group-by testing
298	pub fn grouped_inserts(groups: &[(&str, i32)]) -> crate::FlowChange {
299		use reifydb_type::RowNumber;
300		let mut builder = TestFlowChangeBuilder::new();
301		for (i, (key, value)) in groups.iter().enumerate() {
302			builder = builder
303				.insert_row(RowNumber(i as u64), vec![Value::Utf8((*key).into()), Value::Int4(*value)]);
304		}
305		builder.build()
306	}
307
308	/// Create a sequence of updates for testing state changes
309	pub fn state_updates(row_number: i64, old_value: i8, new_value: i8) -> crate::FlowChange {
310		use reifydb_type::RowNumber;
311		TestFlowChangeBuilder::new()
312			.update_row(
313				RowNumber(row_number as u64),
314				vec![Value::Int8(old_value as i64)],
315				vec![Value::Int8(new_value as i64)],
316			)
317			.build()
318	}
319
320	/// Create a windowed sequence of events
321	pub fn windowed_events(
322		window_size: i64,
323		events_per_window: usize,
324		windows: usize,
325	) -> Vec<(i64, crate::FlowChange)> {
326		use reifydb_type::RowNumber;
327		let mut result = Vec::new();
328
329		for window in 0..windows {
330			let base_time = window as i64 * window_size;
331
332			for event in 0..events_per_window {
333				let timestamp = base_time + (event as i64 * (window_size / events_per_window as i64));
334				let change = TestFlowChangeBuilder::new()
335					.insert_row(
336						RowNumber(timestamp as u64),
337						vec![Value::Int8(1i64), Value::Int8(timestamp as i64)],
338					)
339					.build();
340				result.push((timestamp, change));
341			}
342		}
343
344		result
345	}
346}
347
348#[cfg(test)]
349mod tests {
350	use super::{scenarios::*, *};
351
352	#[test]
353	fn test_single_stateful_helper() {
354		let mut helper = SingleStatefulTestHelper::counter();
355
356		assert!(!helper.has_state());
357
358		helper.set_state(&[Value::Int8(42i64)]);
359		assert!(helper.has_state());
360		helper.assert_state(&[Value::Int8(42i64)]);
361
362		helper.clear();
363		assert!(!helper.has_state());
364	}
365
366	#[test]
367	fn test_keyed_stateful_helper() {
368		let mut helper = KeyedStatefulTestHelper::sum();
369
370		helper.set_state("key1", &[Value::Int4(100)]);
371		helper.set_state("key2", &[Value::Int4(200)]);
372
373		helper.assert_count(2);
374		helper.assert_state("key1", &[Value::Int4(100)]);
375		helper.assert_state("key2", &[Value::Int4(200)]);
376
377		assert!(helper.has_state("key1"));
378		assert!(!helper.has_state("key3"));
379
380		let removed = helper.remove_state("key1");
381		assert_eq!(removed, Some(vec![Value::Int4(100)]));
382		helper.assert_count(1);
383	}
384
385	#[test]
386	fn test_window_stateful_helper() {
387		let mut helper = WindowStatefulTestHelper::time_window_counter(60);
388
389		let window1 = helper.window_for_timestamp(30);
390		let window2 = helper.window_for_timestamp(90);
391
392		helper.set_window_state(window1, "key1", &[Value::Int8(10i64)]);
393		helper.set_window_state(window2, "key1", &[Value::Int8(20i64)]);
394
395		helper.assert_window_count(2);
396		helper.assert_window_state(window1, "key1", &[Value::Int8(10i64)]);
397		helper.assert_window_state(window2, "key1", &[Value::Int8(20i64)]);
398
399		assert_eq!(helper.window_ids(), vec![window1, window2]);
400		assert_eq!(helper.window_key_count(window1), 1);
401	}
402
403	#[test]
404	fn test_scenarios() {
405		// Test counter inserts
406		let changes = counter_inserts(3);
407		assert_eq!(changes.len(), 3);
408
409		// Test grouped inserts
410		let grouped = grouped_inserts(&[("a", 10), ("b", 20), ("a", 30)]);
411		assert_eq!(grouped.diffs.len(), 3);
412
413		// Test state updates
414		let update = state_updates(1, 10, 20);
415		assert_eq!(update.diffs.len(), 1);
416
417		// Test windowed events
418		let windowed = windowed_events(60, 2, 2);
419		assert_eq!(windowed.len(), 4); // 2 windows * 2 events per window
420	}
421
422	#[test]
423	fn test_into_encoded_key_with_strings() {
424		// This test verifies that IntoEncodedKey works with string literals
425		let mut helper = KeyedStatefulTestHelper::sum();
426
427		// Test with &str literals
428		helper.set_state("string_key_1", &[Value::Int4(42)]);
429		helper.set_state("string_key_2", &[Value::Int4(100)]);
430
431		// Test with String
432		let key = String::from("dynamic_key");
433		helper.set_state(key.clone(), &[Value::Int4(200)]);
434
435		// Test with numeric keys
436		helper.set_state(123u32, &[Value::Int4(300)]);
437		helper.set_state(456u64, &[Value::Int4(400)]);
438
439		// Verify all keys work
440		assert_eq!(helper.get_state("string_key_1"), Some(vec![Value::Int4(42)]));
441		assert_eq!(helper.get_state("string_key_2"), Some(vec![Value::Int4(100)]));
442		assert_eq!(helper.get_state(key), Some(vec![Value::Int4(200)]));
443		assert_eq!(helper.get_state(123u32), Some(vec![Value::Int4(300)]));
444		assert_eq!(helper.get_state(456u64), Some(vec![Value::Int4(400)]));
445
446		assert_eq!(helper.state_count(), 5);
447	}
448}