reifydb_flow_operator_sdk/ffi/
wrapper.rs1use std::{
4 cell::RefCell,
5 ffi::c_void,
6 panic::{AssertUnwindSafe, catch_unwind},
7 sync::Mutex,
8};
9
10use reifydb_flow_operator_abi::*;
11use reifydb_type::RowNumber;
12use tracing::{debug_span, instrument, warn};
13
14use crate::{FFIOperator, context::OperatorContext, marshal::Marshaller};
15
16pub struct OperatorWrapper<O: FFIOperator> {
18 operator: Mutex<O>,
19 marshaller: RefCell<Marshaller>,
20}
21
22impl<O: FFIOperator> OperatorWrapper<O> {
23 pub fn new(operator: O) -> Self {
25 Self {
26 operator: Mutex::new(operator),
27 marshaller: RefCell::new(Marshaller::new()),
28 }
29 }
30
31 pub fn as_ptr(&mut self) -> *mut c_void {
33 self as *mut _ as *mut c_void
34 }
35
36 pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
38 unsafe { &mut *(ptr as *mut Self) }
39 }
40}
41
42#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
43 operator_type = std::any::type_name::<O>(),
44 input_diffs,
45 output_diffs
46))]
47pub extern "C" fn ffi_apply<O: FFIOperator>(
48 instance: *mut c_void,
49 ctx: *mut FFIContext,
50 input: *const FlowChangeFFI,
51 output: *mut FlowChangeFFI,
52) -> i32 {
53 let result = catch_unwind(AssertUnwindSafe(|| {
54 unsafe {
55 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
56 let mut operator = match wrapper.operator.lock() {
57 Ok(op) => op,
58 Err(_) => {
59 warn!("Failed to lock operator");
60 return -1;
61 }
62 };
63
64 let mut marshaller = wrapper.marshaller.borrow_mut();
65 marshaller.clear();
66
67 let unmarshal_span = debug_span!("unmarshal");
69 let _guard = unmarshal_span.enter();
70 let input_change = match marshaller.unmarshal_flow_change(&*input) {
71 Ok(change) => {
72 tracing::Span::current().record("input_diffs", change.diffs.len());
73 change
74 }
75 Err(e) => {
76 warn!(?e, "Unmarshal failed");
77 return -3;
78 }
79 };
80 drop(_guard);
81
82 let apply_span = debug_span!("operator_apply");
84 let _guard = apply_span.enter();
85 let mut op_ctx = OperatorContext::new(ctx);
86 let output_change = match operator.apply(&mut op_ctx, input_change) {
87 Ok(change) => {
88 tracing::Span::current().record("output_diffs", change.diffs.len());
89 change
90 }
91 Err(e) => {
92 warn!(?e, "Apply failed");
93 return -2;
94 }
95 };
96 drop(_guard);
97
98 let marshal_span = debug_span!("marshal");
100 let _guard = marshal_span.enter();
101 *output = marshaller.marshal_flow_change(&output_change);
102 drop(_guard);
103
104 0 }
106 }));
107
108 result.unwrap_or_else(|e| {
109 warn!(?e, "Panic in ffi_apply");
110 -99
111 })
112}
113
114#[instrument(name = "flow::operator::ffi::get_rows", level = "debug", skip_all, fields(
115 operator_type = std::any::type_name::<O>(),
116 row_count = count,
117 rows_returned
118))]
119pub extern "C" fn ffi_get_rows<O: FFIOperator>(
120 instance: *mut c_void,
121 ctx: *mut FFIContext,
122 row_numbers: *const u64,
123 count: usize,
124 output: *mut RowsFFI,
125) -> i32 {
126 let result = catch_unwind(AssertUnwindSafe(|| {
127 unsafe {
128 let wrapper = OperatorWrapper::<O>::from_ptr(instance);
129 let mut operator = match wrapper.operator.lock() {
130 Ok(op) => op,
131 Err(_) => {
132 warn!("Failed to lock operator");
133 return -1;
134 }
135 };
136
137 let mut marshaller = wrapper.marshaller.borrow_mut();
138 marshaller.clear();
139
140 let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
142 std::slice::from_raw_parts(row_numbers, count)
143 .iter()
144 .map(|&n| RowNumber::from(n))
145 .collect()
146 } else {
147 Vec::new()
148 };
149
150 let mut op_ctx = OperatorContext::new(ctx);
152
153 let rows = match operator.get_rows(&mut op_ctx, &numbers) {
155 Ok(rows) => {
156 tracing::Span::current().record("rows_returned", rows.len());
157 rows
158 }
159 Err(e) => {
160 warn!(?e, "get_rows failed");
161 return -2;
162 }
163 };
164
165 *output = marshaller.marshal_rows(&rows);
166
167 0 }
169 }));
170
171 result.unwrap_or_else(|e| {
172 warn!(?e, "Panic in ffi_get_rows");
173 -99
174 })
175}
176
177pub extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
178 if instance.is_null() {
179 return;
180 }
181
182 let result = catch_unwind(AssertUnwindSafe(|| unsafe {
183 let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
185 }));
187
188 if result.is_err() {
189 eprintln!("FFI operator panicked during destroy");
190 }
191}
192
193pub fn create_vtable<O: FFIOperator>() -> FFIOperatorVTable {
195 FFIOperatorVTable {
196 apply: ffi_apply::<O>,
197 get_rows: ffi_get_rows::<O>,
198 destroy: ffi_destroy::<O>,
199 }
200}