reifydb_sub_flow/ffi/
loader.rs

1//! FFI operator dynamic library loader
2
3use std::{
4	collections::HashMap,
5	path::{Path, PathBuf},
6	sync::{OnceLock, RwLock},
7};
8
9use libloading::{Library, Symbol};
10use reifydb_core::interface::FlowNodeId;
11use reifydb_flow_operator_abi::{
12	BufferFFI, CURRENT_API, FFIOperatorColumnDefs, FFIOperatorCreateFn, FFIOperatorDescriptor, FFIOperatorMagicFn,
13	OPERATOR_MAGIC,
14};
15use reifydb_flow_operator_sdk::{FFIError, Result as FFIResult};
16
17use crate::operator::FFIOperator;
18
19/// Extract a UTF-8 string from a BufferFFI
20///
21/// # Safety
22/// The buffer must contain valid UTF-8 data and the pointer must be valid for the given length
23unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
24	if buffer.ptr.is_null() || buffer.len == 0 {
25		return String::new();
26	}
27	// SAFETY: caller guarantees buffer.ptr is valid for buffer.len bytes
28	let slice = unsafe { std::slice::from_raw_parts(buffer.ptr, buffer.len) };
29	std::str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
30}
31
32/// Global singleton FFI operator loader
33/// Ensures libraries stay loaded for the entire process lifetime
34static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
35
36/// Get the global FFI operator loader
37pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
38	GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
39}
40
41/// FFI operator loader for dynamic libraries
42/// This is meant to be used as a singleton via get_global_loader()
43pub struct FFIOperatorLoader {
44	/// Loaded libraries mapped by path
45	loaded_libraries: HashMap<PathBuf, Library>,
46
47	/// Map of operator names to library paths for quick lookup
48	operator_paths: HashMap<String, PathBuf>,
49}
50
51impl FFIOperatorLoader {
52	/// Create a new FFI operator loader
53	fn new() -> Self {
54		Self {
55			loaded_libraries: HashMap::new(),
56			operator_paths: HashMap::new(),
57		}
58	}
59
60	pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
61		// Load the library if not already loaded
62		if !self.loaded_libraries.contains_key(path) {
63			let lib = unsafe {
64				Library::new(path).map_err(|e| {
65					FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
66				})?
67			};
68			self.loaded_libraries.insert(path.to_path_buf(), lib);
69		}
70
71		let library = self.loaded_libraries.get(path).unwrap();
72
73		// Check for magic symbol
74		let magic_result: Result<Symbol<FFIOperatorMagicFn>, _> =
75			unsafe { library.get(b"ffi_operator_magic\0") };
76
77		match magic_result {
78			Ok(magic_fn) => {
79				let magic = magic_fn();
80				Ok(magic == OPERATOR_MAGIC)
81			}
82			Err(_) => {
83				// Symbol not found - not an operator, remove from cache
84				self.loaded_libraries.remove(path);
85				Ok(false)
86			}
87		}
88	}
89
90	/// Get the operator descriptor from a loaded library
91	fn get_descriptor(&self, library: &Library) -> FFIResult<FFIOperatorDescriptor> {
92		unsafe {
93			let get_descriptor: Symbol<extern "C" fn() -> *const FFIOperatorDescriptor> =
94				library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
95					FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
96				})?;
97
98			let descriptor_ptr = get_descriptor();
99			if descriptor_ptr.is_null() {
100				return Err(FFIError::Other("Descriptor is null".to_string()));
101			}
102
103			// Copy the descriptor fields
104			Ok(FFIOperatorDescriptor {
105				api: (*descriptor_ptr).api,
106				operator: (*descriptor_ptr).operator,
107				version: (*descriptor_ptr).version,
108				description: (*descriptor_ptr).description,
109				input_columns: (*descriptor_ptr).input_columns,
110				output_columns: (*descriptor_ptr).output_columns,
111				capabilities: (*descriptor_ptr).capabilities,
112				vtable: (*descriptor_ptr).vtable,
113			})
114		}
115	}
116
117	/// Validate descriptor and register operator name mapping
118	/// Returns the operator name and API version
119	fn validate_and_register(
120		&mut self,
121		descriptor: &FFIOperatorDescriptor,
122		path: &Path,
123	) -> FFIResult<(String, u32)> {
124		// Verify API version
125		if descriptor.api != CURRENT_API {
126			return Err(FFIError::Other(format!(
127				"API version mismatch: expected {}, got {}",
128				CURRENT_API, descriptor.api
129			)));
130		}
131
132		// Extract operator name
133		let operator = unsafe { buffer_to_string(&descriptor.operator) };
134
135		// Store operator name -> path mapping
136		self.operator_paths.insert(operator.clone(), path.to_path_buf());
137
138		Ok((operator, descriptor.api))
139	}
140
141	/// Register an operator library without instantiating it
142	///
143	/// This loads the library, validates it as an operator, and extracts metadata
144	/// without creating an operator instance. Use this for discovery/registration.
145	///
146	/// # Arguments
147	/// * `path` - Path to the shared library file
148	///
149	/// # Returns
150	/// * `Ok(Some(LoadedOperatorInfo))` - Successfully registered operator with full metadata
151	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
152	/// * `Err(FFIError)` - Loading or validation failed
153	pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<LoadedOperatorInfo>> {
154		if !self.load_operator_library(path)? {
155			return Ok(None);
156		}
157
158		let library = self.loaded_libraries.get(path).unwrap();
159		let descriptor = self.get_descriptor(library)?;
160		let (operator, api) = self.validate_and_register(&descriptor, path)?;
161
162		// Extract full operator info including column definitions
163		let info = unsafe {
164			LoadedOperatorInfo {
165				operator,
166				library_path: path.to_path_buf(),
167				api,
168				version: buffer_to_string(&descriptor.version),
169				description: buffer_to_string(&descriptor.description),
170				input_columns: extract_column_defs(&descriptor.input_columns),
171				output_columns: extract_column_defs(&descriptor.output_columns),
172				capabilities: descriptor.capabilities,
173			}
174		};
175
176		Ok(Some(info))
177	}
178
179	/// Load an operator from a dynamic library
180	///
181	/// # Arguments
182	/// * `path` - Path to the shared library file
183	/// * `config` - Operator configuration data
184	/// * `operator_id` - ID for this operator instance
185	///
186	/// # Returns
187	/// * `Ok(Some(FFIOperator))` - Successfully loaded operator
188	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
189	/// * `Err(FFIError)` - Loading or initialization failed
190	pub fn load_operator(
191		&mut self,
192		path: &Path,
193		config: &[u8],
194		operator_id: FlowNodeId,
195	) -> FFIResult<Option<FFIOperator>> {
196		if !self.load_operator_library(path)? {
197			return Ok(None);
198		}
199
200		// Get descriptor and validate - done in separate scope to avoid borrow conflicts
201		let descriptor = {
202			let library = self.loaded_libraries.get(path).unwrap();
203			self.get_descriptor(library)?
204		};
205
206		self.validate_and_register(&descriptor, path)?;
207
208		// Get the create function and instantiate operator
209		let library = self.loaded_libraries.get(path).unwrap();
210		let create_fn: FFIOperatorCreateFn = unsafe {
211			let create_symbol: Symbol<FFIOperatorCreateFn> = library
212				.get(b"ffi_operator_create\0")
213				.map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
214
215			*create_symbol
216		};
217
218		// Create the operator instance
219		let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
220		if instance.is_null() {
221			return Err(FFIError::Other("Failed to create operator instance".to_string()));
222		}
223
224		// Create the FFI operator wrapper
225		// Library stays loaded via global cache and loader reference
226		Ok(Some(FFIOperator::new(descriptor, instance, operator_id)))
227	}
228
229	/// Create an operator instance from an already loaded library by name
230	///
231	/// # Arguments
232	/// * `operator` - Name of the operator type
233	/// * `operator_id` - Node ID for this operator instance
234	/// * `config` - Configuration data for the operator
235	///
236	/// # Returns
237	/// * `Ok(FFIOperator)` - Successfully created operator
238	/// * `Err(FFIError)` - Creation failed
239	pub fn create_operator_by_name(
240		&mut self,
241		operator: &str,
242		operator_id: FlowNodeId,
243		config: &[u8],
244	) -> FFIResult<FFIOperator> {
245		let path = self
246			.operator_paths
247			.get(operator)
248			.ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator)))?
249			.clone();
250
251		// Load operator from the known path
252		// Since this operator was previously registered, it should always be valid
253		self.load_operator(&path, config, operator_id)?
254			.ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator)))
255	}
256
257	/// Check if an operator name is registered
258	pub fn has_operator(&self, operator: &str) -> bool {
259		self.operator_paths.contains_key(operator)
260	}
261
262	/// Unload a library
263	///
264	/// # Arguments
265	/// * `path` - Path to the library to unload
266	///
267	/// # Safety
268	/// This will invalidate any operators created from this library.
269	/// Ensure all operators from this library are destroyed first.
270	pub fn unload_library(&mut self, path: &Path) -> FFIResult<()> {
271		if self.loaded_libraries.remove(path).is_some() {
272			Ok(())
273		} else {
274			Err(FFIError::Other(format!("Library not loaded: {}", path.display())))
275		}
276	}
277
278	/// Get the number of loaded libraries
279	pub fn loaded_count(&self) -> usize {
280		self.loaded_libraries.len()
281	}
282
283	/// Check if a library is loaded
284	pub fn is_loaded(&self, path: &Path) -> bool {
285		self.loaded_libraries.contains_key(path)
286	}
287
288	/// List all loaded operators with their metadata
289	pub fn list_loaded_operators(&self) -> Vec<LoadedOperatorInfo> {
290		let mut operators = Vec::new();
291
292		for (path, library) in &self.loaded_libraries {
293			// Get the operator descriptor from the library
294			unsafe {
295				let get_descriptor: Result<Symbol<extern "C" fn() -> *const FFIOperatorDescriptor>, _> =
296					library.get(b"ffi_operator_get_descriptor\0");
297
298				if let Ok(get_descriptor) = get_descriptor {
299					let descriptor_ptr = get_descriptor();
300					if !descriptor_ptr.is_null() {
301						let descriptor = &*descriptor_ptr;
302
303						operators.push(LoadedOperatorInfo {
304							operator: buffer_to_string(&descriptor.operator),
305							library_path: path.clone(),
306							api: descriptor.api,
307							version: buffer_to_string(&descriptor.version),
308							description: buffer_to_string(&descriptor.description),
309							input_columns: extract_column_defs(&descriptor.input_columns),
310							output_columns: extract_column_defs(&descriptor.output_columns),
311							capabilities: descriptor.capabilities,
312						});
313					}
314				}
315			}
316		}
317
318		operators
319	}
320}
321
322/// Information about a loaded FFI operator
323#[derive(Debug, Clone)]
324pub struct LoadedOperatorInfo {
325	pub operator: String,
326	pub library_path: PathBuf,
327	pub api: u32,
328	pub version: String,
329	pub description: String,
330	pub input_columns: Vec<ColumnDefInfo>,
331	pub output_columns: Vec<ColumnDefInfo>,
332	pub capabilities: u32,
333}
334
335/// Information about a single column definition in an operator
336#[derive(Debug, Clone)]
337pub struct ColumnDefInfo {
338	pub name: String,
339	pub field_type: reifydb_type::TypeConstraint,
340	pub description: String,
341}
342
343/// Extract column definitions from an FFIOperatorColumnDefs
344///
345/// # Safety
346/// The column_defs must have valid columns pointer for column_count elements
347unsafe fn extract_column_defs(column_defs: &FFIOperatorColumnDefs) -> Vec<ColumnDefInfo> {
348	use reifydb_type::{FFITypeConstraint, TypeConstraint};
349
350	if column_defs.columns.is_null() || column_defs.column_count == 0 {
351		return Vec::new();
352	}
353
354	let mut columns = Vec::with_capacity(column_defs.column_count);
355	for i in 0..column_defs.column_count {
356		// SAFETY: caller guarantees column_defs.columns is valid for column_count elements
357		let col = unsafe { &*column_defs.columns.add(i) };
358
359		// Reconstruct TypeConstraint from FFI fields
360		let field_type = TypeConstraint::from_ffi(FFITypeConstraint {
361			base_type: col.base_type,
362			constraint_type: col.constraint_type,
363			constraint_param1: col.constraint_param1,
364			constraint_param2: col.constraint_param2,
365		});
366
367		columns.push(ColumnDefInfo {
368			// SAFETY: column buffers are valid UTF-8 strings from the operator
369			name: unsafe { buffer_to_string(&col.name) },
370			field_type,
371			description: unsafe { buffer_to_string(&col.description) },
372		});
373	}
374
375	columns
376}
377
378impl Default for FFIOperatorLoader {
379	fn default() -> Self {
380		Self::new()
381	}
382}
383
384impl Drop for FFIOperatorLoader {
385	fn drop(&mut self) {
386		// Libraries will be automatically unloaded when dropped
387		self.loaded_libraries.clear();
388	}
389}