reifydb_flow_operator_sdk/
store.rs

1//! Store access for FFI operators
2//!
3//! Provides read-only access to the underlying store,
4//! allowing operators to query data beyond their own state.
5
6use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9	CowVec,
10	value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, FFI_END_OF_ITERATION, FFI_NOT_FOUND, FFI_OK, StoreIteratorFFI};
13use tracing::instrument;
14
15use crate::{
16	context::OperatorContext,
17	error::{FFIError, Result},
18};
19
20/// Store accessor providing read-only access to the underlying store
21pub struct Store<'a> {
22	ctx: &'a mut OperatorContext,
23}
24
25impl<'a> Store<'a> {
26	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
27		Self {
28			ctx,
29		}
30	}
31
32	#[instrument(level = "trace", skip(self), fields(
33		key_len = key.as_bytes().len(),
34		found
35	))]
36	pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
37		let result = raw_store_get(self.ctx, key)?;
38		tracing::Span::current().record("found", result.is_some());
39		Ok(result)
40	}
41
42	#[instrument(level = "trace", skip(self), fields(
43		key_len = key.as_bytes().len()
44	))]
45	pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
46		raw_store_contains_key(self.ctx, key)
47	}
48
49	#[instrument(level = "trace", skip(self), fields(
50		prefix_len = prefix.as_bytes().len(),
51		result_count
52	))]
53	pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
54		let results = raw_store_prefix(self.ctx, prefix)?;
55		tracing::Span::current().record("result_count", results.len());
56		Ok(results)
57	}
58
59	#[instrument(level = "trace", skip(self, start, end), fields(result_count))]
60	pub fn range(
61		&self,
62		start: Bound<&EncodedKey>,
63		end: Bound<&EncodedKey>,
64	) -> Result<Vec<(EncodedKey, EncodedValues)>> {
65		let results = raw_store_range(self.ctx, start, end)?;
66		tracing::Span::current().record("result_count", results.len());
67		Ok(results)
68	}
69}
70
71/// Get a value from store by key
72#[instrument(level = "trace", skip(ctx), fields(
73	key_len = key.as_bytes().len()
74))]
75fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
76	let key_bytes = key.as_bytes();
77	let mut output = BufferFFI {
78		ptr: null_mut(),
79		len: 0,
80		cap: 0,
81	};
82
83	unsafe {
84		let result =
85			((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
86
87		if result == FFI_OK {
88			// Success - value found
89			if output.ptr.is_null() || output.len == 0 {
90				Ok(None)
91			} else {
92				let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
93				// Free the buffer allocated by host
94				((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
95				Ok(Some(EncodedValues(CowVec::new(value_bytes))))
96			}
97		} else if result == FFI_NOT_FOUND {
98			// Key not found
99			Ok(None)
100		} else {
101			Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
102		}
103	}
104}
105
106/// Check if a key exists in store
107#[instrument(level = "trace", skip(ctx), fields(
108	key_len = key.as_bytes().len()
109))]
110fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
111	let key_bytes = key.as_bytes();
112	let mut result_byte: u8 = 0;
113
114	unsafe {
115		let result = ((*ctx.ctx).callbacks.store.contains_key)(
116			ctx.ctx,
117			key_bytes.as_ptr(),
118			key_bytes.len(),
119			&mut result_byte,
120		);
121
122		if result == FFI_OK {
123			Ok(result_byte != 0)
124		} else {
125			Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
126		}
127	}
128}
129
130/// Scan all keys with a given prefix
131#[instrument(level = "trace", skip(ctx), fields(
132	prefix_len = prefix.as_bytes().len()
133))]
134fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
135	let prefix_bytes = prefix.as_bytes();
136	let mut iterator: *mut StoreIteratorFFI = null_mut();
137
138	unsafe {
139		let result = ((*ctx.ctx).callbacks.store.prefix)(
140			ctx.ctx,
141			prefix_bytes.as_ptr(),
142			prefix_bytes.len(),
143			&mut iterator,
144		);
145
146		if result < 0 {
147			return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
148		}
149
150		collect_iterator_results(ctx, iterator)
151	}
152}
153
// Bound type tags for FFI: each range endpoint is passed across the boundary
// as a one-byte tag plus the key bytes.

/// Tag for `Bound::Unbounded` (no key accompanies it).
const BOUND_UNBOUNDED: u8 = 0;
/// Tag for `Bound::Included`.
const BOUND_INCLUDED: u8 = 1;
/// Tag for `Bound::Excluded`.
const BOUND_EXCLUDED: u8 = 2;
158
159/// Scan all keys within a range
160#[instrument(level = "trace", skip(ctx, start, end))]
161fn raw_store_range(
162	ctx: &OperatorContext,
163	start: Bound<&EncodedKey>,
164	end: Bound<&EncodedKey>,
165) -> Result<Vec<(EncodedKey, EncodedValues)>> {
166	let mut iterator: *mut StoreIteratorFFI = null_mut();
167
168	unsafe {
169		let (start_ptr, start_len, start_bound_type) = match start {
170			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
171			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
172			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
173		};
174
175		let (end_ptr, end_len, end_bound_type) = match end {
176			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
177			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
178			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
179		};
180
181		let result = ((*ctx.ctx).callbacks.store.range)(
182			ctx.ctx,
183			start_ptr,
184			start_len,
185			start_bound_type,
186			end_ptr,
187			end_len,
188			end_bound_type,
189			&mut iterator,
190		);
191
192		if result < 0 {
193			return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
194		}
195
196		collect_iterator_results(ctx, iterator)
197	}
198}
199
200/// Helper to collect all results from a store iterator
201///
202/// # Safety
203/// - iterator must be a valid pointer returned by a store prefix/range call
204/// - ctx must have valid callbacks
205#[instrument(level = "trace", skip(ctx, iterator), fields(result_count))]
206unsafe fn collect_iterator_results(
207	ctx: &OperatorContext,
208	iterator: *mut StoreIteratorFFI,
209) -> Result<Vec<(EncodedKey, EncodedValues)>> {
210	if iterator.is_null() {
211		tracing::Span::current().record("result_count", 0);
212		return Ok(Vec::new());
213	}
214
215	let mut results = Vec::new();
216
217	loop {
218		let mut key_buf = BufferFFI {
219			ptr: null_mut(),
220			len: 0,
221			cap: 0,
222		};
223		let mut value_buf = BufferFFI {
224			ptr: null_mut(),
225			len: 0,
226			cap: 0,
227		};
228
229		let next_result =
230			unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
231
232		if next_result == FFI_END_OF_ITERATION {
233			// End of iteration
234			break;
235		} else if next_result != FFI_OK {
236			unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
237			return Err(FFIError::Other(format!(
238				"host_store_iterator_next failed with code {}",
239				next_result
240			)));
241		}
242
243		// Convert buffers to owned data
244		if !key_buf.ptr.is_null() && key_buf.len > 0 {
245			let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
246			let key = EncodedKey(CowVec::new(key_bytes));
247
248			let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
249				let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
250				EncodedValues(CowVec::new(value_bytes))
251			} else {
252				EncodedValues(CowVec::new(Vec::new()))
253			};
254
255			// Free buffers allocated by host
256			unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
257			if !value_buf.ptr.is_null() && value_buf.len > 0 {
258				unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
259			}
260
261			results.push((key, value));
262		}
263	}
264
265	unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
266	tracing::Span::current().record("result_count", results.len());
267	Ok(results)
268}