1use std::{
5 any::{self, Any},
6 backtrace::Backtrace,
7 cell::RefCell,
8 ffi::c_void,
9 io::{self, Write},
10 panic::{AssertUnwindSafe, catch_unwind},
11 process::abort,
12 slice,
13};
14
15use reifydb_abi::{
16 constants::FFI_ERROR_NULL_PTR,
17 context::context::ContextFFI,
18 flow::{
19 change::ChangeFFI,
20 diff::{DiffFFI, DiffType},
21 },
22 operator::vtable::OperatorVTableFFI,
23};
24use reifydb_type::value::{datetime::DateTime, row_number::RowNumber};
25use tracing::{error, instrument, warn};
26
27use crate::operator::{FFIOperator, Tick, change::BorrowedChange, context::OperatorContext};
28
29thread_local! {
30
31
32
33 static FFI_FATAL_DETAIL: RefCell<Option<String>> = const { RefCell::new(None) };
34}
35
36fn set_fatal_detail(detail: String) {
37 FFI_FATAL_DETAIL.with(|cell| *cell.borrow_mut() = Some(detail));
38}
39
40fn take_fatal_detail() -> Option<String> {
41 FFI_FATAL_DETAIL.with(|cell| cell.borrow_mut().take())
42}
43
44fn origin_type_name(origin_type: u8) -> &'static str {
45 match origin_type {
46 0 => "Flow",
47 1 => "Table",
48 2 => "View",
49 3 => "VTable",
50 4 => "RingBuffer",
51 6 => "Dictionary",
52 7 => "Series",
53 _ => "Unknown",
54 }
55}
56
57unsafe fn describe_change_input(input: *const ChangeFFI) -> String {
58 if input.is_null() {
59 return "<null>".to_string();
60 }
61 let ffi = unsafe { &*input };
62 let types = if !ffi.diffs.is_null() && ffi.diff_count > 0 {
63 let diffs: &[DiffFFI] = unsafe { slice::from_raw_parts(ffi.diffs, ffi.diff_count) };
64 let names: Vec<&'static str> = diffs
65 .iter()
66 .map(|d| match d.diff_type {
67 DiffType::Insert => "Insert",
68 DiffType::Update => "Update",
69 DiffType::Remove => "Remove",
70 })
71 .collect();
72 format!("[{}]", names.join(", "))
73 } else {
74 "[]".to_string()
75 };
76 format!(
77 "origin={}({}) diff_count={} diff_types={}",
78 origin_type_name(ffi.origin.origin),
79 ffi.origin.id,
80 ffi.diff_count,
81 types,
82 )
83}
84
85fn describe_panic_payload(payload: &Box<dyn Any + Send>) -> String {
86 if let Some(s) = payload.downcast_ref::<&'static str>() {
87 s.to_string()
88 } else if let Some(s) = payload.downcast_ref::<String>() {
89 s.clone()
90 } else {
91 format!("<non-string panic payload, TypeId={:?}>", (**payload).type_id())
92 }
93}
94
95fn code_meaning(code: i32) -> &'static str {
96 match code {
97 -2 => "operator returned Err",
98 -3 => "unmarshal failed",
99 -99 => "panic caught in catch_unwind",
100 _ => "unknown",
101 }
102}
103
104fn print_ffi_fatal(
105 entry: &str,
106 operator: &str,
107 code: i32,
108 detail: &str,
109 input_description: Option<&str>,
110 backtrace: Option<&Backtrace>,
111) {
112 let mut err = io::stderr().lock();
113 let _ = writeln!(err, "========== FFI FATAL ==========");
114 let _ = writeln!(err, "entry: {}", entry);
115 let _ = writeln!(err, "operator: {}", operator);
116 let _ = writeln!(err, "code: {} ({})", code, code_meaning(code));
117 let _ = writeln!(
118 err,
119 "detail: {}",
120 if detail.is_empty() {
121 "<none>"
122 } else {
123 detail
124 }
125 );
126 if let Some(desc) = input_description {
127 let _ = writeln!(err, "input: {}", desc);
128 }
129 if let Some(bt) = backtrace {
130 let _ = writeln!(err, "backtrace:\n{}", bt);
131 }
132 let _ = writeln!(err, "===============================");
133 let _ = err.flush();
134}
135
136pub struct OperatorWrapper<O: FFIOperator> {
137 pub(crate) operator: O,
138}
139
140impl<O: FFIOperator> OperatorWrapper<O> {
141 pub fn new(operator: O) -> Self {
142 Self {
143 operator,
144 }
145 }
146
147 pub fn as_ptr(&mut self) -> *mut c_void {
148 self as *mut _ as *mut c_void
149 }
150
151 pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
152 unsafe { &mut *(ptr as *mut Self) }
153 }
154}
155
156#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
162 operator_type = any::type_name::<O>(),
163))]
164pub unsafe extern "C" fn ffi_apply<O: FFIOperator>(
165 instance: *mut c_void,
166 ctx: *mut ContextFFI,
167 input: *const ChangeFFI,
168) -> i32 {
169 let result = catch_unwind(AssertUnwindSafe(|| {
170 if input.is_null() {
171 set_fatal_detail("ffi_apply: input is null".to_string());
172 return -3;
173 }
174 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
175 let borrowed = unsafe { BorrowedChange::from_raw(input) };
176 let mut op_ctx = OperatorContext::new(ctx);
177 match wrapper.operator.apply(&mut op_ctx, borrowed) {
178 Ok(()) => 0,
179 Err(e) => {
180 warn!(?e, "Apply failed");
181 set_fatal_detail(format!("{:?}", e));
182 -2
183 }
184 }
185 }));
186
187 let (code, backtrace) = match result {
188 Ok(code) => (code, None),
189 Err(payload) => {
190 let bt = Backtrace::force_capture();
191 set_fatal_detail(describe_panic_payload(&payload));
192 error!("Panic in ffi_apply");
193 (-99, Some(bt))
194 }
195 };
196
197 if code < 0 {
198 let detail = take_fatal_detail().unwrap_or_default();
199 let input_desc = unsafe { describe_change_input(input) };
200 print_ffi_fatal(
201 "ffi_apply",
202 any::type_name::<O>(),
203 code,
204 &detail,
205 Some(&input_desc),
206 backtrace.as_ref(),
207 );
208 abort();
209 }
210 code
211}
212
213#[instrument(name = "flow::operator::ffi::pull", level = "debug", skip_all, fields(
219 operator_type = any::type_name::<O>(),
220 row_count = count,
221))]
222pub unsafe extern "C" fn ffi_pull<O: FFIOperator>(
223 instance: *mut c_void,
224 ctx: *mut ContextFFI,
225 row_numbers: *const u64,
226 count: usize,
227) -> i32 {
228 let result = catch_unwind(AssertUnwindSafe(|| {
229 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
230
231 let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
232 unsafe { slice::from_raw_parts(row_numbers, count) }
233 .iter()
234 .map(|&n| RowNumber::from(n))
235 .collect()
236 } else {
237 Vec::new()
238 };
239
240 let mut op_ctx = OperatorContext::new(ctx);
241 match wrapper.operator.pull(&mut op_ctx, &numbers) {
242 Ok(()) => 0,
243 Err(e) => {
244 warn!(?e, "pull failed");
245 set_fatal_detail(format!("{:?}", e));
246 -2
247 }
248 }
249 }));
250
251 let (code, backtrace) = match result {
252 Ok(code) => (code, None),
253 Err(payload) => {
254 let bt = Backtrace::force_capture();
255 set_fatal_detail(describe_panic_payload(&payload));
256 error!("Panic in ffi_pull");
257 (-99, Some(bt))
258 }
259 };
260
261 if code < 0 {
262 let detail = take_fatal_detail().unwrap_or_default();
263 let input_desc = format!("row_count={}", count);
264 print_ffi_fatal(
265 "ffi_pull",
266 any::type_name::<O>(),
267 code,
268 &detail,
269 Some(&input_desc),
270 backtrace.as_ref(),
271 );
272 abort();
273 }
274 code
275}
276
277#[instrument(name = "flow::operator::ffi::tick", level = "debug", skip_all, fields(
282 operator_type = any::type_name::<O>(),
283))]
284pub unsafe extern "C" fn ffi_tick<O: FFIOperator>(
285 instance: *mut c_void,
286 ctx: *mut ContextFFI,
287 timestamp_nanos: u64,
288) -> i32 {
289 let result = catch_unwind(AssertUnwindSafe(|| {
290 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
291
292 let tick = Tick {
293 now: DateTime::from_nanos(timestamp_nanos),
294 };
295 let mut op_ctx = OperatorContext::new(ctx);
296
297 match wrapper.operator.tick(&mut op_ctx, tick) {
298 Ok(true) => 0,
299 Ok(false) => 1,
300 Err(e) => {
301 warn!(?e, "Tick failed");
302 set_fatal_detail(format!("{:?}", e));
303 -2
304 }
305 }
306 }));
307
308 let (code, backtrace) = match result {
309 Ok(code) => (code, None),
310 Err(payload) => {
311 let bt = Backtrace::force_capture();
312 set_fatal_detail(describe_panic_payload(&payload));
313 error!("Panic in ffi_tick");
314 (-99, Some(bt))
315 }
316 };
317
318 if code < 0 {
319 let detail = take_fatal_detail().unwrap_or_default();
320 let input_desc = format!("timestamp_nanos={}", timestamp_nanos);
321 print_ffi_fatal(
322 "ffi_tick",
323 any::type_name::<O>(),
324 code,
325 &detail,
326 Some(&input_desc),
327 backtrace.as_ref(),
328 );
329 abort();
330 }
331 code
332}
333
334pub unsafe extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
339 if instance.is_null() {
340 return;
341 }
342
343 let result = catch_unwind(AssertUnwindSafe(|| unsafe {
344 let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
345 }));
346
347 if let Err(payload) = result {
348 let bt = Backtrace::force_capture();
349 let detail = describe_panic_payload(&payload);
350 error!("Panic in ffi_destroy - aborting");
351 print_ffi_fatal("ffi_destroy", any::type_name::<O>(), -99, &detail, None, Some(&bt));
352 abort();
353 }
354}
355
356pub unsafe extern "C" fn ffi_flush_state<O: FFIOperator>(instance: *mut c_void, ctx: *mut ContextFFI) -> i32 {
363 if instance.is_null() || ctx.is_null() {
364 return FFI_ERROR_NULL_PTR;
365 }
366
367 let result = catch_unwind(AssertUnwindSafe(|| {
368 let wrapper = unsafe { &mut *(instance as *mut OperatorWrapper<O>) };
369 let mut op_ctx = OperatorContext::new(ctx);
370 wrapper.operator.flush_state(&mut op_ctx)
371 }));
372
373 match result {
374 Ok(Ok(())) => 0,
375 Ok(Err(e)) => {
376 error!(target: "ffi_flush_state", "operator flush_state failed: {:?}", e);
377 -2
378 }
379 Err(payload) => {
380 let bt = Backtrace::force_capture();
381 let detail = describe_panic_payload(&payload);
382 error!("Panic in ffi_flush_state - aborting");
383 print_ffi_fatal("ffi_flush_state", any::type_name::<O>(), -99, &detail, None, Some(&bt));
384 abort();
385 }
386 }
387}
388
389pub fn create_vtable<O: FFIOperator>() -> OperatorVTableFFI {
390 OperatorVTableFFI {
391 apply: ffi_apply::<O>,
392 pull: ffi_pull::<O>,
393 tick: ffi_tick::<O>,
394 destroy: ffi_destroy::<O>,
395 flush_state: ffi_flush_state::<O>,
396 }
397}