reifydb_sub_flow/ffi/
loader.rs1use std::{
4 collections::HashMap,
5 ffi::CStr,
6 path::{Path, PathBuf},
7 sync::OnceLock,
8};
9
10use libloading::{Library, Symbol};
11use parking_lot::RwLock;
12use reifydb_core::interface::FlowNodeId;
13use reifydb_flow_operator_abi::{
14 CURRENT_API_VERSION, FFIOperatorCreateFn, FFIOperatorDescriptor, FFIOperatorMagicFn, OPERATOR_MAGIC,
15};
16use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
17
18use crate::operator::FFIOperator;
19
20static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
23
24pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
26 GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
27}
28
29pub struct FFIOperatorLoader {
32 loaded_libraries: HashMap<PathBuf, Library>,
34
35 operator_paths: HashMap<String, PathBuf>,
37}
38
39impl FFIOperatorLoader {
40 fn new() -> Self {
42 Self {
43 loaded_libraries: HashMap::new(),
44 operator_paths: HashMap::new(),
45 }
46 }
47
48 pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
49 if !self.loaded_libraries.contains_key(path) {
51 let lib = unsafe {
52 Library::new(path).map_err(|e| {
53 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
54 })?
55 };
56 self.loaded_libraries.insert(path.to_path_buf(), lib);
57 }
58
59 let library = self.loaded_libraries.get(path).unwrap();
60
61 let magic_result: Result<Symbol<FFIOperatorMagicFn>, _> =
63 unsafe { library.get(b"ffi_operator_magic\0") };
64
65 match magic_result {
66 Ok(magic_fn) => {
67 let magic = magic_fn();
68 Ok(magic == OPERATOR_MAGIC)
69 }
70 Err(_) => {
71 self.loaded_libraries.remove(path);
73 Ok(false)
74 }
75 }
76 }
77
78 fn get_descriptor(&self, library: &Library) -> FFIResult<FFIOperatorDescriptor> {
80 unsafe {
81 let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
82 library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
83 FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
84 })?;
85
86 let descriptor_ptr = get_descriptor();
87 if descriptor_ptr.is_null() {
88 return Err(FFIError::Other("Descriptor is null".to_string()));
89 }
90
91 Ok(FFIOperatorDescriptor {
93 api_version: (*descriptor_ptr).api_version,
94 operator_name: (*descriptor_ptr).operator_name,
95 vtable: (*descriptor_ptr).vtable,
96 })
97 }
98 }
99
100 fn validate_and_register(
103 &mut self,
104 descriptor: &FFIOperatorDescriptor,
105 path: &Path,
106 ) -> FFIResult<(String, u32)> {
107 if descriptor.api_version != CURRENT_API_VERSION {
109 return Err(FFIError::Other(format!(
110 "API version mismatch: expected {}, got {}",
111 CURRENT_API_VERSION, descriptor.api_version
112 )));
113 }
114
115 let operator_name = unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
117
118 self.operator_paths.insert(operator_name.clone(), path.to_path_buf());
120
121 Ok((operator_name, descriptor.api_version))
122 }
123
124 pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<(String, u32)>> {
137 if !self.load_operator_library(path)? {
138 return Ok(None);
139 }
140
141 let library = self.loaded_libraries.get(path).unwrap();
142 let descriptor = self.get_descriptor(library)?;
143 let (operator_name, api_version) = self.validate_and_register(&descriptor, path)?;
144
145 Ok(Some((operator_name, api_version)))
146 }
147
148 pub fn load_operator(
160 &mut self,
161 path: &Path,
162 config: &[u8],
163 operator_id: FlowNodeId,
164 ) -> FFIResult<Option<FFIOperator>> {
165 if !self.load_operator_library(path)? {
166 return Ok(None);
167 }
168
169 let descriptor = {
171 let library = self.loaded_libraries.get(path).unwrap();
172 self.get_descriptor(library)?
173 };
174
175 self.validate_and_register(&descriptor, path)?;
176
177 let library = self.loaded_libraries.get(path).unwrap();
179 let create_fn: FFIOperatorCreateFn = unsafe {
180 let create_symbol: Symbol<FFIOperatorCreateFn> = library
181 .get(b"ffi_operator_create\0")
182 .map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
183
184 *create_symbol
185 };
186
187 let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
189 if instance.is_null() {
190 return Err(FFIError::Other("Failed to create operator instance".to_string()));
191 }
192
193 Ok(Some(FFIOperator::new(descriptor, instance, operator_id)))
196 }
197
198 pub fn create_operator_by_name(
209 &mut self,
210 operator_name: &str,
211 operator_id: FlowNodeId,
212 config: &[u8],
213 ) -> FFIResult<FFIOperator> {
214 let path = self
215 .operator_paths
216 .get(operator_name)
217 .ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator_name)))?
218 .clone();
219
220 self.load_operator(&path, config, operator_id)?
223 .ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator_name)))
224 }
225
226 pub fn has_operator(&self, operator_name: &str) -> bool {
228 self.operator_paths.contains_key(operator_name)
229 }
230
231 pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
240 if self.loaded_libraries.remove(path).is_some() {
241 Ok(())
242 } else {
243 Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
244 }
245 }
246
247 pub fn loaded_count(&self) -> usize {
249 self.loaded_libraries.len()
250 }
251
252 pub fn is_loaded(&self, path: &Path) -> bool {
254 self.loaded_libraries.contains_key(path)
255 }
256
257 pub fn list_loaded_operators(&self) -> Vec<(String, PathBuf, u32)> {
264 let mut operators = Vec::new();
265
266 for (path, library) in &self.loaded_libraries {
267 unsafe {
269 let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
270 library.get(b"ffi_operator_get_descriptor\0");
271
272 if let Ok(get_descriptor) = get_descriptor {
273 let descriptor_ptr = get_descriptor();
274 if !descriptor_ptr.is_null() {
275 let descriptor = &*descriptor_ptr;
276
277 let operator_name = CStr::from_ptr(descriptor.operator_name)
279 .to_str()
280 .unwrap_or("<invalid UTF-8>")
281 .to_string();
282
283 operators.push((operator_name, path.clone(), descriptor.api_version));
284 }
285 }
286 }
287 }
288
289 operators
290 }
291}
292
293impl Default for FFIOperatorLoader {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299impl Drop for FFIOperatorLoader {
300 fn drop(&mut self) {
301 self.loaded_libraries.clear();
303 }
304}