1use std::ffi::c_void;
27use std::sync::Arc;
28
29use libloading::Library;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32
33use fidius_core::descriptor::{BufferStrategyKind, PluginDescriptor};
34use fidius_core::status::*;
35use fidius_core::wire;
36use fidius_core::PluginError;
37
38use crate::arena::{acquire_arena, grow_arena, release_arena, DEFAULT_ARENA_CAPACITY};
39use crate::error::{CallError, LoadError};
40use crate::executor::PluginExecutor;
41use crate::types::PluginInfo;
42
/// Signature of a vtable entry invoked with the plugin-allocated buffer strategy.
///
/// As used by the call sites in this file the arguments are, in order: the plugin instance
/// pointer, the input pointer, the input length, an out-pointer that receives the output
/// buffer, and an out-pointer that receives the output length. The returned `i32` is a status
/// code that callers match against the `STATUS_*` constants.
type FfiFn = unsafe extern "C" fn(*mut c_void, *const u8, u32, *mut *mut u8, *mut u32) -> i32;
46
/// Signature of a vtable entry invoked with the arena buffer strategy.
///
/// As used by the call sites in this file the arguments are, in order: the plugin instance
/// pointer, the input pointer, the input length, the host-owned arena pointer, the arena
/// length, an out-pointer that receives the output offset, and an out-pointer that receives
/// the output length. The returned `i32` is a status code matched against the `STATUS_*`
/// constants.
type ArenaFn =
    unsafe extern "C" fn(*mut c_void, *const u8, u32, *mut u8, u32, *mut u32, *mut u32) -> i32;
50
51unsafe fn construct_instance(descriptor: *const PluginDescriptor, cfg: &[u8]) -> *mut c_void {
57 match (*descriptor).construct {
58 Some(ctor) => ctor(cfg.as_ptr(), cfg.len() as u32),
59 None => std::ptr::null_mut(),
60 }
61}
62
/// Executor that dispatches method calls to a plugin through a C-ABI vtable.
pub struct CdylibExecutor {
    /// Library handle: `Some` when built via `new` / `from_loaded`, `None` when built from an
    /// in-process descriptor. Never read in this file — presumably held only to keep the
    /// library loaded while its function pointers are in use (confirm against the loader).
    _library: Option<Arc<Library>>,
    /// Base pointer of the method table; read as an array of `Option<fn>` indexed by method.
    vtable: *const c_void,
    /// Descriptor the executor was built from; dereferenced by the metadata accessors.
    descriptor: *const PluginDescriptor,
    /// Optional callback invoked with `(ptr, len)` to release plugin-allocated output buffers.
    free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
    /// Capability bitfield tested by `has_capability`.
    capabilities: u64,
    /// Number of methods; method indices are rejected unless strictly below this value.
    method_count: u32,
    /// Descriptive information returned by `info()`; also selects the buffer strategy.
    info: PluginInfo,
    /// Instance pointer returned by the descriptor's constructor (null when there is none);
    /// passed as the first argument of every vtable call.
    instance: *mut c_void,
    /// Optional destructor run on `instance` when the executor is dropped.
    destroy: Option<unsafe extern "C" fn(*mut c_void)>,
}
97
// The struct stores raw pointers, so `Send` and `Sync` are asserted manually here.
// NOTE(review): nothing in this file establishes that the plugin instance tolerates being
// moved across threads or called concurrently — presumably the plugin ABI requires it;
// confirm against the plugin contract.
unsafe impl Send for CdylibExecutor {}
unsafe impl Sync for CdylibExecutor {}
108
109impl Drop for CdylibExecutor {
110 fn drop(&mut self) {
111 if let Some(destroy) = self.destroy {
113 if !self.instance.is_null() {
114 unsafe { destroy(self.instance) };
115 }
116 }
117 }
118}
119
120impl CdylibExecutor {
121 #[allow(dead_code)]
123 pub(crate) fn new(
124 library: Arc<Library>,
125 vtable: *const c_void,
126 descriptor: *const PluginDescriptor,
127 free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
128 capabilities: u64,
129 method_count: u32,
130 info: PluginInfo,
131 ) -> Self {
132 let instance = unsafe { construct_instance(descriptor, &[]) };
133 let destroy = unsafe { (*descriptor).destroy };
134 Self {
135 _library: Some(library),
136 vtable,
137 descriptor,
138 free_buffer,
139 capabilities,
140 method_count,
141 info,
142 instance,
143 destroy,
144 }
145 }
146
147 pub fn from_loaded(plugin: crate::loader::LoadedPlugin) -> Self {
149 let instance = unsafe { construct_instance(plugin.descriptor, &[]) };
150 let destroy = unsafe { (*plugin.descriptor).destroy };
151 Self {
152 _library: Some(plugin.library),
153 vtable: plugin.vtable,
154 descriptor: plugin.descriptor,
155 free_buffer: plugin.free_buffer,
156 capabilities: plugin.info.capabilities,
157 method_count: plugin.method_count,
158 info: plugin.info,
159 instance,
160 destroy,
161 }
162 }
163
164 pub fn from_descriptor(desc: &'static PluginDescriptor) -> Result<Self, LoadError> {
172 Self::from_descriptor_with_config(desc, &[])
173 }
174
175 pub fn from_descriptor_with_config(
179 desc: &'static PluginDescriptor,
180 cfg: &[u8],
181 ) -> Result<Self, LoadError> {
182 let info = PluginInfo {
183 name: unsafe { desc.plugin_name_str() }.to_string(),
184 interface_name: unsafe { desc.interface_name_str() }.to_string(),
185 interface_hash: desc.interface_hash,
186 interface_version: desc.interface_version,
187 capabilities: desc.capabilities,
188 buffer_strategy: desc
189 .buffer_strategy_kind()
190 .map_err(|v| LoadError::UnknownBufferStrategy { value: v })?,
191 runtime: crate::types::PluginRuntimeKind::Cdylib,
192 };
193 let descriptor = desc as *const PluginDescriptor;
194 let instance = unsafe { construct_instance(descriptor, cfg) };
195 Ok(Self {
196 _library: None,
197 vtable: desc.vtable,
198 descriptor,
199 free_buffer: desc.free_buffer,
200 capabilities: desc.capabilities,
201 method_count: desc.method_count,
202 info,
203 instance,
204 destroy: desc.destroy,
205 })
206 }
207
208 pub fn find_in_process_descriptor(
215 plugin_name: &str,
216 ) -> Result<&'static PluginDescriptor, LoadError> {
217 let reg = fidius_core::registry::get_registry();
218 for i in 0..reg.plugin_count as usize {
219 let desc_ptr = unsafe { *reg.descriptors.add(i) };
220 let desc = unsafe { &*desc_ptr };
221 if unsafe { desc.plugin_name_str() } == plugin_name {
222 return Ok(desc);
223 }
224 }
225 Err(LoadError::PluginNotFound {
226 name: plugin_name.to_string(),
227 })
228 }
229
230 pub fn call_method<I: Serialize, O: DeserializeOwned>(
247 &self,
248 index: usize,
249 input: &I,
250 ) -> Result<O, CallError> {
251 if index >= self.method_count as usize {
253 return Err(CallError::InvalidMethodIndex {
254 index,
255 count: self.method_count,
256 });
257 }
258
259 let input_bytes =
260 wire::serialize(input).map_err(|e| CallError::Serialization(e.to_string()))?;
261
262 match self.info.buffer_strategy {
263 BufferStrategyKind::PluginAllocated => self.call_plugin_allocated(index, &input_bytes),
264 BufferStrategyKind::Arena => self.call_arena(index, &input_bytes),
265 }
266 }
267
268 pub fn call_method_raw(&self, index: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
278 if index >= self.method_count as usize {
279 return Err(CallError::InvalidMethodIndex {
280 index,
281 count: self.method_count,
282 });
283 }
284 match self.info.buffer_strategy {
285 BufferStrategyKind::PluginAllocated => self.call_plugin_allocated_raw(index, input),
286 BufferStrategyKind::Arena => self.call_arena_raw(index, input),
287 }
288 }
289
290 fn call_plugin_allocated<O: DeserializeOwned>(
293 &self,
294 index: usize,
295 input_bytes: &[u8],
296 ) -> Result<O, CallError> {
297 let fn_ptr = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
300 Some(f) => f,
301 None => return Err(CallError::NotImplemented { bit: index as u32 }),
302 };
303
304 let mut out_ptr: *mut u8 = std::ptr::null_mut();
305 let mut out_len: u32 = 0;
306
307 let status = unsafe {
308 fn_ptr(
309 self.instance,
310 input_bytes.as_ptr(),
311 input_bytes.len() as u32,
312 &mut out_ptr,
313 &mut out_len,
314 )
315 };
316
317 match status {
318 STATUS_OK => {}
319 STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
320 STATUS_SERIALIZATION_ERROR => {
321 return Err(CallError::Serialization("FFI serialization failed".into()))
322 }
323 STATUS_PLUGIN_ERROR => {
324 if !out_ptr.is_null() && out_len > 0 {
325 let output_slice =
326 unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
327 let plugin_err: PluginError = wire::deserialize(output_slice)
328 .map_err(|e| CallError::Deserialization(e.to_string()))?;
329
330 if let Some(free) = self.free_buffer {
331 unsafe { free(out_ptr, out_len as usize) };
332 }
333
334 return Err(CallError::Plugin(plugin_err));
335 }
336 return Err(CallError::Plugin(PluginError::new(
337 "UNKNOWN",
338 "plugin returned error but no error data",
339 )));
340 }
341 STATUS_PANIC => {
342 let msg = if !out_ptr.is_null() && out_len > 0 {
343 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
344 let msg = wire::deserialize::<String>(slice)
345 .unwrap_or_else(|_| "unknown panic".into());
346 if let Some(free) = self.free_buffer {
347 unsafe { free(out_ptr, out_len as usize) };
348 }
349 msg
350 } else {
351 "unknown panic".into()
352 };
353 return Err(CallError::Panic(msg));
354 }
355 _ => return Err(CallError::UnknownStatus { code: status }),
356 }
357
358 if out_ptr.is_null() {
359 return Err(CallError::Serialization(
360 "plugin returned null output buffer".into(),
361 ));
362 }
363
364 let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
365 let result: Result<O, CallError> =
366 wire::deserialize(output_slice).map_err(|e| CallError::Deserialization(e.to_string()));
367
368 if let Some(free) = self.free_buffer {
369 unsafe { free(out_ptr, out_len as usize) };
370 }
371
372 result
373 }
374
375 fn call_arena<O: DeserializeOwned>(
380 &self,
381 index: usize,
382 input_bytes: &[u8],
383 ) -> Result<O, CallError> {
384 let fn_ptr = match unsafe { *(self.vtable as *const Option<ArenaFn>).add(index) } {
385 Some(f) => f,
386 None => return Err(CallError::NotImplemented { bit: index as u32 }),
387 };
388
389 let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
390 let mut out_offset: u32 = 0;
391 let mut out_len: u32 = 0;
392 let mut retried = false;
393
394 let status = loop {
395 let s = unsafe {
396 fn_ptr(
397 self.instance,
398 input_bytes.as_ptr(),
399 input_bytes.len() as u32,
400 arena.as_mut_ptr(),
401 arena.len() as u32,
402 &mut out_offset,
403 &mut out_len,
404 )
405 };
406 if s == STATUS_BUFFER_TOO_SMALL && !retried {
407 let needed = out_len as usize;
409 grow_arena(&mut arena, needed);
410 retried = true;
411 continue;
412 }
413 break s;
414 };
415
416 match status {
417 STATUS_OK => {
418 let start = out_offset as usize;
419 let end = start + out_len as usize;
420 if end > arena.len() {
421 release_arena(arena);
422 return Err(CallError::Serialization(
423 "plugin reported out_offset/out_len outside arena".into(),
424 ));
425 }
426 let result = wire::deserialize(&arena[start..end])
427 .map_err(|e| CallError::Deserialization(e.to_string()));
428 release_arena(arena);
429 result
430 }
431 STATUS_BUFFER_TOO_SMALL => {
432 release_arena(arena);
433 Err(CallError::BufferTooSmall)
434 }
435 STATUS_SERIALIZATION_ERROR => {
436 release_arena(arena);
437 Err(CallError::Serialization("FFI serialization failed".into()))
438 }
439 STATUS_PLUGIN_ERROR => {
440 let start = out_offset as usize;
441 let end = start + out_len as usize;
442 let plugin_err = if out_len > 0 && end <= arena.len() {
443 wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
444 PluginError::new("UNKNOWN", "plugin returned malformed error")
445 })
446 } else {
447 PluginError::new("UNKNOWN", "plugin returned error but no error data")
448 };
449 release_arena(arena);
450 Err(CallError::Plugin(plugin_err))
451 }
452 STATUS_PANIC => {
453 release_arena(arena);
457 Err(CallError::Panic(
458 "plugin panicked (message not transmitted via Arena strategy)".into(),
459 ))
460 }
461 code => {
462 release_arena(arena);
463 Err(CallError::UnknownStatus { code })
464 }
465 }
466 }
467
468 fn call_plugin_allocated_raw(
472 &self,
473 index: usize,
474 input_bytes: &[u8],
475 ) -> Result<Vec<u8>, CallError> {
476 let fn_ptr = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
479 Some(f) => f,
480 None => return Err(CallError::NotImplemented { bit: index as u32 }),
481 };
482
483 let mut out_ptr: *mut u8 = std::ptr::null_mut();
484 let mut out_len: u32 = 0;
485
486 let status = unsafe {
487 fn_ptr(
488 self.instance,
489 input_bytes.as_ptr(),
490 input_bytes.len() as u32,
491 &mut out_ptr,
492 &mut out_len,
493 )
494 };
495
496 match status {
497 STATUS_OK => {}
498 STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
499 STATUS_SERIALIZATION_ERROR => {
500 return Err(CallError::Serialization("FFI serialization failed".into()))
501 }
502 STATUS_PLUGIN_ERROR => {
503 if !out_ptr.is_null() && out_len > 0 {
504 let output_slice =
505 unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
506 let plugin_err: PluginError = wire::deserialize(output_slice)
507 .map_err(|e| CallError::Deserialization(e.to_string()))?;
508 if let Some(free) = self.free_buffer {
509 unsafe { free(out_ptr, out_len as usize) };
510 }
511 return Err(CallError::Plugin(plugin_err));
512 }
513 return Err(CallError::Plugin(PluginError::new(
514 "UNKNOWN",
515 "plugin returned error but no error data",
516 )));
517 }
518 STATUS_PANIC => {
519 let msg = if !out_ptr.is_null() && out_len > 0 {
520 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
521 let msg = wire::deserialize::<String>(slice)
522 .unwrap_or_else(|_| "unknown panic".into());
523 if let Some(free) = self.free_buffer {
524 unsafe { free(out_ptr, out_len as usize) };
525 }
526 msg
527 } else {
528 "unknown panic".into()
529 };
530 return Err(CallError::Panic(msg));
531 }
532 _ => return Err(CallError::UnknownStatus { code: status }),
533 }
534
535 if out_ptr.is_null() {
536 return Err(CallError::Serialization(
537 "plugin returned null output buffer".into(),
538 ));
539 }
540
541 let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
545 let result = output_slice.to_vec();
546
547 if let Some(free) = self.free_buffer {
548 unsafe { free(out_ptr, out_len as usize) };
549 }
550
551 Ok(result)
552 }
553
554 #[cfg(feature = "streaming")]
564 pub unsafe fn call_client_streaming_raw(
565 &self,
566 index: usize,
567 handle: *mut fidius_core::stream_ffi::FidiusStreamHandle,
568 input_bytes: &[u8],
569 ) -> Result<Vec<u8>, CallError> {
570 if index >= self.method_count as usize {
571 return Err(CallError::InvalidMethodIndex {
572 index,
573 count: self.method_count,
574 });
575 }
576 type ClientStreamFn = unsafe extern "C" fn(
577 *mut c_void,
578 *mut fidius_core::stream_ffi::FidiusStreamHandle,
579 *const u8,
580 u32,
581 *mut *mut u8,
582 *mut u32,
583 ) -> i32;
584 let fn_ptr = match unsafe { *(self.vtable as *const Option<ClientStreamFn>).add(index) } {
585 Some(f) => f,
586 None => return Err(CallError::NotImplemented { bit: index as u32 }),
587 };
588
589 let mut out_ptr: *mut u8 = std::ptr::null_mut();
590 let mut out_len: u32 = 0;
591 let status = unsafe {
592 fn_ptr(
593 self.instance,
594 handle,
595 input_bytes.as_ptr(),
596 input_bytes.len() as u32,
597 &mut out_ptr,
598 &mut out_len,
599 )
600 };
601
602 match status {
603 STATUS_OK => {}
604 STATUS_SERIALIZATION_ERROR => {
605 return Err(CallError::Serialization("FFI serialization failed".into()))
606 }
607 STATUS_PLUGIN_ERROR => {
608 let err = if !out_ptr.is_null() && out_len > 0 {
609 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
610 let pe: PluginError = wire::deserialize(slice)
611 .unwrap_or_else(|_| PluginError::new("UNKNOWN", "plugin error"));
612 if let Some(free) = self.free_buffer {
613 unsafe { free(out_ptr, out_len as usize) };
614 }
615 pe
616 } else {
617 PluginError::new("UNKNOWN", "plugin returned error but no data")
618 };
619 return Err(CallError::Plugin(err));
620 }
621 STATUS_PANIC => {
622 let msg = if !out_ptr.is_null() && out_len > 0 {
623 let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
624 let m = wire::deserialize::<String>(slice)
625 .unwrap_or_else(|_| "unknown panic".into());
626 if let Some(free) = self.free_buffer {
627 unsafe { free(out_ptr, out_len as usize) };
628 }
629 m
630 } else {
631 "unknown panic".into()
632 };
633 return Err(CallError::Panic(msg));
634 }
635 _ => return Err(CallError::UnknownStatus { code: status }),
636 }
637
638 if out_ptr.is_null() {
639 return Err(CallError::Serialization(
640 "plugin returned null output buffer".into(),
641 ));
642 }
643 let result = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) }.to_vec();
644 if let Some(free) = self.free_buffer {
645 unsafe { free(out_ptr, out_len as usize) };
646 }
647 Ok(result)
648 }
649
650 fn call_arena_raw(&self, index: usize, input_bytes: &[u8]) -> Result<Vec<u8>, CallError> {
653 let fn_ptr = match unsafe { *(self.vtable as *const Option<ArenaFn>).add(index) } {
654 Some(f) => f,
655 None => return Err(CallError::NotImplemented { bit: index as u32 }),
656 };
657
658 let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
659 let mut out_offset: u32 = 0;
660 let mut out_len: u32 = 0;
661 let mut retried = false;
662
663 let status = loop {
664 let s = unsafe {
665 fn_ptr(
666 self.instance,
667 input_bytes.as_ptr(),
668 input_bytes.len() as u32,
669 arena.as_mut_ptr(),
670 arena.len() as u32,
671 &mut out_offset,
672 &mut out_len,
673 )
674 };
675 if s == STATUS_BUFFER_TOO_SMALL && !retried {
676 let needed = out_len as usize;
677 grow_arena(&mut arena, needed);
678 retried = true;
679 continue;
680 }
681 break s;
682 };
683
684 match status {
685 STATUS_OK => {
686 let start = out_offset as usize;
687 let end = start + out_len as usize;
688 if end > arena.len() {
689 release_arena(arena);
690 return Err(CallError::Serialization(
691 "plugin reported out_offset/out_len outside arena".into(),
692 ));
693 }
694 let result = arena[start..end].to_vec();
695 release_arena(arena);
696 Ok(result)
697 }
698 STATUS_BUFFER_TOO_SMALL => {
699 release_arena(arena);
700 Err(CallError::BufferTooSmall)
701 }
702 STATUS_SERIALIZATION_ERROR => {
703 release_arena(arena);
704 Err(CallError::Serialization("FFI serialization failed".into()))
705 }
706 STATUS_PLUGIN_ERROR => {
707 let start = out_offset as usize;
708 let end = start + out_len as usize;
709 let plugin_err = if out_len > 0 && end <= arena.len() {
710 wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
711 PluginError::new("UNKNOWN", "plugin returned malformed error")
712 })
713 } else {
714 PluginError::new("UNKNOWN", "plugin returned error but no error data")
715 };
716 release_arena(arena);
717 Err(CallError::Plugin(plugin_err))
718 }
719 STATUS_PANIC => {
720 release_arena(arena);
721 Err(CallError::Panic(
722 "plugin panicked (message not transmitted via Arena strategy)".into(),
723 ))
724 }
725 code => {
726 release_arena(arena);
727 Err(CallError::UnknownStatus { code })
728 }
729 }
730 }
731
732 #[cfg(feature = "streaming")]
747 pub fn call_streaming_raw(
748 &self,
749 index: usize,
750 input_bytes: &[u8],
751 decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
752 ) -> Result<crate::stream::ChunkStream, CallError> {
753 if index >= self.method_count as usize {
754 return Err(CallError::InvalidMethodIndex {
755 index,
756 count: self.method_count,
757 });
758 }
759
760 let init = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
763 Some(f) => f,
764 None => return Err(CallError::NotImplemented { bit: index as u32 }),
765 };
766 let mut out_ptr: *mut u8 = std::ptr::null_mut();
767 let mut out_len: u32 = 0;
768 let status = unsafe {
769 init(
770 self.instance,
771 input_bytes.as_ptr(),
772 input_bytes.len() as u32,
773 &mut out_ptr,
774 &mut out_len,
775 )
776 };
777 match status {
778 STATUS_OK => {}
779 STATUS_SERIALIZATION_ERROR => {
780 return Err(CallError::Serialization(
781 "stream init: argument decode failed".into(),
782 ))
783 }
784 STATUS_PANIC => return Err(CallError::Panic("plugin panicked in stream init".into())),
785 code => return Err(CallError::UnknownStatus { code }),
786 }
787 if out_ptr.is_null() {
788 return Err(CallError::Backend {
789 runtime: "cdylib".into(),
790 message: "stream init returned a null handle".into(),
791 });
792 }
793
794 Ok(pump_stream_handle(out_ptr, decode_item))
797 }
798
799 #[cfg(feature = "streaming")]
810 pub unsafe fn call_bidi_streaming_raw(
811 &self,
812 index: usize,
813 handle: *mut fidius_core::stream_ffi::FidiusStreamHandle,
814 input_bytes: &[u8],
815 decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
816 ) -> Result<crate::stream::ChunkStream, CallError> {
817 if index >= self.method_count as usize {
818 return Err(CallError::InvalidMethodIndex {
819 index,
820 count: self.method_count,
821 });
822 }
823 type ClientStreamFn = unsafe extern "C" fn(
825 *mut c_void,
826 *mut fidius_core::stream_ffi::FidiusStreamHandle,
827 *const u8,
828 u32,
829 *mut *mut u8,
830 *mut u32,
831 ) -> i32;
832 let init = match unsafe { *(self.vtable as *const Option<ClientStreamFn>).add(index) } {
833 Some(f) => f,
834 None => return Err(CallError::NotImplemented { bit: index as u32 }),
835 };
836 let mut out_ptr: *mut u8 = std::ptr::null_mut();
837 let mut out_len: u32 = 0;
838 let status = unsafe {
839 init(
840 self.instance,
841 handle,
842 input_bytes.as_ptr(),
843 input_bytes.len() as u32,
844 &mut out_ptr,
845 &mut out_len,
846 )
847 };
848 match status {
849 STATUS_OK => {}
850 STATUS_SERIALIZATION_ERROR => {
851 return Err(CallError::Serialization(
852 "bidi stream init: argument decode failed".into(),
853 ))
854 }
855 STATUS_PANIC => {
856 return Err(CallError::Panic(
857 "plugin panicked in bidi stream init".into(),
858 ))
859 }
860 code => return Err(CallError::UnknownStatus { code }),
861 }
862 if out_ptr.is_null() {
863 return Err(CallError::Backend {
864 runtime: "cdylib".into(),
865 message: "bidi stream init returned a null output handle".into(),
866 });
867 }
868 Ok(pump_stream_handle(out_ptr, decode_item))
869 }
870
871 pub fn has_capability(&self, bit: u32) -> bool {
875 if bit >= 64 {
876 return false;
877 }
878 self.capabilities & (1u64 << bit) != 0
879 }
880
    /// Returns the plugin information stored when this executor was built.
    pub fn info(&self) -> &PluginInfo {
        &self.info
    }
885
886 pub fn method_metadata(&self, method_id: u32) -> Vec<(&str, &str)> {
898 if method_id >= self.method_count {
899 return Vec::new();
900 }
901 let desc = unsafe { &*self.descriptor };
903 if desc.method_metadata.is_null() {
904 return Vec::new();
905 }
906 let entries =
909 unsafe { std::slice::from_raw_parts(desc.method_metadata, self.method_count as usize) };
910 let entry = &entries[method_id as usize];
911 if entry.kvs.is_null() || entry.kv_count == 0 {
912 return Vec::new();
913 }
914 let kvs = unsafe { std::slice::from_raw_parts(entry.kvs, entry.kv_count as usize) };
916 kvs.iter()
917 .map(|kv| {
918 let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
921 .to_str()
922 .expect("metadata key is not valid UTF-8");
923 let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
924 .to_str()
925 .expect("metadata value is not valid UTF-8");
926 (k, v)
927 })
928 .collect()
929 }
930
931 pub fn trait_metadata(&self) -> Vec<(&str, &str)> {
936 let desc = unsafe { &*self.descriptor };
938 if desc.trait_metadata.is_null() || desc.trait_metadata_count == 0 {
939 return Vec::new();
940 }
941 let kvs = unsafe {
943 std::slice::from_raw_parts(desc.trait_metadata, desc.trait_metadata_count as usize)
944 };
945 kvs.iter()
946 .map(|kv| {
947 let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
948 .to_str()
949 .expect("trait metadata key is not valid UTF-8");
950 let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
951 .to_str()
952 .expect("trait metadata value is not valid UTF-8");
953 (k, v)
954 })
955 .collect()
956 }
957}
958
/// `PluginExecutor` implementation that exposes the stored fields and forwards raw calls to
/// the inherent `call_method_raw`.
impl PluginExecutor for CdylibExecutor {
    /// Returns the stored plugin information.
    fn info(&self) -> &PluginInfo {
        &self.info
    }

    /// Returns the number of methods recorded for the plugin.
    fn method_count(&self) -> u32 {
        self.method_count
    }

    /// Calls method `method` with pre-encoded `input`; index checking and buffer-strategy
    /// dispatch happen inside `call_method_raw`.
    fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
        self.call_method_raw(method, input)
    }
}
976
/// Drains a plugin stream handle on a dedicated thread and exposes the items as a
/// `ChunkStream`.
///
/// `out_ptr` is reinterpreted as a `*mut FidiusStreamHandle`. A spawned thread repeatedly
/// calls the handle's `next` function with a reusable buffer (starting at 64 bytes and
/// resized once per item when the plugin reports `STATUS_BUFFER_TOO_SMALL`), decodes each
/// item with `decode_item`, and forwards it over a bounded channel of capacity 4. The loop
/// stops at end of stream, on the first error of any kind, or when the receiver has been
/// dropped; the handle's `drop_fn` is then called exactly once.
///
/// The length reported by the plugin is validated against the buffer before slicing. An
/// oversized length is surfaced to the consumer as an error item instead of panicking the
/// pump thread, which would skip `drop_fn` and end the stream silently.
#[cfg(feature = "streaming")]
fn pump_stream_handle(
    out_ptr: *mut u8,
    decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
) -> crate::stream::ChunkStream {
    use fidius_core::stream_ffi::FidiusStreamHandle;
    use fidius_core::Value;

    // Bounded so the pump thread blocks in `blocking_send` when the consumer falls behind.
    const STREAM_CHANNEL_CAP: usize = 4;

    // Raw pointers are not `Send`; this wrapper lets the handle move into the pump thread.
    struct SendHandle(*mut FidiusStreamHandle);
    unsafe impl Send for SendHandle {}
    let send_handle = SendHandle(out_ptr as *mut FidiusStreamHandle);

    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);

    std::thread::spawn(move || {
        // Rebind the whole wrapper so the closure captures `SendHandle` itself rather than
        // only its (non-`Send`) raw-pointer field.
        let send_handle = send_handle;
        let handle = send_handle.0;

        const INITIAL_ITEM_CAP: usize = 64;
        let mut buf = vec![0u8; INITIAL_ITEM_CAP];

        loop {
            let next = unsafe { (*handle).next };
            let mut out_len: u32 = 0;
            let mut status =
                unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
            if status == STATUS_BUFFER_TOO_SMALL {
                // The plugin reports the size it needs in `out_len`; retry once with it.
                buf.resize(out_len as usize, 0);
                status = unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
            }
            match status {
                STATUS_OK => {
                    // Checked slicing: never trust the reported length blindly.
                    let item = match buf.get(..out_len as usize) {
                        Some(payload) => decode_item(payload),
                        None => Err(CallError::Backend {
                            runtime: "cdylib".into(),
                            message: "stream next reported an item length larger than the buffer"
                                .into(),
                        }),
                    };
                    let is_err = item.is_err();
                    if tx.blocking_send(item).is_err() {
                        // Receiver dropped: nobody is listening any more.
                        break;
                    }
                    if is_err {
                        break;
                    }
                }
                STATUS_STREAM_END => break,
                STATUS_PLUGIN_ERROR => {
                    let pe = match buf.get(..out_len as usize) {
                        Some(payload) if !payload.is_empty() => {
                            wire::deserialize::<PluginError>(payload).unwrap_or_else(|_| {
                                PluginError::new("UNKNOWN", "malformed stream error")
                            })
                        }
                        Some(_) => PluginError::new("UNKNOWN", "stream error without data"),
                        // Reported length exceeds the buffer: treat the payload as malformed.
                        None => PluginError::new("UNKNOWN", "malformed stream error"),
                    };
                    let _ = tx.blocking_send(Err(CallError::Plugin(pe)));
                    break;
                }
                STATUS_BUFFER_TOO_SMALL => {
                    // Still too small after the single retry above.
                    let _ = tx.blocking_send(Err(CallError::BufferTooSmall));
                    break;
                }
                STATUS_PANIC => {
                    let _ = tx.blocking_send(Err(CallError::Panic(
                        "plugin panicked in stream next".into(),
                    )));
                    break;
                }
                code => {
                    let _ = tx.blocking_send(Err(CallError::UnknownStatus { code }));
                    break;
                }
            }
        }
        // Release the plugin-side stream state once the loop has finished.
        unsafe {
            let drop_fn = (*handle).drop_fn;
            drop_fn(handle);
        }
    });

    let body = futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    });
    crate::stream::ChunkStream::new(body)
}