use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, schema::RowSchema};
use reifydb_type::Result;
use super::utils;
use crate::{operator::stateful::raw::RawStatefulOperator, transaction::FlowTransaction};
/// A stateful operator that keeps a single encoded row of state.
///
/// Implementors only have to supply the row `layout`; every other method has a
/// default implementation that forwards to the helpers in `utils`, addressing
/// the row by the operator's `id()` together with `key()`.
pub trait SingleStateful: RawStatefulOperator {
    /// Returns the schema that describes the fields of the state row.
    fn layout(&self) -> RowSchema;

    /// Returns the key under which the state row is stored.
    ///
    /// The default is whatever `utils::empty_key()` produces.
    fn key(&self) -> EncodedKey {
        utils::empty_key()
    }

    /// Allocates a new row from the operator's layout.
    ///
    /// This does not touch any transaction; nothing is persisted.
    fn create_state(&self) -> EncodedRow {
        self.layout().allocate()
    }

    /// Loads the state row for this operator through `utils::load_or_create_row`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying helper.
    fn load_state(&self, txn: &mut FlowTransaction) -> Result<EncodedRow> {
        utils::load_or_create_row(self.id(), txn, &self.key(), &self.layout())
    }

    /// Stores `row` as this operator's state through `utils::save_row`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying helper.
    fn save_state(&self, txn: &mut FlowTransaction, row: EncodedRow) -> Result<()> {
        utils::save_row(self.id(), txn, &self.key(), row)
    }

    /// Loads the state, lets `f` modify it in place, saves it and returns the
    /// modified row.
    ///
    /// `f` receives the operator's layout alongside the mutable row so it can
    /// read and write individual fields.
    ///
    /// # Errors
    ///
    /// Returns early with the error from loading, from `f`, or from saving.
    /// When `f` fails, `save_state` is never called.
    fn update_state<F>(&self, txn: &mut FlowTransaction, f: F) -> Result<EncodedRow>
    where
        F: FnOnce(&RowSchema, &mut EncodedRow) -> Result<()>,
    {
        let layout = self.layout();
        let mut state = self.load_state(txn)?;
        f(&layout, &mut state)?;

        // `save_state` takes the row by value, so hand it a copy and give the
        // caller the row that was just modified.
        let persisted = state.clone();
        self.save_state(txn, persisted)?;
        Ok(state)
    }

    /// Removes this operator's state through `utils::state_remove`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying helper.
    fn clear_state(&self, txn: &mut FlowTransaction) -> Result<()> {
        utils::state_remove(self.id(), txn, &self.key())
    }
}
/// Tests for the default method implementations of `SingleStateful`,
/// exercised through the shared `TestOperator` fixture.
#[cfg(test)]
pub mod tests {
    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId};
    use reifydb_transaction::interceptor::interceptors::Interceptors;
    use super::*;
    use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};

    // Only `layout` is provided so that every other method under test is the
    // trait's default implementation.
    impl SingleStateful for TestOperator {
        fn layout(&self) -> RowSchema {
            self.layout.clone()
        }
    }

    /// The default `key()` is empty.
    #[test]
    fn test_default_key() {
        let operator = TestOperator::simple(FlowNodeId(1));
        let key = operator.key();
        assert_eq!(key.len(), 0);
    }

    /// `create_state` hands back a non-empty row.
    #[test]
    fn test_create_state() {
        let operator = TestOperator::simple(FlowNodeId(1));
        let state = operator.create_state();
        assert!(state.len() > 0);
    }

    /// A value written with `save_state` is visible to a later `load_state`.
    #[test]
    fn test_load_save_state() {
        let mut txn = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let operator = TestOperator::simple(FlowNodeId(1));
        let layout = operator.layout();

        // Mutate the loaded row directly and move it into `save_state`; no
        // copies are needed because the row is not used again afterwards.
        let mut state = operator.load_state(&mut txn).unwrap();
        layout.set_i64(&mut state, 0, 0x33);
        operator.save_state(&mut txn, state).unwrap();

        let reloaded = operator.load_state(&mut txn).unwrap();
        assert_eq!(layout.get_i64(&reloaded, 0), 0x33);
    }

    /// `update_state` returns the modified row and persists it.
    #[test]
    fn test_update_state() {
        let mut txn = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let operator = TestOperator::simple(FlowNodeId(1));

        let result = operator
            .update_state(&mut txn, |schema, row| {
                schema.set_i64(row, 0, 0x77);
                Ok(())
            })
            .unwrap();

        let layout = operator.layout();
        // The returned row carries the update ...
        assert_eq!(layout.get_i64(&result, 0), 0x77);
        // ... and so does the row read back through the transaction.
        let loaded = operator.load_state(&mut txn).unwrap();
        assert_eq!(layout.get_i64(&loaded, 0), 0x77);
    }

    /// After `clear_state`, loading yields a row whose first field reads 0.
    #[test]
    fn test_clear_state() {
        let mut txn = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let operator = TestOperator::simple(FlowNodeId(1));

        operator
            .update_state(&mut txn, |schema, row| {
                schema.set_i64(row, 0, 0x99);
                Ok(())
            })
            .unwrap();
        operator.clear_state(&mut txn).unwrap();

        let new_state = operator.load_state(&mut txn).unwrap();
        let layout = operator.layout();
        assert_eq!(layout.get_i64(&new_state, 0), 0);
    }

    /// Operators with different node ids do not see each other's state, even
    /// though both use the same (default) key.
    #[test]
    fn test_multiple_operators_isolated() {
        let mut txn = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
        let operator1 = TestOperator::simple(FlowNodeId(1));
        let operator2 = TestOperator::simple(FlowNodeId(2));

        operator1
            .update_state(&mut txn, |schema, row| {
                schema.set_i64(row, 0, 0x11);
                Ok(())
            })
            .unwrap();
        operator2
            .update_state(&mut txn, |schema, row| {
                schema.set_i64(row, 0, 0x22);
                Ok(())
            })
            .unwrap();

        let state1 = operator1.load_state(&mut txn).unwrap();
        let state2 = operator2.load_state(&mut txn).unwrap();
        let layout1 = operator1.layout();
        let layout2 = operator2.layout();
        assert_eq!(layout1.get_i64(&state1, 0), 0x11);
        assert_eq!(layout2.get_i64(&state2, 0), 0x22);
    }

    /// Repeated read-modify-write cycles accumulate: the counter in field 0
    /// equals the number of updates applied so far.
    #[test]
    fn test_counter_simulation() {
        let mut txn = create_test_transaction();
        let mut txn =
            FlowTransaction::deferred(&mut txn, CommitVersion(1), Catalog::testing(), Interceptors::new());
        // NOTE(review): this test builds the fixture with `TestOperator::new`
        // while the others use `TestOperator::simple` - confirm that is intended.
        let operator = TestOperator::new(FlowNodeId(1));
        // The layout does not change between iterations, so fetch it once.
        let layout = operator.layout();

        for i in 1..=5 {
            operator
                .update_state(&mut txn, |schema, row| {
                    let current = schema.get_i64(row, 0);
                    schema.set_i64(row, 0, current + 1);
                    Ok(())
                })
                .unwrap();

            let state = operator.load_state(&mut txn).unwrap();
            assert_eq!(layout.get_i64(&state, 0), i);
        }
    }
}