Skip to main content

reifydb_sub_flow/ffi/
loader.rs

1#![cfg(reifydb_target = "native")]
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// Copyright (c) 2025 ReifyDB
4
5//! FFI operator dynamic library loader
6
7use std::{
8	collections::HashMap,
9	path::{Path, PathBuf},
10	slice, str,
11	sync::{OnceLock, RwLock},
12};
13
14use libloading::{Library, Symbol};
15use reifydb_abi::{
16	constants::{CURRENT_API, OPERATOR_MAGIC},
17	data::buffer::BufferFFI,
18	operator::{
19		column::OperatorColumnDefsFFI,
20		descriptor::OperatorDescriptorFFI,
21		types::{OperatorCreateFnFFI, OperatorMagicFnFFI},
22	},
23};
24use reifydb_core::interface::catalog::flow::FlowNodeId;
25use reifydb_engine::vm::executor::Executor;
26use reifydb_sdk::error::{FFIError, Result as FFIResult};
27use reifydb_type::value::constraint::{FFITypeConstraint, TypeConstraint};
28
29use crate::operator::ffi::FFIOperator;
30
31/// Extract a UTF-8 string from a BufferFFI
32///
33/// # Safety
34/// The buffer must contain valid UTF-8 data and the pointer must be valid for the given length
35unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
36	if buffer.ptr.is_null() || buffer.len == 0 {
37		return String::new();
38	}
39	// SAFETY: caller guarantees buffer.ptr is valid for buffer.len bytes
40	let slice = unsafe { slice::from_raw_parts(buffer.ptr, buffer.len) };
41	str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
42}
43
44/// Global singleton FFI operator loader
45/// Ensures libraries stay loaded for the entire process lifetime
46static GLOBAL_FFI_OPERATOR_LOADER: OnceLock<RwLock<FFIOperatorLoader>> = OnceLock::new();
47
48/// Get the global FFI operator loader
49pub fn ffi_operator_loader() -> &'static RwLock<FFIOperatorLoader> {
50	GLOBAL_FFI_OPERATOR_LOADER.get_or_init(|| RwLock::new(FFIOperatorLoader::new()))
51}
52
53/// FFI operator loader for dynamic libraries
54/// This is meant to be used as a singleton via get_global_loader()
55pub struct FFIOperatorLoader {
56	/// Loaded libraries mapped by path
57	loaded_libraries: HashMap<PathBuf, Library>,
58
59	/// Map of operator names to library paths for quick lookup
60	operator_paths: HashMap<String, PathBuf>,
61}
62
63impl FFIOperatorLoader {
64	/// Create a new FFI operator loader
65	fn new() -> Self {
66		Self {
67			loaded_libraries: HashMap::new(),
68			operator_paths: HashMap::new(),
69		}
70	}
71
72	pub fn load_operator_library(&mut self, path: &Path) -> FFIResult<bool> {
73		// Load the library if not already loaded
74		if !self.loaded_libraries.contains_key(path) {
75			let lib = unsafe {
76				Library::new(path).map_err(|e| {
77					FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
78				})?
79			};
80			self.loaded_libraries.insert(path.to_path_buf(), lib);
81		}
82
83		let library = self.loaded_libraries.get(path).unwrap();
84
85		// Check for magic symbol
86		let magic_result: Result<Symbol<OperatorMagicFnFFI>, _> =
87			unsafe { library.get(b"ffi_operator_magic\0") };
88
89		match magic_result {
90			Ok(magic_fn) => {
91				let magic = magic_fn();
92				Ok(magic == OPERATOR_MAGIC)
93			}
94			Err(_) => {
95				// Symbol not found - not an operator, remove from cache
96				self.loaded_libraries.remove(path);
97				Ok(false)
98			}
99		}
100	}
101
102	/// Get the operator descriptor from a loaded library
103	fn get_descriptor(&self, library: &Library) -> FFIResult<OperatorDescriptorFFI> {
104		unsafe {
105			let get_descriptor: Symbol<extern "C" fn() -> *const OperatorDescriptorFFI> =
106				library.get(b"ffi_operator_get_descriptor\0").map_err(|e| {
107					FFIError::Other(format!("Failed to find ffi_operator_get_descriptor: {}", e))
108				})?;
109
110			let descriptor_ptr = get_descriptor();
111			if descriptor_ptr.is_null() {
112				return Err(FFIError::Other("Descriptor is null".to_string()));
113			}
114
115			// Copy the descriptor fields
116			Ok(OperatorDescriptorFFI {
117				api: (*descriptor_ptr).api,
118				operator: (*descriptor_ptr).operator,
119				version: (*descriptor_ptr).version,
120				description: (*descriptor_ptr).description,
121				input_columns: (*descriptor_ptr).input_columns,
122				output_columns: (*descriptor_ptr).output_columns,
123				capabilities: (*descriptor_ptr).capabilities,
124				vtable: (*descriptor_ptr).vtable,
125			})
126		}
127	}
128
129	/// Validate descriptor and register operator name mapping
130	/// Returns the operator name and API version
131	fn validate_and_register(
132		&mut self,
133		descriptor: &OperatorDescriptorFFI,
134		path: &Path,
135	) -> FFIResult<(String, u32)> {
136		// Verify API version
137		if descriptor.api != CURRENT_API {
138			return Err(FFIError::Other(format!(
139				"API version mismatch: expected {}, got {}",
140				CURRENT_API, descriptor.api
141			)));
142		}
143
144		// Extract operator name
145		let operator = unsafe { buffer_to_string(&descriptor.operator) };
146
147		// Store operator name -> path mapping
148		self.operator_paths.insert(operator.clone(), path.to_path_buf());
149
150		Ok((operator, descriptor.api))
151	}
152
153	/// Register an operator library without instantiating it
154	///
155	/// This loads the library, validates it as an operator, and extracts metadata
156	/// without creating an operator instance. Use this for discovery/registration.
157	///
158	/// # Arguments
159	/// * `path` - Path to the shared library file
160	///
161	/// # Returns
162	/// * `Ok(Some(LoadedOperatorInfo))` - Successfully registered operator with full metadata
163	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
164	/// * `Err(FFIError)` - Loading or validation failed
165	pub fn register_operator(&mut self, path: &Path) -> FFIResult<Option<LoadedOperatorInfo>> {
166		if !self.load_operator_library(path)? {
167			return Ok(None);
168		}
169
170		let library = self.loaded_libraries.get(path).unwrap();
171		let descriptor = self.get_descriptor(library)?;
172		let (operator, api) = self.validate_and_register(&descriptor, path)?;
173
174		// Extract full operator info including column definitions
175		let info = unsafe {
176			LoadedOperatorInfo {
177				operator,
178				library_path: path.to_path_buf(),
179				api,
180				version: buffer_to_string(&descriptor.version),
181				description: buffer_to_string(&descriptor.description),
182				input_columns: extract_column_defs(&descriptor.input_columns),
183				output_columns: extract_column_defs(&descriptor.output_columns),
184				capabilities: descriptor.capabilities,
185			}
186		};
187
188		Ok(Some(info))
189	}
190
191	/// Load an operator from a dynamic library
192	///
193	/// # Arguments
194	/// * `path` - Path to the shared library file
195	/// * `config` - Operator configuration data
196	/// * `operator_id` - ID for this operator instance
197	///
198	/// # Returns
199	/// * `Ok(Some(FFIOperator))` - Successfully loaded operator
200	/// * `Ok(None)` - Library is not a valid FFI operator (silently skipped)
201	/// * `Err(FFIError)` - Loading or initialization failed
202	pub fn load_operator(
203		&mut self,
204		path: &Path,
205		config: &[u8],
206		operator_id: FlowNodeId,
207		executor: Executor,
208	) -> FFIResult<Option<FFIOperator>> {
209		if !self.load_operator_library(path)? {
210			return Ok(None);
211		}
212
213		// Get descriptor and validate - done in separate scope to avoid borrow conflicts
214		let descriptor = {
215			let library = self.loaded_libraries.get(path).unwrap();
216			self.get_descriptor(library)?
217		};
218
219		self.validate_and_register(&descriptor, path)?;
220
221		// Get the create function and instantiate operator
222		let library = self.loaded_libraries.get(path).unwrap();
223		let create_fn: OperatorCreateFnFFI = unsafe {
224			let create_symbol: Symbol<OperatorCreateFnFFI> = library
225				.get(b"ffi_operator_create\0")
226				.map_err(|e| FFIError::Other(format!("Failed to find ffi_operator_create: {}", e)))?;
227
228			*create_symbol
229		};
230
231		// Create the operator instance
232		let instance = create_fn(config.as_ptr(), config.len(), operator_id.0);
233		if instance.is_null() {
234			return Err(FFIError::Other("Failed to create operator instance".to_string()));
235		}
236
237		// Create the FFI operator wrapper
238		// Library stays loaded via global cache and loader reference
239		Ok(Some(FFIOperator::new(descriptor, instance, operator_id, executor)))
240	}
241
242	/// Create an operator instance from an already loaded library by name
243	///
244	/// # Arguments
245	/// * `operator` - Name of the operator type
246	/// * `operator_id` - Node ID for this operator instance
247	/// * `config` - Configuration data for the operator
248	///
249	/// # Returns
250	/// * `Ok(FFIOperator)` - Successfully created operator
251	/// * `Err(FFIError)` - Creation failed
252	pub fn create_operator_by_name(
253		&mut self,
254		operator: &str,
255		operator_id: FlowNodeId,
256		config: &[u8],
257		executor: Executor,
258	) -> FFIResult<FFIOperator> {
259		let path = self
260			.operator_paths
261			.get(operator)
262			.ok_or_else(|| FFIError::Other(format!("Operator not found: {}", operator)))?
263			.clone();
264
265		// Load operator from the known path
266		// Since this operator was previously registered, it should always be valid
267		self.load_operator(&path, config, operator_id, executor)?
268			.ok_or_else(|| FFIError::Other(format!("Operator library no longer valid: {}", operator)))
269	}
270
271	/// Check if an operator name is registered
272	pub fn has_operator(&self, operator: &str) -> bool {
273		self.operator_paths.contains_key(operator)
274	}
275
276	/// List all loaded operators with their metadata
277	pub fn list_loaded_operators(&self) -> Vec<LoadedOperatorInfo> {
278		let mut operators = Vec::new();
279
280		for (path, library) in &self.loaded_libraries {
281			// Get the operator descriptor from the library
282			unsafe {
283				let get_descriptor: Result<Symbol<extern "C" fn() -> *const OperatorDescriptorFFI>, _> =
284					library.get(b"ffi_operator_get_descriptor\0");
285
286				if let Ok(get_descriptor) = get_descriptor {
287					let descriptor_ptr = get_descriptor();
288					if !descriptor_ptr.is_null() {
289						let descriptor = &*descriptor_ptr;
290
291						operators.push(LoadedOperatorInfo {
292							operator: buffer_to_string(&descriptor.operator),
293							library_path: path.clone(),
294							api: descriptor.api,
295							version: buffer_to_string(&descriptor.version),
296							description: buffer_to_string(&descriptor.description),
297							input_columns: extract_column_defs(&descriptor.input_columns),
298							output_columns: extract_column_defs(&descriptor.output_columns),
299							capabilities: descriptor.capabilities,
300						});
301					}
302				}
303			}
304		}
305
306		operators
307	}
308}
309
310/// Information about a loaded FFI operator
311#[derive(Debug, Clone)]
312pub struct LoadedOperatorInfo {
313	pub operator: String,
314	pub library_path: PathBuf,
315	pub api: u32,
316	pub version: String,
317	pub description: String,
318	pub input_columns: Vec<ColumnDefInfo>,
319	pub output_columns: Vec<ColumnDefInfo>,
320	pub capabilities: u32,
321}
322
323/// Information about a single column definition in an operator
324#[derive(Debug, Clone)]
325pub struct ColumnDefInfo {
326	pub name: String,
327	pub field_type: TypeConstraint,
328	pub description: String,
329}
330
331/// Extract column definitions from an OperatorColumnDefsFFI
332///
333/// # Safety
334/// The column_defs must have valid columns pointer for column_count elements
335unsafe fn extract_column_defs(column_defs: &OperatorColumnDefsFFI) -> Vec<ColumnDefInfo> {
336	if column_defs.columns.is_null() || column_defs.column_count == 0 {
337		return Vec::new();
338	}
339
340	let mut columns = Vec::with_capacity(column_defs.column_count);
341	for i in 0..column_defs.column_count {
342		// SAFETY: caller guarantees column_defs.columns is valid for column_count elements
343		let col = unsafe { &*column_defs.columns.add(i) };
344
345		// Reconstruct TypeConstraint from FFI fields
346		let field_type = TypeConstraint::from_ffi(FFITypeConstraint {
347			base_type: col.base_type,
348			constraint_type: col.constraint_type,
349			constraint_param1: col.constraint_param1,
350			constraint_param2: col.constraint_param2,
351		});
352
353		columns.push(ColumnDefInfo {
354			// SAFETY: column buffers are valid UTF-8 strings from the operator
355			name: unsafe { buffer_to_string(&col.name) },
356			field_type,
357			description: unsafe { buffer_to_string(&col.description) },
358		});
359	}
360
361	columns
362}
363
364impl Default for FFIOperatorLoader {
365	fn default() -> Self {
366		Self::new()
367	}
368}
369
370impl Drop for FFIOperatorLoader {
371	fn drop(&mut self) {
372		// Libraries will be automatically unloaded when dropped
373		self.loaded_libraries.clear();
374	}
375}