reifydb_sub_flow/operator/
ffi.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 cell::RefCell,
9 ffi::c_void,
10 panic::{AssertUnwindSafe, catch_unwind},
11 process::abort,
12 result::Result as StdResult,
13};
14
15use reifydb_abi::{
16 context::context::ContextFFI,
17 data::column::ColumnsFFI,
18 flow::change::ChangeFFI,
19 operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
20};
21use reifydb_core::{
22 interface::{catalog::flow::FlowNodeId, change::Change},
23 value::column::columns::Columns,
24};
25use reifydb_engine::vm::executor::Executor;
26use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
27use reifydb_type::{Result, value::row_number::RowNumber};
28use tracing::{Span, error, field, instrument};
29
30use crate::{
31 ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
32 operator::Operator,
33 transaction::FlowTransaction,
34};
35
36pub struct FFIOperator {
38 descriptor: OperatorDescriptorFFI,
40 vtable: OperatorVTableFFI,
42 instance: *mut c_void,
44 operator_id: FlowNodeId,
46 executor: Executor,
48 arena: RefCell<Arena>,
50}
51
52impl FFIOperator {
53 pub fn new(
55 descriptor: OperatorDescriptorFFI,
56 instance: *mut c_void,
57 operator_id: FlowNodeId,
58 executor: Executor,
59 ) -> Self {
60 let vtable = descriptor.vtable;
61
62 Self {
63 descriptor,
64 vtable,
65 instance,
66 operator_id,
67 executor,
68 arena: RefCell::new(Arena::new()),
69 }
70 }
71
72 pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
74 &self.descriptor
75 }
76}
77
78unsafe impl Send for FFIOperator {}
81unsafe impl Sync for FFIOperator {}
82
83impl Drop for FFIOperator {
84 fn drop(&mut self) {
85 if !self.instance.is_null() {
87 (self.vtable.destroy)(self.instance);
88 }
89 }
90}
91
92#[inline]
94#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
95fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
96 arena.marshal_change(change)
97}
98
99#[inline]
101#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
102fn call_vtable(
103 vtable: &OperatorVTableFFI,
104 instance: *mut c_void,
105 ffi_ctx_ptr: *mut ContextFFI,
106 ffi_input: &ChangeFFI,
107 ffi_output: &mut ChangeFFI,
108 operator_id: FlowNodeId,
109) -> i32 {
110 let result = catch_unwind(AssertUnwindSafe(|| (vtable.apply)(instance, ffi_ctx_ptr, ffi_input, ffi_output)));
111
112 match result {
113 Ok(code) => code,
114 Err(panic_info) => {
115 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
116 s.to_string()
117 } else if let Some(s) = panic_info.downcast_ref::<String>() {
118 s.clone()
119 } else {
120 "Unknown panic".to_string()
121 };
122 error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
123 abort();
124 }
125 }
126}
127
128#[inline]
130#[instrument(name = "flow::ffi::unmarshal", level = "trace", skip_all)]
131fn unmarshal_output(arena: &mut Arena, ffi_output: &ChangeFFI) -> StdResult<Change, String> {
132 arena.unmarshal_change(ffi_output)
133}
134
135impl Operator for FFIOperator {
136 fn id(&self) -> FlowNodeId {
137 self.operator_id
138 }
139
140 #[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
141 operator_id = self.operator_id.0,
142 input_diff_count = change.diffs.len(),
143 output_diff_count = field::Empty
144 ))]
145 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
146 let mut arena = self.arena.borrow_mut();
147
148 let ffi_input = marshal_input(&mut arena, &change);
149
150 let mut ffi_output = ChangeFFI::empty();
151
152 let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
153 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
154
155 let result_code = call_vtable(
156 &self.vtable,
157 self.instance,
158 ffi_ctx_ptr,
159 &ffi_input,
160 &mut ffi_output,
161 self.operator_id,
162 );
163
164 if result_code != 0 {
165 return Err(
166 FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
167 );
168 }
169
170 let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
171
172 arena.clear();
174
175 Span::current().record("output_diff_count", output_change.diffs.len());
176
177 Ok(output_change)
178 }
179
180 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
181 let mut arena = self.arena.borrow_mut();
182
183 let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
184
185 let mut ffi_output = ColumnsFFI::empty();
186
187 let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
188 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
189
190 let result = catch_unwind(AssertUnwindSafe(|| {
192 (self.vtable.pull)(
193 self.instance,
194 ffi_ctx_ptr,
195 row_numbers.as_ptr(),
196 row_numbers.len(),
197 &mut ffi_output,
198 )
199 }));
200
201 let result_code = match result {
202 Ok(code) => code,
203 Err(panic_info) => {
204 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
205 s.to_string()
206 } else if let Some(s) = panic_info.downcast_ref::<String>() {
207 s.clone()
208 } else {
209 "Unknown panic".to_string()
210 };
211 error!(operator_id = self.operator_id.0, "FFI operator panicked during pull: {}", msg);
212 abort();
213 }
214 };
215
216 if result_code != 0 {
217 return Err(
218 FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
219 );
220 }
221
222 let columns = arena.unmarshal_columns(&ffi_output);
224
225 arena.clear();
227
228 Ok(columns)
229 }
230}