reifydb_flow_operator_sdk/ffi/
wrapper.rs

1//! Wrapper that bridges Rust operators to FFI interface
2
3use std::{
4	cell::RefCell,
5	ffi::c_void,
6	panic::{AssertUnwindSafe, catch_unwind},
7	sync::Mutex,
8};
9
10use reifydb_flow_operator_abi::*;
11use reifydb_type::RowNumber;
12use tracing::{debug_span, instrument, warn};
13
14use crate::{FFIOperator, context::OperatorContext, marshal::Marshaller};
15
16/// Wrapper that adapts a Rust operator to the FFI interface
17pub struct OperatorWrapper<O: FFIOperator> {
18	operator: Mutex<O>,
19	marshaller: RefCell<Marshaller>,
20}
21
22impl<O: FFIOperator> OperatorWrapper<O> {
23	/// Create a new operator wrapper
24	pub fn new(operator: O) -> Self {
25		Self {
26			operator: Mutex::new(operator),
27			marshaller: RefCell::new(Marshaller::new()),
28		}
29	}
30
31	/// Get a pointer to this wrapper as c_void
32	pub fn as_ptr(&mut self) -> *mut c_void {
33		self as *mut _ as *mut c_void
34	}
35
36	/// Create from a raw pointer
37	pub fn from_ptr(ptr: *mut c_void) -> &'static mut Self {
38		unsafe { &mut *(ptr as *mut Self) }
39	}
40}
41
42#[instrument(name = "flow::operator::ffi::apply", level = "debug", skip_all, fields(
43	operator_type = std::any::type_name::<O>(),
44	input_diffs,
45	output_diffs
46))]
47pub extern "C" fn ffi_apply<O: FFIOperator>(
48	instance: *mut c_void,
49	ctx: *mut FFIContext,
50	input: *const FlowChangeFFI,
51	output: *mut FlowChangeFFI,
52) -> i32 {
53	let result = catch_unwind(AssertUnwindSafe(|| {
54		unsafe {
55			let wrapper = OperatorWrapper::<O>::from_ptr(instance);
56			let mut operator = match wrapper.operator.lock() {
57				Ok(op) => op,
58				Err(_) => {
59					warn!("Failed to lock operator");
60					return -1;
61				}
62			};
63
64			let mut marshaller = wrapper.marshaller.borrow_mut();
65			marshaller.clear();
66
67			// Unmarshal input using the marshaller
68			let unmarshal_span = debug_span!("unmarshal");
69			let _guard = unmarshal_span.enter();
70			let input_change = match marshaller.unmarshal_flow_change(&*input) {
71				Ok(change) => {
72					tracing::Span::current().record("input_diffs", change.diffs.len());
73					change
74				}
75				Err(e) => {
76					warn!(?e, "Unmarshal failed");
77					return -3;
78				}
79			};
80			drop(_guard);
81
82			// Create context and apply operator
83			let apply_span = debug_span!("operator_apply");
84			let _guard = apply_span.enter();
85			let mut op_ctx = OperatorContext::new(ctx);
86			let output_change = match operator.apply(&mut op_ctx, input_change) {
87				Ok(change) => {
88					tracing::Span::current().record("output_diffs", change.diffs.len());
89					change
90				}
91				Err(e) => {
92					warn!(?e, "Apply failed");
93					return -2;
94				}
95			};
96			drop(_guard);
97
98			// Marshal output
99			let marshal_span = debug_span!("marshal");
100			let _guard = marshal_span.enter();
101			*output = marshaller.marshal_flow_change(&output_change);
102			drop(_guard);
103
104			0 // Success
105		}
106	}));
107
108	result.unwrap_or_else(|e| {
109		warn!(?e, "Panic in ffi_apply");
110		-99
111	})
112}
113
114#[instrument(name = "flow::operator::ffi::get_rows", level = "debug", skip_all, fields(
115	operator_type = std::any::type_name::<O>(),
116	row_count = count,
117	rows_returned
118))]
119pub extern "C" fn ffi_get_rows<O: FFIOperator>(
120	instance: *mut c_void,
121	ctx: *mut FFIContext,
122	row_numbers: *const u64,
123	count: usize,
124	output: *mut RowsFFI,
125) -> i32 {
126	let result = catch_unwind(AssertUnwindSafe(|| {
127		unsafe {
128			let wrapper = OperatorWrapper::<O>::from_ptr(instance);
129			let mut operator = match wrapper.operator.lock() {
130				Ok(op) => op,
131				Err(_) => {
132					warn!("Failed to lock operator");
133					return -1;
134				}
135			};
136
137			let mut marshaller = wrapper.marshaller.borrow_mut();
138			marshaller.clear();
139
140			// Convert row numbers
141			let numbers: Vec<RowNumber> = if !row_numbers.is_null() && count > 0 {
142				std::slice::from_raw_parts(row_numbers, count)
143					.iter()
144					.map(|&n| RowNumber::from(n))
145					.collect()
146			} else {
147				Vec::new()
148			};
149
150			// Create context
151			let mut op_ctx = OperatorContext::new(ctx);
152
153			// Call the operator
154			let rows = match operator.get_rows(&mut op_ctx, &numbers) {
155				Ok(rows) => {
156					tracing::Span::current().record("rows_returned", rows.len());
157					rows
158				}
159				Err(e) => {
160					warn!(?e, "get_rows failed");
161					return -2;
162				}
163			};
164
165			*output = marshaller.marshal_rows(&rows);
166
167			0 // Success
168		}
169	}));
170
171	result.unwrap_or_else(|e| {
172		warn!(?e, "Panic in ffi_get_rows");
173		-99
174	})
175}
176
177pub extern "C" fn ffi_destroy<O: FFIOperator>(instance: *mut c_void) {
178	if instance.is_null() {
179		return;
180	}
181
182	let result = catch_unwind(AssertUnwindSafe(|| unsafe {
183		// Reconstruct the Box from the raw pointer and let it drop
184		let _wrapper = Box::from_raw(instance as *mut OperatorWrapper<O>);
185		// Wrapper will be dropped here, cleaning up the operator
186	}));
187
188	if result.is_err() {
189		eprintln!("FFI operator panicked during destroy");
190	}
191}
192
193/// Create the vtable for an operator type
194pub fn create_vtable<O: FFIOperator>() -> FFIOperatorVTable {
195	FFIOperatorVTable {
196		apply: ffi_apply::<O>,
197		get_rows: ffi_get_rows::<O>,
198		destroy: ffi_destroy::<O>,
199	}
200}