reifydb_sub_flow/ffi/
loader.rs

1//! FFI operator dynamic library loader
2
3use std::{
4	collections::HashMap,
5	ffi::CStr,
6	path::{Path, PathBuf},
7	sync::OnceLock,
8};
9
10use libloading::{Library, Symbol};
11use parking_lot::RwLock;
12use reifydb_core::interface::FlowNodeId;
13use reifydb_flow_operator_abi::{
14	CURRENT_API_VERSION, FFIOperatorCreateFn, FFIOperatorDescriptor, FFIOperatorMagicFn, OPERATOR_MAGIC,
15};
16use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
17
18use crate::operator::FFIOperator;
19
20/// Global singleton FFI operator loader
21/// Ensures libraries stay loaded for the entire process lifetime
22static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
23
24/// Get the global FFI operator loader
25pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
26	GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
27}
28
29/// FFI operator loader for dynamic libraries
30/// This is meant to be used as a singleton via get_global_loader()
31pub struct FFIOperatorLoader {
32	/// Loaded libraries mapped by path
33	loaded_libraries: HashMap<PathBuf, Library>,
34
35	/// Map of operator names to library paths for quick lookup
36	operator_paths: HashMap<String, PathBuf>,
37}
38
39impl FFIOperatorLoader {
40	/// Create a new FFI operator loader
41	fn new() -> Self {
42		Self {
43			loaded_libraries: HashMap::new(),
44			operator_paths: HashMap::new(),
45		}
46	}
47
48	pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
49		// Load the library if not already loaded
50		if !self.loaded_libraries.contains_key(path) {
51			let lib = unsafe {
52				Library::new(path).map_err(|e| {
53					FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
54				})?
55			};
56			self.loaded_libraries.insert(path.to_path_buf(), lib);
57		}
58
59		let library = self.loaded_libraries.get(path).unwrap();
60
61		// Check for magic symbol
62		let magic_result: Result<Symbol<FFIOperatorMagicFn>, _> =
63			unsafe { library.get(b"ffi_operator_magic\0") };
64
65		match magic_result {
66			Ok(magic_fn) => {
67				let magic = magic_fn();
68				Ok(magic == OPERATOR_MAGIC)
69			}
70			Err(_) => {
71				// Symbol not found - not an operator, remove from cache
72				self.loaded_libraries.remove(path);
73				Ok(false)
74			}
75		}
76	}
77
78	/// Get the operator descriptor from a loaded library
79	fn get_descriptor(&self, library: &Library) -> FFIResult<FFIOperatorDescriptor> {
80		unsafe {
81			let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
82				library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
83					FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
84				})?;
85
86			let descriptor_ptr = get_descriptor();
87			if descriptor_ptr.is_null() {
88				return Err(FFIError::Other("Descriptor is null".to_string()));
89			}
90
91			// Copy the descriptor fields
92			Ok(FFIOperatorDescriptor {
93				api_version: (*descriptor_ptr).api_version,
94				operator_name: (*descriptor_ptr).operator_name,
95				vtable: (*descriptor_ptr).vtable,
96			})
97		}
98	}
99
100	/// Validate descriptor and register operator name mapping
101	/// Returns the operator name and API version
102	fn validate_and_register(
103		&mut self,
104		descriptor: &FFIOperatorDescriptor,
105		path: &Path,
106	) -> FFIResult<(String, u32)> {
107		// Verify API version
108		if descriptor.api_version != CURRENT_API_VERSION {
109			return Err(FFIError::Other(format!(
110				"API version mismatch: expected {}, got {}",
111				CURRENT_API_VERSION, descriptor.api_version
112			)));
113		}
114
115		// Extract operator name
116		let operator_name = unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
117
118		// Store operator name -> path mapping
119		self.operator_paths.insert(operator_name.clone(), path.to_path_buf());
120
121		Ok((operator_name, descriptor.api_version))
122	}
123
124	/// Register an operator library without instantiating it
125	///
126	/// This loads the library, validates it as an operator, and extracts metadata
127	/// without creating an operator instance. Use this for discovery/registration.
128	///
129	/// # Arguments
130	/// * `path` - Path to the shared library file
131	///
132	/// # Returns
133	/// * `Ok(Some((name, api_version)))` - Successfully registered operator
134	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
135	/// * `Err(FFIError)` - Loading or validation failed
136	pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<(String, u32)>> {
137		if !self.load_operator_library(path)? {
138			return Ok(None);
139		}
140
141		let library = self.loaded_libraries.get(path).unwrap();
142		let descriptor = self.get_descriptor(library)?;
143		let (operator_name, api_version) = self.validate_and_register(&descriptor, path)?;
144
145		Ok(Some((operator_name, api_version)))
146	}
147
148	/// Load an operator from a dynamic library
149	///
150	/// # Arguments
151	/// * `path` - Path to the shared library file
152	/// * `config` - Operator configuration data
153	/// * `operator_id` - ID for this operator instance
154	///
155	/// # Returns
156	/// * `Ok(Some(FFIOperator))` - Successfully loaded operator
157	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
158	/// * `Err(FFIError)` - Loading or initialization failed
159	pub fn load_operator(
160		&mut self,
161		path: &Path,
162		config: &[u8],
163		operator_id: FlowNodeId,
164	) -> FFIResult<Option<FFIOperator>> {
165		if !self.load_operator_library(path)? {
166			return Ok(None);
167		}
168
169		// Get descriptor and validate - done in separate scope to avoid borrow conflicts
170		let descriptor = {
171			let library = self.loaded_libraries.get(path).unwrap();
172			self.get_descriptor(library)?
173		};
174
175		self.validate_and_register(&descriptor, path)?;
176
177		// Get the create function and instantiate operator
178		let library = self.loaded_libraries.get(path).unwrap();
179		let create_fn: FFIOperatorCreateFn = unsafe {
180			let create_symbol: Symbol<FFIOperatorCreateFn> = library
181				.get(b"ffi_operator_create\0")
182				.map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
183
184			*create_symbol
185		};
186
187		// Create the operator instance
188		let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
189		if instance.is_null() {
190			return Err(FFIError::Other("Failed to create operator instance".to_string()));
191		}
192
193		// Create the FFI operator wrapper
194		// Library stays loaded via global cache and loader reference
195		Ok(Some(FFIOperator::new(descriptor, instance, operator_id)))
196	}
197
198	/// Create an operator instance from an already loaded library by name
199	///
200	/// # Arguments
201	/// * `operator_name` - Name of the operator type
202	/// * `operator_id` - Node ID for this operator instance
203	/// * `config` - Configuration data for the operator
204	///
205	/// # Returns
206	/// * `Ok(FFIOperator)` - Successfully created operator
207	/// * `Err(FFIError)` - Creation failed
208	pub fn create_operator_by_name(
209		&mut self,
210		operator_name: &str,
211		operator_id: FlowNodeId,
212		config: &[u8],
213	) -> FFIResult<FFIOperator> {
214		let path = self
215			.operator_paths
216			.get(operator_name)
217			.ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator_name)))?
218			.clone();
219
220		// Load operator from the known path
221		// Since this operator was previously registered, it should always be valid
222		self.load_operator(&path, config, operator_id)?
223			.ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator_name)))
224	}
225
226	/// Check if an operator name is registered
227	pub fn has_operator(&self, operator_name: &str) -> bool {
228		self.operator_paths.contains_key(operator_name)
229	}
230
231	/// Unload a library
232	///
233	/// # Arguments
234	/// * `path` - Path to the library to unload
235	///
236	/// # Safety
237	/// This will invalidate any operators created from this library.
238	/// Ensure all operators from this library are destroyed first.
239	pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
240		if self.loaded_libraries.remove(path).is_some() {
241			Ok(())
242		} else {
243			Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
244		}
245	}
246
247	/// Get the number of loaded libraries
248	pub fn loaded_count(&self) -> usize {
249		self.loaded_libraries.len()
250	}
251
252	/// Check if a library is loaded
253	pub fn is_loaded(&self, path: &Path) -> bool {
254		self.loaded_libraries.contains_key(path)
255	}
256
257	/// List all loaded operators with their metadata
258	///
259	/// Returns a vector of tuples containing:
260	/// - Operator name
261	/// - Library path
262	/// - API version
263	pub fn list_loaded_operators(&self) -> Vec<(String, PathBuf, u32)> {
264		let mut operators = Vec::new();
265
266		for (path, library) in &self.loaded_libraries {
267			// Get the operator descriptor from the library
268			unsafe {
269				let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
270					library.get(b"ffi_operator_get_descriptor\0");
271
272				if let Ok(get_descriptor) = get_descriptor {
273					let descriptor_ptr = get_descriptor();
274					if !descriptor_ptr.is_null() {
275						let descriptor = &*descriptor_ptr;
276
277						// Extract operator name
278						let operator_name = CStr::from_ptr(descriptor.operator_name)
279							.to_str()
280							.unwrap_or("<invalid UTF-8>")
281							.to_string();
282
283						operators.push((operator_name, path.clone(), descriptor.api_version));
284					}
285				}
286			}
287		}
288
289		operators
290	}
291}
292
293impl Default for FFIOperatorLoader {
294	fn default() -> Self {
295		Self::new()
296	}
297}
298
299impl Drop for FFIOperatorLoader {
300	fn drop(&mut self) {
301		// Libraries will be automatically unloaded when dropped
302		self.loaded_libraries.clear();
303	}
304}