Skip to main content

fidius_host/executor/
cdylib.rs

1// Copyright 2026 Colliery, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `CdylibExecutor` — the cdylib execution backend: vtable/FFI dispatch with
16//! the bincode wire format.
17//!
18//! This is the original `PluginHandle` dispatch logic, moved behind the
19//! [`crate::executor::PluginExecutor`] seam (FIDIUS-I-0021). It keeps its own
20//! generic `call_method<I, O>` so the cdylib typed path serializes the concrete
21//! type with bincode **directly** — `Value` is never involved, so the bytes the
22//! plugin decodes are byte-identical to pre-refactor. The public-facing
23//! [`crate::handle::PluginHandle`] wraps this in an enum alongside the Python
24//! (and future WASM) backends.
25
26use std::ffi::c_void;
27use std::sync::Arc;
28
29use libloading::Library;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32
33use fidius_core::descriptor::{BufferStrategyKind, PluginDescriptor};
34use fidius_core::status::*;
35use fidius_core::wire;
36use fidius_core::PluginError;
37
38use crate::arena::{acquire_arena, grow_arena, release_arena, DEFAULT_ARENA_CAPACITY};
39use crate::error::{CallError, LoadError};
40use crate::executor::PluginExecutor;
41use crate::types::PluginInfo;
42
43/// Type alias for the PluginAllocated FFI function pointer signature.
44type FfiFn = unsafe extern "C" fn(*const u8, u32, *mut *mut u8, *mut u32) -> i32;
45
46/// Type alias for the Arena FFI function pointer signature.
47type ArenaFn = unsafe extern "C" fn(*const u8, u32, *mut u8, u32, *mut u32, *mut u32) -> i32;
48
49/// A handle to a loaded plugin, ready for calling methods.
50///
51/// Holds an `Arc<Library>` to keep the dylib loaded as long as any handle exists.
52/// Call methods via `call_method()` which handles serialization, FFI, and cleanup.
53///
54/// `CdylibExecutor` is `Send + Sync`. Plugin methods take `&self` (enforced by
55/// the macro), so concurrent calls from multiple threads are safe as long as
56/// the plugin implementation is thread-safe internally.
57pub struct CdylibExecutor {
58    /// Keeps the library alive for dylib-loaded plugins. `None` for in-process
59    /// handles built via [`CdylibExecutor::from_descriptor`] — in-process plugins
60    /// live in the current binary's address space and don't need Arc-tracking.
61    _library: Option<Arc<Library>>,
62    /// Pointer to the `#[repr(C)]` vtable struct in the loaded library.
63    vtable: *const c_void,
64    /// Pointer to the full descriptor in library memory. Used by metadata
65    /// accessors to read `method_metadata` / `trait_metadata`. Valid for the
66    /// handle's lifetime via `_library` Arc (or forever for in-process).
67    descriptor: *const PluginDescriptor,
68    /// Free function for plugin-allocated output buffers.
69    free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
70    /// Capability bitfield for optional method support.
71    capabilities: u64,
72    /// Total number of methods in the vtable.
73    method_count: u32,
74    /// Owned plugin metadata.
75    info: PluginInfo,
76}
77
78// SAFETY: CdylibExecutor is Send + Sync because:
79// - vtable and free_buffer are function pointers to static code in the loaded library
80// - Arc<Library> is Send + Sync and ensures the library stays loaded
81// - All access through call_method is read-only (no mutation of handle state)
82//
83// Plugin implementations must be thread-safe (&self methods, no &mut self)
84// if the CdylibExecutor is shared across threads. This is enforced at compile
85// time by the #[plugin_interface] macro which rejects &mut self methods.
86unsafe impl Send for CdylibExecutor {}
87unsafe impl Sync for CdylibExecutor {}
88
89impl CdylibExecutor {
90    /// Create a new CdylibExecutor. Crate-private — use `from_loaded()` instead.
91    #[allow(dead_code)]
92    pub(crate) fn new(
93        library: Arc<Library>,
94        vtable: *const c_void,
95        descriptor: *const PluginDescriptor,
96        free_buffer: Option<unsafe extern "C" fn(*mut u8, usize)>,
97        capabilities: u64,
98        method_count: u32,
99        info: PluginInfo,
100    ) -> Self {
101        Self {
102            _library: Some(library),
103            vtable,
104            descriptor,
105            free_buffer,
106            capabilities,
107            method_count,
108            info,
109        }
110    }
111
112    /// Create a CdylibExecutor from a LoadedPlugin.
113    pub fn from_loaded(plugin: crate::loader::LoadedPlugin) -> Self {
114        Self {
115            _library: Some(plugin.library),
116            vtable: plugin.vtable,
117            descriptor: plugin.descriptor,
118            free_buffer: plugin.free_buffer,
119            capabilities: plugin.info.capabilities,
120            method_count: plugin.method_count,
121            info: plugin.info,
122        }
123    }
124
125    /// Create a CdylibExecutor from a plugin descriptor already registered in
126    /// the current process's inventory (via a `#[plugin_impl]` linked into
127    /// the current binary as a normal rlib). No dylib is loaded — the
128    /// descriptor's vtable points at code in the current binary.
129    ///
130    /// Used by the generated `Client::in_process(plugin_name)` constructor.
131    /// Host applications normally use [`CdylibExecutor::from_loaded`] instead.
132    pub fn from_descriptor(desc: &'static PluginDescriptor) -> Result<Self, LoadError> {
133        let info = PluginInfo {
134            name: unsafe { desc.plugin_name_str() }.to_string(),
135            interface_name: unsafe { desc.interface_name_str() }.to_string(),
136            interface_hash: desc.interface_hash,
137            interface_version: desc.interface_version,
138            capabilities: desc.capabilities,
139            buffer_strategy: desc
140                .buffer_strategy_kind()
141                .map_err(|v| LoadError::UnknownBufferStrategy { value: v })?,
142            runtime: crate::types::PluginRuntimeKind::Cdylib,
143        };
144        Ok(Self {
145            _library: None,
146            vtable: desc.vtable,
147            descriptor: desc as *const PluginDescriptor,
148            free_buffer: desc.free_buffer,
149            capabilities: desc.capabilities,
150            method_count: desc.method_count,
151            info,
152        })
153    }
154
155    /// Look up a descriptor in the current process's inventory registry by
156    /// `plugin_name` (the Rust struct name that was passed to `#[plugin_impl]`).
157    /// Returns `LoadError::PluginNotFound` if no descriptor has that name.
158    ///
159    /// The returned reference has `'static` lifetime because descriptors
160    /// emitted by `#[plugin_impl]` live in the binary's `.rodata`.
161    pub fn find_in_process_descriptor(
162        plugin_name: &str,
163    ) -> Result<&'static PluginDescriptor, LoadError> {
164        let reg = fidius_core::registry::get_registry();
165        for i in 0..reg.plugin_count as usize {
166            let desc_ptr = unsafe { *reg.descriptors.add(i) };
167            let desc = unsafe { &*desc_ptr };
168            if unsafe { desc.plugin_name_str() } == plugin_name {
169                return Ok(desc);
170            }
171        }
172        Err(LoadError::PluginNotFound {
173            name: plugin_name.to_string(),
174        })
175    }
176
177    /// Call a plugin method by vtable index.
178    ///
179    /// Serializes the input, calls the FFI function pointer at the given index,
180    /// checks the status code, deserializes the output, and frees the plugin-allocated buffer.
181    ///
182    /// # Arguments
183    /// * `index` - The method index in the vtable (0-based, in declaration order)
184    /// * `input` - The input argument to serialize and pass to the plugin
185    ///
186    /// # No timeout
187    ///
188    /// This call runs synchronously on the calling thread and has no built-in
189    /// timeout or cancellation. A misbehaving plugin will block the caller
190    /// indefinitely. See the `fidius` crate top-level docs ("What fidius
191    /// does not provide") for the rationale and the recommended consumer-side
192    /// mitigation.
193    pub fn call_method<I: Serialize, O: DeserializeOwned>(
194        &self,
195        index: usize,
196        input: &I,
197    ) -> Result<O, CallError> {
198        // Bounds check: ensure index is within the vtable
199        if index >= self.method_count as usize {
200            return Err(CallError::InvalidMethodIndex {
201                index,
202                count: self.method_count,
203            });
204        }
205
206        let input_bytes =
207            wire::serialize(input).map_err(|e| CallError::Serialization(e.to_string()))?;
208
209        match self.info.buffer_strategy {
210            BufferStrategyKind::PluginAllocated => self.call_plugin_allocated(index, &input_bytes),
211            BufferStrategyKind::Arena => self.call_arena(index, &input_bytes),
212        }
213    }
214
215    /// Call a plugin method whose argument and successful return value are
216    /// raw bytes — bypassing bincode on both sides. Used by methods declared
217    /// with `#[wire(raw)]` on the interface trait.
218    ///
219    /// Errors and panic messages still use bincode (small typed payloads).
220    /// Returns the success bytes on `Ok`, or a `CallError::Plugin(_)` whose
221    /// inner `PluginError` was bincode-decoded from the plugin's error payload.
222    ///
223    /// Same no-timeout caveat as [`Self::call_method`].
224    pub fn call_method_raw(&self, index: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
225        if index >= self.method_count as usize {
226            return Err(CallError::InvalidMethodIndex {
227                index,
228                count: self.method_count,
229            });
230        }
231        match self.info.buffer_strategy {
232            BufferStrategyKind::PluginAllocated => self.call_plugin_allocated_raw(index, input),
233            BufferStrategyKind::Arena => self.call_arena_raw(index, input),
234        }
235    }
236
237    /// PluginAllocated path: plugin allocates an output buffer via
238    /// `Box::into_raw(Box<[u8]>)`, host deserializes and calls free_buffer.
239    fn call_plugin_allocated<O: DeserializeOwned>(
240        &self,
241        index: usize,
242        input_bytes: &[u8],
243    ) -> Result<O, CallError> {
244        let fn_ptr = unsafe {
245            let fn_ptrs = self.vtable as *const FfiFn;
246            *fn_ptrs.add(index)
247        };
248
249        let mut out_ptr: *mut u8 = std::ptr::null_mut();
250        let mut out_len: u32 = 0;
251
252        let status = unsafe {
253            fn_ptr(
254                input_bytes.as_ptr(),
255                input_bytes.len() as u32,
256                &mut out_ptr,
257                &mut out_len,
258            )
259        };
260
261        match status {
262            STATUS_OK => {}
263            STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
264            STATUS_SERIALIZATION_ERROR => {
265                return Err(CallError::Serialization("FFI serialization failed".into()))
266            }
267            STATUS_PLUGIN_ERROR => {
268                if !out_ptr.is_null() && out_len > 0 {
269                    let output_slice =
270                        unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
271                    let plugin_err: PluginError = wire::deserialize(output_slice)
272                        .map_err(|e| CallError::Deserialization(e.to_string()))?;
273
274                    if let Some(free) = self.free_buffer {
275                        unsafe { free(out_ptr, out_len as usize) };
276                    }
277
278                    return Err(CallError::Plugin(plugin_err));
279                }
280                return Err(CallError::Plugin(PluginError::new(
281                    "UNKNOWN",
282                    "plugin returned error but no error data",
283                )));
284            }
285            STATUS_PANIC => {
286                let msg = if !out_ptr.is_null() && out_len > 0 {
287                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
288                    let msg = wire::deserialize::<String>(slice)
289                        .unwrap_or_else(|_| "unknown panic".into());
290                    if let Some(free) = self.free_buffer {
291                        unsafe { free(out_ptr, out_len as usize) };
292                    }
293                    msg
294                } else {
295                    "unknown panic".into()
296                };
297                return Err(CallError::Panic(msg));
298            }
299            _ => return Err(CallError::UnknownStatus { code: status }),
300        }
301
302        if out_ptr.is_null() {
303            return Err(CallError::Serialization(
304                "plugin returned null output buffer".into(),
305            ));
306        }
307
308        let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
309        let result: Result<O, CallError> =
310            wire::deserialize(output_slice).map_err(|e| CallError::Deserialization(e.to_string()));
311
312        if let Some(free) = self.free_buffer {
313            unsafe { free(out_ptr, out_len as usize) };
314        }
315
316        result
317    }
318
319    /// Arena path: host supplies a buffer from the thread-local pool. If the
320    /// plugin reports `STATUS_BUFFER_TOO_SMALL`, grow the buffer to the
321    /// requested size and retry exactly once (second too-small would indicate
322    /// a misbehaving plugin — bail with `CallError::BufferTooSmall`).
323    fn call_arena<O: DeserializeOwned>(
324        &self,
325        index: usize,
326        input_bytes: &[u8],
327    ) -> Result<O, CallError> {
328        let fn_ptr = unsafe {
329            let fn_ptrs = self.vtable as *const ArenaFn;
330            *fn_ptrs.add(index)
331        };
332
333        let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
334        let mut out_offset: u32 = 0;
335        let mut out_len: u32 = 0;
336        let mut retried = false;
337
338        let status = loop {
339            let s = unsafe {
340                fn_ptr(
341                    input_bytes.as_ptr(),
342                    input_bytes.len() as u32,
343                    arena.as_mut_ptr(),
344                    arena.len() as u32,
345                    &mut out_offset,
346                    &mut out_len,
347                )
348            };
349            if s == STATUS_BUFFER_TOO_SMALL && !retried {
350                // Plugin wrote the needed size into out_len. Grow and retry once.
351                let needed = out_len as usize;
352                grow_arena(&mut arena, needed);
353                retried = true;
354                continue;
355            }
356            break s;
357        };
358
359        match status {
360            STATUS_OK => {
361                let start = out_offset as usize;
362                let end = start + out_len as usize;
363                if end > arena.len() {
364                    release_arena(arena);
365                    return Err(CallError::Serialization(
366                        "plugin reported out_offset/out_len outside arena".into(),
367                    ));
368                }
369                let result = wire::deserialize(&arena[start..end])
370                    .map_err(|e| CallError::Deserialization(e.to_string()));
371                release_arena(arena);
372                result
373            }
374            STATUS_BUFFER_TOO_SMALL => {
375                release_arena(arena);
376                Err(CallError::BufferTooSmall)
377            }
378            STATUS_SERIALIZATION_ERROR => {
379                release_arena(arena);
380                Err(CallError::Serialization("FFI serialization failed".into()))
381            }
382            STATUS_PLUGIN_ERROR => {
383                let start = out_offset as usize;
384                let end = start + out_len as usize;
385                let plugin_err = if out_len > 0 && end <= arena.len() {
386                    wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
387                        PluginError::new("UNKNOWN", "plugin returned malformed error")
388                    })
389                } else {
390                    PluginError::new("UNKNOWN", "plugin returned error but no error data")
391                };
392                release_arena(arena);
393                Err(CallError::Plugin(plugin_err))
394            }
395            STATUS_PANIC => {
396                // Arena strategy's panic path returns out_len = 0 (the arena
397                // might be too small for the panic message). Host can't
398                // recover a message; report an opaque panic.
399                release_arena(arena);
400                Err(CallError::Panic(
401                    "plugin panicked (message not transmitted via Arena strategy)".into(),
402                ))
403            }
404            code => {
405                release_arena(arena);
406                Err(CallError::UnknownStatus { code })
407            }
408        }
409    }
410
411    /// PluginAllocated raw path — same FFI shape as `call_plugin_allocated`,
412    /// but the success buffer is returned to the caller as-is rather than
413    /// fed to bincode.
414    fn call_plugin_allocated_raw(
415        &self,
416        index: usize,
417        input_bytes: &[u8],
418    ) -> Result<Vec<u8>, CallError> {
419        let fn_ptr = unsafe {
420            let fn_ptrs = self.vtable as *const FfiFn;
421            *fn_ptrs.add(index)
422        };
423
424        let mut out_ptr: *mut u8 = std::ptr::null_mut();
425        let mut out_len: u32 = 0;
426
427        let status = unsafe {
428            fn_ptr(
429                input_bytes.as_ptr(),
430                input_bytes.len() as u32,
431                &mut out_ptr,
432                &mut out_len,
433            )
434        };
435
436        match status {
437            STATUS_OK => {}
438            STATUS_BUFFER_TOO_SMALL => return Err(CallError::BufferTooSmall),
439            STATUS_SERIALIZATION_ERROR => {
440                return Err(CallError::Serialization("FFI serialization failed".into()))
441            }
442            STATUS_PLUGIN_ERROR => {
443                if !out_ptr.is_null() && out_len > 0 {
444                    let output_slice =
445                        unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
446                    let plugin_err: PluginError = wire::deserialize(output_slice)
447                        .map_err(|e| CallError::Deserialization(e.to_string()))?;
448                    if let Some(free) = self.free_buffer {
449                        unsafe { free(out_ptr, out_len as usize) };
450                    }
451                    return Err(CallError::Plugin(plugin_err));
452                }
453                return Err(CallError::Plugin(PluginError::new(
454                    "UNKNOWN",
455                    "plugin returned error but no error data",
456                )));
457            }
458            STATUS_PANIC => {
459                let msg = if !out_ptr.is_null() && out_len > 0 {
460                    let slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
461                    let msg = wire::deserialize::<String>(slice)
462                        .unwrap_or_else(|_| "unknown panic".into());
463                    if let Some(free) = self.free_buffer {
464                        unsafe { free(out_ptr, out_len as usize) };
465                    }
466                    msg
467                } else {
468                    "unknown panic".into()
469                };
470                return Err(CallError::Panic(msg));
471            }
472            _ => return Err(CallError::UnknownStatus { code: status }),
473        }
474
475        if out_ptr.is_null() {
476            return Err(CallError::Serialization(
477                "plugin returned null output buffer".into(),
478            ));
479        }
480
481        // Copy the success bytes into a Vec, then free the plugin's buffer.
482        // This matches the existing Box<[u8]> ownership contract — the plugin
483        // owns the memory until `free_buffer` is called.
484        let output_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len as usize) };
485        let result = output_slice.to_vec();
486
487        if let Some(free) = self.free_buffer {
488            unsafe { free(out_ptr, out_len as usize) };
489        }
490
491        Ok(result)
492    }
493
494    /// Arena raw path — same FFI shape as `call_arena`, success bytes
495    /// returned as a `Vec<u8>` copied out of the arena.
496    fn call_arena_raw(&self, index: usize, input_bytes: &[u8]) -> Result<Vec<u8>, CallError> {
497        let fn_ptr = unsafe {
498            let fn_ptrs = self.vtable as *const ArenaFn;
499            *fn_ptrs.add(index)
500        };
501
502        let mut arena = acquire_arena(DEFAULT_ARENA_CAPACITY);
503        let mut out_offset: u32 = 0;
504        let mut out_len: u32 = 0;
505        let mut retried = false;
506
507        let status = loop {
508            let s = unsafe {
509                fn_ptr(
510                    input_bytes.as_ptr(),
511                    input_bytes.len() as u32,
512                    arena.as_mut_ptr(),
513                    arena.len() as u32,
514                    &mut out_offset,
515                    &mut out_len,
516                )
517            };
518            if s == STATUS_BUFFER_TOO_SMALL && !retried {
519                let needed = out_len as usize;
520                grow_arena(&mut arena, needed);
521                retried = true;
522                continue;
523            }
524            break s;
525        };
526
527        match status {
528            STATUS_OK => {
529                let start = out_offset as usize;
530                let end = start + out_len as usize;
531                if end > arena.len() {
532                    release_arena(arena);
533                    return Err(CallError::Serialization(
534                        "plugin reported out_offset/out_len outside arena".into(),
535                    ));
536                }
537                let result = arena[start..end].to_vec();
538                release_arena(arena);
539                Ok(result)
540            }
541            STATUS_BUFFER_TOO_SMALL => {
542                release_arena(arena);
543                Err(CallError::BufferTooSmall)
544            }
545            STATUS_SERIALIZATION_ERROR => {
546                release_arena(arena);
547                Err(CallError::Serialization("FFI serialization failed".into()))
548            }
549            STATUS_PLUGIN_ERROR => {
550                let start = out_offset as usize;
551                let end = start + out_len as usize;
552                let plugin_err = if out_len > 0 && end <= arena.len() {
553                    wire::deserialize::<PluginError>(&arena[start..end]).unwrap_or_else(|_| {
554                        PluginError::new("UNKNOWN", "plugin returned malformed error")
555                    })
556                } else {
557                    PluginError::new("UNKNOWN", "plugin returned error but no error data")
558                };
559                release_arena(arena);
560                Err(CallError::Plugin(plugin_err))
561            }
562            STATUS_PANIC => {
563                release_arena(arena);
564                Err(CallError::Panic(
565                    "plugin panicked (message not transmitted via Arena strategy)".into(),
566                ))
567            }
568            code => {
569                release_arena(arena);
570                Err(CallError::UnknownStatus { code })
571            }
572        }
573    }
574
575    /// Start a server-streaming cdylib call (FIDIUS-I-0026 CS.1). `input_bytes`
576    /// is the **concrete bincode** of the args tuple (the cdylib path never goes
577    /// through `Value` — same as `call_method`), so the caller serialises with
578    /// `wire::serialize` directly.
579    ///
580    /// Calls the streaming method's vtable slot (an *init* shim with the ordinary
581    /// `FfiFn` shape) to obtain a `FidiusStreamHandle`, then pumps `next()` on a
582    /// dedicated thread (cdylib is synchronous) into a bounded channel →
583    /// `ChunkStream`. The pump owns **one reusable buffer** the guest writes each
584    /// item into (FIDIUS-T-0138 arena-style `next`) — so there's no per-item heap
585    /// alloc and no `free_buffer` crossing, just one `next` call per item. Each
586    /// item crosses as **concrete bincode** and is turned into a `Value` by the
587    /// caller-supplied `decode_item` (`wire::deserialize::<O>` + `to_value`,
588    /// FIDIUS-T-0137). Dropping the stream runs the guest's `drop_fn` (cancel).
589    #[cfg(feature = "streaming")]
590    pub fn call_streaming_raw(
591        &self,
592        index: usize,
593        input_bytes: &[u8],
594        decode_item: fn(&[u8]) -> Result<fidius_core::Value, CallError>,
595    ) -> Result<crate::stream::ChunkStream, CallError> {
596        use fidius_core::stream_ffi::FidiusStreamHandle;
597        use fidius_core::Value;
598
599        /// Bounded backpressure/memory window between the pump thread and the
600        /// async consumer (mirrors the Python/WASM bridges).
601        const STREAM_CHANNEL_CAP: usize = 4;
602
603        if index >= self.method_count as usize {
604            return Err(CallError::InvalidMethodIndex {
605                index,
606                count: self.method_count,
607            });
608        }
609
610        // init: call the streaming method's vtable slot (FfiFn shape) → handle.
611        let init = unsafe { *(self.vtable as *const FfiFn).add(index) };
612        let mut out_ptr: *mut u8 = std::ptr::null_mut();
613        let mut out_len: u32 = 0;
614        let status = unsafe {
615            init(
616                input_bytes.as_ptr(),
617                input_bytes.len() as u32,
618                &mut out_ptr,
619                &mut out_len,
620            )
621        };
622        match status {
623            STATUS_OK => {}
624            STATUS_SERIALIZATION_ERROR => {
625                return Err(CallError::Serialization(
626                    "stream init: argument decode failed".into(),
627                ))
628            }
629            STATUS_PANIC => return Err(CallError::Panic("plugin panicked in stream init".into())),
630            code => return Err(CallError::UnknownStatus { code }),
631        }
632        if out_ptr.is_null() {
633            return Err(CallError::Backend {
634                runtime: "cdylib".into(),
635                message: "stream init returned a null handle".into(),
636            });
637        }
638
639        // Send-wrap the raw handle for the pump thread (single-owner for the
640        // stream's lifetime).
641        struct SendHandle(*mut FidiusStreamHandle);
642        unsafe impl Send for SendHandle {}
643        let send_handle = SendHandle(out_ptr as *mut FidiusStreamHandle);
644
645        let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);
646
647        std::thread::spawn(move || {
648            // Force capture of the whole `SendHandle` (which is `Send`), not the
649            // disjoint raw-pointer field (2021 edition closure capture).
650            let send_handle = send_handle;
651            let handle = send_handle.0;
652
653            // ONE reusable buffer for the whole stream (FIDIUS-T-0138): the guest
654            // writes each item into it, so there's no per-item heap alloc and no
655            // `free_buffer` FFI crossing — just one `next` call per item. Grows on
656            // demand when the guest reports BUFFER_TOO_SMALL.
657            const INITIAL_ITEM_CAP: usize = 64;
658            let mut buf = vec![0u8; INITIAL_ITEM_CAP];
659
660            loop {
661                let next = unsafe { (*handle).next };
662                let mut out_len: u32 = 0;
663                let mut status =
664                    unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
665                if status == STATUS_BUFFER_TOO_SMALL {
666                    // Guest reported the size it needs; grow + retry once. The guest
667                    // retains the serialized item across the retry, so nothing is lost.
668                    buf.resize(out_len as usize, 0);
669                    status =
670                        unsafe { next(handle, buf.as_mut_ptr(), buf.len() as u32, &mut out_len) };
671                }
672                match status {
673                    STATUS_OK => {
674                        let item = decode_item(&buf[..out_len as usize]);
675                        let is_err = item.is_err();
676                        if tx.blocking_send(item).is_err() {
677                            break; // consumer dropped → cancel
678                        }
679                        if is_err {
680                            break;
681                        }
682                    }
683                    STATUS_STREAM_END => break,
684                    STATUS_PLUGIN_ERROR => {
685                        let pe = if out_len > 0 {
686                            wire::deserialize::<PluginError>(&buf[..out_len as usize])
687                                .unwrap_or_else(|_| {
688                                    PluginError::new("UNKNOWN", "malformed stream error")
689                                })
690                        } else {
691                            PluginError::new("UNKNOWN", "stream error without data")
692                        };
693                        let _ = tx.blocking_send(Err(CallError::Plugin(pe)));
694                        break;
695                    }
696                    STATUS_BUFFER_TOO_SMALL => {
697                        // Still too small after the grow-and-retry — misbehaving guest.
698                        let _ = tx.blocking_send(Err(CallError::BufferTooSmall));
699                        break;
700                    }
701                    STATUS_PANIC => {
702                        let _ = tx.blocking_send(Err(CallError::Panic(
703                            "plugin panicked in stream next".into(),
704                        )));
705                        break;
706                    }
707                    code => {
708                        let _ = tx.blocking_send(Err(CallError::UnknownStatus { code }));
709                        break;
710                    }
711                }
712            }
713            // Run the guest destructor + free the handle (exactly once).
714            unsafe {
715                let drop_fn = (*handle).drop_fn;
716                drop_fn(handle);
717            }
718        });
719
720        let body = futures::stream::unfold(rx, |mut rx| async move {
721            rx.recv().await.map(|item| (item, rx))
722        });
723        Ok(crate::stream::ChunkStream::new(body))
724    }
725
726    /// Check if an optional method is supported (capability bit is set).
727    ///
728    /// Returns `false` for bit indices >= 64 rather than panicking.
729    pub fn has_capability(&self, bit: u32) -> bool {
730        if bit >= 64 {
731            return false;
732        }
733        self.capabilities & (1u64 << bit) != 0
734    }
735
736    /// Access the plugin's owned metadata.
737    pub fn info(&self) -> &PluginInfo {
738        &self.info
739    }
740
741    /// Returns the static key/value metadata declared on the given method via
742    /// `#[method_meta(...)]` attributes on the trait, in declaration order.
743    ///
744    /// Returns an empty `Vec` if:
745    /// - `method_id >= method_count` (out of range)
746    /// - the interface declared no method metadata on any method
747    /// - this specific method has no metadata declared
748    ///
749    /// The returned `&str` slices borrow from the loaded library's `.rodata`
750    /// (for dylib-loaded handles) or from the current binary's `.rodata`
751    /// (for in-process handles). The handle's lifetime bounds them safely.
752    pub fn method_metadata(&self, method_id: u32) -> Vec<(&str, &str)> {
753        if method_id >= self.method_count {
754            return Vec::new();
755        }
756        // SAFETY: descriptor pointer is valid for the handle's lifetime.
757        let desc = unsafe { &*self.descriptor };
758        if desc.method_metadata.is_null() {
759            return Vec::new();
760        }
761        // SAFETY: when method_metadata is non-null, it points at an array
762        // of method_count entries (codegen invariant).
763        let entries =
764            unsafe { std::slice::from_raw_parts(desc.method_metadata, self.method_count as usize) };
765        let entry = &entries[method_id as usize];
766        if entry.kvs.is_null() || entry.kv_count == 0 {
767            return Vec::new();
768        }
769        // SAFETY: kvs points at an array of kv_count MetaKv entries.
770        let kvs = unsafe { std::slice::from_raw_parts(entry.kvs, entry.kv_count as usize) };
771        kvs.iter()
772            .map(|kv| {
773                // SAFETY: both pointers are static, null-terminated UTF-8
774                // per the ABI contract enforced by the macro.
775                let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
776                    .to_str()
777                    .expect("metadata key is not valid UTF-8");
778                let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
779                    .to_str()
780                    .expect("metadata value is not valid UTF-8");
781                (k, v)
782            })
783            .collect()
784    }
785
786    /// Returns the static key/value metadata declared on the trait via
787    /// `#[trait_meta(...)]` attributes, in declaration order.
788    ///
789    /// Returns an empty `Vec` if no trait-level metadata was declared.
790    pub fn trait_metadata(&self) -> Vec<(&str, &str)> {
791        // SAFETY: descriptor pointer is valid for the handle's lifetime.
792        let desc = unsafe { &*self.descriptor };
793        if desc.trait_metadata.is_null() || desc.trait_metadata_count == 0 {
794            return Vec::new();
795        }
796        // SAFETY: trait_metadata points at an array of trait_metadata_count entries.
797        let kvs = unsafe {
798            std::slice::from_raw_parts(desc.trait_metadata, desc.trait_metadata_count as usize)
799        };
800        kvs.iter()
801            .map(|kv| {
802                let k = unsafe { std::ffi::CStr::from_ptr(kv.key) }
803                    .to_str()
804                    .expect("trait metadata key is not valid UTF-8");
805                let v = unsafe { std::ffi::CStr::from_ptr(kv.value) }
806                    .to_str()
807                    .expect("trait metadata value is not valid UTF-8");
808                (k, v)
809            })
810            .collect()
811    }
812}
813
814impl PluginExecutor for CdylibExecutor {
815    fn info(&self) -> &PluginInfo {
816        &self.info
817    }
818
819    fn method_count(&self) -> u32 {
820        self.method_count
821    }
822
823    /// Raw byte dispatch. This is also the carrier for the cdylib *typed* path:
824    /// [`CdylibExecutor::call_method`] bincode-wraps the concrete type and the
825    /// `PluginHandle` wrapper routes typed cdylib calls through `call_method`
826    /// directly, so the bytes a plugin receives are unchanged from pre-refactor.
827    fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
828        self.call_method_raw(method, input)
829    }
830}