Skip to main content

reifydb_sub_flow/operator/
ffi.rs

1#![cfg(reifydb_target = "native")]
2// SPDX-License-Identifier: Apache-2.0
3// Copyright (c) 2025 ReifyDB
4
5//! FFI operator implementation that bridges FFI operators with ReifyDB
6
7use std::{
8	cell::RefCell,
9	ffi::c_void,
10	panic::{AssertUnwindSafe, catch_unwind},
11	process::abort,
12	result::Result as StdResult,
13};
14
15use reifydb_abi::{
16	context::context::ContextFFI,
17	data::column::ColumnsFFI,
18	flow::change::ChangeFFI,
19	operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
20};
21use reifydb_core::{
22	interface::{catalog::flow::FlowNodeId, change::Change},
23	value::column::columns::Columns,
24};
25use reifydb_engine::vm::executor::Executor;
26use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
27use reifydb_type::{Result, value::row_number::RowNumber};
28use tracing::{Span, error, field, instrument};
29
30use crate::{
31	ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
32	operator::Operator,
33	transaction::FlowTransaction,
34};
35
36/// FFI operator that wraps an external operator implementation
37pub struct FFIOperator {
38	/// Operator descriptor from the FFI library
39	descriptor: OperatorDescriptorFFI,
40	/// Virtual function table for calling FFI functions
41	vtable: OperatorVTableFFI,
42	/// Pointer to the FFI operator instance
43	instance: *mut c_void,
44	/// ID for this operator
45	operator_id: FlowNodeId,
46	/// Executor for RQL execution via FFI callbacks
47	executor: Executor,
48	/// Arena for type conversions
49	arena: RefCell<Arena>,
50}
51
52impl FFIOperator {
53	/// Create a new FFI operator
54	pub fn new(
55		descriptor: OperatorDescriptorFFI,
56		instance: *mut c_void,
57		operator_id: FlowNodeId,
58		executor: Executor,
59	) -> Self {
60		let vtable = descriptor.vtable;
61
62		Self {
63			descriptor,
64			vtable,
65			instance,
66			operator_id,
67			executor,
68			arena: RefCell::new(Arena::new()),
69		}
70	}
71
72	/// Get the operator descriptor
73	pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
74		&self.descriptor
75	}
76}
77
78// SAFETY: FFIOperator is only accessed from a single actor at a time.
79// The raw pointer and RefCell<Arena> are not shared across threads.
80unsafe impl Send for FFIOperator {}
81unsafe impl Sync for FFIOperator {}
82
83impl Drop for FFIOperator {
84	fn drop(&mut self) {
85		// Call the destroy function from the vtable to clean up the FFI operator instance
86		if !self.instance.is_null() {
87			(self.vtable.destroy)(self.instance);
88		}
89	}
90}
91
92/// Marshal a flow change to FFI format
93#[inline]
94#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
95fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
96	arena.marshal_change(change)
97}
98
99/// Call the FFI vtable apply function
100#[inline]
101#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
102fn call_vtable(
103	vtable: &OperatorVTableFFI,
104	instance: *mut c_void,
105	ffi_ctx_ptr: *mut ContextFFI,
106	ffi_input: &ChangeFFI,
107	ffi_output: &mut ChangeFFI,
108	operator_id: FlowNodeId,
109) -> i32 {
110	let result = catch_unwind(AssertUnwindSafe(|| (vtable.apply)(instance, ffi_ctx_ptr, ffi_input, ffi_output)));
111
112	match result {
113		Ok(code) => code,
114		Err(panic_info) => {
115			let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
116				s.to_string()
117			} else if let Some(s) = panic_info.downcast_ref::<String>() {
118				s.clone()
119			} else {
120				"Unknown panic".to_string()
121			};
122			error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
123			abort();
124		}
125	}
126}
127
128/// Unmarshal FFI output to Change
129#[inline]
130#[instrument(name = "flow::ffi::unmarshal", level = "trace", skip_all)]
131fn unmarshal_output(arena: &mut Arena, ffi_output: &ChangeFFI) -> StdResult<Change, String> {
132	arena.unmarshal_change(ffi_output)
133}
134
135impl Operator for FFIOperator {
136	fn id(&self) -> FlowNodeId {
137		self.operator_id
138	}
139
140	#[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
141		operator_id = self.operator_id.0,
142		input_diff_count = change.diffs.len(),
143		output_diff_count = field::Empty
144	))]
145	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
146		let mut arena = self.arena.borrow_mut();
147
148		let ffi_input = marshal_input(&mut arena, &change);
149
150		let mut ffi_output = ChangeFFI::empty();
151
152		let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
153		let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
154
155		let result_code = call_vtable(
156			&self.vtable,
157			self.instance,
158			ffi_ctx_ptr,
159			&ffi_input,
160			&mut ffi_output,
161			self.operator_id,
162		);
163
164		if result_code != 0 {
165			return Err(
166				FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
167			);
168		}
169
170		let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
171
172		// Clear the arena after operation
173		arena.clear();
174
175		Span::current().record("output_diff_count", output_change.diffs.len());
176
177		Ok(output_change)
178	}
179
180	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
181		let mut arena = self.arena.borrow_mut();
182
183		let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
184
185		let mut ffi_output = ColumnsFFI::empty();
186
187		let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
188		let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
189
190		// Call FFI pull function
191		let result = catch_unwind(AssertUnwindSafe(|| {
192			(self.vtable.pull)(
193				self.instance,
194				ffi_ctx_ptr,
195				row_numbers.as_ptr(),
196				row_numbers.len(),
197				&mut ffi_output,
198			)
199		}));
200
201		let result_code = match result {
202			Ok(code) => code,
203			Err(panic_info) => {
204				let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
205					s.to_string()
206				} else if let Some(s) = panic_info.downcast_ref::<String>() {
207					s.clone()
208				} else {
209					"Unknown panic".to_string()
210				};
211				error!(operator_id = self.operator_id.0, "FFI operator panicked during pull: {}", msg);
212				abort();
213			}
214		};
215
216		if result_code != 0 {
217			return Err(
218				FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
219			);
220		}
221
222		// Unmarshal the columns
223		let columns = arena.unmarshal_columns(&ffi_output);
224
225		// Clear the arena's arena after operation
226		arena.clear();
227
228		Ok(columns)
229	}
230}