Skip to main content

fidius_host/executor/
cdylib.rs

1// Copyright 2026 Colliery, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `CdylibExecutor` — the cdylib execution backend: vtable/FFI dispatch with
16//! the bincode wire format.
17//!
18//! This is the original `PluginHandle` dispatch logic, moved behind the
19//! [`crate::executor::PluginExecutor`] seam (FIDIUS-I-0021). It keeps its own
20//! generic `call_method<I, O>` so the cdylib typed path serializes the concrete
21//! type with bincode **directly** — `Value` is never involved, so the bytes the
22//! plugin decodes are byte-identical to pre-refactor. The public-facing
23//! [`crate::handle::PluginHandle`] wraps this in an enum alongside the Python
24//! (and future WASM) backends.
25
26use std::ffi::c_void;
27use std::sync::Arc;
28
29use libloading::Library;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32
33use fidius_core::descriptor::{BufferStrategyKind, PluginDescriptor};
34use fidius_core::status::*;
35use fidius_core::wire;
36use fidius_core::PluginError;
37
38use crate::arena::{acquire_arena, grow_arena, release_arena, DEFAULT_ARENA_CAPACITY};
39use crate::error::{CallError, LoadError};
40use crate::executor::PluginExecutor;
41use crate::types::PluginInfo;
42
/// Type alias for the PluginAllocated FFI function pointer signature.
/// FIDIUS-A-0006: every method takes the instance pointer first.
///
/// As invoked in this file the arguments are, in order: the plugin instance,
/// the input pointer, the input length, an out-pointer that receives the
/// output buffer, and an out-pointer that receives the output length. The
/// `i32` return value is compared against the `STATUS_*` codes.
type FfiFn = unsafe extern "C" fn(*mut c_void, *const u8, u32, *mut *mut u8, *mut u32) -> i32;

/// Type alias for the Arena FFI function pointer signature.
///
/// As invoked in this file the arguments are, in order: the plugin instance,
/// the input pointer, the input length, the host-supplied arena pointer, the
/// arena length, an out-pointer for the output offset inside the arena, and
/// an out-pointer for the output length. The `i32` return value is compared
/// against the `STATUS_*` codes.
type ArenaFn =
    unsafe extern "C" fn(*mut c_void, *const u8, u32, *mut u8, u32, *mut u32, *mut u32) -> i32;
50
51/// Construct the plugin instance via the descriptor's `construct` (FIDIUS-A-0006).
52/// Empty config bytes = the zero-config / singleton case (CI.1; typed config is CI.2).
53///
54/// # Safety
55/// `descriptor` must point to a valid `PluginDescriptor`.
56unsafe fn construct_instance(descriptor: *const PluginDescriptor, cfg: &[u8]) -> *mut c_void {
57    match (*descriptor).construct {
58        Some(ctor) => ctor(cfg.as_ptr(), cfg.len() as u32),
59        None => std::ptr::null_mut(),
60    }
61}
62
/// A handle to a loaded plugin, ready for calling methods.
///
/// For dylib-loaded plugins the handle holds an `Arc<Library>` to keep the
/// dylib loaded as long as any handle exists; in-process handles hold no
/// library at all. Call methods via `call_method()` (typed, bincode) or
/// `call_method_raw()` (raw bytes), which handle the FFI call and cleanup.
///
/// The handle owns one plugin instance: it is created through the
/// descriptor's `construct` hook when the handle is built and released
/// through `destroy` when the handle is dropped.
///
/// `CdylibExecutor` is `Send + Sync`. Plugin methods take `&self` (enforced by
/// the macro), so concurrent calls from multiple threads are safe as long as
/// the plugin implementation is thread-safe internally.
pub struct CdylibExecutor {
    /// Keeps the library alive for dylib-loaded plugins. `None` for in-process
    /// handles built via [`CdylibExecutor::from_descriptor`] — in-process plugins
    /// live in the current binary's address space and don't need Arc-tracking.
    _library: Option<Arc<Library>>,
    /// Pointer to the `#[repr(C)]` vtable struct in the loaded library. The
    /// call paths read it as an array of optional function pointers indexed
    /// by method index.
    vtable: *const c_void,
    /// Pointer to the full descriptor in library memory. Used by metadata
    /// accessors to read `method_metadata` / `trait_metadata`. Valid for the
    /// handle's lifetime via `_library` Arc (or forever for in-process).
    descriptor: *const PluginDescriptor,
    /// Free function for plugin-allocated output buffers. When `None`, the
    /// call paths skip freeing.
    free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
    /// Capability bitfield for optional method support.
    capabilities: u64,
    /// Total number of methods in the vtable. Public call entry points
    /// reject any index at or above this value.
    method_count: u32,
    /// Owned plugin metadata. Its `buffer_strategy` selects between the
    /// PluginAllocated and Arena call paths.
    info: PluginInfo,
    /// The plugin instance this handle owns (FIDIUS-A-0006), returned by the
    /// descriptor's `construct` and passed to every vtable method. Freed via
    /// `destroy` on drop. Null only for a malformed/legacy descriptor.
    instance: *mut c_void,
    /// Destructor for `instance` (from the descriptor). `Drop` only calls it
    /// when `instance` is non-null.
    destroy: Option<unsafe extern "C" fn(*mut c_void)>,
}
97
// SAFETY: CdylibExecutor is Send + Sync because:
// - vtable points at a table of function pointers to static code in the
//   loaded library, and free_buffer / destroy are such function pointers
// - descriptor points into library memory (or the current binary) and is
//   only read through this handle
// - Arc<Library> is Send + Sync and ensures the library stays loaded
// - All access through call_method is read-only (no mutation of handle state)
//
// Plugin implementations must be thread-safe (&self methods, no &mut self)
// if the CdylibExecutor is shared across threads. This is enforced at compile
// time by the #[plugin_interface] macro which rejects &mut self methods.
//
// NOTE(review): `instance` is a raw pointer to plugin-owned state that is
// passed to every vtable method and to `destroy` on drop, possibly on a
// different thread than the one that constructed it. Soundness for that
// pointer rests on the plugin contract — confirm the macro guarantee covers
// moving the instance across threads, not only sharing it.
unsafe impl Send for CdylibExecutor {}
unsafe impl Sync for CdylibExecutor {}
108
109impl Drop for CdylibExecutor {
110    fn drop(&mut self) {
111        // Release the plugin instance this handle owns (FIDIUS-A-0006).
112        if let Some(destroy) = self.destroy {
113            if !self.instance.is_null() {
114                unsafe { destroy(self.instance) };
115            }
116        }
117    }
118}
119
120impl CdylibExecutor {
121    /// Create a new CdylibExecutor. Crate-private — use `from_loaded()` instead.
122    #[allow(dead_code)]
123    pub(crate) fn new(
124        library: Arc<Library>,
125        vtable: *const c_void,
126        descriptor: *const PluginDescriptor,
127        free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
128        capabilities: u64,
129        method_count: u32,
130        info: PluginInfo,
131    ) -> Self {
132        let instance = unsafe { construct_instance(descriptor, &[]) };
133        let destroy = unsafe { (*descriptor).destroy };
134        Self {
135            _library: Some(library),
136            vtable,
137            descriptor,
138            free_buffer,
139            capabilities,
140            method_count,
141            info,
142            instance,
143            destroy,
144        }
145    }
146
147    /// Create a CdylibExecutor from a LoadedPlugin.
148    pub fn from_loaded(plugin: crate::loader::LoadedPlugin) -> Self {
149        let instance = unsafe { construct_instance(plugin.descriptor, &[]) };
150        let destroy = unsafe { (*plugin.descriptor).destroy };
151        Self {
152            _library: Some(plugin.library),
153            vtable: plugin.vtable,
154            descriptor: plugin.descriptor,
155            free_buffer: plugin.free_buffer,
156            capabilities: plugin.info.capabilities,
157            method_count: plugin.method_count,
158            info: plugin.info,
159            instance,
160            destroy,
161        }
162    }
163
164    /// Create a CdylibExecutor from a plugin descriptor already registered in
165    /// the current process's inventory (via a `#[plugin_impl]` linked into
166    /// the current binary as a normal rlib). No dylib is loaded — the
167    /// descriptor's vtable points at code in the current binary.
168    ///
169    /// Used by the generated `Client::in_process(plugin_name)` constructor.
170    /// Host applications normally use [`CdylibExecutor::from_loaded`] instead.
171    pub fn from_descriptor(desc: &'static PluginDescriptor) -> Result<Self, LoadError> {
172        Self::from_descriptor_with_config(desc, &[])
173    }
174
175    /// Like [`Self::from_descriptor`] but constructs the instance from serialized
176    /// config bytes (FIDIUS-A-0006 / CI.2) — the in-process *configured* path.
177    /// `cfg` is bincode of the plugin's config type (empty = the singleton).
178    pub fn from_descriptor_with_config(
179        desc: &'static PluginDescriptor,
180        cfg: &[u8],
181    ) -> Result<Self, LoadError> {
182        let info = PluginInfo {
183            name: unsafe { desc.plugin_name_str() }.to_string(),
184            interface_name: unsafe { desc.interface_name_str() }.to_string(),
185            interface_hash: desc.interface_hash,
186            interface_version: desc.interface_version,
187            capabilities: desc.capabilities,
188            buffer_strategy: desc
189                .buffer_strategy_kind()
190                .map_err(|v| LoadError::UnknownBufferStrategy { value: v })?,
191            runtime: crate::types::PluginRuntimeKind::Cdylib,
192        };
193        let descriptor = desc as *const PluginDescriptor;
194        let instance = unsafe { construct_instance(descriptor, cfg) };
195        Ok(Self {
196            _library: None,
197            vtable: desc.vtable,
198            descriptor,
199            free_buffer: desc.free_buffer,
200            capabilities: desc.capabilities,
201            method_count: desc.method_count,
202            info,
203            instance,
204            destroy: desc.destroy,
205        })
206    }
207
208    /// Look up a descriptor in the current process's inventory registry by
209    /// `plugin_name` (the Rust struct name that was passed to `#[plugin_impl]`).
210    /// Returns `LoadError::PluginNotFound` if no descriptor has that name.
211    ///
212    /// The returned reference has `'static` lifetime because descriptors
213    /// emitted by `#[plugin_impl]` live in the binary's `.rodata`.
214    pub fn find_in_process_descriptor(
215        plugin_name: &str,
216    ) -> Result<&'static PluginDescriptor, LoadError> {
217        let reg = fidius_core::registry::get_registry();
218        for i in 0..reg.plugin_count as usize {
219            let desc_ptr = unsafe { *reg.descriptors.add(i) };
220            let desc = unsafe { &*desc_ptr };
221            if unsafe { desc.plugin_name_str() } == plugin_name {
222                return Ok(desc);
223            }
224        }
225        Err(LoadError::PluginNotFound {
226            name: plugin_name.to_string(),
227        })
228    }
229
230    /// Call a plugin method by vtable index.
231    ///
232    /// Serializes the input, calls the FFI function pointer at the given index,
233    /// checks the status code, deserializes the output, and frees the plugin-allocated buffer.
234    ///
235    /// # Arguments
236    /// * `index` - The method index in the vtable (0-based, in declaration order)
237    /// * `input` - The input argument to serialize and pass to the plugin
238    ///
239    /// # No timeout
240    ///
241    /// This call runs synchronously on the calling thread and has no built-in
242    /// timeout or cancellation. A misbehaving plugin will block the caller
243    /// indefinitely. See the `fidius` crate top-level docs ("What fidius
244    /// does not provide") for the rationale and the recommended consumer-side
245    /// mitigation.
246    pub fn call_method<I: Serialize, O: DeserializeOwned>(
247        &self,
248        index: usize,
249        input: &I,
250    ) -> Result<O, CallError> {
251        // Bounds check: ensure index is within the vtable
252        if index >= self.method_count as usize {
253            return Err(CallError::InvalidMethodIndex {
254                index,
255                count: self.method_count,
256            });
257        }
258
259        let input_bytes =
260            wire::serialize(input).map_err(|e| CallError::Serialization(e.to_string()))?;
261
262        match self.info.buffer_strategy {
263            BufferStrategyKind::PluginAllocated => self.call_plugin_allocated(index, &input_bytes),
264            BufferStrategyKind::Arena => self.call_arena(index, &input_bytes),
265        }
266    }
267
268    /// Call a plugin method whose argument and successful return value are
269    /// raw bytes — bypassing bincode on both sides. Used by methods declared
270    /// with `#[wire(raw)]` on the interface trait.
271    ///
272    /// Errors and panic messages still use bincode (small typed payloads).
273    /// Returns the success bytes on `Ok`, or a `CallError::Plugin(_)` whose
274    /// inner `PluginError` was bincode-decoded from the plugin's error payload.
275    ///
276    /// Same no-timeout caveat as [`Self::call_method`].
277    pub fn call_method_raw(&self, index: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
278        if index >= self.method_count as usize {
279            return Err(CallError::InvalidMethodIndex {
280                index,
281                count: self.method_count,
282            });
283        }
284        match self.info.buffer_strategy {
285            BufferStrategyKind::PluginAllocated => self.call_plugin_allocated_raw(index, input),
286            BufferStrategyKind::Arena => self.call_arena_raw(index, input),
287        }
288    }
289
290    /// PluginAllocated path: plugin allocates an output buffer via
291    /// `Box::into_raw(Box<[u8]>)`, host deserializes and calls free_buffer.
292    fn call_plugin_allocated<O: DeserializeOwned>(
293        &self,
294        index: usize,
295        input_bytes: &[u8],
296    ) -> Result<O, CallError> {
297        // Read the slot as `Option<fn>`: an optional method the plugin did not
298        // implement has a NULL vtable slot — never call into it (would segfault).
299        let fn_ptr = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
300            Some(f) => f,
301            None => return Err(CallError::NotImplemented { bit: index as u32 }),
302        };
303
304        let mut out_ptr: *mut u8 = std::ptr::null_mut();
305        let mut out_len: u32 = 0;
306
307        let status = unsafe {
308            fn_ptr(
309                self.instance,
310                input_bytes.as_ptr(),
311                input_bytes.len() as u32,
312                &mut out_ptr,
313                &mut out_len,
314            )
315        };
316
317        match status {
318            STATUS_OK => {}
319            STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
320            STATUS_SERIALIZATION_ERROR => {
321                return Err(CallError::Serialization("FFI serialization failed".into()))
322            }
323            STATUS_PLUGIN_ERROR => {
324                if !out_ptr.is_null() && out_len > 0 {
325                    let output_slice =
326                        unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
327                    let plugin_err: PluginError = wire::deserialize(output_slice)
328                        .map_err(|e| CallError::Deserialization(e.to_string()))?;
329
330                    if let Some(free) = self.free_buffer {
331                        unsafe { free(out_ptr, out_len as usize) };
332                    }
333
334                    return Err(CallError::Plugin(plugin_err));
335                }
336                return Err(CallError::Plugin(PluginError::new(
337                    "UNKNOWN",
338                    "plugin returned error but no error data",
339                )));
340            }
341            STATUS_PANIC => {
342                let msg = if !out_ptr.is_null() && out_len > 0 {
343                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
344                    let msg = wire::deserialize::<String>(slice)
345                        .unwrap_or_else(|_| "unknown panic".into());
346                    if let Some(free) = self.free_buffer {
347                        unsafe { free(out_ptr, out_len as usize) };
348                    }
349                    msg
350                } else {
351                    "unknown panic".into()
352                };
353                return Err(CallError::Panic(msg));
354            }
355            _ => return Err(CallError::UnknownStatus { code: status }),
356        }
357
358        if out_ptr.is_null() {
359            return Err(CallError::Serialization(
360                "plugin returned null output buffer".into(),
361            ));
362        }
363
364        let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
365        let result: Result<O, CallError> =
366            wire::deserialize(output_slice).map_err(|e| CallError::Deserialization(e.to_string()));
367
368        if let Some(free) = self.free_buffer {
369            unsafe { free(out_ptr, out_len as usize) };
370        }
371
372        result
373    }
374
375    /// Arena path: host supplies a buffer from the thread-local pool. If the
376    /// plugin reports `STATUS_BUFFER_TOO_SMALL`, grow the buffer to the
377    /// requested size and retry exactly once (second too-small would indicate
378    /// a misbehaving plugin — bail with `CallError::BufferTooSmall`).
379    fn call_arena<O: DeserializeOwned>(
380        &self,
381        index: usize,
382        input_bytes: &[u8],
383    ) -> Result<O, CallError> {
384        let fn_ptr = match unsafe { *(self.vtable as *const Option<ArenaFn>).add(index) } {
385            Some(f) => f,
386            None => return Err(CallError::NotImplemented { bit: index as u32 }),
387        };
388
389        let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
390        let mut out_offset: u32 = 0;
391        let mut out_len: u32 = 0;
392        let mut retried = false;
393
394        let status = loop {
395            let s = unsafe {
396                fn_ptr(
397                    self.instance,
398                    input_bytes.as_ptr(),
399                    input_bytes.len() as u32,
400                    arena.as_mut_ptr(),
401                    arena.len() as u32,
402                    &mut out_offset,
403                    &mut out_len,
404                )
405            };
406            if s == STATUS_BUFFER_TOO_SMALL && !retried {
407                // Plugin wrote the needed size into out_len. Grow and retry once.
408                let needed = out_len as usize;
409                grow_arena(&mut arena, needed);
410                retried = true;
411                continue;
412            }
413            break s;
414        };
415
416        match status {
417            STATUS_OK => {
418                let start = out_offset as usize;
419                let end = start + out_len as usize;
420                if end > arena.len() {
421                    release_arena(arena);
422                    return Err(CallError::Serialization(
423                        "plugin reported out_offset/out_len outside arena".into(),
424                    ));
425                }
426                let result = wire::deserialize(&arena[start..end])
427                    .map_err(|e| CallError::Deserialization(e.to_string()));
428                release_arena(arena);
429                result
430            }
431            STATUS_BUFFER_TOO_SMALL => {
432                release_arena(arena);
433                Err(CallError::BufferTooSmall)
434            }
435            STATUS_SERIALIZATION_ERROR => {
436                release_arena(arena);
437                Err(CallError::Serialization("FFI serialization failed".into()))
438            }
439            STATUS_PLUGIN_ERROR => {
440                let start = out_offset as usize;
441                let end = start + out_len as usize;
442                let plugin_err = if out_len > 0 && end <= arena.len() {
443                    wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
444                        PluginError::new("UNKNOWN", "plugin returned malformed error")
445                    })
446                } else {
447                    PluginError::new("UNKNOWN", "plugin returned error but no error data")
448                };
449                release_arena(arena);
450                Err(CallError::Plugin(plugin_err))
451            }
452            STATUS_PANIC => {
453                // Arena strategy's panic path returns out_len = 0 (the arena
454                // might be too small for the panic message). Host can't
455                // recover a message; report an opaque panic.
456                release_arena(arena);
457                Err(CallError::Panic(
458                    "plugin panicked (message not transmitted via Arena strategy)".into(),
459                ))
460            }
461            code => {
462                release_arena(arena);
463                Err(CallError::UnknownStatus { code })
464            }
465        }
466    }
467
468    /// PluginAllocated raw path — same FFI shape as `call_plugin_allocated`,
469    /// but the success buffer is returned to the caller as-is rather than
470    /// fed to bincode.
471    fn call_plugin_allocated_raw(
472        &self,
473        index: usize,
474        input_bytes: &[u8],
475    ) -> Result<Vec<u8>, CallError> {
476        // Read the slot as `Option<fn>`: an optional method the plugin did not
477        // implement has a NULL vtable slot — never call into it (would segfault).
478        let fn_ptr = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
479            Some(f) => f,
480            None => return Err(CallError::NotImplemented { bit: index as u32 }),
481        };
482
483        let mut out_ptr: *mut u8 = std::ptr::null_mut();
484        let mut out_len: u32 = 0;
485
486        let status = unsafe {
487            fn_ptr(
488                self.instance,
489                input_bytes.as_ptr(),
490                input_bytes.len() as u32,
491                &mut out_ptr,
492                &mut out_len,
493            )
494        };
495
496        match status {
497            STATUS_OK => {}
498            STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
499            STATUS_SERIALIZATION_ERROR => {
500                return Err(CallError::Serialization("FFI serialization failed".into()))
501            }
502            STATUS_PLUGIN_ERROR => {
503                if !out_ptr.is_null() && out_len > 0 {
504                    let output_slice =
505                        unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
506                    let plugin_err: PluginError = wire::deserialize(output_slice)
507                        .map_err(|e| CallError::Deserialization(e.to_string()))?;
508                    if let Some(free) = self.free_buffer {
509                        unsafe { free(out_ptr, out_len as usize) };
510                    }
511                    return Err(CallError::Plugin(plugin_err));
512                }
513                return Err(CallError::Plugin(PluginError::new(
514                    "UNKNOWN",
515                    "plugin returned error but no error data",
516                )));
517            }
518            STATUS_PANIC => {
519                let msg = if !out_ptr.is_null() && out_len > 0 {
520                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
521                    let msg = wire::deserialize::<String>(slice)
522                        .unwrap_or_else(|_| "unknown panic".into());
523                    if let Some(free) = self.free_buffer {
524                        unsafe { free(out_ptr, out_len as usize) };
525                    }
526                    msg
527                } else {
528                    "unknown panic".into()
529                };
530                return Err(CallError::Panic(msg));
531            }
532            _ => return Err(CallError::UnknownStatus { code: status }),
533        }
534
535        if out_ptr.is_null() {
536            return Err(CallError::Serialization(
537                "plugin returned null output buffer".into(),
538            ));
539        }
540
541        // Copy the success bytes into a Vec, then free the plugin's buffer.
542        // This matches the existing Box<[u8]> ownership contract — the plugin
543        // owns the memory until `free_buffer` is called.
544        let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
545        let result = output_slice.to_vec();
546
547        if let Some(free) = self.free_buffer {
548            unsafe { free(out_ptr, out_len as usize) };
549        }
550
551        Ok(result)
552    }
553
554    /// Client-streaming raw call (FIDIUS-I-0030 CS2.2). The vtable slot is a
555    /// `ClientStreamFn` that also takes the host's producer `handle`, from which
556    /// the plugin pulls its `Stream<T>` argument. `input_bytes` is the bincode of
557    /// the **non-stream** args; the bincode of the method's result is returned. The
558    /// plugin's consumer frees `handle` via its `drop_fn` — the host must not.
559    ///
560    /// # Safety
561    /// `handle` must be a valid, exclusively-owned producer handle (e.g. from
562    /// [`crate::client_stream::host_producer_handle`]); it is consumed by the call.
563    #[cfg(feature = "streaming")]
564    pub unsafe fn call_client_streaming_raw(
565        &self,
566        index: usize,
567        handle: *mut fidius_core::stream_ffi::FidiusStreamHandle,
568        input_bytes: &[u8],
569    ) -> Result<Vec<u8>, CallError> {
570        if index >= self.method_count as usize {
571            return Err(CallError::InvalidMethodIndex {
572                index,
573                count: self.method_count,
574            });
575        }
576        type ClientStreamFn = unsafe extern "C" fn(
577            *mut c_void,
578            *mut fidius_core::stream_ffi::FidiusStreamHandle,
579            *const u8,
580            u32,
581            *mut *mut u8,
582            *mut u32,
583        ) -> i32;
584        let fn_ptr = match unsafe { *(self.vtable as *const Option<ClientStreamFn>).add(index) } {
585            Some(f) => f,
586            None => return Err(CallError::NotImplemented { bit: index as u32 }),
587        };
588
589        let mut out_ptr: *mut u8 = std::ptr::null_mut();
590        let mut out_len: u32 = 0;
591        let status = unsafe {
592            fn_ptr(
593                self.instance,
594                handle,
595                input_bytes.as_ptr(),
596                input_bytes.len() as u32,
597                &mut out_ptr,
598                &mut out_len,
599            )
600        };
601
602        match status {
603            STATUS_OK => {}
604            STATUS_SERIALIZATION_ERROR => {
605                return Err(CallError::Serialization("FFI serialization failed".into()))
606            }
607            STATUS_PLUGIN_ERROR => {
608                let err = if !out_ptr.is_null() && out_len > 0 {
609                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
610                    let pe: PluginError = wire::deserialize(slice)
611                        .unwrap_or_else(|_| PluginError::new("UNKNOWN", "plugin error"));
612                    if let Some(free) = self.free_buffer {
613                        unsafe { free(out_ptr, out_len as usize) };
614                    }
615                    pe
616                } else {
617                    PluginError::new("UNKNOWN", "plugin returned error but no data")
618                };
619                return Err(CallError::Plugin(err));
620            }
621            STATUS_PANIC => {
622                let msg = if !out_ptr.is_null() && out_len > 0 {
623                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
624                    let m = wire::deserialize::<String>(slice)
625                        .unwrap_or_else(|_| "unknown panic".into());
626                    if let Some(free) = self.free_buffer {
627                        unsafe { free(out_ptr, out_len as usize) };
628                    }
629                    m
630                } else {
631                    "unknown panic".into()
632                };
633                return Err(CallError::Panic(msg));
634            }
635            _ => return Err(CallError::UnknownStatus { code: status }),
636        }
637
638        if out_ptr.is_null() {
639            return Err(CallError::Serialization(
640                "plugin returned null output buffer".into(),
641            ));
642        }
643        let result = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) }.to_vec();
644        if let Some(free) = self.free_buffer {
645            unsafe { free(out_ptr, out_len as usize) };
646        }
647        Ok(result)
648    }
649
650    /// Arena raw path — same FFI shape as `call_arena`, success bytes
651    /// returned as a `Vec<u8>` copied out of the arena.
652    fn call_arena_raw(&self, index: usize, input_bytes: &[u8]) -> Result<Vec<u8>, CallError> {
653        let fn_ptr = match unsafe { *(self.vtable as *const Option<ArenaFn>).add(index) } {
654            Some(f) => f,
655            None => return Err(CallError::NotImplemented { bit: index as u32 }),
656        };
657
658        let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
659        let mut out_offset: u32 = 0;
660        let mut out_len: u32 = 0;
661        let mut retried = false;
662
663        let status = loop {
664            let s = unsafe {
665                fn_ptr(
666                    self.instance,
667                    input_bytes.as_ptr(),
668                    input_bytes.len() as u32,
669                    arena.as_mut_ptr(),
670                    arena.len() as u32,
671                    &mut out_offset,
672                    &mut out_len,
673                )
674            };
675            if s == STATUS_BUFFER_TOO_SMALL && !retried {
676                let needed = out_len as usize;
677                grow_arena(&mut arena, needed);
678                retried = true;
679                continue;
680            }
681            break s;
682        };
683
684        match status {
685            STATUS_OK => {
686                let start = out_offset as usize;
687                let end = start + out_len as usize;
688                if end > arena.len() {
689                    release_arena(arena);
690                    return Err(CallError::Serialization(
691                        "plugin reported out_offset/out_len outside arena".into(),
692                    ));
693                }
694                let result = arena[start..end].to_vec();
695                release_arena(arena);
696                Ok(result)
697            }
698            STATUS_BUFFER_TOO_SMALL => {
699                release_arena(arena);
700                Err(CallError::BufferTooSmall)
701            }
702            STATUS_SERIALIZATION_ERROR => {
703                release_arena(arena);
704                Err(CallError::Serialization("FFI serialization failed".into()))
705            }
706            STATUS_PLUGIN_ERROR => {
707                let start = out_offset as usize;
708                let end = start + out_len as usize;
709                let plugin_err = if out_len > 0 && end <= arena.len() {
710                    wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
711                        PluginError::new("UNKNOWN", "plugin returned malformed error")
712                    })
713                } else {
714                    PluginError::new("UNKNOWN", "plugin returned error but no error data")
715                };
716                release_arena(arena);
717                Err(CallError::Plugin(plugin_err))
718            }
719            STATUS_PANIC => {
720                release_arena(arena);
721                Err(CallError::Panic(
722                    "plugin panicked (message not transmitted via Arena strategy)".into(),
723                ))
724            }
725            code => {
726                release_arena(arena);
727                Err(CallError::UnknownStatus { code })
728            }
729        }
730    }
731
732    /// Start a server-streaming cdylib call (FIDIUS-I-0026 CS.1). `input_bytes`
733    /// is the **concrete bincode** of the args tuple (the cdylib path never goes
734    /// through `Value` — same as `call_method`), so the caller serialises with
735    /// `wire::serialize` directly.
736    ///
737    /// Calls the streaming method's vtable slot (an *init* shim with the ordinary
738    /// `FfiFn` shape) to obtain a `FidiusStreamHandle`, then pumps `next()` on a
739    /// dedicated thread (cdylib is synchronous) into a bounded channel →
740    /// `ChunkStream`. The pump owns **one reusable buffer** the guest writes each
741    /// item into (FIDIUS-T-0138 arena-style `next`) — so there's no per-item heap
742    /// alloc and no `free_buffer` crossing, just one `next` call per item. Each
743    /// item crosses as **concrete bincode** and is turned into a `Value` by the
744    /// caller-supplied `decode_item` (`wire::deserialize::<O>` + `to_value`,
745    /// FIDIUS-T-0137). Dropping the stream runs the guest's `drop_fn` (cancel).
746    #[cfg(feature = "streaming")]
747    pub fn call_streaming_raw(
748        &self,
749        index: usize,
750        input_bytes: &[u8],
751        decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
752    ) -> Result<crate::stream::ChunkStream, CallError> {
753        if index >= self.method_count as usize {
754            return Err(CallError::InvalidMethodIndex {
755                index,
756                count: self.method_count,
757            });
758        }
759
760        // init: the streaming method's vtable slot (FfiFn shape) → handle. Guard
761        // the null slot of an unimplemented optional method.
762        let init = match unsafe { *(self.vtable as *const Option<FfiFn>).add(index) } {
763            Some(f) => f,
764            None => return Err(CallError::NotImplemented { bit: index as u32 }),
765        };
766        let mut out_ptr: *mut u8 = std::ptr::null_mut();
767        let mut out_len: u32 = 0;
768        let status = unsafe {
769            init(
770                self.instance,
771                input_bytes.as_ptr(),
772                input_bytes.len() as u32,
773                &mut out_ptr,
774                &mut out_len,
775            )
776        };
777        match status {
778            STATUS_OK => {}
779            STATUS_SERIALIZATION_ERROR => {
780                return Err(CallError::Serialization(
781                    "stream init: argument decode failed".into(),
782                ))
783            }
784            STATUS_PANIC => return Err(CallError::Panic("plugin panicked in stream init".into())),
785            code => return Err(CallError::UnknownStatus { code }),
786        }
787        if out_ptr.is_null() {
788            return Err(CallError::Backend {
789                runtime: "cdylib".into(),
790                message: "stream init returned a null handle".into(),
791            });
792        }
793
794        // Pump the returned handle into a `ChunkStream` (shared with the
795        // bidirectional path, which reaches the same output-stream handle).
796        Ok(pump_stream_handle(out_ptr, decode_item))
797    }
798
799    /// Bidirectional streaming (FIDIUS-I-0032 / ADR-0010): call a method whose
800    /// vtable slot is a `ClientStreamFn` returning the OUTPUT stream handle. `handle`
801    /// is the host's INPUT producer (the plugin pulls its `Stream<In>` from it);
802    /// `input_bytes` is the bincode of the non-stream args. Returns a `ChunkStream`
803    /// over the plugin's `Stream<Out>` — pulling it drives the plugin, which
804    /// re-enters `handle.next()` on demand (the synchronous lazy-pull composition).
805    ///
806    /// # Safety
807    /// `handle` must be a valid, exclusively-owned producer handle; it is consumed
808    /// by the call (the plugin's output stream frees it via its `drop_fn`).
809    #[cfg(feature = "streaming")]
810    pub unsafe fn call_bidi_streaming_raw(
811        &self,
812        index: usize,
813        handle: *mut fidius_core::stream_ffi::FidiusStreamHandle,
814        input_bytes: &[u8],
815        decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
816    ) -> Result<crate::stream::ChunkStream, CallError> {
817        if index >= self.method_count as usize {
818            return Err(CallError::InvalidMethodIndex {
819                index,
820                count: self.method_count,
821            });
822        }
823        // Same FFI shape as client-streaming: instance + input handle + args + out.
824        type ClientStreamFn = unsafe extern "C" fn(
825            *mut c_void,
826            *mut fidius_core::stream_ffi::FidiusStreamHandle,
827            *const u8,
828            u32,
829            *mut *mut u8,
830            *mut u32,
831        ) -> i32;
832        let init = match unsafe { *(self.vtable as *const Option<ClientStreamFn>).add(index) } {
833            Some(f) => f,
834            None => return Err(CallError::NotImplemented { bit: index as u32 }),
835        };
836        let mut out_ptr: *mut u8 = std::ptr::null_mut();
837        let mut out_len: u32 = 0;
838        let status = unsafe {
839            init(
840                self.instance,
841                handle,
842                input_bytes.as_ptr(),
843                input_bytes.len() as u32,
844                &mut out_ptr,
845                &mut out_len,
846            )
847        };
848        match status {
849            STATUS_OK => {}
850            STATUS_SERIALIZATION_ERROR => {
851                return Err(CallError::Serialization(
852                    "bidi stream init: argument decode failed".into(),
853                ))
854            }
855            STATUS_PANIC => {
856                return Err(CallError::Panic(
857                    "plugin panicked in bidi stream init".into(),
858                ))
859            }
860            code => return Err(CallError::UnknownStatus { code }),
861        }
862        if out_ptr.is_null() {
863            return Err(CallError::Backend {
864                runtime: "cdylib".into(),
865                message: "bidi stream init returned a null output handle".into(),
866            });
867        }
868        Ok(pump_stream_handle(out_ptr, decode_item))
869    }
870
871    /// Check if an optional method is supported (capability bit is set).
872    ///
873    /// Returns `false` for bit indices >= 64 rather than panicking.
874    pub fn has_capability(&self, bit: u32) -> bool {
875        if bit >= 64 {
876            return false;
877        }
878        self.capabilities & (1u64 << bit) != 0
879    }
880
881    /// Access the plugin's owned metadata.
882    pub fn info(&self) -> &PluginInfo {
883        &self.info
884    }
885
886    /// Returns the static key/value metadata declared on the given method via
887    /// `#[method_meta(...)]` attributes on the trait, in declaration order.
888    ///
889    /// Returns an empty `Vec` if:
890    /// - `method_id >= method_count` (out of range)
891    /// - the interface declared no method metadata on any method
892    /// - this specific method has no metadata declared
893    ///
894    /// The returned `&str` slices borrow from the loaded library's `.rodata`
895    /// (for dylib-loaded handles) or from the current binary's `.rodata`
896    /// (for in-process handles). The handle's lifetime bounds them safely.
897    pub fn method_metadata(&self, method_id: u32) -> Vec<(&str, &str)> {
898        if method_id >= self.method_count {
899            return Vec::new();
900        }
901        // SAFETY: descriptor pointer is valid for the handle's lifetime.
902        let desc = unsafe { &*self.descriptor };
903        if desc.method_metadata.is_null() {
904            return Vec::new();
905        }
906        // SAFETY: when method_metadata is non-null, it points at an array
907        // of method_count entries (codegen invariant).
908        let entries =
909            unsafe { std::slice::from_raw_parts(desc.method_metadata, self.method_count as usize) };
910        let entry = &entries[method_id as usize];
911        if entry.kvs.is_null() || entry.kv_count == 0 {
912            return Vec::new();
913        }
914        // SAFETY: kvs points at an array of kv_count MetaKv entries.
915        let kvs = unsafe { std::slice::from_raw_parts(entry.kvs, entry.kv_count as usize) };
916        kvs.iter()
917            .map(|kv| {
918                // SAFETY: both pointers are static, null-terminated UTF-8
919                // per the ABI contract enforced by the macro.
920                let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
921                    .to_str()
922                    .expect("metadata key is not valid UTF-8");
923                let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
924                    .to_str()
925                    .expect("metadata value is not valid UTF-8");
926                (k, v)
927            })
928            .collect()
929    }
930
931    /// Returns the static key/value metadata declared on the trait via
932    /// `#[trait_meta(...)]` attributes, in declaration order.
933    ///
934    /// Returns an empty `Vec` if no trait-level metadata was declared.
935    pub fn trait_metadata(&self) -> Vec<(&str, &str)> {
936        // SAFETY: descriptor pointer is valid for the handle's lifetime.
937        let desc = unsafe { &*self.descriptor };
938        if desc.trait_metadata.is_null() || desc.trait_metadata_count == 0 {
939            return Vec::new();
940        }
941        // SAFETY: trait_metadata points at an array of trait_metadata_count entries.
942        let kvs = unsafe {
943            std::slice::from_raw_parts(desc.trait_metadata, desc.trait_metadata_count as usize)
944        };
945        kvs.iter()
946            .map(|kv| {
947                let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
948                    .to_str()
949                    .expect("trait metadata key is not valid UTF-8");
950                let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
951                    .to_str()
952                    .expect("trait metadata value is not valid UTF-8");
953                (k, v)
954            })
955            .collect()
956    }
957}
958
959impl PluginExecutor for CdylibExecutor {
960    fn info(&self) -> &PluginInfo {
961        &self.info
962    }
963
964    fn method_count(&self) -> u32 {
965        self.method_count
966    }
967
968    /// Raw byte dispatch. This is also the carrier for the cdylib *typed* path:
969    /// [`CdylibExecutor::call_method`] bincode-wraps the concrete type and the
970    /// `PluginHandle` wrapper routes typed cdylib calls through `call_method`
971    /// directly, so the bytes a plugin receives are unchanged from pre-refactor.
972    fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
973        self.call_method_raw(method, input)
974    }
975}
976
/// Pump a returned `FidiusStreamHandle` into a [`crate::stream::ChunkStream`] on a
/// dedicated thread (cdylib is synchronous), into a bounded channel for
/// backpressure. Shared by server-streaming ([`CdylibExecutor::call_streaming_raw`])
/// and bidirectional ([`CdylibExecutor::call_bidi_streaming_raw`]) — both reach the
/// same output-stream handle shape; only the init shim that produced it differs.
///
/// `out_ptr` is the (non-null) handle pointer returned by the init shim; this
/// function takes ownership of it. Each item crosses as concrete bincode and is
/// lifted to a `Value` by `decode_item` (FIDIUS-T-0137). One reusable buffer is
/// used per stream (FIDIUS-T-0138); it only ever grows.
///
/// The stream ends after the first error item (decode failure, plugin error,
/// panic status, unknown status, or a length the guest reports outside the
/// buffer), on `STATUS_STREAM_END`, or when the consumer drops the stream.
///
/// The guest's `drop_fn` runs exactly once: normally when the pump loop exits,
/// and also if the pump thread unwinds (e.g. `decode_item` panics) or the thread
/// could not be spawned, so the guest handle is not leaked on those paths.
#[cfg(feature = "streaming")]
fn pump_stream_handle(
    out_ptr: *mut u8,
    decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
) -> crate::stream::ChunkStream {
    use fidius_core::stream_ffi::FidiusStreamHandle;
    use fidius_core::Value;

    /// Bounded backpressure/memory window between the pump thread and the async
    /// consumer (mirrors the Python/WASM bridges).
    const STREAM_CHANNEL_CAP: usize = 4;

    /// Starting size of the reusable item buffer; grown on demand.
    const INITIAL_ITEM_CAP: usize = 64;

    /// Single owner of the guest stream handle. Dropping it runs the guest's
    /// `drop_fn`, which makes the release unwind-safe.
    struct OwnedHandle(*mut FidiusStreamHandle);
    // SAFETY: the handle has exactly one owner for the stream's lifetime and is
    // only ever touched from the pump thread it is moved into.
    unsafe impl Send for OwnedHandle {}
    impl Drop for OwnedHandle {
        fn drop(&mut self) {
            // SAFETY: callers pass a non-null handle produced by the guest's init
            // shim, and this wrapper is its only owner, so `drop_fn` runs once.
            unsafe {
                let drop_fn = (*self.0).drop_fn;
                drop_fn(self.0);
            }
        }
    }
    let owned = OwnedHandle(out_ptr as *mut FidiusStreamHandle);

    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);

    std::thread::spawn(move || {
        // Force capture of the whole wrapper (which is `Send`), not the
        // disjoint raw-pointer field (2021 edition closure capture).
        let owned = owned;
        let handle = owned.0;

        // ONE reusable buffer for the whole stream (FIDIUS-T-0138): the guest
        // writes each item into it, so there's no per-item heap alloc and no
        // `free_buffer` FFI crossing — just one `next` call per item.
        let mut buf = vec![0u8; INITIAL_ITEM_CAP];

        loop {
            // SAFETY: `handle` stays valid until `owned` is dropped below.
            let next = unsafe { (*handle).next };
            let mut out_len: u32 = 0;
            // SAFETY: the buffer pointer/length describe `buf`, and `out_len` is a
            // live local the guest writes the item (or required) length into.
            let mut status =
                unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
            if status == STATUS_BUFFER_TOO_SMALL && out_len as usize > buf.len() {
                // Guest reported the size it needs; grow + retry once. The guest
                // retains the serialized item across the retry, so nothing is
                // lost. A reported size that does not exceed the current buffer
                // is contradictory: skip the retry (never shrink the buffer) and
                // let the BUFFER_TOO_SMALL arm below report it.
                buf.resize(out_len as usize, 0);
                // SAFETY: same contract as the first `next` call above.
                status = unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
            }
            match status {
                STATUS_OK => {
                    // Validate the guest-reported length before slicing: an
                    // out-of-range value must surface as an error item, not as a
                    // panic that silently ends the stream.
                    let item = match buf.get(..out_len as usize) {
                        Some(bytes) => decode_item(bytes),
                        None => Err(CallError::Serialization(
                            "plugin reported stream item length outside buffer".into(),
                        )),
                    };
                    let is_err = item.is_err();
                    // A failed send means the consumer dropped the stream
                    // (cancel); an error item is always the last one.
                    if tx.blocking_send(item).is_err() || is_err {
                        break;
                    }
                }
                STATUS_STREAM_END => break,
                STATUS_PLUGIN_ERROR => {
                    let pe = match buf.get(..out_len as usize) {
                        Some(bytes) if !bytes.is_empty() => {
                            wire::deserialize::<PluginError>(bytes).unwrap_or_else(|_| {
                                PluginError::new("UNKNOWN", "malformed stream error")
                            })
                        }
                        // Empty, or a length outside the buffer: no usable payload.
                        _ => PluginError::new("UNKNOWN", "stream error without data"),
                    };
                    let _ = tx.blocking_send(Err(CallError::Plugin(pe)));
                    break;
                }
                STATUS_BUFFER_TOO_SMALL => {
                    // Still too small after the grow-and-retry, or the reported
                    // size was not larger than the buffer — misbehaving guest.
                    let _ = tx.blocking_send(Err(CallError::BufferTooSmall));
                    break;
                }
                STATUS_PANIC => {
                    let _ = tx.blocking_send(Err(CallError::Panic(
                        "plugin panicked in stream next".into(),
                    )));
                    break;
                }
                code => {
                    let _ = tx.blocking_send(Err(CallError::UnknownStatus { code }));
                    break;
                }
            }
        }
        // Run the guest destructor + free the handle (exactly once), before the
        // sender is dropped and the consumer observes end-of-stream.
        drop(owned);
    });

    let body = futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    });
    crate::stream::ChunkStream::new(body)
}