reifydb_engine/transform/
ffi.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 cell::RefCell,
9 ffi::c_void,
10 panic::{AssertUnwindSafe, catch_unwind},
11 process::abort,
12};
13
14use reifydb_abi::{
15 data::column::ColumnsFFI,
16 transform::{descriptor::TransformDescriptorFFI, vtable::TransformVTableFFI},
17};
18use reifydb_core::value::column::columns::Columns;
19use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
20use reifydb_type::{self, Result};
21use tracing::{error, instrument};
22
23use super::{Transform, context::TransformContext};
24
/// Engine-side handle to a transform that lives behind the FFI boundary.
///
/// Holds the descriptor supplied at construction, a copy of its vtable, the
/// opaque instance pointer passed to every vtable call, and the arena used to
/// marshal columns during `apply`.
pub struct NativeTransformFFI {
    /// Descriptor this transform was created from.
    ///
    /// In the code visible here the field is only read by the `descriptor()`
    /// accessor, which is itself allowed to be unused — hence the lint allowance.
    #[allow(dead_code)]
    descriptor: TransformDescriptorFFI,
    /// Copy of `descriptor.vtable` taken in `new`; supplies the `transform`
    /// and `destroy` entry points.
    vtable: TransformVTableFFI,
    /// Opaque pointer handed to the vtable functions. May be null, in which
    /// case `Drop` skips the `destroy` call.
    instance: *mut c_void,
    /// Arena used to marshal input and unmarshal output columns; cleared
    /// before `apply` returns.
    arena: RefCell<Arena>,
}
37
38impl NativeTransformFFI {
39 pub fn new(descriptor: TransformDescriptorFFI, instance: *mut c_void) -> Self {
41 let vtable = descriptor.vtable;
42
43 Self {
44 descriptor,
45 vtable,
46 instance,
47 arena: RefCell::new(Arena::new()),
48 }
49 }
50
51 #[allow(dead_code)]
53 pub(crate) fn descriptor(&self) -> &TransformDescriptorFFI {
54 &self.descriptor
55 }
56}
57
// SAFETY (NOTE(review): to be confirmed against the callers): neither trait is
// implemented automatically for this type, because `instance` is a raw pointer
// (neither `Send` nor `Sync`) and `arena` is a `RefCell` (never `Sync`).
// These impls therefore assert that the FFI instance may be used from another
// thread and that `apply` is never invoked concurrently on the same value —
// concurrent calls would race on the `RefCell` borrow flag. Presumably the
// engine drives each transform from one thread at a time; verify.
unsafe impl Send for NativeTransformFFI {}
unsafe impl Sync for NativeTransformFFI {}
62
63impl Drop for NativeTransformFFI {
64 fn drop(&mut self) {
65 if !self.instance.is_null() {
66 (self.vtable.destroy)(self.instance);
67 }
68 }
69}
70
71impl Transform for NativeTransformFFI {
72 #[instrument(name = "transform::ffi::apply", level = "debug", skip_all)]
73 fn apply(&self, _ctx: &TransformContext, input: Columns) -> Result<Columns> {
74 let mut arena = self.arena.borrow_mut();
75
76 let ffi_input = arena.marshal_columns(&input);
77 let mut ffi_output = ColumnsFFI::empty();
78
79 let result = catch_unwind(AssertUnwindSafe(|| {
80 (self.vtable.transform)(self.instance, &ffi_input, &mut ffi_output)
81 }));
82
83 let result_code = match result {
84 Ok(code) => code,
85 Err(panic_info) => {
86 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
87 s.to_string()
88 } else if let Some(s) = panic_info.downcast_ref::<String>() {
89 s.clone()
90 } else {
91 "Unknown panic".to_string()
92 };
93 error!("FFI transform panicked during apply: {}", msg);
94 abort();
95 }
96 };
97
98 if result_code != 0 {
99 arena.clear();
100 return Err(FFIError::Other(format!("FFI transform apply failed with code: {}", result_code))
101 .into());
102 }
103
104 let columns = arena.unmarshal_columns(&ffi_output);
105
106 arena.clear();
107
108 Ok(columns)
109 }
110}