1pub mod cache;
5pub mod ffi;
6pub mod keyed;
7pub mod row;
8pub mod single;
9pub mod utils;
10pub mod window;
11
12use std::ops::Bound;
13
14use postcard::{from_bytes, to_allocvec};
15use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape};
16use reifydb_type::value::blob::Blob;
17use serde::{Serialize, de::DeserializeOwned};
18
19use crate::{
20 error::{FFIError, Result},
21 operator::{FFIOperator, context::OperatorContext},
22};
23
24pub struct StateEntry<T> {
25 pub value: T,
26 pub created_at_nanos: u64,
27 pub updated_at_nanos: u64,
28}
29
30pub struct State<'a> {
31 ctx: &'a mut OperatorContext,
32}
33
34impl<'a> State<'a> {
35 pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
36 Self {
37 ctx,
38 }
39 }
40
41 pub fn get<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<T>> {
42 match ffi::get(self.ctx, key)? {
43 Some(row) => decode_payload(&row).map(Some),
44 None => Ok(None),
45 }
46 }
47
48 pub fn set<T: Serialize>(&mut self, key: &EncodedKey, value: &T) -> Result<()> {
49 let row = encode_payload(value, self.now_nanos())?;
50 ffi::set(self.ctx, key, &row)
51 }
52
53 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
54 ffi::remove(self.ctx, key)
55 }
56
57 pub fn contains(&self, key: &EncodedKey) -> Result<bool> {
58 Ok(ffi::get(self.ctx, key)?.is_some())
59 }
60
61 pub fn clear(&mut self) -> Result<()> {
62 ffi::clear(self.ctx)
63 }
64
65 pub fn scan_prefix<T: DeserializeOwned>(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, T)>> {
66 ffi::prefix(self.ctx, prefix)?.into_iter().map(|(k, row)| Ok((k, decode_payload(&row)?))).collect()
67 }
68
69 pub fn keys_with_prefix(&self, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
70 Ok(ffi::prefix(self.ctx, prefix)?.into_iter().map(|(k, _)| k).collect())
71 }
72
73 pub fn range<T: DeserializeOwned>(
74 &self,
75 start: Bound<&EncodedKey>,
76 end: Bound<&EncodedKey>,
77 ) -> Result<Vec<(EncodedKey, T)>> {
78 ffi::range(self.ctx, start, end)?.into_iter().map(|(k, row)| Ok((k, decode_payload(&row)?))).collect()
79 }
80
81 pub fn get_with_anchors<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<StateEntry<T>>> {
82 match ffi::get(self.ctx, key)? {
83 Some(row) => Ok(Some(StateEntry {
84 created_at_nanos: row.created_at_nanos(),
85 updated_at_nanos: row.updated_at_nanos(),
86 value: decode_payload(&row)?,
87 })),
88 None => Ok(None),
89 }
90 }
91
92 #[inline]
93 fn now_nanos(&self) -> u64 {
94 unsafe { (*self.ctx.ctx).clock_now_nanos }
95 }
96}
97
98pub struct InternalState<'a> {
107 ctx: &'a mut OperatorContext,
108}
109
110impl<'a> InternalState<'a> {
111 pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
112 Self {
113 ctx,
114 }
115 }
116
117 pub fn get<T: DeserializeOwned>(&self, key: &EncodedKey) -> Result<Option<T>> {
118 match ffi::internal_get(self.ctx, key)? {
119 Some(row) => decode_payload(&row).map(Some),
120 None => Ok(None),
121 }
122 }
123
124 pub fn set<T: Serialize>(&mut self, key: &EncodedKey, value: &T) -> Result<()> {
125 let row = encode_payload(value, self.now_nanos())?;
126 ffi::internal_set(self.ctx, key, &row)
127 }
128
129 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
130 ffi::internal_remove(self.ctx, key)
131 }
132
133 pub fn contains(&self, key: &EncodedKey) -> Result<bool> {
134 Ok(ffi::internal_get(self.ctx, key)?.is_some())
135 }
136
137 pub fn scan_prefix<T: DeserializeOwned>(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, T)>> {
138 ffi::internal_prefix(self.ctx, prefix)?
139 .into_iter()
140 .map(|(k, row)| Ok((k, decode_payload(&row)?)))
141 .collect()
142 }
143
144 pub fn keys_with_prefix(&self, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
145 Ok(ffi::internal_prefix(self.ctx, prefix)?.into_iter().map(|(k, _)| k).collect())
146 }
147
148 #[inline]
149 fn now_nanos(&self) -> u64 {
150 unsafe { (*self.ctx.ctx).clock_now_nanos }
151 }
152}
153
154#[inline]
155fn encode_payload<T: Serialize>(value: &T, now_nanos: u64) -> Result<EncodedRow> {
156 let bytes = to_allocvec(value)
157 .map_err(|e| FFIError::Serialization(format!("operator state serialization failed: {}", e)))?;
158 let shape = RowShape::operator_state();
159 let mut row = shape.allocate();
160 shape.set_blob(&mut row, 0, &Blob::new(bytes));
161 row.set_timestamps(now_nanos, now_nanos);
162 Ok(row)
163}
164
165#[inline]
166fn decode_payload<T: DeserializeOwned>(row: &EncodedRow) -> Result<T> {
167 let shape = RowShape::operator_state();
168 let blob = shape.get_blob(row, 0);
169 from_bytes(blob.as_bytes())
170 .map_err(|e| FFIError::Serialization(format!("operator state deserialization failed: {}", e)))
171}
172
173pub trait FFIRawStatefulOperator: FFIOperator {
174 fn state_get<T: DeserializeOwned>(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<Option<T>> {
175 ctx.state().get(key)
176 }
177
178 fn state_set<T: Serialize>(&self, ctx: &mut OperatorContext, key: &EncodedKey, value: &T) -> Result<()> {
179 ctx.state().set(key, value)
180 }
181
182 fn state_remove(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<()> {
183 ctx.state().remove(key)
184 }
185
186 fn state_scan_prefix<T: DeserializeOwned>(
187 &self,
188 ctx: &mut OperatorContext,
189 prefix: &EncodedKey,
190 ) -> Result<Vec<(EncodedKey, T)>> {
191 ctx.state().scan_prefix(prefix)
192 }
193
194 fn state_keys_with_prefix(&self, ctx: &mut OperatorContext, prefix: &EncodedKey) -> Result<Vec<EncodedKey>> {
195 ctx.state().keys_with_prefix(prefix)
196 }
197
198 fn state_contains(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<bool> {
199 ctx.state().contains(key)
200 }
201
202 fn state_clear(&self, ctx: &mut OperatorContext) -> Result<()> {
203 ctx.state().clear()
204 }
205
206 fn state_scan_range<T: DeserializeOwned>(
207 &self,
208 ctx: &mut OperatorContext,
209 start: Bound<&EncodedKey>,
210 end: Bound<&EncodedKey>,
211 ) -> Result<Vec<(EncodedKey, T)>> {
212 ctx.state().range(start, end)
213 }
214
215 fn internal_state_get<T: DeserializeOwned>(
221 &self,
222 ctx: &mut OperatorContext,
223 key: &EncodedKey,
224 ) -> Result<Option<T>> {
225 ctx.internal_state().get(key)
226 }
227
228 fn internal_state_set<T: Serialize>(
229 &self,
230 ctx: &mut OperatorContext,
231 key: &EncodedKey,
232 value: &T,
233 ) -> Result<()> {
234 ctx.internal_state().set(key, value)
235 }
236
237 fn internal_state_remove(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<()> {
238 ctx.internal_state().remove(key)
239 }
240
241 fn internal_state_scan_prefix<T: DeserializeOwned>(
242 &self,
243 ctx: &mut OperatorContext,
244 prefix: &EncodedKey,
245 ) -> Result<Vec<(EncodedKey, T)>> {
246 ctx.internal_state().scan_prefix(prefix)
247 }
248
249 fn internal_state_keys_with_prefix(
250 &self,
251 ctx: &mut OperatorContext,
252 prefix: &EncodedKey,
253 ) -> Result<Vec<EncodedKey>> {
254 ctx.internal_state().keys_with_prefix(prefix)
255 }
256
257 fn internal_state_contains(&self, ctx: &mut OperatorContext, key: &EncodedKey) -> Result<bool> {
258 ctx.internal_state().contains(key)
259 }
260}