use reifydb_core::{
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
shape::RowShape,
},
interface::catalog::flow::FlowNodeId,
key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey, flow_node_state::FlowNodeStateKey},
};
use reifydb_type::Result;
use super::StateIterator;
use crate::transaction::FlowTransaction;
pub fn state_get(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedRow>> {
let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
match txn.get(&encoded_key)? {
Some(multi) => Ok(Some(multi)),
None => Ok(None),
}
}
pub fn state_set(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedRow) -> Result<()> {
let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
txn.set(&encoded_key, value)?;
Ok(())
}
pub fn state_remove(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
txn.remove(&encoded_key)?;
Ok(())
}
pub fn internal_state_get(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<Option<EncodedRow>> {
let state_key = FlowNodeInternalStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
match txn.get(&encoded_key)? {
Some(multi) => Ok(Some(multi)),
None => Ok(None),
}
}
pub fn internal_state_set(
id: FlowNodeId,
txn: &mut FlowTransaction,
key: &EncodedKey,
value: EncodedRow,
) -> Result<()> {
let state_key = FlowNodeInternalStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
txn.set(&encoded_key, value)?;
Ok(())
}
pub fn internal_state_remove(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey) -> Result<()> {
let state_key = FlowNodeInternalStateKey::new(id, key.as_ref().to_vec());
let encoded_key = state_key.encode();
txn.remove(&encoded_key)?;
Ok(())
}
pub fn state_scan(id: FlowNodeId, txn: &mut FlowTransaction) -> Result<StateIterator> {
let range = FlowNodeStateKey::node_range(id);
let stream = txn.range(range, 1024);
let mut items = Vec::new();
for result in stream {
let multi = result?;
if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
items.push((EncodedKey::new(state_key.key), multi.row));
} else {
items.push((multi.key, multi.row));
}
}
Ok(StateIterator::from_items(items))
}
pub fn state_range(id: FlowNodeId, txn: &mut FlowTransaction, range: EncodedKeyRange) -> Result<StateIterator> {
let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(id, vec![]));
let stream = txn.range(prefixed_range, 1024);
let mut items = Vec::new();
for result in stream {
let multi = result?;
if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
items.push((EncodedKey::new(state_key.key), multi.row));
} else {
items.push((multi.key, multi.row));
}
}
Ok(StateIterator::from_items(items))
}
pub fn state_clear(id: FlowNodeId, txn: &mut FlowTransaction) -> Result<()> {
let range = FlowNodeStateKey::node_range(id);
let keys_to_remove = {
let stream = txn.range(range, 1024);
let mut keys = Vec::new();
for result in stream {
let multi = result?;
keys.push(multi.key);
}
keys
};
for key in keys_to_remove {
txn.remove(&key)?;
}
Ok(())
}
pub fn load_or_create_row(
id: FlowNodeId,
txn: &mut FlowTransaction,
key: &EncodedKey,
shape: &RowShape,
) -> Result<EncodedRow> {
match state_get(id, txn, key)? {
Some(row) => Ok(row),
None => Ok(shape.allocate()),
}
}
pub fn save_row(id: FlowNodeId, txn: &mut FlowTransaction, key: &EncodedKey, row: EncodedRow) -> Result<()> {
state_set(id, txn, key, row)
}
pub fn empty_key() -> EncodedKey {
EncodedKey::new(Vec::new())
}
#[cfg(test)]
pub mod tests {
use std::ops::Bound::{Excluded, Included, Unbounded};
use reifydb_catalog::catalog::Catalog;
use reifydb_core::common::CommitVersion;
use reifydb_runtime::context::clock::{Clock, MockClock};
use reifydb_transaction::interceptor::interceptors::Interceptors;
use reifydb_type::{util::cowvec::CowVec, value::r#type::Type};
use super::*;
use crate::{operator::stateful::test_utils::test::*, transaction::FlowTransaction};
#[test]
fn test_state_get_existing() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("get");
let value = test_row();
state_set(node_id, &mut txn, &key, value.clone()).unwrap();
let result = state_get(node_id, &mut txn, &key).unwrap();
assert!(result.is_some());
assert_row_eq(&result.unwrap(), &value);
}
#[test]
fn test_state_get_non_existing() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("nonexistent");
let result = state_get(node_id, &mut txn, &key).unwrap();
assert!(result.is_none());
}
#[test]
fn test_state_set_and_update() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("set");
let value1 = EncodedRow(CowVec::new(vec![1, 2, 3]));
let value2 = EncodedRow(CowVec::new(vec![4, 5, 6]));
state_set(node_id, &mut txn, &key, value1.clone()).unwrap();
let result = state_get(node_id, &mut txn, &key).unwrap().unwrap();
assert_row_eq(&result, &value1);
state_set(node_id, &mut txn, &key, value2.clone()).unwrap();
let result = state_get(node_id, &mut txn, &key).unwrap().unwrap();
assert_row_eq(&result, &value2);
}
#[test]
fn test_state_remove() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("remove");
let value = test_row();
state_set(node_id, &mut txn, &key, value.clone()).unwrap();
assert!(state_get(node_id, &mut txn, &key).unwrap().is_some());
state_remove(node_id, &mut txn, &key).unwrap();
assert!(state_get(node_id, &mut txn, &key).unwrap().is_none());
}
#[test]
fn test_state_scan() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
for i in 0..5 {
let key = test_key(&format!("scan_{:02}", i)); let value = EncodedRow(CowVec::new(vec![i as u8]));
state_set(node_id, &mut txn, &key, value).unwrap();
}
let entries: Vec<_> = state_scan(node_id, &mut txn).unwrap().collect();
assert_eq!(entries.len(), 5);
for i in 0..5 {
assert_eq!(entries[i].1.as_slice()[0], i as u8);
}
}
#[test]
fn test_state_range() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let keys = vec!["a", "b", "c", "d", "e"];
for key_suffix in &keys {
let key = test_key(key_suffix);
let value = test_row();
state_set(node_id, &mut txn, &key, value).unwrap();
}
let range = EncodedKeyRange::new(Included(test_key("b")), Excluded(test_key("d")));
let entries: Vec<_> = state_range(node_id, &mut txn, range).unwrap().collect();
assert_eq!(entries.len(), 2);
}
#[test]
fn test_state_range_open_ended() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
for i in 0..5 {
let key = test_key(&format!("range_{}", i));
let value = test_row();
state_set(node_id, &mut txn, &key, value).unwrap();
}
let entries = {
let range = EncodedKeyRange::new(Unbounded, Excluded(test_key("range_3")));
let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(node_id, vec![]));
let mut stream = txn.range(prefixed_range, 1024);
let mut entries = Vec::new();
while let Some(result) = stream.next() {
entries.push(result.unwrap());
}
entries
};
assert_eq!(entries.len(), 3);
let entries = {
let range = EncodedKeyRange::new(Included(test_key("range_3")), Unbounded);
let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(node_id, vec![]));
let mut stream = txn.range(prefixed_range, 1024);
let mut entries = Vec::new();
while let Some(result) = stream.next() {
entries.push(result.unwrap());
}
entries
};
assert_eq!(entries.len(), 2); }
#[test]
fn test_state_clear() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
for i in 0..3 {
let key = test_key(&format!("clear_{}", i));
let value = test_row();
state_set(node_id, &mut txn, &key, value).unwrap();
}
let count = {
let range = FlowNodeStateKey::node_range(node_id);
let mut stream = txn.range(range, 1024);
let mut count = 0;
while let Some(result) = stream.next() {
let _ = result.unwrap();
count += 1;
}
count
};
assert_eq!(count, 3);
state_clear(node_id, &mut txn).unwrap();
let count = {
let range = FlowNodeStateKey::node_range(node_id);
let mut stream = txn.range(range, 1024);
let mut count = 0;
while let Some(result) = stream.next() {
let _ = result.unwrap();
count += 1;
}
count
};
assert_eq!(count, 0);
}
#[test]
fn test_load_or_create_row_existing() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("load_existing");
let value = test_row();
let layout = TestOperator::simple(node_id).layout;
state_set(node_id, &mut txn, &key, value.clone()).unwrap();
let result = load_or_create_row(node_id, &mut txn, &key, &layout).unwrap();
assert_row_eq(&result, &value);
}
#[test]
fn test_load_or_create_row_new() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("load_new");
let shape = RowShape::testing(&[Type::Int4]);
let result = load_or_create_row(node_id, &mut txn, &key, &shape).unwrap();
assert!(result.len() > 0);
}
#[test]
fn test_save_row() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("save");
let value = test_row();
save_row(node_id, &mut txn, &key, value.clone()).unwrap();
let result = state_get(node_id, &mut txn, &key).unwrap();
assert!(result.is_some());
assert_row_eq(&result.unwrap(), &value);
}
#[test]
fn test_empty_key() {
let key = empty_key();
assert_eq!(key.len(), 0);
assert!(key.as_ref().is_empty());
}
#[test]
fn test_multiple_nodes_isolation() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node1 = FlowNodeId(1);
let node2 = FlowNodeId(2);
let key = test_key("shared");
let value1 = EncodedRow(CowVec::new(vec![1]));
let value2 = EncodedRow(CowVec::new(vec![2]));
state_set(node1, &mut txn, &key, value1.clone()).unwrap();
state_set(node2, &mut txn, &key, value2.clone()).unwrap();
let result1 = state_get(node1, &mut txn, &key).unwrap().unwrap();
let result2 = state_get(node2, &mut txn, &key).unwrap().unwrap();
assert_row_eq(&result1, &value1);
assert_row_eq(&result2, &value2);
state_clear(node1, &mut txn).unwrap();
assert!(state_get(node1, &mut txn, &key).unwrap().is_none());
assert!(state_get(node2, &mut txn, &key).unwrap().is_some());
}
#[test]
fn test_large_values() {
let mut txn = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&mut txn,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let node_id = FlowNodeId(1);
let key = test_key("large");
let large_value = EncodedRow(CowVec::new(vec![0xAB; 10240]));
state_set(node_id, &mut txn, &key, large_value.clone()).unwrap();
let result = state_get(node_id, &mut txn, &key).unwrap().unwrap();
assert_row_eq(&result, &large_value);
}
}