Skip to main content

reifydb_engine/procedure/
ffi.rs

1#![cfg(reifydb_target = "native")]
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// Copyright (c) 2025 ReifyDB
4
5//! FFI procedure implementation that bridges native shared-library procedures with ReifyDB
6
7use std::{
8	cell::RefCell,
9	ffi::c_void,
10	panic::{AssertUnwindSafe, catch_unwind},
11	process::abort,
12};
13
14use reifydb_abi::{
15	callbacks::{
16		catalog::CatalogCallbacks, host::HostCallbacks, log::LogCallbacks, memory::MemoryCallbacks,
17		rql::RqlCallbacks, state::StateCallbacks, store::StoreCallbacks,
18	},
19	constants::FFI_ERROR_INTERNAL,
20	context::context::ContextFFI,
21	data::{buffer::BufferFFI, column::ColumnsFFI},
22	procedure::{descriptor::ProcedureDescriptorFFI, vtable::ProcedureVTableFFI},
23};
24use reifydb_core::value::column::columns::Columns;
25use reifydb_sdk::ffi::arena::Arena;
26use reifydb_transaction::transaction::Transaction;
27use reifydb_type;
28use tracing::{error, instrument};
29
30use super::{Procedure, context::ProcedureContext};
31use crate::{
32	ffi::callbacks::{logging, memory, rql},
33	vm::executor::Executor,
34};
35
36/// FFI procedure that wraps an external procedure implementation
37pub struct NativeProcedureFFI {
38	/// Procedure descriptor from the FFI library
39	#[allow(dead_code)]
40	descriptor: ProcedureDescriptorFFI,
41	/// Virtual function table for calling FFI functions
42	vtable: ProcedureVTableFFI,
43	/// Pointer to the FFI procedure instance
44	instance: *mut c_void,
45	/// Arena for type conversions
46	arena: RefCell<Arena>,
47}
48
49impl NativeProcedureFFI {
50	/// Create a new FFI procedure
51	pub fn new(descriptor: ProcedureDescriptorFFI, instance: *mut c_void) -> Self {
52		let vtable = descriptor.vtable;
53
54		Self {
55			descriptor,
56			vtable,
57			instance,
58			arena: RefCell::new(Arena::new()),
59		}
60	}
61}
62
63// SAFETY: NativeProcedureFFI is only accessed from a single context at a time.
64unsafe impl Send for NativeProcedureFFI {}
65unsafe impl Sync for NativeProcedureFFI {}
66
67impl Drop for NativeProcedureFFI {
68	fn drop(&mut self) {
69		if !self.instance.is_null() {
70			(self.vtable.destroy)(self.instance);
71		}
72	}
73}
74
75/// Create host callbacks for FFI procedures.
76///
77/// Uses real memory/logging/rql callbacks, and stubs for state/store/catalog
78/// (which are not relevant for procedure execution).
79fn create_procedure_host_callbacks() -> HostCallbacks {
80	HostCallbacks {
81		memory: MemoryCallbacks {
82			alloc: memory::host_alloc,
83			free: memory::host_free,
84			realloc: memory::host_realloc,
85		},
86		state: stub_state_callbacks(),
87		log: LogCallbacks {
88			message: logging::host_log_message,
89		},
90		store: stub_store_callbacks(),
91		catalog: stub_catalog_callbacks(),
92		rql: RqlCallbacks {
93			rql: rql::host_rql,
94		},
95	}
96}
97
98impl Procedure for NativeProcedureFFI {
99	#[instrument(name = "procedure::ffi::call", level = "debug", skip_all)]
100	fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns> {
101		let mut arena = self.arena.borrow_mut();
102
103		// Set thread-local arena for host_alloc
104		memory::set_current_arena(&mut *arena as *mut Arena);
105
106		// Serialize params to postcard bytes
107		let params_bytes = to_stdvec(ctx.params)
108			.map_err(|e| FFIError::Other(format!("Failed to serialize params: {}", e)))?;
109
110		// Build ContextFFI with real callbacks
111		let callbacks = create_procedure_host_callbacks();
112		let mut ctx_ffi = ContextFFI {
113			txn_ptr: tx as *mut Transaction<'_> as *mut c_void,
114			executor_ptr: ctx.executor as *const Executor as *const c_void,
115			operator_id: 0,
116			callbacks,
117		};
118
119		let mut ffi_output = ColumnsFFI::empty();
120
121		let result = catch_unwind(AssertUnwindSafe(|| {
122			(self.vtable.call)(
123				self.instance,
124				&mut ctx_ffi,
125				params_bytes.as_ptr(),
126				params_bytes.len(),
127				&mut ffi_output,
128			)
129		}));
130
131		let result_code = match result {
132			Ok(code) => code,
133			Err(panic_info) => {
134				let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
135					s.to_string()
136				} else if let Some(s) = panic_info.downcast_ref::<String>() {
137					s.clone()
138				} else {
139					"Unknown panic".to_string()
140				};
141				error!("FFI procedure panicked during call: {}", msg);
142				abort();
143			}
144		};
145
146		if result_code != 0 {
147			memory::clear_current_arena();
148			arena.clear();
149			return Err(
150				FFIError::Other(format!("FFI procedure call failed with code: {}", result_code)).into()
151			);
152		}
153
154		let columns = arena.unmarshal_columns(&ffi_output);
155
156		memory::clear_current_arena();
157		arena.clear();
158
159		Ok(columns)
160	}
161}
162
163// ---- Stub callbacks for state/store/catalog (not used by procedures) ----
164
165use postcard::to_stdvec;
166use reifydb_abi::{
167	catalog::{namespace::NamespaceFFI, table::TableFFI},
168	context::iterators::{StateIteratorFFI, StoreIteratorFFI},
169};
170use reifydb_sdk::error::FFIError;
171use reifydb_type::Result;
172
173fn stub_state_callbacks() -> StateCallbacks {
174	StateCallbacks {
175		get: stub_state_get,
176		set: stub_state_set,
177		remove: stub_state_remove,
178		clear: stub_state_clear,
179		prefix: stub_state_prefix,
180		range: stub_state_range,
181		iterator_next: stub_state_iterator_next,
182		iterator_free: stub_state_iterator_free,
183	}
184}
185
186extern "C" fn stub_state_get(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
187	FFI_ERROR_INTERNAL
188}
189extern "C" fn stub_state_set(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *const u8, _: usize) -> i32 {
190	FFI_ERROR_INTERNAL
191}
192extern "C" fn stub_state_remove(_: u64, _: *mut ContextFFI, _: *const u8, _: usize) -> i32 {
193	FFI_ERROR_INTERNAL
194}
195extern "C" fn stub_state_clear(_: u64, _: *mut ContextFFI) -> i32 {
196	FFI_ERROR_INTERNAL
197}
198extern "C" fn stub_state_prefix(
199	_: u64,
200	_: *mut ContextFFI,
201	_: *const u8,
202	_: usize,
203	_: *mut *mut StateIteratorFFI,
204) -> i32 {
205	FFI_ERROR_INTERNAL
206}
207extern "C" fn stub_state_range(
208	_: u64,
209	_: *mut ContextFFI,
210	_: *const u8,
211	_: usize,
212	_: u8,
213	_: *const u8,
214	_: usize,
215	_: u8,
216	_: *mut *mut StateIteratorFFI,
217) -> i32 {
218	FFI_ERROR_INTERNAL
219}
220extern "C" fn stub_state_iterator_next(_: *mut StateIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
221	FFI_ERROR_INTERNAL
222}
223extern "C" fn stub_state_iterator_free(_: *mut StateIteratorFFI) {}
224
225fn stub_store_callbacks() -> StoreCallbacks {
226	StoreCallbacks {
227		get: stub_store_get,
228		contains_key: stub_store_contains_key,
229		prefix: stub_store_prefix,
230		range: stub_store_range,
231		iterator_next: stub_store_iterator_next,
232		iterator_free: stub_store_iterator_free,
233	}
234}
235
236extern "C" fn stub_store_get(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
237	FFI_ERROR_INTERNAL
238}
239extern "C" fn stub_store_contains_key(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut u8) -> i32 {
240	FFI_ERROR_INTERNAL
241}
242extern "C" fn stub_store_prefix(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut *mut StoreIteratorFFI) -> i32 {
243	FFI_ERROR_INTERNAL
244}
245extern "C" fn stub_store_range(
246	_: *mut ContextFFI,
247	_: *const u8,
248	_: usize,
249	_: u8,
250	_: *const u8,
251	_: usize,
252	_: u8,
253	_: *mut *mut StoreIteratorFFI,
254) -> i32 {
255	FFI_ERROR_INTERNAL
256}
257extern "C" fn stub_store_iterator_next(_: *mut StoreIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
258	FFI_ERROR_INTERNAL
259}
260extern "C" fn stub_store_iterator_free(_: *mut StoreIteratorFFI) {}
261
262fn stub_catalog_callbacks() -> CatalogCallbacks {
263	CatalogCallbacks {
264		find_namespace: stub_catalog_find_namespace,
265		find_namespace_by_name: stub_catalog_find_namespace_by_name,
266		find_table: stub_catalog_find_table,
267		find_table_by_name: stub_catalog_find_table_by_name,
268		free_namespace: stub_catalog_free_namespace,
269		free_table: stub_catalog_free_table,
270	}
271}
272
273extern "C" fn stub_catalog_find_namespace(_: *mut ContextFFI, _: u64, _: u64, _: *mut NamespaceFFI) -> i32 {
274	FFI_ERROR_INTERNAL
275}
276extern "C" fn stub_catalog_find_namespace_by_name(
277	_: *mut ContextFFI,
278	_: *const u8,
279	_: usize,
280	_: u64,
281	_: *mut NamespaceFFI,
282) -> i32 {
283	FFI_ERROR_INTERNAL
284}
285extern "C" fn stub_catalog_find_table(_: *mut ContextFFI, _: u64, _: u64, _: *mut TableFFI) -> i32 {
286	FFI_ERROR_INTERNAL
287}
288extern "C" fn stub_catalog_find_table_by_name(
289	_: *mut ContextFFI,
290	_: u64,
291	_: *const u8,
292	_: usize,
293	_: u64,
294	_: *mut TableFFI,
295) -> i32 {
296	FFI_ERROR_INTERNAL
297}
298extern "C" fn stub_catalog_free_namespace(_: *mut NamespaceFFI) {}
299extern "C" fn stub_catalog_free_table(_: *mut TableFFI) {}