1#![cfg(reifydb_target = "native")]
2use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 slice, str,
11 sync::{OnceLock, RwLock},
12};
13
14use libloading::{Library, Symbol};
15use reifydb_abi::{
16 constants::{CURRENT_API, OPERATOR_MAGIC},
17 data::buffer::BufferFFI,
18 operator::{
19 column::OperatorColumnDefsFFI,
20 descriptor::OperatorDescriptorFFI,
21 types::{OperatorCreateFnFFI, OperatorMagicFnFFI},
22 },
23};
24use reifydb_core::interface::catalog::flow::FlowNodeId;
25use reifydb_engine::vm::executor::Executor;
26use reifydb_sdk::error::{FFIError, Result as FFIResult};
27use reifydb_type::value::constraint::{FFITypeConstraint, TypeConstraint};
28
29use crate::operator::ffi::FFIOperator;
30
31unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
36 if buffer.ptr.is_null() || buffer.len == 0 {
37 return String::new();
38 }
39 let slice = unsafe { slice::from_raw_parts(buffer.ptr, buffer.len) };
41 str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
42}
43
44static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
47
48pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
50 GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
51}
52
/// Loads FFI operator libraries and remembers which library provides which
/// operator.
pub struct FFIOperatorLoader {
    /// Open library handles, keyed by the path they were loaded from.
    loaded_libraries: HashMap<PathBuf, Library>,

    /// Operator name (as reported by a library's descriptor) mapped to the
    /// path of the library that registered it.
    operator_paths: HashMap<String, PathBuf>,
}
62
63impl FFIOperatorLoader {
64 fn new() -> Self {
66 Self {
67 loaded_libraries: HashMap::new(),
68 operator_paths: HashMap::new(),
69 }
70 }
71
72 pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
73 if !self.loaded_libraries.contains_key(path) {
75 let lib = unsafe {
76 Library::new(path).map_err(|e| {
77 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
78 })?
79 };
80 self.loaded_libraries.insert(path.to_path_buf(), lib);
81 }
82
83 let library = self.loaded_libraries.get(path).unwrap();
84
85 let magic_result: Result<Symbol<OperatorMagicFnFFI>, _> =
87 unsafe { library.get(b"ffi_operator_magic\0") };
88
89 match magic_result {
90 Ok(magic_fn) => {
91 let magic = magic_fn();
92 Ok(magic == OPERATOR_MAGIC)
93 }
94 Err(_) => {
95 self.loaded_libraries.remove(path);
97 Ok(false)
98 }
99 }
100 }
101
102 fn get_descriptor(&self, library: &Library) -> FFIResult<OperatorDescriptorFFI> {
104 unsafe {
105 let get_descriptor: Symbol<extern "C" fn() -> *const OperatorDescriptorFFI> =
106 library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
107 FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
108 })?;
109
110 let descriptor_ptr = get_descriptor();
111 if descriptor_ptr.is_null() {
112 return Err(FFIError::Other("Descriptor is null".to_string()));
113 }
114
115 Ok(OperatorDescriptorFFI {
117 api: (*descriptor_ptr).api,
118 operator: (*descriptor_ptr).operator,
119 version: (*descriptor_ptr).version,
120 description: (*descriptor_ptr).description,
121 input_columns: (*descriptor_ptr).input_columns,
122 output_columns: (*descriptor_ptr).output_columns,
123 capabilities: (*descriptor_ptr).capabilities,
124 vtable: (*descriptor_ptr).vtable,
125 })
126 }
127 }
128
129 fn validate_and_register(
132 &mut self,
133 descriptor: &OperatorDescriptorFFI,
134 path: &Path,
135 ) -> FFIResult<(String, u32)> {
136 if descriptor.api != CURRENT_API {
138 return Err(FFIError::Other(format!(
139 "API version mismatch: expected {}, got {}",
140 CURRENT_API, descriptor.api
141 )));
142 }
143
144 let operator = unsafe { buffer_to_string(&descriptor.operator) };
146
147 self.operator_paths.insert(operator.clone(), path.to_path_buf());
149
150 Ok((operator, descriptor.api))
151 }
152
153 pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<LoadedOperatorInfo>> {
166 if !self.load_operator_library(path)? {
167 return Ok(None);
168 }
169
170 let library = self.loaded_libraries.get(path).unwrap();
171 let descriptor = self.get_descriptor(library)?;
172 let (operator, api) = self.validate_and_register(&descriptor, path)?;
173
174 let info = unsafe {
176 LoadedOperatorInfo {
177 operator,
178 library_path: path.to_path_buf(),
179 api,
180 version: buffer_to_string(&descriptor.version),
181 description: buffer_to_string(&descriptor.description),
182 input_columns: extract_column_defs(&descriptor.input_columns),
183 output_columns: extract_column_defs(&descriptor.output_columns),
184 capabilities: descriptor.capabilities,
185 }
186 };
187
188 Ok(Some(info))
189 }
190
191 pub fn load_operator(
203 &mut self,
204 path: &Path,
205 config: &[u8],
206 operator_id: FlowNodeId,
207 executor: Executor,
208 ) -> FFIResult<Option<FFIOperator>> {
209 if !self.load_operator_library(path)? {
210 return Ok(None);
211 }
212
213 let descriptor = {
215 let library = self.loaded_libraries.get(path).unwrap();
216 self.get_descriptor(library)?
217 };
218
219 self.validate_and_register(&descriptor, path)?;
220
221 let library = self.loaded_libraries.get(path).unwrap();
223 let create_fn: OperatorCreateFnFFI = unsafe {
224 let create_symbol: Symbol<OperatorCreateFnFFI> = library
225 .get(b"ffi_operator_create\0")
226 .map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
227
228 *create_symbol
229 };
230
231 let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
233 if instance.is_null() {
234 return Err(FFIError::Other("Failed to create operator instance".to_string()));
235 }
236
237 Ok(Some(FFIOperator::new(descriptor, instance, operator_id, executor)))
240 }
241
242 pub fn create_operator_by_name(
253 &mut self,
254 operator: &str,
255 operator_id: FlowNodeId,
256 config: &[u8],
257 executor: Executor,
258 ) -> FFIResult<FFIOperator> {
259 let path = self
260 .operator_paths
261 .get(operator)
262 .ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator)))?
263 .clone();
264
265 self.load_operator(&path, config, operator_id, executor)?
268 .ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator)))
269 }
270
271 pub fn has_operator(&self, operator: &str) -> bool {
273 self.operator_paths.contains_key(operator)
274 }
275
276 pub fn list_loaded_operators(&self) -> Vec<LoadedOperatorInfo> {
278 let mut operators = Vec::new();
279
280 for (path, library) in &self.loaded_libraries {
281 unsafe {
283 let get_descriptor: Result<Symbol<extern "C" fn() -> *const OperatorDescriptorFFI>, _> =
284 library.get(b"ffi_operator_get_descriptor\0");
285
286 if let Ok(get_descriptor) = get_descriptor {
287 let descriptor_ptr = get_descriptor();
288 if !descriptor_ptr.is_null() {
289 let descriptor = &*descriptor_ptr;
290
291 operators.push(LoadedOperatorInfo {
292 operator: buffer_to_string(&descriptor.operator),
293 library_path: path.clone(),
294 api: descriptor.api,
295 version: buffer_to_string(&descriptor.version),
296 description: buffer_to_string(&descriptor.description),
297 input_columns: extract_column_defs(&descriptor.input_columns),
298 output_columns: extract_column_defs(&descriptor.output_columns),
299 capabilities: descriptor.capabilities,
300 });
301 }
302 }
303 }
304 }
305
306 operators
307 }
308}
309
310#[derive(Debug, Clone)]
312pub struct LoadedOperatorInfo {
313 pub operator: String,
314 pub library_path: PathBuf,
315 pub api: u32,
316 pub version: String,
317 pub description: String,
318 pub input_columns: Vec<ColumnDefInfo>,
319 pub output_columns: Vec<ColumnDefInfo>,
320 pub capabilities: u32,
321}
322
323#[derive(Debug, Clone)]
325pub struct ColumnDefInfo {
326 pub name: String,
327 pub field_type: TypeConstraint,
328 pub description: String,
329}
330
331unsafe fn extract_column_defs(column_defs: &OperatorColumnDefsFFI) -> Vec<ColumnDefInfo> {
336 if column_defs.columns.is_null() || column_defs.column_count == 0 {
337 return Vec::new();
338 }
339
340 let mut columns = Vec::with_capacity(column_defs.column_count);
341 for i in 0..column_defs.column_count {
342 let col = unsafe { &*column_defs.columns.add(i) };
344
345 let field_type = TypeConstraint::from_ffi(FFITypeConstraint {
347 base_type: col.base_type,
348 constraint_type: col.constraint_type,
349 constraint_param1: col.constraint_param1,
350 constraint_param2: col.constraint_param2,
351 });
352
353 columns.push(ColumnDefInfo {
354 name: unsafe { buffer_to_string(&col.name) },
356 field_type,
357 description: unsafe { buffer_to_string(&col.description) },
358 });
359 }
360
361 columns
362}
363
364impl Default for FFIOperatorLoader {
365 fn default() -> Self {
366 Self::new()
367 }
368}
369
370impl Drop for FFIOperatorLoader {
371 fn drop(&mut self) {
372 self.loaded_libraries.clear();
374 }
375}