use std::collections::HashMap;
use reifydb_core::encoded::{
key::{EncodedKey, IntoEncodedKey},
row::EncodedRow,
shape::RowShape,
};
use reifydb_type::{
util::cowvec::CowVec,
value::{Value, r#type::Type},
};
use super::helpers::get_values;
/// Test helper that holds at most one encoded state row.
///
/// Values are encoded against `shape` and kept as the raw bytes of the resulting row.
pub struct SingleStatefulTestHelper {
    /// Shape used to encode and decode the stored values.
    shape: RowShape,
    /// Raw bytes of the encoded row, or `None` while no state is set.
    state: Option<Vec<u8>>,
}

impl SingleStatefulTestHelper {
    /// Creates a helper for `shape` with no state set.
    pub fn new(shape: RowShape) -> Self {
        Self {
            shape,
            state: None,
        }
    }

    /// Creates a helper whose shape is `RowShape::testing(&[Type::Int8])`,
    /// i.e. a single `Int8` column.
    pub fn counter() -> Self {
        let shape = RowShape::testing(&[Type::Int8]);
        Self::new(shape)
    }

    /// Encodes `values` against the helper's shape and stores the resulting bytes,
    /// replacing any previously stored state.
    pub fn set_state(&mut self, values: &[Value]) {
        let mut row = self.shape.allocate();
        self.shape.set_values(&mut row, values);
        let bytes = row.0.to_vec();
        self.state = Some(bytes);
    }

    /// Decodes the stored bytes back into values.
    ///
    /// Returns `None` when no state is set.
    pub fn get_state(&self) -> Option<Vec<Value>> {
        let bytes = self.state.as_ref()?;
        // The stored bytes are copied so the helper keeps its own buffer untouched.
        let row = EncodedRow(CowVec::new(bytes.clone()));
        Some(get_values(&self.shape, &row))
    }

    /// Asserts that the stored state decodes to `expected`.
    ///
    /// # Panics
    ///
    /// Panics if no state is set or if the decoded values differ from `expected`.
    pub fn assert_state(&self, expected: &[Value]) {
        let decoded = self.get_state();
        let actual = decoded.expect("No state set");
        assert_eq!(actual, expected, "State mismatch");
    }

    /// Drops the stored state, if any.
    pub fn clear(&mut self) {
        self.state = None;
    }

    /// Returns `true` when a state is currently stored.
    pub fn has_state(&self) -> bool {
        self.state.is_some()
    }
}
/// Test helper that stores one encoded state row per key.
pub struct KeyedStatefulTestHelper {
    /// Shape used to encode and decode every stored row.
    shape: RowShape,
    /// Encoded rows indexed by their encoded key.
    states: HashMap<EncodedKey, EncodedRow>,
}

impl KeyedStatefulTestHelper {
    /// Creates an empty helper for `shape`.
    pub fn new(shape: RowShape) -> Self {
        Self {
            shape,
            states: HashMap::new(),
        }
    }

    /// Creates a helper whose shape is `RowShape::testing(&[Type::Int8])`.
    pub fn counter() -> Self {
        let shape = RowShape::testing(&[Type::Int8]);
        Self::new(shape)
    }

    /// Creates a helper whose shape is `RowShape::testing(&[Type::Int4])`.
    pub fn sum() -> Self {
        let shape = RowShape::testing(&[Type::Int4]);
        Self::new(shape)
    }

    /// Encodes `values` against the helper's shape and stores the row under `key`,
    /// replacing any row previously stored for that key.
    pub fn set_state<K>(&mut self, key: K, values: &[Value])
    where
        K: IntoEncodedKey,
    {
        let mut row = self.shape.allocate();
        self.shape.set_values(&mut row, values);
        self.states.insert(key.into_encoded_key(), row);
    }

    /// Decodes the row stored under `key`.
    ///
    /// Returns `None` when nothing is stored for `key`.
    pub fn get_state<K>(&self, key: K) -> Option<Vec<Value>>
    where
        K: IntoEncodedKey,
    {
        let encoded_key = key.into_encoded_key();
        let row = self.states.get(&encoded_key)?;
        Some(get_values(&self.shape, row))
    }

    /// Asserts that the row stored under `key` decodes to `expected`.
    ///
    /// # Panics
    ///
    /// Panics if nothing is stored for `key` or if the decoded values differ from `expected`.
    pub fn assert_state<K>(&self, key: K, expected: &[Value])
    where
        K: IntoEncodedKey,
    {
        let actual = self.get_state(key).expect("No state for key");
        assert_eq!(actual, expected, "State mismatch for key");
    }

    /// Removes the row stored under `key` and returns its decoded values.
    ///
    /// Returns `None` when nothing was stored for `key`.
    pub fn remove_state<K>(&mut self, key: K) -> Option<Vec<Value>>
    where
        K: IntoEncodedKey,
    {
        let encoded_key = key.into_encoded_key();
        let row = self.states.remove(&encoded_key)?;
        Some(get_values(&self.shape, &row))
    }

    /// Returns `true` when a row is stored under `key`.
    pub fn has_state<K>(&self, key: K) -> bool
    where
        K: IntoEncodedKey,
    {
        let encoded_key = key.into_encoded_key();
        self.states.contains_key(&encoded_key)
    }

    /// Returns the number of keys that currently have a stored row.
    pub fn state_count(&self) -> usize {
        self.states.len()
    }

    /// Removes every stored row.
    pub fn clear(&mut self) {
        self.states.clear();
    }

    /// Returns references to all stored keys, in the map's iteration order.
    pub fn keys(&self) -> Vec<&EncodedKey> {
        self.states.keys().collect::<Vec<_>>()
    }

    /// Asserts that exactly `expected` keys have a stored row.
    ///
    /// # Panics
    ///
    /// Panics if the number of stored rows differs from `expected`.
    pub fn assert_count(&self, expected: usize) {
        let actual = self.state_count();
        assert_eq!(actual, expected, "Expected {} states, found {}", expected, actual);
    }
}
/// Test helper that stores encoded state rows grouped by window id and key.
pub struct WindowStatefulTestHelper {
    /// Shape used to encode and decode every stored row.
    shape: RowShape,
    /// Encoded rows, indexed first by window id and then by encoded key.
    windows: HashMap<i64, HashMap<EncodedKey, EncodedRow>>,
    /// Width of one window; only used as the divisor in `window_for_timestamp`.
    window_size: i64,
}

impl WindowStatefulTestHelper {
    /// Creates an empty helper for `shape` with the given `window_size`.
    pub fn new(shape: RowShape, window_size: i64) -> Self {
        Self {
            shape,
            windows: HashMap::new(),
            window_size,
        }
    }

    /// Creates a helper whose shape is `RowShape::testing(&[Type::Int8])` and whose
    /// window size is `window_size_seconds`.
    pub fn time_window_counter(window_size_seconds: i64) -> Self {
        Self::new(RowShape::testing(&[Type::Int8]), window_size_seconds)
    }

    /// Creates a helper whose shape is `RowShape::testing(&[Type::Int4])` and whose
    /// window size is `window_size_count`.
    pub fn count_window_sum(window_size_count: i64) -> Self {
        Self::new(RowShape::testing(&[Type::Int4]), window_size_count)
    }

    /// Encodes `values` against the helper's shape and stores the row under `key`
    /// inside window `window_id`, creating the window if it does not exist yet.
    pub fn set_window_state<K>(&mut self, window_id: i64, key: K, values: &[Value])
    where
        K: IntoEncodedKey,
    {
        let mut row = self.shape.allocate();
        self.shape.set_values(&mut row, values);
        let window = self.windows.entry(window_id).or_default();
        window.insert(key.into_encoded_key(), row);
    }

    /// Decodes the row stored under `key` in window `window_id`.
    ///
    /// Returns `None` when the window does not exist or holds nothing for `key`.
    pub fn get_window_state<K>(&self, window_id: i64, key: K) -> Option<Vec<Value>>
    where
        K: IntoEncodedKey,
    {
        let window = self.windows.get(&window_id)?;
        let row = window.get(&key.into_encoded_key())?;
        Some(get_values(&self.shape, row))
    }

    /// Asserts that the row stored under `key` in window `window_id` decodes to `expected`.
    ///
    /// # Panics
    ///
    /// Panics if there is no row for that window and key, or if the decoded values
    /// differ from `expected`.
    pub fn assert_window_state<K>(&self, window_id: i64, key: K, expected: &[Value])
    where
        K: IntoEncodedKey,
    {
        let actual = self
            .get_window_state(window_id, key)
            .expect("No state for window and key");
        assert_eq!(actual, expected, "State mismatch for window {} and key", window_id);
    }

    /// Returns the encoded rows of window `window_id`, if the window exists.
    pub fn get_window(&self, window_id: i64) -> Option<&HashMap<EncodedKey, EncodedRow>> {
        self.windows.get(&window_id)
    }

    /// Removes window `window_id` and returns its encoded rows, if the window existed.
    pub fn remove_window(&mut self, window_id: i64) -> Option<HashMap<EncodedKey, EncodedRow>> {
        self.windows.remove(&window_id)
    }

    /// Returns `true` when window `window_id` exists.
    pub fn has_window(&self, window_id: i64) -> bool {
        self.windows.contains_key(&window_id)
    }

    /// Returns the number of windows currently stored.
    pub fn window_count(&self) -> usize {
        self.windows.len()
    }

    /// Returns the number of keys stored in window `window_id`, or `0` if it does not exist.
    pub fn window_key_count(&self, window_id: i64) -> usize {
        self.windows.get(&window_id).map_or(0, HashMap::len)
    }

    /// Removes every window.
    pub fn clear(&mut self) {
        self.windows.clear();
    }

    /// Returns the ids of all stored windows in ascending order.
    pub fn window_ids(&self) -> Vec<i64> {
        let mut ids: Vec<i64> = self.windows.keys().copied().collect();
        // Ids are unique map keys, so an unstable sort yields the same order as a stable one.
        ids.sort_unstable();
        ids
    }

    /// Asserts that exactly `expected` windows are stored.
    ///
    /// # Panics
    ///
    /// Panics if the number of stored windows differs from `expected`.
    pub fn assert_window_count(&self, expected: usize) {
        let actual = self.window_count();
        assert_eq!(actual, expected, "Expected {} windows, found {}", expected, actual);
    }

    /// Maps `timestamp` to the id of the window that contains it.
    ///
    /// Uses Euclidean division, which for a positive `window_size` is floor division:
    /// window `w` covers `[w * window_size, (w + 1) * window_size)`. Negative timestamps
    /// therefore land in negative windows instead of being folded into window `0`, which
    /// is what truncating `/` would do. Results for non-negative timestamps are unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `window_size` is zero.
    pub fn window_for_timestamp(&self, timestamp: i64) -> i64 {
        timestamp.div_euclid(self.window_size)
    }
}
/// Ready-made `Change` values for tests.
pub mod scenarios {
    use reifydb_core::interface::change::Change;
    use reifydb_type::value::row_number::RowNumber;

    use super::*;
    use crate::testing::builders::TestChangeBuilder;

    /// Builds `count` changes, each inserting a single row `[Int8(1)]`.
    ///
    /// Row numbers run from `0` up to `count - 1`, one per change.
    pub fn counter_inserts(count: usize) -> Vec<Change> {
        let mut changes = Vec::with_capacity(count);
        for index in 0..count {
            let change = TestChangeBuilder::new()
                .insert_row(RowNumber(index as u64), vec![Value::Int8(1i64)])
                .build();
            changes.push(change);
        }
        changes
    }

    /// Builds one change that inserts a row `[Utf8(key), Int4(value)]` for every
    /// `(key, value)` pair, in order.
    ///
    /// Each row number is the position of its pair in `groups`.
    pub fn grouped_inserts(groups: &[(&str, i32)]) -> Change {
        groups
            .iter()
            .enumerate()
            .fold(TestChangeBuilder::new(), |builder, (index, (key, value))| {
                let row = vec![Value::Utf8((*key).into()), Value::Int4(*value)];
                builder.insert_row(RowNumber(index as u64), row)
            })
            .build()
    }

    /// Builds one change that updates row `row_number`, passing `[Int8(old_value)]`
    /// before `[Int8(new_value)]` to the builder.
    ///
    /// `row_number` is converted with `as u64`, so a negative value wraps around.
    pub fn state_updates(row_number: i64, old_value: i8, new_value: i8) -> Change {
        let before = vec![Value::Int8(i64::from(old_value))];
        let after = vec![Value::Int8(i64::from(new_value))];
        TestChangeBuilder::new()
            .update_row(RowNumber(row_number as u64), before, after)
            .build()
    }

    /// Generates `events_per_window` insert changes for each of `windows` consecutive
    /// windows, each paired with its timestamp.
    ///
    /// Window `w` starts at `w * window_size` and its events are spaced
    /// `window_size / events_per_window` apart (integer division). Every row is
    /// `[Int8(1), Int8(timestamp)]` and uses the timestamp as its row number.
    pub fn windowed_events(window_size: i64, events_per_window: usize, windows: usize) -> Vec<(i64, Change)> {
        (0..windows)
            .flat_map(|window| {
                let base_time = window as i64 * window_size;
                (0..events_per_window).map(move |event| {
                    // The division stays inside the per-event closure so that
                    // `events_per_window == 0` produces no events instead of dividing by zero.
                    let spacing = window_size / events_per_window as i64;
                    let timestamp = base_time + (event as i64 * spacing);
                    let row = vec![Value::Int8(1i64), Value::Int8(timestamp)];
                    let change = TestChangeBuilder::new()
                        .insert_row(RowNumber(timestamp as u64), row)
                        .build();
                    (timestamp, change)
                })
            })
            .collect()
    }
}
#[cfg(test)]
pub mod tests {
    use super::{scenarios::*, *};

    /// State is absent initially, present after `set_state`, and gone after `clear`.
    #[test]
    fn test_single_stateful_helper() {
        let mut single = SingleStatefulTestHelper::counter();
        assert!(!single.has_state());

        let stored = [Value::Int8(42i64)];
        single.set_state(&stored);
        assert!(single.has_state());
        single.assert_state(&stored);

        single.clear();
        assert!(!single.has_state());
    }

    /// Rows are stored, looked up and removed independently per key.
    #[test]
    fn test_keyed_stateful_helper() {
        let mut keyed = KeyedStatefulTestHelper::sum();
        let entries = [("key1", 100), ("key2", 200)];

        for &(key, value) in &entries {
            keyed.set_state(key, &[Value::Int4(value)]);
        }
        keyed.assert_count(2);
        for &(key, value) in &entries {
            keyed.assert_state(key, &[Value::Int4(value)]);
        }

        assert!(keyed.has_state("key1"));
        assert!(!keyed.has_state("key3"));

        let removed = keyed.remove_state("key1");
        assert_eq!(removed, Some(vec![Value::Int4(100)]));
        keyed.assert_count(1);
    }

    /// The same key keeps separate rows in separate windows.
    #[test]
    fn test_window_stateful_helper() {
        let mut windowed = WindowStatefulTestHelper::time_window_counter(60);
        let first = windowed.window_for_timestamp(30);
        let second = windowed.window_for_timestamp(90);

        windowed.set_window_state(first, "key1", &[Value::Int8(10i64)]);
        windowed.set_window_state(second, "key1", &[Value::Int8(20i64)]);
        windowed.assert_window_count(2);

        windowed.assert_window_state(first, "key1", &[Value::Int8(10i64)]);
        windowed.assert_window_state(second, "key1", &[Value::Int8(20i64)]);

        assert_eq!(windowed.window_ids(), vec![first, second]);
        assert_eq!(windowed.window_key_count(first), 1);
    }

    /// Each scenario builder produces the expected number of changes or diffs.
    #[test]
    fn test_scenarios() {
        let inserts = counter_inserts(3);
        assert_eq!(inserts.len(), 3);

        let pairs = [("a", 10), ("b", 20), ("a", 30)];
        let grouped = grouped_inserts(&pairs);
        assert_eq!(grouped.diffs.len(), 3);

        let update = state_updates(1, 10, 20);
        assert_eq!(update.diffs.len(), 1);

        // Two windows with two events each.
        let events = windowed_events(60, 2, 2);
        assert_eq!(events.len(), 4);
    }

    /// `&str`, `String`, `u32` and `u64` keys are all accepted and stay distinct.
    #[test]
    fn test_into_encoded_key_with_strings() {
        let mut keyed = KeyedStatefulTestHelper::sum();
        let owned_key = String::from("dynamic_key");

        keyed.set_state("string_key_1", &[Value::Int4(42)]);
        keyed.set_state("string_key_2", &[Value::Int4(100)]);
        keyed.set_state(owned_key.clone(), &[Value::Int4(200)]);
        keyed.set_state(123u32, &[Value::Int4(300)]);
        keyed.set_state(456u64, &[Value::Int4(400)]);

        assert_eq!(keyed.get_state("string_key_1"), Some(vec![Value::Int4(42)]));
        assert_eq!(keyed.get_state("string_key_2"), Some(vec![Value::Int4(100)]));
        assert_eq!(keyed.get_state(owned_key), Some(vec![Value::Int4(200)]));
        assert_eq!(keyed.get_state(123u32), Some(vec![Value::Int4(300)]));
        assert_eq!(keyed.get_state(456u64), Some(vec![Value::Int4(400)]));
        assert_eq!(keyed.state_count(), 5);
    }
}