daedalus_runtime/
host_bridge.rs

1use std::any::Any;
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, Mutex};
4use std::sync::{OnceLock, RwLock};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::task::Poll;
7
8use crate::convert::convert_arc;
9use crate::executor::{CorrelatedPayload, EdgePayload, NodeError, next_correlation_id};
10use crate::io::NodeIo;
11use crate::plan::EdgePolicyKind;
12use daedalus_data::json;
13use daedalus_data::model::{TypeExpr, Value, ValueType};
14use daedalus_data::typing;
15use futures_util::future::poll_fn;
16use futures_util::task::AtomicWaker;
17
18#[cfg(feature = "gpu")]
19use daedalus_gpu::GpuContextHandle;
20
21use image::DynamicImage;
22
23/// Metadata key attached to host-bridge descriptors to mark them for runtime wiring.
24pub const HOST_BRIDGE_META_KEY: &str = "host_bridge";
25/// Canonical registry id for the host-bridge node.
26pub const HOST_BRIDGE_ID: &str = "io.host_bridge";
27
28/// Shared buffers for a single host bridge node. Host code pushes into `inbound`
29/// (host -> graph), handler drains it and forwards into runtime edges; graph
30/// outputs are collected into `outbound` for host consumption.
31#[derive(Default)]
32struct HostBridgeBuffers {
33    inbound: HashMap<String, VecDeque<CorrelatedPayload>>,
34    outbound: HashMap<String, VecDeque<CorrelatedPayload>>,
35    wakers: HashMap<String, Arc<AtomicWaker>>,
36}
37
38/// Handle for interacting with a specific host bridge node.
39#[derive(Clone)]
40pub struct HostBridgeHandle {
41    alias: String,
42    shared: Arc<Mutex<HostBridgeBuffers>>,
43    outgoing: HashMap<String, EdgePolicyKind>,
44    outgoing_types: HashMap<String, TypeExpr>,
45    incoming_types: HashMap<String, TypeExpr>,
46    #[cfg(feature = "gpu")]
47    gpu: Arc<Mutex<Option<GpuContextHandle>>>,
48}
49
50#[cfg(feature = "gpu")]
51fn payload_any_type(any: &dyn Any) -> Option<&'static str> {
52    if any.is::<DynamicImage>() {
53        return Some(std::any::type_name::<DynamicImage>());
54    }
55    if any.is::<image::GrayImage>() {
56        return Some(std::any::type_name::<image::GrayImage>());
57    }
58    if any.is::<image::GrayAlphaImage>() {
59        return Some(std::any::type_name::<image::GrayAlphaImage>());
60    }
61    if any.is::<image::RgbImage>() {
62        return Some(std::any::type_name::<image::RgbImage>());
63    }
64    if any.is::<image::RgbaImage>() {
65        return Some(std::any::type_name::<image::RgbaImage>());
66    }
67    if any.is::<Arc<DynamicImage>>() {
68        return Some(std::any::type_name::<Arc<DynamicImage>>());
69    }
70    if any.is::<Arc<image::GrayImage>>() {
71        return Some(std::any::type_name::<Arc<image::GrayImage>>());
72    }
73    if any.is::<Arc<image::GrayAlphaImage>>() {
74        return Some(std::any::type_name::<Arc<image::GrayAlphaImage>>());
75    }
76    if any.is::<Arc<image::RgbImage>>() {
77        return Some(std::any::type_name::<Arc<image::RgbImage>>());
78    }
79    if any.is::<Arc<image::RgbaImage>>() {
80        return Some(std::any::type_name::<Arc<image::RgbaImage>>());
81    }
82    if any.is::<daedalus_gpu::Payload<DynamicImage>>() {
83        return Some(std::any::type_name::<daedalus_gpu::Payload<DynamicImage>>());
84    }
85    if any.is::<daedalus_gpu::Payload<image::GrayImage>>() {
86        return Some(std::any::type_name::<daedalus_gpu::Payload<image::GrayImage>>());
87    }
88    if any.is::<daedalus_gpu::Payload<image::RgbImage>>() {
89        return Some(std::any::type_name::<daedalus_gpu::Payload<image::RgbImage>>());
90    }
91    if any.is::<daedalus_gpu::Payload<image::RgbaImage>>() {
92        return Some(std::any::type_name::<daedalus_gpu::Payload<image::RgbaImage>>());
93    }
94    if any.is::<daedalus_gpu::ErasedPayload>() {
95        return Some(std::any::type_name::<daedalus_gpu::ErasedPayload>());
96    }
97    None
98}
99
100#[cfg(not(feature = "gpu"))]
101fn payload_any_type(_any: &dyn Any) -> Option<&'static str> {
102    None
103}
104
105impl HostBridgeHandle {
106    fn new(
107        alias: String,
108        shared: Arc<Mutex<HostBridgeBuffers>>,
109        outgoing: HashMap<String, EdgePolicyKind>,
110        outgoing_types: HashMap<String, TypeExpr>,
111        incoming_types: HashMap<String, TypeExpr>,
112        #[cfg(feature = "gpu")] gpu: Arc<Mutex<Option<GpuContextHandle>>>,
113    ) -> Self {
114        Self {
115            alias,
116            shared,
117            outgoing,
118            outgoing_types,
119            incoming_types,
120            #[cfg(feature = "gpu")]
121            gpu,
122        }
123    }
124
125    /// Push a payload for a given output port on the bridge (host -> graph).
126    pub fn push(
127        &self,
128        port: impl AsRef<str>,
129        payload: EdgePayload,
130        correlation_id: Option<u64>,
131    ) -> u64 {
132        let port = port.as_ref().to_ascii_lowercase();
133        if let EdgePayload::Any(any) = &payload
134            && let Some(ty) = payload_any_type(any.as_ref())
135        {
136            panic!(
137                "host bridge port {port}: payload type {ty} must be sent as EdgePayload::Payload, not Any"
138            );
139        }
140        let id = correlation_id.unwrap_or_else(next_correlation_id);
141        let mut guard = self.shared.lock().expect("host bridge poisoned");
142        guard
143            .inbound
144            .entry(port.clone())
145            .or_default()
146            .push_back(CorrelatedPayload {
147                correlation_id: id,
148                inner: payload,
149                enqueued_at: std::time::Instant::now(),
150            });
151        if let Some(waker) = guard.wakers.get(&port) {
152            waker.wake();
153        }
154        id
155    }
156
157    fn restore_outbound(&self, port: &str, payload: CorrelatedPayload) {
158        if let Ok(mut buf) = self.shared.lock() {
159            let key = port.to_ascii_lowercase();
160            let q = buf.outbound.entry(key.clone()).or_default();
161            q.push_front(payload);
162            if let Some(waker) = buf.wakers.get(&key) {
163                waker.wake();
164            }
165        }
166    }
167
168    /// Push any typed payload (auto-wrapped).
169    pub fn push_any<T: Any + Send + Sync + 'static>(&self, port: impl AsRef<str>, value: T) -> u64 {
170        self.push(port, EdgePayload::Any(Arc::new(value)), None)
171    }
172
173    /// Push a serialized payload, decoding it into a runtime payload first.
174    pub fn push_serialized(
175        &self,
176        port: impl AsRef<str>,
177        payload: HostBridgeSerialized,
178        correlation_id: Option<u64>,
179    ) -> Result<u64, NodeError> {
180        let port = port.as_ref().to_ascii_lowercase();
181        let port_type = self.outgoing_types.get(&port);
182        let edge_payload = deserialize_serialized_payload(&port, port_type, payload)?;
183        Ok(self.push(&port, edge_payload, correlation_id))
184    }
185
186    /// Async wait for the next payload on a port.
187    pub async fn recv(&self, port: impl AsRef<str>) -> Option<CorrelatedPayload> {
188        let port = port.as_ref().to_ascii_lowercase();
189        poll_fn(|cx| {
190            let mut guard = self.shared.lock().expect("host bridge poisoned");
191            if let Some(q) = guard.outbound.get_mut(&port)
192                && let Some(item) = q.pop_front()
193            {
194                return Poll::Ready(Some(item));
195            }
196            let waker = guard
197                .wakers
198                .entry(port.clone())
199                .or_insert_with(|| Arc::new(AtomicWaker::new()))
200                .clone();
201            waker.register(cx.waker());
202            Poll::Pending
203        })
204        .await
205    }
206
207    /// Async wait for the next payload on a port and serialize it.
208    pub async fn recv_serialized(
209        &self,
210        port: impl AsRef<str>,
211    ) -> Result<Option<HostBridgeSerializedPayload>, NodeError> {
212        let port = port.as_ref().to_ascii_lowercase();
213        let payload = self.recv(&port).await;
214        match payload {
215            Some(p) => {
216                serialize_outbound_payload(&port, self.incoming_types.get(&port), p).map(Some)
217            }
218            None => Ok(None),
219        }
220    }
221
222    /// Pop and downcast a single payload from the outbound queue for a port.
223    pub fn pull_any<T: Any + Clone>(&self, port: impl AsRef<str>) -> Option<T> {
224        let port = port.as_ref().to_ascii_lowercase();
225        let mut guard = self.shared.lock().ok()?;
226        let payload = guard.outbound.get_mut(&port)?.pop_front()?;
227        match payload.inner {
228            EdgePayload::Any(a) => a.downcast_ref::<T>().cloned(),
229            _ => None,
230        }
231    }
232
233    /// Try to pop a single payload emitted by the graph on the given port.
234    pub fn try_pop(&self, port: impl AsRef<str>) -> Option<CorrelatedPayload> {
235        let port = port.as_ref().to_ascii_lowercase();
236        let mut guard = self.shared.lock().expect("host bridge poisoned");
237        guard.outbound.get_mut(&port).and_then(|q| q.pop_front())
238    }
239
240    /// Try to pop and serialize a single payload emitted by the graph.
241    pub fn try_pop_serialized(
242        &self,
243        port: impl AsRef<str>,
244    ) -> Result<Option<HostBridgeSerializedPayload>, NodeError> {
245        let port = port.as_ref().to_ascii_lowercase();
246        let payload = self.try_pop(&port);
247        match payload {
248            Some(p) => {
249                serialize_outbound_payload(&port, self.incoming_types.get(&port), p).map(Some)
250            }
251            None => Ok(None),
252        }
253    }
254
255    /// Drain all pending payloads for the given port.
256    pub fn drain(&self, port: impl AsRef<str>) -> Vec<CorrelatedPayload> {
257        let port = port.as_ref().to_ascii_lowercase();
258        let mut guard = self.shared.lock().expect("host bridge poisoned");
259        guard
260            .outbound
261            .remove(&port)
262            .map(|q| q.into_iter().collect())
263            .unwrap_or_default()
264    }
265
266    /// Drain and serialize all pending payloads for the given port.
267    pub fn drain_serialized(
268        &self,
269        port: impl AsRef<str>,
270    ) -> Result<Vec<HostBridgeSerializedPayload>, NodeError> {
271        let port = port.as_ref().to_ascii_lowercase();
272        let drained = self.drain(&port);
273        drained
274            .into_iter()
275            .map(|p| serialize_outbound_payload(&port, self.incoming_types.get(&port), p))
276            .collect()
277    }
278
279    /// Ports supported by this bridge (outgoing from host into the graph).
280    pub fn ports(&self) -> impl Iterator<Item = &str> {
281        self.outgoing.keys().map(|k| k.as_str())
282    }
283
284    /// Alias for `ports()` to make direction explicit (`host -> graph`).
285    pub fn outgoing_ports(&self) -> impl Iterator<Item = &str> {
286        self.ports()
287    }
288
289    /// Return a snapshot of known graph->host port names.
290    ///
291    /// This uses both:
292    /// - inferred port types from the planner (preferred)
293    /// - any ports that have appeared in the outbound queues
294    pub fn incoming_port_names(&self) -> Vec<String> {
295        let mut out: Vec<String> = self.incoming_types.keys().cloned().collect();
296        if let Ok(guard) = self.shared.lock() {
297            out.extend(guard.outbound.keys().cloned());
298        }
299        out.sort();
300        out.dedup();
301        out
302    }
303
304    /// Iterate all known graph->host ports as `HostPortOwned` values.
305    ///
306    /// This is the ergonomic path for host consumers:
307    ///
308    /// ```ignore
309    /// for port in host.incoming_ports().filter(|p| p.can_type_to::<Value>()) {
310    ///   if let Some((_corr, value)) = port.try_pop::<Value>()? { /* ... */ }
311    /// }
312    /// ```
313    pub fn incoming_ports(&self) -> HostPortOwnedIter<'_> {
314        HostPortOwnedIter {
315            handle: self,
316            names: self.incoming_port_names(),
317            idx: 0,
318        }
319    }
320
321    /// Iterate over a provided list of graph->host port names as "typed ports" that can be
322    /// filtered and popped in an idiomatic Rust style.
323    pub fn iter_ports<'a>(&'a self, ports: &'a [String]) -> HostPorts<'a> {
324        HostPorts {
325            handle: self,
326            ports,
327            idx: 0,
328        }
329    }
330
331    /// Returns the node alias associated with this bridge.
332    pub fn alias(&self) -> &str {
333        &self.alias
334    }
335
336    pub fn outgoing_port_type(&self, port: impl AsRef<str>) -> Option<&TypeExpr> {
337        let port = port.as_ref().to_ascii_lowercase();
338        self.outgoing_types.get(&port)
339    }
340
341    pub fn incoming_port_type(&self, port: impl AsRef<str>) -> Option<&TypeExpr> {
342        let port = port.as_ref().to_ascii_lowercase();
343        self.incoming_types.get(&port)
344    }
345
346    #[cfg(feature = "gpu")]
347    fn gpu_ctx(&self) -> Option<GpuContextHandle> {
348        self.gpu.lock().ok().and_then(|g| g.as_ref().cloned())
349    }
350
351    /// Try to pop a single payload for `port` and decode it into a `Value` when possible,
352    /// without going through JSON string serialization.
353    ///
354    /// This is intended for "non-image" / structured outputs that hosts want to sample cheaply.
355    /// If the payload is not value-like (e.g. an image payload stored in `Any`), this returns an error.
356    pub fn try_pop_value(&self, port: impl AsRef<str>) -> Result<Option<(u64, Value)>, NodeError> {
357        let port = port.as_ref().to_ascii_lowercase();
358        let Some(payload) = self.try_pop(&port) else {
359            return Ok(None);
360        };
361        let corr = payload.correlation_id;
362        let value = match payload.inner {
363            EdgePayload::Unit => Value::Unit,
364            EdgePayload::Bytes(bytes) => Value::Bytes(bytes.to_vec().into()),
365            EdgePayload::Value(value) => value,
366            EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
367                NodeError::InvalidInput(format!(
368                    "host bridge port {port}: payload is not value-like"
369                ))
370            })?,
371            #[cfg(feature = "gpu")]
372            EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
373                return Err(NodeError::InvalidInput(format!(
374                    "host bridge port {port}: gpu payloads are not value-like"
375                )));
376            }
377        };
378        Ok(Some((corr, value)))
379    }
380
381    /// Drain all pending payloads for `port`, decoding each into a `Value` when possible.
382    ///
383    /// Any payload that cannot be represented as a `Value` produces an error.
384    pub fn drain_values(&self, port: impl AsRef<str>) -> Result<Vec<(u64, Value)>, NodeError> {
385        let port = port.as_ref().to_ascii_lowercase();
386        let drained = self.drain(&port);
387        drained
388            .into_iter()
389            .map(|p| {
390                let corr = p.correlation_id;
391                let value = match p.inner {
392                    EdgePayload::Unit => Value::Unit,
393                    EdgePayload::Bytes(bytes) => Value::Bytes(bytes.to_vec().into()),
394                    EdgePayload::Value(value) => value,
395                    EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
396                        NodeError::InvalidInput(format!(
397                            "host bridge port {port}: payload is not value-like"
398                        ))
399                    })?,
400                    #[cfg(feature = "gpu")]
401                    EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
402                        return Err(NodeError::InvalidInput(format!(
403                            "host bridge port {port}: gpu payloads are not value-like"
404                        )));
405                    }
406                };
407                Ok((corr, value))
408            })
409            .collect()
410    }
411}
412
413/// Iterator adapter returned by `HostBridgeHandle::incoming_ports`.
414pub struct HostPortOwnedIter<'a> {
415    handle: &'a HostBridgeHandle,
416    names: Vec<String>,
417    idx: usize,
418}
419
420impl<'a> Iterator for HostPortOwnedIter<'a> {
421    type Item = HostPortOwned<'a>;
422
423    fn next(&mut self) -> Option<Self::Item> {
424        let name = self.names.get(self.idx)?.clone();
425        self.idx += 1;
426        Some(HostPortOwned {
427            handle: self.handle,
428            name,
429        })
430    }
431}
432
433/// A single graph->host port, owning its name (useful for iterator/filter use).
434pub struct HostPortOwned<'a> {
435    handle: &'a HostBridgeHandle,
436    name: String,
437}
438
439impl<'a> HostPortOwned<'a> {
440    pub fn name(&self) -> &str {
441        &self.name
442    }
443
444    pub fn resolved_type(&self) -> Option<&TypeExpr> {
445        self.handle.incoming_port_type(&self.name)
446    }
447
448    /// Alias for `can_poll` (terminology: "can this port be typed/popped as T?").
449    pub fn can_type_to<T: HostPollable>(&self) -> bool {
450        self.can_poll::<T>()
451    }
452
453    pub fn can_poll<T: HostPollable>(&self) -> bool {
454        T::can_poll(self.resolved_type())
455    }
456
457    pub fn is_type_expr(&self, ty: &TypeExpr) -> bool {
458        self.resolved_type().is_some_and(|t| t == ty)
459    }
460
461    pub fn try_pop<T: HostPollable>(&self) -> Result<Option<(u64, T)>, NodeError> {
462        let Some(payload) = self.handle.try_pop(self.name()) else {
463            return Ok(None);
464        };
465        let corr = payload.correlation_id;
466        match T::decode(self.handle, self.name(), self.resolved_type(), payload.clone()) {
467            Ok(value) => Ok(Some((corr, value))),
468            Err(err) => {
469                self.handle.restore_outbound(self.name(), payload);
470                Err(err)
471            }
472        }
473    }
474
475    /// Pop and attempt to downcast directly from `Any` without going through `Value`.
476    pub fn try_pop_any<T: Any + Clone>(&self) -> Option<(u64, T)> {
477        let payload = self.handle.try_pop(self.name())?;
478        let corr = payload.correlation_id;
479        match payload.inner {
480            EdgePayload::Any(a) => a
481                .downcast_ref::<T>()
482                .cloned()
483                .map(|v| (corr, v)),
484            _ => None,
485        }
486    }
487}
488
489/// Iterator adapter returned by `HostBridgeHandle::iter_ports`.
490pub struct HostPorts<'a> {
491    handle: &'a HostBridgeHandle,
492    ports: &'a [String],
493    idx: usize,
494}
495
496impl<'a> Iterator for HostPorts<'a> {
497    type Item = HostPort<'a>;
498
499    fn next(&mut self) -> Option<Self::Item> {
500        let port = self.ports.get(self.idx)?.as_str();
501        self.idx += 1;
502        Some(HostPort {
503            handle: self.handle,
504            name: port,
505        })
506    }
507}
508
509/// A single graph->host port, providing typed polling and convenience helpers.
510#[derive(Clone, Copy)]
511pub struct HostPort<'a> {
512    handle: &'a HostBridgeHandle,
513    name: &'a str,
514}
515
516impl<'a> HostPort<'a> {
517    pub fn name(&self) -> &'a str {
518        self.name
519    }
520
521    /// The resolved graph->host port type (after planner inference), if known.
522    pub fn resolved_type(&self) -> Option<&'a TypeExpr> {
523        self.handle.incoming_port_type(self.name)
524    }
525
526    /// Alias for `can_poll` (terminology: "can this port be typed/popped as T?").
527    pub fn can_type_to<T: HostPollable>(&self) -> bool {
528        self.can_poll::<T>()
529    }
530
531    /// Returns `true` if this port can be pulled as `T` without additional user glue.
532    pub fn can_poll<T: HostPollable>(&self) -> bool {
533        T::can_poll(self.resolved_type())
534    }
535
536    pub fn is_type_expr(&self, ty: &TypeExpr) -> bool {
537        self.resolved_type().is_some_and(|t| t == ty)
538    }
539
540    /// Try to pop a single value from this port and decode it as `T`.
541    pub fn try_pop<T: HostPollable>(&self) -> Result<Option<(u64, T)>, NodeError> {
542        let Some(payload) = self.handle.try_pop(self.name) else {
543            return Ok(None);
544        };
545        let corr = payload.correlation_id;
546        match T::decode(self.handle, self.name, self.resolved_type(), payload.clone()) {
547            Ok(value) => Ok(Some((corr, value))),
548            Err(err) => {
549                self.handle.restore_outbound(self.name, payload);
550                Err(err)
551            }
552        }
553    }
554
555    /// Pop and attempt to downcast directly from `Any` without going through `Value`.
556    pub fn try_pop_any<T: Any + Clone>(&self) -> Option<(u64, T)> {
557        let payload = self.handle.try_pop(self.name)?;
558        let corr = payload.correlation_id;
559        match payload.inner {
560            EdgePayload::Any(a) => a
561                .downcast_ref::<T>()
562                .cloned()
563                .map(|v| (corr, v)),
564            _ => None,
565        }
566    }
567}
568
569/// Trait used by `HostPort::can_poll::<T>()` and `HostPort::try_pop::<T>()`.
570pub trait HostPollable: Sized {
571    fn can_poll(port_type: Option<&TypeExpr>) -> bool;
572
573    fn decode(
574        handle: &HostBridgeHandle,
575        port: &str,
576        port_type: Option<&TypeExpr>,
577        payload: CorrelatedPayload,
578    ) -> Result<Self, NodeError>;
579}
580
581impl HostPollable for Value {
582    fn can_poll(_port_type: Option<&TypeExpr>) -> bool {
583        true
584    }
585
586    fn decode(
587        _handle: &HostBridgeHandle,
588        port: &str,
589        _port_type: Option<&TypeExpr>,
590        payload: CorrelatedPayload,
591    ) -> Result<Self, NodeError> {
592        match payload.inner {
593            EdgePayload::Unit => Ok(Value::Unit),
594            EdgePayload::Bytes(bytes) => Ok(Value::Bytes(bytes.to_vec().into())),
595            EdgePayload::Value(value) => Ok(value),
596            EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
597                let ty = std::any::type_name_of_val(any.as_ref());
598                NodeError::InvalidInput(format!(
599                    "host bridge port {port}: payload is not value-like (type={ty})"
600                ))
601            }),
602            #[cfg(feature = "gpu")]
603            EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => Err(NodeError::InvalidInput(
604                format!("host bridge port {port}: gpu payloads are not value-like"),
605            )),
606        }
607    }
608}
609
610impl HostPollable for DynamicImage {
611    fn can_poll(port_type: Option<&TypeExpr>) -> bool {
612        let Some(port_type) = port_type else {
613            return false;
614        };
615        let target = typing::lookup_type::<DynamicImage>()
616            .unwrap_or_else(|| TypeExpr::opaque("image:dynamic"));
617        port_type == &target || typing::can_convert_typeexpr(port_type, &target)
618    }
619
620    fn decode(
621        handle: &HostBridgeHandle,
622        port: &str,
623        _port_type: Option<&TypeExpr>,
624        payload: CorrelatedPayload,
625    ) -> Result<Self, NodeError> {
626        let _ = handle;
627        match payload.inner {
628            EdgePayload::Any(any) => {
629                if let Some(img) = any.downcast_ref::<DynamicImage>().cloned() {
630                    return Ok(img);
631                }
632                if let Some(inner) = any.downcast_ref::<Arc<dyn Any + Send + Sync>>() {
633                    let inner_ref = inner.as_ref();
634                    if let Some(img) = inner_ref.downcast_ref::<DynamicImage>().cloned() {
635                        return Ok(img);
636                    }
637                    if let Some(img) = inner_ref.downcast_ref::<image::RgbaImage>() {
638                        return Ok(DynamicImage::ImageRgba8(img.clone()));
639                    }
640                    if let Some(img) = inner_ref.downcast_ref::<image::RgbImage>() {
641                        return Ok(DynamicImage::ImageRgb8(img.clone()));
642                    }
643                    if let Some(img) = inner_ref.downcast_ref::<image::GrayImage>() {
644                        return Ok(DynamicImage::ImageLuma8(img.clone()));
645                    }
646                    if let Some(img) = inner_ref.downcast_ref::<image::GrayAlphaImage>() {
647                        return Ok(DynamicImage::ImageLumaA8(img.clone()));
648                    }
649                    if let Some(converted) = convert_arc::<DynamicImage>(inner) {
650                        return Ok(converted);
651                    }
652                }
653                if let Some(img) = any.downcast_ref::<image::RgbaImage>() {
654                    return Ok(DynamicImage::ImageRgba8(img.clone()));
655                }
656                if let Some(img) = any.downcast_ref::<image::RgbImage>() {
657                    return Ok(DynamicImage::ImageRgb8(img.clone()));
658                }
659                if let Some(img) = any.downcast_ref::<image::GrayImage>() {
660                    return Ok(DynamicImage::ImageLuma8(img.clone()));
661                }
662                if let Some(img) = any.downcast_ref::<image::GrayAlphaImage>() {
663                    return Ok(DynamicImage::ImageLumaA8(img.clone()));
664                }
665                if let Some(img) = any.downcast_ref::<Arc<DynamicImage>>() {
666                    return Ok((**img).clone());
667                }
668                if let Some(img) = any.downcast_ref::<Arc<image::RgbaImage>>() {
669                    return Ok(DynamicImage::ImageRgba8((**img).clone()));
670                }
671                if let Some(img) = any.downcast_ref::<Arc<image::RgbImage>>() {
672                    return Ok(DynamicImage::ImageRgb8((**img).clone()));
673                }
674                if let Some(img) = any.downcast_ref::<Arc<image::GrayImage>>() {
675                    return Ok(DynamicImage::ImageLuma8((**img).clone()));
676                }
677                if let Some(img) = any.downcast_ref::<Arc<image::GrayAlphaImage>>() {
678                    return Ok(DynamicImage::ImageLumaA8((**img).clone()));
679                }
680                #[cfg(feature = "gpu")]
681                {
682                    // Accept `Payload<DynamicImage>` sent through `Any`.
683                    if let Some(p) = any
684                        .downcast_ref::<daedalus_gpu::Payload<DynamicImage>>()
685                        .cloned()
686                    {
687                        return match p {
688                            daedalus_gpu::Payload::Cpu(img) => Ok(img),
689                            daedalus_gpu::Payload::Gpu(h) => {
690                                let ctx = handle.gpu_ctx().ok_or_else(|| {
691                                    NodeError::InvalidInput(format!(
692                                        "host bridge port {port}: gpu output requires a GPU context"
693                                    ))
694                                })?;
695                                <DynamicImage as daedalus_gpu::GpuSendable>::download(&h, &ctx)
696                                    .map_err(|e| {
697                                        NodeError::InvalidInput(format!(
698                                            "host bridge port {port}: failed to download gpu image ({e})"
699                                        ))
700                                    })
701                            }
702                        };
703                    }
704
705                    // Accept `ErasedPayload` carrying a CPU/GPU image.
706                    if let Some(ep) = any
707                        .downcast_ref::<daedalus_gpu::ErasedPayload>()
708                        .cloned()
709                    {
710                        if let Some(cpu) = ep.clone_cpu::<DynamicImage>() {
711                            return Ok(cpu);
712                        }
713                        if ep.is_gpu() {
714                            let ctx = handle.gpu_ctx().ok_or_else(|| {
715                                NodeError::InvalidInput(format!(
716                                    "host bridge port {port}: gpu output requires a GPU context"
717                                ))
718                            })?;
719                            if let Ok(downloaded) = ep.download(&ctx)
720                                && let Some(cpu) = downloaded.as_cpu::<DynamicImage>().cloned()
721                            {
722                                return Ok(cpu);
723                            }
724                        }
725                    }
726
727                    // Accept a raw GPU handle sent through `Any`.
728                    if let Some(h) = any
729                        .downcast_ref::<daedalus_gpu::GpuImageHandle>()
730                        .cloned()
731                    {
732                        let ctx = handle.gpu_ctx().ok_or_else(|| {
733                            NodeError::InvalidInput(format!(
734                                "host bridge port {port}: gpu output requires a GPU context"
735                            ))
736                        })?;
737                        return <DynamicImage as daedalus_gpu::GpuSendable>::download(&h, &ctx)
738                            .map_err(|e| {
739                                NodeError::InvalidInput(format!(
740                                    "host bridge port {port}: failed to download gpu image ({e})"
741                                ))
742                            });
743                    }
744                }
745                if let Some(converted) = convert_arc::<DynamicImage>(&any) {
746                    return Ok(converted);
747                }
748                let ty = std::any::type_name_of_val(any.as_ref());
749                Err(NodeError::InvalidInput(format!(
750                    "host bridge port {port}: Any payload is not a DynamicImage (type={ty})"
751                )))
752            }
753            #[cfg(feature = "gpu")]
754            EdgePayload::GpuImage(handle_img) => {
755                let ctx = handle.gpu_ctx().ok_or_else(|| {
756                    NodeError::InvalidInput(format!(
757                        "host bridge port {port}: gpu output requires a GPU context"
758                    ))
759                })?;
760                <DynamicImage as daedalus_gpu::GpuSendable>::download(&handle_img, &ctx).map_err(
761                    |e| {
762                        NodeError::InvalidInput(format!(
763                            "host bridge port {port}: failed to download gpu image ({e})"
764                        ))
765                    },
766                )
767            }
768            #[cfg(feature = "gpu")]
769            EdgePayload::Payload(ep) => {
770                if let Some(cpu) = ep.clone_cpu::<DynamicImage>() {
771                    return Ok(cpu);
772                }
773                if let Some(gpu) = ep.clone_gpu::<DynamicImage>() {
774                    let ctx = handle.gpu_ctx().ok_or_else(|| {
775                        NodeError::InvalidInput(format!(
776                            "host bridge port {port}: gpu output requires a GPU context"
777                        ))
778                    })?;
779                    return <DynamicImage as daedalus_gpu::GpuSendable>::download(&gpu, &ctx)
780                        .map_err(|e| {
781                            NodeError::InvalidInput(format!(
782                                "host bridge port {port}: failed to download gpu payload ({e})"
783                            ))
784                        });
785                }
786                Err(NodeError::InvalidInput(format!(
787                    "host bridge port {port}: payload does not contain an image"
788                )))
789            }
790            EdgePayload::Unit | EdgePayload::Bytes(_) => Err(NodeError::InvalidInput(format!(
791                "host bridge port {port}: payload is not an image"
792            ))),
793            EdgePayload::Value(_) => Err(NodeError::InvalidInput(format!(
794                "host bridge port {port}: payload is not an image"
795            ))),
796        }
797    }
798}
799
800#[cfg(feature = "gpu")]
801impl HostPollable for daedalus_gpu::GpuImageHandle {
802    fn can_poll(port_type: Option<&TypeExpr>) -> bool {
803        // Host image ports are represented as `image:dynamic` in TypeExpr-land.
804        matches!(port_type, Some(t) if *t == TypeExpr::opaque("image:dynamic"))
805    }
806
807    fn decode(
808        _handle: &HostBridgeHandle,
809        port: &str,
810        _port_type: Option<&TypeExpr>,
811        payload: CorrelatedPayload,
812    ) -> Result<Self, NodeError> {
813        match payload.inner {
814            EdgePayload::GpuImage(h) => Ok(h),
815            EdgePayload::Payload(ep) => ep.clone_gpu::<DynamicImage>().ok_or_else(|| {
816                NodeError::InvalidInput(format!(
817                    "host bridge port {port}: payload does not contain a gpu image handle"
818                ))
819            }),
820            EdgePayload::Any(any) => any
821                .downcast_ref::<daedalus_gpu::GpuImageHandle>()
822                .cloned()
823                .ok_or_else(|| {
824                    NodeError::InvalidInput(format!(
825                        "host bridge port {port}: Any payload is not a gpu image handle"
826                    ))
827                }),
828            EdgePayload::Unit | EdgePayload::Bytes(_) => Err(NodeError::InvalidInput(format!(
829                "host bridge port {port}: payload is not a gpu image handle"
830            ))),
831            EdgePayload::Value(_) => Err(NodeError::InvalidInput(format!(
832                "host bridge port {port}: payload is not a gpu image handle"
833            ))),
834        }
835    }
836}
837
838#[cfg(feature = "gpu")]
839impl<T> HostPollable for daedalus_gpu::Payload<T>
840where
841    T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
842    T::GpuRepr: Clone + Send + Sync + 'static,
843{
844    fn can_poll(_port_type: Option<&TypeExpr>) -> bool {
845        true
846    }
847
848    fn decode(
849        handle: &HostBridgeHandle,
850        port: &str,
851        _port_type: Option<&TypeExpr>,
852        payload: CorrelatedPayload,
853    ) -> Result<Self, NodeError> {
854        match payload.inner {
855            EdgePayload::Payload(ep) => {
856                if let Some(cpu) = ep.clone_cpu::<T>() {
857                    return Ok(daedalus_gpu::Payload::Cpu(cpu));
858                }
859                if let Some(g) = ep.clone_gpu::<T>() {
860                    return Ok(daedalus_gpu::Payload::Gpu(g));
861                }
862                Err(NodeError::InvalidInput(format!(
863                    "host bridge port {port}: payload does not contain requested type"
864                )))
865            }
866            EdgePayload::GpuImage(h) => {
867                let any_ref: &dyn Any = &h;
868                if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>() {
869                    return Ok(daedalus_gpu::Payload::Gpu(repr.clone()));
870                }
871                Err(NodeError::InvalidInput(format!(
872                    "host bridge port {port}: gpu image handle is not compatible with requested payload type"
873                )))
874            }
875            EdgePayload::Any(any) => {
876                if let Some(p) = any
877                    .downcast_ref::<daedalus_gpu::Payload<T>>()
878                    .cloned()
879                {
880                    return Ok(p);
881                }
882                if let Some(cpu) = any
883                    .downcast_ref::<T>()
884                    .cloned()
885                {
886                    return Ok(daedalus_gpu::Payload::Cpu(cpu));
887                }
888                if let Some(g) = any
889                    .downcast_ref::<T::GpuRepr>()
890                    .cloned()
891                {
892                    return Ok(daedalus_gpu::Payload::Gpu(g));
893                }
894                if let Some(ep) = any
895                    .downcast_ref::<daedalus_gpu::ErasedPayload>()
896                    .cloned()
897                {
898                    if let Some(cpu) = ep.clone_cpu::<T>() {
899                        return Ok(daedalus_gpu::Payload::Cpu(cpu));
900                    }
901                    if let Some(g) = ep.clone_gpu::<T>() {
902                        return Ok(daedalus_gpu::Payload::Gpu(g));
903                    }
904                    if ep.is_gpu()
905                        && let Some(ctx) = handle.gpu_ctx()
906                        && let Ok(downloaded) = ep.download(&ctx)
907                        && let Some(cpu) = downloaded.as_cpu::<T>()
908                    {
909                        return Ok(daedalus_gpu::Payload::Cpu(cpu.clone()));
910                    }
911                }
912                Err(NodeError::InvalidInput(format!(
913                    "host bridge port {port}: Any payload is not compatible with requested payload type"
914                )))
915            }
916            EdgePayload::Unit | EdgePayload::Bytes(_) => Err(NodeError::InvalidInput(format!(
917                "host bridge port {port}: payload is not compatible with requested payload type"
918            ))),
919            EdgePayload::Value(_) => Err(NodeError::InvalidInput(format!(
920                "host bridge port {port}: payload is not compatible with requested payload type"
921            ))),
922        }
923    }
924}
925
926/// Shared value-pop behavior for `HostPort`-like items.
927pub trait HostValuePort {
928    fn name(&self) -> &str;
929    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError>;
930}
931
932impl<'a> HostValuePort for HostPort<'a> {
933    fn name(&self) -> &str {
934        self.name()
935    }
936
937    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError> {
938        self.try_pop::<Value>()
939    }
940}
941
942impl<'a> HostValuePort for HostPortOwned<'a> {
943    fn name(&self) -> &str {
944        self.name()
945    }
946
947    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError> {
948        self.try_pop::<Value>()
949    }
950}
951
952/// Iterator extensions over "host ports" (both `HostPort` and `HostPortOwned`).
953pub trait HostPortIterExt: Iterator + Sized
954where
955    Self::Item: HostValuePort,
956{
957    fn try_pop_all_values(self) -> Result<Vec<(String, u64, Value)>, NodeError> {
958        let mut out = Vec::new();
959        for port in self {
960            if let Some((corr, value)) = port.try_pop_value()? {
961                out.push((port.name().to_string(), corr, value));
962            }
963        }
964        Ok(out)
965    }
966}
967
968impl<I> HostPortIterExt for I
969where
970    I: Iterator + Sized,
971    I::Item: HostValuePort,
972{
973}
974
975/// Collection of host bridge handles keyed by node alias.
976#[derive(Clone, Default)]
977pub struct HostBridgeManager {
978    inner: Arc<Mutex<HashMap<String, Arc<Mutex<HostBridgeBuffers>>>>>,
979    outgoing: Arc<Mutex<HashMap<String, HashMap<String, EdgePolicyKind>>>>,
980    outgoing_types: Arc<Mutex<HashMap<String, HashMap<String, TypeExpr>>>>,
981    incoming_types: Arc<Mutex<HashMap<String, HashMap<String, TypeExpr>>>>,
982    #[cfg(feature = "gpu")]
983    gpu: Arc<Mutex<Option<GpuContextHandle>>>,
984}
985
986static HOST_BRIDGE_OUTBOUND_LOGS: AtomicU64 = AtomicU64::new(0);
987
988impl HostBridgeManager {
989    /// Create an empty manager.
990    pub fn new() -> Self {
991        Self::default()
992    }
993
994    /// Attach a GPU context handle so host-side polling can download GPU-resident payloads
995    /// without requiring the caller to thread `GpuContextHandle` explicitly.
996    #[cfg(feature = "gpu")]
997    pub fn attach_gpu(&self, gpu: GpuContextHandle) {
998        let mut guard = self.gpu.lock().expect("host bridge gpu lock poisoned");
999        *guard = Some(gpu);
1000    }
1001
1002    /// Register a host bridge node by alias with its outgoing ports and policies.
1003    pub fn register_bridge(
1004        &self,
1005        alias: impl Into<String>,
1006        ports: impl IntoIterator<Item = (String, EdgePolicyKind)>,
1007    ) {
1008        let alias = alias.into().to_ascii_lowercase();
1009        {
1010            let mut guard = self.inner.lock().expect("host bridge map poisoned");
1011            guard.entry(alias.clone()).or_default();
1012        }
1013        let mut guard = self.outgoing.lock().expect("host bridge ports poisoned");
1014        guard.insert(alias, ports.into_iter().collect());
1015    }
1016
1017    /// Register port type metadata for a host bridge alias.
1018    pub fn register_port_types(
1019        &self,
1020        alias: impl Into<String>,
1021        outgoing: impl IntoIterator<Item = (String, TypeExpr)>,
1022        incoming: impl IntoIterator<Item = (String, TypeExpr)>,
1023    ) {
1024        let alias = alias.into().to_ascii_lowercase();
1025        {
1026            let mut guard = self.inner.lock().expect("host bridge map poisoned");
1027            guard.entry(alias.clone()).or_default();
1028        }
1029        let mut out_guard = self
1030            .outgoing_types
1031            .lock()
1032            .expect("host bridge types poisoned");
1033        let out_map = out_guard.entry(alias.clone()).or_default();
1034        for (port, ty) in outgoing {
1035            out_map.insert(port.to_ascii_lowercase(), ty);
1036        }
1037
1038        let mut in_guard = self
1039            .incoming_types
1040            .lock()
1041            .expect("host bridge types poisoned");
1042        let in_map = in_guard.entry(alias).or_default();
1043        for (port, ty) in incoming {
1044            in_map.insert(port.to_ascii_lowercase(), ty);
1045        }
1046    }
1047
1048    /// Build a manager from a runtime plan by detecting nodes tagged as host bridges.
1049    pub fn from_plan(plan: &crate::plan::RuntimePlan) -> Self {
1050        let mgr = Self::new();
1051        mgr.populate_from_plan(plan);
1052        mgr
1053    }
1054
1055    /// Populate an existing manager based on the runtime plan.
1056    pub fn populate_from_plan(&self, plan: &crate::plan::RuntimePlan) {
1057        fn parse_type_map(value: Option<&Value>) -> Vec<(String, TypeExpr)> {
1058            let Some(Value::Map(entries)) = value else {
1059                return Vec::new();
1060            };
1061            let mut out = Vec::new();
1062            for (k, v) in entries {
1063                let (Value::String(port), Value::String(json)) = (k, v) else {
1064                    continue;
1065                };
1066                if let Ok(ty) = serde_json::from_str::<TypeExpr>(json) {
1067                    out.push((port.to_ascii_lowercase(), ty));
1068                }
1069            }
1070            out.sort_by(|a, b| a.0.cmp(&b.0));
1071            out
1072        }
1073
1074        for (idx, node) in plan.nodes.iter().enumerate() {
1075            let is_bridge = matches!(
1076                node.metadata.get(HOST_BRIDGE_META_KEY),
1077                Some(Value::Bool(true))
1078            );
1079            if !is_bridge {
1080                continue;
1081            }
1082            let alias = node
1083                .label
1084                .as_deref()
1085                .unwrap_or(node.id.as_str())
1086                .to_ascii_lowercase();
1087            let outgoing_types = parse_type_map(node.metadata.get("dynamic_output_types"));
1088            let incoming_types = parse_type_map(node.metadata.get("dynamic_input_types"));
1089            // Gather ports where this node is the source (host -> graph).
1090            let mut outgoing: HashMap<String, EdgePolicyKind> = HashMap::new();
1091            for (from, from_port, _, _, policy) in plan.edges.iter() {
1092                if from.0 == idx {
1093                    outgoing.insert(from_port.to_ascii_lowercase(), policy.clone());
1094                }
1095            }
1096            self.register_bridge(alias.clone(), outgoing.into_iter());
1097            self.register_port_types(alias, outgoing_types, incoming_types);
1098        }
1099    }
1100
1101    /// Lookup a handle for the given alias, if present.
1102    pub fn handle(&self, alias: impl AsRef<str>) -> Option<HostBridgeHandle> {
1103        let alias = alias.as_ref().to_ascii_lowercase();
1104        let shared = {
1105            let guard = self.inner.lock().ok()?;
1106            guard.get(&alias)?.clone()
1107        };
1108        let outgoing = {
1109            let guard = self.outgoing.lock().ok()?;
1110            guard.get(&alias)?.clone()
1111        };
1112        let outgoing_types = {
1113            let guard = self.outgoing_types.lock().ok()?;
1114            guard.get(&alias).cloned().unwrap_or_default()
1115        };
1116        let incoming_types = {
1117            let guard = self.incoming_types.lock().ok()?;
1118            guard.get(&alias).cloned().unwrap_or_default()
1119        };
1120        Some(HostBridgeHandle::new(
1121            alias,
1122            shared,
1123            outgoing,
1124            outgoing_types,
1125            incoming_types,
1126            #[cfg(feature = "gpu")]
1127            self.gpu.clone(),
1128        ))
1129    }
1130
1131    /// Internal helper: record an outbound payload for host consumption.
1132    fn push_outbound(&self, alias: &str, port: &str, payload: CorrelatedPayload) {
1133        if let Ok(mut guard) = self.inner.lock()
1134            && let Some(shared) = guard.get_mut(&alias.to_ascii_lowercase())
1135            && let Ok(mut buf) = shared.lock()
1136        {
1137            let key = port.to_ascii_lowercase();
1138            let q = buf.outbound.entry(key.clone()).or_default();
1139            q.push_back(payload);
1140            if host_bridge_trace_enabled() {
1141                let count = HOST_BRIDGE_OUTBOUND_LOGS.fetch_add(1, Ordering::Relaxed);
1142                if count < 5 || count.is_multiple_of(500) {
1143                    log::debug!(
1144                        "host-bridge outbound queued alias={} port={} len={}",
1145                        alias,
1146                        key,
1147                        q.len()
1148                    );
1149                }
1150            }
1151            if let Some(waker) = buf.wakers.get(&key) {
1152                waker.wake();
1153            }
1154        }
1155    }
1156
1157    /// Internal helper: drain inbound payloads enqueued by the host.
1158    fn take_inbound(&self, alias: &str) -> Vec<(String, CorrelatedPayload)> {
1159        if let Ok(mut guard) = self.inner.lock()
1160            && let Some(shared) = guard.get_mut(&alias.to_ascii_lowercase())
1161            && let Ok(mut buf) = shared.lock()
1162        {
1163            let mut drained = Vec::new();
1164            for (port, queue) in buf.inbound.iter_mut() {
1165                while let Some(p) = queue.pop_front() {
1166                    drained.push((port.clone(), p));
1167                }
1168            }
1169            return drained;
1170        }
1171        Vec::new()
1172    }
1173}
1174
1175/// Build a host bridge handler that moves data between host-managed buffers and runtime edges.
1176pub fn bridge_handler(
1177    bridges: HostBridgeManager,
1178) -> impl FnMut(
1179    &crate::plan::RuntimeNode,
1180    &crate::state::ExecutionContext,
1181    &mut NodeIo,
1182) -> Result<(), NodeError> {
1183    move |node, _ctx, io| {
1184        let alias = node
1185            .label
1186            .as_deref()
1187            .unwrap_or(node.id.as_str())
1188            .to_ascii_lowercase();
1189
1190        // Host -> graph: forward anything queued on the host side.
1191        let inbound = bridges.take_inbound(&alias);
1192        if host_bridge_trace_enabled() && !inbound.is_empty() {
1193            let mut entries = Vec::new();
1194            for (port, payload) in &inbound {
1195                entries.push(format!("{}#{}", port, describe_payload(payload)));
1196            }
1197            log::debug!(
1198                "host-bridge inbound alias={} node={} ports={}",
1199                alias,
1200                node.id,
1201                entries.join(", ")
1202            );
1203        }
1204        for (port, payload) in inbound {
1205            io.push_correlated_payload(Some(&port), payload);
1206        }
1207
1208        // Graph -> host: collect inputs and stash for host consumption.
1209        if host_bridge_trace_enabled() && !io.inputs().is_empty() {
1210            let mut entries = Vec::new();
1211            for (port, payload) in io.inputs() {
1212                entries.push(format!("{}#{}", port, describe_edge(payload)));
1213            }
1214            log::debug!(
1215                "host-bridge outbound alias={} node={} ports={}",
1216                alias,
1217                node.id,
1218                entries.join(", ")
1219            );
1220        }
1221        for (port, payload) in io.inputs() {
1222            bridges.push_outbound(&alias, port, payload.clone());
1223        }
1224        Ok(())
1225    }
1226}
1227
1228fn host_bridge_trace_enabled() -> bool {
1229    static FLAG: OnceLock<bool> = OnceLock::new();
1230    *FLAG.get_or_init(|| {
1231        std::env::var("DAEDALUS_HOST_BRIDGE_TRACE")
1232            .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
1233            .unwrap_or(false)
1234    })
1235}
1236
1237fn describe_payload(p: &CorrelatedPayload) -> String {
1238    match &p.inner {
1239        EdgePayload::Any(_) => "any".to_string(),
1240        EdgePayload::Value(v) => format!("value({})", describe_value(v)),
1241        EdgePayload::Bytes(b) => format!("bytes({}b)", b.len()),
1242        EdgePayload::Unit => "unit".to_string(),
1243        #[cfg(feature = "gpu")]
1244        EdgePayload::GpuImage(_) => "gpu_image".to_string(),
1245        #[cfg(feature = "gpu")]
1246        EdgePayload::Payload(ep) => {
1247            if ep.is_gpu() { "gpu_payload" } else { "cpu_payload" }.to_string()
1248        },
1249    }
1250}
1251
1252fn describe_edge(p: &CorrelatedPayload) -> String {
1253    match &p.inner {
1254        EdgePayload::Any(_) => "any".to_string(),
1255        EdgePayload::Value(v) => format!("value({})", describe_value(v)),
1256        EdgePayload::Bytes(b) => format!("bytes({}b)", b.len()),
1257        EdgePayload::Unit => "unit".to_string(),
1258        #[cfg(feature = "gpu")]
1259        EdgePayload::GpuImage(_) => "gpu_image".to_string(),
1260        #[cfg(feature = "gpu")]
1261        EdgePayload::Payload(ep) => {
1262            if ep.is_gpu() { "gpu_payload" } else { "cpu_payload" }.to_string()
1263        },
1264    }
1265}
1266
1267fn describe_value(v: &Value) -> String {
1268    match v {
1269        Value::Int(i) => format!("int({})", i),
1270        Value::Float(f) => format!("float({})", f),
1271        Value::Bool(b) => format!("bool({})", b),
1272        Value::String(s) => format!("string({})", s),
1273        Value::Enum(ev) => format!("enum({})", ev.name),
1274        Value::List(_) => "list".to_string(),
1275        Value::Struct(_) => "struct".to_string(),
1276        Value::Tuple(_) => "tuple".to_string(),
1277        Value::Map(_) => "map".to_string(),
1278        Value::Unit => "unit".to_string(),
1279        Value::Bytes(b) => format!("bytes({}b)", b.len()),
1280    }
1281}
1282
1283/// Serialized payload representation for host bridge boundaries.
1284#[derive(Clone, Debug, PartialEq)]
1285pub enum HostBridgeSerialized {
1286    Json(String),
1287    Bytes(Arc<[u8]>),
1288}
1289
1290/// Serialized payload with correlation metadata.
1291#[derive(Clone, Debug, PartialEq)]
1292pub struct HostBridgeSerializedPayload {
1293    pub correlation_id: u64,
1294    pub port_type: Option<TypeExpr>,
1295    pub payload: HostBridgeSerialized,
1296}
1297
1298fn deserialize_serialized_payload(
1299    port: &str,
1300    port_type: Option<&TypeExpr>,
1301    payload: HostBridgeSerialized,
1302) -> Result<EdgePayload, NodeError> {
1303    match payload {
1304        HostBridgeSerialized::Bytes(bytes) => {
1305            if port_type.is_some_and(is_bytes_type) {
1306                return Ok(EdgePayload::Bytes(bytes));
1307            }
1308            let text = std::str::from_utf8(&bytes).map_err(|err| {
1309                NodeError::InvalidInput(format!(
1310                    "host bridge port {port}: bytes are not utf-8 ({err})"
1311                ))
1312            })?;
1313            let value = parse_json_value(port, text)?;
1314            Ok(value_payload(value))
1315        }
1316        HostBridgeSerialized::Json(json) => {
1317            let value = parse_json_value(port, &json)?;
1318            Ok(value_payload(value))
1319        }
1320    }
1321}
1322
1323fn serialize_outbound_payload(
1324    port: &str,
1325    port_type: Option<&TypeExpr>,
1326    payload: CorrelatedPayload,
1327) -> Result<HostBridgeSerializedPayload, NodeError> {
1328    let serialized = match payload.inner {
1329        EdgePayload::Unit => {
1330            HostBridgeSerialized::Json(serialize_value_to_json(port, &Value::Unit)?)
1331        }
1332        EdgePayload::Bytes(bytes) => HostBridgeSerialized::Bytes(bytes),
1333        EdgePayload::Value(value) => {
1334            HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1335        }
1336        EdgePayload::Any(any) => {
1337            if port_type.is_some_and(is_bytes_type) {
1338                if let Some(bytes) = any_to_bytes(any.as_ref()) {
1339                    HostBridgeSerialized::Bytes(bytes)
1340                } else if let Some(value) = any_to_value(any.as_ref()) {
1341                    HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1342                } else {
1343                    return Err(NodeError::InvalidInput(format!(
1344                        "host bridge port {port}: unsupported Any payload for bytes output"
1345                    )));
1346                }
1347            } else if let Some(value) = any_to_value(any.as_ref()) {
1348                HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1349            } else if let Some(bytes) = any_to_bytes(any.as_ref()) {
1350                HostBridgeSerialized::Bytes(bytes)
1351            } else {
1352                return Err(NodeError::InvalidInput(format!(
1353                    "host bridge port {port}: unsupported Any payload"
1354                )));
1355            }
1356        }
1357        #[cfg(feature = "gpu")]
1358        EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
1359            return Err(NodeError::InvalidInput(format!(
1360                "host bridge port {port}: gpu payloads cannot be serialized"
1361            )));
1362        }
1363    };
1364
1365    Ok(HostBridgeSerializedPayload {
1366        correlation_id: payload.correlation_id,
1367        port_type: port_type.cloned(),
1368        payload: serialized,
1369    })
1370}
1371
1372fn is_bytes_type(ty: &TypeExpr) -> bool {
1373    matches!(ty, TypeExpr::Scalar(ValueType::Bytes))
1374}
1375
1376fn parse_json_value(port: &str, json_str: &str) -> Result<Value, NodeError> {
1377    json::from_json(json_str).map_err(|err| {
1378        NodeError::InvalidInput(format!(
1379            "host bridge port {port}: invalid typed json ({err})"
1380        ))
1381    })
1382}
1383
1384fn serialize_value_to_json(port: &str, value: &Value) -> Result<String, NodeError> {
1385    json::to_json(value).map_err(|err| {
1386        NodeError::InvalidInput(format!(
1387            "host bridge port {port}: failed to serialize value ({err})"
1388        ))
1389    })
1390}
1391
1392fn value_payload(value: Value) -> EdgePayload {
1393    EdgePayload::Value(value)
1394}
1395
1396type AnyValueSerializer = Box<dyn Fn(&dyn Any) -> Option<Value> + Send + Sync + 'static>;
1397
1398fn value_serializers() -> &'static RwLock<Vec<AnyValueSerializer>> {
1399    static REGISTRY: OnceLock<RwLock<Vec<AnyValueSerializer>>> = OnceLock::new();
1400    REGISTRY.get_or_init(|| RwLock::new(Vec::new()))
1401}
1402
1403fn try_serialize_any_value(any: &dyn Any) -> Option<Value> {
1404    let guard = value_serializers().read().ok()?;
1405    for serializer in guard.iter() {
1406        if let Some(value) = serializer(any) {
1407            return Some(value);
1408        }
1409    }
1410    None
1411}
1412
1413/// Register a conversion from a typed payload `T` into a runtime `Value`.
1414///
1415/// This allows host-bridge output serialization to support plugin-defined structured types.
1416pub fn register_value_serializer<T, F>(serializer: F)
1417where
1418    T: Any + Clone + Send + Sync + 'static,
1419    F: Fn(&T) -> Value + Send + Sync + 'static,
1420{
1421    let mut guard = value_serializers()
1422        .write()
1423        .expect("daedalus-runtime host bridge serializer lock poisoned");
1424    guard.push(Box::new(move |any| {
1425        if let Some(value) = any.downcast_ref::<T>() {
1426            return Some(serializer(value));
1427        }
1428        None
1429    }));
1430}
1431
1432fn any_to_value(any: &dyn Any) -> Option<Value> {
1433    if let Some(value) = try_serialize_any_value(any) {
1434        return Some(value);
1435    }
1436    let value = any
1437        .downcast_ref::<Value>()
1438        .cloned();
1439    if let Some(value) = value {
1440        return Some(value);
1441    }
1442
1443    let i = any
1444        .downcast_ref::<i64>()
1445        .copied();
1446    if let Some(i) = i {
1447        return Some(Value::Int(i));
1448    }
1449
1450    let f = any
1451        .downcast_ref::<f64>()
1452        .copied();
1453    if let Some(f) = f {
1454        return Some(Value::Float(f));
1455    }
1456
1457    let b = any
1458        .downcast_ref::<bool>()
1459        .copied();
1460    if let Some(b) = b {
1461        return Some(Value::Bool(b));
1462    }
1463
1464    let s = any
1465        .downcast_ref::<String>()
1466        .cloned();
1467    if let Some(s) = s {
1468        return Some(Value::String(s.into()));
1469    }
1470
1471    let bytes = any
1472        .downcast_ref::<Vec<u8>>()
1473        .cloned();
1474    bytes.map(|b| Value::Bytes(b.into()))
1475}
1476
1477fn any_to_bytes(any: &dyn Any) -> Option<Arc<[u8]>> {
1478    if let Some(bytes) = any.downcast_ref::<Vec<u8>>().cloned() {
1479        return Some(Arc::from(bytes));
1480    }
1481    if let Some(bytes) = any.downcast_ref::<Arc<[u8]>>() {
1482        return Some(bytes.clone());
1483    }
1484    None
1485}
1486
1487#[cfg(test)]
1488mod tests {
1489    use super::*;
1490
1491    fn empty_policies() -> Vec<(String, EdgePolicyKind)> {
1492        Vec::new()
1493    }
1494
1495    #[test]
1496    fn push_serialized_json_decodes_to_value() {
1497        let mgr = HostBridgeManager::new();
1498        mgr.register_bridge("host", empty_policies());
1499        mgr.register_port_types(
1500            "host",
1501            vec![("config".into(), TypeExpr::Scalar(ValueType::Int))],
1502            Vec::new(),
1503        );
1504        let handle = mgr.handle("host").expect("host handle");
1505
1506        let json = json::to_json(&Value::Int(5)).expect("json");
1507        let id = handle
1508            .push_serialized("config", HostBridgeSerialized::Json(json), None)
1509            .expect("push");
1510
1511        let inbound = mgr.take_inbound("host");
1512        assert_eq!(inbound.len(), 1);
1513        let (port, payload) = &inbound[0];
1514        assert_eq!(port, "config");
1515        assert_eq!(payload.correlation_id, id);
1516
1517        match &payload.inner {
1518            EdgePayload::Value(Value::Int(v)) => assert_eq!(*v, 5),
1519            other => panic!("unexpected payload {other:?}"),
1520        }
1521    }
1522
1523    #[test]
1524    fn try_pop_serialized_encodes_value_to_json() {
1525        let mgr = HostBridgeManager::new();
1526        mgr.register_bridge("host", empty_policies());
1527        mgr.register_port_types(
1528            "host",
1529            Vec::new(),
1530            vec![("status".into(), TypeExpr::Scalar(ValueType::Int))],
1531        );
1532        let payload = CorrelatedPayload::from_edge(value_payload(Value::Int(7)));
1533        mgr.push_outbound("host", "status", payload);
1534
1535        let handle = mgr.handle("host").expect("host handle");
1536        let serialized = handle
1537            .try_pop_serialized("status")
1538            .expect("serialize")
1539            .expect("payload");
1540
1541        assert_eq!(serialized.port_type, Some(TypeExpr::Scalar(ValueType::Int)));
1542
1543        match serialized.payload {
1544            HostBridgeSerialized::Json(json_str) => {
1545                let value = json::from_json(&json_str).expect("from json");
1546                assert_eq!(value, Value::Int(7));
1547            }
1548            other => panic!("unexpected payload {other:?}"),
1549        }
1550    }
1551
1552    #[test]
1553    fn bytes_payloads_pass_through_serialization() {
1554        let mgr = HostBridgeManager::new();
1555        mgr.register_bridge("host", empty_policies());
1556        mgr.register_port_types(
1557            "host",
1558            Vec::new(),
1559            vec![("blob".into(), TypeExpr::Scalar(ValueType::Bytes))],
1560        );
1561        let bytes: Arc<[u8]> = Arc::from(vec![1_u8, 2, 3, 4]);
1562        let payload = CorrelatedPayload::from_edge(EdgePayload::Bytes(bytes.clone()));
1563        mgr.push_outbound("host", "blob", payload);
1564
1565        let handle = mgr.handle("host").expect("host handle");
1566        let serialized = handle
1567            .try_pop_serialized("blob")
1568            .expect("serialize")
1569            .expect("payload");
1570
1571        match serialized.payload {
1572            HostBridgeSerialized::Bytes(out) => assert_eq!(out.as_ref(), bytes.as_ref()),
1573            other => panic!("unexpected payload {other:?}"),
1574        }
1575    }
1576
1577    #[test]
1578    fn try_pop_value_decodes_value_like_any() {
1579        let mgr = HostBridgeManager::new();
1580        mgr.register_bridge("host", empty_policies());
1581        mgr.register_port_types(
1582            "host",
1583            Vec::new(),
1584            vec![("out".into(), TypeExpr::Scalar(ValueType::Int))],
1585        );
1586        mgr.push_outbound(
1587            "host",
1588            "out",
1589            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(7_i64))),
1590        );
1591
1592        let host = mgr.handle("host").expect("host handle");
1593        let (_corr, value) = host.try_pop_value("out").expect("value").expect("payload");
1594        assert_eq!(value, Value::Int(7));
1595    }
1596
1597    #[test]
1598    fn try_pop_value_rejects_non_value_any() {
1599        #[derive(Clone)]
1600        struct NotValue;
1601
1602        let mgr = HostBridgeManager::new();
1603        mgr.register_bridge("host", empty_policies());
1604        mgr.register_port_types(
1605            "host",
1606            Vec::new(),
1607            vec![("out".into(), TypeExpr::opaque("image:dynamic"))],
1608        );
1609        mgr.push_outbound(
1610            "host",
1611            "out",
1612            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(NotValue))),
1613        );
1614
1615        let host = mgr.handle("host").expect("host handle");
1616        let err = host.try_pop_value("out").expect_err("expected error");
1617        match err {
1618            NodeError::InvalidInput(msg) => assert!(msg.contains("not value-like")),
1619            other => panic!("unexpected error {other:?}"),
1620        }
1621    }
1622
1623    #[test]
1624    fn drain_values_decodes_multiple_payloads() {
1625        let mgr = HostBridgeManager::new();
1626        mgr.register_bridge("host", empty_policies());
1627        mgr.register_port_types(
1628            "host",
1629            Vec::new(),
1630            vec![("out".into(), TypeExpr::Scalar(ValueType::Int))],
1631        );
1632        mgr.push_outbound(
1633            "host",
1634            "out",
1635            CorrelatedPayload::from_edge(value_payload(Value::Int(1))),
1636        );
1637        mgr.push_outbound(
1638            "host",
1639            "out",
1640            CorrelatedPayload::from_edge(value_payload(Value::Int(2))),
1641        );
1642
1643        let host = mgr.handle("host").expect("host handle");
1644        let vals = host.drain_values("out").expect("drain");
1645        assert_eq!(vals.len(), 2);
1646        assert_eq!(vals[0].1, Value::Int(1));
1647        assert_eq!(vals[1].1, Value::Int(2));
1648    }
1649
1650    #[test]
1651    fn iter_ports_can_filter_and_pop_values() {
1652        let mgr = HostBridgeManager::new();
1653        mgr.register_bridge("host", empty_policies());
1654        mgr.register_port_types(
1655            "host",
1656            Vec::new(),
1657            vec![
1658                ("a".into(), TypeExpr::Scalar(ValueType::Int)),
1659                ("b".into(), TypeExpr::Scalar(ValueType::Int)),
1660            ],
1661        );
1662        mgr.push_outbound(
1663            "host",
1664            "a",
1665            CorrelatedPayload::from_edge(value_payload(Value::Int(42))),
1666        );
1667
1668        let host = mgr.handle("host").expect("host handle");
1669        let ports = vec!["a".to_string(), "b".to_string()];
1670        let got = host
1671            .iter_ports(&ports)
1672            .filter(|p| p.can_poll::<Value>())
1673            .try_pop_all_values()
1674            .expect("pop");
1675        assert_eq!(got.len(), 1);
1676        assert_eq!(got[0].0, "a");
1677        assert_eq!(got[0].2, Value::Int(42));
1678    }
1679
1680    #[test]
1681    fn incoming_ports_can_filter_and_pop_values() {
1682        let mgr = HostBridgeManager::new();
1683        mgr.register_bridge("host", empty_policies());
1684        mgr.register_port_types(
1685            "host",
1686            Vec::new(),
1687            vec![
1688                ("a".into(), TypeExpr::Scalar(ValueType::Int)),
1689                ("b".into(), TypeExpr::Scalar(ValueType::Int)),
1690            ],
1691        );
1692        mgr.push_outbound(
1693            "host",
1694            "a",
1695            CorrelatedPayload::from_edge(value_payload(Value::Int(42))),
1696        );
1697
1698        let host = mgr.handle("host").expect("host handle");
1699        let got = host
1700            .incoming_ports()
1701            .filter(|p| p.can_type_to::<Value>())
1702            .try_pop_all_values()
1703            .expect("pop");
1704        assert_eq!(got.len(), 1);
1705        assert_eq!(got[0].0, "a");
1706        assert_eq!(got[0].2, Value::Int(42));
1707    }
1708
1709    #[test]
1710    fn iter_ports_can_poll_dynamic_image_by_resolved_type() {
1711        use image::{GenericImageView, ImageBuffer, Rgba};
1712
1713        let mgr = HostBridgeManager::new();
1714        mgr.register_bridge("host", empty_policies());
1715        mgr.register_port_types(
1716            "host",
1717            Vec::new(),
1718            vec![("frame".into(), TypeExpr::opaque("image:dynamic"))],
1719        );
1720
1721        let img = DynamicImage::ImageRgba8(ImageBuffer::from_pixel(2, 2, Rgba([0, 0, 0, 255])));
1722        mgr.push_outbound(
1723            "host",
1724            "frame",
1725            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(img.clone()))),
1726        );
1727
1728        let host = mgr.handle("host").expect("host handle");
1729        let ports = vec!["frame".to_string()];
1730        let port = host.iter_ports(&ports).next().expect("port");
1731        assert!(port.can_poll::<DynamicImage>());
1732        let (_corr, got) = port
1733            .try_pop::<DynamicImage>()
1734            .expect("pop")
1735            .expect("payload");
1736        assert_eq!(got.dimensions(), img.dimensions());
1737    }
1738}