reifydb_flow_operator_sdk/
store.rs1use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9 CowVec,
10 value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, FFI_END_OF_ITERATION, FFI_NOT_FOUND, FFI_OK, StoreIteratorFFI};
13use tracing::instrument;
14
15use crate::{
16 context::OperatorContext,
17 error::{FFIError, Result},
18};
19
20pub struct Store<'a> {
22 ctx: &'a mut OperatorContext,
23}
24
25impl<'a> Store<'a> {
26 pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
27 Self {
28 ctx,
29 }
30 }
31
32 #[instrument(name = "flow::operator::store::get", level = "trace", skip(self), fields(
33 key_len = key.as_bytes().len(),
34 found
35 ))]
36 pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
37 let result = raw_store_get(self.ctx, key)?;
38 tracing::Span::current().record("found", result.is_some());
39 Ok(result)
40 }
41
42 #[instrument(name = "flow::operator::store::contains_key", level = "trace", skip(self), fields(
43 key_len = key.as_bytes().len()
44 ))]
45 pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
46 raw_store_contains_key(self.ctx, key)
47 }
48
49 #[instrument(name = "flow::operator::store::prefix", level = "trace", skip(self), fields(
50 prefix_len = prefix.as_bytes().len(),
51 result_count
52 ))]
53 pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
54 let results = raw_store_prefix(self.ctx, prefix)?;
55 tracing::Span::current().record("result_count", results.len());
56 Ok(results)
57 }
58
59 #[instrument(
60 name = "flow::operator::store::range",
61 level = "trace",
62 skip(self, start, end),
63 fields(result_count)
64 )]
65 pub fn range(
66 &self,
67 start: Bound<&EncodedKey>,
68 end: Bound<&EncodedKey>,
69 ) -> Result<Vec<(EncodedKey, EncodedValues)>> {
70 let results = raw_store_range(self.ctx, start, end)?;
71 tracing::Span::current().record("result_count", results.len());
72 Ok(results)
73 }
74}
75
76#[instrument(name = "flow::operator::store::raw::get", level = "trace", skip(ctx), fields(
78 key_len = key.as_bytes().len()
79))]
80fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
81 let key_bytes = key.as_bytes();
82 let mut output = BufferFFI {
83 ptr: null_mut(),
84 len: 0,
85 cap: 0,
86 };
87
88 unsafe {
89 let result =
90 ((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
91
92 if result == FFI_OK {
93 if output.ptr.is_null() || output.len == 0 {
95 Ok(None)
96 } else {
97 let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
98 ((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
100 Ok(Some(EncodedValues(CowVec::new(value_bytes))))
101 }
102 } else if result == FFI_NOT_FOUND {
103 Ok(None)
105 } else {
106 Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
107 }
108 }
109}
110
111#[instrument(name = "flow::operator::store::raw::contains_key", level = "trace", skip(ctx), fields(
113 key_len = key.as_bytes().len()
114))]
115fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
116 let key_bytes = key.as_bytes();
117 let mut result_byte: u8 = 0;
118
119 unsafe {
120 let result = ((*ctx.ctx).callbacks.store.contains_key)(
121 ctx.ctx,
122 key_bytes.as_ptr(),
123 key_bytes.len(),
124 &mut result_byte,
125 );
126
127 if result == FFI_OK {
128 Ok(result_byte != 0)
129 } else {
130 Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
131 }
132 }
133}
134
135#[instrument(name = "flow::operator::store::raw::prefix", level = "trace", skip(ctx), fields(
137 prefix_len = prefix.as_bytes().len()
138))]
139fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
140 let prefix_bytes = prefix.as_bytes();
141 let mut iterator: *mut StoreIteratorFFI = null_mut();
142
143 unsafe {
144 let result = ((*ctx.ctx).callbacks.store.prefix)(
145 ctx.ctx,
146 prefix_bytes.as_ptr(),
147 prefix_bytes.len(),
148 &mut iterator,
149 );
150
151 if result < 0 {
152 return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
153 }
154
155 collect_iterator_results(ctx, iterator)
156 }
157}
158
159const BOUND_UNBOUNDED: u8 = 0;
161const BOUND_INCLUDED: u8 = 1;
162const BOUND_EXCLUDED: u8 = 2;
163
164#[instrument(name = "flow::operator::store::raw::range", level = "trace", skip(ctx, start, end))]
166fn raw_store_range(
167 ctx: &OperatorContext,
168 start: Bound<&EncodedKey>,
169 end: Bound<&EncodedKey>,
170) -> Result<Vec<(EncodedKey, EncodedValues)>> {
171 let mut iterator: *mut StoreIteratorFFI = null_mut();
172
173 unsafe {
174 let (start_ptr, start_len, start_bound_type) = match start {
175 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
176 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
177 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
178 };
179
180 let (end_ptr, end_len, end_bound_type) = match end {
181 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
182 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
183 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
184 };
185
186 let result = ((*ctx.ctx).callbacks.store.range)(
187 ctx.ctx,
188 start_ptr,
189 start_len,
190 start_bound_type,
191 end_ptr,
192 end_len,
193 end_bound_type,
194 &mut iterator,
195 );
196
197 if result < 0 {
198 return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
199 }
200
201 collect_iterator_results(ctx, iterator)
202 }
203}
204
205#[instrument(
211 name = "flow::operator::store::collect_iterator",
212 level = "trace",
213 skip(ctx, iterator),
214 fields(result_count)
215)]
216unsafe fn collect_iterator_results(
217 ctx: &OperatorContext,
218 iterator: *mut StoreIteratorFFI,
219) -> Result<Vec<(EncodedKey, EncodedValues)>> {
220 if iterator.is_null() {
221 tracing::Span::current().record("result_count", 0);
222 return Ok(Vec::new());
223 }
224
225 let mut results = Vec::new();
226
227 loop {
228 let mut key_buf = BufferFFI {
229 ptr: null_mut(),
230 len: 0,
231 cap: 0,
232 };
233 let mut value_buf = BufferFFI {
234 ptr: null_mut(),
235 len: 0,
236 cap: 0,
237 };
238
239 let next_result =
240 unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
241
242 if next_result == FFI_END_OF_ITERATION {
243 break;
245 } else if next_result != FFI_OK {
246 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
247 return Err(FFIError::Other(format!(
248 "host_store_iterator_next failed with code {}",
249 next_result
250 )));
251 }
252
253 if !key_buf.ptr.is_null() && key_buf.len > 0 {
255 let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
256 let key = EncodedKey(CowVec::new(key_bytes));
257
258 let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
259 let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
260 EncodedValues(CowVec::new(value_bytes))
261 } else {
262 EncodedValues(CowVec::new(Vec::new()))
263 };
264
265 unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
267 if !value_buf.ptr.is_null() && value_buf.len > 0 {
268 unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
269 }
270
271 results.push((key, value));
272 }
273 }
274
275 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
276 tracing::Span::current().record("result_count", results.len());
277 Ok(results)
278}