reifydb_sub_flow/ffi/
loader.rs

1//! FFI operator dynamic library loader
2
3use std::{
4	collections::HashMap,
5	ffi::CStr,
6	path::{Path, PathBuf},
7	sync::OnceLock,
8};
9
10use libloading::{Library, Symbol};
11use parking_lot::RwLock;
12use reifydb_core::interface::FlowNodeId;
13use reifydb_flow_operator_abi::{CURRENT_API_VERSION, FFIOperatorCreateFn, FFIOperatorDescriptor};
14use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
15
16use crate::operator::FFIOperator;
17
18/// Global singleton FFI operator loader
19/// Ensures libraries stay loaded for the entire process lifetime
20static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
21
22/// Get the global FFI operator loader
23pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
24	GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
25}
26
27/// FFI operator loader for dynamic libraries
28/// This is meant to be used as a singleton via get_global_loader()
29pub struct FFIOperatorLoader {
30	/// Loaded libraries mapped by path
31	loaded_libraries: HashMap<PathBuf, Library>,
32
33	/// Map of operator names to library paths for quick lookup
34	operator_paths: HashMap<String, PathBuf>,
35}
36
37impl FFIOperatorLoader {
38	/// Create a new FFI operator loader
39	fn new() -> Self {
40		Self {
41			loaded_libraries: HashMap::new(),
42			operator_paths: HashMap::new(),
43		}
44	}
45
46	/// Load an operator from a dynamic library
47	///
48	/// # Arguments
49	/// * `path` - Path to the shared library file
50	/// * `config` - Operator configuration data
51	/// * `operator_id` - ID for this operator instance
52	///
53	/// # Returns
54	/// * `Ok(FFIOperator)` - Successfully loaded operator
55	/// * `Err(FFIError)` - Loading or initialization failed
56	pub fn load_operator(&mut self, path: &Path, config: &[u8], operator_id: FlowNodeId) -> FFIResult<FFIOperator> {
57		// Load the library if not already loaded
58		if !self.loaded_libraries.contains_key(path) {
59			// Load the library
60			let lib = unsafe {
61				Library::new(path).map_err(|e| {
62					FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
63				})?
64			};
65
66			self.loaded_libraries.insert(path.to_path_buf(), lib);
67		};
68
69		let library = self.loaded_libraries.get(path).unwrap();
70
71		// Get the operator descriptor
72		let descriptor = unsafe {
73			let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
74				library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
75					FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
76				})?;
77
78			let descriptor_ptr = get_descriptor();
79			if descriptor_ptr.is_null() {
80				return Err(FFIError::Other("Descriptor is null".to_string()));
81			}
82
83			// Copy the descriptor fields
84			FFIOperatorDescriptor {
85				api_version: (*descriptor_ptr).api_version,
86				operator_name: (*descriptor_ptr).operator_name,
87				vtable: (*descriptor_ptr).vtable,
88			}
89		};
90
91		// Store operator name -> path mapping
92		let operator_name = unsafe { CStr::from_ptr(descriptor.operator_name).to_str().unwrap().to_string() };
93		self.operator_paths.insert(operator_name.clone(), path.to_path_buf());
94
95		// Verify API version
96		if descriptor.api_version != CURRENT_API_VERSION {
97			return Err(FFIError::Other(format!(
98				"API version mismatch: expected {}, got {}",
99				CURRENT_API_VERSION, descriptor.api_version
100			)));
101		}
102
103		// Get the create function
104		let create_fn: FFIOperatorCreateFn = unsafe {
105			let create_symbol: Symbol<FFIOperatorCreateFn> = library
106				.get(b"ffi_operator_create\0")
107				.map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
108
109			*create_symbol
110		};
111
112		// Create the operator instance
113		let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
114		if instance.is_null() {
115			return Err(FFIError::Other("Failed to create operator instance".to_string()));
116		}
117
118		// Create the FFI operator wrapper
119		// Library stays loaded via global cache and loader reference
120		Ok(FFIOperator::new(descriptor, instance, operator_id))
121	}
122
123	/// Create an operator instance from an already loaded library by name
124	///
125	/// # Arguments
126	/// * `operator_name` - Name of the operator type
127	/// * `operator_id` - Node ID for this operator instance
128	/// * `config` - Configuration data for the operator
129	///
130	/// # Returns
131	/// * `Ok(FFIOperator)` - Successfully created operator
132	/// * `Err(FFIError)` - Creation failed
133	pub fn create_operator_by_name(
134		&mut self,
135		operator_name: &str,
136		operator_id: FlowNodeId,
137		config: &[u8],
138	) -> FFIResult<FFIOperator> {
139		// Look up the path for this operator
140		let path = self
141			.operator_paths
142			.get(operator_name)
143			.ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator_name)))?
144			.clone();
145
146		// Load operator from the known path
147		self.load_operator(&path, config, operator_id)
148	}
149
150	/// Check if an operator name is registered
151	pub fn has_operator(&self, operator_name: &str) -> bool {
152		self.operator_paths.contains_key(operator_name)
153	}
154
155	/// Unload a library
156	///
157	/// # Arguments
158	/// * `path` - Path to the library to unload
159	///
160	/// # Safety
161	/// This will invalidate any operators created from this library.
162	/// Ensure all operators from this library are destroyed first.
163	pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
164		if self.loaded_libraries.remove(path).is_some() {
165			Ok(())
166		} else {
167			Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
168		}
169	}
170
171	/// Get the number of loaded libraries
172	pub fn loaded_count(&self) -> usize {
173		self.loaded_libraries.len()
174	}
175
176	/// Check if a library is loaded
177	pub fn is_loaded(&self, path: &Path) -> bool {
178		self.loaded_libraries.contains_key(path)
179	}
180
181	/// List all loaded operators with their metadata
182	///
183	/// Returns a vector of tuples containing:
184	/// - Operator name
185	/// - Library path
186	/// - API version
187	pub fn list_loaded_operators(&self) -> Vec<(String, PathBuf, u32)> {
188		let mut operators = Vec::new();
189
190		for (path, library) in &self.loaded_libraries {
191			// Get the operator descriptor from the library
192			unsafe {
193				let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
194					library.get(b"ffi_operator_get_descriptor\0");
195
196				if let Ok(get_descriptor) = get_descriptor {
197					let descriptor_ptr = get_descriptor();
198					if !descriptor_ptr.is_null() {
199						let descriptor = &*descriptor_ptr;
200
201						// Extract operator name
202						let operator_name = CStr::from_ptr(descriptor.operator_name)
203							.to_str()
204							.unwrap_or("<invalid UTF-8>")
205							.to_string();
206
207						operators.push((operator_name, path.clone(), descriptor.api_version));
208					}
209				}
210			}
211		}
212
213		operators
214	}
215}
216
217impl Default for FFIOperatorLoader {
218	fn default() -> Self {
219		Self::new()
220	}
221}
222
223impl Drop for FFIOperatorLoader {
224	fn drop(&mut self) {
225		// Libraries will be automatically unloaded when dropped
226		self.loaded_libraries.clear();
227	}
228}