Skip to main content

reifydb_sdk/ffi/
wrapper.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{self, Any},
6	backtrace::Backtrace,
7	cell::RefCell,
8	ffi::c_void,
9	io::{self, Write},
10	panic::{AssertUnwindSafe, catch_unwind},
11	process::abort,
12	slice,
13};
14
15use reifydb_abi::{
16	constants::FFI_ERROR_NULL_PTR,
17	context::context::ContextFFI,
18	flow::{
19		change::ChangeFFI,
20		diff::{DiffFFI, DiffType},
21	},
22	operator::vtable::OperatorVTableFFI,
23};
24use reifydb_type::value::{datetime::DateTime, row_number::RowNumber};
25use tracing::{error, instrument, warn};
26
27use crate::operator::{FFIOperator, change::BorrowedChange, context::OperatorContext};
28
29thread_local! {
30	/// Detail string stored by the innermost error-producing site and consumed
31	/// by the abort-printing site. Set whenever an FFI entry point is about to
32	/// return a negative code, cleared after the FATAL block is printed.
33	static FFI_FATAL_DETAIL: RefCell<Option<String>> = const { RefCell::new(None) };
34}
35
36fn set_fatal_detail(detail: String) {
37	FFI_FATAL_DETAIL.with(|cell| *cell.borrow_mut() = Some(detail));
38}
39
40fn take_fatal_detail() -> Option<String> {
41	FFI_FATAL_DETAIL.with(|cell| cell.borrow_mut().take())
42}
43
44fn origin_type_name(origin_type: u8) -> &'static str {
45	match origin_type {
46		0 => "Flow",
47		1 => "Table",
48		2 => "View",
49		3 => "VTable",
50		4 => "RingBuffer",
51		6 => "Dictionary",
52		7 => "Series",
53		_ => "Unknown",
54	}
55}
56
57/// Summarize a ChangeFFI without unmarshalling its columns. Safe to call from
58/// FATAL paths because it only touches the struct header + diff-type tags.
59///
60/// # Safety
61/// - `input` must be a valid pointer to `ChangeFFI` or null.
62unsafe fn describe_change_input(input: *const ChangeFFI) -> String {
63	if input.is_null() {
64		return "<null>".to_string();
65	}
66	let ffi = unsafe { &*input };
67	let types = if !ffi.diffs.is_null() && ffi.diff_count > 0 {
68		let diffs: &[DiffFFI] = unsafe { slice::from_raw_parts(ffi.diffs, ffi.diff_count) };
69		let names: Vec<&'static str> = diffs
70			.iter()
71			.map(|d| match d.diff_type {
72				DiffType::Insert => "Insert",
73				DiffType::Update => "Update",
74				DiffType::Remove => "Remove",
75			})
76			.collect();
77		format!("[{}]", names.join(", "))
78	} else {
79		"[]".to_string()
80	};
81	format!(
82		"origin={}({}) diff_count={} diff_types={}",
83		origin_type_name(ffi.origin.origin),
84		ffi.origin.id,
85		ffi.diff_count,
86		types,
87	)
88}
89
90fn describe_panic_payload(payload: &Box<dyn Any + Send>) -> String {
91	if let Some(s) = payload.downcast_ref::<&'static str>() {
92		s.to_string()
93	} else if let Some(s) = payload.downcast_ref::<String>() {
94		s.clone()
95	} else {
96		format!("<non-string panic payload, TypeId={:?}>", (**payload).type_id())
97	}
98}
99
100fn code_meaning(code: i32) -> &'static str {
101	match code {
102		-2 => "operator returned Err",
103		-3 => "unmarshal failed",
104		-99 => "panic caught in catch_unwind",
105		_ => "unknown",
106	}
107}
108
109/// Emit the unified FATAL diagnostic block to stderr and flush.
110///
111/// `input_description` is optional because `ffi_pull` has no ChangeFFI input
112/// and `ffi_destroy` has no input at all.
113fn print_ffi_fatal(
114	entry: &str,
115	operator: &str,
116	code: i32,
117	detail: &str,
118	input_description: Option<&str>,
119	backtrace: Option<&Backtrace>,
120) {
121	let mut err = io::stderr().lock();
122	let _ = writeln!(err, "========== FFI FATAL ==========");
123	let _ = writeln!(err, "entry:    {}", entry);
124	let _ = writeln!(err, "operator: {}", operator);
125	let _ = writeln!(err, "code:     {} ({})", code, code_meaning(code));
126	let _ = writeln!(
127		err,
128		"detail:   {}",
129		if detail.is_empty() {
130			"<none>"
131		} else {
132			detail
133		}
134	);
135	if let Some(desc) = input_description {
136		let _ = writeln!(err, "input:    {}", desc);
137	}
138	if let Some(bt) = backtrace {
139		let _ = writeln!(err, "backtrace:\n{}", bt);
140	}
141	let _ = writeln!(err, "===============================");
142	let _ = err.flush();
143}
144
145/// Wrapper that adapts a Rust operator to the zero-copy FFI interface.
146///
147/// No internal arena: input is a `BorrowedChange<'_>` whose pointers alias
148/// host-owned native column storage, and output is written directly into
149/// host-pool-owned buffers via `ctx.builder()`. The wrapper's only job is
150/// to hold the Rust operator instance and expose it as `*mut c_void`.
151pub struct OperatorWrapper<O: FFIOperator> {
152	pub(crate) operator: O,
153}
154
155impl<O: FFIOperator> OperatorWrapper<O> {
156	/// Create a new operator wrapper
157	pub fn new(operator: O) -> Self {
158		Self {
159			operator,
160		}
161	}
162
163	/// Get a pointer to this wrapper as c_void
164	pub fn as_ptr(&mut self) -> *mut c_void {
165		self as *mut _ as *mut c_void
166	}
167
168	/// Create from a raw pointer
169	pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
170		unsafe { &mut *(ptr as *mut Self) }
171	}
172}
173
174/// # Safety
175///
176/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
177/// - `ctx` must be a valid pointer to a `ContextFFI`.
178/// - `input` must be a valid pointer to a `ChangeFFI` whose buffer pointers are valid for the duration of the call.
179#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
180	operator_type = any::type_name::<O>(),
181))]
182pub unsafe extern "C" fn ffi_apply<O: FFIOperator>(
183	instance: *mut c_void,
184	ctx: *mut ContextFFI,
185	input: *const ChangeFFI,
186) -> i32 {
187	let result = catch_unwind(AssertUnwindSafe(|| {
188		if input.is_null() {
189			set_fatal_detail("ffi_apply: input is null".to_string());
190			return -3;
191		}
192		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
193		let borrowed = unsafe { BorrowedChange::from_raw(input) };
194		let mut op_ctx = OperatorContext::new(ctx);
195		match wrapper.operator.apply(&mut op_ctx, borrowed) {
196			Ok(()) => 0,
197			Err(e) => {
198				warn!(?e, "Apply failed");
199				set_fatal_detail(format!("{:?}", e));
200				-2
201			}
202		}
203	}));
204
205	let (code, backtrace) = match result {
206		Ok(code) => (code, None),
207		Err(payload) => {
208			let bt = Backtrace::force_capture();
209			set_fatal_detail(describe_panic_payload(&payload));
210			error!("Panic in ffi_apply");
211			(-99, Some(bt))
212		}
213	};
214
215	if code < 0 {
216		let detail = take_fatal_detail().unwrap_or_default();
217		let input_desc = unsafe { describe_change_input(input) };
218		print_ffi_fatal(
219			"ffi_apply",
220			any::type_name::<O>(),
221			code,
222			&detail,
223			Some(&input_desc),
224			backtrace.as_ref(),
225		);
226		abort();
227	}
228	code
229}
230
231/// # Safety
232///
233/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
234/// - `ctx` must be a valid pointer to a `ContextFFI`.
235/// - `row_numbers` must be valid for reading `count` elements, or null if `count` is 0.
236#[instrument(name = "flow::operator::ffi::pull", level = "debug", skip_all, fields(
237	operator_type = any::type_name::<O>(),
238	row_count = count,
239))]
240pub unsafe extern "C" fn ffi_pull<O: FFIOperator>(
241	instance: *mut c_void,
242	ctx: *mut ContextFFI,
243	row_numbers: *const u64,
244	count: usize,
245) -> i32 {
246	let result = catch_unwind(AssertUnwindSafe(|| {
247		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
248
249		// Convert row numbers; the input slice borrows host memory for
250		// the duration of the call, but RowNumber is Copy so we
251		// collect owned values for the operator.
252		let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
253			unsafe { slice::from_raw_parts(row_numbers, count) }
254				.iter()
255				.map(|&n| RowNumber::from(n))
256				.collect()
257		} else {
258			Vec::new()
259		};
260
261		let mut op_ctx = OperatorContext::new(ctx);
262		match wrapper.operator.pull(&mut op_ctx, &numbers) {
263			Ok(()) => 0,
264			Err(e) => {
265				warn!(?e, "pull failed");
266				set_fatal_detail(format!("{:?}", e));
267				-2
268			}
269		}
270	}));
271
272	let (code, backtrace) = match result {
273		Ok(code) => (code, None),
274		Err(payload) => {
275			let bt = Backtrace::force_capture();
276			set_fatal_detail(describe_panic_payload(&payload));
277			error!("Panic in ffi_pull");
278			(-99, Some(bt))
279		}
280	};
281
282	if code < 0 {
283		let detail = take_fatal_detail().unwrap_or_default();
284		let input_desc = format!("row_count={}", count);
285		print_ffi_fatal(
286			"ffi_pull",
287			any::type_name::<O>(),
288			code,
289			&detail,
290			Some(&input_desc),
291			backtrace.as_ref(),
292		);
293		abort();
294	}
295	code
296}
297
298/// # Safety
299///
300/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
301/// - `ctx` must be a valid pointer to a `ContextFFI`.
302#[instrument(name = "flow::operator::ffi::tick", level = "debug", skip_all, fields(
303	operator_type = any::type_name::<O>(),
304))]
305pub unsafe extern "C" fn ffi_tick<O: FFIOperator>(
306	instance: *mut c_void,
307	ctx: *mut ContextFFI,
308	timestamp_nanos: u64,
309) -> i32 {
310	let result = catch_unwind(AssertUnwindSafe(|| {
311		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
312
313		let timestamp = DateTime::from_nanos(timestamp_nanos);
314		let mut op_ctx = OperatorContext::new(ctx);
315
316		match wrapper.operator.tick(&mut op_ctx, timestamp) {
317			Ok(true) => 0,
318			Ok(false) => 1,
319			Err(e) => {
320				warn!(?e, "Tick failed");
321				set_fatal_detail(format!("{:?}", e));
322				-2
323			}
324		}
325	}));
326
327	let (code, backtrace) = match result {
328		Ok(code) => (code, None),
329		Err(payload) => {
330			let bt = Backtrace::force_capture();
331			set_fatal_detail(describe_panic_payload(&payload));
332			error!("Panic in ffi_tick");
333			(-99, Some(bt))
334		}
335	};
336
337	if code < 0 {
338		let detail = take_fatal_detail().unwrap_or_default();
339		let input_desc = format!("timestamp_nanos={}", timestamp_nanos);
340		print_ffi_fatal(
341			"ffi_tick",
342			any::type_name::<O>(),
343			code,
344			&detail,
345			Some(&input_desc),
346			backtrace.as_ref(),
347		);
348		abort();
349	}
350	code
351}
352
353/// # Safety
354///
355/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` originally created by `Box::new`, or null (in which
356///   case this is a no-op).
357pub unsafe extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
358	if instance.is_null() {
359		return;
360	}
361
362	let result = catch_unwind(AssertUnwindSafe(|| unsafe {
363		// Reconstruct the Box from the raw pointer and let it drop
364		let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
365		// Wrapper will be dropped here, cleaning up the operator
366	}));
367
368	if let Err(payload) = result {
369		let bt = Backtrace::force_capture();
370		let detail = describe_panic_payload(&payload);
371		error!("Panic in ffi_destroy - aborting");
372		print_ffi_fatal("ffi_destroy", any::type_name::<O>(), -99, &detail, None, Some(&bt));
373		abort();
374	}
375}
376
377/// FFI entry point for `flush_state`. Called once per txn at commit time.
378///
379/// # Safety
380///
381/// - `instance` must be a valid pointer to an `OperatorWrapper<O>`.
382/// - `ctx` must point to a valid `ContextFFI` for the duration of the call.
383pub unsafe extern "C" fn ffi_flush_state<O: FFIOperator>(instance: *mut c_void, ctx: *mut ContextFFI) -> i32 {
384	if instance.is_null() || ctx.is_null() {
385		return FFI_ERROR_NULL_PTR;
386	}
387
388	let result = catch_unwind(AssertUnwindSafe(|| {
389		let wrapper = unsafe { &mut *(instance as *mut OperatorWrapper<O>) };
390		let mut op_ctx = OperatorContext::new(ctx);
391		wrapper.operator.flush_state(&mut op_ctx)
392	}));
393
394	match result {
395		Ok(Ok(())) => 0,
396		Ok(Err(e)) => {
397			error!(target: "ffi_flush_state", "operator flush_state failed: {:?}", e);
398			-2
399		}
400		Err(payload) => {
401			let bt = Backtrace::force_capture();
402			let detail = describe_panic_payload(&payload);
403			error!("Panic in ffi_flush_state - aborting");
404			print_ffi_fatal("ffi_flush_state", any::type_name::<O>(), -99, &detail, None, Some(&bt));
405			abort();
406		}
407	}
408}
409
410/// Create the vtable for an operator type
411pub fn create_vtable<O: FFIOperator>() -> OperatorVTableFFI {
412	OperatorVTableFFI {
413		apply: ffi_apply::<O>,
414		pull: ffi_pull::<O>,
415		tick: ffi_tick::<O>,
416		destroy: ffi_destroy::<O>,
417		flush_state: ffi_flush_state::<O>,
418	}
419}