Skip to main content

reifydb_sdk/ffi/
wrapper.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{self, Any},
6	backtrace::Backtrace,
7	cell::RefCell,
8	ffi::c_void,
9	io::{self, Write},
10	panic::{AssertUnwindSafe, catch_unwind},
11	process::abort,
12	slice,
13};
14
15use reifydb_abi::{
16	constants::FFI_ERROR_NULL_PTR,
17	context::context::ContextFFI,
18	flow::{
19		change::ChangeFFI,
20		diff::{DiffFFI, DiffType},
21	},
22	operator::vtable::OperatorVTableFFI,
23};
24use reifydb_type::value::{datetime::DateTime, row_number::RowNumber};
25use tracing::{error, instrument, warn};
26
27use crate::operator::{FFIOperator, Tick, change::BorrowedChange, context::OperatorContext};
28
29thread_local! {
30
31
32
33	static FFI_FATAL_DETAIL: RefCell<Option<String>> = const { RefCell::new(None) };
34}
35
36fn set_fatal_detail(detail: String) {
37	FFI_FATAL_DETAIL.with(|cell| *cell.borrow_mut() = Some(detail));
38}
39
40fn take_fatal_detail() -> Option<String> {
41	FFI_FATAL_DETAIL.with(|cell| cell.borrow_mut().take())
42}
43
44fn origin_type_name(origin_type: u8) -> &'static str {
45	match origin_type {
46		0 => "Flow",
47		1 => "Table",
48		2 => "View",
49		3 => "VTable",
50		4 => "RingBuffer",
51		6 => "Dictionary",
52		7 => "Series",
53		_ => "Unknown",
54	}
55}
56
57unsafe fn describe_change_input(input: *const ChangeFFI) -> String {
58	if input.is_null() {
59		return "<null>".to_string();
60	}
61	let ffi = unsafe { &*input };
62	let types = if !ffi.diffs.is_null() && ffi.diff_count > 0 {
63		let diffs: &[DiffFFI] = unsafe { slice::from_raw_parts(ffi.diffs, ffi.diff_count) };
64		let names: Vec<&'static str> = diffs
65			.iter()
66			.map(|d| match d.diff_type {
67				DiffType::Insert => "Insert",
68				DiffType::Update => "Update",
69				DiffType::Remove => "Remove",
70			})
71			.collect();
72		format!("[{}]", names.join(", "))
73	} else {
74		"[]".to_string()
75	};
76	format!(
77		"origin={}({}) diff_count={} diff_types={}",
78		origin_type_name(ffi.origin.origin),
79		ffi.origin.id,
80		ffi.diff_count,
81		types,
82	)
83}
84
85fn describe_panic_payload(payload: &Box<dyn Any + Send>) -> String {
86	if let Some(s) = payload.downcast_ref::<&'static str>() {
87		s.to_string()
88	} else if let Some(s) = payload.downcast_ref::<String>() {
89		s.clone()
90	} else {
91		format!("<non-string panic payload, TypeId={:?}>", (**payload).type_id())
92	}
93}
94
95fn code_meaning(code: i32) -> &'static str {
96	match code {
97		-2 => "operator returned Err",
98		-3 => "unmarshal failed",
99		-99 => "panic caught in catch_unwind",
100		_ => "unknown",
101	}
102}
103
104fn print_ffi_fatal(
105	entry: &str,
106	operator: &str,
107	code: i32,
108	detail: &str,
109	input_description: Option<&str>,
110	backtrace: Option<&Backtrace>,
111) {
112	let mut err = io::stderr().lock();
113	let _ = writeln!(err, "========== FFI FATAL ==========");
114	let _ = writeln!(err, "entry:    {}", entry);
115	let _ = writeln!(err, "operator: {}", operator);
116	let _ = writeln!(err, "code:     {} ({})", code, code_meaning(code));
117	let _ = writeln!(
118		err,
119		"detail:   {}",
120		if detail.is_empty() {
121			"<none>"
122		} else {
123			detail
124		}
125	);
126	if let Some(desc) = input_description {
127		let _ = writeln!(err, "input:    {}", desc);
128	}
129	if let Some(bt) = backtrace {
130		let _ = writeln!(err, "backtrace:\n{}", bt);
131	}
132	let _ = writeln!(err, "===============================");
133	let _ = err.flush();
134}
135
136pub struct OperatorWrapper<O: FFIOperator> {
137	pub(crate) operator: O,
138}
139
140impl<O: FFIOperator> OperatorWrapper<O> {
141	pub fn new(operator: O) -> Self {
142		Self {
143			operator,
144		}
145	}
146
147	pub fn as_ptr(&mut self) -> *mut c_void {
148		self as *mut _ as *mut c_void
149	}
150
151	pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
152		unsafe { &mut *(ptr as *mut Self) }
153	}
154}
155
156/// # Safety
157///
158/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
159/// - `ctx` must be a valid pointer to a `ContextFFI`.
160/// - `input` must be a valid pointer to a `ChangeFFI` whose buffer pointers are valid for the duration of the call.
161#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
162	operator_type = any::type_name::<O>(),
163))]
164pub unsafe extern "C" fn ffi_apply<O: FFIOperator>(
165	instance: *mut c_void,
166	ctx: *mut ContextFFI,
167	input: *const ChangeFFI,
168) -> i32 {
169	let result = catch_unwind(AssertUnwindSafe(|| {
170		if input.is_null() {
171			set_fatal_detail("ffi_apply: input is null".to_string());
172			return -3;
173		}
174		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
175		let borrowed = unsafe { BorrowedChange::from_raw(input) };
176		let mut op_ctx = OperatorContext::new(ctx);
177		match wrapper.operator.apply(&mut op_ctx, borrowed) {
178			Ok(()) => 0,
179			Err(e) => {
180				warn!(?e, "Apply failed");
181				set_fatal_detail(format!("{:?}", e));
182				-2
183			}
184		}
185	}));
186
187	let (code, backtrace) = match result {
188		Ok(code) => (code, None),
189		Err(payload) => {
190			let bt = Backtrace::force_capture();
191			set_fatal_detail(describe_panic_payload(&payload));
192			error!("Panic in ffi_apply");
193			(-99, Some(bt))
194		}
195	};
196
197	if code < 0 {
198		let detail = take_fatal_detail().unwrap_or_default();
199		let input_desc = unsafe { describe_change_input(input) };
200		print_ffi_fatal(
201			"ffi_apply",
202			any::type_name::<O>(),
203			code,
204			&detail,
205			Some(&input_desc),
206			backtrace.as_ref(),
207		);
208		abort();
209	}
210	code
211}
212
213/// # Safety
214///
215/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
216/// - `ctx` must be a valid pointer to a `ContextFFI`.
217/// - `row_numbers` must be valid for reading `count` elements, or null if `count` is 0.
218#[instrument(name = "flow::operator::ffi::pull", level = "debug", skip_all, fields(
219	operator_type = any::type_name::<O>(),
220	row_count = count,
221))]
222pub unsafe extern "C" fn ffi_pull<O: FFIOperator>(
223	instance: *mut c_void,
224	ctx: *mut ContextFFI,
225	row_numbers: *const u64,
226	count: usize,
227) -> i32 {
228	let result = catch_unwind(AssertUnwindSafe(|| {
229		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
230
231		let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
232			unsafe { slice::from_raw_parts(row_numbers, count) }
233				.iter()
234				.map(|&n| RowNumber::from(n))
235				.collect()
236		} else {
237			Vec::new()
238		};
239
240		let mut op_ctx = OperatorContext::new(ctx);
241		match wrapper.operator.pull(&mut op_ctx, &numbers) {
242			Ok(()) => 0,
243			Err(e) => {
244				warn!(?e, "pull failed");
245				set_fatal_detail(format!("{:?}", e));
246				-2
247			}
248		}
249	}));
250
251	let (code, backtrace) = match result {
252		Ok(code) => (code, None),
253		Err(payload) => {
254			let bt = Backtrace::force_capture();
255			set_fatal_detail(describe_panic_payload(&payload));
256			error!("Panic in ffi_pull");
257			(-99, Some(bt))
258		}
259	};
260
261	if code < 0 {
262		let detail = take_fatal_detail().unwrap_or_default();
263		let input_desc = format!("row_count={}", count);
264		print_ffi_fatal(
265			"ffi_pull",
266			any::type_name::<O>(),
267			code,
268			&detail,
269			Some(&input_desc),
270			backtrace.as_ref(),
271		);
272		abort();
273	}
274	code
275}
276
277/// # Safety
278///
279/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` created by `Box::new`.
280/// - `ctx` must be a valid pointer to a `ContextFFI`.
281#[instrument(name = "flow::operator::ffi::tick", level = "debug", skip_all, fields(
282	operator_type = any::type_name::<O>(),
283))]
284pub unsafe extern "C" fn ffi_tick<O: FFIOperator>(
285	instance: *mut c_void,
286	ctx: *mut ContextFFI,
287	timestamp_nanos: u64,
288) -> i32 {
289	let result = catch_unwind(AssertUnwindSafe(|| {
290		let wrapper = OperatorWrapper::<O>::from_ptr(instance);
291
292		let tick = Tick {
293			now: DateTime::from_nanos(timestamp_nanos),
294		};
295		let mut op_ctx = OperatorContext::new(ctx);
296
297		match wrapper.operator.tick(&mut op_ctx, tick) {
298			Ok(true) => 0,
299			Ok(false) => 1,
300			Err(e) => {
301				warn!(?e, "Tick failed");
302				set_fatal_detail(format!("{:?}", e));
303				-2
304			}
305		}
306	}));
307
308	let (code, backtrace) = match result {
309		Ok(code) => (code, None),
310		Err(payload) => {
311			let bt = Backtrace::force_capture();
312			set_fatal_detail(describe_panic_payload(&payload));
313			error!("Panic in ffi_tick");
314			(-99, Some(bt))
315		}
316	};
317
318	if code < 0 {
319		let detail = take_fatal_detail().unwrap_or_default();
320		let input_desc = format!("timestamp_nanos={}", timestamp_nanos);
321		print_ffi_fatal(
322			"ffi_tick",
323			any::type_name::<O>(),
324			code,
325			&detail,
326			Some(&input_desc),
327			backtrace.as_ref(),
328		);
329		abort();
330	}
331	code
332}
333
334/// # Safety
335///
336/// - `instance` must be a valid pointer to an `OperatorWrapper<O>` originally created by `Box::new`, or null (in which
337///   case this is a no-op).
338pub unsafe extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
339	if instance.is_null() {
340		return;
341	}
342
343	let result = catch_unwind(AssertUnwindSafe(|| unsafe {
344		let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
345	}));
346
347	if let Err(payload) = result {
348		let bt = Backtrace::force_capture();
349		let detail = describe_panic_payload(&payload);
350		error!("Panic in ffi_destroy - aborting");
351		print_ffi_fatal("ffi_destroy", any::type_name::<O>(), -99, &detail, None, Some(&bt));
352		abort();
353	}
354}
355
356/// FFI entry point for `flush_state`. Called once per txn at commit time.
357///
358/// # Safety
359///
360/// - `instance` must be a valid pointer to an `OperatorWrapper<O>`.
361/// - `ctx` must point to a valid `ContextFFI` for the duration of the call.
362pub unsafe extern "C" fn ffi_flush_state<O: FFIOperator>(instance: *mut c_void, ctx: *mut ContextFFI) -> i32 {
363	if instance.is_null() || ctx.is_null() {
364		return FFI_ERROR_NULL_PTR;
365	}
366
367	let result = catch_unwind(AssertUnwindSafe(|| {
368		let wrapper = unsafe { &mut *(instance as *mut OperatorWrapper<O>) };
369		let mut op_ctx = OperatorContext::new(ctx);
370		wrapper.operator.flush_state(&mut op_ctx)
371	}));
372
373	match result {
374		Ok(Ok(())) => 0,
375		Ok(Err(e)) => {
376			error!(target: "ffi_flush_state", "operator flush_state failed: {:?}", e);
377			-2
378		}
379		Err(payload) => {
380			let bt = Backtrace::force_capture();
381			let detail = describe_panic_payload(&payload);
382			error!("Panic in ffi_flush_state - aborting");
383			print_ffi_fatal("ffi_flush_state", any::type_name::<O>(), -99, &detail, None, Some(&bt));
384			abort();
385		}
386	}
387}
388
389pub fn create_vtable<O: FFIOperator>() -> OperatorVTableFFI {
390	OperatorVTableFFI {
391		apply: ffi_apply::<O>,
392		pull: ffi_pull::<O>,
393		tick: ffi_tick::<O>,
394		destroy: ffi_destroy::<O>,
395		flush_state: ffi_flush_state::<O>,
396	}
397}