Skip to main content

reifydb_sub_flow/operator/
ffi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::Any,
6	cell::{Cell, UnsafeCell},
7	ffi::c_void,
8	panic::{AssertUnwindSafe, catch_unwind},
9	process::abort,
10	ptr,
11};
12
13use reifydb_abi::{
14	callbacks::builder::EmitDiffKind,
15	context::context::ContextFFI,
16	flow::change::ChangeFFI,
17	operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
18};
19use reifydb_core::{
20	common::CommitVersion,
21	interface::{
22		catalog::flow::FlowNodeId,
23		change::{Change, Diff, Diffs},
24	},
25	value::column::columns::Columns,
26};
27use reifydb_engine::vm::executor::Executor;
28use reifydb_extension::ffi_callbacks::builder::{BuilderRegistry, with_registry};
29use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
30use reifydb_type::{
31	Result,
32	value::{datetime::DateTime, row_number::RowNumber},
33};
34use tracing::{Span, error, field, instrument};
35
36use crate::{
37	ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
38	operator::Operator,
39	transaction::{FlowTransaction, slot::PersistFn},
40};
41
42// One scratch arena per OS thread. Rayon worker threads each have their own,
43// so there is no cross-thread sharing even on the transactional par_iter path.
44// The arena is reset at the start of each new txn (in `ensure_txn_setup`)
45// rather than at txn end, which is equivalent because `apply` always returns
46// before the next txn begins.
47thread_local! {
48	static FFI_MARSHAL_ARENA: UnsafeCell<Arena> = UnsafeCell::new(Arena::new());
49}
50
/// Thin `Send + Sync` newtype around the guest's opaque instance pointer.
///
/// Raw pointers are neither `Send` nor `Sync`. This wrapper lets the pointer
/// be captured by the state-flush closure built in `ensure_flush_slot`.
/// Rust code never dereferences it; it is only handed back to the guest's
/// vtable functions.
#[derive(Copy, Clone)]
struct SendableInstance(*mut c_void);

// SAFETY: the wrapper adds no synchronisation of its own. Soundness rests on
// the FFI operator contract that a guest instance is driven serially by its
// `FFIOperator` host wrapper, so the pointer is never used concurrently.
unsafe impl Send for SendableInstance {}
// SAFETY: same serial-access contract as the `Send` impl directly above.
unsafe impl Sync for SendableInstance {}
59
60/// FFI operator that wraps an external operator implementation
61pub struct FFIOperator {
62	/// Operator descriptor from the FFI library
63	descriptor: OperatorDescriptorFFI,
64	/// Virtual function table for calling FFI functions
65	vtable: OperatorVTableFFI,
66	/// Pointer to the FFI operator instance
67	instance: *mut c_void,
68	/// ID for this operator
69	operator_id: FlowNodeId,
70	/// Executor for RQL execution via FFI callbacks
71	executor: Executor,
72	/// Per-instance output builder registry. The guest builds output
73	/// columns via `BuilderCallbacks` (acquire/data_ptr/commit/emit_diff);
74	/// after the vtable call returns the host drains accumulated diffs
75	/// from this registry to assemble the output `Change`. See
76	/// `crates/sub-flow/src/ffi/callbacks/builder.rs`.
77	builder_registry: BuilderRegistry,
78	/// Version of the last `FlowTransaction` for which the flush slot and
79	/// FFI arena were registered. Compared on every `apply`/`pull`/`tick`
80	/// so the idempotent registration calls are skipped after the first
81	/// invocation per txn. `u64::MAX` as sentinel (no txn yet).
82	last_registered_txn: Cell<u64>,
83	/// Pre-built FFI context. `operator_id` and `callbacks` (all static
84	/// function pointers) are written once in `new` and never change.
85	/// `txn_ptr` and `executor_ptr` are updated once per txn in
86	/// `ensure_txn_setup` and reused for every `apply`/`pull`/`tick` call
87	/// in that txn, avoiding a full struct rebuild on each invocation.
88	cached_ctx: UnsafeCell<ContextFFI>,
89}
90
91impl FFIOperator {
92	/// Create a new FFI operator
93	pub fn new(
94		descriptor: OperatorDescriptorFFI,
95		instance: *mut c_void,
96		operator_id: FlowNodeId,
97		executor: Executor,
98	) -> Self {
99		let vtable = descriptor.vtable;
100
101		Self {
102			descriptor,
103			vtable,
104			instance,
105			operator_id,
106			executor,
107			builder_registry: BuilderRegistry::new(),
108			last_registered_txn: Cell::new(u64::MAX),
109			cached_ctx: UnsafeCell::new(ContextFFI {
110				txn_ptr: ptr::null_mut(),
111				executor_ptr: ptr::null(),
112				operator_id: operator_id.0,
113				clock_now_nanos: 0,
114				callbacks: create_host_callbacks(),
115			}),
116		}
117	}
118
119	/// Get the operator descriptor
120	pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
121		&self.descriptor
122	}
123
124	fn ensure_txn_setup(&self, txn: &mut FlowTransaction) -> Result<()> {
125		let txn_version = txn.version().0;
126		if self.last_registered_txn.get() != txn_version {
127			ensure_flush_slot(txn, self.operator_id, self.vtable, self.instance, self.executor.clone())?;
128			self.last_registered_txn.set(txn_version);
129			// SAFETY: single-threaded actor; no aliasing with guest (vtable not
130			// yet called this txn).
131			let ctx = unsafe { &mut *self.cached_ctx.get() };
132			ctx.txn_ptr = txn as *mut _ as *mut c_void;
133			ctx.executor_ptr = &self.executor as *const _ as *const c_void;
134			ctx.clock_now_nanos = txn.clock().now_nanos();
135		}
136		Ok(())
137	}
138}
139
140// SAFETY: FFIOperator is only accessed from a single actor at a time.
141// The raw pointer and RefCell<Arena> are not shared across threads.
142unsafe impl Send for FFIOperator {}
143unsafe impl Sync for FFIOperator {}
144
145impl Drop for FFIOperator {
146	fn drop(&mut self) {
147		// Call the destroy function from the vtable to clean up the FFI operator instance
148		if !self.instance.is_null() {
149			unsafe { (self.vtable.destroy)(self.instance) };
150		}
151	}
152}
153
154/// Marshal a flow change to FFI format
155#[inline]
156#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
157fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
158	arena.marshal_change(change)
159}
160
161/// Call the FFI vtable apply function
162#[inline]
163#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
164fn call_vtable(
165	vtable: &OperatorVTableFFI,
166	instance: *mut c_void,
167	ffi_ctx_ptr: *mut ContextFFI,
168	ffi_input: &ChangeFFI,
169	operator_id: FlowNodeId,
170) -> i32 {
171	let result = catch_unwind(AssertUnwindSafe(|| unsafe { (vtable.apply)(instance, ffi_ctx_ptr, ffi_input) }));
172
173	match result {
174		Ok(code) => code,
175		Err(panic_info) => {
176			let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
177				s.to_string()
178			} else if let Some(s) = panic_info.downcast_ref::<String>() {
179				s.clone()
180			} else {
181				"Unknown panic".to_string()
182			};
183			error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
184			abort();
185		}
186	}
187}
188
189/// Ensure this FFI op has a state-flush slot registered in the txn cache.
190///
191/// Called at the top of every `apply`/`pull`/`tick`. Idempotent within a
192/// txn: the slot is only created on first access. The slot's persist
193/// closure (run at `flush_operator_states` time) constructs a fresh
194/// `ContextFFI` and invokes `vtable.flush_state` on the guest instance.
195///
196/// Marks the slot dirty unconditionally so commit always calls
197/// `flush_state`. Most FFI ops have a default no-op `flush_state` so this
198/// is cheap; stateful ops drain their `StateCache` dirty list there.
199fn ensure_flush_slot(
200	txn: &mut FlowTransaction,
201	operator_id: FlowNodeId,
202	vtable: OperatorVTableFFI,
203	instance: *mut c_void,
204	executor: Executor,
205) -> Result<()> {
206	let send_instance = SendableInstance(instance);
207	let _ = txn.operator_state(operator_id, move |_txn| {
208		let captured_instance = send_instance;
209		let captured_vtable = vtable;
210		let captured_executor = executor;
211		let captured_id = operator_id;
212		let persist: PersistFn = Box::new(move |txn, _value: Box<dyn Any>| {
213			let ffi_ctx = new_ffi_context(txn, &captured_executor, captured_id, create_host_callbacks());
214			let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
215			let inst = captured_instance;
216			let result = catch_unwind(AssertUnwindSafe(|| unsafe {
217				(captured_vtable.flush_state)(inst.0, ffi_ctx_ptr)
218			}));
219			match result {
220				Ok(0) => Ok(()),
221				Ok(code) => Err(FFIError::Other(format!(
222					"FFI operator flush_state failed with code: {}",
223					code
224				))
225				.into()),
226				Err(_) => {
227					error!(operator_id = captured_id.0, "FFI operator panicked during flush_state");
228					abort();
229				}
230			}
231		});
232		// Slot value is unused for FFI ops; we only need the persist hook.
233		Ok(((), persist))
234	})?;
235	txn.mark_state_dirty(operator_id);
236	Ok(())
237}
238
239impl Operator for FFIOperator {
240	fn id(&self) -> FlowNodeId {
241		self.operator_id
242	}
243
244	#[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
245		operator_id = self.operator_id.0,
246		input_diff_count = change.diffs.len(),
247		output_diff_count = field::Empty
248	))]
249	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
250		self.ensure_txn_setup(txn)?;
251
252		// Reset the arena before each call so scaffolding memory is bounded
253		// to one call's worth regardless of how many changes flow through
254		// this txn. Bumpalo keeps the chunk after reset, so after the first
255		// call this is a single pointer write with no system allocation.
256		// SAFETY: single-threaded per operator; no live pointers from a prior
257		// call exist at this point (apply() returns before the next call).
258		FFI_MARSHAL_ARENA.with(|cell| unsafe { (*cell.get()).clear() });
259		let ffi_input = FFI_MARSHAL_ARENA.with(|cell| marshal_input(unsafe { &mut *cell.get() }, &change));
260
261		let version = change.version;
262		let changed_at = change.changed_at;
263
264		let ffi_ctx_ptr = self.cached_ctx.get();
265
266		let result_code = with_registry(&self.builder_registry, || {
267			call_vtable(&self.vtable, self.instance, ffi_ctx_ptr, &ffi_input, self.operator_id)
268		});
269
270		if result_code != 0 {
271			// Drop any orphaned builder slots.
272			let _ = self.builder_registry.drain();
273			return Err(
274				FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
275			);
276		}
277
278		let output_change = drain_emitted_diffs(&self.builder_registry, self.operator_id, version, changed_at);
279
280		Span::current().record("output_diff_count", output_change.diffs.len());
281
282		Ok(output_change)
283	}
284
285	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
286		self.ensure_txn_setup(txn)?;
287
288		let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
289		let ffi_ctx_ptr = self.cached_ctx.get();
290
291		let result_code = self.invoke_under_panic_guard("pull", || unsafe {
292			(self.vtable.pull)(self.instance, ffi_ctx_ptr, row_numbers.as_ptr(), row_numbers.len())
293		});
294
295		if result_code != 0 {
296			let _ = self.builder_registry.drain();
297			return Err(
298				FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
299			);
300		}
301
302		// `pull` emits a single Insert-shaped diff whose `post` columns are the
303		// rows the guest fetched. Use the first emitted diff's `post` (or `pre`
304		// for Remove) as the result.
305		let mut diffs = self.builder_registry.drain();
306		let columns = if let Some(first) = diffs.drain(..).next() {
307			first.post.or(first.pre).unwrap_or_else(Columns::empty)
308		} else {
309			Columns::empty()
310		};
311
312		Ok(columns)
313	}
314
315	#[instrument(name = "flow::ffi::tick", level = "debug", skip_all, fields(
316		operator_id = self.operator_id.0,
317		output_diff_count = field::Empty
318	))]
319	fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
320		self.ensure_txn_setup(txn)?;
321
322		let timestamp_nanos = timestamp.to_nanos();
323		let ffi_ctx_ptr = self.cached_ctx.get();
324
325		let result_code = self.invoke_under_panic_guard("tick", || unsafe {
326			(self.vtable.tick)(self.instance, ffi_ctx_ptr, timestamp_nanos)
327		});
328
329		if result_code < 0 {
330			let _ = self.builder_registry.drain();
331			return Err(
332				FFIError::Other(format!("FFI operator tick failed with code: {}", result_code)).into()
333			);
334		}
335
336		if result_code == 1 {
337			// No output: drain in case the guest acquired without emitting.
338			let _ = self.builder_registry.drain();
339			return Ok(None);
340		}
341
342		// Tick has no carried txn version; use timestamp nanos as the version
343		// surrogate (consistent with other tick-driven flows in this codebase).
344		// Ordering-sensitive callers rely on `changed_at` instead.
345		let version = CommitVersion(timestamp_nanos);
346		let output_change = drain_emitted_diffs(&self.builder_registry, self.operator_id, version, timestamp);
347		Span::current().record("output_diff_count", output_change.diffs.len());
348		Ok(Some(output_change))
349	}
350}
351
352impl FFIOperator {
353	#[inline]
354	fn invoke_under_panic_guard<F>(&self, op: &'static str, call: F) -> i32
355	where
356		F: FnOnce() -> i32,
357	{
358		with_registry(&self.builder_registry, || {
359			let result = catch_unwind(AssertUnwindSafe(call));
360			match result {
361				Ok(code) => code,
362				Err(panic_info) => {
363					let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
364						s.to_string()
365					} else if let Some(s) = panic_info.downcast_ref::<String>() {
366						s.clone()
367					} else {
368						"Unknown panic".to_string()
369					};
370					error!(
371						operator_id = self.operator_id.0,
372						"FFI operator panicked during {}: {}", op, msg
373					);
374					abort();
375				}
376			}
377		})
378	}
379}
380
381/// Collect emitted diffs from the registry into a `Change` originating from
382/// this operator's flow node.
383fn drain_emitted_diffs(
384	registry: &BuilderRegistry,
385	operator_id: FlowNodeId,
386	version: CommitVersion,
387	changed_at: DateTime,
388) -> Change {
389	let emitted = registry.drain();
390	let diffs: Diffs = emitted
391		.into_iter()
392		.map(|d| match d.kind {
393			EmitDiffKind::Insert => Diff::insert(d.post.unwrap_or_else(Columns::empty)),
394			EmitDiffKind::Update => Diff::update(
395				d.pre.unwrap_or_else(Columns::empty),
396				d.post.unwrap_or_else(Columns::empty),
397			),
398			EmitDiffKind::Remove => Diff::remove(d.pre.unwrap_or_else(Columns::empty)),
399		})
400		.collect();
401	Change::from_flow(operator_id, version, diffs, changed_at)
402}