reifydb_flow_operator_sdk/
store.rs

1//! Store access for FFI operators
2//!
3//! Provides read-only access to the underlying store,
4//! allowing operators to query data beyond their own state.
5
6use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9	CowVec,
10	value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, FFI_END_OF_ITERATION, FFI_NOT_FOUND, FFI_OK, StoreIteratorFFI};
13use tracing::instrument;
14
15use crate::{
16	context::OperatorContext,
17	error::{FFIError, Result},
18};
19
20/// Store accessor providing read-only access to the underlying store
21pub struct Store<'a> {
22	ctx: &'a mut OperatorContext,
23}
24
25impl<'a> Store<'a> {
26	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
27		Self {
28			ctx,
29		}
30	}
31
32	#[instrument(name = "flow::operator::store::get", level = "trace", skip(self), fields(
33		key_len = key.as_bytes().len(),
34		found
35	))]
36	pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
37		let result = raw_store_get(self.ctx, key)?;
38		tracing::Span::current().record("found", result.is_some());
39		Ok(result)
40	}
41
42	#[instrument(name = "flow::operator::store::contains_key", level = "trace", skip(self), fields(
43		key_len = key.as_bytes().len()
44	))]
45	pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
46		raw_store_contains_key(self.ctx, key)
47	}
48
49	#[instrument(name = "flow::operator::store::prefix", level = "trace", skip(self), fields(
50		prefix_len = prefix.as_bytes().len(),
51		result_count
52	))]
53	pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
54		let results = raw_store_prefix(self.ctx, prefix)?;
55		tracing::Span::current().record("result_count", results.len());
56		Ok(results)
57	}
58
59	#[instrument(
60		name = "flow::operator::store::range",
61		level = "trace",
62		skip(self, start, end),
63		fields(result_count)
64	)]
65	pub fn range(
66		&self,
67		start: Bound<&EncodedKey>,
68		end: Bound<&EncodedKey>,
69	) -> Result<Vec<(EncodedKey, EncodedValues)>> {
70		let results = raw_store_range(self.ctx, start, end)?;
71		tracing::Span::current().record("result_count", results.len());
72		Ok(results)
73	}
74}
75
76/// Get a value from store by key
77#[instrument(name = "flow::operator::store::raw::get", level = "trace", skip(ctx), fields(
78	key_len = key.as_bytes().len()
79))]
80fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
81	let key_bytes = key.as_bytes();
82	let mut output = BufferFFI {
83		ptr: null_mut(),
84		len: 0,
85		cap: 0,
86	};
87
88	unsafe {
89		let result =
90			((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
91
92		if result == FFI_OK {
93			// Success - value found
94			if output.ptr.is_null() || output.len == 0 {
95				Ok(None)
96			} else {
97				let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
98				// Free the buffer allocated by host
99				((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
100				Ok(Some(EncodedValues(CowVec::new(value_bytes))))
101			}
102		} else if result == FFI_NOT_FOUND {
103			// Key not found
104			Ok(None)
105		} else {
106			Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
107		}
108	}
109}
110
111/// Check if a key exists in store
112#[instrument(name = "flow::operator::store::raw::contains_key", level = "trace", skip(ctx), fields(
113	key_len = key.as_bytes().len()
114))]
115fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
116	let key_bytes = key.as_bytes();
117	let mut result_byte: u8 = 0;
118
119	unsafe {
120		let result = ((*ctx.ctx).callbacks.store.contains_key)(
121			ctx.ctx,
122			key_bytes.as_ptr(),
123			key_bytes.len(),
124			&mut result_byte,
125		);
126
127		if result == FFI_OK {
128			Ok(result_byte != 0)
129		} else {
130			Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
131		}
132	}
133}
134
135/// Scan all keys with a given prefix
136#[instrument(name = "flow::operator::store::raw::prefix", level = "trace", skip(ctx), fields(
137	prefix_len = prefix.as_bytes().len()
138))]
139fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
140	let prefix_bytes = prefix.as_bytes();
141	let mut iterator: *mut StoreIteratorFFI = null_mut();
142
143	unsafe {
144		let result = ((*ctx.ctx).callbacks.store.prefix)(
145			ctx.ctx,
146			prefix_bytes.as_ptr(),
147			prefix_bytes.len(),
148			&mut iterator,
149		);
150
151		if result < 0 {
152			return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
153		}
154
155		collect_iterator_results(ctx, iterator)
156	}
157}
158
159/// Bound type constants for FFI
160const BOUND_UNBOUNDED: u8 = 0;
161const BOUND_INCLUDED: u8 = 1;
162const BOUND_EXCLUDED: u8 = 2;
163
164/// Scan all keys within a range
165#[instrument(name = "flow::operator::store::raw::range", level = "trace", skip(ctx, start, end))]
166fn raw_store_range(
167	ctx: &OperatorContext,
168	start: Bound<&EncodedKey>,
169	end: Bound<&EncodedKey>,
170) -> Result<Vec<(EncodedKey, EncodedValues)>> {
171	let mut iterator: *mut StoreIteratorFFI = null_mut();
172
173	unsafe {
174		let (start_ptr, start_len, start_bound_type) = match start {
175			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
176			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
177			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
178		};
179
180		let (end_ptr, end_len, end_bound_type) = match end {
181			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
182			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
183			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
184		};
185
186		let result = ((*ctx.ctx).callbacks.store.range)(
187			ctx.ctx,
188			start_ptr,
189			start_len,
190			start_bound_type,
191			end_ptr,
192			end_len,
193			end_bound_type,
194			&mut iterator,
195		);
196
197		if result < 0 {
198			return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
199		}
200
201		collect_iterator_results(ctx, iterator)
202	}
203}
204
205/// Helper to collect all results from a store iterator
206///
207/// # Safety
208/// - iterator must be a valid pointer returned by a store prefix/range call
209/// - ctx must have valid callbacks
210#[instrument(
211	name = "flow::operator::store::collect_iterator",
212	level = "trace",
213	skip(ctx, iterator),
214	fields(result_count)
215)]
216unsafe fn collect_iterator_results(
217	ctx: &OperatorContext,
218	iterator: *mut StoreIteratorFFI,
219) -> Result<Vec<(EncodedKey, EncodedValues)>> {
220	if iterator.is_null() {
221		tracing::Span::current().record("result_count", 0);
222		return Ok(Vec::new());
223	}
224
225	let mut results = Vec::new();
226
227	loop {
228		let mut key_buf = BufferFFI {
229			ptr: null_mut(),
230			len: 0,
231			cap: 0,
232		};
233		let mut value_buf = BufferFFI {
234			ptr: null_mut(),
235			len: 0,
236			cap: 0,
237		};
238
239		let next_result =
240			unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
241
242		if next_result == FFI_END_OF_ITERATION {
243			// End of iteration
244			break;
245		} else if next_result != FFI_OK {
246			unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
247			return Err(FFIError::Other(format!(
248				"host_store_iterator_next failed with code {}",
249				next_result
250			)));
251		}
252
253		// Convert buffers to owned data
254		if !key_buf.ptr.is_null() && key_buf.len > 0 {
255			let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
256			let key = EncodedKey(CowVec::new(key_bytes));
257
258			let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
259				let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
260				EncodedValues(CowVec::new(value_bytes))
261			} else {
262				EncodedValues(CowVec::new(Vec::new()))
263			};
264
265			// Free buffers allocated by host
266			unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
267			if !value_buf.ptr.is_null() && value_buf.len > 0 {
268				unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
269			}
270
271			results.push((key, value));
272		}
273	}
274
275	unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
276	tracing::Span::current().record("result_count", results.len());
277	Ok(results)
278}