Skip to main content

reifydb_sdk/state/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod cache;
5pub mod ffi;
6pub mod keyed;
7pub mod row;
8pub mod single;
9pub mod utils;
10pub mod window;
11
12use std::ops::Bound;
13
14use postcard::{from_bytes, to_allocvec};
15use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape};
16use reifydb_type::value::blob::Blob;
17use serde::{Serialize, de::DeserializeOwned};
18
19use crate::{
20	error::{FFIError, Result},
21	operator::{FFIOperator, context::OperatorContext},
22};
23
/// A decoded state value together with the timestamp anchors read from the
/// row it was stored in.
///
/// Produced by `State::get_with_anchors`. The common traits are derived so
/// entries can be logged, duplicated, and compared whenever `T` supports the
/// corresponding operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateEntry<T> {
	/// The deserialized payload.
	pub value: T,
	/// Creation timestamp of the backing row, in nanoseconds.
	pub created_at_nanos: u64,
	/// Last-update timestamp of the backing row, in nanoseconds.
	pub updated_at_nanos: u64,
}
29
/// Handle for reading and writing an operator's state through its
/// [`OperatorContext`].
///
/// Every method forwards to the `ffi` module using the borrowed context;
/// values are serialized on write and deserialized on read.
pub struct State<'a> {
	// Exclusive borrow of the operator context that all state calls go through.
	ctx: &'a mut OperatorContext,
}
33
34impl<'a> State<'a> {
35	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
36		Self {
37			ctx,
38		}
39	}
40
41	pub fn get<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<T>> {
42		match ffi::get(self.ctx, key)? {
43			Some(row) => decode_payload(&row).map(Some),
44			None => Ok(None),
45		}
46	}
47
48	pub fn set<T: Serialize>(&mut self, key: &EncodedKey, value: &T) -> Result<()> {
49		let row = encode_payload(value, self.now_nanos())?;
50		ffi::set(self.ctx, key, &row)
51	}
52
53	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
54		ffi::remove(self.ctx, key)
55	}
56
57	pub fn contains(&self, key: &EncodedKey) -> Result<bool> {
58		Ok(ffi::get(self.ctx, key)?.is_some())
59	}
60
61	pub fn clear(&mut self) -> Result<()> {
62		ffi::clear(self.ctx)
63	}
64
65	pub fn scan_prefix<T: DeserializeOwned>(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, T)>> {
66		ffi::prefix(self.ctx, prefix)?.into_iter().map(|(k, row)| Ok((k, decode_payload(&row)?))).collect()
67	}
68
69	pub fn keys_with_prefix(&self, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
70		Ok(ffi::prefix(self.ctx, prefix)?.into_iter().map(|(k, _)| k).collect())
71	}
72
73	pub fn range<T: DeserializeOwned>(
74		&self,
75		start: Bound<&EncodedKey>,
76		end: Bound<&EncodedKey>,
77	) -> Result<Vec<(EncodedKey, T)>> {
78		ffi::range(self.ctx, start, end)?.into_iter().map(|(k, row)| Ok((k, decode_payload(&row)?))).collect()
79	}
80
81	pub fn get_with_anchors<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<StateEntry<T>>> {
82		match ffi::get(self.ctx, key)? {
83			Some(row) => Ok(Some(StateEntry {
84				created_at_nanos: row.created_at_nanos(),
85				updated_at_nanos: row.updated_at_nanos(),
86				value: decode_payload(&row)?,
87			})),
88			None => Ok(None),
89		}
90	}
91
92	#[inline]
93	fn now_nanos(&self) -> u64 {
94		unsafe { (*self.ctx.ctx).clock_now_nanos }
95	}
96}
97
/// Operator-internal sequence-and-mapping state, stored under
/// `FlowNodeInternalStateKey` instead of `FlowNodeStateKey`. Use this for
/// state that must outlive operator TTL GC (e.g. `RowNumberProvider`'s
/// monotonic counter and `EncodedKey -> RowNumber` mappings).
///
/// The host wraps each user-supplied key in
/// `FlowNodeInternalStateKey(operator_id, ...)`, so callers pass only the
/// inner-tag bytes.
///
/// All methods forward to the `ffi::internal_*` functions using the borrowed
/// context.
pub struct InternalState<'a> {
	// Exclusive borrow of the operator context that all internal-state calls go through.
	ctx: &'a mut OperatorContext,
}
109
110impl<'a> InternalState<'a> {
111	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
112		Self {
113			ctx,
114		}
115	}
116
117	pub fn get<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<T>> {
118		match ffi::internal_get(self.ctx, key)? {
119			Some(row) => decode_payload(&row).map(Some),
120			None => Ok(None),
121		}
122	}
123
124	pub fn set<T: Serialize>(&mut self, key: &EncodedKey, value: &T) -> Result<()> {
125		let row = encode_payload(value, self.now_nanos())?;
126		ffi::internal_set(self.ctx, key, &row)
127	}
128
129	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
130		ffi::internal_remove(self.ctx, key)
131	}
132
133	pub fn contains(&self, key: &EncodedKey) -> Result<bool> {
134		Ok(ffi::internal_get(self.ctx, key)?.is_some())
135	}
136
137	pub fn scan_prefix<T: DeserializeOwned>(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, T)>> {
138		ffi::internal_prefix(self.ctx, prefix)?
139			.into_iter()
140			.map(|(k, row)| Ok((k, decode_payload(&row)?)))
141			.collect()
142	}
143
144	pub fn keys_with_prefix(&self, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
145		Ok(ffi::internal_prefix(self.ctx, prefix)?.into_iter().map(|(k, _)| k).collect())
146	}
147
148	#[inline]
149	fn now_nanos(&self) -> u64 {
150		unsafe { (*self.ctx.ctx).clock_now_nanos }
151	}
152}
153
154#[inline]
155fn encode_payload<T: Serialize>(value: &T, now_nanos: u64) -> Result<EncodedRow> {
156	let bytes = to_allocvec(value)
157		.map_err(|e| FFIError::Serialization(format!("operator state serialization failed: {}", e)))?;
158	let shape = RowShape::operator_state();
159	let mut row = shape.allocate();
160	shape.set_blob(&mut row, 0, &Blob::new(bytes));
161	row.set_timestamps(now_nanos, now_nanos);
162	Ok(row)
163}
164
165#[inline]
166fn decode_payload<T: DeserializeOwned>(row: &EncodedRow) -> Result<T> {
167	let shape = RowShape::operator_state();
168	let blob = shape.get_blob(row, 0);
169	from_bytes(blob.as_bytes())
170		.map_err(|e| FFIError::Serialization(format!("operator state deserialization failed: {}", e)))
171}
172
/// Convenience state accessors for operators that work with raw encoded keys.
///
/// Every method has a default implementation that forwards to the matching
/// call on `ctx.state()` or `ctx.internal_state()`, so implementors get the
/// whole surface without writing any code.
pub trait FFIRawStatefulOperator: FFIOperator {
	/// Reads and deserializes the value stored under `key`; `Ok(None)` if absent.
	fn state_get<T: DeserializeOwned>(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<Option<T>> {
		ctx.state().get(key)
	}

	/// Serializes `value` and stores it under `key`.
	fn state_set<T: Serialize>(&self, ctx: &mut OperatorContext, key: &EncodedKey, value: &T) -> Result<()> {
		ctx.state().set(key, value)
	}

	/// Removes the entry stored under `key`.
	fn state_remove(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<()> {
		ctx.state().remove(key)
	}

	/// Returns all `(key, value)` pairs produced by a prefix scan for `prefix`.
	fn state_scan_prefix<T: DeserializeOwned>(
		&self,
		ctx: &mut OperatorContext,
		prefix: &EncodedKey,
	) -> Result<Vec<(EncodedKey, T)>> {
		ctx.state().scan_prefix(prefix)
	}

	/// Returns only the keys produced by a prefix scan for `prefix`.
	fn state_keys_with_prefix(&self, ctx: &mut OperatorContext, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
		ctx.state().keys_with_prefix(prefix)
	}

	/// Reports whether an entry exists for `key`.
	fn state_contains(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<bool> {
		ctx.state().contains(key)
	}

	/// Clears the operator's state.
	fn state_clear(&self, ctx: &mut OperatorContext) -> Result<()> {
		ctx.state().clear()
	}

	/// Returns all `(key, value)` pairs produced by a range scan between
	/// the `start` and `end` bounds.
	fn state_scan_range<T: DeserializeOwned>(
		&self,
		ctx: &mut OperatorContext,
		start: Bound<&EncodedKey>,
		end: Bound<&EncodedKey>,
	) -> Result<Vec<(EncodedKey, T)>> {
		ctx.state().range(start, end)
	}

	// `internal_state_*` mirrors the regular `state_*` surface but routes
	// through `ctx.internal_state()`, which lives in
	// `FlowNodeInternalStateKey` (outside operator TTL GC). Use for
	// monotonic sequences, identity bindings, and watermarks.
	// Note: there is no internal counterpart for `state_clear` or
	// `state_scan_range` in this trait.

	/// Reads and deserializes the internal value stored under `key`;
	/// `Ok(None)` if absent.
	fn internal_state_get<T: DeserializeOwned>(
		&self,
		ctx: &mut OperatorContext,
		key: &EncodedKey,
	) -> Result<Option<T>> {
		ctx.internal_state().get(key)
	}

	/// Serializes `value` and stores it under `key` in internal state.
	fn internal_state_set<T: Serialize>(
		&self,
		ctx: &mut OperatorContext,
		key: &EncodedKey,
		value: &T,
	) -> Result<()> {
		ctx.internal_state().set(key, value)
	}

	/// Removes the internal entry stored under `key`.
	fn internal_state_remove(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<()> {
		ctx.internal_state().remove(key)
	}

	/// Returns all internal `(key, value)` pairs produced by a prefix scan
	/// for `prefix`.
	fn internal_state_scan_prefix<T: DeserializeOwned>(
		&self,
		ctx: &mut OperatorContext,
		prefix: &EncodedKey,
	) -> Result<Vec<(EncodedKey, T)>> {
		ctx.internal_state().scan_prefix(prefix)
	}

	/// Returns only the internal keys produced by a prefix scan for `prefix`.
	fn internal_state_keys_with_prefix(
		&self,
		ctx: &mut OperatorContext,
		prefix: &EncodedKey,
	) -> Result<Vec<EncodedKey>> {
		ctx.internal_state().keys_with_prefix(prefix)
	}

	/// Reports whether an internal entry exists for `key`.
	fn internal_state_contains(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<bool> {
		ctx.internal_state().contains(key)
	}
}