reifydb_extension/transform/
ffi.rs1use std::{
7 cell::RefCell,
8 ffi::c_void,
9 panic::{AssertUnwindSafe, catch_unwind},
10 process::abort,
11};
12
13use reifydb_abi::{
14 data::column::ColumnsFFI,
15 transform::{descriptor::TransformDescriptorFFI, vtable::TransformVTableFFI},
16};
17use reifydb_core::value::column::columns::Columns;
18use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
19use reifydb_type::{self, Result};
20use tracing::{error, instrument};
21
22use super::{Transform, context::TransformContext};
23
24pub struct NativeTransformFFI {
26 #[allow(dead_code)]
28 descriptor: TransformDescriptorFFI,
29 vtable: TransformVTableFFI,
31 instance: *mut c_void,
33 arena: RefCell<Arena>,
35}
36
37impl NativeTransformFFI {
38 pub fn new(descriptor: TransformDescriptorFFI, instance: *mut c_void) -> Self {
40 let vtable = descriptor.vtable;
41
42 Self {
43 descriptor,
44 vtable,
45 instance,
46 arena: RefCell::new(Arena::new()),
47 }
48 }
49
50 #[allow(dead_code)]
52 pub(crate) fn descriptor(&self) -> &TransformDescriptorFFI {
53 &self.descriptor
54 }
55}
56
57unsafe impl Send for NativeTransformFFI {}
60unsafe impl Sync for NativeTransformFFI {}
61
62impl Drop for NativeTransformFFI {
63 fn drop(&mut self) {
64 if !self.instance.is_null() {
65 unsafe { (self.vtable.destroy)(self.instance) };
66 }
67 }
68}
69
70impl Transform for NativeTransformFFI {
71 #[instrument(name = "transform::ffi::apply", level = "debug", skip_all)]
72 fn apply(&self, _ctx: &TransformContext, input: Columns) -> Result<Columns> {
73 let mut arena = self.arena.borrow_mut();
74
75 let ffi_input = arena.marshal_columns(&input);
76 let mut ffi_output = ColumnsFFI::empty();
77
78 let result = catch_unwind(AssertUnwindSafe(|| unsafe {
79 (self.vtable.transform)(self.instance, &ffi_input, &mut ffi_output)
80 }));
81
82 let result_code = match result {
83 Ok(code) => code,
84 Err(panic_info) => {
85 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
86 s.to_string()
87 } else if let Some(s) = panic_info.downcast_ref::<String>() {
88 s.clone()
89 } else {
90 "Unknown panic".to_string()
91 };
92 error!("FFI transform panicked during apply: {}", msg);
93 abort();
94 }
95 };
96
97 if result_code != 0 {
98 arena.clear();
99 return Err(FFIError::Other(format!("FFI transform apply failed with code: {}", result_code))
100 .into());
101 }
102
103 let columns = arena.unmarshal_columns(&ffi_output);
104
105 arena.clear();
106
107 Ok(columns)
108 }
109}