1use std::ffi::c_void;
27use std::sync::Arc;
28
29use libloading::Library;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32
33use fidius_core::descriptor::{BufferStrategyKind, PluginDescriptor};
34use fidius_core::status::*;
35use fidius_core::wire;
36use fidius_core::PluginError;
37
38use crate::arena::{acquire_arena, grow_arena, release_arena, DEFAULT_ARENA_CAPACITY};
39use crate::error::{CallError, LoadError};
40use crate::executor::PluginExecutor;
41use crate::types::PluginInfo;
42
43type FfiFn = unsafe extern "C" fn(*const u8, u32, *mut *mut u8, *mut u32) -> i32;
45
46type ArenaFn = unsafe extern "C" fn(*const u8, u32, *mut u8, u32, *mut u32, *mut u32) -> i32;
48
49pub struct CdylibExecutor {
58 _library: Option<Arc<Library>>,
62 vtable: *const c_void,
64 descriptor: *const PluginDescriptor,
68 free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
70 capabilities: u64,
72 method_count: u32,
74 info: PluginInfo,
76}
77
78unsafe impl Send for CdylibExecutor {}
87unsafe impl Sync for CdylibExecutor {}
88
89impl CdylibExecutor {
90 #[allow(dead_code)]
92 pub(crate) fn new(
93 library: Arc<Library>,
94 vtable: *const c_void,
95 descriptor: *const PluginDescriptor,
96 free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
97 capabilities: u64,
98 method_count: u32,
99 info: PluginInfo,
100 ) -> Self {
101 Self {
102 _library: Some(library),
103 vtable,
104 descriptor,
105 free_buffer,
106 capabilities,
107 method_count,
108 info,
109 }
110 }
111
112 pub fn from_loaded(plugin: crate::loader::LoadedPlugin) -> Self {
114 Self {
115 _library: Some(plugin.library),
116 vtable: plugin.vtable,
117 descriptor: plugin.descriptor,
118 free_buffer: plugin.free_buffer,
119 capabilities: plugin.info.capabilities,
120 method_count: plugin.method_count,
121 info: plugin.info,
122 }
123 }
124
125 pub fn from_descriptor(desc: &'static PluginDescriptor) -> Result<Self, LoadError> {
133 let info = PluginInfo {
134 name: unsafe { desc.plugin_name_str() }.to_string(),
135 interface_name: unsafe { desc.interface_name_str() }.to_string(),
136 interface_hash: desc.interface_hash,
137 interface_version: desc.interface_version,
138 capabilities: desc.capabilities,
139 buffer_strategy: desc
140 .buffer_strategy_kind()
141 .map_err(|v| LoadError::UnknownBufferStrategy { value: v })?,
142 runtime: crate::types::PluginRuntimeKind::Cdylib,
143 };
144 Ok(Self {
145 _library: None,
146 vtable: desc.vtable,
147 descriptor: desc as *const PluginDescriptor,
148 free_buffer: desc.free_buffer,
149 capabilities: desc.capabilities,
150 method_count: desc.method_count,
151 info,
152 })
153 }
154
155 pub fn find_in_process_descriptor(
162 plugin_name: &str,
163 ) -> Result<&'static PluginDescriptor, LoadError> {
164 let reg = fidius_core::registry::get_registry();
165 for i in 0..reg.plugin_count as usize {
166 let desc_ptr = unsafe { *reg.descriptors.add(i) };
167 let desc = unsafe { &*desc_ptr };
168 if unsafe { desc.plugin_name_str() } == plugin_name {
169 return Ok(desc);
170 }
171 }
172 Err(LoadError::PluginNotFound {
173 name: plugin_name.to_string(),
174 })
175 }
176
177 pub fn call_method<I: Serialize, O: DeserializeOwned>(
194 &self,
195 index: usize,
196 input: &I,
197 ) -> Result<O, CallError> {
198 if index >= self.method_count as usize {
200 return Err(CallError::InvalidMethodIndex {
201 index,
202 count: self.method_count,
203 });
204 }
205
206 let input_bytes =
207 wire::serialize(input).map_err(|e| CallError::Serialization(e.to_string()))?;
208
209 match self.info.buffer_strategy {
210 BufferStrategyKind::PluginAllocated => self.call_plugin_allocated(index, &input_bytes),
211 BufferStrategyKind::Arena => self.call_arena(index, &input_bytes),
212 }
213 }
214
215 pub fn call_method_raw(&self, index: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
225 if index >= self.method_count as usize {
226 return Err(CallError::InvalidMethodIndex {
227 index,
228 count: self.method_count,
229 });
230 }
231 match self.info.buffer_strategy {
232 BufferStrategyKind::PluginAllocated => self.call_plugin_allocated_raw(index, input),
233 BufferStrategyKind::Arena => self.call_arena_raw(index, input),
234 }
235 }
236
237 fn call_plugin_allocated<O: DeserializeOwned>(
240 &self,
241 index: usize,
242 input_bytes: &[u8],
243 ) -> Result<O, CallError> {
244 let fn_ptr = unsafe {
245 let fn_ptrs = self.vtable as *const FfiFn;
246 *fn_ptrs.add(index)
247 };
248
249 let mut out_ptr: *mut u8 = std::ptr::null_mut();
250 let mut out_len: u32 = 0;
251
252 let status = unsafe {
253 fn_ptr(
254 input_bytes.as_ptr(),
255 input_bytes.len() as u32,
256 &mut out_ptr,
257 &mut out_len,
258 )
259 };
260
261 match status {
262 STATUS_OK => {}
263 STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
264 STATUS_SERIALIZATION_ERROR => {
265 return Err(CallError::Serialization("FFI serialization failed".into()))
266 }
267 STATUS_PLUGIN_ERROR => {
268 if !out_ptr.is_null() && out_len > 0 {
269 let output_slice =
270 unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
271 let plugin_err: PluginError = wire::deserialize(output_slice)
272 .map_err(|e| CallError::Deserialization(e.to_string()))?;
273
274 if let Some(free) = self.free_buffer {
275 unsafe { free(out_ptr, out_len as usize) };
276 }
277
278 return Err(CallError::Plugin(plugin_err));
279 }
280 return Err(CallError::Plugin(PluginError::new(
281 "UNKNOWN",
282 "plugin returned error but no error data",
283 )));
284 }
285 STATUS_PANIC => {
286 let msg = if !out_ptr.is_null() && out_len > 0 {
287 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
288 let msg = wire::deserialize::<String>(slice)
289 .unwrap_or_else(|_| "unknown panic".into());
290 if let Some(free) = self.free_buffer {
291 unsafe { free(out_ptr, out_len as usize) };
292 }
293 msg
294 } else {
295 "unknown panic".into()
296 };
297 return Err(CallError::Panic(msg));
298 }
299 _ => return Err(CallError::UnknownStatus { code: status }),
300 }
301
302 if out_ptr.is_null() {
303 return Err(CallError::Serialization(
304 "plugin returned null output buffer".into(),
305 ));
306 }
307
308 let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
309 let result: Result<O, CallError> =
310 wire::deserialize(output_slice).map_err(|e| CallError::Deserialization(e.to_string()));
311
312 if let Some(free) = self.free_buffer {
313 unsafe { free(out_ptr, out_len as usize) };
314 }
315
316 result
317 }
318
319 fn call_arena<O: DeserializeOwned>(
324 &self,
325 index: usize,
326 input_bytes: &[u8],
327 ) -> Result<O, CallError> {
328 let fn_ptr = unsafe {
329 let fn_ptrs = self.vtable as *const ArenaFn;
330 *fn_ptrs.add(index)
331 };
332
333 let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
334 let mut out_offset: u32 = 0;
335 let mut out_len: u32 = 0;
336 let mut retried = false;
337
338 let status = loop {
339 let s = unsafe {
340 fn_ptr(
341 input_bytes.as_ptr(),
342 input_bytes.len() as u32,
343 arena.as_mut_ptr(),
344 arena.len() as u32,
345 &mut out_offset,
346 &mut out_len,
347 )
348 };
349 if s == STATUS_BUFFER_TOO_SMALL && !retried {
350 let needed = out_len as usize;
352 grow_arena(&mut arena, needed);
353 retried = true;
354 continue;
355 }
356 break s;
357 };
358
359 match status {
360 STATUS_OK => {
361 let start = out_offset as usize;
362 let end = start + out_len as usize;
363 if end > arena.len() {
364 release_arena(arena);
365 return Err(CallError::Serialization(
366 "plugin reported out_offset/out_len outside arena".into(),
367 ));
368 }
369 let result = wire::deserialize(&arena[start..end])
370 .map_err(|e| CallError::Deserialization(e.to_string()));
371 release_arena(arena);
372 result
373 }
374 STATUS_BUFFER_TOO_SMALL => {
375 release_arena(arena);
376 Err(CallError::BufferTooSmall)
377 }
378 STATUS_SERIALIZATION_ERROR => {
379 release_arena(arena);
380 Err(CallError::Serialization("FFI serialization failed".into()))
381 }
382 STATUS_PLUGIN_ERROR => {
383 let start = out_offset as usize;
384 let end = start + out_len as usize;
385 let plugin_err = if out_len > 0 && end <= arena.len() {
386 wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
387 PluginError::new("UNKNOWN", "plugin returned malformed error")
388 })
389 } else {
390 PluginError::new("UNKNOWN", "plugin returned error but no error data")
391 };
392 release_arena(arena);
393 Err(CallError::Plugin(plugin_err))
394 }
395 STATUS_PANIC => {
396 release_arena(arena);
400 Err(CallError::Panic(
401 "plugin panicked (message not transmitted via Arena strategy)".into(),
402 ))
403 }
404 code => {
405 release_arena(arena);
406 Err(CallError::UnknownStatus { code })
407 }
408 }
409 }
410
411 fn call_plugin_allocated_raw(
415 &self,
416 index: usize,
417 input_bytes: &[u8],
418 ) -> Result<Vec<u8>, CallError> {
419 let fn_ptr = unsafe {
420 let fn_ptrs = self.vtable as *const FfiFn;
421 *fn_ptrs.add(index)
422 };
423
424 let mut out_ptr: *mut u8 = std::ptr::null_mut();
425 let mut out_len: u32 = 0;
426
427 let status = unsafe {
428 fn_ptr(
429 input_bytes.as_ptr(),
430 input_bytes.len() as u32,
431 &mut out_ptr,
432 &mut out_len,
433 )
434 };
435
436 match status {
437 STATUS_OK => {}
438 STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
439 STATUS_SERIALIZATION_ERROR => {
440 return Err(CallError::Serialization("FFI serialization failed".into()))
441 }
442 STATUS_PLUGIN_ERROR => {
443 if !out_ptr.is_null() && out_len > 0 {
444 let output_slice =
445 unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
446 let plugin_err: PluginError = wire::deserialize(output_slice)
447 .map_err(|e| CallError::Deserialization(e.to_string()))?;
448 if let Some(free) = self.free_buffer {
449 unsafe { free(out_ptr, out_len as usize) };
450 }
451 return Err(CallError::Plugin(plugin_err));
452 }
453 return Err(CallError::Plugin(PluginError::new(
454 "UNKNOWN",
455 "plugin returned error but no error data",
456 )));
457 }
458 STATUS_PANIC => {
459 let msg = if !out_ptr.is_null() && out_len > 0 {
460 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
461 let msg = wire::deserialize::<String>(slice)
462 .unwrap_or_else(|_| "unknown panic".into());
463 if let Some(free) = self.free_buffer {
464 unsafe { free(out_ptr, out_len as usize) };
465 }
466 msg
467 } else {
468 "unknown panic".into()
469 };
470 return Err(CallError::Panic(msg));
471 }
472 _ => return Err(CallError::UnknownStatus { code: status }),
473 }
474
475 if out_ptr.is_null() {
476 return Err(CallError::Serialization(
477 "plugin returned null output buffer".into(),
478 ));
479 }
480
481 let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
485 let result = output_slice.to_vec();
486
487 if let Some(free) = self.free_buffer {
488 unsafe { free(out_ptr, out_len as usize) };
489 }
490
491 Ok(result)
492 }
493
494 fn call_arena_raw(&self, index: usize, input_bytes: &[u8]) -> Result<Vec<u8>, CallError> {
497 let fn_ptr = unsafe {
498 let fn_ptrs = self.vtable as *const ArenaFn;
499 *fn_ptrs.add(index)
500 };
501
502 let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
503 let mut out_offset: u32 = 0;
504 let mut out_len: u32 = 0;
505 let mut retried = false;
506
507 let status = loop {
508 let s = unsafe {
509 fn_ptr(
510 input_bytes.as_ptr(),
511 input_bytes.len() as u32,
512 arena.as_mut_ptr(),
513 arena.len() as u32,
514 &mut out_offset,
515 &mut out_len,
516 )
517 };
518 if s == STATUS_BUFFER_TOO_SMALL && !retried {
519 let needed = out_len as usize;
520 grow_arena(&mut arena, needed);
521 retried = true;
522 continue;
523 }
524 break s;
525 };
526
527 match status {
528 STATUS_OK => {
529 let start = out_offset as usize;
530 let end = start + out_len as usize;
531 if end > arena.len() {
532 release_arena(arena);
533 return Err(CallError::Serialization(
534 "plugin reported out_offset/out_len outside arena".into(),
535 ));
536 }
537 let result = arena[start..end].to_vec();
538 release_arena(arena);
539 Ok(result)
540 }
541 STATUS_BUFFER_TOO_SMALL => {
542 release_arena(arena);
543 Err(CallError::BufferTooSmall)
544 }
545 STATUS_SERIALIZATION_ERROR => {
546 release_arena(arena);
547 Err(CallError::Serialization("FFI serialization failed".into()))
548 }
549 STATUS_PLUGIN_ERROR => {
550 let start = out_offset as usize;
551 let end = start + out_len as usize;
552 let plugin_err = if out_len > 0 && end <= arena.len() {
553 wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
554 PluginError::new("UNKNOWN", "plugin returned malformed error")
555 })
556 } else {
557 PluginError::new("UNKNOWN", "plugin returned error but no error data")
558 };
559 release_arena(arena);
560 Err(CallError::Plugin(plugin_err))
561 }
562 STATUS_PANIC => {
563 release_arena(arena);
564 Err(CallError::Panic(
565 "plugin panicked (message not transmitted via Arena strategy)".into(),
566 ))
567 }
568 code => {
569 release_arena(arena);
570 Err(CallError::UnknownStatus { code })
571 }
572 }
573 }
574
575 #[cfg(feature = "streaming")]
590 pub fn call_streaming_raw(
591 &self,
592 index: usize,
593 input_bytes: &[u8],
594 decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
595 ) -> Result<crate::stream::ChunkStream, CallError> {
596 use fidius_core::stream_ffi::FidiusStreamHandle;
597 use fidius_core::Value;
598
599 const STREAM_CHANNEL_CAP: usize = 4;
602
603 if index >= self.method_count as usize {
604 return Err(CallError::InvalidMethodIndex {
605 index,
606 count: self.method_count,
607 });
608 }
609
610 let init = unsafe { *(self.vtable as *const FfiFn).add(index) };
612 let mut out_ptr: *mut u8 = std::ptr::null_mut();
613 let mut out_len: u32 = 0;
614 let status = unsafe {
615 init(
616 input_bytes.as_ptr(),
617 input_bytes.len() as u32,
618 &mut out_ptr,
619 &mut out_len,
620 )
621 };
622 match status {
623 STATUS_OK => {}
624 STATUS_SERIALIZATION_ERROR => {
625 return Err(CallError::Serialization(
626 "stream init: argument decode failed".into(),
627 ))
628 }
629 STATUS_PANIC => return Err(CallError::Panic("plugin panicked in stream init".into())),
630 code => return Err(CallError::UnknownStatus { code }),
631 }
632 if out_ptr.is_null() {
633 return Err(CallError::Backend {
634 runtime: "cdylib".into(),
635 message: "stream init returned a null handle".into(),
636 });
637 }
638
639 struct SendHandle(*mut FidiusStreamHandle);
642 unsafe impl Send for SendHandle {}
643 let send_handle = SendHandle(out_ptr as *mut FidiusStreamHandle);
644
645 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);
646
647 std::thread::spawn(move || {
648 let send_handle = send_handle;
651 let handle = send_handle.0;
652
653 const INITIAL_ITEM_CAP: usize = 64;
658 let mut buf = vec![0u8; INITIAL_ITEM_CAP];
659
660 loop {
661 let next = unsafe { (*handle).next };
662 let mut out_len: u32 = 0;
663 let mut status =
664 unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
665 if status == STATUS_BUFFER_TOO_SMALL {
666 buf.resize(out_len as usize, 0);
669 status =
670 unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
671 }
672 match status {
673 STATUS_OK => {
674 let item = decode_item(&buf[..out_len as usize]);
675 let is_err = item.is_err();
676 if tx.blocking_send(item).is_err() {
677 break; }
679 if is_err {
680 break;
681 }
682 }
683 STATUS_STREAM_END => break,
684 STATUS_PLUGIN_ERROR => {
685 let pe = if out_len > 0 {
686 wire::deserialize::<PluginError>(&buf[..out_len as usize])
687 .unwrap_or_else(|_| {
688 PluginError::new("UNKNOWN", "malformed stream error")
689 })
690 } else {
691 PluginError::new("UNKNOWN", "stream error without data")
692 };
693 let _ = tx.blocking_send(Err(CallError::Plugin(pe)));
694 break;
695 }
696 STATUS_BUFFER_TOO_SMALL => {
697 let _ = tx.blocking_send(Err(CallError::BufferTooSmall));
699 break;
700 }
701 STATUS_PANIC => {
702 let _ = tx.blocking_send(Err(CallError::Panic(
703 "plugin panicked in stream next".into(),
704 )));
705 break;
706 }
707 code => {
708 let _ = tx.blocking_send(Err(CallError::UnknownStatus { code }));
709 break;
710 }
711 }
712 }
713 unsafe {
715 let drop_fn = (*handle).drop_fn;
716 drop_fn(handle);
717 }
718 });
719
720 let body = futures::stream::unfold(rx, |mut rx| async move {
721 rx.recv().await.map(|item| (item, rx))
722 });
723 Ok(crate::stream::ChunkStream::new(body))
724 }
725
726 pub fn has_capability(&self, bit: u32) -> bool {
730 if bit >= 64 {
731 return false;
732 }
733 self.capabilities & (1u64 << bit) != 0
734 }
735
736 pub fn info(&self) -> &PluginInfo {
738 &self.info
739 }
740
741 pub fn method_metadata(&self, method_id: u32) -> Vec<(&str, &str)> {
753 if method_id >= self.method_count {
754 return Vec::new();
755 }
756 let desc = unsafe { &*self.descriptor };
758 if desc.method_metadata.is_null() {
759 return Vec::new();
760 }
761 let entries =
764 unsafe { std::slice::from_raw_parts(desc.method_metadata, self.method_count as usize) };
765 let entry = &entries[method_id as usize];
766 if entry.kvs.is_null() || entry.kv_count == 0 {
767 return Vec::new();
768 }
769 let kvs = unsafe { std::slice::from_raw_parts(entry.kvs, entry.kv_count as usize) };
771 kvs.iter()
772 .map(|kv| {
773 let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
776 .to_str()
777 .expect("metadata key is not valid UTF-8");
778 let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
779 .to_str()
780 .expect("metadata value is not valid UTF-8");
781 (k, v)
782 })
783 .collect()
784 }
785
786 pub fn trait_metadata(&self) -> Vec<(&str, &str)> {
791 let desc = unsafe { &*self.descriptor };
793 if desc.trait_metadata.is_null() || desc.trait_metadata_count == 0 {
794 return Vec::new();
795 }
796 let kvs = unsafe {
798 std::slice::from_raw_parts(desc.trait_metadata, desc.trait_metadata_count as usize)
799 };
800 kvs.iter()
801 .map(|kv| {
802 let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
803 .to_str()
804 .expect("trait metadata key is not valid UTF-8");
805 let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
806 .to_str()
807 .expect("trait metadata value is not valid UTF-8");
808 (k, v)
809 })
810 .collect()
811 }
812}
813
814impl PluginExecutor for CdylibExecutor {
815 fn info(&self) -> &PluginInfo {
816 &self.info
817 }
818
819 fn method_count(&self) -> u32 {
820 self.method_count
821 }
822
823 fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
828 self.call_method_raw(method, input)
829 }
830}