reifydb_sub_flow/ffi/
loader.rs1use std::{
4 collections::HashMap,
5 ffi::CStr,
6 path::{Path, PathBuf},
7 sync::OnceLock,
8};
9
10use libloading::{Library, Symbol};
11use parking_lot::RwLock;
12use reifydb_core::interface::FlowNodeId;
13use reifydb_flow_operator_abi::{CURRENT_API_VERSION, FFIOperatorCreateFn, FFIOperatorDescriptor};
14use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
15
16use crate::operator::FFIOperator;
17
18static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
21
22pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
24 GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
25}
26
27pub struct FFIOperatorLoader {
30 loaded_libraries: HashMap<PathBuf, Library>,
32
33 operator_paths: HashMap<String, PathBuf>,
35}
36
37impl FFIOperatorLoader {
38 fn new() -> Self {
40 Self {
41 loaded_libraries: HashMap::new(),
42 operator_paths: HashMap::new(),
43 }
44 }
45
46 pub fn load_operator(&mut self, path: &Path, config: &[u8], operator_id: FlowNodeId) -> FFIResult<FFIOperator> {
57 if !self.loaded_libraries.contains_key(path) {
59 let lib = unsafe {
61 Library::new(path).map_err(|e| {
62 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
63 })?
64 };
65
66 self.loaded_libraries.insert(path.to_path_buf(), lib);
67 };
68
69 let library = self.loaded_libraries.get(path).unwrap();
70
71 let descriptor = unsafe {
73 let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
74 library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
75 FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
76 })?;
77
78 let descriptor_ptr = get_descriptor();
79 if descriptor_ptr.is_null() {
80 return Err(FFIError::Other("Descriptor is null".to_string()));
81 }
82
83 FFIOperatorDescriptor {
85 api_version: (*descriptor_ptr).api_version,
86 operator_name: (*descriptor_ptr).operator_name,
87 vtable: (*descriptor_ptr).vtable,
88 }
89 };
90
91 let operator_name = unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
93 self.operator_paths.insert(operator_name.clone(), path.to_path_buf());
94
95 if descriptor.api_version != CURRENT_API_VERSION {
97 return Err(FFIError::Other(format!(
98 "API version mismatch: expected {}, got {}",
99 CURRENT_API_VERSION, descriptor.api_version
100 )));
101 }
102
103 let create_fn: FFIOperatorCreateFn = unsafe {
105 let create_symbol: Symbol<FFIOperatorCreateFn> = library
106 .get(b"ffi_operator_create\0")
107 .map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
108
109 *create_symbol
110 };
111
112 let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
114 if instance.is_null() {
115 return Err(FFIError::Other("Failed to create operator instance".to_string()));
116 }
117
118 Ok(FFIOperator::new(descriptor, instance, operator_id))
121 }
122
123 pub fn create_operator_by_name(
134 &mut self,
135 operator_name: &str,
136 operator_id: FlowNodeId,
137 config: &[u8],
138 ) -> FFIResult<FFIOperator> {
139 let path = self
141 .operator_paths
142 .get(operator_name)
143 .ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator_name)))?
144 .clone();
145
146 self.load_operator(&path, config, operator_id)
148 }
149
150 pub fn has_operator(&self, operator_name: &str) -> bool {
152 self.operator_paths.contains_key(operator_name)
153 }
154
155 pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
164 if self.loaded_libraries.remove(path).is_some() {
165 Ok(())
166 } else {
167 Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
168 }
169 }
170
171 pub fn loaded_count(&self) -> usize {
173 self.loaded_libraries.len()
174 }
175
176 pub fn is_loaded(&self, path: &Path) -> bool {
178 self.loaded_libraries.contains_key(path)
179 }
180
181 pub fn list_loaded_operators(&self) -> Vec<(String, PathBuf, u32)> {
188 let mut operators = Vec::new();
189
190 for (path, library) in &self.loaded_libraries {
191 unsafe {
193 let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
194 library.get(b"ffi_operator_get_descriptor\0");
195
196 if let Ok(get_descriptor) = get_descriptor {
197 let descriptor_ptr = get_descriptor();
198 if !descriptor_ptr.is_null() {
199 let descriptor = &*descriptor_ptr;
200
201 let operator_name = CStr::from_ptr(descriptor.operator_name)
203 .to_str()
204 .unwrap_or("<invalid UTF-8>")
205 .to_string();
206
207 operators.push((operator_name, path.clone(), descriptor.api_version));
208 }
209 }
210 }
211 }
212
213 operators
214 }
215}
216
217impl Default for FFIOperatorLoader {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223impl Drop for FFIOperatorLoader {
224 fn drop(&mut self) {
225 self.loaded_libraries.clear();
227 }
228}