1use std::any::Any;
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, Mutex};
4use std::sync::{OnceLock, RwLock};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::task::Poll;
7
8use crate::convert::convert_arc;
9use crate::executor::{CorrelatedPayload, EdgePayload, NodeError, next_correlation_id};
10use crate::io::NodeIo;
11use crate::plan::EdgePolicyKind;
12use daedalus_data::json;
13use daedalus_data::model::{TypeExpr, Value, ValueType};
14use daedalus_data::typing;
15use futures_util::future::poll_fn;
16use futures_util::task::AtomicWaker;
17
18#[cfg(feature = "gpu")]
19use daedalus_gpu::GpuContextHandle;
20
21use image::DynamicImage;
22
/// Node-metadata key marking a plan node as a host bridge.
///
/// `HostBridgeManager::populate_from_plan` treats a node as a bridge only when this key maps
/// to `Value::Bool(true)`.
pub const HOST_BRIDGE_META_KEY: &str = "host_bridge";
/// Identifier string for the host bridge.
// NOTE(review): presumably the registry id of the bridge node; its use is not visible here.
pub const HOST_BRIDGE_ID: &str = "io.host_bridge";
27
/// Per-bridge queues shared between the host-side handle and the manager.
#[derive(Default)]
struct HostBridgeBuffers {
    /// Payloads pushed by the host (`HostBridgeHandle::push`), keyed by lowercase port name;
    /// emptied by `HostBridgeManager::take_inbound`.
    inbound: HashMap<String, VecDeque<CorrelatedPayload>>,
    /// Payloads queued by `HostBridgeManager::push_outbound`, keyed by lowercase port name;
    /// consumed by the handle's `recv`/`try_pop`/`drain` family.
    outbound: HashMap<String, VecDeque<CorrelatedPayload>>,
    /// One waker slot per port, created lazily by `HostBridgeHandle::recv` and woken whenever
    /// a payload is queued for that port name.
    wakers: HashMap<String, Arc<AtomicWaker>>,
}
37
/// Host-side handle to a single bridge: pushes payloads in and pulls payloads out.
///
/// Cloning is cheap for the buffers (they sit behind an `Arc`) but copies the port maps.
#[derive(Clone)]
pub struct HostBridgeHandle {
    /// Lowercased bridge alias this handle was created for.
    alias: String,
    /// Queues shared with the `HostBridgeManager` that created this handle.
    shared: Arc<Mutex<HostBridgeBuffers>>,
    /// Edge policy per outgoing port, as registered with the manager.
    outgoing: HashMap<String, EdgePolicyKind>,
    /// Declared type per outgoing port; consulted by `push_serialized`.
    outgoing_types: HashMap<String, TypeExpr>,
    /// Declared type per incoming port; consulted when serializing or decoding popped payloads.
    incoming_types: HashMap<String, TypeExpr>,
    /// GPU context slot shared with the manager; `None` until `attach_gpu` is called.
    #[cfg(feature = "gpu")]
    gpu: Arc<Mutex<Option<GpuContextHandle>>>,
}
49
/// Returns the type name of `any` when it is one of the image / GPU payload types that must
/// travel as `EdgePayload::Payload` rather than `EdgePayload::Any`; `None` otherwise.
#[cfg(feature = "gpu")]
fn payload_any_type(any: &dyn Any) -> Option<&'static str> {
    // Expands to one `is::<T>()` probe per listed type, returning the name of the first match.
    macro_rules! name_if_any_of {
        ($value:expr; $($ty:ty),+ $(,)?) => {
            $(
                if $value.is::<$ty>() {
                    return Some(std::any::type_name::<$ty>());
                }
            )+
        };
    }

    name_if_any_of!(
        any;
        // Bare CPU images.
        DynamicImage,
        image::GrayImage,
        image::GrayAlphaImage,
        image::RgbImage,
        image::RgbaImage,
        // The same images behind an `Arc`.
        Arc<DynamicImage>,
        Arc<image::GrayImage>,
        Arc<image::GrayAlphaImage>,
        Arc<image::RgbImage>,
        Arc<image::RgbaImage>,
        // Typed and erased GPU payload wrappers.
        daedalus_gpu::Payload<DynamicImage>,
        daedalus_gpu::Payload<image::GrayImage>,
        daedalus_gpu::Payload<image::RgbImage>,
        daedalus_gpu::Payload<image::RgbaImage>,
        daedalus_gpu::ErasedPayload,
    );
    None
}
99
/// Without the `gpu` feature no payload type is restricted, so this always returns `None`.
#[cfg(not(feature = "gpu"))]
fn payload_any_type(_any: &dyn Any) -> Option<&'static str> {
    None
}
104
105impl HostBridgeHandle {
    /// Assembles a handle from its parts; every argument is stored as given.
    ///
    /// Called by `HostBridgeManager::handle`, which passes an already lowercased alias.
    fn new(
        alias: String,
        shared: Arc<Mutex<HostBridgeBuffers>>,
        outgoing: HashMap<String, EdgePolicyKind>,
        outgoing_types: HashMap<String, TypeExpr>,
        incoming_types: HashMap<String, TypeExpr>,
        #[cfg(feature = "gpu")] gpu: Arc<Mutex<Option<GpuContextHandle>>>,
    ) -> Self {
        Self {
            alias,
            shared,
            outgoing,
            outgoing_types,
            incoming_types,
            #[cfg(feature = "gpu")]
            gpu,
        }
    }
124
125 pub fn push(
127 &self,
128 port: impl AsRef<str>,
129 payload: EdgePayload,
130 correlation_id: Option<u64>,
131 ) -> u64 {
132 let port = port.as_ref().to_ascii_lowercase();
133 if let EdgePayload::Any(any) = &payload
134 && let Some(ty) = payload_any_type(any.as_ref())
135 {
136 panic!(
137 "host bridge port {port}: payload type {ty} must be sent as EdgePayload::Payload, not Any"
138 );
139 }
140 let id = correlation_id.unwrap_or_else(next_correlation_id);
141 let mut guard = self.shared.lock().expect("host bridge poisoned");
142 guard
143 .inbound
144 .entry(port.clone())
145 .or_default()
146 .push_back(CorrelatedPayload {
147 correlation_id: id,
148 inner: payload,
149 enqueued_at: std::time::Instant::now(),
150 });
151 if let Some(waker) = guard.wakers.get(&port) {
152 waker.wake();
153 }
154 id
155 }
156
157 fn restore_outbound(&self, port: &str, payload: CorrelatedPayload) {
158 if let Ok(mut buf) = self.shared.lock() {
159 let key = port.to_ascii_lowercase();
160 let q = buf.outbound.entry(key.clone()).or_default();
161 q.push_front(payload);
162 if let Some(waker) = buf.wakers.get(&key) {
163 waker.wake();
164 }
165 }
166 }
167
    /// Wraps `value` in `EdgePayload::Any` and pushes it with a freshly generated correlation
    /// id, returning that id.
    ///
    /// Inherits the panics of [`HostBridgeHandle::push`], including the one for payload types
    /// that must not be sent as `Any`.
    pub fn push_any<T: Any + Send + Sync + 'static>(&self, port: impl AsRef<str>, value: T) -> u64 {
        self.push(port, EdgePayload::Any(Arc::new(value)), None)
    }
172
173 pub fn push_serialized(
175 &self,
176 port: impl AsRef<str>,
177 payload: HostBridgeSerialized,
178 correlation_id: Option<u64>,
179 ) -> Result<u64, NodeError> {
180 let port = port.as_ref().to_ascii_lowercase();
181 let port_type = self.outgoing_types.get(&port);
182 let edge_payload = deserialize_serialized_payload(&port, port_type, payload)?;
183 Ok(self.push(&port, edge_payload, correlation_id))
184 }
185
186 pub async fn recv(&self, port: impl AsRef<str>) -> Option<CorrelatedPayload> {
188 let port = port.as_ref().to_ascii_lowercase();
189 poll_fn(|cx| {
190 let mut guard = self.shared.lock().expect("host bridge poisoned");
191 if let Some(q) = guard.outbound.get_mut(&port)
192 && let Some(item) = q.pop_front()
193 {
194 return Poll::Ready(Some(item));
195 }
196 let waker = guard
197 .wakers
198 .entry(port.clone())
199 .or_insert_with(|| Arc::new(AtomicWaker::new()))
200 .clone();
201 waker.register(cx.waker());
202 Poll::Pending
203 })
204 .await
205 }
206
207 pub async fn recv_serialized(
209 &self,
210 port: impl AsRef<str>,
211 ) -> Result<Option<HostBridgeSerializedPayload>, NodeError> {
212 let port = port.as_ref().to_ascii_lowercase();
213 let payload = self.recv(&port).await;
214 match payload {
215 Some(p) => {
216 serialize_outbound_payload(&port, self.incoming_types.get(&port), p).map(Some)
217 }
218 None => Ok(None),
219 }
220 }
221
222 pub fn pull_any<T: Any + Clone>(&self, port: impl AsRef<str>) -> Option<T> {
224 let port = port.as_ref().to_ascii_lowercase();
225 let mut guard = self.shared.lock().ok()?;
226 let payload = guard.outbound.get_mut(&port)?.pop_front()?;
227 match payload.inner {
228 EdgePayload::Any(a) => a.downcast_ref::<T>().cloned(),
229 _ => None,
230 }
231 }
232
233 pub fn try_pop(&self, port: impl AsRef<str>) -> Option<CorrelatedPayload> {
235 let port = port.as_ref().to_ascii_lowercase();
236 let mut guard = self.shared.lock().expect("host bridge poisoned");
237 guard.outbound.get_mut(&port).and_then(|q| q.pop_front())
238 }
239
240 pub fn try_pop_serialized(
242 &self,
243 port: impl AsRef<str>,
244 ) -> Result<Option<HostBridgeSerializedPayload>, NodeError> {
245 let port = port.as_ref().to_ascii_lowercase();
246 let payload = self.try_pop(&port);
247 match payload {
248 Some(p) => {
249 serialize_outbound_payload(&port, self.incoming_types.get(&port), p).map(Some)
250 }
251 None => Ok(None),
252 }
253 }
254
255 pub fn drain(&self, port: impl AsRef<str>) -> Vec<CorrelatedPayload> {
257 let port = port.as_ref().to_ascii_lowercase();
258 let mut guard = self.shared.lock().expect("host bridge poisoned");
259 guard
260 .outbound
261 .remove(&port)
262 .map(|q| q.into_iter().collect())
263 .unwrap_or_default()
264 }
265
266 pub fn drain_serialized(
268 &self,
269 port: impl AsRef<str>,
270 ) -> Result<Vec<HostBridgeSerializedPayload>, NodeError> {
271 let port = port.as_ref().to_ascii_lowercase();
272 let drained = self.drain(&port);
273 drained
274 .into_iter()
275 .map(|p| serialize_outbound_payload(&port, self.incoming_types.get(&port), p))
276 .collect()
277 }
278
279 pub fn ports(&self) -> impl Iterator<Item = &str> {
281 self.outgoing.keys().map(|k| k.as_str())
282 }
283
    /// Alias for [`HostBridgeHandle::ports`]: names of the registered outgoing ports.
    pub fn outgoing_ports(&self) -> impl Iterator<Item = &str> {
        self.ports()
    }
288
289 pub fn incoming_port_names(&self) -> Vec<String> {
295 let mut out: Vec<String> = self.incoming_types.keys().cloned().collect();
296 if let Ok(guard) = self.shared.lock() {
297 out.extend(guard.outbound.keys().cloned());
298 }
299 out.sort();
300 out.dedup();
301 out
302 }
303
    /// Returns an iterator over port views for every name reported by
    /// [`HostBridgeHandle::incoming_port_names`] at the time of the call.
    ///
    /// The name list is a snapshot; queues created afterwards are not included.
    pub fn incoming_ports(&self) -> HostPortOwnedIter<'_> {
        HostPortOwnedIter {
            handle: self,
            names: self.incoming_port_names(),
            idx: 0,
        }
    }
320
    /// Returns an iterator of borrowed port views over the caller-supplied `ports` slice.
    ///
    /// The names are used as given; lookups lowercase them internally.
    pub fn iter_ports<'a>(&'a self, ports: &'a [String]) -> HostPorts<'a> {
        HostPorts {
            handle: self,
            ports,
            idx: 0,
        }
    }
330
    /// Returns the alias this handle was created with.
    pub fn alias(&self) -> &str {
        &self.alias
    }
335
336 pub fn outgoing_port_type(&self, port: impl AsRef<str>) -> Option<&TypeExpr> {
337 let port = port.as_ref().to_ascii_lowercase();
338 self.outgoing_types.get(&port)
339 }
340
341 pub fn incoming_port_type(&self, port: impl AsRef<str>) -> Option<&TypeExpr> {
342 let port = port.as_ref().to_ascii_lowercase();
343 self.incoming_types.get(&port)
344 }
345
346 #[cfg(feature = "gpu")]
347 fn gpu_ctx(&self) -> Option<GpuContextHandle> {
348 self.gpu.lock().ok().and_then(|g| g.as_ref().cloned())
349 }
350
351 pub fn try_pop_value(&self, port: impl AsRef<str>) -> Result<Option<(u64, Value)>, NodeError> {
357 let port = port.as_ref().to_ascii_lowercase();
358 let Some(payload) = self.try_pop(&port) else {
359 return Ok(None);
360 };
361 let corr = payload.correlation_id;
362 let value = match payload.inner {
363 EdgePayload::Unit => Value::Unit,
364 EdgePayload::Bytes(bytes) => Value::Bytes(bytes.to_vec().into()),
365 EdgePayload::Value(value) => value,
366 EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
367 NodeError::InvalidInput(format!(
368 "host bridge port {port}: payload is not value-like"
369 ))
370 })?,
371 #[cfg(feature = "gpu")]
372 EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
373 return Err(NodeError::InvalidInput(format!(
374 "host bridge port {port}: gpu payloads are not value-like"
375 )));
376 }
377 };
378 Ok(Some((corr, value)))
379 }
380
381 pub fn drain_values(&self, port: impl AsRef<str>) -> Result<Vec<(u64, Value)>, NodeError> {
385 let port = port.as_ref().to_ascii_lowercase();
386 let drained = self.drain(&port);
387 drained
388 .into_iter()
389 .map(|p| {
390 let corr = p.correlation_id;
391 let value = match p.inner {
392 EdgePayload::Unit => Value::Unit,
393 EdgePayload::Bytes(bytes) => Value::Bytes(bytes.to_vec().into()),
394 EdgePayload::Value(value) => value,
395 EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
396 NodeError::InvalidInput(format!(
397 "host bridge port {port}: payload is not value-like"
398 ))
399 })?,
400 #[cfg(feature = "gpu")]
401 EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
402 return Err(NodeError::InvalidInput(format!(
403 "host bridge port {port}: gpu payloads are not value-like"
404 )));
405 }
406 };
407 Ok((corr, value))
408 })
409 .collect()
410 }
411}
412
/// Iterator over [`HostPortOwned`] views, built by `HostBridgeHandle::incoming_ports`.
pub struct HostPortOwnedIter<'a> {
    /// Handle every yielded port view refers back to.
    handle: &'a HostBridgeHandle,
    /// Snapshot of port names taken when the iterator was created.
    names: Vec<String>,
    /// Index of the next name to yield.
    idx: usize,
}
419
420impl<'a> Iterator for HostPortOwnedIter<'a> {
421 type Item = HostPortOwned<'a>;
422
423 fn next(&mut self) -> Option<Self::Item> {
424 let name = self.names.get(self.idx)?.clone();
425 self.idx += 1;
426 Some(HostPortOwned {
427 handle: self.handle,
428 name,
429 })
430 }
431}
432
/// View of a single incoming port that owns its name.
pub struct HostPortOwned<'a> {
    /// Handle used for type lookups and queue access.
    handle: &'a HostBridgeHandle,
    /// Port name this view addresses.
    name: String,
}
438
439impl<'a> HostPortOwned<'a> {
440 pub fn name(&self) -> &str {
441 &self.name
442 }
443
444 pub fn resolved_type(&self) -> Option<&TypeExpr> {
445 self.handle.incoming_port_type(&self.name)
446 }
447
448 pub fn can_type_to<T: HostPollable>(&self) -> bool {
450 self.can_poll::<T>()
451 }
452
453 pub fn can_poll<T: HostPollable>(&self) -> bool {
454 T::can_poll(self.resolved_type())
455 }
456
457 pub fn is_type_expr(&self, ty: &TypeExpr) -> bool {
458 self.resolved_type().is_some_and(|t| t == ty)
459 }
460
461 pub fn try_pop<T: HostPollable>(&self) -> Result<Option<(u64, T)>, NodeError> {
462 let Some(payload) = self.handle.try_pop(self.name()) else {
463 return Ok(None);
464 };
465 let corr = payload.correlation_id;
466 match T::decode(self.handle, self.name(), self.resolved_type(), payload.clone()) {
467 Ok(value) => Ok(Some((corr, value))),
468 Err(err) => {
469 self.handle.restore_outbound(self.name(), payload);
470 Err(err)
471 }
472 }
473 }
474
475 pub fn try_pop_any<T: Any + Clone>(&self) -> Option<(u64, T)> {
477 let payload = self.handle.try_pop(self.name())?;
478 let corr = payload.correlation_id;
479 match payload.inner {
480 EdgePayload::Any(a) => a
481 .downcast_ref::<T>()
482 .cloned()
483 .map(|v| (corr, v)),
484 _ => None,
485 }
486 }
487}
488
/// Iterator over borrowed [`HostPort`] views, built by `HostBridgeHandle::iter_ports`.
pub struct HostPorts<'a> {
    /// Handle every yielded port view refers back to.
    handle: &'a HostBridgeHandle,
    /// Caller-supplied port names to iterate.
    ports: &'a [String],
    /// Index of the next name to yield.
    idx: usize,
}
495
496impl<'a> Iterator for HostPorts<'a> {
497 type Item = HostPort<'a>;
498
499 fn next(&mut self) -> Option<Self::Item> {
500 let port = self.ports.get(self.idx)?.as_str();
501 self.idx += 1;
502 Some(HostPort {
503 handle: self.handle,
504 name: port,
505 })
506 }
507}
508
/// Cheap, copyable view of a single incoming port that borrows its name.
#[derive(Clone, Copy)]
pub struct HostPort<'a> {
    /// Handle used for type lookups and queue access.
    handle: &'a HostBridgeHandle,
    /// Port name this view addresses.
    name: &'a str,
}
515
516impl<'a> HostPort<'a> {
517 pub fn name(&self) -> &'a str {
518 self.name
519 }
520
521 pub fn resolved_type(&self) -> Option<&'a TypeExpr> {
523 self.handle.incoming_port_type(self.name)
524 }
525
526 pub fn can_type_to<T: HostPollable>(&self) -> bool {
528 self.can_poll::<T>()
529 }
530
531 pub fn can_poll<T: HostPollable>(&self) -> bool {
533 T::can_poll(self.resolved_type())
534 }
535
536 pub fn is_type_expr(&self, ty: &TypeExpr) -> bool {
537 self.resolved_type().is_some_and(|t| t == ty)
538 }
539
540 pub fn try_pop<T: HostPollable>(&self) -> Result<Option<(u64, T)>, NodeError> {
542 let Some(payload) = self.handle.try_pop(self.name) else {
543 return Ok(None);
544 };
545 let corr = payload.correlation_id;
546 match T::decode(self.handle, self.name, self.resolved_type(), payload.clone()) {
547 Ok(value) => Ok(Some((corr, value))),
548 Err(err) => {
549 self.handle.restore_outbound(self.name, payload);
550 Err(err)
551 }
552 }
553 }
554
555 pub fn try_pop_any<T: Any + Clone>(&self) -> Option<(u64, T)> {
557 let payload = self.handle.try_pop(self.name)?;
558 let corr = payload.correlation_id;
559 match payload.inner {
560 EdgePayload::Any(a) => a
561 .downcast_ref::<T>()
562 .cloned()
563 .map(|v| (corr, v)),
564 _ => None,
565 }
566 }
567}
568
/// Types that can be decoded from a payload popped off a host-bridge port.
pub trait HostPollable: Sized {
    /// Reports whether a port with the given declared type (or none) can yield `Self`.
    fn can_poll(port_type: Option<&TypeExpr>) -> bool;

    /// Converts a popped payload into `Self`.
    ///
    /// `handle` gives access to bridge state, `port` is used in error messages, and
    /// `port_type` is the port's declared type, if any.
    ///
    /// # Errors
    ///
    /// Implementations return a `NodeError` when the payload cannot be decoded as `Self`.
    fn decode(
        handle: &HostBridgeHandle,
        port: &str,
        port_type: Option<&TypeExpr>,
        payload: CorrelatedPayload,
    ) -> Result<Self, NodeError>;
}
580
581impl HostPollable for Value {
582 fn can_poll(_port_type: Option<&TypeExpr>) -> bool {
583 true
584 }
585
586 fn decode(
587 _handle: &HostBridgeHandle,
588 port: &str,
589 _port_type: Option<&TypeExpr>,
590 payload: CorrelatedPayload,
591 ) -> Result<Self, NodeError> {
592 match payload.inner {
593 EdgePayload::Unit => Ok(Value::Unit),
594 EdgePayload::Bytes(bytes) => Ok(Value::Bytes(bytes.to_vec().into())),
595 EdgePayload::Value(value) => Ok(value),
596 EdgePayload::Any(any) => any_to_value(any.as_ref()).ok_or_else(|| {
597 let ty = std::any::type_name_of_val(any.as_ref());
598 NodeError::InvalidInput(format!(
599 "host bridge port {port}: payload is not value-like (type={ty})"
600 ))
601 }),
602 #[cfg(feature = "gpu")]
603 EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => Err(NodeError::InvalidInput(
604 format!("host bridge port {port}: gpu payloads are not value-like"),
605 )),
606 }
607 }
608}
609
// Decodes host-bridge payloads into a CPU-side `DynamicImage`.
impl HostPollable for DynamicImage {
    /// A port can yield an image only when it has a declared type that equals, or is
    /// convertible to, the `DynamicImage` type expression.
    fn can_poll(port_type: Option<&TypeExpr>) -> bool {
        let Some(port_type) = port_type else {
            return false;
        };
        // Fall back to the opaque "image:dynamic" type when the lookup finds nothing.
        let target = typing::lookup_type::<DynamicImage>()
            .unwrap_or_else(|| TypeExpr::opaque("image:dynamic"));
        port_type == &target || typing::can_convert_typeexpr(port_type, &target)
    }

    /// Extracts a `DynamicImage` from the payload, trying a fixed sequence of known
    /// representations and cloning the pixel data of the first one that matches.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::InvalidInput` when the payload is not an image, when a GPU
    /// payload needs downloading but no GPU context is attached, or when a download fails.
    fn decode(
        handle: &HostBridgeHandle,
        port: &str,
        _port_type: Option<&TypeExpr>,
        payload: CorrelatedPayload,
    ) -> Result<Self, NodeError> {
        // `handle` is only read in the `gpu`-gated branches; this keeps it "used" otherwise.
        let _ = handle;
        match payload.inner {
            EdgePayload::Any(any) => {
                // 1. The payload is a `DynamicImage` itself.
                if let Some(img) = any.downcast_ref::<DynamicImage>().cloned() {
                    return Ok(img);
                }
                // 2. The payload is a nested type-erased `Arc`: probe the inner value.
                if let Some(inner) = any.downcast_ref::<Arc<dyn Any + Send + Sync>>() {
                    let inner_ref = inner.as_ref();
                    if let Some(img) = inner_ref.downcast_ref::<DynamicImage>().cloned() {
                        return Ok(img);
                    }
                    if let Some(img) = inner_ref.downcast_ref::<image::RgbaImage>() {
                        return Ok(DynamicImage::ImageRgba8(img.clone()));
                    }
                    if let Some(img) = inner_ref.downcast_ref::<image::RgbImage>() {
                        return Ok(DynamicImage::ImageRgb8(img.clone()));
                    }
                    if let Some(img) = inner_ref.downcast_ref::<image::GrayImage>() {
                        return Ok(DynamicImage::ImageLuma8(img.clone()));
                    }
                    if let Some(img) = inner_ref.downcast_ref::<image::GrayAlphaImage>() {
                        return Ok(DynamicImage::ImageLumaA8(img.clone()));
                    }
                    if let Some(converted) = convert_arc::<DynamicImage>(inner) {
                        return Ok(converted);
                    }
                }
                // 3. Concrete 8-bit image buffers held directly.
                if let Some(img) = any.downcast_ref::<image::RgbaImage>() {
                    return Ok(DynamicImage::ImageRgba8(img.clone()));
                }
                if let Some(img) = any.downcast_ref::<image::RgbImage>() {
                    return Ok(DynamicImage::ImageRgb8(img.clone()));
                }
                if let Some(img) = any.downcast_ref::<image::GrayImage>() {
                    return Ok(DynamicImage::ImageLuma8(img.clone()));
                }
                if let Some(img) = any.downcast_ref::<image::GrayAlphaImage>() {
                    return Ok(DynamicImage::ImageLumaA8(img.clone()));
                }
                // 4. The same image types wrapped in a typed `Arc`.
                if let Some(img) = any.downcast_ref::<Arc<DynamicImage>>() {
                    return Ok((**img).clone());
                }
                if let Some(img) = any.downcast_ref::<Arc<image::RgbaImage>>() {
                    return Ok(DynamicImage::ImageRgba8((**img).clone()));
                }
                if let Some(img) = any.downcast_ref::<Arc<image::RgbImage>>() {
                    return Ok(DynamicImage::ImageRgb8((**img).clone()));
                }
                if let Some(img) = any.downcast_ref::<Arc<image::GrayImage>>() {
                    return Ok(DynamicImage::ImageLuma8((**img).clone()));
                }
                if let Some(img) = any.downcast_ref::<Arc<image::GrayAlphaImage>>() {
                    return Ok(DynamicImage::ImageLumaA8((**img).clone()));
                }
                // 5. GPU-aware wrappers that were sent as `Any`.
                #[cfg(feature = "gpu")]
                {
                    // Typed payload: CPU side is returned as-is, GPU side is downloaded.
                    if let Some(p) = any
                        .downcast_ref::<daedalus_gpu::Payload<DynamicImage>>()
                        .cloned()
                    {
                        return match p {
                            daedalus_gpu::Payload::Cpu(img) => Ok(img),
                            daedalus_gpu::Payload::Gpu(h) => {
                                let ctx = handle.gpu_ctx().ok_or_else(|| {
                                    NodeError::InvalidInput(format!(
                                        "host bridge port {port}: gpu output requires a GPU context"
                                    ))
                                })?;
                                <DynamicImage as daedalus_gpu::GpuSendable>::download(&h, &ctx)
                                    .map_err(|e| {
                                        NodeError::InvalidInput(format!(
                                            "host bridge port {port}: failed to download gpu image ({e})"
                                        ))
                                    })
                            }
                        };
                    }

                    // Erased payload: prefer a CPU copy, otherwise try to download. A failed
                    // download or a non-image result falls through to the checks below.
                    if let Some(ep) = any
                        .downcast_ref::<daedalus_gpu::ErasedPayload>()
                        .cloned()
                    {
                        if let Some(cpu) = ep.clone_cpu::<DynamicImage>() {
                            return Ok(cpu);
                        }
                        if ep.is_gpu() {
                            let ctx = handle.gpu_ctx().ok_or_else(|| {
                                NodeError::InvalidInput(format!(
                                    "host bridge port {port}: gpu output requires a GPU context"
                                ))
                            })?;
                            if let Ok(downloaded) = ep.download(&ctx)
                                && let Some(cpu) = downloaded.as_cpu::<DynamicImage>().cloned()
                            {
                                return Ok(cpu);
                            }
                        }
                    }

                    // Raw GPU image handle: always needs a download.
                    if let Some(h) = any
                        .downcast_ref::<daedalus_gpu::GpuImageHandle>()
                        .cloned()
                    {
                        let ctx = handle.gpu_ctx().ok_or_else(|| {
                            NodeError::InvalidInput(format!(
                                "host bridge port {port}: gpu output requires a GPU context"
                            ))
                        })?;
                        return <DynamicImage as daedalus_gpu::GpuSendable>::download(&h, &ctx)
                            .map_err(|e| {
                                NodeError::InvalidInput(format!(
                                    "host bridge port {port}: failed to download gpu image ({e})"
                                ))
                            });
                    }
                }
                // 6. Last resort: the crate's `convert_arc` conversion.
                if let Some(converted) = convert_arc::<DynamicImage>(&any) {
                    return Ok(converted);
                }
                let ty = std::any::type_name_of_val(any.as_ref());
                Err(NodeError::InvalidInput(format!(
                    "host bridge port {port}: Any payload is not a DynamicImage (type={ty})"
                )))
            }
            // A GPU image handle sent natively: download it through the attached context.
            #[cfg(feature = "gpu")]
            EdgePayload::GpuImage(handle_img) => {
                let ctx = handle.gpu_ctx().ok_or_else(|| {
                    NodeError::InvalidInput(format!(
                        "host bridge port {port}: gpu output requires a GPU context"
                    ))
                })?;
                <DynamicImage as daedalus_gpu::GpuSendable>::download(&handle_img, &ctx).map_err(
                    |e| {
                        NodeError::InvalidInput(format!(
                            "host bridge port {port}: failed to download gpu image ({e})"
                        ))
                    },
                )
            }
            // An erased payload sent natively: CPU copy first, then GPU download.
            #[cfg(feature = "gpu")]
            EdgePayload::Payload(ep) => {
                if let Some(cpu) = ep.clone_cpu::<DynamicImage>() {
                    return Ok(cpu);
                }
                if let Some(gpu) = ep.clone_gpu::<DynamicImage>() {
                    let ctx = handle.gpu_ctx().ok_or_else(|| {
                        NodeError::InvalidInput(format!(
                            "host bridge port {port}: gpu output requires a GPU context"
                        ))
                    })?;
                    return <DynamicImage as daedalus_gpu::GpuSendable>::download(&gpu, &ctx)
                        .map_err(|e| {
                            NodeError::InvalidInput(format!(
                                "host bridge port {port}: failed to download gpu payload ({e})"
                            ))
                        });
                }
                Err(NodeError::InvalidInput(format!(
                    "host bridge port {port}: payload does not contain an image"
                )))
            }
            // Unit, bytes and plain values never carry an image.
            EdgePayload::Unit | EdgePayload::Bytes(_) => Err(NodeError::InvalidInput(format!(
                "host bridge port {port}: payload is not an image"
            ))),
            EdgePayload::Value(_) => Err(NodeError::InvalidInput(format!(
                "host bridge port {port}: payload is not an image"
            ))),
        }
    }
}
799
/// Decodes host-bridge payloads into a raw GPU image handle (no download is performed).
#[cfg(feature = "gpu")]
impl HostPollable for daedalus_gpu::GpuImageHandle {
    /// Only ports declared exactly as the opaque `image:dynamic` type qualify.
    fn can_poll(port_type: Option<&TypeExpr>) -> bool {
        port_type.is_some_and(|t| *t == TypeExpr::opaque("image:dynamic"))
    }

    /// Accepts a native GPU image, an erased payload holding a GPU image, or an `Any`
    /// wrapping a handle.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::InvalidInput` for every other payload shape.
    fn decode(
        _handle: &HostBridgeHandle,
        port: &str,
        _port_type: Option<&TypeExpr>,
        payload: CorrelatedPayload,
    ) -> Result<Self, NodeError> {
        match payload.inner {
            EdgePayload::GpuImage(h) => Ok(h),
            EdgePayload::Payload(ep) => match ep.clone_gpu::<DynamicImage>() {
                Some(h) => Ok(h),
                None => Err(NodeError::InvalidInput(format!(
                    "host bridge port {port}: payload does not contain a gpu image handle"
                ))),
            },
            EdgePayload::Any(any) => {
                match any.downcast_ref::<daedalus_gpu::GpuImageHandle>() {
                    Some(h) => Ok(h.clone()),
                    None => Err(NodeError::InvalidInput(format!(
                        "host bridge port {port}: Any payload is not a gpu image handle"
                    ))),
                }
            }
            EdgePayload::Unit | EdgePayload::Bytes(_) | EdgePayload::Value(_) => {
                Err(NodeError::InvalidInput(format!(
                    "host bridge port {port}: payload is not a gpu image handle"
                )))
            }
        }
    }
}
837
838#[cfg(feature = "gpu")]
839impl<T> HostPollable for daedalus_gpu::Payload<T>
840where
841 T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
842 T::GpuRepr: Clone + Send + Sync + 'static,
843{
844 fn can_poll(_port_type: Option<&TypeExpr>) -> bool {
845 true
846 }
847
848 fn decode(
849 handle: &HostBridgeHandle,
850 port: &str,
851 _port_type: Option<&TypeExpr>,
852 payload: CorrelatedPayload,
853 ) -> Result<Self, NodeError> {
854 match payload.inner {
855 EdgePayload::Payload(ep) => {
856 if let Some(cpu) = ep.clone_cpu::<T>() {
857 return Ok(daedalus_gpu::Payload::Cpu(cpu));
858 }
859 if let Some(g) = ep.clone_gpu::<T>() {
860 return Ok(daedalus_gpu::Payload::Gpu(g));
861 }
862 Err(NodeError::InvalidInput(format!(
863 "host bridge port {port}: payload does not contain requested type"
864 )))
865 }
866 EdgePayload::GpuImage(h) => {
867 let any_ref: &dyn Any = &h;
868 if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>() {
869 return Ok(daedalus_gpu::Payload::Gpu(repr.clone()));
870 }
871 Err(NodeError::InvalidInput(format!(
872 "host bridge port {port}: gpu image handle is not compatible with requested payload type"
873 )))
874 }
875 EdgePayload::Any(any) => {
876 if let Some(p) = any
877 .downcast_ref::<daedalus_gpu::Payload<T>>()
878 .cloned()
879 {
880 return Ok(p);
881 }
882 if let Some(cpu) = any
883 .downcast_ref::<T>()
884 .cloned()
885 {
886 return Ok(daedalus_gpu::Payload::Cpu(cpu));
887 }
888 if let Some(g) = any
889 .downcast_ref::<T::GpuRepr>()
890 .cloned()
891 {
892 return Ok(daedalus_gpu::Payload::Gpu(g));
893 }
894 if let Some(ep) = any
895 .downcast_ref::<daedalus_gpu::ErasedPayload>()
896 .cloned()
897 {
898 if let Some(cpu) = ep.clone_cpu::<T>() {
899 return Ok(daedalus_gpu::Payload::Cpu(cpu));
900 }
901 if let Some(g) = ep.clone_gpu::<T>() {
902 return Ok(daedalus_gpu::Payload::Gpu(g));
903 }
904 if ep.is_gpu()
905 && let Some(ctx) = handle.gpu_ctx()
906 && let Ok(downloaded) = ep.download(&ctx)
907 && let Some(cpu) = downloaded.as_cpu::<T>()
908 {
909 return Ok(daedalus_gpu::Payload::Cpu(cpu.clone()));
910 }
911 }
912 Err(NodeError::InvalidInput(format!(
913 "host bridge port {port}: Any payload is not compatible with requested payload type"
914 )))
915 }
916 EdgePayload::Unit | EdgePayload::Bytes(_) => Err(NodeError::InvalidInput(format!(
917 "host bridge port {port}: payload is not compatible with requested payload type"
918 ))),
919 EdgePayload::Value(_) => Err(NodeError::InvalidInput(format!(
920 "host bridge port {port}: payload is not compatible with requested payload type"
921 ))),
922 }
923 }
924}
925
/// Common view over port types that can pop a `Value`; lets `HostPortIterExt` work with
/// both `HostPort` and `HostPortOwned`.
pub trait HostValuePort {
    /// Port name.
    fn name(&self) -> &str;
    /// Pops the next payload as a `Value` with its correlation id; `Ok(None)` when empty.
    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError>;
}
931
// Forwards to the inherent `HostPort` methods.
impl<'a> HostValuePort for HostPort<'a> {
    fn name(&self) -> &str {
        // Resolves to the inherent `HostPort::name`, not this trait method.
        self.name()
    }

    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError> {
        self.try_pop::<Value>()
    }
}
941
// Forwards to the inherent `HostPortOwned` methods.
impl<'a> HostValuePort for HostPortOwned<'a> {
    fn name(&self) -> &str {
        // Resolves to the inherent `HostPortOwned::name`, not this trait method.
        self.name()
    }

    fn try_pop_value(&self) -> Result<Option<(u64, Value)>, NodeError> {
        self.try_pop::<Value>()
    }
}
951
952pub trait HostPortIterExt: Iterator + Sized
954where
955 Self::Item: HostValuePort,
956{
957 fn try_pop_all_values(self) -> Result<Vec<(String, u64, Value)>, NodeError> {
958 let mut out = Vec::new();
959 for port in self {
960 if let Some((corr, value)) = port.try_pop_value()? {
961 out.push((port.name().to_string(), corr, value));
962 }
963 }
964 Ok(out)
965 }
966}
967
// Blanket impl: every iterator whose items are value-capable ports gets the extension.
impl<I> HostPortIterExt for I
where
    I: Iterator + Sized,
    I::Item: HostValuePort,
{
}
974
/// Registry of host bridges, keyed by lowercase alias.
///
/// Clones share the same underlying maps, since every field sits behind an `Arc`.
#[derive(Clone, Default)]
pub struct HostBridgeManager {
    /// Shared inbound/outbound buffers per bridge alias.
    inner: Arc<Mutex<HashMap<String, Arc<Mutex<HostBridgeBuffers>>>>>,
    /// Edge policy per outgoing port, per bridge alias.
    outgoing: Arc<Mutex<HashMap<String, HashMap<String, EdgePolicyKind>>>>,
    /// Declared outgoing port types, per bridge alias.
    outgoing_types: Arc<Mutex<HashMap<String, HashMap<String, TypeExpr>>>>,
    /// Declared incoming port types, per bridge alias.
    incoming_types: Arc<Mutex<HashMap<String, HashMap<String, TypeExpr>>>>,
    /// GPU context handed to every handle; `None` until `attach_gpu` is called.
    #[cfg(feature = "gpu")]
    gpu: Arc<Mutex<Option<GpuContextHandle>>>,
}
985
/// Process-wide count of traced outbound enqueues, used by `push_outbound` to rate-limit its
/// debug log (the first 5, then every 500th).
static HOST_BRIDGE_OUTBOUND_LOGS: AtomicU64 = AtomicU64::new(0);
987
988impl HostBridgeManager {
    /// Creates an empty manager with no bridges registered.
    pub fn new() -> Self {
        Self::default()
    }
993
994 #[cfg(feature = "gpu")]
997 pub fn attach_gpu(&self, gpu: GpuContextHandle) {
998 let mut guard = self.gpu.lock().expect("host bridge gpu lock poisoned");
999 *guard = Some(gpu);
1000 }
1001
1002 pub fn register_bridge(
1004 &self,
1005 alias: impl Into<String>,
1006 ports: impl IntoIterator<Item = (String, EdgePolicyKind)>,
1007 ) {
1008 let alias = alias.into().to_ascii_lowercase();
1009 {
1010 let mut guard = self.inner.lock().expect("host bridge map poisoned");
1011 guard.entry(alias.clone()).or_default();
1012 }
1013 let mut guard = self.outgoing.lock().expect("host bridge ports poisoned");
1014 guard.insert(alias, ports.into_iter().collect());
1015 }
1016
1017 pub fn register_port_types(
1019 &self,
1020 alias: impl Into<String>,
1021 outgoing: impl IntoIterator<Item = (String, TypeExpr)>,
1022 incoming: impl IntoIterator<Item = (String, TypeExpr)>,
1023 ) {
1024 let alias = alias.into().to_ascii_lowercase();
1025 {
1026 let mut guard = self.inner.lock().expect("host bridge map poisoned");
1027 guard.entry(alias.clone()).or_default();
1028 }
1029 let mut out_guard = self
1030 .outgoing_types
1031 .lock()
1032 .expect("host bridge types poisoned");
1033 let out_map = out_guard.entry(alias.clone()).or_default();
1034 for (port, ty) in outgoing {
1035 out_map.insert(port.to_ascii_lowercase(), ty);
1036 }
1037
1038 let mut in_guard = self
1039 .incoming_types
1040 .lock()
1041 .expect("host bridge types poisoned");
1042 let in_map = in_guard.entry(alias).or_default();
1043 for (port, ty) in incoming {
1044 in_map.insert(port.to_ascii_lowercase(), ty);
1045 }
1046 }
1047
    /// Builds a fresh manager and registers every host-bridge node found in `plan`.
    pub fn from_plan(plan: &crate::plan::RuntimePlan) -> Self {
        let mgr = Self::new();
        mgr.populate_from_plan(plan);
        mgr
    }
1054
1055 pub fn populate_from_plan(&self, plan: &crate::plan::RuntimePlan) {
1057 fn parse_type_map(value: Option<&Value>) -> Vec<(String, TypeExpr)> {
1058 let Some(Value::Map(entries)) = value else {
1059 return Vec::new();
1060 };
1061 let mut out = Vec::new();
1062 for (k, v) in entries {
1063 let (Value::String(port), Value::String(json)) = (k, v) else {
1064 continue;
1065 };
1066 if let Ok(ty) = serde_json::from_str::<TypeExpr>(json) {
1067 out.push((port.to_ascii_lowercase(), ty));
1068 }
1069 }
1070 out.sort_by(|a, b| a.0.cmp(&b.0));
1071 out
1072 }
1073
1074 for (idx, node) in plan.nodes.iter().enumerate() {
1075 let is_bridge = matches!(
1076 node.metadata.get(HOST_BRIDGE_META_KEY),
1077 Some(Value::Bool(true))
1078 );
1079 if !is_bridge {
1080 continue;
1081 }
1082 let alias = node
1083 .label
1084 .as_deref()
1085 .unwrap_or(node.id.as_str())
1086 .to_ascii_lowercase();
1087 let outgoing_types = parse_type_map(node.metadata.get("dynamic_output_types"));
1088 let incoming_types = parse_type_map(node.metadata.get("dynamic_input_types"));
1089 let mut outgoing: HashMap<String, EdgePolicyKind> = HashMap::new();
1091 for (from, from_port, _, _, policy) in plan.edges.iter() {
1092 if from.0 == idx {
1093 outgoing.insert(from_port.to_ascii_lowercase(), policy.clone());
1094 }
1095 }
1096 self.register_bridge(alias.clone(), outgoing.into_iter());
1097 self.register_port_types(alias, outgoing_types, incoming_types);
1098 }
1099 }
1100
1101 pub fn handle(&self, alias: impl AsRef<str>) -> Option<HostBridgeHandle> {
1103 let alias = alias.as_ref().to_ascii_lowercase();
1104 let shared = {
1105 let guard = self.inner.lock().ok()?;
1106 guard.get(&alias)?.clone()
1107 };
1108 let outgoing = {
1109 let guard = self.outgoing.lock().ok()?;
1110 guard.get(&alias)?.clone()
1111 };
1112 let outgoing_types = {
1113 let guard = self.outgoing_types.lock().ok()?;
1114 guard.get(&alias).cloned().unwrap_or_default()
1115 };
1116 let incoming_types = {
1117 let guard = self.incoming_types.lock().ok()?;
1118 guard.get(&alias).cloned().unwrap_or_default()
1119 };
1120 Some(HostBridgeHandle::new(
1121 alias,
1122 shared,
1123 outgoing,
1124 outgoing_types,
1125 incoming_types,
1126 #[cfg(feature = "gpu")]
1127 self.gpu.clone(),
1128 ))
1129 }
1130
1131 fn push_outbound(&self, alias: &str, port: &str, payload: CorrelatedPayload) {
1133 if let Ok(mut guard) = self.inner.lock()
1134 && let Some(shared) = guard.get_mut(&alias.to_ascii_lowercase())
1135 && let Ok(mut buf) = shared.lock()
1136 {
1137 let key = port.to_ascii_lowercase();
1138 let q = buf.outbound.entry(key.clone()).or_default();
1139 q.push_back(payload);
1140 if host_bridge_trace_enabled() {
1141 let count = HOST_BRIDGE_OUTBOUND_LOGS.fetch_add(1, Ordering::Relaxed);
1142 if count < 5 || count.is_multiple_of(500) {
1143 log::debug!(
1144 "host-bridge outbound queued alias={} port={} len={}",
1145 alias,
1146 key,
1147 q.len()
1148 );
1149 }
1150 }
1151 if let Some(waker) = buf.wakers.get(&key) {
1152 waker.wake();
1153 }
1154 }
1155 }
1156
1157 fn take_inbound(&self, alias: &str) -> Vec<(String, CorrelatedPayload)> {
1159 if let Ok(mut guard) = self.inner.lock()
1160 && let Some(shared) = guard.get_mut(&alias.to_ascii_lowercase())
1161 && let Ok(mut buf) = shared.lock()
1162 {
1163 let mut drained = Vec::new();
1164 for (port, queue) in buf.inbound.iter_mut() {
1165 while let Some(p) = queue.pop_front() {
1166 drained.push((port.clone(), p));
1167 }
1168 }
1169 return drained;
1170 }
1171 Vec::new()
1172 }
1173}
1174
1175pub fn bridge_handler(
1177 bridges: HostBridgeManager,
1178) -> impl FnMut(
1179 &crate::plan::RuntimeNode,
1180 &crate::state::ExecutionContext,
1181 &mut NodeIo,
1182) -> Result<(), NodeError> {
1183 move |node, _ctx, io| {
1184 let alias = node
1185 .label
1186 .as_deref()
1187 .unwrap_or(node.id.as_str())
1188 .to_ascii_lowercase();
1189
1190 let inbound = bridges.take_inbound(&alias);
1192 if host_bridge_trace_enabled() && !inbound.is_empty() {
1193 let mut entries = Vec::new();
1194 for (port, payload) in &inbound {
1195 entries.push(format!("{}#{}", port, describe_payload(payload)));
1196 }
1197 log::debug!(
1198 "host-bridge inbound alias={} node={} ports={}",
1199 alias,
1200 node.id,
1201 entries.join(", ")
1202 );
1203 }
1204 for (port, payload) in inbound {
1205 io.push_correlated_payload(Some(&port), payload);
1206 }
1207
1208 if host_bridge_trace_enabled() && !io.inputs().is_empty() {
1210 let mut entries = Vec::new();
1211 for (port, payload) in io.inputs() {
1212 entries.push(format!("{}#{}", port, describe_edge(payload)));
1213 }
1214 log::debug!(
1215 "host-bridge outbound alias={} node={} ports={}",
1216 alias,
1217 node.id,
1218 entries.join(", ")
1219 );
1220 }
1221 for (port, payload) in io.inputs() {
1222 bridges.push_outbound(&alias, port, payload.clone());
1223 }
1224 Ok(())
1225 }
1226}
1227
/// Reports whether host-bridge tracing is switched on.
///
/// The `DAEDALUS_HOST_BRIDGE_TRACE` environment variable is read on the first
/// call only; the result is cached for the rest of the process. Tracing is
/// enabled when the value is `1` or `true` (ASCII case-insensitive); an unset
/// or unreadable variable counts as disabled.
fn host_bridge_trace_enabled() -> bool {
    static FLAG: OnceLock<bool> = OnceLock::new();
    let enabled = FLAG.get_or_init(|| match std::env::var("DAEDALUS_HOST_BRIDGE_TRACE") {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    });
    *enabled
}
1236
1237fn describe_payload(p: &CorrelatedPayload) -> String {
1238 match &p.inner {
1239 EdgePayload::Any(_) => "any".to_string(),
1240 EdgePayload::Value(v) => format!("value({})", describe_value(v)),
1241 EdgePayload::Bytes(b) => format!("bytes({}b)", b.len()),
1242 EdgePayload::Unit => "unit".to_string(),
1243 #[cfg(feature = "gpu")]
1244 EdgePayload::GpuImage(_) => "gpu_image".to_string(),
1245 #[cfg(feature = "gpu")]
1246 EdgePayload::Payload(ep) => {
1247 if ep.is_gpu() { "gpu_payload" } else { "cpu_payload" }.to_string()
1248 },
1249 }
1250}
1251
1252fn describe_edge(p: &CorrelatedPayload) -> String {
1253 match &p.inner {
1254 EdgePayload::Any(_) => "any".to_string(),
1255 EdgePayload::Value(v) => format!("value({})", describe_value(v)),
1256 EdgePayload::Bytes(b) => format!("bytes({}b)", b.len()),
1257 EdgePayload::Unit => "unit".to_string(),
1258 #[cfg(feature = "gpu")]
1259 EdgePayload::GpuImage(_) => "gpu_image".to_string(),
1260 #[cfg(feature = "gpu")]
1261 EdgePayload::Payload(ep) => {
1262 if ep.is_gpu() { "gpu_payload" } else { "cpu_payload" }.to_string()
1263 },
1264 }
1265}
1266
1267fn describe_value(v: &Value) -> String {
1268 match v {
1269 Value::Int(i) => format!("int({})", i),
1270 Value::Float(f) => format!("float({})", f),
1271 Value::Bool(b) => format!("bool({})", b),
1272 Value::String(s) => format!("string({})", s),
1273 Value::Enum(ev) => format!("enum({})", ev.name),
1274 Value::List(_) => "list".to_string(),
1275 Value::Struct(_) => "struct".to_string(),
1276 Value::Tuple(_) => "tuple".to_string(),
1277 Value::Map(_) => "map".to_string(),
1278 Value::Unit => "unit".to_string(),
1279 Value::Bytes(b) => format!("bytes({}b)", b.len()),
1280 }
1281}
1282
/// Wire form of a payload exchanged with the host through a bridge.
#[derive(Clone, Debug, PartialEq)]
pub enum HostBridgeSerialized {
    /// JSON text. Inbound text is decoded with `json::from_json`; outbound
    /// values are encoded with `json::to_json`.
    Json(String),
    /// Raw bytes. Inbound bytes are forwarded unchanged when the port type is
    /// the bytes scalar; otherwise they are decoded as UTF-8 JSON text.
    Bytes(Arc<[u8]>),
}
1289
/// A serialized payload together with its correlation metadata.
#[derive(Clone, Debug, PartialEq)]
pub struct HostBridgeSerializedPayload {
    /// Correlation id carried over from the originating `CorrelatedPayload`.
    pub correlation_id: u64,
    /// Type expression of the port the payload belongs to, when one is known.
    pub port_type: Option<TypeExpr>,
    /// The serialized payload body.
    pub payload: HostBridgeSerialized,
}
1297
1298fn deserialize_serialized_payload(
1299 port: &str,
1300 port_type: Option<&TypeExpr>,
1301 payload: HostBridgeSerialized,
1302) -> Result<EdgePayload, NodeError> {
1303 match payload {
1304 HostBridgeSerialized::Bytes(bytes) => {
1305 if port_type.is_some_and(is_bytes_type) {
1306 return Ok(EdgePayload::Bytes(bytes));
1307 }
1308 let text = std::str::from_utf8(&bytes).map_err(|err| {
1309 NodeError::InvalidInput(format!(
1310 "host bridge port {port}: bytes are not utf-8 ({err})"
1311 ))
1312 })?;
1313 let value = parse_json_value(port, text)?;
1314 Ok(value_payload(value))
1315 }
1316 HostBridgeSerialized::Json(json) => {
1317 let value = parse_json_value(port, &json)?;
1318 Ok(value_payload(value))
1319 }
1320 }
1321}
1322
1323fn serialize_outbound_payload(
1324 port: &str,
1325 port_type: Option<&TypeExpr>,
1326 payload: CorrelatedPayload,
1327) -> Result<HostBridgeSerializedPayload, NodeError> {
1328 let serialized = match payload.inner {
1329 EdgePayload::Unit => {
1330 HostBridgeSerialized::Json(serialize_value_to_json(port, &Value::Unit)?)
1331 }
1332 EdgePayload::Bytes(bytes) => HostBridgeSerialized::Bytes(bytes),
1333 EdgePayload::Value(value) => {
1334 HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1335 }
1336 EdgePayload::Any(any) => {
1337 if port_type.is_some_and(is_bytes_type) {
1338 if let Some(bytes) = any_to_bytes(any.as_ref()) {
1339 HostBridgeSerialized::Bytes(bytes)
1340 } else if let Some(value) = any_to_value(any.as_ref()) {
1341 HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1342 } else {
1343 return Err(NodeError::InvalidInput(format!(
1344 "host bridge port {port}: unsupported Any payload for bytes output"
1345 )));
1346 }
1347 } else if let Some(value) = any_to_value(any.as_ref()) {
1348 HostBridgeSerialized::Json(serialize_value_to_json(port, &value)?)
1349 } else if let Some(bytes) = any_to_bytes(any.as_ref()) {
1350 HostBridgeSerialized::Bytes(bytes)
1351 } else {
1352 return Err(NodeError::InvalidInput(format!(
1353 "host bridge port {port}: unsupported Any payload"
1354 )));
1355 }
1356 }
1357 #[cfg(feature = "gpu")]
1358 EdgePayload::Payload(_) | EdgePayload::GpuImage(_) => {
1359 return Err(NodeError::InvalidInput(format!(
1360 "host bridge port {port}: gpu payloads cannot be serialized"
1361 )));
1362 }
1363 };
1364
1365 Ok(HostBridgeSerializedPayload {
1366 correlation_id: payload.correlation_id,
1367 port_type: port_type.cloned(),
1368 payload: serialized,
1369 })
1370}
1371
1372fn is_bytes_type(ty: &TypeExpr) -> bool {
1373 matches!(ty, TypeExpr::Scalar(ValueType::Bytes))
1374}
1375
1376fn parse_json_value(port: &str, json_str: &str) -> Result<Value, NodeError> {
1377 json::from_json(json_str).map_err(|err| {
1378 NodeError::InvalidInput(format!(
1379 "host bridge port {port}: invalid typed json ({err})"
1380 ))
1381 })
1382}
1383
1384fn serialize_value_to_json(port: &str, value: &Value) -> Result<String, NodeError> {
1385 json::to_json(value).map_err(|err| {
1386 NodeError::InvalidInput(format!(
1387 "host bridge port {port}: failed to serialize value ({err})"
1388 ))
1389 })
1390}
1391
/// Wraps a `Value` in the `EdgePayload::Value` variant.
fn value_payload(value: Value) -> EdgePayload {
    EdgePayload::Value(value)
}
1395
/// Type-erased serializer: inspects a `&dyn Any` and returns `Some(Value)`
/// when it can convert the payload, or `None` when it cannot.
type AnyValueSerializer = Box<dyn Fn(&dyn Any) -> Option<Value> + Send + Sync + 'static>;
1397
1398fn value_serializers() -> &'static RwLock<Vec<AnyValueSerializer>> {
1399 static REGISTRY: OnceLock<RwLock<Vec<AnyValueSerializer>>> = OnceLock::new();
1400 REGISTRY.get_or_init(|| RwLock::new(Vec::new()))
1401}
1402
1403fn try_serialize_any_value(any: &dyn Any) -> Option<Value> {
1404 let guard = value_serializers().read().ok()?;
1405 for serializer in guard.iter() {
1406 if let Some(value) = serializer(any) {
1407 return Some(value);
1408 }
1409 }
1410 None
1411}
1412
1413pub fn register_value_serializer<T, F>(serializer: F)
1417where
1418 T: Any + Clone + Send + Sync + 'static,
1419 F: Fn(&T) -> Value + Send + Sync + 'static,
1420{
1421 let mut guard = value_serializers()
1422 .write()
1423 .expect("daedalus-runtime host bridge serializer lock poisoned");
1424 guard.push(Box::new(move |any| {
1425 if let Some(value) = any.downcast_ref::<T>() {
1426 return Some(serializer(value));
1427 }
1428 None
1429 }));
1430}
1431
1432fn any_to_value(any: &dyn Any) -> Option<Value> {
1433 if let Some(value) = try_serialize_any_value(any) {
1434 return Some(value);
1435 }
1436 let value = any
1437 .downcast_ref::<Value>()
1438 .cloned();
1439 if let Some(value) = value {
1440 return Some(value);
1441 }
1442
1443 let i = any
1444 .downcast_ref::<i64>()
1445 .copied();
1446 if let Some(i) = i {
1447 return Some(Value::Int(i));
1448 }
1449
1450 let f = any
1451 .downcast_ref::<f64>()
1452 .copied();
1453 if let Some(f) = f {
1454 return Some(Value::Float(f));
1455 }
1456
1457 let b = any
1458 .downcast_ref::<bool>()
1459 .copied();
1460 if let Some(b) = b {
1461 return Some(Value::Bool(b));
1462 }
1463
1464 let s = any
1465 .downcast_ref::<String>()
1466 .cloned();
1467 if let Some(s) = s {
1468 return Some(Value::String(s.into()));
1469 }
1470
1471 let bytes = any
1472 .downcast_ref::<Vec<u8>>()
1473 .cloned();
1474 bytes.map(|b| Value::Bytes(b.into()))
1475}
1476
/// Extracts a shared byte buffer from a type-erased payload.
///
/// A `Vec<u8>` is copied into a new `Arc<[u8]>`; an `Arc<[u8]>` is shared by
/// bumping its reference count. Any other type yields `None`.
fn any_to_bytes(any: &dyn Any) -> Option<Arc<[u8]>> {
    if let Some(bytes) = any.downcast_ref::<Vec<u8>>() {
        // Build the Arc straight from the slice: cloning the Vec first would
        // allocate and copy the data twice.
        return Some(Arc::from(bytes.as_slice()));
    }
    any.downcast_ref::<Arc<[u8]>>().cloned()
}
1486
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty edge-policy list; these tests only exercise queues and port types.
    fn empty_policies() -> Vec<(String, EdgePolicyKind)> {
        Vec::new()
    }

    /// JSON pushed through a handle shows up on the inbound queue as a decoded
    /// `Value`, tagged with the correlation id returned by the push.
    #[test]
    fn push_serialized_json_decodes_to_value() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            vec![("config".into(), TypeExpr::Scalar(ValueType::Int))],
            Vec::new(),
        );
        let handle = mgr.handle("host").expect("host handle");

        let json = json::to_json(&Value::Int(5)).expect("json");
        let id = handle
            .push_serialized("config", HostBridgeSerialized::Json(json), None)
            .expect("push");

        let inbound = mgr.take_inbound("host");
        assert_eq!(inbound.len(), 1);
        let (port, payload) = &inbound[0];
        assert_eq!(port, "config");
        assert_eq!(payload.correlation_id, id);

        match &payload.inner {
            EdgePayload::Value(Value::Int(v)) => assert_eq!(*v, 5),
            other => panic!("unexpected payload {other:?}"),
        }
    }

    /// A `Value` queued outbound is popped as JSON that round-trips to the same
    /// value, and the serialized payload reports the registered port type.
    #[test]
    fn try_pop_serialized_encodes_value_to_json() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("status".into(), TypeExpr::Scalar(ValueType::Int))],
        );
        let payload = CorrelatedPayload::from_edge(value_payload(Value::Int(7)));
        mgr.push_outbound("host", "status", payload);

        let handle = mgr.handle("host").expect("host handle");
        let serialized = handle
            .try_pop_serialized("status")
            .expect("serialize")
            .expect("payload");

        assert_eq!(serialized.port_type, Some(TypeExpr::Scalar(ValueType::Int)));

        match serialized.payload {
            HostBridgeSerialized::Json(json_str) => {
                let value = json::from_json(&json_str).expect("from json");
                assert_eq!(value, Value::Int(7));
            }
            other => panic!("unexpected payload {other:?}"),
        }
    }

    /// Byte payloads queued outbound come back as the same bytes, not as JSON.
    #[test]
    fn bytes_payloads_pass_through_serialization() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("blob".into(), TypeExpr::Scalar(ValueType::Bytes))],
        );
        let bytes: Arc<[u8]> = Arc::from(vec![1_u8, 2, 3, 4]);
        let payload = CorrelatedPayload::from_edge(EdgePayload::Bytes(bytes.clone()));
        mgr.push_outbound("host", "blob", payload);

        let handle = mgr.handle("host").expect("host handle");
        let serialized = handle
            .try_pop_serialized("blob")
            .expect("serialize")
            .expect("payload");

        match serialized.payload {
            HostBridgeSerialized::Bytes(out) => assert_eq!(out.as_ref(), bytes.as_ref()),
            other => panic!("unexpected payload {other:?}"),
        }
    }

    /// An `Any` payload holding an `i64` is popped as `Value::Int`.
    #[test]
    fn try_pop_value_decodes_value_like_any() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("out".into(), TypeExpr::Scalar(ValueType::Int))],
        );
        mgr.push_outbound(
            "host",
            "out",
            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(7_i64))),
        );

        let host = mgr.handle("host").expect("host handle");
        let (_corr, value) = host.try_pop_value("out").expect("value").expect("payload");
        assert_eq!(value, Value::Int(7));
    }

    /// An `Any` payload of an unrelated type is rejected with an
    /// `InvalidInput` error whose message contains "not value-like".
    #[test]
    fn try_pop_value_rejects_non_value_any() {
        #[derive(Clone)]
        struct NotValue;

        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("out".into(), TypeExpr::opaque("image:dynamic"))],
        );
        mgr.push_outbound(
            "host",
            "out",
            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(NotValue))),
        );

        let host = mgr.handle("host").expect("host handle");
        let err = host.try_pop_value("out").expect_err("expected error");
        match err {
            NodeError::InvalidInput(msg) => assert!(msg.contains("not value-like")),
            other => panic!("unexpected error {other:?}"),
        }
    }

    /// Draining a port returns every queued value in push order.
    #[test]
    fn drain_values_decodes_multiple_payloads() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("out".into(), TypeExpr::Scalar(ValueType::Int))],
        );
        mgr.push_outbound(
            "host",
            "out",
            CorrelatedPayload::from_edge(value_payload(Value::Int(1))),
        );
        mgr.push_outbound(
            "host",
            "out",
            CorrelatedPayload::from_edge(value_payload(Value::Int(2))),
        );

        let host = mgr.handle("host").expect("host handle");
        let vals = host.drain_values("out").expect("drain");
        assert_eq!(vals.len(), 2);
        assert_eq!(vals[0].1, Value::Int(1));
        assert_eq!(vals[1].1, Value::Int(2));
    }

    /// Iterating an explicit port list only yields values for ports that have
    /// something queued: "a" has one payload, "b" has none.
    #[test]
    fn iter_ports_can_filter_and_pop_values() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![
                ("a".into(), TypeExpr::Scalar(ValueType::Int)),
                ("b".into(), TypeExpr::Scalar(ValueType::Int)),
            ],
        );
        mgr.push_outbound(
            "host",
            "a",
            CorrelatedPayload::from_edge(value_payload(Value::Int(42))),
        );

        let host = mgr.handle("host").expect("host handle");
        let ports = vec!["a".to_string(), "b".to_string()];
        let got = host
            .iter_ports(&ports)
            .filter(|p| p.can_poll::<Value>())
            .try_pop_all_values()
            .expect("pop");
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].0, "a");
        assert_eq!(got[0].2, Value::Int(42));
    }

    /// Same scenario as above, but the ports come from `incoming_ports()` and
    /// are filtered by declared type instead of by an explicit list.
    #[test]
    fn incoming_ports_can_filter_and_pop_values() {
        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![
                ("a".into(), TypeExpr::Scalar(ValueType::Int)),
                ("b".into(), TypeExpr::Scalar(ValueType::Int)),
            ],
        );
        mgr.push_outbound(
            "host",
            "a",
            CorrelatedPayload::from_edge(value_payload(Value::Int(42))),
        );

        let host = mgr.handle("host").expect("host handle");
        let got = host
            .incoming_ports()
            .filter(|p| p.can_type_to::<Value>())
            .try_pop_all_values()
            .expect("pop");
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].0, "a");
        assert_eq!(got[0].2, Value::Int(42));
    }

    /// A port typed as the opaque "image:dynamic" can be polled and popped as
    /// a `DynamicImage`; the popped image keeps the original dimensions.
    #[test]
    fn iter_ports_can_poll_dynamic_image_by_resolved_type() {
        use image::{GenericImageView, ImageBuffer, Rgba};

        let mgr = HostBridgeManager::new();
        mgr.register_bridge("host", empty_policies());
        mgr.register_port_types(
            "host",
            Vec::new(),
            vec![("frame".into(), TypeExpr::opaque("image:dynamic"))],
        );

        let img = DynamicImage::ImageRgba8(ImageBuffer::from_pixel(2, 2, Rgba([0, 0, 0, 255])));
        mgr.push_outbound(
            "host",
            "frame",
            CorrelatedPayload::from_edge(EdgePayload::Any(Arc::new(img.clone()))),
        );

        let host = mgr.handle("host").expect("host handle");
        let ports = vec!["frame".to_string()];
        let port = host.iter_ports(&ports).next().expect("port");
        assert!(port.can_poll::<DynamicImage>());
        let (_corr, got) = port
            .try_pop::<DynamicImage>()
            .expect("pop")
            .expect("payload");
        assert_eq!(got.dimensions(), img.dimensions());
    }
}