reifydb_sub_flow/operator/
ffi.rs1use std::{
5 any::Any,
6 cell::{Cell, UnsafeCell},
7 ffi::c_void,
8 panic::{AssertUnwindSafe, catch_unwind},
9 process::abort,
10 ptr,
11};
12
13use reifydb_abi::{
14 callbacks::builder::EmitDiffKind,
15 context::context::ContextFFI,
16 flow::change::ChangeFFI,
17 operator::{descriptor::OperatorDescriptorFFI, vtable::OperatorVTableFFI},
18};
19use reifydb_core::{
20 common::CommitVersion,
21 interface::{
22 catalog::flow::FlowNodeId,
23 change::{Change, Diff, Diffs},
24 },
25 value::column::columns::Columns,
26};
27use reifydb_engine::vm::executor::Executor;
28use reifydb_extension::ffi_callbacks::builder::{BuilderRegistry, with_registry};
29use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
30use reifydb_type::{
31 Result,
32 value::{datetime::DateTime, row_number::RowNumber},
33};
34use tracing::{Span, error, field, instrument};
35
36use crate::{
37 ffi::{callbacks::create_host_callbacks, context::new_ffi_context},
38 operator::Operator,
39 transaction::{FlowTransaction, slot::PersistFn},
40};
41
42thread_local! {
48 static FFI_MARSHAL_ARENA: UnsafeCell<Arena> = UnsafeCell::new(Arena::new());
49}
50
51#[derive(Clone, Copy)]
56struct SendableInstance(*mut c_void);
57unsafe impl Send for SendableInstance {}
58unsafe impl Sync for SendableInstance {}
59
60pub struct FFIOperator {
62 descriptor: OperatorDescriptorFFI,
64 vtable: OperatorVTableFFI,
66 instance: *mut c_void,
68 operator_id: FlowNodeId,
70 executor: Executor,
72 builder_registry: BuilderRegistry,
78 last_registered_txn: Cell<u64>,
83 cached_ctx: UnsafeCell<ContextFFI>,
89}
90
91impl FFIOperator {
92 pub fn new(
94 descriptor: OperatorDescriptorFFI,
95 instance: *mut c_void,
96 operator_id: FlowNodeId,
97 executor: Executor,
98 ) -> Self {
99 let vtable = descriptor.vtable;
100
101 Self {
102 descriptor,
103 vtable,
104 instance,
105 operator_id,
106 executor,
107 builder_registry: BuilderRegistry::new(),
108 last_registered_txn: Cell::new(u64::MAX),
109 cached_ctx: UnsafeCell::new(ContextFFI {
110 txn_ptr: ptr::null_mut(),
111 executor_ptr: ptr::null(),
112 operator_id: operator_id.0,
113 clock_now_nanos: 0,
114 callbacks: create_host_callbacks(),
115 }),
116 }
117 }
118
119 pub(crate) fn descriptor(&self) -> &OperatorDescriptorFFI {
121 &self.descriptor
122 }
123
124 fn ensure_txn_setup(&self, txn: &mut FlowTransaction) -> Result<()> {
125 let txn_version = txn.version().0;
126 if self.last_registered_txn.get() != txn_version {
127 ensure_flush_slot(txn, self.operator_id, self.vtable, self.instance, self.executor.clone())?;
128 self.last_registered_txn.set(txn_version);
129 let ctx = unsafe { &mut *self.cached_ctx.get() };
132 ctx.txn_ptr = txn as *mut _ as *mut c_void;
133 ctx.executor_ptr = &self.executor as *const _ as *const c_void;
134 ctx.clock_now_nanos = txn.clock().now_nanos();
135 }
136 Ok(())
137 }
138}
139
140unsafe impl Send for FFIOperator {}
143unsafe impl Sync for FFIOperator {}
144
145impl Drop for FFIOperator {
146 fn drop(&mut self) {
147 if !self.instance.is_null() {
149 unsafe { (self.vtable.destroy)(self.instance) };
150 }
151 }
152}
153
154#[inline]
156#[instrument(name = "flow::ffi::marshal", level = "trace", skip_all)]
157fn marshal_input(arena: &mut Arena, change: &Change) -> ChangeFFI {
158 arena.marshal_change(change)
159}
160
161#[inline]
163#[instrument(name = "flow::ffi::vtable_call", level = "trace", skip_all, fields(operator_id = operator_id.0))]
164fn call_vtable(
165 vtable: &OperatorVTableFFI,
166 instance: *mut c_void,
167 ffi_ctx_ptr: *mut ContextFFI,
168 ffi_input: &ChangeFFI,
169 operator_id: FlowNodeId,
170) -> i32 {
171 let result = catch_unwind(AssertUnwindSafe(|| unsafe { (vtable.apply)(instance, ffi_ctx_ptr, ffi_input) }));
172
173 match result {
174 Ok(code) => code,
175 Err(panic_info) => {
176 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
177 s.to_string()
178 } else if let Some(s) = panic_info.downcast_ref::<String>() {
179 s.clone()
180 } else {
181 "Unknown panic".to_string()
182 };
183 error!(operator_id = operator_id.0, "FFI operator panicked during apply: {}", msg);
184 abort();
185 }
186 }
187}
188
189fn ensure_flush_slot(
200 txn: &mut FlowTransaction,
201 operator_id: FlowNodeId,
202 vtable: OperatorVTableFFI,
203 instance: *mut c_void,
204 executor: Executor,
205) -> Result<()> {
206 let send_instance = SendableInstance(instance);
207 let _ = txn.operator_state(operator_id, move |_txn| {
208 let captured_instance = send_instance;
209 let captured_vtable = vtable;
210 let captured_executor = executor;
211 let captured_id = operator_id;
212 let persist: PersistFn = Box::new(move |txn, _value: Box<dyn Any>| {
213 let ffi_ctx = new_ffi_context(txn, &captured_executor, captured_id, create_host_callbacks());
214 let ffi_ctx_ptr = &ffi_ctx as *const _ as *mut ContextFFI;
215 let inst = captured_instance;
216 let result = catch_unwind(AssertUnwindSafe(|| unsafe {
217 (captured_vtable.flush_state)(inst.0, ffi_ctx_ptr)
218 }));
219 match result {
220 Ok(0) => Ok(()),
221 Ok(code) => Err(FFIError::Other(format!(
222 "FFI operator flush_state failed with code: {}",
223 code
224 ))
225 .into()),
226 Err(_) => {
227 error!(operator_id = captured_id.0, "FFI operator panicked during flush_state");
228 abort();
229 }
230 }
231 });
232 Ok(((), persist))
234 })?;
235 txn.mark_state_dirty(operator_id);
236 Ok(())
237}
238
239impl Operator for FFIOperator {
240 fn id(&self) -> FlowNodeId {
241 self.operator_id
242 }
243
244 #[instrument(name = "flow::ffi::apply", level = "debug", skip_all, fields(
245 operator_id = self.operator_id.0,
246 input_diff_count = change.diffs.len(),
247 output_diff_count = field::Empty
248 ))]
249 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
250 self.ensure_txn_setup(txn)?;
251
252 FFI_MARSHAL_ARENA.with(|cell| unsafe { (*cell.get()).clear() });
259 let ffi_input = FFI_MARSHAL_ARENA.with(|cell| marshal_input(unsafe { &mut *cell.get() }, &change));
260
261 let version = change.version;
262 let changed_at = change.changed_at;
263
264 let ffi_ctx_ptr = self.cached_ctx.get();
265
266 let result_code = with_registry(&self.builder_registry, || {
267 call_vtable(&self.vtable, self.instance, ffi_ctx_ptr, &ffi_input, self.operator_id)
268 });
269
270 if result_code != 0 {
271 let _ = self.builder_registry.drain();
273 return Err(
274 FFIError::Other(format!("FFI operator apply failed with code: {}", result_code)).into()
275 );
276 }
277
278 let output_change = drain_emitted_diffs(&self.builder_registry, self.operator_id, version, changed_at);
279
280 Span::current().record("output_diff_count", output_change.diffs.len());
281
282 Ok(output_change)
283 }
284
285 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
286 self.ensure_txn_setup(txn)?;
287
288 let row_numbers: Vec<u64> = rows.iter().map(|r| (*r).into()).collect();
289 let ffi_ctx_ptr = self.cached_ctx.get();
290
291 let result_code = self.invoke_under_panic_guard("pull", || unsafe {
292 (self.vtable.pull)(self.instance, ffi_ctx_ptr, row_numbers.as_ptr(), row_numbers.len())
293 });
294
295 if result_code != 0 {
296 let _ = self.builder_registry.drain();
297 return Err(
298 FFIError::Other(format!("FFI operator pull failed with code: {}", result_code)).into()
299 );
300 }
301
302 let mut diffs = self.builder_registry.drain();
306 let columns = if let Some(first) = diffs.drain(..).next() {
307 first.post.or(first.pre).unwrap_or_else(Columns::empty)
308 } else {
309 Columns::empty()
310 };
311
312 Ok(columns)
313 }
314
315 #[instrument(name = "flow::ffi::tick", level = "debug", skip_all, fields(
316 operator_id = self.operator_id.0,
317 output_diff_count = field::Empty
318 ))]
319 fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
320 self.ensure_txn_setup(txn)?;
321
322 let timestamp_nanos = timestamp.to_nanos();
323 let ffi_ctx_ptr = self.cached_ctx.get();
324
325 let result_code = self.invoke_under_panic_guard("tick", || unsafe {
326 (self.vtable.tick)(self.instance, ffi_ctx_ptr, timestamp_nanos)
327 });
328
329 if result_code < 0 {
330 let _ = self.builder_registry.drain();
331 return Err(
332 FFIError::Other(format!("FFI operator tick failed with code: {}", result_code)).into()
333 );
334 }
335
336 if result_code == 1 {
337 let _ = self.builder_registry.drain();
339 return Ok(None);
340 }
341
342 let version = CommitVersion(timestamp_nanos);
346 let output_change = drain_emitted_diffs(&self.builder_registry, self.operator_id, version, timestamp);
347 Span::current().record("output_diff_count", output_change.diffs.len());
348 Ok(Some(output_change))
349 }
350}
351
352impl FFIOperator {
353 #[inline]
354 fn invoke_under_panic_guard<F>(&self, op: &'static str, call: F) -> i32
355 where
356 F: FnOnce() -> i32,
357 {
358 with_registry(&self.builder_registry, || {
359 let result = catch_unwind(AssertUnwindSafe(call));
360 match result {
361 Ok(code) => code,
362 Err(panic_info) => {
363 let msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
364 s.to_string()
365 } else if let Some(s) = panic_info.downcast_ref::<String>() {
366 s.clone()
367 } else {
368 "Unknown panic".to_string()
369 };
370 error!(
371 operator_id = self.operator_id.0,
372 "FFI operator panicked during {}: {}", op, msg
373 );
374 abort();
375 }
376 }
377 })
378 }
379}
380
381fn drain_emitted_diffs(
384 registry: &BuilderRegistry,
385 operator_id: FlowNodeId,
386 version: CommitVersion,
387 changed_at: DateTime,
388) -> Change {
389 let emitted = registry.drain();
390 let diffs: Diffs = emitted
391 .into_iter()
392 .map(|d| match d.kind {
393 EmitDiffKind::Insert => Diff::insert(d.post.unwrap_or_else(Columns::empty)),
394 EmitDiffKind::Update => Diff::update(
395 d.pre.unwrap_or_else(Columns::empty),
396 d.post.unwrap_or_else(Columns::empty),
397 ),
398 EmitDiffKind::Remove => Diff::remove(d.pre.unwrap_or_else(Columns::empty)),
399 })
400 .collect();
401 Change::from_flow(operator_id, version, diffs, changed_at)
402}