use std::{
any,
cell::RefCell,
ffi::c_void,
panic::{AssertUnwindSafe, catch_unwind},
process::abort,
slice,
};
use reifydb_abi::{
context::context::ContextFFI, data::column::ColumnsFFI, flow::change::ChangeFFI,
operator::vtable::OperatorVTableFFI,
};
use reifydb_core::interface::change::Change;
use reifydb_type::value::row_number::RowNumber;
use tracing::{Span, error, instrument, warn};
use crate::{
ffi::Arena,
operator::{FFIOperator, context::OperatorContext},
};
pub struct OperatorWrapper<O: FFIOperator> {
operator: O,
arena: RefCell<Arena>,
}
impl<O: FFIOperator> OperatorWrapper<O> {
pub fn new(operator: O) -> Self {
Self {
operator,
arena: RefCell::new(Arena::new()),
}
}
pub fn as_ptr(&mut self) -> *mut c_void {
self as *mut _ as *mut c_void
}
pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
unsafe { &mut *(ptr as *mut Self) }
}
}
#[inline]
#[instrument(name = "unmarshal", level = "trace", skip_all)]
fn unmarshal_input<O: FFIOperator>(arena: &mut Arena, input: *const ChangeFFI) -> Result<Change, i32> {
unsafe {
match arena.unmarshal_change(&*input) {
Ok(change) => Ok(change),
Err(e) => {
warn!(?e, "Unmarshal failed");
Err(-3)
}
}
}
}
#[inline]
#[instrument(name = "operator_apply", level = "trace", skip_all)]
fn apply_operator<O: FFIOperator>(operator: &mut O, ctx: *mut ContextFFI, input_change: Change) -> Result<Change, i32> {
let mut op_ctx = OperatorContext::new(ctx);
match operator.apply(&mut op_ctx, input_change) {
Ok(change) => Ok(change),
Err(e) => {
warn!(?e, "Apply failed");
Err(-2)
}
}
}
#[inline]
#[instrument(name = "marshal", level = "trace", skip_all)]
fn marshal_output(arena: &mut Arena, output_change: &Change, output: *mut ChangeFFI) {
unsafe {
*output = arena.marshal_change(output_change);
}
}
#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
operator_type = any::type_name::<O>(),
input_diffs,
output_diffs
))]
pub extern "C" fn ffi_apply<O: FFIOperator>(
instance: *mut c_void,
ctx: *mut ContextFFI,
input: *const ChangeFFI,
output: *mut ChangeFFI,
) -> i32 {
let result = catch_unwind(AssertUnwindSafe(|| {
let wrapper = OperatorWrapper::<O>::from_ptr(instance);
let mut arena = wrapper.arena.borrow_mut();
arena.clear();
let input_change = match unmarshal_input::<O>(&mut arena, input) {
Ok(change) => {
Span::current().record("input_diffs", change.diffs.len());
change
}
Err(code) => return code,
};
let output_change = match apply_operator::<O>(&mut wrapper.operator, ctx, input_change) {
Ok(change) => {
Span::current().record("output_diffs", change.diffs.len());
change
}
Err(code) => return code,
};
marshal_output(&mut arena, &output_change, output);
0 }));
let code = result.unwrap_or_else(|e| {
error!(?e, "Panic in ffi_apply");
-99
});
if code < 0 {
error!(code, "ffi_apply failed - aborting");
abort();
}
code
}
#[instrument(name = "flow::operator::ffi::pull", level = "debug", skip_all, fields(
operator_type = any::type_name::<O>(),
row_count = count,
rows_returned
))]
pub extern "C" fn ffi_pull<O: FFIOperator>(
instance: *mut c_void,
ctx: *mut ContextFFI,
row_numbers: *const u64,
count: usize,
output: *mut ColumnsFFI,
) -> i32 {
let result = catch_unwind(AssertUnwindSafe(|| {
unsafe {
let wrapper = OperatorWrapper::<O>::from_ptr(instance);
let mut arena = wrapper.arena.borrow_mut();
arena.clear();
let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
slice::from_raw_parts(row_numbers, count).iter().map(|&n| RowNumber::from(n)).collect()
} else {
Vec::new()
};
let mut op_ctx = OperatorContext::new(ctx);
let columns = match wrapper.operator.pull(&mut op_ctx, &numbers) {
Ok(cols) => {
Span::current().record("rows_returned", cols.row_count());
cols
}
Err(e) => {
warn!(?e, "pull failed");
return -2;
}
};
*output = arena.marshal_columns(&columns);
0 }
}));
let code = result.unwrap_or_else(|e| {
error!(?e, "Panic in ffi_pull");
-99
});
if code < 0 {
error!(code, "ffi_pull failed - aborting");
abort();
}
code
}
pub extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
if instance.is_null() {
return;
}
let result = catch_unwind(AssertUnwindSafe(|| unsafe {
let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
}));
if let Err(e) = result {
error!(?e, "Panic in ffi_destroy - aborting");
abort();
}
}
pub fn create_vtable<O: FFIOperator>() -> OperatorVTableFFI {
OperatorVTableFFI {
apply: ffi_apply::<O>,
pull: ffi_pull::<O>,
destroy: ffi_destroy::<O>,
}
}