1use std::{
5 any::{self, Any},
6 backtrace::Backtrace,
7 cell::RefCell,
8 ffi::c_void,
9 io::{self, Write},
10 panic::{AssertUnwindSafe, catch_unwind},
11 process::abort,
12 slice,
13};
14
15use reifydb_abi::{
16 constants::FFI_ERROR_NULL_PTR,
17 context::context::ContextFFI,
18 flow::{
19 change::ChangeFFI,
20 diff::{DiffFFI, DiffType},
21 },
22 operator::vtable::OperatorVTableFFI,
23};
24use reifydb_type::value::{datetime::DateTime, row_number::RowNumber};
25use tracing::{error, instrument, warn};
26
27use crate::operator::{FFIOperator, change::BorrowedChange, context::OperatorContext};
28
29thread_local! {
30 static FFI_FATAL_DETAIL: RefCell<Option<String>> = const { RefCell::new(None) };
34}
35
36fn set_fatal_detail(detail: String) {
37 FFI_FATAL_DETAIL.with(|cell| *cell.borrow_mut() = Some(detail));
38}
39
40fn take_fatal_detail() -> Option<String> {
41 FFI_FATAL_DETAIL.with(|cell| cell.borrow_mut().take())
42}
43
44fn origin_type_name(origin_type: u8) -> &'static str {
45 match origin_type {
46 0 => "Flow",
47 1 => "Table",
48 2 => "View",
49 3 => "VTable",
50 4 => "RingBuffer",
51 6 => "Dictionary",
52 7 => "Series",
53 _ => "Unknown",
54 }
55}
56
57unsafe fn describe_change_input(input: *const ChangeFFI) -> String {
63 if input.is_null() {
64 return "<null>".to_string();
65 }
66 let ffi = unsafe { &*input };
67 let types = if !ffi.diffs.is_null() && ffi.diff_count > 0 {
68 let diffs: &[DiffFFI] = unsafe { slice::from_raw_parts(ffi.diffs, ffi.diff_count) };
69 let names: Vec<&'static str> = diffs
70 .iter()
71 .map(|d| match d.diff_type {
72 DiffType::Insert => "Insert",
73 DiffType::Update => "Update",
74 DiffType::Remove => "Remove",
75 })
76 .collect();
77 format!("[{}]", names.join(", "))
78 } else {
79 "[]".to_string()
80 };
81 format!(
82 "origin={}({}) diff_count={} diff_types={}",
83 origin_type_name(ffi.origin.origin),
84 ffi.origin.id,
85 ffi.diff_count,
86 types,
87 )
88}
89
90fn describe_panic_payload(payload: &Box<dyn Any + Send>) -> String {
91 if let Some(s) = payload.downcast_ref::<&'static str>() {
92 s.to_string()
93 } else if let Some(s) = payload.downcast_ref::<String>() {
94 s.clone()
95 } else {
96 format!("<non-string panic payload, TypeId={:?}>", (**payload).type_id())
97 }
98}
99
100fn code_meaning(code: i32) -> &'static str {
101 match code {
102 -2 => "operator returned Err",
103 -3 => "unmarshal failed",
104 -99 => "panic caught in catch_unwind",
105 _ => "unknown",
106 }
107}
108
109fn print_ffi_fatal(
114 entry: &str,
115 operator: &str,
116 code: i32,
117 detail: &str,
118 input_description: Option<&str>,
119 backtrace: Option<&Backtrace>,
120) {
121 let mut err = io::stderr().lock();
122 let _ = writeln!(err, "========== FFI FATAL ==========");
123 let _ = writeln!(err, "entry: {}", entry);
124 let _ = writeln!(err, "operator: {}", operator);
125 let _ = writeln!(err, "code: {} ({})", code, code_meaning(code));
126 let _ = writeln!(
127 err,
128 "detail: {}",
129 if detail.is_empty() {
130 "<none>"
131 } else {
132 detail
133 }
134 );
135 if let Some(desc) = input_description {
136 let _ = writeln!(err, "input: {}", desc);
137 }
138 if let Some(bt) = backtrace {
139 let _ = writeln!(err, "backtrace:\n{}", bt);
140 }
141 let _ = writeln!(err, "===============================");
142 let _ = err.flush();
143}
144
145pub struct OperatorWrapper<O: FFIOperator> {
152 pub(crate) operator: O,
153}
154
155impl<O: FFIOperator> OperatorWrapper<O> {
156 pub fn new(operator: O) -> Self {
158 Self {
159 operator,
160 }
161 }
162
163 pub fn as_ptr(&mut self) -> *mut c_void {
165 self as *mut _ as *mut c_void
166 }
167
168 pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
170 unsafe { &mut *(ptr as *mut Self) }
171 }
172}
173
174#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
180 operator_type = any::type_name::<O>(),
181))]
182pub unsafe extern "C" fn ffi_apply<O: FFIOperator>(
183 instance: *mut c_void,
184 ctx: *mut ContextFFI,
185 input: *const ChangeFFI,
186) -> i32 {
187 let result = catch_unwind(AssertUnwindSafe(|| {
188 if input.is_null() {
189 set_fatal_detail("ffi_apply: input is null".to_string());
190 return -3;
191 }
192 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
193 let borrowed = unsafe { BorrowedChange::from_raw(input) };
194 let mut op_ctx = OperatorContext::new(ctx);
195 match wrapper.operator.apply(&mut op_ctx, borrowed) {
196 Ok(()) => 0,
197 Err(e) => {
198 warn!(?e, "Apply failed");
199 set_fatal_detail(format!("{:?}", e));
200 -2
201 }
202 }
203 }));
204
205 let (code, backtrace) = match result {
206 Ok(code) => (code, None),
207 Err(payload) => {
208 let bt = Backtrace::force_capture();
209 set_fatal_detail(describe_panic_payload(&payload));
210 error!("Panic in ffi_apply");
211 (-99, Some(bt))
212 }
213 };
214
215 if code < 0 {
216 let detail = take_fatal_detail().unwrap_or_default();
217 let input_desc = unsafe { describe_change_input(input) };
218 print_ffi_fatal(
219 "ffi_apply",
220 any::type_name::<O>(),
221 code,
222 &detail,
223 Some(&input_desc),
224 backtrace.as_ref(),
225 );
226 abort();
227 }
228 code
229}
230
231#[instrument(name = "flow::operator::ffi::pull", level = "debug", skip_all, fields(
237 operator_type = any::type_name::<O>(),
238 row_count = count,
239))]
240pub unsafe extern "C" fn ffi_pull<O: FFIOperator>(
241 instance: *mut c_void,
242 ctx: *mut ContextFFI,
243 row_numbers: *const u64,
244 count: usize,
245) -> i32 {
246 let result = catch_unwind(AssertUnwindSafe(|| {
247 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
248
249 let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
253 unsafe { slice::from_raw_parts(row_numbers, count) }
254 .iter()
255 .map(|&n| RowNumber::from(n))
256 .collect()
257 } else {
258 Vec::new()
259 };
260
261 let mut op_ctx = OperatorContext::new(ctx);
262 match wrapper.operator.pull(&mut op_ctx, &numbers) {
263 Ok(()) => 0,
264 Err(e) => {
265 warn!(?e, "pull failed");
266 set_fatal_detail(format!("{:?}", e));
267 -2
268 }
269 }
270 }));
271
272 let (code, backtrace) = match result {
273 Ok(code) => (code, None),
274 Err(payload) => {
275 let bt = Backtrace::force_capture();
276 set_fatal_detail(describe_panic_payload(&payload));
277 error!("Panic in ffi_pull");
278 (-99, Some(bt))
279 }
280 };
281
282 if code < 0 {
283 let detail = take_fatal_detail().unwrap_or_default();
284 let input_desc = format!("row_count={}", count);
285 print_ffi_fatal(
286 "ffi_pull",
287 any::type_name::<O>(),
288 code,
289 &detail,
290 Some(&input_desc),
291 backtrace.as_ref(),
292 );
293 abort();
294 }
295 code
296}
297
298#[instrument(name = "flow::operator::ffi::tick", level = "debug", skip_all, fields(
303 operator_type = any::type_name::<O>(),
304))]
305pub unsafe extern "C" fn ffi_tick<O: FFIOperator>(
306 instance: *mut c_void,
307 ctx: *mut ContextFFI,
308 timestamp_nanos: u64,
309) -> i32 {
310 let result = catch_unwind(AssertUnwindSafe(|| {
311 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
312
313 let timestamp = DateTime::from_nanos(timestamp_nanos);
314 let mut op_ctx = OperatorContext::new(ctx);
315
316 match wrapper.operator.tick(&mut op_ctx, timestamp) {
317 Ok(true) => 0,
318 Ok(false) => 1,
319 Err(e) => {
320 warn!(?e, "Tick failed");
321 set_fatal_detail(format!("{:?}", e));
322 -2
323 }
324 }
325 }));
326
327 let (code, backtrace) = match result {
328 Ok(code) => (code, None),
329 Err(payload) => {
330 let bt = Backtrace::force_capture();
331 set_fatal_detail(describe_panic_payload(&payload));
332 error!("Panic in ffi_tick");
333 (-99, Some(bt))
334 }
335 };
336
337 if code < 0 {
338 let detail = take_fatal_detail().unwrap_or_default();
339 let input_desc = format!("timestamp_nanos={}", timestamp_nanos);
340 print_ffi_fatal(
341 "ffi_tick",
342 any::type_name::<O>(),
343 code,
344 &detail,
345 Some(&input_desc),
346 backtrace.as_ref(),
347 );
348 abort();
349 }
350 code
351}
352
353pub unsafe extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
358 if instance.is_null() {
359 return;
360 }
361
362 let result = catch_unwind(AssertUnwindSafe(|| unsafe {
363 let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
365 }));
367
368 if let Err(payload) = result {
369 let bt = Backtrace::force_capture();
370 let detail = describe_panic_payload(&payload);
371 error!("Panic in ffi_destroy - aborting");
372 print_ffi_fatal("ffi_destroy", any::type_name::<O>(), -99, &detail, None, Some(&bt));
373 abort();
374 }
375}
376
377pub unsafe extern "C" fn ffi_flush_state<O: FFIOperator>(instance: *mut c_void, ctx: *mut ContextFFI) -> i32 {
384 if instance.is_null() || ctx.is_null() {
385 return FFI_ERROR_NULL_PTR;
386 }
387
388 let result = catch_unwind(AssertUnwindSafe(|| {
389 let wrapper = unsafe { &mut *(instance as *mut OperatorWrapper<O>) };
390 let mut op_ctx = OperatorContext::new(ctx);
391 wrapper.operator.flush_state(&mut op_ctx)
392 }));
393
394 match result {
395 Ok(Ok(())) => 0,
396 Ok(Err(e)) => {
397 error!(target: "ffi_flush_state", "operator flush_state failed: {:?}", e);
398 -2
399 }
400 Err(payload) => {
401 let bt = Backtrace::force_capture();
402 let detail = describe_panic_payload(&payload);
403 error!("Panic in ffi_flush_state - aborting");
404 print_ffi_fatal("ffi_flush_state", any::type_name::<O>(), -99, &detail, None, Some(&bt));
405 abort();
406 }
407 }
408}
409
410pub fn create_vtable<O: FFIOperator>() -> OperatorVTableFFI {
412 OperatorVTableFFI {
413 apply: ffi_apply::<O>,
414 pull: ffi_pull::<O>,
415 tick: ffi_tick::<O>,
416 destroy: ffi_destroy::<O>,
417 flush_state: ffi_flush_state::<O>,
418 }
419}