reifydb_flow_operator_sdk/
store.rs1use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9 CowVec,
10 value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, StoreIteratorFFI};
13
14use crate::{
15 context::OperatorContext,
16 error::{FFIError, Result},
17};
18
19pub struct Store<'a> {
21 ctx: &'a mut OperatorContext,
22}
23
24impl<'a> Store<'a> {
25 pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
26 Self {
27 ctx,
28 }
29 }
30
31 pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
32 raw_store_get(self.ctx, key)
33 }
34
35 pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
36 raw_store_contains_key(self.ctx, key)
37 }
38
39 pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
40 raw_store_prefix(self.ctx, prefix)
41 }
42
43 pub fn range(
44 &self,
45 start: Bound<&EncodedKey>,
46 end: Bound<&EncodedKey>,
47 ) -> Result<Vec<(EncodedKey, EncodedValues)>> {
48 raw_store_range(self.ctx, start, end)
49 }
50}
51
52fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
54 let key_bytes = key.as_bytes();
55 let mut output = BufferFFI {
56 ptr: null_mut(),
57 len: 0,
58 cap: 0,
59 };
60
61 unsafe {
62 let result =
63 ((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
64
65 if result == 0 {
66 if output.ptr.is_null() || output.len == 0 {
68 Ok(None)
69 } else {
70 let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
71 ((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
73 Ok(Some(EncodedValues(CowVec::new(value_bytes))))
74 }
75 } else if result == 1 {
76 Ok(None)
78 } else {
79 Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
80 }
81 }
82}
83
84fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
86 let key_bytes = key.as_bytes();
87 let mut result_byte: u8 = 0;
88
89 unsafe {
90 let result = ((*ctx.ctx).callbacks.store.contains_key)(
91 ctx.ctx,
92 key_bytes.as_ptr(),
93 key_bytes.len(),
94 &mut result_byte,
95 );
96
97 if result == 0 {
98 Ok(result_byte != 0)
99 } else {
100 Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
101 }
102 }
103}
104
105fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
107 let prefix_bytes = prefix.as_bytes();
108 let mut iterator: *mut StoreIteratorFFI = null_mut();
109
110 unsafe {
111 let result = ((*ctx.ctx).callbacks.store.prefix)(
112 ctx.ctx,
113 prefix_bytes.as_ptr(),
114 prefix_bytes.len(),
115 &mut iterator,
116 );
117
118 if result != 0 {
119 return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
120 }
121
122 collect_iterator_results(ctx, iterator)
123 }
124}
125
126const BOUND_UNBOUNDED: u8 = 0;
128const BOUND_INCLUDED: u8 = 1;
129const BOUND_EXCLUDED: u8 = 2;
130
131fn raw_store_range(
133 ctx: &OperatorContext,
134 start: Bound<&EncodedKey>,
135 end: Bound<&EncodedKey>,
136) -> Result<Vec<(EncodedKey, EncodedValues)>> {
137 let mut iterator: *mut StoreIteratorFFI = null_mut();
138
139 unsafe {
140 let (start_ptr, start_len, start_bound_type) = match start {
141 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
142 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
143 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
144 };
145
146 let (end_ptr, end_len, end_bound_type) = match end {
147 Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
148 Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
149 Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
150 };
151
152 let result = ((*ctx.ctx).callbacks.store.range)(
153 ctx.ctx,
154 start_ptr,
155 start_len,
156 start_bound_type,
157 end_ptr,
158 end_len,
159 end_bound_type,
160 &mut iterator,
161 );
162
163 if result != 0 {
164 return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
165 }
166
167 collect_iterator_results(ctx, iterator)
168 }
169}
170
171unsafe fn collect_iterator_results(
177 ctx: &OperatorContext,
178 iterator: *mut StoreIteratorFFI,
179) -> Result<Vec<(EncodedKey, EncodedValues)>> {
180 if iterator.is_null() {
181 return Ok(Vec::new());
182 }
183
184 let mut results = Vec::new();
185
186 loop {
187 let mut key_buf = BufferFFI {
188 ptr: null_mut(),
189 len: 0,
190 cap: 0,
191 };
192 let mut value_buf = BufferFFI {
193 ptr: null_mut(),
194 len: 0,
195 cap: 0,
196 };
197
198 let next_result =
199 unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
200
201 if next_result == 1 {
202 break;
204 } else if next_result != 0 {
205 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
206 return Err(FFIError::Other(format!(
207 "host_store_iterator_next failed with code {}",
208 next_result
209 )));
210 }
211
212 if !key_buf.ptr.is_null() && key_buf.len > 0 {
214 let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
215 let key = EncodedKey(CowVec::new(key_bytes));
216
217 let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
218 let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
219 EncodedValues(CowVec::new(value_bytes))
220 } else {
221 EncodedValues(CowVec::new(Vec::new()))
222 };
223
224 unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
226 if !value_buf.ptr.is_null() && value_buf.len > 0 {
227 unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
228 }
229
230 results.push((key, value));
231 }
232 }
233
234 unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
235 Ok(results)
236}