use reifydb_core::{
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
schema::RowSchema,
},
key::{EncodableKey, flow_node_state::FlowNodeStateKey},
};
use reifydb_type::Result;
use super::utils;
use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
/// Window-keyed state handling layered on top of [`RawStatefulOperator`].
///
/// Implementors only provide [`WindowStateful::layout`]; the remaining methods are
/// default implementations that read, write, list and remove state rows addressed
/// by an encoded window key.
pub trait WindowStateful: RawStatefulOperator {
    /// Returns the schema that this operator's state rows are allocated and loaded with.
    fn layout(&self) -> RowSchema;

    /// Allocates a new state row from [`WindowStateful::layout`].
    fn create_state(&self) -> EncodedRow {
        self.layout().allocate()
    }

    /// Returns the state row for `window_key`.
    ///
    /// Delegates to `utils::load_or_create_row` with this operator's id and layout.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the helper.
    fn load_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<EncodedRow> {
        utils::load_or_create_row(self.id(), txn, window_key, &self.layout())
    }

    /// Stores `row` as the state for `window_key` via `utils::save_row`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the helper.
    fn save_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow) -> Result<()> {
        utils::save_row(self.id(), txn, window_key, row)
    }

    /// Lists the keys of all entries found in `range`.
    ///
    /// `range` is first prefixed with this operator's encoded `FlowNodeStateKey`
    /// (built with an empty key part), then scanned through `txn.range`.
    /// The returned keys are copies of the keys yielded by the scan.
    /// NOTE(review): they presumably still include the node-state prefix — confirm
    /// before feeding them back into `load_state` / `save_state`.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by the scan.
    fn scan_keys_in_range(&self, txn: &mut FlowTransaction, range: &EncodedKeyRange) -> Result<Vec<EncodedKey>> {
        let node_prefix = FlowNodeStateKey::new(self.id(), vec![]).encode();
        let prefixed_range = range.clone().with_prefix(node_prefix);

        // Second argument is presumably the scan batch size — TODO confirm.
        let mut stream = txn.range(prefixed_range, 1024);
        let mut keys = Vec::new();
        while let Some(result) = stream.next() {
            let multi = result?;
            keys.push(EncodedKey::new(multi.key.to_vec()));
        }
        Ok(keys)
    }

    /// Removes every entry found in `range` and returns how many were removed.
    ///
    /// `range` is prefixed with this operator's encoded `FlowNodeStateKey` before
    /// scanning, so callers pass a range expressed in window-key terms and choose
    /// its bounds according to their own key ordering.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by the scan or by `txn.remove`. Entries
    /// removed before a failing `txn.remove` call are not restored by this method.
    fn expire_range(&self, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<u32> {
        let node_prefix = FlowNodeStateKey::new(self.id(), vec![]).encode();
        let prefixed_range = range.with_prefix(node_prefix);

        // Gather the keys inside their own scope so the scan stream is dropped
        // before `txn.remove` needs the transaction again.
        let keys_to_remove = {
            // Second argument is presumably the scan batch size — TODO confirm.
            let mut stream = txn.range(prefixed_range, 1024);
            let mut keys = Vec::new();
            while let Some(result) = stream.next() {
                let multi = result?;
                keys.push(multi.key);
            }
            keys
        };

        // Counted directly as `u32` to match the return type; no cast needed.
        let mut count: u32 = 0;
        for key in keys_to_remove {
            txn.remove(&key)?;
            count += 1;
        }
        Ok(count)
    }
}
#[cfg(test)]
pub mod tests {
use std::ops::Bound::{Excluded, Unbounded};
use reifydb_catalog::catalog::Catalog;
use reifydb_core::{
common::CommitVersion, interface::catalog::flow::FlowNodeId,
util::encoding::keycode::serializer::KeySerializer,
};
use reifydb_transaction::interceptor::interceptors::Interceptors;
use super::*;
use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
/// Builds the encoded key these tests use for window `window_id`:
/// the bytes `w:` followed by the id, both written through `KeySerializer`.
fn test_window_key(window_id: u64) -> EncodedKey {
    let mut key_writer = KeySerializer::with_capacity(16);
    key_writer.extend_bytes(b"w:");
    key_writer.extend_u64(window_id);
    let encoded = key_writer.finish();
    EncodedKey::new(encoded)
}
impl WindowStateful for TestOperator {
fn layout(&self) -> RowSchema {
self.layout.clone()
}
}
#[test]
fn test_window_key_encoding() {
    let (first, second, hundredth) = (test_window_key(1), test_window_key(2), test_window_key(100));

    // Different window ids must produce different keys.
    assert_ne!(first.as_ref(), second.as_ref());
    assert_ne!(first.as_ref(), hundredth.as_ref());

    // The encoding is expected to sort in reverse: a larger window id gives a smaller key.
    assert!(first > second);
    assert!(second > hundredth);
}
#[test]
fn test_create_state() {
    // A freshly allocated state row must not be zero-length.
    let fresh_state = TestOperator::simple(FlowNodeId(1)).create_state();
    assert!(fresh_state.len() > 0);
}
#[test]
fn test_load_save_window_state() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_key = test_window_key(42);

    // Nothing has been saved under this key yet, so this is the first load.
    // The row is modified and saved by value; no clones are needed because
    // neither the loaded nor the modified row is used again afterwards.
    let mut state = operator.load_state(&mut txn, &window_key).unwrap();
    layout.set_i64(&mut state, 0, 0xAB);
    operator.save_state(&mut txn, &window_key, state).unwrap();

    // Loading again must return the value that was just saved.
    let reloaded = operator.load_state(&mut txn, &window_key).unwrap();
    assert_eq!(layout.get_i64(&reloaded, 0), 0xAB);
}
#[test]
fn test_multiple_windows() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_keys: Vec<_> = (0..5).map(test_window_key).collect();

    // Store each window's position in field 0 of its own state row.
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // Every window must read back exactly the value stored for it.
    for (window_key, expected) in window_keys.iter().zip(0i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
#[test]
fn test_expire_before() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_keys: Vec<_> = (0..10).map(test_window_key).collect();

    // Windows 0..10, each holding its own index in field 0.
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // Expire every key sorting strictly above the key of window 5. Since
    // `test_window_key_encoding` expects larger ids to give smaller keys,
    // that range is expected to hold windows 0..=4.
    let range = EncodedKeyRange::new(Excluded(test_window_key(5)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 5);

    // Expired windows are expected to load back with field 0 reading 0.
    for window_key in &window_keys[..5] {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }

    // Windows 5..10 must still hold the values written above.
    for (window_key, expected) in window_keys.iter().zip(0i64..).skip(5) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
#[test]
fn test_expire_empty_range() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_keys: Vec<_> = (5..10).map(test_window_key).collect();

    // Only windows 5..10 exist; each stores its own window id in field 0.
    for (window_key, value) in window_keys.iter().zip(5i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // The range starts just past the key of window 3; none of the stored
    // windows is expected to fall inside it, so nothing is removed.
    let range = EncodedKeyRange::new(Excluded(test_window_key(3)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 0);

    // All stored windows must be untouched.
    for (window_key, expected) in window_keys.iter().zip(5i64..) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
#[test]
fn test_expire_all() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::simple(FlowNodeId(1));
    let layout = operator.layout();
    let window_keys: Vec<_> = (0..5).map(test_window_key).collect();

    // Windows 0..5, each holding its own index in field 0.
    for (window_key, value) in window_keys.iter().zip(0i64..) {
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, value);
        operator.save_state(&mut txn, window_key, state).unwrap();
    }

    // A range starting just past the key of window 100 is expected to cover
    // every stored window, so all five are removed.
    let range = EncodedKeyRange::new(Excluded(test_window_key(100)), Unbounded);
    let expired = operator.expire_range(&mut txn, range).unwrap();
    assert_eq!(expired, 5);

    // Every window is expected to load back with field 0 reading 0.
    for window_key in window_keys.iter() {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }
}
#[test]
fn test_sliding_window_simulation() {
    let mut txn = create_test_transaction();
    let mut txn =
        FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
    let operator = TestOperator::new(FlowNodeId(1));
    let layout = operator.layout();
    let window_size = 3;
    let mut all_window_keys = Vec::new();

    for current_window in 0..10 {
        // Write the state for the newest window.
        let window_key = test_window_key(current_window);
        let mut state = operator.create_state();
        layout.set_i64(&mut state, 0, current_window as i64);
        operator.save_state(&mut txn, &window_key, state).unwrap();
        all_window_keys.push(window_key);

        // Nothing to expire until more than `window_size` windows exist.
        if current_window < window_size {
            continue;
        }

        // Expire everything in the range starting just past the key of the
        // oldest window that should be kept.
        let expire_before = current_window - window_size + 1;
        let range = EncodedKeyRange::new(Excluded(test_window_key(expire_before)), Unbounded);
        operator.expire_range(&mut txn, range).unwrap();
    }

    // After the last round only windows 7, 8 and 9 are expected to remain;
    // the earlier ones are expected to load back with field 0 reading 0.
    for window_key in &all_window_keys[..7] {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), 0);
    }

    for (window_key, expected) in all_window_keys.iter().zip(0i64..).skip(7) {
        let state = operator.load_state(&mut txn, window_key).unwrap();
        assert_eq!(layout.get_i64(&state, 0), expected);
    }
}
}