reifydb_engine/procedure/
loader.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 collections::HashMap,
9 fs,
10 path::{Path, PathBuf},
11 slice, str,
12 sync::{OnceLock, RwLock},
13};
14
15use libloading::{Library, Symbol};
16use reifydb_abi::{
17 constants::CURRENT_API,
18 data::buffer::BufferFFI,
19 procedure::{
20 descriptor::ProcedureDescriptorFFI,
21 types::{PROCEDURE_MAGIC, ProcedureCreateFnFFI, ProcedureMagicFnFFI},
22 },
23};
24use reifydb_sdk::error::{FFIError, Result as FFIResult};
25
26use super::{
27 ffi::NativeProcedureFFI,
28 registry::{Procedures, ProceduresBuilder},
29};
30
31unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
36 if buffer.ptr.is_null() || buffer.len == 0 {
37 return String::new();
38 }
39 let slice = unsafe { slice::from_raw_parts(buffer.ptr, buffer.len) };
40 str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
41}
42
43static GLOBAL_FFI_PROCEDURE_LOADER: OnceLock<RwLock<ProcedureLoader>> = OnceLock::new();
45
46pub fn ffi_procedure_loader() -> &'static RwLock<ProcedureLoader> {
48 GLOBAL_FFI_PROCEDURE_LOADER.get_or_init(|| RwLock::new(ProcedureLoader::new()))
49}
50
51pub struct ProcedureLoader {
53 loaded_libraries: HashMap<PathBuf, Library>,
55 procedure_paths: HashMap<String, PathBuf>,
57}
58
59impl ProcedureLoader {
60 fn new() -> Self {
61 Self {
62 loaded_libraries: HashMap::new(),
63 procedure_paths: HashMap::new(),
64 }
65 }
66
67 pub fn load_procedure_library(&mut self, path: &Path) -> FFIResult<bool> {
68 if !self.loaded_libraries.contains_key(path) {
69 let lib = unsafe {
70 Library::new(path).map_err(|e| {
71 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
72 })?
73 };
74 self.loaded_libraries.insert(path.to_path_buf(), lib);
75 }
76
77 let library = self.loaded_libraries.get(path).unwrap();
78
79 let magic_result: Result<Symbol<ProcedureMagicFnFFI>, _> =
80 unsafe { library.get(b"ffi_procedure_magic\0") };
81
82 match magic_result {
83 Ok(magic_fn) => {
84 let magic = magic_fn();
85 Ok(magic == PROCEDURE_MAGIC)
86 }
87 Err(_) => {
88 self.loaded_libraries.remove(path);
89 Ok(false)
90 }
91 }
92 }
93
94 fn get_descriptor(&self, library: &Library) -> FFIResult<ProcedureDescriptorFFI> {
95 unsafe {
96 let get_descriptor: Symbol<extern "C" fn() -> *const ProcedureDescriptorFFI> =
97 library.get(b"ffi_procedure_get_descriptor\0").map_err(|e| {
98 FFIError::Other(format!("Failed to find ffi_procedure_get_descriptor: {}", e))
99 })?;
100
101 let descriptor_ptr = get_descriptor();
102 if descriptor_ptr.is_null() {
103 return Err(FFIError::Other("Descriptor is null".to_string()));
104 }
105
106 Ok(ProcedureDescriptorFFI {
107 api: (*descriptor_ptr).api,
108 name: (*descriptor_ptr).name,
109 version: (*descriptor_ptr).version,
110 description: (*descriptor_ptr).description,
111 vtable: (*descriptor_ptr).vtable,
112 })
113 }
114 }
115
116 fn validate_and_register(
117 &mut self,
118 descriptor: &ProcedureDescriptorFFI,
119 path: &Path,
120 ) -> FFIResult<(String, u32)> {
121 if descriptor.api != CURRENT_API {
122 return Err(FFIError::Other(format!(
123 "API version mismatch: expected {}, got {}",
124 CURRENT_API, descriptor.api
125 )));
126 }
127
128 let name = unsafe { buffer_to_string(&descriptor.name) };
129 self.procedure_paths.insert(name.clone(), path.to_path_buf());
130
131 Ok((name, descriptor.api))
132 }
133
134 pub fn register_procedure(&mut self, path: &Path) -> FFIResult<Option<LoadedProcedureInfo>> {
136 if !self.load_procedure_library(path)? {
137 return Ok(None);
138 }
139
140 let library = self.loaded_libraries.get(path).unwrap();
141 let descriptor = self.get_descriptor(library)?;
142 let (name, api) = self.validate_and_register(&descriptor, path)?;
143
144 let info = unsafe {
145 LoadedProcedureInfo {
146 name,
147 library_path: path.to_path_buf(),
148 api,
149 version: buffer_to_string(&descriptor.version),
150 description: buffer_to_string(&descriptor.description),
151 }
152 };
153
154 Ok(Some(info))
155 }
156
157 pub fn load_procedure(&mut self, path: &Path, config: &[u8]) -> FFIResult<Option<NativeProcedureFFI>> {
159 if !self.load_procedure_library(path)? {
160 return Ok(None);
161 }
162
163 let descriptor = {
164 let library = self.loaded_libraries.get(path).unwrap();
165 self.get_descriptor(library)?
166 };
167
168 self.validate_and_register(&descriptor, path)?;
169
170 let library = self.loaded_libraries.get(path).unwrap();
171 let create_fn: ProcedureCreateFnFFI = unsafe {
172 let create_symbol: Symbol<ProcedureCreateFnFFI> = library
173 .get(b"ffi_procedure_create\0")
174 .map_err(|e| FFIError::Other(format!("Failed to find ffi_procedure_create: {}", e)))?;
175
176 *create_symbol
177 };
178
179 let instance = create_fn(config.as_ptr(), config.len());
180 if instance.is_null() {
181 return Err(FFIError::Other("Failed to create procedure instance".to_string()));
182 }
183
184 Ok(Some(NativeProcedureFFI::new(descriptor, instance)))
185 }
186
187 pub fn create_procedure_by_name(&mut self, name: &str, config: &[u8]) -> FFIResult<NativeProcedureFFI> {
189 let path = self
190 .procedure_paths
191 .get(name)
192 .ok_or_else(|| FFIError::Other(format!("Procedure not found: {}", name)))?
193 .clone();
194
195 self.load_procedure(&path, config)?
196 .ok_or_else(|| FFIError::Other(format!("Procedure library no longer valid: {}", name)))
197 }
198
199 pub fn has_procedure(&self, name: &str) -> bool {
201 self.procedure_paths.contains_key(name)
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct LoadedProcedureInfo {
208 pub name: String,
209 pub library_path: PathBuf,
210 pub api: u32,
211 pub version: String,
212 pub description: String,
213}
214
215impl Default for ProcedureLoader {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
221impl Drop for ProcedureLoader {
222 fn drop(&mut self) {
223 self.loaded_libraries.clear();
224 }
225}
226
227pub fn register_procedures_from_dir(dir: &Path, mut builder: ProceduresBuilder) -> FFIResult<ProceduresBuilder> {
230 let loader = ffi_procedure_loader();
231 let mut loader_guard = loader.write().unwrap();
232
233 let mut names = Vec::new();
234
235 let entries = fs::read_dir(dir)
236 .map_err(|e| FFIError::Other(format!("Failed to read directory {}: {}", dir.display(), e)))?;
237
238 for entry in entries {
239 let entry = entry.map_err(|e| FFIError::Other(format!("Failed to read directory entry: {}", e)))?;
240 let path = entry.path();
241 let ext = path.extension().and_then(|s| s.to_str());
242
243 if ext == Some("so") || ext == Some("dylib") {
244 match loader_guard.register_procedure(&path) {
245 Ok(Some(info)) => {
246 names.push(info.name);
247 }
248 Ok(None) => {
249 }
251 Err(e) => {
252 eprintln!(
253 "Warning: Failed to register procedure from {}: {}",
254 path.display(),
255 e
256 );
257 }
258 }
259 }
260 }
261
262 drop(loader_guard);
263
264 for name in names {
265 let name_clone = name.clone();
266 builder = builder.with_procedure(&name, move || {
267 let loader = ffi_procedure_loader();
268 let mut loader_guard = loader.write().unwrap();
269 loader_guard.create_procedure_by_name(&name_clone, &[]).unwrap()
270 });
271 }
272
273 Ok(builder)
274}
275
276pub fn load_procedures_from_dir(dir: &Path) -> FFIResult<Procedures> {
279 Ok(register_procedures_from_dir(dir, Procedures::builder())?.build())
280}