use std::collections::HashMap;
use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
use reifydb_core::interface::catalog::flow::FlowNodeId;
use reifydb_sdk::{
error::Result,
operator::{
FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::operator::OperatorColumn,
context::OperatorContext,
},
state::cache::StateCache,
testing::{builders::TestChangeBuilder, harness::TestHarnessBuilder},
};
use reifydb_type::value::{Value, row_number::RowNumber};
use serde::{Deserialize, Serialize};
/// Counter payload stored as a cache value by the tests in this file.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct CounterState {
    /// Current counter value; `Default` yields zero.
    count: i64,
}
/// Running-total payload stored as a cache value by the tests in this file.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct SumState {
    /// Accumulated total; `Default` yields zero.
    total: i64,
}
/// Field-less operator type; the tests pass it to `TestHarnessBuilder` as the
/// operator the harness is built around.
struct PassthroughOperator;
/// Static metadata for the test-only operator.
impl FFIOperatorMetadata for PassthroughOperator {
    // Identity and versioning.
    const NAME: &'static str = "passthrough";
    const DESCRIPTION: &'static str = "Pass-through operator for testing";
    const VERSION: &'static str = "1.0.0";
    const API: u32 = 1;

    // No columns are declared for input or output.
    const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
    const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];

    const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
}
impl FFIOperator for PassthroughOperator {
fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
Ok(())
}
fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
Ok(())
}
}
/// A value written with `set` must be reported as cached and be returned by `get`.
#[test]
fn test_cache_set_and_get() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let cache_key = String::from("test_key");
    let stored = CounterState { count: 42 };

    // Write through a dedicated context.
    {
        let mut write_ctx = harness.create_operator_context();
        cache
            .set(&mut write_ctx, &cache_key, &stored)
            .expect("Set failed");
    }
    assert!(cache.is_cached(&cache_key));

    // Read back through a fresh context.
    let fetched = {
        let mut read_ctx = harness.create_operator_context();
        cache.get(&mut read_ctx, &cache_key).expect("Get failed")
    };
    assert_eq!(fetched, Some(stored));
}
/// `set` alone must leave the harness state empty; after `flush` the harness
/// state must contain at least one entry.
#[test]
fn test_cache_flush_persists_to_ffi() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let persist_key = String::from("persist_key");
    let pending = CounterState { count: 100 };

    {
        let mut ctx = harness.create_operator_context();
        cache
            .set(&mut ctx, &persist_key, &pending)
            .expect("Set failed");
    }
    let entries_before_flush = harness.state().len();
    assert_eq!(entries_before_flush, 0, "Set must not write through pre-flush");

    {
        let mut ctx = harness.create_operator_context();
        cache.flush(&mut ctx).expect("Flush failed");
    }
    let entries_after_flush = harness.state().len();
    assert!(entries_after_flush > 0, "State should be persisted after flush");
}
/// `get_or_default` on a key that was never written must yield the default
/// counter value (zero).
#[test]
fn test_cache_get_or_default_creates_default() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);
    let missing_key = String::from("new_key");

    let mut ctx = harness.create_operator_context();
    let state = cache
        .get_or_default(&mut ctx, &missing_key)
        .expect("get_or_default failed");

    // The derived `Default` for `CounterState` has `count == 0`.
    assert_eq!(state.count, CounterState::default().count);
}
/// `get_or_default` must return a previously `set` value rather than the default.
#[test]
fn test_cache_get_or_default_returns_existing() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let existing_key = String::from("existing_key");
    let seeded = CounterState { count: 50 };

    // Seed the key.
    {
        let mut seed_ctx = harness.create_operator_context();
        cache
            .set(&mut seed_ctx, &existing_key, &seeded)
            .expect("Set failed");
    }

    // Look it up via the defaulting accessor.
    {
        let mut lookup_ctx = harness.create_operator_context();
        let found = cache
            .get_or_default(&mut lookup_ctx, &existing_key)
            .expect("get_or_default failed");
        assert_eq!(found.count, 50, "Should return existing value, not default");
    }
}
/// Two successive `update` calls on the same key must accumulate (0 -> 10 -> 15),
/// and the key must be cached afterwards.
#[test]
fn test_cache_update() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);
    let counter_key = String::from("counter");

    // (increment applied, count expected after the update)
    let steps: [(i64, i64); 2] = [(10, 10), (5, 15)];
    for (delta, expected_count) in steps {
        // Each update runs against its own freshly created context.
        let mut ctx = harness.create_operator_context();
        let updated = cache
            .update(&mut ctx, &counter_key, |state| {
                state.count += delta;
                Ok(())
            })
            .expect("Update failed");
        assert_eq!(updated.count, expected_count);
    }

    assert!(cache.is_cached(&counter_key));
}
/// After a flushed `set`, a flushed `remove` must drop the key from the cache
/// and make a later `get` return `None`.
#[test]
fn test_cache_remove() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let doomed_key = String::from("remove_key");
    let payload = CounterState { count: 42 };

    // Write and flush so the entry exists both in the cache and in harness state.
    {
        let mut ctx = harness.create_operator_context();
        cache
            .set(&mut ctx, &doomed_key, &payload)
            .expect("Set failed");
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert!(cache.is_cached(&doomed_key));
    assert!(harness.state().len() > 0);

    // Remove and flush.
    {
        let mut ctx = harness.create_operator_context();
        cache.remove(&mut ctx, &doomed_key).expect("Remove failed");
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert!(!cache.is_cached(&doomed_key));

    // A lookup after removal must come back empty.
    {
        let mut ctx = harness.create_operator_context();
        let after_removal = cache.get(&mut ctx, &doomed_key).expect("Get failed");
        assert_eq!(after_removal, None);
    }
}
/// `invalidate` must drop the key from the cache only: after a flushed `set`,
/// a `get` following `invalidate` still returns the value and re-caches the key.
#[test]
fn test_cache_invalidate_only_clears_cache() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let target_key = String::from("invalidate_key");
    let flushed = CounterState { count: 77 };

    {
        let mut ctx = harness.create_operator_context();
        cache
            .set(&mut ctx, &target_key, &flushed)
            .expect("Set failed");
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert!(cache.is_cached(&target_key));

    // Invalidation needs no operator context.
    cache.invalidate(&target_key);
    assert!(!cache.is_cached(&target_key));

    {
        let mut ctx = harness.create_operator_context();
        let reloaded = cache.get(&mut ctx, &target_key).expect("Get failed");
        assert_eq!(reloaded, Some(flushed));
    }
    assert!(cache.is_cached(&target_key));
}
/// `clear_cache` must empty the cache while flushed entries remain retrievable
/// through `get`.
#[test]
fn test_cache_clear_cache() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    // Populate three keys and flush them.
    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..3_i64 {
            let name = format!("key_{}", idx);
            let entry = CounterState { count: idx };
            cache.set(&mut ctx, &name, &entry).expect("Set failed");
        }
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert_eq!(cache.len(), 3);

    cache.clear_cache();
    assert!(cache.is_empty());

    // The first key must still be readable after the cache was cleared.
    {
        let mut ctx = harness.create_operator_context();
        let first_key = String::from("key_0");
        let survivor = cache.get(&mut ctx, &first_key).expect("Get failed");
        assert!(survivor.is_some(), "FFI state should still exist");
    }
}
/// Five distinct keys written with `set` must all be counted by `len` and each
/// must read back its own value.
#[test]
fn test_cache_multiple_keys() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, SumState>::new(10);

    // Write sum_0..sum_4 with totals 0, 10, 20, 30, 40.
    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..5_i64 {
            let name = format!("sum_{}", idx);
            let entry = SumState { total: idx * 10 };
            cache.set(&mut ctx, &name, &entry).expect("Set failed");
        }
    }
    assert_eq!(cache.len(), 5);

    // Read every key back and compare with the value written for it.
    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..5_i64 {
            let name = format!("sum_{}", idx);
            let expected = Some(SumState { total: idx * 10 });
            let found = cache.get(&mut ctx, &name).expect("Get failed");
            assert_eq!(found, expected);
        }
    }
}
/// With capacity 3, inserting a fourth key must evict `key_0` from the cache
/// while a later `get` for `key_0` still returns its flushed value.
#[test]
fn test_cache_lru_eviction() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(3);

    let oldest_key = String::from("key_0");
    let newest_key = String::from("key_3");

    // Fill the cache exactly to capacity and flush.
    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..3_i64 {
            let name = format!("key_{}", idx);
            let entry = CounterState { count: idx };
            cache.set(&mut ctx, &name, &entry).expect("Set failed");
        }
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert_eq!(cache.len(), 3);
    assert!(cache.is_cached(&oldest_key));
    assert!(cache.is_cached(&String::from("key_1")));
    assert!(cache.is_cached(&String::from("key_2")));

    // One more key pushes the cache past its capacity.
    {
        let mut ctx = harness.create_operator_context();
        let overflow = CounterState { count: 3 };
        cache
            .set(&mut ctx, &newest_key, &overflow)
            .expect("Set failed");
        cache.flush(&mut ctx).expect("Flush failed");
    }
    assert!(!cache.is_cached(&oldest_key), "key_0 should be evicted");
    assert!(cache.is_cached(&newest_key), "key_3 should be cached");
    assert_eq!(cache.len(), 3);

    // The evicted key must still be readable.
    {
        let mut ctx = harness.create_operator_context();
        let recovered = cache.get(&mut ctx, &oldest_key).expect("Get failed");
        let expected = Some(CounterState { count: 0 });
        assert_eq!(recovered, expected, "key_0 should still exist in FFI");
    }
}
/// Reading `key_0` before inserting a fourth key must keep `key_0` cached and
/// cause `key_1` to be the evicted entry instead.
#[test]
fn test_cache_lru_access_updates_order() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(3);

    let touched_key = String::from("key_0");
    let newest_key = String::from("key_3");

    // Fill the cache to capacity (no flush in this test).
    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..3_i64 {
            let name = format!("key_{}", idx);
            let entry = CounterState { count: idx };
            cache.set(&mut ctx, &name, &entry).expect("Set failed");
        }
    }

    // Touch key_0; only the access matters, the value is discarded.
    {
        let mut ctx = harness.create_operator_context();
        let _ = cache.get(&mut ctx, &touched_key).expect("Get failed");
    }

    // Insert a fourth key to force an eviction.
    {
        let mut ctx = harness.create_operator_context();
        let overflow = CounterState { count: 3 };
        cache
            .set(&mut ctx, &newest_key, &overflow)
            .expect("Set failed");
    }

    assert!(
        cache.is_cached(&touched_key),
        "key_0 should be cached (recently accessed)"
    );
    assert!(
        !cache.is_cached(&String::from("key_1")),
        "key_1 should be evicted (LRU)"
    );
    assert!(cache.is_cached(&String::from("key_2")));
    assert!(cache.is_cached(&newest_key));
}
/// A cache keyed by `(String, String)` must keep two distinct tuple keys apart
/// and return the right value for each.
#[test]
fn test_cache_tuple_keys() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<(String, String), SumState>::new(10);

    let pair_a = (String::from("base"), String::from("quote"));
    let pair_b = (String::from("foo"), String::from("bar"));
    let sum_a = SumState { total: 100 };
    let sum_b = SumState { total: 200 };

    // Both writes share one context.
    {
        let mut ctx = harness.create_operator_context();
        cache.set(&mut ctx, &pair_a, &sum_a).expect("Set failed");
        cache.set(&mut ctx, &pair_b, &sum_b).expect("Set failed");
    }
    assert!(cache.is_cached(&pair_a));
    assert!(cache.is_cached(&pair_b));

    // Both reads share another context.
    {
        let mut ctx = harness.create_operator_context();
        let found_a = cache.get(&mut ctx, &pair_a).expect("Get failed");
        let found_b = cache.get(&mut ctx, &pair_b).expect("Get failed");
        assert_eq!(found_a, Some(sum_a));
        assert_eq!(found_b, Some(sum_b));
    }
}
/// Successive `update` calls on a tuple key must accumulate (0 -> 500 -> 750).
#[test]
fn test_cache_tuple_key_update() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<(String, String), SumState>::new(10);
    let account_key = (String::from("account"), String::from("balance"));

    // (amount added, total expected after the update)
    let steps: [(i64, i64); 2] = [(500, 500), (250, 750)];
    for (amount, expected_total) in steps {
        // Each update runs against its own freshly created context.
        let mut ctx = harness.create_operator_context();
        let updated = cache
            .update(&mut ctx, &account_key, |state| {
                state.total += amount;
                Ok(())
            })
            .expect("Update failed");
        assert_eq!(updated.total, expected_total);
    }
}
/// `capacity` must report the size the cache was constructed with.
#[test]
fn test_cache_capacity() {
    let sized_cache = StateCache::<String, CounterState>::new(100);
    let reported = sized_cache.capacity();
    assert_eq!(reported, 100);
}
/// A new cache must be empty with length 0; after three `set` calls it must be
/// non-empty with length 3.
#[test]
fn test_cache_len_and_is_empty() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    // Freshly constructed: nothing cached.
    assert!(cache.is_empty());
    assert_eq!(cache.len(), 0);

    {
        let mut ctx = harness.create_operator_context();
        for idx in 0..3_i64 {
            let name = format!("key_{}", idx);
            let entry = CounterState { count: idx };
            cache.set(&mut ctx, &name, &entry).expect("Set failed");
        }
    }

    // Three distinct keys are now held.
    assert!(!cache.is_empty());
    assert_eq!(cache.len(), 3);
}
/// After a flushed `set` followed by `invalidate`, the first `get` must return
/// the value and re-cache the key; a second `get` must return the same value.
#[test]
fn test_cache_miss_then_hit() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);

    let probe_key = String::from("miss_hit_key");
    let expected = CounterState { count: 123 };

    // Write, flush, then drop the entry from the cache.
    {
        let mut ctx = harness.create_operator_context();
        cache
            .set(&mut ctx, &probe_key, &expected)
            .expect("Set failed");
        cache.flush(&mut ctx).expect("Flush failed");
    }
    cache.invalidate(&probe_key);
    assert!(!cache.is_cached(&probe_key));

    // First read: the key is not cached at this point.
    {
        let mut ctx = harness.create_operator_context();
        let first_read = cache.get(&mut ctx, &probe_key).expect("Get failed");
        assert_eq!(first_read.as_ref(), Some(&expected));
    }
    assert!(cache.is_cached(&probe_key));

    // Second read: the key is cached now.
    {
        let mut ctx = harness.create_operator_context();
        let second_read = cache.get(&mut ctx, &probe_key).expect("Get failed");
        assert_eq!(second_read, Some(expected));
    }
}
/// Adds the diff count of two built changes to a cached counter via `update`
/// and checks the counter ends at 3.
#[test]
fn test_cache_with_operator_apply() {
    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .build()
        .expect("Failed to build harness");
    let mut cache = StateCache::<String, CounterState>::new(10);
    let counter_key = String::from("event_counter");

    // First change inserts two rows, the second inserts one.
    let first_batch = TestChangeBuilder::new()
        .insert_row(1, vec![Value::Int8(10i64)])
        .insert_row(2, vec![Value::Int8(20i64)])
        .build();
    let second_batch = TestChangeBuilder::new()
        .insert_row(3, vec![Value::Int8(30i64)])
        .build();

    for batch in [&first_batch, &second_batch] {
        // Each batch is accounted for under its own context.
        let mut ctx = harness.create_operator_context();
        let diff_total = batch.diffs.len() as i64;
        cache
            .update(&mut ctx, &counter_key, |state| {
                state.count += diff_total;
                Ok(())
            })
            .expect("Update failed");
    }

    {
        let mut ctx = harness.create_operator_context();
        let tally = cache.get(&mut ctx, &counter_key).expect("Get failed");
        let expected = Some(CounterState { count: 3 });
        assert_eq!(tally, expected);
    }
}