Skip to main content

reifydb_sub_flow/operator/
ffi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#![cfg(reifydb_target = "native")]
5// SPDX-License-Identifier: Apache-2.0
6// Copyright (c) 2025 ReifyDB
7
8//! FFI operator implementation that bridges FFI operators with ReifyDB
9
10use std::{
11	cell::RefCell,
12	ffi::c_void,
13	panic::{AssertUnwindSafe, catch_unwind},
14	process::abort,
15	result::Result as StdResult,
16};
17
18use reifydb_abi::{
19	context::context::ContextFFI,
20	data::column::ColumnsFFI,
21	flow::change::ChangeFFI,
22	operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
23};
24use reifydb_core::{
25	interface::{catalog::flow::FlowNodeId, change::Change},
26	value::column::columns::Columns,
27};
28use reifydb_engine::vm::executor::Executor;
29use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
30use reifydb_type::{
31	Result,
32	value::{datetime::DateTime, row_number::RowNumber},
33};
34use tracing::{Span, error, field, instrument};
35
36use crate::{
37	ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
38	operator::Operator,
39	transaction::FlowTransaction,
40};
41
42/// FFI operator that wraps an external operator implementation
43pub struct FFIOperator {
44	/// Operator descriptor from the FFI library
45	descriptor: OperatorDescriptorFFI,
46	/// Virtual function table for calling FFI functions
47	vtable: OperatorVTableFFI,
48	/// Pointer to the FFI operator instance
49	instance: *mut c_void,
50	/// ID for this operator
51	operator_id: FlowNodeId,
52	/// Executor for RQL execution via FFI callbacks
53	executor: Executor,
54	/// Arena for type conversions
55	arena: RefCell<Arena>,
56}
57
58impl FFIOperator {
59	/// Create a new FFI operator
60	pub fn new(
61		descriptor: OperatorDescriptorFFI,
62		instance: *mut c_void,
63		operator_id: FlowNodeId,
64		executor: Executor,
65	) -> Self {
66		let vtable = descriptor.vtable;
67
68		Self {
69			descriptor,
70			vtable,
71			instance,
72			operator_id,
73			executor,
74			arena: RefCell::new(Arena::new()),
75		}
76	}
77
78	/// Get the operator descriptor
79	pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
80		&self.descriptor
81	}
82}
83
84// SAFETY: FFIOperator is only accessed from a single actor at a time.
85// The raw pointer and RefCell<Arena> are not shared across threads.
86unsafe impl Send for FFIOperator {}
87unsafe impl Sync for FFIOperator {}
88
89impl Drop for FFIOperator {
90	fn drop(&mut self) {
91		// Call the destroy function from the vtable to clean up the FFI operator instance
92		if !self.instance.is_null() {
93			(self.vtable.destroy)(self.instance);
94		}
95	}
96}
97
98/// Marshal a flow change to FFI format
99#[inline]
100#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
101fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
102	arena.marshal_change(change)
103}
104
105/// Call the FFI vtable apply function
106#[inline]
107#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
108fn call_vtable(
109	vtable: &OperatorVTableFFI,
110	instance: *mut c_void,
111	ffi_ctx_ptr: *mut ContextFFI,
112	ffi_input: &ChangeFFI,
113	ffi_output: &mut ChangeFFI,
114	operator_id: FlowNodeId,
115) -> i32 {
116	let result = catch_unwind(AssertUnwindSafe(|| (vtable.apply)(instance, ffi_ctx_ptr, ffi_input, ffi_output)));
117
118	match result {
119		Ok(code) => code,
120		Err(panic_info) => {
121			let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
122				s.to_string()
123			} else if let Some(s) = panic_info.downcast_ref::<String>() {
124				s.clone()
125			} else {
126				"Unknown panic".to_string()
127			};
128			error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
129			abort();
130		}
131	}
132}
133
134/// Unmarshal FFI output to Change
135#[inline]
136#[instrument(name = "flow::ffi::unmarshal", level = "trace", skip_all)]
137fn unmarshal_output(arena: &mut Arena, ffi_output: &ChangeFFI) -> StdResult<Change, String> {
138	arena.unmarshal_change(ffi_output)
139}
140
141impl Operator for FFIOperator {
142	fn id(&self) -> FlowNodeId {
143		self.operator_id
144	}
145
146	#[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
147		operator_id = self.operator_id.0,
148		input_diff_count = change.diffs.len(),
149		output_diff_count = field::Empty
150	))]
151	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
152		let mut arena = self.arena.borrow_mut();
153
154		let ffi_input = marshal_input(&mut arena, &change);
155
156		let mut ffi_output = ChangeFFI::empty();
157
158		let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
159		let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
160
161		let result_code = call_vtable(
162			&self.vtable,
163			self.instance,
164			ffi_ctx_ptr,
165			&ffi_input,
166			&mut ffi_output,
167			self.operator_id,
168		);
169
170		if result_code != 0 {
171			return Err(
172				FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
173			);
174		}
175
176		let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
177
178		// Clear the arena after operation
179		arena.clear();
180
181		Span::current().record("output_diff_count", output_change.diffs.len());
182
183		Ok(output_change)
184	}
185
186	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
187		let mut arena = self.arena.borrow_mut();
188
189		let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
190
191		let mut ffi_output = ColumnsFFI::empty();
192
193		let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
194		let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
195
196		// Call FFI pull function
197		let result = catch_unwind(AssertUnwindSafe(|| {
198			(self.vtable.pull)(
199				self.instance,
200				ffi_ctx_ptr,
201				row_numbers.as_ptr(),
202				row_numbers.len(),
203				&mut ffi_output,
204			)
205		}));
206
207		let result_code = match result {
208			Ok(code) => code,
209			Err(panic_info) => {
210				let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
211					s.to_string()
212				} else if let Some(s) = panic_info.downcast_ref::<String>() {
213					s.clone()
214				} else {
215					"Unknown panic".to_string()
216				};
217				error!(operator_id = self.operator_id.0, "FFI operator panicked during pull: {}", msg);
218				abort();
219			}
220		};
221
222		if result_code != 0 {
223			return Err(
224				FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
225			);
226		}
227
228		// Unmarshal the columns
229		let columns = arena.unmarshal_columns(&ffi_output);
230
231		// Clear the arena's arena after operation
232		arena.clear();
233
234		Ok(columns)
235	}
236
237	#[instrument(name = "flow::ffi::tick", level = "debug", skip_all, fields(
238		operator_id = self.operator_id.0,
239		output_diff_count = field::Empty
240	))]
241	fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
242		let mut arena = self.arena.borrow_mut();
243
244		let timestamp_nanos = timestamp.to_nanos();
245		let mut ffi_output = ChangeFFI::empty();
246
247		let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
248		let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
249
250		let result = catch_unwind(AssertUnwindSafe(|| {
251			(self.vtable.tick)(self.instance, ffi_ctx_ptr, timestamp_nanos, &mut ffi_output)
252		}));
253
254		let result_code = match result {
255			Ok(code) => code,
256			Err(panic_info) => {
257				let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
258					s.to_string()
259				} else if let Some(s) = panic_info.downcast_ref::<String>() {
260					s.clone()
261				} else {
262					"Unknown panic".to_string()
263				};
264				error!(operator_id = self.operator_id.0, "FFI operator panicked during tick: {}", msg);
265				abort();
266			}
267		};
268
269		if result_code < 0 {
270			return Err(
271				FFIError::Other(format!("FFI operator tick failed with code: {}", result_code)).into()
272			);
273		}
274
275		if result_code == 1 {
276			// No output (no-op)
277			arena.clear();
278			return Ok(None);
279		}
280
281		let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
282
283		arena.clear();
284
285		Span::current().record("output_diff_count", output_change.diffs.len());
286
287		Ok(Some(output_change))
288	}
289}