reifydb_engine/transform/
loader.rs1#![cfg(reifydb_target = "native")]
2use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{OnceLock, RwLock},
11};
12
13use libloading::{Library, Symbol};
14use reifydb_abi::{
15 constants::CURRENT_API,
16 data::buffer::BufferFFI,
17 transform::{
18 descriptor::TransformDescriptorFFI,
19 types::{TRANSFORM_MAGIC, TransformCreateFnFFI, TransformMagicFnFFI},
20 },
21};
22use reifydb_sdk::error::{FFIError, Result as FFIResult};
23
24use super::{ffi::NativeTransformFFI, registry::Transforms};
25
26unsafe fn buffer_to_string(buffer: &BufferFFI) -> String {
31 if buffer.ptr.is_null() || buffer.len == 0 {
32 return String::new();
33 }
34 let slice = unsafe { std::slice::from_raw_parts(buffer.ptr, buffer.len) };
35 std::str::from_utf8(slice).unwrap_or("<invalid UTF-8>").to_string()
36}
37
38static GLOBAL_FFI_TRANSFORM_LOADER: OnceLock<RwLock<TransformLoader>> = OnceLock::new();
40
41pub fn ffi_transform_loader() -> &'static RwLock<TransformLoader> {
43 GLOBAL_FFI_TRANSFORM_LOADER.get_or_init(|| RwLock::new(TransformLoader::new()))
44}
45
46pub struct TransformLoader {
48 loaded_libraries: HashMap<PathBuf, Library>,
50 transform_paths: HashMap<String, PathBuf>,
52}
53
54impl TransformLoader {
55 fn new() -> Self {
56 Self {
57 loaded_libraries: HashMap::new(),
58 transform_paths: HashMap::new(),
59 }
60 }
61
62 pub fn load_transform_library(&mut self, path: &Path) -> FFIResult<bool> {
63 if !self.loaded_libraries.contains_key(path) {
64 let lib = unsafe {
65 Library::new(path).map_err(|e| {
66 FFIError::Other(format!("Failed to load library {}: {}", path.display(), e))
67 })?
68 };
69 self.loaded_libraries.insert(path.to_path_buf(), lib);
70 }
71
72 let library = self.loaded_libraries.get(path).unwrap();
73
74 let magic_result: Result<Symbol<TransformMagicFnFFI>, _> =
75 unsafe { library.get(b"ffi_transform_magic\0") };
76
77 match magic_result {
78 Ok(magic_fn) => {
79 let magic = magic_fn();
80 Ok(magic == TRANSFORM_MAGIC)
81 }
82 Err(_) => {
83 self.loaded_libraries.remove(path);
84 Ok(false)
85 }
86 }
87 }
88
89 fn get_descriptor(&self, library: &Library) -> FFIResult<TransformDescriptorFFI> {
90 unsafe {
91 let get_descriptor: Symbol<extern "C" fn() -> *const TransformDescriptorFFI> =
92 library.get(b"ffi_transform_get_descriptor\0").map_err(|e| {
93 FFIError::Other(format!("Failed to find ffi_transform_get_descriptor: {}", e))
94 })?;
95
96 let descriptor_ptr = get_descriptor();
97 if descriptor_ptr.is_null() {
98 return Err(FFIError::Other("Descriptor is null".to_string()));
99 }
100
101 Ok(TransformDescriptorFFI {
102 api: (*descriptor_ptr).api,
103 name: (*descriptor_ptr).name,
104 version: (*descriptor_ptr).version,
105 description: (*descriptor_ptr).description,
106 vtable: (*descriptor_ptr).vtable,
107 })
108 }
109 }
110
111 fn validate_and_register(
112 &mut self,
113 descriptor: &TransformDescriptorFFI,
114 path: &Path,
115 ) -> FFIResult<(String, u32)> {
116 if descriptor.api != CURRENT_API {
117 return Err(FFIError::Other(format!(
118 "API version mismatch: expected {}, got {}",
119 CURRENT_API, descriptor.api
120 )));
121 }
122
123 let name = unsafe { buffer_to_string(&descriptor.name) };
124 self.transform_paths.insert(name.clone(), path.to_path_buf());
125
126 Ok((name, descriptor.api))
127 }
128
129 pub fn register_transform(&mut self, path: &Path) -> FFIResult<Option<LoadedTransformInfo>> {
131 if !self.load_transform_library(path)? {
132 return Ok(None);
133 }
134
135 let library = self.loaded_libraries.get(path).unwrap();
136 let descriptor = self.get_descriptor(library)?;
137 let (name, api) = self.validate_and_register(&descriptor, path)?;
138
139 let info = unsafe {
140 LoadedTransformInfo {
141 name,
142 library_path: path.to_path_buf(),
143 api,
144 version: buffer_to_string(&descriptor.version),
145 description: buffer_to_string(&descriptor.description),
146 }
147 };
148
149 Ok(Some(info))
150 }
151
152 pub fn load_transform(&mut self, path: &Path, config: &[u8]) -> FFIResult<Option<NativeTransformFFI>> {
154 if !self.load_transform_library(path)? {
155 return Ok(None);
156 }
157
158 let descriptor = {
159 let library = self.loaded_libraries.get(path).unwrap();
160 self.get_descriptor(library)?
161 };
162
163 self.validate_and_register(&descriptor, path)?;
164
165 let library = self.loaded_libraries.get(path).unwrap();
166 let create_fn: TransformCreateFnFFI = unsafe {
167 let create_symbol: Symbol<TransformCreateFnFFI> = library
168 .get(b"ffi_transform_create\0")
169 .map_err(|e| FFIError::Other(format!("Failed to find ffi_transform_create: {}", e)))?;
170
171 *create_symbol
172 };
173
174 let instance = create_fn(config.as_ptr(), config.len());
175 if instance.is_null() {
176 return Err(FFIError::Other("Failed to create transform instance".to_string()));
177 }
178
179 Ok(Some(NativeTransformFFI::new(descriptor, instance)))
180 }
181
182 pub fn create_transform_by_name(&mut self, name: &str, config: &[u8]) -> FFIResult<NativeTransformFFI> {
184 let path = self
185 .transform_paths
186 .get(name)
187 .ok_or_else(|| FFIError::Other(format!("Transform not found: {}", name)))?
188 .clone();
189
190 self.load_transform(&path, config)?
191 .ok_or_else(|| FFIError::Other(format!("Transform library no longer valid: {}", name)))
192 }
193
194 pub fn has_transform(&self, name: &str) -> bool {
196 self.transform_paths.contains_key(name)
197 }
198}
199
200#[derive(Debug, Clone)]
202pub struct LoadedTransformInfo {
203 pub name: String,
204 pub library_path: PathBuf,
205 pub api: u32,
206 pub version: String,
207 pub description: String,
208}
209
210impl Default for TransformLoader {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl Drop for TransformLoader {
217 fn drop(&mut self) {
218 self.loaded_libraries.clear();
219 }
220}
221
222pub fn load_transforms_from_dir(dir: &Path) -> reifydb_type::Result<Transforms> {
225 let loader = ffi_transform_loader();
226 let mut loader_guard = loader.write().unwrap();
227
228 let mut names = Vec::new();
229
230 let entries = std::fs::read_dir(dir).map_err(|e| {
231 reifydb_sdk::error::FFIError::Other(format!("Failed to read directory {}: {}", dir.display(), e))
232 })?;
233
234 for entry in entries {
235 let entry = entry.map_err(|e| {
236 reifydb_sdk::error::FFIError::Other(format!("Failed to read directory entry: {}", e))
237 })?;
238 let path = entry.path();
239 let ext = path.extension().and_then(|s| s.to_str());
240
241 if ext == Some("so") || ext == Some("dylib") {
242 match loader_guard.register_transform(&path) {
243 Ok(Some(info)) => {
244 names.push(info.name);
245 }
246 Ok(None) => {
247 }
249 Err(e) => {
250 eprintln!(
251 "Warning: Failed to register transform from {}: {}",
252 path.display(),
253 e
254 );
255 }
256 }
257 }
258 }
259
260 drop(loader_guard);
261
262 let mut builder = Transforms::builder();
263 for name in names {
264 let name_clone = name.clone();
265 builder = builder.register(&name, move || {
266 let loader = ffi_transform_loader();
267 let mut loader_guard = loader.write().unwrap();
268 loader_guard.create_transform_by_name(&name_clone, &[]).unwrap()
269 });
270 }
271
272 Ok(builder.build())
273}