1use std::{
4 collections::HashMap,
5 path::{Path, PathBuf},
6 sync::{OnceLock, RwLock},
7};
8
9use libloading::{Library, Symbol};
10use reifydb_core::interface::FlowNodeId;
11use reifydb_flow_operator_abi::{
12 BufferFFI, CURRENT_API, FFIOperatorColumnDefs, FFIOperatorCreateFn, FFIOperatorDescriptor, FFIOperatorMagicFn,
13 OPERATOR_MAGIC,
14};
15use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
16
17use crate::operator::FFIOperator;
18
19unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
24 if buffer.ptr.is_null() || buffer.len == 0 {
25 return String::new();
26 }
27 let slice = unsafe { std::slice::from_raw_parts(buffer.ptr, buffer.len) };
29 std::str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
30}
31
32static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
35
36pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
38 GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
39}
40
41pub struct FFIOperatorLoader {
44 loaded_libraries: HashMap<PathBuf, Library>,
46
47 operator_paths: HashMap<String, PathBuf>,
49}
50
51impl FFIOperatorLoader {
52 fn new() -> Self {
54 Self {
55 loaded_libraries: HashMap::new(),
56 operator_paths: HashMap::new(),
57 }
58 }
59
60 pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
61 if !self.loaded_libraries.contains_key(path) {
63 let lib = unsafe {
64 Library::new(path).map_err(|e| {
65 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
66 })?
67 };
68 self.loaded_libraries.insert(path.to_path_buf(), lib);
69 }
70
71 let library = self.loaded_libraries.get(path).unwrap();
72
73 let magic_result: Result<Symbol<FFIOperatorMagicFn>, _> =
75 unsafe { library.get(b"ffi_operator_magic\0") };
76
77 match magic_result {
78 Ok(magic_fn) => {
79 let magic = magic_fn();
80 Ok(magic == OPERATOR_MAGIC)
81 }
82 Err(_) => {
83 self.loaded_libraries.remove(path);
85 Ok(false)
86 }
87 }
88 }
89
90 fn get_descriptor(&self, library: &Library) -> FFIResult<FFIOperatorDescriptor> {
92 unsafe {
93 let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
94 library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
95 FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
96 })?;
97
98 let descriptor_ptr = get_descriptor();
99 if descriptor_ptr.is_null() {
100 return Err(FFIError::Other("Descriptor is null".to_string()));
101 }
102
103 Ok(FFIOperatorDescriptor {
105 api: (*descriptor_ptr).api,
106 operator: (*descriptor_ptr).operator,
107 version: (*descriptor_ptr).version,
108 description: (*descriptor_ptr).description,
109 input_columns: (*descriptor_ptr).input_columns,
110 output_columns: (*descriptor_ptr).output_columns,
111 capabilities: (*descriptor_ptr).capabilities,
112 vtable: (*descriptor_ptr).vtable,
113 })
114 }
115 }
116
117 fn validate_and_register(
120 &mut self,
121 descriptor: &FFIOperatorDescriptor,
122 path: &Path,
123 ) -> FFIResult<(String, u32)> {
124 if descriptor.api != CURRENT_API {
126 return Err(FFIError::Other(format!(
127 "API version mismatch: expected {}, got {}",
128 CURRENT_API, descriptor.api
129 )));
130 }
131
132 let operator = unsafe { buffer_to_string(&descriptor.operator) };
134
135 self.operator_paths.insert(operator.clone(), path.to_path_buf());
137
138 Ok((operator, descriptor.api))
139 }
140
141 pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<LoadedOperatorInfo>> {
154 if !self.load_operator_library(path)? {
155 return Ok(None);
156 }
157
158 let library = self.loaded_libraries.get(path).unwrap();
159 let descriptor = self.get_descriptor(library)?;
160 let (operator, api) = self.validate_and_register(&descriptor, path)?;
161
162 let info = unsafe {
164 LoadedOperatorInfo {
165 operator,
166 library_path: path.to_path_buf(),
167 api,
168 version: buffer_to_string(&descriptor.version),
169 description: buffer_to_string(&descriptor.description),
170 input_columns: extract_column_defs(&descriptor.input_columns),
171 output_columns: extract_column_defs(&descriptor.output_columns),
172 capabilities: descriptor.capabilities,
173 }
174 };
175
176 Ok(Some(info))
177 }
178
179 pub fn load_operator(
191 &mut self,
192 path: &Path,
193 config: &[u8],
194 operator_id: FlowNodeId,
195 ) -> FFIResult<Option<FFIOperator>> {
196 if !self.load_operator_library(path)? {
197 return Ok(None);
198 }
199
200 let descriptor = {
202 let library = self.loaded_libraries.get(path).unwrap();
203 self.get_descriptor(library)?
204 };
205
206 self.validate_and_register(&descriptor, path)?;
207
208 let library = self.loaded_libraries.get(path).unwrap();
210 let create_fn: FFIOperatorCreateFn = unsafe {
211 let create_symbol: Symbol<FFIOperatorCreateFn> = library
212 .get(b"ffi_operator_create\0")
213 .map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
214
215 *create_symbol
216 };
217
218 let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
220 if instance.is_null() {
221 return Err(FFIError::Other("Failed to create operator instance".to_string()));
222 }
223
224 Ok(Some(FFIOperator::new(descriptor, instance, operator_id)))
227 }
228
229 pub fn create_operator_by_name(
240 &mut self,
241 operator: &str,
242 operator_id: FlowNodeId,
243 config: &[u8],
244 ) -> FFIResult<FFIOperator> {
245 let path = self
246 .operator_paths
247 .get(operator)
248 .ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator)))?
249 .clone();
250
251 self.load_operator(&path, config, operator_id)?
254 .ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator)))
255 }
256
257 pub fn has_operator(&self, operator: &str) -> bool {
259 self.operator_paths.contains_key(operator)
260 }
261
262 pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
271 if self.loaded_libraries.remove(path).is_some() {
272 Ok(())
273 } else {
274 Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
275 }
276 }
277
278 pub fn loaded_count(&self) -> usize {
280 self.loaded_libraries.len()
281 }
282
283 pub fn is_loaded(&self, path: &Path) -> bool {
285 self.loaded_libraries.contains_key(path)
286 }
287
288 pub fn list_loaded_operators(&self) -> Vec<LoadedOperatorInfo> {
290 let mut operators = Vec::new();
291
292 for (path, library) in &self.loaded_libraries {
293 unsafe {
295 let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
296 library.get(b"ffi_operator_get_descriptor\0");
297
298 if let Ok(get_descriptor) = get_descriptor {
299 let descriptor_ptr = get_descriptor();
300 if !descriptor_ptr.is_null() {
301 let descriptor = &*descriptor_ptr;
302
303 operators.push(LoadedOperatorInfo {
304 operator: buffer_to_string(&descriptor.operator),
305 library_path: path.clone(),
306 api: descriptor.api,
307 version: buffer_to_string(&descriptor.version),
308 description: buffer_to_string(&descriptor.description),
309 input_columns: extract_column_defs(&descriptor.input_columns),
310 output_columns: extract_column_defs(&descriptor.output_columns),
311 capabilities: descriptor.capabilities,
312 });
313 }
314 }
315 }
316 }
317
318 operators
319 }
320}
321
322#[derive(Debug, Clone)]
324pub struct LoadedOperatorInfo {
325 pub operator: String,
326 pub library_path: PathBuf,
327 pub api: u32,
328 pub version: String,
329 pub description: String,
330 pub input_columns: Vec<ColumnDefInfo>,
331 pub output_columns: Vec<ColumnDefInfo>,
332 pub capabilities: u32,
333}
334
335#[derive(Debug, Clone)]
337pub struct ColumnDefInfo {
338 pub name: String,
339 pub field_type: reifydb_type::TypeConstraint,
340 pub description: String,
341}
342
343unsafe fn extract_column_defs(column_defs: &FFIOperatorColumnDefs) -> Vec<ColumnDefInfo> {
348 use reifydb_type::{FFITypeConstraint, TypeConstraint};
349
350 if column_defs.columns.is_null() || column_defs.column_count == 0 {
351 return Vec::new();
352 }
353
354 let mut columns = Vec::with_capacity(column_defs.column_count);
355 for i in 0..column_defs.column_count {
356 let col = unsafe { &*column_defs.columns.add(i) };
358
359 let field_type = TypeConstraint::from_ffi(FFITypeConstraint {
361 base_type: col.base_type,
362 constraint_type: col.constraint_type,
363 constraint_param1: col.constraint_param1,
364 constraint_param2: col.constraint_param2,
365 });
366
367 columns.push(ColumnDefInfo {
368 name: unsafe { buffer_to_string(&col.name) },
370 field_type,
371 description: unsafe { buffer_to_string(&col.description) },
372 });
373 }
374
375 columns
376}
377
378impl Default for FFIOperatorLoader {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
384impl Drop for FFIOperatorLoader {
385 fn drop(&mut self) {
386 self.loaded_libraries.clear();
388 }
389}