reifydb_sub_flow/operator/
ffi.rs1#![cfg(reifydb_target = "native")]
5use std::{
11 cell::RefCell,
12 ffi::c_void,
13 panic::{AssertUnwindSafe, catch_unwind},
14 process::abort,
15 result::Result as StdResult,
16};
17
18use reifydb_abi::{
19 context::context::ContextFFI,
20 data::column::ColumnsFFI,
21 flow::change::ChangeFFI,
22 operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
23};
24use reifydb_core::{
25 interface::{catalog::flow::FlowNodeId, change::Change},
26 value::column::columns::Columns,
27};
28use reifydb_engine::vm::executor::Executor;
29use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
30use reifydb_type::{
31 Result,
32 value::{datetime::DateTime, row_number::RowNumber},
33};
34use tracing::{Span, error, field, instrument};
35
36use crate::{
37 ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
38 operator::Operator,
39 transaction::FlowTransaction,
40};
41
42pub struct FFIOperator {
44 descriptor: OperatorDescriptorFFI,
46 vtable: OperatorVTableFFI,
48 instance: *mut c_void,
50 operator_id: FlowNodeId,
52 executor: Executor,
54 arena: RefCell<Arena>,
56}
57
58impl FFIOperator {
59 pub fn new(
61 descriptor: OperatorDescriptorFFI,
62 instance: *mut c_void,
63 operator_id: FlowNodeId,
64 executor: Executor,
65 ) -> Self {
66 let vtable = descriptor.vtable;
67
68 Self {
69 descriptor,
70 vtable,
71 instance,
72 operator_id,
73 executor,
74 arena: RefCell::new(Arena::new()),
75 }
76 }
77
78 pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
80 &self.descriptor
81 }
82}
83
84unsafe impl Send for FFIOperator {}
87unsafe impl Sync for FFIOperator {}
88
89impl Drop for FFIOperator {
90 fn drop(&mut self) {
91 if !self.instance.is_null() {
93 (self.vtable.destroy)(self.instance);
94 }
95 }
96}
97
98#[inline]
100#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
101fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
102 arena.marshal_change(change)
103}
104
105#[inline]
107#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
108fn call_vtable(
109 vtable: &OperatorVTableFFI,
110 instance: *mut c_void,
111 ffi_ctx_ptr: *mut ContextFFI,
112 ffi_input: &ChangeFFI,
113 ffi_output: &mut ChangeFFI,
114 operator_id: FlowNodeId,
115) -> i32 {
116 let result = catch_unwind(AssertUnwindSafe(|| (vtable.apply)(instance, ffi_ctx_ptr, ffi_input, ffi_output)));
117
118 match result {
119 Ok(code) => code,
120 Err(panic_info) => {
121 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
122 s.to_string()
123 } else if let Some(s) = panic_info.downcast_ref::<String>() {
124 s.clone()
125 } else {
126 "Unknown panic".to_string()
127 };
128 error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
129 abort();
130 }
131 }
132}
133
134#[inline]
136#[instrument(name = "flow::ffi::unmarshal", level = "trace", skip_all)]
137fn unmarshal_output(arena: &mut Arena, ffi_output: &ChangeFFI) -> StdResult<Change, String> {
138 arena.unmarshal_change(ffi_output)
139}
140
141impl Operator for FFIOperator {
142 fn id(&self) -> FlowNodeId {
143 self.operator_id
144 }
145
146 #[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
147 operator_id = self.operator_id.0,
148 input_diff_count = change.diffs.len(),
149 output_diff_count = field::Empty
150 ))]
151 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
152 let mut arena = self.arena.borrow_mut();
153
154 let ffi_input = marshal_input(&mut arena, &change);
155
156 let mut ffi_output = ChangeFFI::empty();
157
158 let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
159 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
160
161 let result_code = call_vtable(
162 &self.vtable,
163 self.instance,
164 ffi_ctx_ptr,
165 &ffi_input,
166 &mut ffi_output,
167 self.operator_id,
168 );
169
170 if result_code != 0 {
171 return Err(
172 FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
173 );
174 }
175
176 let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
177
178 arena.clear();
180
181 Span::current().record("output_diff_count", output_change.diffs.len());
182
183 Ok(output_change)
184 }
185
186 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
187 let mut arena = self.arena.borrow_mut();
188
189 let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
190
191 let mut ffi_output = ColumnsFFI::empty();
192
193 let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
194 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
195
196 let result = catch_unwind(AssertUnwindSafe(|| {
198 (self.vtable.pull)(
199 self.instance,
200 ffi_ctx_ptr,
201 row_numbers.as_ptr(),
202 row_numbers.len(),
203 &mut ffi_output,
204 )
205 }));
206
207 let result_code = match result {
208 Ok(code) => code,
209 Err(panic_info) => {
210 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
211 s.to_string()
212 } else if let Some(s) = panic_info.downcast_ref::<String>() {
213 s.clone()
214 } else {
215 "Unknown panic".to_string()
216 };
217 error!(operator_id = self.operator_id.0, "FFI operator panicked during pull: {}", msg);
218 abort();
219 }
220 };
221
222 if result_code != 0 {
223 return Err(
224 FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
225 );
226 }
227
228 let columns = arena.unmarshal_columns(&ffi_output);
230
231 arena.clear();
233
234 Ok(columns)
235 }
236
237 #[instrument(name = "flow::ffi::tick", level = "debug", skip_all, fields(
238 operator_id = self.operator_id.0,
239 output_diff_count = field::Empty
240 ))]
241 fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
242 let mut arena = self.arena.borrow_mut();
243
244 let timestamp_nanos = timestamp.to_nanos();
245 let mut ffi_output = ChangeFFI::empty();
246
247 let ffi_ctx = new_ffi_context(txn, &self.executor, self.operator_id, create_host_callbacks());
248 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
249
250 let result = catch_unwind(AssertUnwindSafe(|| {
251 (self.vtable.tick)(self.instance, ffi_ctx_ptr, timestamp_nanos, &mut ffi_output)
252 }));
253
254 let result_code = match result {
255 Ok(code) => code,
256 Err(panic_info) => {
257 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
258 s.to_string()
259 } else if let Some(s) = panic_info.downcast_ref::<String>() {
260 s.clone()
261 } else {
262 "Unknown panic".to_string()
263 };
264 error!(operator_id = self.operator_id.0, "FFI operator panicked during tick: {}", msg);
265 abort();
266 }
267 };
268
269 if result_code < 0 {
270 return Err(
271 FFIError::Other(format!("FFI operator tick failed with code: {}", result_code)).into()
272 );
273 }
274
275 if result_code == 1 {
276 arena.clear();
278 return Ok(None);
279 }
280
281 let output_change = unmarshal_output(&mut arena, &ffi_output).map_err(|e| FFIError::Other(e))?;
282
283 arena.clear();
284
285 Span::current().record("output_diff_count", output_change.diffs.len());
286
287 Ok(Some(output_change))
288 }
289}