reifydb_flow_operator_sdk/
store.rs1use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9 CowVec,
10 value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, FFI_END_OF_ITERATION, FFI_NOT_FOUND, FFI_OK, StoreIteratorFFI};
13use tracing::instrument;
14
15use crate::{
16 context::OperatorContext,
17 error::{FFIError, Result},
18};
19
20pub struct Store<'a> {
22 ctx: &'a mut OperatorContext,
23}
24
25impl<'a> Store<'a> {
26 pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
27 Self {
28 ctx,
29 }
30 }
31
32 #[instrument(level = "trace", skip(self), fields(
33 key_len = key.as_bytes().len(),
34 found
35 ))]
36 pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
37 let result = raw_store_get(self.ctx, key)?;
38 tracing::Span::current().record("found", result.is_some());
39 Ok(result)
40 }
41
42 #[instrument(level = "trace", skip(self), fields(
43 key_len = key.as_bytes().len()
44 ))]
45 pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
46 raw_store_contains_key(self.ctx, key)
47 }
48
49 #[instrument(level = "trace", skip(self), fields(
50 prefix_len = prefix.as_bytes().len(),
51 result_count
52 ))]
53 pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
54 let results = raw_store_prefix(self.ctx, prefix)?;
55 tracing::Span::current().record("result_count", results.len());
56 Ok(results)
57 }
58
59 #[instrument(level = "trace", skip(self, start, end), fields(result_count))]
60 pub fn range(
61 &self,
62 start: Bound<&EncodedKey>,
63 end: Bound<&EncodedKey>,
64 ) -> Result<Vec<(EncodedKey, EncodedValues)>> {
65 let results = raw_store_range(self.ctx, start, end)?;
66 tracing::Span::current().record("result_count", results.len());
67 Ok(results)
68 }
69}
70
71#[instrument(level = "trace", skip(ctx), fields(
73 key_len = key.as_bytes().len()
74))]
75fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
76 let key_bytes = key.as_bytes();
77 let mut output = BufferFFI {
78 ptr: null_mut(),
79 len: 0,
80 cap: 0,
81 };
82
83 unsafe {
84 let result =
85 ((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
86
87 if result == FFI_OK {
88 if output.ptr.is_null() || output.len == 0 {
90 Ok(None)
91 } else {
92 let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
93 ((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
95 Ok(Some(EncodedValues(CowVec::new(value_bytes))))
96 }
97 } else if result == FFI_NOT_FOUND {
98 Ok(None)
100 } else {
101 Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
102 }
103 }
104}
105
106#[instrument(level = "trace", skip(ctx), fields(
108 key_len = key.as_bytes().len()
109))]
110fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
111 let key_bytes = key.as_bytes();
112 let mut result_byte: u8 = 0;
113
114 unsafe {
115 let result = ((*ctx.ctx).callbacks.store.contains_key)(
116 ctx.ctx,
117 key_bytes.as_ptr(),
118 key_bytes.len(),
119 &mut result_byte,
120 );
121
122 if result == FFI_OK {
123 Ok(result_byte != 0)
124 } else {
125 Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
126 }
127 }
128}
129
130#[instrument(level = "trace", skip(ctx), fields(
132 prefix_len = prefix.as_bytes().len()
133))]
134fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
135 let prefix_bytes = prefix.as_bytes();
136 let mut iterator: *mut StoreIteratorFFI = null_mut();
137
138 unsafe {
139 let result = ((*ctx.ctx).callbacks.store.prefix)(
140 ctx.ctx,
141 prefix_bytes.as_ptr(),
142 prefix_bytes.len(),
143 &mut iterator,
144 );
145
146 if result < 0 {
147 return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
148 }
149
150 collect_iterator_results(ctx, iterator)
151 }
152}
153
154const BOUND_UNBOUNDED: u8 = 0;
156const BOUND_INCLUDED: u8 = 1;
157const BOUND_EXCLUDED: u8 = 2;
158
159#[instrument(level = "trace", skip(ctx, start, end))]
161fn raw_store_range(
162 ctx: &OperatorContext,
163 start: Bound<&EncodedKey>,
164 end: Bound<&EncodedKey>,
165) -> Result<Vec<(EncodedKey, EncodedValues)>> {
166 let mut iterator: *mut StoreIteratorFFI = null_mut();
167
168 unsafe {
169 let (start_ptr, start_len, start_bound_type) = match start {
170 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
171 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
172 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
173 };
174
175 let (end_ptr, end_len, end_bound_type) = match end {
176 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
177 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
178 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
179 };
180
181 let result = ((*ctx.ctx).callbacks.store.range)(
182 ctx.ctx,
183 start_ptr,
184 start_len,
185 start_bound_type,
186 end_ptr,
187 end_len,
188 end_bound_type,
189 &mut iterator,
190 );
191
192 if result < 0 {
193 return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
194 }
195
196 collect_iterator_results(ctx, iterator)
197 }
198}
199
200#[instrument(level = "trace", skip(ctx, iterator), fields(result_count))]
206unsafe fn collect_iterator_results(
207 ctx: &OperatorContext,
208 iterator: *mut StoreIteratorFFI,
209) -> Result<Vec<(EncodedKey, EncodedValues)>> {
210 if iterator.is_null() {
211 tracing::Span::current().record("result_count", 0);
212 return Ok(Vec::new());
213 }
214
215 let mut results = Vec::new();
216
217 loop {
218 let mut key_buf = BufferFFI {
219 ptr: null_mut(),
220 len: 0,
221 cap: 0,
222 };
223 let mut value_buf = BufferFFI {
224 ptr: null_mut(),
225 len: 0,
226 cap: 0,
227 };
228
229 let next_result =
230 unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
231
232 if next_result == FFI_END_OF_ITERATION {
233 break;
235 } else if next_result != FFI_OK {
236 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
237 return Err(FFIError::Other(format!(
238 "host_store_iterator_next failed with code {}",
239 next_result
240 )));
241 }
242
243 if !key_buf.ptr.is_null() && key_buf.len > 0 {
245 let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
246 let key = EncodedKey(CowVec::new(key_bytes));
247
248 let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
249 let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
250 EncodedValues(CowVec::new(value_bytes))
251 } else {
252 EncodedValues(CowVec::new(Vec::new()))
253 };
254
255 unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
257 if !value_buf.ptr.is_null() && value_buf.len > 0 {
258 unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
259 }
260
261 results.push((key, value));
262 }
263 }
264
265 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
266 tracing::Span::current().record("result_count", results.len());
267 Ok(results)
268}