reifydb_flow_operator_sdk/stateful/
mod.rs

1//! State management utilities for operators
2
3pub mod keyed;
4mod raw;
5pub mod row;
6pub mod single;
7pub mod utils;
8pub mod window;
9
10// Re-export traits
11pub use keyed::FFIKeyedStateful;
12use reifydb_core::value::encoded::{EncodedKey, EncodedValues};
13pub use row::RowNumberProvider;
14pub use single::FFISingleStateful;
15pub use utils::*;
16pub use window::FFIWindowStateful;
17
18use crate::{FFIOperator, context::OperatorContext, error::Result};
19
20/// State manager providing state operations with EncodedKey and EncodedValues
21pub struct State<'a> {
22	ctx: &'a mut OperatorContext,
23}
24
25impl<'a> State<'a> {
26	/// Create a new state manager
27	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
28		Self {
29			ctx,
30		}
31	}
32
33	/// Get a value from state by key
34	pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
35		raw::raw_state_get(self.ctx, key)
36	}
37
38	/// Set a value in state by key
39	pub fn set(&mut self, key: &EncodedKey, value: &EncodedValues) -> Result<()> {
40		raw::raw_state_set(self.ctx, key, value)
41	}
42
43	/// Remove a value from state by key
44	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
45		raw::raw_state_remove(self.ctx, key)
46	}
47
48	/// Check if a key exists in state
49	pub fn contains(&self, key: &EncodedKey) -> Result<bool> {
50		Ok(raw::raw_state_get(self.ctx, key)?.is_some())
51	}
52
53	/// Clear all state for this operator
54	pub fn clear(&mut self) -> Result<()> {
55		raw::raw_state_clear(self.ctx)
56	}
57
58	/// Scan state entries with a given key prefix
59	pub fn scan_prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
60		raw::raw_state_prefix(self.ctx, prefix)
61	}
62
63	/// Get all keys with a given prefix
64	pub fn keys_with_prefix(&self, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
65		let entries = self.scan_prefix(prefix)?;
66		Ok(entries.into_iter().map(|(k, _)| k).collect())
67	}
68}
69
70/// Raw Stateful operations for FFI operators
71///
72/// This trait provides low-level key-value state operations for FFI operators.
73/// It mirrors the internal `RawStatefulOperator` trait but works through the FFI boundary.
74///
75/// # Example
76///
77/// ```ignore
78/// impl FFIRawStatefulOperator for MyOperator {}
79///
80/// // In your operator implementation:
81/// fn apply(&mut self, ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
82///     let key = EncodedKey::new(b"counter".to_vec());
83///     let value = self.state_get(ctx, &key)?;
84///     // ... use state
85/// }
86/// ```
87pub trait FFIRawStatefulOperator: FFIOperator {
88	/// Get raw bytes for a key
89	fn state_get(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
90		ctx.state().get(key)
91	}
92
93	/// Set raw bytes for a key
94	fn state_set(&self, ctx: &mut OperatorContext, key: &EncodedKey, value: &EncodedValues) -> Result<()> {
95		ctx.state().set(key, value)
96	}
97
98	/// Remove a key
99	fn state_remove(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<()> {
100		ctx.state().remove(key)
101	}
102
103	/// Scan all keys with a prefix
104	fn state_scan_prefix(
105		&self,
106		ctx: &mut OperatorContext,
107		prefix: &EncodedKey,
108	) -> Result<Vec<(EncodedKey, EncodedValues)>> {
109		ctx.state().scan_prefix(prefix)
110	}
111
112	/// Get all keys with a prefix
113	fn state_keys_with_prefix(&self, ctx: &mut OperatorContext, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
114		ctx.state().keys_with_prefix(prefix)
115	}
116
117	/// Check if a key exists
118	fn state_contains(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<bool> {
119		ctx.state().contains(key)
120	}
121
122	/// Clear all state for this operator
123	fn state_clear(&self, ctx: &mut OperatorContext) -> Result<()> {
124		ctx.state().clear()
125	}
126}