reifydb_engine/procedure/
ffi.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 cell::RefCell,
9 ffi::c_void,
10 panic::{AssertUnwindSafe, catch_unwind},
11 process::abort,
12};
13
14use reifydb_abi::{
15 callbacks::{
16 catalog::CatalogCallbacks, host::HostCallbacks, log::LogCallbacks, memory::MemoryCallbacks,
17 rql::RqlCallbacks, state::StateCallbacks, store::StoreCallbacks,
18 },
19 constants::FFI_ERROR_INTERNAL,
20 context::context::ContextFFI,
21 data::{buffer::BufferFFI, column::ColumnsFFI},
22 procedure::{descriptor::ProcedureDescriptorFFI, vtable::ProcedureVTableFFI},
23};
24use reifydb_core::value::column::columns::Columns;
25use reifydb_sdk::ffi::arena::Arena;
26use reifydb_transaction::transaction::Transaction;
27use reifydb_type;
28use tracing::{error, instrument};
29
30use super::{Procedure, context::ProcedureContext};
31use crate::{
32 ffi::callbacks::{logging, memory, rql},
33 vm::executor::Executor,
34};
35
36pub struct NativeProcedureFFI {
38 #[allow(dead_code)]
40 descriptor: ProcedureDescriptorFFI,
41 vtable: ProcedureVTableFFI,
43 instance: *mut c_void,
45 arena: RefCell<Arena>,
47}
48
49impl NativeProcedureFFI {
50 pub fn new(descriptor: ProcedureDescriptorFFI, instance: *mut c_void) -> Self {
52 let vtable = descriptor.vtable;
53
54 Self {
55 descriptor,
56 vtable,
57 instance,
58 arena: RefCell::new(Arena::new()),
59 }
60 }
61}
62
63unsafe impl Send for NativeProcedureFFI {}
65unsafe impl Sync for NativeProcedureFFI {}
66
67impl Drop for NativeProcedureFFI {
68 fn drop(&mut self) {
69 if !self.instance.is_null() {
70 (self.vtable.destroy)(self.instance);
71 }
72 }
73}
74
75fn create_procedure_host_callbacks() -> HostCallbacks {
80 HostCallbacks {
81 memory: MemoryCallbacks {
82 alloc: memory::host_alloc,
83 free: memory::host_free,
84 realloc: memory::host_realloc,
85 },
86 state: stub_state_callbacks(),
87 log: LogCallbacks {
88 message: logging::host_log_message,
89 },
90 store: stub_store_callbacks(),
91 catalog: stub_catalog_callbacks(),
92 rql: RqlCallbacks {
93 rql: rql::host_rql,
94 },
95 }
96}
97
98impl Procedure for NativeProcedureFFI {
99 #[instrument(name = "procedure::ffi::call", level = "debug", skip_all)]
100 fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns> {
101 let mut arena = self.arena.borrow_mut();
102
103 memory::set_current_arena(&mut *arena as *mut Arena);
105
106 let params_bytes = to_stdvec(ctx.params)
108 .map_err(|e| FFIError::Other(format!("Failed to serialize params: {}", e)))?;
109
110 let callbacks = create_procedure_host_callbacks();
112 let mut ctx_ffi = ContextFFI {
113 txn_ptr: tx as *mut Transaction<'_> as *mut c_void,
114 executor_ptr: ctx.executor as *const Executor as *const c_void,
115 operator_id: 0,
116 callbacks,
117 };
118
119 let mut ffi_output = ColumnsFFI::empty();
120
121 let result = catch_unwind(AssertUnwindSafe(|| {
122 (self.vtable.call)(
123 self.instance,
124 &mut ctx_ffi,
125 params_bytes.as_ptr(),
126 params_bytes.len(),
127 &mut ffi_output,
128 )
129 }));
130
131 let result_code = match result {
132 Ok(code) => code,
133 Err(panic_info) => {
134 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
135 s.to_string()
136 } else if let Some(s) = panic_info.downcast_ref::<String>() {
137 s.clone()
138 } else {
139 "Unknown panic".to_string()
140 };
141 error!("FFI procedure panicked during call: {}", msg);
142 abort();
143 }
144 };
145
146 if result_code != 0 {
147 memory::clear_current_arena();
148 arena.clear();
149 return Err(
150 FFIError::Other(format!("FFI procedure call failed with code: {}", result_code)).into()
151 );
152 }
153
154 let columns = arena.unmarshal_columns(&ffi_output);
155
156 memory::clear_current_arena();
157 arena.clear();
158
159 Ok(columns)
160 }
161}
162
163use postcard::to_stdvec;
166use reifydb_abi::{
167 catalog::{namespace::NamespaceFFI, table::TableFFI},
168 context::iterators::{StateIteratorFFI, StoreIteratorFFI},
169};
170use reifydb_sdk::error::FFIError;
171use reifydb_type::Result;
172
173fn stub_state_callbacks() -> StateCallbacks {
174 StateCallbacks {
175 get: stub_state_get,
176 set: stub_state_set,
177 remove: stub_state_remove,
178 clear: stub_state_clear,
179 prefix: stub_state_prefix,
180 range: stub_state_range,
181 iterator_next: stub_state_iterator_next,
182 iterator_free: stub_state_iterator_free,
183 }
184}
185
186extern "C" fn stub_state_get(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
187 FFI_ERROR_INTERNAL
188}
189extern "C" fn stub_state_set(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *const u8, _: usize) -> i32 {
190 FFI_ERROR_INTERNAL
191}
192extern "C" fn stub_state_remove(_: u64, _: *mut ContextFFI, _: *const u8, _: usize) -> i32 {
193 FFI_ERROR_INTERNAL
194}
195extern "C" fn stub_state_clear(_: u64, _: *mut ContextFFI) -> i32 {
196 FFI_ERROR_INTERNAL
197}
198extern "C" fn stub_state_prefix(
199 _: u64,
200 _: *mut ContextFFI,
201 _: *const u8,
202 _: usize,
203 _: *mut *mut StateIteratorFFI,
204) -> i32 {
205 FFI_ERROR_INTERNAL
206}
207extern "C" fn stub_state_range(
208 _: u64,
209 _: *mut ContextFFI,
210 _: *const u8,
211 _: usize,
212 _: u8,
213 _: *const u8,
214 _: usize,
215 _: u8,
216 _: *mut *mut StateIteratorFFI,
217) -> i32 {
218 FFI_ERROR_INTERNAL
219}
220extern "C" fn stub_state_iterator_next(_: *mut StateIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
221 FFI_ERROR_INTERNAL
222}
223extern "C" fn stub_state_iterator_free(_: *mut StateIteratorFFI) {}
224
225fn stub_store_callbacks() -> StoreCallbacks {
226 StoreCallbacks {
227 get: stub_store_get,
228 contains_key: stub_store_contains_key,
229 prefix: stub_store_prefix,
230 range: stub_store_range,
231 iterator_next: stub_store_iterator_next,
232 iterator_free: stub_store_iterator_free,
233 }
234}
235
236extern "C" fn stub_store_get(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
237 FFI_ERROR_INTERNAL
238}
239extern "C" fn stub_store_contains_key(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut u8) -> i32 {
240 FFI_ERROR_INTERNAL
241}
242extern "C" fn stub_store_prefix(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut *mut StoreIteratorFFI) -> i32 {
243 FFI_ERROR_INTERNAL
244}
245extern "C" fn stub_store_range(
246 _: *mut ContextFFI,
247 _: *const u8,
248 _: usize,
249 _: u8,
250 _: *const u8,
251 _: usize,
252 _: u8,
253 _: *mut *mut StoreIteratorFFI,
254) -> i32 {
255 FFI_ERROR_INTERNAL
256}
257extern "C" fn stub_store_iterator_next(_: *mut StoreIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
258 FFI_ERROR_INTERNAL
259}
260extern "C" fn stub_store_iterator_free(_: *mut StoreIteratorFFI) {}
261
262fn stub_catalog_callbacks() -> CatalogCallbacks {
263 CatalogCallbacks {
264 find_namespace: stub_catalog_find_namespace,
265 find_namespace_by_name: stub_catalog_find_namespace_by_name,
266 find_table: stub_catalog_find_table,
267 find_table_by_name: stub_catalog_find_table_by_name,
268 free_namespace: stub_catalog_free_namespace,
269 free_table: stub_catalog_free_table,
270 }
271}
272
273extern "C" fn stub_catalog_find_namespace(_: *mut ContextFFI, _: u64, _: u64, _: *mut NamespaceFFI) -> i32 {
274 FFI_ERROR_INTERNAL
275}
276extern "C" fn stub_catalog_find_namespace_by_name(
277 _: *mut ContextFFI,
278 _: *const u8,
279 _: usize,
280 _: u64,
281 _: *mut NamespaceFFI,
282) -> i32 {
283 FFI_ERROR_INTERNAL
284}
285extern "C" fn stub_catalog_find_table(_: *mut ContextFFI, _: u64, _: u64, _: *mut TableFFI) -> i32 {
286 FFI_ERROR_INTERNAL
287}
288extern "C" fn stub_catalog_find_table_by_name(
289 _: *mut ContextFFI,
290 _: u64,
291 _: *const u8,
292 _: usize,
293 _: u64,
294 _: *mut TableFFI,
295) -> i32 {
296 FFI_ERROR_INTERNAL
297}
298extern "C" fn stub_catalog_free_namespace(_: *mut NamespaceFFI) {}
299extern "C" fn stub_catalog_free_table(_: *mut TableFFI) {}