reifydb_engine/transform/
loader.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 collections::HashMap,
9 fs,
10 path::{Path, PathBuf},
11 slice, str,
12 sync::{OnceLock, RwLock},
13};
14
15use libloading::{Library, Symbol};
16use reifydb_abi::{
17 constants::CURRENT_API,
18 data::buffer::BufferFFI,
19 transform::{
20 descriptor::TransformDescriptorFFI,
21 types::{TRANSFORM_MAGIC, TransformCreateFnFFI, TransformMagicFnFFI},
22 },
23};
24use reifydb_sdk::error::{FFIError, Result as FFIResult};
25
26use super::{ffi::NativeTransformFFI, registry::Transforms};
27
28unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
33 if buffer.ptr.is_null() || buffer.len == 0 {
34 return String::new();
35 }
36 let slice = unsafe { slice::from_raw_parts(buffer.ptr, buffer.len) };
37 str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
38}
39
40static GLOBAL_FFI_TRANSFORM_LOADER: OnceLock<RwLock<TransformLoader>> = OnceLock::new();
42
43pub fn ffi_transform_loader() -> &'static RwLock<TransformLoader> {
45 GLOBAL_FFI_TRANSFORM_LOADER.get_or_init(|| RwLock::new(TransformLoader::new()))
46}
47
48pub struct TransformLoader {
50 loaded_libraries: HashMap<PathBuf, Library>,
52 transform_paths: HashMap<String, PathBuf>,
54}
55
56impl TransformLoader {
57 fn new() -> Self {
58 Self {
59 loaded_libraries: HashMap::new(),
60 transform_paths: HashMap::new(),
61 }
62 }
63
64 pub fn load_transform_library(&mut self, path: &Path) -> FFIResult<bool> {
65 if !self.loaded_libraries.contains_key(path) {
66 let lib = unsafe {
67 Library::new(path).map_err(|e| {
68 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
69 })?
70 };
71 self.loaded_libraries.insert(path.to_path_buf(), lib);
72 }
73
74 let library = self.loaded_libraries.get(path).unwrap();
75
76 let magic_result: Result<Symbol<TransformMagicFnFFI>, _> =
77 unsafe { library.get(b"ffi_transform_magic\0") };
78
79 match magic_result {
80 Ok(magic_fn) => {
81 let magic = magic_fn();
82 Ok(magic == TRANSFORM_MAGIC)
83 }
84 Err(_) => {
85 self.loaded_libraries.remove(path);
86 Ok(false)
87 }
88 }
89 }
90
91 fn get_descriptor(&self, library: &Library) -> FFIResult<TransformDescriptorFFI> {
92 unsafe {
93 let get_descriptor: Symbol<extern "C" fn() -> *const TransformDescriptorFFI> =
94 library.get(b"ffi_transform_get_descriptor\0").map_err(|e| {
95 FFIError::Other(format!("Failed to find ffi_transform_get_descriptor: {}", e))
96 })?;
97
98 let descriptor_ptr = get_descriptor();
99 if descriptor_ptr.is_null() {
100 return Err(FFIError::Other("Descriptor is null".to_string()));
101 }
102
103 Ok(TransformDescriptorFFI {
104 api: (*descriptor_ptr).api,
105 name: (*descriptor_ptr).name,
106 version: (*descriptor_ptr).version,
107 description: (*descriptor_ptr).description,
108 vtable: (*descriptor_ptr).vtable,
109 })
110 }
111 }
112
113 fn validate_and_register(
114 &mut self,
115 descriptor: &TransformDescriptorFFI,
116 path: &Path,
117 ) -> FFIResult<(String, u32)> {
118 if descriptor.api != CURRENT_API {
119 return Err(FFIError::Other(format!(
120 "API version mismatch: expected {}, got {}",
121 CURRENT_API, descriptor.api
122 )));
123 }
124
125 let name = unsafe { buffer_to_string(&descriptor.name) };
126 self.transform_paths.insert(name.clone(), path.to_path_buf());
127
128 Ok((name, descriptor.api))
129 }
130
131 pub fn register_transform(&mut self, path: &Path) -> FFIResult<Option<LoadedTransformInfo>> {
133 if !self.load_transform_library(path)? {
134 return Ok(None);
135 }
136
137 let library = self.loaded_libraries.get(path).unwrap();
138 let descriptor = self.get_descriptor(library)?;
139 let (name, api) = self.validate_and_register(&descriptor, path)?;
140
141 let info = unsafe {
142 LoadedTransformInfo {
143 name,
144 library_path: path.to_path_buf(),
145 api,
146 version: buffer_to_string(&descriptor.version),
147 description: buffer_to_string(&descriptor.description),
148 }
149 };
150
151 Ok(Some(info))
152 }
153
154 pub fn load_transform(&mut self, path: &Path, config: &[u8]) -> FFIResult<Option<NativeTransformFFI>> {
156 if !self.load_transform_library(path)? {
157 return Ok(None);
158 }
159
160 let descriptor = {
161 let library = self.loaded_libraries.get(path).unwrap();
162 self.get_descriptor(library)?
163 };
164
165 self.validate_and_register(&descriptor, path)?;
166
167 let library = self.loaded_libraries.get(path).unwrap();
168 let create_fn: TransformCreateFnFFI = unsafe {
169 let create_symbol: Symbol<TransformCreateFnFFI> = library
170 .get(b"ffi_transform_create\0")
171 .map_err(|e| FFIError::Other(format!("Failed to find ffi_transform_create: {}", e)))?;
172
173 *create_symbol
174 };
175
176 let instance = create_fn(config.as_ptr(), config.len());
177 if instance.is_null() {
178 return Err(FFIError::Other("Failed to create transform instance".to_string()));
179 }
180
181 Ok(Some(NativeTransformFFI::new(descriptor, instance)))
182 }
183
184 pub fn create_transform_by_name(&mut self, name: &str, config: &[u8]) -> FFIResult<NativeTransformFFI> {
186 let path = self
187 .transform_paths
188 .get(name)
189 .ok_or_else(|| FFIError::Other(format!("Transform not found: {}", name)))?
190 .clone();
191
192 self.load_transform(&path, config)?
193 .ok_or_else(|| FFIError::Other(format!("Transform library no longer valid: {}", name)))
194 }
195
196 pub fn has_transform(&self, name: &str) -> bool {
198 self.transform_paths.contains_key(name)
199 }
200}
201
202#[derive(Debug, Clone)]
204pub struct LoadedTransformInfo {
205 pub name: String,
206 pub library_path: PathBuf,
207 pub api: u32,
208 pub version: String,
209 pub description: String,
210}
211
212impl Default for TransformLoader {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218impl Drop for TransformLoader {
219 fn drop(&mut self) {
220 self.loaded_libraries.clear();
221 }
222}
223
224pub fn load_transforms_from_dir(dir: &Path) -> FFIResult<Transforms> {
227 let loader = ffi_transform_loader();
228 let mut loader_guard = loader.write().unwrap();
229
230 let mut names = Vec::new();
231
232 let entries = fs::read_dir(dir)
233 .map_err(|e| FFIError::Other(format!("Failed to read directory {}: {}", dir.display(), e)))?;
234
235 for entry in entries {
236 let entry = entry.map_err(|e| FFIError::Other(format!("Failed to read directory entry: {}", e)))?;
237 let path = entry.path();
238 let ext = path.extension().and_then(|s| s.to_str());
239
240 if ext == Some("so") || ext == Some("dylib") {
241 match loader_guard.register_transform(&path) {
242 Ok(Some(info)) => {
243 names.push(info.name);
244 }
245 Ok(None) => {
246 }
248 Err(e) => {
249 eprintln!(
250 "Warning: Failed to register transform from {}: {}",
251 path.display(),
252 e
253 );
254 }
255 }
256 }
257 }
258
259 drop(loader_guard);
260
261 let mut builder = Transforms::builder();
262 for name in names {
263 let name_clone = name.clone();
264 builder = builder.register(&name, move || {
265 let loader = ffi_transform_loader();
266 let mut loader_guard = loader.write().unwrap();
267 loader_guard.create_transform_by_name(&name_clone, &[]).unwrap()
268 });
269 }
270
271 Ok(builder.build())
272}