reifydb_flow_operator_sdk/
store.rs

1//! Store access for FFI operators
2//!
3//! Provides read-only access to the underlying store,
4//! allowing operators to query data beyond their own state.
5
6use std::{ops::Bound, ptr::null_mut, slice::from_raw_parts};
7
8use reifydb_core::{
9	CowVec,
10	value::encoded::{EncodedKey, EncodedValues},
11};
12use reifydb_flow_operator_abi::{BufferFFI, StoreIteratorFFI};
13
14use crate::{
15	context::OperatorContext,
16	error::{FFIError, Result},
17};
18
19/// Store accessor providing read-only access to the underlying store
20pub struct Store<'a> {
21	ctx: &'a mut OperatorContext,
22}
23
24impl<'a> Store<'a> {
25	pub(crate) fn new(ctx: &'a mut OperatorContext) -> Self {
26		Self {
27			ctx,
28		}
29	}
30
31	pub fn get(&self, key: &EncodedKey) -> Result<Option<EncodedValues>> {
32		raw_store_get(self.ctx, key)
33	}
34
35	pub fn contains_key(&self, key: &EncodedKey) -> Result<bool> {
36		raw_store_contains_key(self.ctx, key)
37	}
38
39	pub fn prefix(&self, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
40		raw_store_prefix(self.ctx, prefix)
41	}
42
43	pub fn range(
44		&self,
45		start: Bound<&EncodedKey>,
46		end: Bound<&EncodedKey>,
47	) -> Result<Vec<(EncodedKey, EncodedValues)>> {
48		raw_store_range(self.ctx, start, end)
49	}
50}
51
52/// Get a value from store by key
53fn raw_store_get(ctx: &OperatorContext, key: &EncodedKey) -> Result<Option<EncodedValues>> {
54	let key_bytes = key.as_bytes();
55	let mut output = BufferFFI {
56		ptr: null_mut(),
57		len: 0,
58		cap: 0,
59	};
60
61	unsafe {
62		let result =
63			((*ctx.ctx).callbacks.store.get)(ctx.ctx, key_bytes.as_ptr(), key_bytes.len(), &mut output);
64
65		if result == 0 {
66			// Success - value found
67			if output.ptr.is_null() || output.len == 0 {
68				Ok(None)
69			} else {
70				let value_bytes = from_raw_parts(output.ptr, output.len).to_vec();
71				// Free the buffer allocated by host
72				((*ctx.ctx).callbacks.memory.free)(output.ptr as *mut u8, output.len);
73				Ok(Some(EncodedValues(CowVec::new(value_bytes))))
74			}
75		} else if result == 1 {
76			// Key not found
77			Ok(None)
78		} else {
79			Err(FFIError::Other(format!("host_store_get failed with code {}", result)))
80		}
81	}
82}
83
84/// Check if a key exists in store
85fn raw_store_contains_key(ctx: &OperatorContext, key: &EncodedKey) -> Result<bool> {
86	let key_bytes = key.as_bytes();
87	let mut result_byte: u8 = 0;
88
89	unsafe {
90		let result = ((*ctx.ctx).callbacks.store.contains_key)(
91			ctx.ctx,
92			key_bytes.as_ptr(),
93			key_bytes.len(),
94			&mut result_byte,
95		);
96
97		if result == 0 {
98			Ok(result_byte != 0)
99		} else {
100			Err(FFIError::Other(format!("host_store_contains_key failed with code {}", result)))
101		}
102	}
103}
104
105/// Scan all keys with a given prefix
106fn raw_store_prefix(ctx: &OperatorContext, prefix: &EncodedKey) -> Result<Vec<(EncodedKey, EncodedValues)>> {
107	let prefix_bytes = prefix.as_bytes();
108	let mut iterator: *mut StoreIteratorFFI = null_mut();
109
110	unsafe {
111		let result = ((*ctx.ctx).callbacks.store.prefix)(
112			ctx.ctx,
113			prefix_bytes.as_ptr(),
114			prefix_bytes.len(),
115			&mut iterator,
116		);
117
118		if result != 0 {
119			return Err(FFIError::Other(format!("host_store_prefix failed with code {}", result)));
120		}
121
122		collect_iterator_results(ctx, iterator)
123	}
124}
125
126/// Bound type constants for FFI
127const BOUND_UNBOUNDED: u8 = 0;
128const BOUND_INCLUDED: u8 = 1;
129const BOUND_EXCLUDED: u8 = 2;
130
131/// Scan all keys within a range
132fn raw_store_range(
133	ctx: &OperatorContext,
134	start: Bound<&EncodedKey>,
135	end: Bound<&EncodedKey>,
136) -> Result<Vec<(EncodedKey, EncodedValues)>> {
137	let mut iterator: *mut StoreIteratorFFI = null_mut();
138
139	unsafe {
140		let (start_ptr, start_len, start_bound_type) = match start {
141			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
142			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
143			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
144		};
145
146		let (end_ptr, end_len, end_bound_type) = match end {
147			Bound::Unbounded => (std::ptr::null(), 0, BOUND_UNBOUNDED),
148			Bound::Included(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_INCLUDED),
149			Bound::Excluded(key) => (key.as_bytes().as_ptr(), key.as_bytes().len(), BOUND_EXCLUDED),
150		};
151
152		let result = ((*ctx.ctx).callbacks.store.range)(
153			ctx.ctx,
154			start_ptr,
155			start_len,
156			start_bound_type,
157			end_ptr,
158			end_len,
159			end_bound_type,
160			&mut iterator,
161		);
162
163		if result != 0 {
164			return Err(FFIError::Other(format!("host_store_range failed with code {}", result)));
165		}
166
167		collect_iterator_results(ctx, iterator)
168	}
169}
170
171/// Helper to collect all results from a store iterator
172///
173/// # Safety
174/// - iterator must be a valid pointer returned by a store prefix/range call
175/// - ctx must have valid callbacks
176unsafe fn collect_iterator_results(
177	ctx: &OperatorContext,
178	iterator: *mut StoreIteratorFFI,
179) -> Result<Vec<(EncodedKey, EncodedValues)>> {
180	if iterator.is_null() {
181		return Ok(Vec::new());
182	}
183
184	let mut results = Vec::new();
185
186	loop {
187		let mut key_buf = BufferFFI {
188			ptr: null_mut(),
189			len: 0,
190			cap: 0,
191		};
192		let mut value_buf = BufferFFI {
193			ptr: null_mut(),
194			len: 0,
195			cap: 0,
196		};
197
198		let next_result =
199			unsafe { ((*ctx.ctx).callbacks.store.iterator_next)(iterator, &mut key_buf, &mut value_buf) };
200
201		if next_result == 1 {
202			// End of iteration
203			break;
204		} else if next_result != 0 {
205			unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
206			return Err(FFIError::Other(format!(
207				"host_store_iterator_next failed with code {}",
208				next_result
209			)));
210		}
211
212		// Convert buffers to owned data
213		if !key_buf.ptr.is_null() && key_buf.len > 0 {
214			let key_bytes = unsafe { from_raw_parts(key_buf.ptr, key_buf.len) }.to_vec();
215			let key = EncodedKey(CowVec::new(key_bytes));
216
217			let value = if !value_buf.ptr.is_null() && value_buf.len > 0 {
218				let value_bytes = unsafe { from_raw_parts(value_buf.ptr, value_buf.len) }.to_vec();
219				EncodedValues(CowVec::new(value_bytes))
220			} else {
221				EncodedValues(CowVec::new(Vec::new()))
222			};
223
224			// Free buffers allocated by host
225			unsafe { ((*ctx.ctx).callbacks.memory.free)(key_buf.ptr as *mut u8, key_buf.len) };
226			if !value_buf.ptr.is_null() && value_buf.len > 0 {
227				unsafe { ((*ctx.ctx).callbacks.memory.free)(value_buf.ptr as *mut u8, value_buf.len) };
228			}
229
230			results.push((key, value));
231		}
232	}
233
234	unsafe { ((*ctx.ctx).callbacks.store.iterator_free)(iterator) };
235	Ok(results)
236}