daedalus_runtime/io.rs

use std::any::{Any, TypeId};
use std::collections::BTreeMap;
use std::collections::HashMap;
#[cfg(feature = "gpu")]
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
use std::sync::{OnceLock, RwLock};

use daedalus_data::model::{TypeExpr, Value};
use daedalus_data::typing;
#[cfg(feature = "gpu")]
use image::{DynamicImage, GrayAlphaImage, GrayImage, RgbImage, RgbaImage};

use crate::executor::queue::{apply_policy, apply_policy_owned, ApplyPolicyOwnedArgs};
use crate::executor::{
    CorrelatedPayload, EdgePayload, EdgeStorage, ExecutionTelemetry, next_correlation_id,
};
use crate::fanin::parse_indexed_port;
#[allow(unused_imports)]
use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimeNode};
use daedalus_core::sync::{SyncGroup, SyncPolicy};
use daedalus_planner::NodeRef;

type EdgeInfo = (NodeRef, String, NodeRef, String, EdgePolicyKind);

#[derive(Clone)]
struct DrainedInput {
    port: String,
    edge_idx: usize,
    payload: CorrelatedPayload,
}

pub type ConstCoercer = Box<
    dyn Fn(&daedalus_data::model::Value) -> Option<Box<dyn Any + Send + Sync>>
        + Send
        + Sync
        + 'static,
>;

pub type ConstCoercerMap = Arc<RwLock<HashMap<&'static str, ConstCoercer>>>;

static GLOBAL_CONST_COERCERS: OnceLock<ConstCoercerMap> = OnceLock::new();

type OutputMover = Box<
    dyn Fn(Box<dyn Any + Send + Sync>) -> EdgePayload + Send + Sync + 'static
>;

pub type OutputMoverMap = Arc<RwLock<HashMap<TypeId, OutputMover>>>;

static OUTPUT_MOVERS: OnceLock<OutputMoverMap> = OnceLock::new();

fn output_movers() -> &'static OutputMoverMap {
    OUTPUT_MOVERS.get_or_init(|| Arc::new(RwLock::new(HashMap::new())))
}

/// Create a new output mover registry.
///
/// ```no_run
/// use daedalus_runtime::io::new_output_mover_map;
/// let map = new_output_mover_map();
/// assert!(map.read().unwrap().is_empty());
/// ```
pub fn new_output_mover_map() -> OutputMoverMap {
    Arc::new(RwLock::new(HashMap::new()))
}

/// Register an output mover in the provided map.
///
/// Movers take ownership of the output value, allowing zero-copy payload wrapping.
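///
/// A minimal sketch, assuming a node that emits owned `Vec<u8>` buffers (any
/// `Send + Sync` type works the same way):
///
/// ```no_run
/// use daedalus_runtime::io::{new_output_mover_map, register_output_mover_in};
/// use daedalus_runtime::executor::EdgePayload;
/// use std::sync::Arc;
///
/// let map = new_output_mover_map();
/// // Wrap owned buffers once at the edge instead of cloning per consumer.
/// register_output_mover_in::<Vec<u8>, _>(&map, |bytes| EdgePayload::Any(Arc::new(bytes)));
/// ```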
pub fn register_output_mover_in<T, F>(map: &OutputMoverMap, mover: F)
where
    T: Any + Send + Sync + 'static,
    F: Fn(T) -> EdgePayload + Send + Sync + 'static,
{
    let mut guard = map.write().expect("OUTPUT_MOVERS lock poisoned");
    guard.insert(
        TypeId::of::<T>(),
        Box::new(move |any| {
            let boxed = any.downcast::<T>().expect("output mover type mismatch");
            mover(*boxed)
        }),
    );
}

/// Register a global output mover.
pub fn register_output_mover<T, F>(mover: F)
where
    T: Any + Send + Sync + 'static,
    F: Fn(T) -> EdgePayload + Send + Sync + 'static,
{
    register_output_mover_in(output_movers(), mover);
}

fn try_move_output<T>(movers: Option<&OutputMoverMap>, value: T) -> Result<EdgePayload, T>
where
    T: Any + Send + Sync + 'static,
{
    let map = match movers {
        Some(map) => map,
        None => return Err(value),
    };
    let guard = match map.read() {
        Ok(guard) => guard,
        Err(_) => return Err(value),
    };
    let mover = match guard.get(&TypeId::of::<T>()) {
        Some(mover) => mover,
        None => return Err(value),
    };
    let boxed: Box<dyn Any + Send + Sync> = Box::new(value);
    Ok(mover(boxed))
}

#[cfg(feature = "gpu")]
fn promote_payload_for_host(payload: EdgePayload) -> EdgePayload {
    use daedalus_gpu::{ErasedPayload, Payload};

    match payload {
        EdgePayload::Any(a) => {
            if let Some(ep) = a.downcast_ref::<ErasedPayload>() {
                return EdgePayload::Payload(ep.clone());
            }
            if let Some(p) = a.downcast_ref::<Payload<DynamicImage>>() {
                return match p.clone() {
                    Payload::Cpu(img) => EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(img)),
                    Payload::Gpu(g) => EdgePayload::Payload(ErasedPayload::from_gpu::<DynamicImage>(g)),
                };
            }
            if let Some(p) = a.downcast_ref::<Payload<GrayImage>>() {
                return match p.clone() {
                    Payload::Cpu(img) => EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>(img)),
                    Payload::Gpu(g) => EdgePayload::Payload(ErasedPayload::from_gpu::<GrayImage>(g)),
                };
            }
            if let Some(img) = a.downcast_ref::<DynamicImage>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(img.clone()));
            }
            if let Some(img) = a.downcast_ref::<Arc<DynamicImage>>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>((**img).clone()));
            }
            if let Some(img) = a.downcast_ref::<GrayImage>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>(img.clone()));
            }
            if let Some(img) = a.downcast_ref::<Arc<GrayImage>>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>((**img).clone()));
            }
            if let Some(img) = a.downcast_ref::<RgbImage>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbImage>(img.clone()));
            }
            if let Some(img) = a.downcast_ref::<Arc<RgbImage>>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbImage>((**img).clone()));
            }
            if let Some(img) = a.downcast_ref::<RgbaImage>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbaImage>(img.clone()));
            }
            if let Some(img) = a.downcast_ref::<Arc<RgbaImage>>() {
                return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbaImage>((**img).clone()));
            }
            if let Some(img) = a.downcast_ref::<GrayAlphaImage>() {
                let dyn_img = DynamicImage::ImageLumaA8(img.clone());
                return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(dyn_img));
            }
            if let Some(img) = a.downcast_ref::<Arc<GrayAlphaImage>>() {
                let dyn_img = DynamicImage::ImageLumaA8((**img).clone());
                return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(dyn_img));
            }
            EdgePayload::Any(a)
        }
        other => other,
    }
}

/// Create a new constant coercer registry.
///
/// ```
/// use daedalus_runtime::io::new_const_coercer_map;
/// let map = new_const_coercer_map();
/// assert!(map.read().unwrap().is_empty());
/// ```
pub fn new_const_coercer_map() -> ConstCoercerMap {
    Arc::new(RwLock::new(HashMap::new()))
}
/// Register a global coercer for constant default values.
///
/// Constant defaults are injected into node inputs as `Any` payloads (typically `i64`, `f64`,
/// `bool`, `String`, or `daedalus_data::model::Value`). Most payload types should flow through the
/// graph without conversion, but scalar configuration inputs often want a native Rust type (e.g.
/// `u32`, `f32`, or plugin-defined enums).
///
/// Static (non-plugin) builds can call this during initialization to support enum constants.
/// Dynamic plugins should prefer `PluginRegistry::register_const_coercer` so the host and plugin
/// share a single coercer map.
///
/// ```
/// use daedalus_runtime::io::register_const_coercer;
/// use daedalus_data::model::Value;
/// register_const_coercer::<i64, _>(|v| match v { Value::Int(i) => Some(*i), _ => None });
/// ```
pub fn register_const_coercer<T, F>(coercer: F)
where
    T: Any + Send + Sync + 'static,
    F: Fn(&daedalus_data::model::Value) -> Option<T> + Send + Sync + 'static,
{
    let key = std::any::type_name::<T>();
    let map = GLOBAL_CONST_COERCERS.get_or_init(new_const_coercer_map);
    let mut guard = map.write().expect("GLOBAL_CONST_COERCERS lock poisoned");
    guard.insert(
        key,
        Box::new(move |v| coercer(v).map(|t| Box::new(t) as Box<dyn Any + Send + Sync>)),
    );
}

/// Minimal node I/O surface backed by executor queues.
///
/// ```no_run
/// use daedalus_runtime::io::NodeIo;
/// use daedalus_runtime::executor::EdgePayload;
///
/// fn handler(io: &mut NodeIo) {
///     io.push_output(Some("out"), EdgePayload::Unit);
/// }
/// ```
pub struct NodeIo<'a> {
    inputs: Vec<(String, CorrelatedPayload)>,
    borrowed_cache: std::cell::UnsafeCell<Vec<Box<dyn Any + Send + Sync>>>,
    sync_groups: Vec<SyncGroup>,
    port_overrides: HashMap<String, (Option<BackpressureStrategy>, Option<usize>)>,
    current_corr_id: u64,
    outgoing: Vec<usize>,
    has_incoming_edges: bool,
    queues: &'a Arc<Vec<EdgeStorage>>,
    telemetry: &'a mut ExecutionTelemetry,
    edges: &'a [EdgeInfo],
    #[allow(dead_code)]
    seg_idx: usize,
    node_idx: usize,
    node_id: String,
    warnings_seen: &'a std::sync::Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
    backpressure: BackpressureStrategy,
    #[cfg(feature = "gpu")]
    gpu: Option<daedalus_gpu::GpuContextHandle>,
    #[cfg(feature = "gpu")]
    target_compute: daedalus_planner::ComputeAffinity,
    #[cfg(feature = "gpu")]
    payload_edges: &'a HashSet<usize>,
    const_coercers: Option<ConstCoercerMap>,
    output_movers: Option<OutputMoverMap>,
}

impl<'a> NodeIo<'a> {
    #[cfg(feature = "gpu")]
    fn dynamic_image_to_t<T: Any + Clone>(img: DynamicImage) -> Option<T> {
        let want = TypeId::of::<T>();
        if want == TypeId::of::<DynamicImage>() {
            let any_ref: &dyn Any = &img;
            return any_ref.downcast_ref::<T>().cloned();
        }
        if want == TypeId::of::<GrayImage>() {
            let gray = img.to_luma8();
            let any_ref: &dyn Any = &gray;
            return any_ref.downcast_ref::<T>().cloned();
        }
        if want == TypeId::of::<GrayAlphaImage>() {
            let gray = img.to_luma_alpha8();
            let any_ref: &dyn Any = &gray;
            return any_ref.downcast_ref::<T>().cloned();
        }
        if want == TypeId::of::<RgbImage>() {
            let rgb = img.to_rgb8();
            let any_ref: &dyn Any = &rgb;
            return any_ref.downcast_ref::<T>().cloned();
        }
        if want == TypeId::of::<RgbaImage>() {
            let rgba = img.to_rgba8();
            let any_ref: &dyn Any = &rgba;
            return any_ref.downcast_ref::<T>().cloned();
        }
        None
    }

    /// Construct the I/O facade for a node (internal runtime API).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        incoming_edges: Vec<usize>,
        outgoing_edges: Vec<usize>,
        queues: &'a Arc<Vec<EdgeStorage>>,
        warnings_seen: &'a Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
        edges: &'a [EdgeInfo],
        mut sync_groups: Vec<SyncGroup>,
        #[cfg(feature = "gpu")] gpu_entry_edges: &'a HashSet<usize>,
        #[cfg(feature = "gpu")] gpu_exit_edges: &'a HashSet<usize>,
        #[cfg(feature = "gpu")] payload_edges: &'a HashSet<usize>,
        seg_idx: usize,
        node_idx: usize,
        node_id: String,
        telemetry: &'a mut ExecutionTelemetry,
        backpressure: BackpressureStrategy,
        const_inputs: &[(String, daedalus_data::model::Value)],
        const_coercers: Option<ConstCoercerMap>,
        output_movers: Option<OutputMoverMap>,
        #[cfg(feature = "gpu")] gpu: Option<daedalus_gpu::GpuContextHandle>,
        #[cfg(feature = "gpu")] target_compute: daedalus_planner::ComputeAffinity,
    ) -> Self {
        let is_host_bridge =
            node_id.ends_with("io.host_bridge") || node_id.ends_with("io.host_output");
        let has_incoming_edges = !incoming_edges.is_empty();
        if sync_groups.is_empty() && !is_host_bridge {
            // Default behavior: multi-input nodes should only fire when all ports are ready.
            // If the node doesn't specify sync metadata, create an implicit AllReady group
            // across all incoming ports so downstream consumers never see partial inputs.
            let mut ports: Vec<String> = incoming_edges
                .iter()
                .filter_map(|edge_idx| edges.get(*edge_idx).map(|(_, _, _, to_port, _)| to_port))
                .cloned()
                .collect();
            ports.sort();
            ports.dedup();
            if ports.len() > 1 {
                sync_groups.push(SyncGroup {
                    name: "__implicit_all_ready".into(),
                    policy: SyncPolicy::AllReady,
                    backpressure: None,
                    capacity: None,
                    ports,
                });
            }
        }

        let mut drained: Vec<DrainedInput> = Vec::new();
        for edge_idx in &incoming_edges {
            if let Some(storage) = queues.get(*edge_idx) {
                match storage {
                    EdgeStorage::Locked(q_arc) => {
                        if let Ok(mut q) = q_arc.lock() {
                            while let Some(payload) = q.pop_front() {
                                #[allow(unused_mut)]
                                let mut payload = payload;
                                let now = Instant::now();
                                telemetry
                                    .record_edge_wait(*edge_idx, now.saturating_duration_since(payload.enqueued_at));
                                let payload_bytes = if cfg!(feature = "metrics")
                                    && telemetry.metrics_level.is_detailed()
                                {
                                    crate::executor::payload_size_bytes(&payload.inner)
                                } else {
                                    None
                                };
                                telemetry.record_node_payload_in(node_idx, payload_bytes);
                                let port = edges
                                    .get(*edge_idx)
                                    .map(|(_, _, _, to_port, _)| to_port.clone())
                                    .unwrap_or_default();
                                #[cfg(feature = "gpu")]
                                {
                                    payload = Self::convert_incoming(
                                        payload,
                                        *edge_idx,
                                        gpu_entry_edges,
                                        gpu_exit_edges,
                                        gpu.as_ref(),
                                        telemetry,
                                    );
                                }
                                drained.push(DrainedInput {
                                    port,
                                    edge_idx: *edge_idx,
                                    payload,
                                });
                            }
                        }
                    }
                    #[cfg(feature = "lockfree-queues")]
                    EdgeStorage::BoundedLf(q) => {
                        while let Some(payload) = q.pop() {
                            #[allow(unused_mut)]
                            let mut payload = payload;
                            let now = Instant::now();
                            telemetry
                                .record_edge_wait(*edge_idx, now.saturating_duration_since(payload.enqueued_at));
                            let payload_bytes = if cfg!(feature = "metrics")
                                && telemetry.metrics_level.is_detailed()
                            {
                                crate::executor::payload_size_bytes(&payload.inner)
                            } else {
                                None
                            };
                            telemetry.record_node_payload_in(node_idx, payload_bytes);
                            let port = edges
                                .get(*edge_idx)
                                .map(|(_, _, _, to_port, _)| to_port.clone())
                                .unwrap_or_default();
                            #[cfg(feature = "gpu")]
                            {
                                payload = Self::convert_incoming(
                                    payload,
                                    *edge_idx,
                                    gpu_entry_edges,
                                    gpu_exit_edges,
                                    gpu.as_ref(),
                                    telemetry,
                                );
                            }
                            drained.push(DrainedInput {
                                port,
                                edge_idx: *edge_idx,
                                payload,
                            });
                        }
                    }
                }
            }
        }
        if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
            for item in &drained {
                log::debug!(
                    "node input drained node={} port={} edge_idx={} payload={}",
                    node_id,
                    item.port,
                    item.edge_idx,
                    edge_payload_desc(&item.payload.inner)
                );
            }
        }
        if log::log_enabled!(log::Level::Debug) && drained.is_empty() {
            let ports: Vec<String> = incoming_edges
                .iter()
                .filter_map(|edge_idx| edges.get(*edge_idx).map(|(_, _, _, to_port, _)| to_port.clone()))
                .collect();
            if !ports.is_empty() {
                log::debug!("node inputs empty node={} ports={:?}", node_id, ports);
            }
        }

        let has_drained = !drained.is_empty();
        let mut const_payloads: Vec<(String, CorrelatedPayload)> = Vec::new();
        // Apply constant defaults only when no incoming payload exists for the port.
        for (port, value) in const_inputs {
            if drained.iter().any(|p| p.port == *port) {
                continue;
            }
            let payload = match value {
                // Prefer scalar carriers for cross-dylib stability; coercion still has access to
                // the `Value` shape via `coerce_const_any`.
                daedalus_data::model::Value::Int(v) => EdgePayload::Any(Arc::new(*v)),
                daedalus_data::model::Value::Float(f) => EdgePayload::Any(Arc::new(*f)),
                daedalus_data::model::Value::Bool(b) => EdgePayload::Any(Arc::new(*b)),
                // Keep JSON-authored constants available to coercers (e.g., enum coercion).
                daedalus_data::model::Value::String(s) => {
                    EdgePayload::Any(Arc::new(s.to_string()))
                }
                other => EdgePayload::Value(other.clone()),
            };
            const_payloads.push((port.clone(), CorrelatedPayload::from_edge(payload)));
        }

        let (mut aligned_inputs, leftovers, ready) = align_drained_inputs(drained, &sync_groups);
        if log::log_enabled!(log::Level::Debug) && !sync_groups.is_empty() && !ready {
            log::debug!(
                "node sync groups not ready node={} groups={:?}",
                node_id,
                sync_groups
            );
        }
        if !sync_groups.is_empty() && !ready {
            aligned_inputs.clear();
        } else if has_drained || !has_incoming_edges {
            aligned_inputs.extend(const_payloads);
        }
        requeue_drained(leftovers, queues, edges);

        let current_corr_id = aligned_inputs
            .first()
            .map(|(_, cp)| cp.correlation_id)
            .unwrap_or_else(next_correlation_id);

        let mut port_overrides = HashMap::new();
        for group in &sync_groups {
            for port in &group.ports {
                port_overrides.insert(port.clone(), (group.backpressure.clone(), group.capacity));
            }
        }

        Self {
            inputs: aligned_inputs,
            borrowed_cache: std::cell::UnsafeCell::new(Vec::new()),
            sync_groups,
            port_overrides,
            current_corr_id,
            outgoing: outgoing_edges,
            has_incoming_edges,
            queues,
            telemetry,
            edges,
            seg_idx,
            node_idx,
            node_id,
            warnings_seen,
            backpressure,
            #[cfg(feature = "gpu")]
            gpu,
            #[cfg(feature = "gpu")]
            target_compute,
            #[cfg(feature = "gpu")]
            payload_edges,
            const_coercers,
            output_movers,
        }
    }

    /// Borrow the inputs drained for this node invocation.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     let _ = io.inputs();
    /// }
    /// ```
    pub fn inputs(&self) -> &[(String, CorrelatedPayload)] {
        &self.inputs
    }

    /// Whether this node has any incoming edges.
    pub fn has_incoming_edges(&self) -> bool {
        self.has_incoming_edges
    }

    fn take_input(&mut self, port: &str) -> Option<(usize, CorrelatedPayload)> {
        let idx = match self.inputs.iter().position(|(p, _)| p == port) {
            Some(idx) => idx,
            None => {
                if std::env::var_os("DAEDALUS_TRACE_MISSING_INPUTS").is_some() {
                    let ports: Vec<&str> = self.inputs.iter().map(|(p, _)| p.as_str()).collect();
                    eprintln!(
                        "daedalus-runtime: missing input node={} port={} available_ports={:?}",
                        self.node_id,
                        port,
                        ports
                    );
                }
                return None;
            }
        };
        let payload = self.inputs.remove(idx).1;
        Some((idx, payload))
    }

    fn restore_input(&mut self, idx: usize, port: &str, payload: CorrelatedPayload) {
        if idx <= self.inputs.len() {
            self.inputs.insert(idx, (port.to_string(), payload));
        } else {
            self.inputs.push((port.to_string(), payload));
        }
    }

    fn cache_borrowed<T: Any + Send + Sync>(&self, value: T) -> &T {
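        // The cached values live behind `Box`es with stable heap addresses, so `&T`
        // borrows handed out earlier stay valid even when the `Vec` itself reallocates.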
        let cache = unsafe { &mut *self.borrowed_cache.get() };
        cache.push(Box::new(value));
        cache
            .last()
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .expect("borrowed cache type mismatch")
    }

    /// Return the sync group metadata for this node.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     let _ = io.sync_groups();
    /// }
    /// ```
    pub fn sync_groups(&self) -> &[SyncGroup] {
        &self.sync_groups
    }

    /// Push a prepared payload to outgoing edges (fan-out). `Some(port)` restricts
    /// delivery to edges leaving that port (case-insensitive); `None` broadcasts to all.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// use daedalus_runtime::executor::EdgePayload;
    /// fn handler(io: &mut NodeIo) {
    ///     io.push_output(Some("out"), EdgePayload::Unit);
    /// }
    /// ```
    pub fn push_output(&mut self, port: Option<&str>, payload: EdgePayload) {
        let correlated = CorrelatedPayload {
            correlation_id: self.current_corr_id,
            inner: payload,
            enqueued_at: Instant::now(),
        };
        self.push_correlated(port, correlated);
    }

    /// Push a pre-correlated payload (used by host-bridge style nodes).
    pub fn push_correlated_payload(&mut self, port: Option<&str>, correlated: CorrelatedPayload) {
        self.push_correlated(port, correlated);
    }

    fn push_correlated(&mut self, port: Option<&str>, correlated: CorrelatedPayload) {
        #[cfg(feature = "gpu")]
        let mut matches: Vec<(usize, String, EdgePolicyKind, BackpressureStrategy, Option<usize>, bool)> = Vec::new();
        #[cfg(not(feature = "gpu"))]
        let mut matches: Vec<(usize, String, EdgePolicyKind, BackpressureStrategy, Option<usize>)> = Vec::new();
        for edge_idx in &self.outgoing {
            if let Some((_, from_port, _, _, policy)) = self.edges.get(*edge_idx) {
                if let Some(p) = port
                    && !p.eq_ignore_ascii_case(from_port)
                {
                    continue;
                }
                let (bp_override, cap_override) = self
                    .port_overrides
                    .get(from_port)
                    .cloned()
                    .unwrap_or((None, None));
                let bp = bp_override.unwrap_or(self.backpressure.clone());
                #[cfg(feature = "gpu")]
                {
                    let needs_payload = self.payload_edges.contains(edge_idx);
                    matches.push((*edge_idx, from_port.clone(), policy.clone(), bp, cap_override, needs_payload));
                }
                #[cfg(not(feature = "gpu"))]
                {
                    matches.push((*edge_idx, from_port.clone(), policy.clone(), bp, cap_override));
                }
            }
        }

        if matches.len() == 1 {
            #[cfg(feature = "gpu")]
            let (edge_idx, from_port, policy, bp, cap_override, needs_payload) = matches.remove(0);
            #[cfg(not(feature = "gpu"))]
            let (edge_idx, from_port, policy, bp, cap_override) = matches.remove(0);
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
                log::warn!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
                eprintln!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            let mut effective_policy = policy;
            if let Some(cap) = cap_override {
                effective_policy = EdgePolicyKind::Bounded { cap };
            }
            #[cfg(feature = "gpu")]
            let correlated = if needs_payload {
                let mut updated = correlated;
                updated.inner = promote_payload_for_host(updated.inner);
                updated
            } else {
                correlated
            };
            let payload_bytes = if cfg!(feature = "metrics")
                && self.telemetry.metrics_level.is_detailed()
            {
                crate::executor::payload_size_bytes(&correlated.inner)
            } else {
                None
            };
            self.telemetry
                .record_node_payload_out(self.node_idx, payload_bytes);
            apply_policy_owned(ApplyPolicyOwnedArgs {
                edge_idx,
                policy: &effective_policy,
                payload: correlated,
                queues: self.queues,
                warnings_seen: self.warnings_seen,
                telem: self.telemetry,
                warning_label: Some(format!("edge_{}_{}", self.node_id, from_port)),
                backpressure: bp,
            });
            return;
        }

        #[cfg(feature = "gpu")]
        for (edge_idx, from_port, mut policy, bp, cap_override, needs_payload) in matches {
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
                log::warn!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
                eprintln!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            if let Some(cap) = cap_override {
                policy = EdgePolicyKind::Bounded { cap };
            }
            let mut payload = correlated.clone();
            if needs_payload {
                payload.inner = promote_payload_for_host(payload.inner);
            }
            let payload_bytes = if cfg!(feature = "metrics")
                && self.telemetry.metrics_level.is_detailed()
            {
                crate::executor::payload_size_bytes(&payload.inner)
            } else {
                None
            };
            self.telemetry
                .record_node_payload_out(self.node_idx, payload_bytes);
            apply_policy(
                edge_idx,
                &policy,
                &payload,
                self.queues,
                self.warnings_seen,
                self.telemetry,
                Some(format!("edge_{}_{}", self.node_id, from_port)),
                bp,
            );
        }
        #[cfg(not(feature = "gpu"))]
        for (edge_idx, from_port, mut policy, bp, cap_override) in matches {
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
                log::warn!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
                eprintln!(
                    "node output enqueue node={} port={} edge_idx={} payload={}",
                    self.node_id,
                    from_port,
                    edge_idx,
                    edge_payload_desc(&correlated.inner)
                );
            }
            if let Some(cap) = cap_override {
                policy = EdgePolicyKind::Bounded { cap };
            }
            let payload_bytes = if cfg!(feature = "metrics")
                && self.telemetry.metrics_level.is_detailed()
            {
                crate::executor::payload_size_bytes(&correlated.inner)
            } else {
                None
            };
            self.telemetry
                .record_node_payload_out(self.node_idx, payload_bytes);
            apply_policy(
                edge_idx,
                &policy,
                &correlated,
                self.queues,
                self.warnings_seen,
                self.telemetry,
                Some(format!("edge_{}_{}", self.node_id, from_port)),
                bp,
            );
        }
    }

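    /// Push a CPU- or GPU-resident `daedalus_gpu::Payload<T>` to an output port,
    /// type-erasing it into an `EdgePayload::Payload` (GPU builds only).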
    #[cfg(feature = "gpu")]
    pub fn push_payload<T>(&mut self, port: Option<&str>, value: daedalus_gpu::Payload<T>)
    where
        T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
        T::GpuRepr: Clone + Send + Sync + 'static,
    {
        match value {
            daedalus_gpu::Payload::Cpu(v) => {
                let payload = EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<T>(v));
                self.push_output(port, payload);
            }
            daedalus_gpu::Payload::Gpu(g) => {
                let payload = EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_gpu::<T>(g));
                self.push_output(port, payload);
            }
        }
    }

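    /// Push a value as a type-erased `Any` payload.
    ///
    /// A minimal sketch (the port name is illustrative):
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &mut NodeIo) {
    ///     io.push_any(Some("out"), 42i64);
    /// }
    /// ```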
    pub fn push_any<T: Any + Send + Sync + 'static>(&mut self, port: Option<&str>, value: T) {
        self.push_output(port, EdgePayload::Any(Arc::new(value)));
    }

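    /// Push a typed value, preferring a registered output mover (zero-copy wrapping)
    /// and falling back to an `Any` payload when no mover matches.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &mut NodeIo) {
    ///     io.push_typed(Some("out"), String::from("hello"));
    /// }
    /// ```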
    pub fn push_typed<T>(&mut self, port: Option<&str>, value: T)
    where
        T: Any + Clone + Send + Sync + 'static,
    {
        if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
            let port_name = port.unwrap_or("<all>");
            log::warn!(
                "node output prepare node={} port={} type={}",
                self.node_id,
                port_name,
                std::any::type_name::<T>()
            );
        }
        if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
            let port_name = port.unwrap_or("<all>");
            eprintln!(
                "node output prepare node={} port={} type={}",
                self.node_id,
                port_name,
                std::any::type_name::<T>()
            );
        }
        match try_move_output(self.output_movers.as_ref(), value) {
            Ok(payload) => self.push_output(port, payload),
            Err(value) => {
                self.push_any(port, value);
            }
        }
    }

    /// Push a `Value` payload to an output port.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// use daedalus_data::model::Value;
    /// fn handler(io: &mut NodeIo) {
    ///     io.push_value(Some("out"), Value::Int(1));
    /// }
    /// ```
    pub fn push_value(&mut self, port: Option<&str>, value: daedalus_data::model::Value) {
        self.push_output(port, EdgePayload::Value(value));
    }

    /// Iterate all inputs for a given port name.
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     for _payload in io.inputs_for("in") {}
    /// }
    /// ```
    pub fn inputs_for<'b>(&'b self, port: &str) -> impl Iterator<Item = &'b CorrelatedPayload> {
        self.inputs
            .iter()
            .filter(move |(p, _)| p == port)
            .map(|(_, payload)| payload)
    }

    /// Typed accessor for Any payloads.
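    ///
    /// A minimal sketch (the port name is illustrative):
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     let _count: Option<i64> = io.get_any("count");
    /// }
    /// ```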
    pub fn get_any<T: Any + Clone + Send + Sync>(&self, port: &str) -> Option<T> {
        #[cfg(feature = "gpu")]
        let want = TypeId::of::<T>();
        self.inputs_for(port).find_map(|p| match &p.inner {
            EdgePayload::Any(a) => {
                a.downcast_ref::<T>()
                    .cloned()
                    .or_else(|| self.coerce_const_any::<T>(a.as_ref()))
            }
            #[cfg(feature = "gpu")]
            EdgePayload::Payload(ep) => {
                if let Some(v) = ep.try_downcast_cpu_any::<T>() {
                    return Some(v);
                }
                if want == TypeId::of::<DynamicImage>() {
                    return ep.clone_cpu::<DynamicImage>().and_then(Self::dynamic_image_to_t::<T>);
                }
                if want == TypeId::of::<GrayImage>()
                    && let Some(gray) = ep.clone_cpu::<GrayImage>()
                {
                    let any_ref: &dyn Any = &gray;
                    return any_ref.downcast_ref::<T>().cloned();
                }
                if want == TypeId::of::<RgbImage>()
                    && let Some(rgb) = ep.clone_cpu::<RgbImage>()
                {
                    let any_ref: &dyn Any = &rgb;
                    return any_ref.downcast_ref::<T>().cloned();
                }
                if want == TypeId::of::<RgbaImage>()
                    && let Some(rgba) = ep.clone_cpu::<RgbaImage>()
                {
                    let any_ref: &dyn Any = &rgba;
                    return any_ref.downcast_ref::<T>().cloned();
                }
                ep.clone_cpu::<DynamicImage>().and_then(|img| {
                    if want == TypeId::of::<DynamicImage>()
                        || want == TypeId::of::<GrayImage>()
                        || want == TypeId::of::<GrayAlphaImage>()
                        || want == TypeId::of::<RgbImage>()
                        || want == TypeId::of::<RgbaImage>()
                    {
                        return Self::dynamic_image_to_t::<T>(img);
                    }
                    None
                }).or_else(|| {
                    if log::log_enabled!(log::Level::Debug)
                        && (want == TypeId::of::<DynamicImage>()
                            || want == TypeId::of::<GrayImage>()
                            || want == TypeId::of::<GrayAlphaImage>()
                            || want == TypeId::of::<RgbImage>()
                            || want == TypeId::of::<RgbaImage>())
                    {
                        log::debug!("payload type mismatch port={} payload={:?}", port, ep);
                    }
                    None
                })
            },
            #[cfg(feature = "gpu")]
            EdgePayload::GpuImage(h)
                if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
            {
                let any_ref: &dyn Any = h;
                any_ref.downcast_ref::<T>().cloned()
            }
            _ => None,
        })
    }

    /// Borrow a typed Any payload without cloning.
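    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     if let Some(name) = io.get_any_ref::<String>("name") {
    ///         let _ = name.len();
    ///     }
    /// }
    /// ```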
    pub fn get_any_ref<T: Any + Send + Sync>(&self, port: &str) -> Option<&T> {
        self.inputs_for(port).find_map(|p| match &p.inner {
            EdgePayload::Any(a) => a.downcast_ref::<T>(),
            #[cfg(feature = "gpu")]
            EdgePayload::GpuImage(h)
                if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
            {
                let any_ref: &dyn Any = h;
                any_ref.downcast_ref::<T>()
            }
            _ => None,
        })
    }

    /// Borrow a typed input with constant coercion support.
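    ///
    /// Falls back to the scalar carriers (`i64`, `f64`, `bool`, `String`) and to any
    /// registered const coercers when no exact-typed payload is present. A sketch,
    /// assuming an integer constant default on the (illustrative) `threshold` port:
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     // An `i64` constant default may be read back as `u32` via coercion.
    ///     let _threshold: Option<&u32> = io.get_typed_ref::<u32>("threshold");
    /// }
    /// ```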
    pub fn get_typed_ref<T>(&self, port: &str) -> Option<&T>
    where
        T: Any + Clone + Send + Sync,
    {
        if let Some(v) = self.get_any_ref::<T>(port) {
            return Some(v);
        }
        if let Some(v) = self.get_any::<T>(port) {
            return Some(self.cache_borrowed(v));
        }

        if let Some(v) = self.get_any_ref::<i64>(port) {
            if let Some(t) = Self::coerce_from_i64::<T>(*v) {
                return Some(self.cache_borrowed(t));
            }
            if let Some(t) = self.coerce_via_registry::<T>(&Value::Int(*v)) {
                return Some(self.cache_borrowed(t));
            }
        }
        if let Some(v) = self.get_any_ref::<f64>(port) {
            if let Some(t) = Self::coerce_from_f64::<T>(*v) {
                return Some(self.cache_borrowed(t));
            }
            if let Some(t) = self.coerce_via_registry::<T>(&Value::Float(*v)) {
                return Some(self.cache_borrowed(t));
            }
        }
        if let Some(v) = self.get_any_ref::<bool>(port) {
            let any_ref: &dyn Any = v;
            if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
                return Some(self.cache_borrowed(t));
            }
            if let Some(t) = self.coerce_via_registry::<T>(&Value::Bool(*v)) {
                return Some(self.cache_borrowed(t));
            }
        }
        if let Some(v) = self.get_any_ref::<String>(port) {
            let any_ref: &dyn Any = v;
            if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
                return Some(self.cache_borrowed(t));
            }
            if let Some(t) = self.coerce_via_registry::<T>(&Value::String(v.clone().into())) {
                return Some(self.cache_borrowed(t));
            }
        }
        if let Some(v) = self.get_value(port)
            && let Some(t) = self.coerce_from_value::<T>(v)
        {
            return Some(self.cache_borrowed(t));
        }

        None
    }

    /// Move a typed Any payload, cloning only when shared.
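    ///
    /// A minimal sketch (the port name is illustrative):
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &mut NodeIo) {
    ///     // Takes the payload out of the input slot; clones only if the Arc is shared.
    ///     let _buf: Option<Vec<u8>> = io.get_any_mut("data");
    /// }
    /// ```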
    pub fn get_any_mut<T>(&mut self, port: &str) -> Option<T>
    where
        T: Any + Clone + Send + Sync,
    {
        let (idx, payload) = self.take_input(port)?;
        let mut handled = None;
        let mut payload = payload;
        match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
            EdgePayload::Any(a) => match Arc::downcast::<T>(a) {
                Ok(arc) => {
                    handled = Some(match Arc::try_unwrap(arc) {
                        Ok(v) => v,
                        Err(arc) => (*arc).clone(),
                    });
                }
                Err(a) => {
                    payload.inner = EdgePayload::Any(a);
                }
            },
            other => {
                payload.inner = other;
            }
        }
        if handled.is_none() {
            self.restore_input(idx, port, payload);
        }
        handled
    }

    /// Move a typed input with constant coercion support, cloning only when shared.
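    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &mut NodeIo) {
    ///     let _limit: Option<u32> = io.get_typed_mut("limit");
    /// }
    /// ```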
    pub fn get_typed_mut<T>(&mut self, port: &str) -> Option<T>
    where
        T: Any + Clone + Send + Sync,
    {
        #[cfg(feature = "gpu")]
        let want = TypeId::of::<T>();
        let (idx, mut payload) = self.take_input(port)?;
        let mut out: Option<T> = None;
        match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
            #[cfg(feature = "gpu")]
            EdgePayload::Payload(ep) => {
                let mut ep_opt = Some(ep);
                let downcast_owned = |value: Box<dyn Any + Send + Sync>| {
                    value.downcast::<T>().ok().map(|boxed| *boxed)
                };
                if std::env::var_os("DAEDALUS_TRACE_PAYLOAD_TAKE").is_some()
                    && let Some(ep) = ep_opt.as_ref()
                {
                    let clone_ok = if want == TypeId::of::<DynamicImage>() {
                        ep.clone_cpu::<DynamicImage>().is_some()
                    } else if want == TypeId::of::<GrayImage>() {
                        ep.clone_cpu::<GrayImage>().is_some()
                    } else if want == TypeId::of::<RgbImage>() {
                        ep.clone_cpu::<RgbImage>().is_some()
                    } else if want == TypeId::of::<RgbaImage>() {
                        ep.clone_cpu::<RgbaImage>().is_some()
                    } else {
                        false
                    };
                    eprintln!(
                        "daedalus-runtime: payload probe node={} port={} type={} clone_cpu={}",
                        self.node_id,
                        port,
                        std::any::type_name::<T>(),
                        clone_ok
                    );
                }
                if want == TypeId::of::<DynamicImage>() {
                    if let Some(ep) = ep_opt.take() {
                        match ep.take_cpu::<DynamicImage>() {
                            Ok(cpu) => out = downcast_owned(Box::new(cpu)),
                            Err(rest) => {
                                if let Some(cpu) = rest.clone_cpu::<DynamicImage>() {
                                    out = downcast_owned(Box::new(cpu));
                                } else {
                                    ep_opt = Some(rest);
                                }
                            }
                        }
                    }
                } else if want == TypeId::of::<GrayImage>() {
                    if let Some(ep) = ep_opt.take() {
                        match ep.take_cpu::<GrayImage>() {
                            Ok(cpu) => out = downcast_owned(Box::new(cpu)),
                            Err(rest) => {
                                if let Some(cpu) = rest.clone_cpu::<GrayImage>() {
                                    out = downcast_owned(Box::new(cpu));
                                } else {
                                    ep_opt = Some(rest);
                                }
                            }
                        }
                    }
                } else if want == TypeId::of::<RgbImage>() {
                    if let Some(ep) = ep_opt.take() {
                        match ep.take_cpu::<RgbImage>() {
                            Ok(cpu) => out = downcast_owned(Box::new(cpu)),
                            Err(rest) => {
                                if let Some(cpu) = rest.clone_cpu::<RgbImage>() {
                                    out = downcast_owned(Box::new(cpu));
                                } else {
                                    ep_opt = Some(rest);
                                }
                            }
                        }
                    }
                } else if want == TypeId::of::<RgbaImage>() && let Some(ep) = ep_opt.take() {
                    match ep.take_cpu::<RgbaImage>() {
                        Ok(cpu) => out = downcast_owned(Box::new(cpu)),
                        Err(rest) => {
                            if let Some(cpu) = rest.clone_cpu::<RgbaImage>() {
                                out = downcast_owned(Box::new(cpu));
                            } else {
                                ep_opt = Some(rest);
                            }
                        }
                    }
                }

                if out.is_none()
                    && let Some(ep) = ep_opt.as_ref()
                    && (want == TypeId::of::<DynamicImage>()
                        || want == TypeId::of::<GrayImage>()
                        || want == TypeId::of::<GrayAlphaImage>()
                        || want == TypeId::of::<RgbImage>()
                        || want == TypeId::of::<RgbaImage>())
                    && let Some(img) = ep.clone_cpu::<DynamicImage>()
                {
                    out = Self::dynamic_image_to_t::<T>(img);
                }
                if out.is_none()
                    && let Some(ep) = ep_opt
                {
                    payload.inner = EdgePayload::Payload(ep);
                }
            }
            EdgePayload::Any(a) => {
                let any = a;
                match Arc::downcast::<T>(any) {
                    Ok(arc) => {
                        out = Some(match Arc::try_unwrap(arc) {
                            Ok(v) => v,
                            Err(arc) => (*arc).clone(),
                        });
                    }
                    Err(any) => match Arc::downcast::<i64>(any) {
                        Ok(arc) => {
                            let v = match Arc::try_unwrap(arc) {
                                Ok(v) => v,
                                Err(arc) => *arc,
                            };
                            out = Self::coerce_from_i64::<T>(v)
                                .or_else(|| self.coerce_via_registry::<T>(&Value::Int(v)));
                        }
                        Err(any) => match Arc::downcast::<f64>(any) {
                            Ok(arc) => {
                                let v = match Arc::try_unwrap(arc) {
                                    Ok(v) => v,
                                    Err(arc) => *arc,
                                };
                                out = Self::coerce_from_f64::<T>(v)
                                    .or_else(|| self.coerce_via_registry::<T>(&Value::Float(v)));
                            }
                            Err(any) => match Arc::downcast::<bool>(any) {
                                Ok(arc) => {
                                    let v = match Arc::try_unwrap(arc) {
                                        Ok(v) => v,
                                        Err(arc) => *arc,
                                    };
                                    let any_ref: &dyn Any = &v;
                                    out = any_ref
                                        .downcast_ref::<T>()
                                        .cloned()
                                        .or_else(|| self.coerce_via_registry::<T>(&Value::Bool(v)));
                                }
                                Err(any) => match Arc::downcast::<String>(any) {
                                    Ok(arc) => {
                                        let v = match Arc::try_unwrap(arc) {
                                            Ok(v) => v,
                                            Err(arc) => (*arc).clone(),
                                        };
                                        let any_ref: &dyn Any = &v;
                                        out = any_ref
                                            .downcast_ref::<T>()
                                            .cloned()
                                            .or_else(|| self.coerce_via_registry::<T>(&Value::String(v.into())));
                                    }
                                    Err(any) => match Arc::downcast::<daedalus_data::model::Value>(any) {
                                        Ok(arc) => {
                                            let v = match Arc::try_unwrap(arc) {
                                                Ok(v) => v,
                                                Err(arc) => (*arc).clone(),
                                            };
                                            out = self.coerce_from_value::<T>(&v);
                                        }
                                        Err(any) => {
                                            payload.inner = EdgePayload::Any(any);
                                        }
                                    },
                                },
                            },
                        },
                    },
                }
            }
            EdgePayload::Value(v) => {
                out = self.coerce_from_value::<T>(&v);
                payload.inner = EdgePayload::Value(v);
            }
            other => {
                payload.inner = other;
            }
        }

        if out.is_none() {
            if std::env::var_os("DAEDALUS_TRACE_MISSING_INPUTS").is_some() {
                let desc = match &payload.inner {
                    EdgePayload::Any(a) => format!("Any({})", std::any::type_name_of_val(a.as_ref())),
                    #[cfg(feature = "gpu")]
                    EdgePayload::Payload(ep) => format!("Payload({ep:?})"),
                    #[cfg(feature = "gpu")]
                    EdgePayload::GpuImage(_) => "GpuImage".to_string(),
                    EdgePayload::Value(v) => format!("Value({v:?})"),
                    EdgePayload::Bytes(_) => "Bytes".to_string(),
                    EdgePayload::Unit => "Unit".to_string(),
                };
                eprintln!(
                    "daedalus-runtime: input mismatch node={} port={} expected={} payload={}",
                    self.node_id,
                    port,
                    std::any::type_name::<T>(),
                    desc
                );
            }
            self.restore_input(idx, port, payload);
        }
        out
    }

    /// Collect all typed Any payloads for a port (in arrival order).
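    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     let _samples: Vec<i64> = io.get_any_all("samples");
    /// }
    /// ```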
    pub fn get_any_all<T: Any + Clone + Send + Sync>(&self, port: &str) -> Vec<T> {
        #[cfg(feature = "gpu")]
        let want = TypeId::of::<T>();
        let mut out: Vec<T> = Vec::new();
        for p in self.inputs_for(port) {
            match &p.inner {
                EdgePayload::Any(a) => {
                    if let Some(v) = a
                        .downcast_ref::<T>()
                        .cloned()
                        .or_else(|| self.coerce_const_any::<T>(a.as_ref()))
                    {
                        out.push(v);
                    }
                }
                #[cfg(feature = "gpu")]
                EdgePayload::Payload(ep) => {
                    if let Some(img) = ep.clone_cpu::<DynamicImage>()
                        && (want == TypeId::of::<DynamicImage>()
                            || want == TypeId::of::<GrayImage>()
                            || want == TypeId::of::<GrayAlphaImage>()
                            || want == TypeId::of::<RgbImage>()
                            || want == TypeId::of::<RgbaImage>())
                        && let Some(v) = Self::dynamic_image_to_t::<T>(img)
                    {
                        out.push(v);
                    }
                }
                #[cfg(feature = "gpu")]
                EdgePayload::GpuImage(h)
                    if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
                {
                    let any_ref: &dyn Any = h;
                    if let Some(v) = any_ref.downcast_ref::<T>().cloned() {
                        out.push(v);
                    }
                }
                _ => {}
            }
        }
        out
    }

    /// Collect typed Any payloads for indexed fan-in ports `{prefix}{N}` ordered by `N`.
    ///
    /// Example: `prefix="in"` collects from `in0`, `in1`, ... in numeric order.
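    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     // Gathers `in0`, `in1`, ... in index order.
    ///     let _parts: Vec<String> = io.get_any_all_fanin("in");
    /// }
    /// ```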
1291    pub fn get_any_all_fanin<T: Any + Clone + Send + Sync>(&self, prefix: &str) -> Vec<T> {
1292        self.get_any_all_fanin_indexed::<T>(prefix)
1293            .into_iter()
1294            .map(|(_, v)| v)
1295            .collect()
1296    }
1297
1298    /// Collect typed Any payloads for indexed fan-in ports `{prefix}{N}` ordered by `N`,
1299    /// preserving the parsed index.
1300    pub fn get_any_all_fanin_indexed<T: Any + Clone + Send + Sync>(&self, prefix: &str) -> Vec<(u32, T)> {
1301        let mut ports: BTreeMap<u32, String> = BTreeMap::new();
1302        for (port, _) in &self.inputs {
1303            if let Some(idx) = parse_indexed_port(prefix, port) {
1304                ports.entry(idx).or_insert_with(|| port.clone());
1305            }
1306        }
1307        let mut out = Vec::with_capacity(ports.len());
1308        for (idx, port) in ports {
1309            if let Some(v) = self.get_any::<T>(&port) {
1310                out.push((idx, v));
1311            }
1312        }
1313        out
1314    }
1315
1316    fn coerce_const_any<T: Any + Clone>(&self, v: &dyn Any) -> Option<T> {
1317        use daedalus_data::model::Value as V;
1318
        if let Some(i) = v.downcast_ref::<i64>().copied() {
            return Self::coerce_from_i64::<T>(i)
                .or_else(|| self.coerce_via_registry::<T>(&V::Int(i)));
        }

        if let Some(f) = v.downcast_ref::<f64>().copied() {
            return Self::coerce_from_f64::<T>(f)
                .or_else(|| self.coerce_via_registry::<T>(&V::Float(f)));
        }

        if let Some(b) = v.downcast_ref::<bool>().copied() {
            let any_ref: &dyn Any = &b;
            return any_ref
                .downcast_ref::<T>()
                .cloned()
                .or_else(|| self.coerce_via_registry::<T>(&V::Bool(b)));
        }

        if let Some(s) = v.downcast_ref::<String>().cloned() {
            let any_ref: &dyn Any = &s;
            return any_ref
                .downcast_ref::<T>()
                .cloned()
                .or_else(|| self.coerce_via_registry::<T>(&V::String(s.into())));
        }

        if let Some(val) = v.downcast_ref::<daedalus_data::model::Value>().cloned() {
            return self.coerce_from_value::<T>(&val);
        }

        None
1365    }
1366
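    /// Apply a registered const coercer for `T`, preferring the node-local registry and
    /// falling back to the global one.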
1367    fn coerce_via_registry<T: Any + Clone>(&self, v: &daedalus_data::model::Value) -> Option<T> {
1368        let key = std::any::type_name::<T>();
1369        let global = GLOBAL_CONST_COERCERS.get_or_init(new_const_coercer_map);
1370        let map = self.const_coercers.as_ref().unwrap_or(global);
1371        let guard = map.read().ok()?;
1372        let coercer = guard.get(key)?;
1373        let out = coercer(v)?;
1374        out.downcast::<T>().ok().map(|b| (*b).clone())
1375    }
1376
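    /// Coerce an `i64` constant into a numeric target: integer targets go through
    /// `TryFrom` (rejecting out-of-range values); float targets use an `as` cast.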
1377    fn coerce_from_i64<T: Any + Clone>(v: i64) -> Option<T> {
1378        use std::any::TypeId;
1379        let want = TypeId::of::<T>();
1380
1381        macro_rules! cast_int {
1382            ($t:ty) => {{
1383                if want == TypeId::of::<$t>() {
1384                    let out: $t = <$t>::try_from(v).ok()?;
1385                    let any_ref: &dyn Any = &out;
1386                    return any_ref.downcast_ref::<T>().cloned();
1387                }
1388            }};
1389        }
1390
1391        cast_int!(i8);
1392        cast_int!(i16);
1393        cast_int!(i32);
1394        cast_int!(i64);
1395        cast_int!(isize);
1396        cast_int!(u8);
1397        cast_int!(u16);
1398        cast_int!(u32);
1399        cast_int!(u64);
1400        cast_int!(usize);
1401
1402        if want == TypeId::of::<f32>() {
1403            let out = v as f32;
1404            let any_ref: &dyn Any = &out;
1405            return any_ref.downcast_ref::<T>().cloned();
1406        }
1407        if want == TypeId::of::<f64>() {
1408            let out = v as f64;
1409            let any_ref: &dyn Any = &out;
1410            return any_ref.downcast_ref::<T>().cloned();
1411        }
1412
1413        None
1414    }
1415
1416    fn coerce_from_f64<T: Any + Clone>(v: f64) -> Option<T> {
1417        use std::any::TypeId;
1418        let want = TypeId::of::<T>();
1419
1420        if want == TypeId::of::<f32>() {
1421            let out = v as f32;
1422            let any_ref: &dyn Any = &out;
1423            return any_ref.downcast_ref::<T>().cloned();
1424        }
1425        if want == TypeId::of::<f64>() {
1426            let out = v;
1427            let any_ref: &dyn Any = &out;
1428            return any_ref.downcast_ref::<T>().cloned();
1429        }
1430
1431        // Allow float -> int when it's integral.
1432        if v.fract() == 0.0 {
1433            let as_i = v as i64;
1434            return Self::coerce_from_i64::<T>(as_i);
1435        }
1436
1437        None
1438    }
1439
1440    /// Best-effort typed accessor that supports constant defaults.
1441    ///
1442    /// Daedalus graph JSON encodes constant inputs as `daedalus_data::model::Value`. At runtime we
1443    /// inject these into the node as `Any` payloads (e.g. `i64`, `f64`, `bool`, `String`, or
1444    /// `Value`). This helper bridges the gap so node handlers can request their native types
1445    /// (e.g. `u32`, `f32`, enums) without failing a `TypeId` downcast.
1446    ///
1447    /// This is intended for scalar/enum config inputs, not large payload types (e.g. image buffers).
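    ///
    /// A minimal usage sketch; the port name and target type are illustrative:
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     // A constant injected as `Value::Int(4)` can be read back as `u32`.
    ///     let threads: Option<u32> = io.get_typed("threads");
    ///     let _ = threads;
    /// }
    /// ```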
1448    pub fn get_typed<T>(&self, port: &str) -> Option<T>
1449    where
1450        T: Any + Clone + Send + Sync,
1451    {
        let initial = self.get_any::<T>(port);
        // The `image` imports are gpu-gated, so this debug probe must be too.
        #[cfg(feature = "gpu")]
        if log::log_enabled!(log::Level::Debug)
            && TypeId::of::<T>() == TypeId::of::<image::DynamicImage>()
        {
            log::debug!(
                "get_typed dynamic_image port={} has_any={}",
                port,
                initial.is_some()
            );
        }
1461        if let Some(v) = initial {
1462            return Some(v);
1463        }
1464
1465        // Common scalar carriers produced by const injection.
1466        if let Some(v) = self.get_any::<i64>(port) {
1467            if let Some(t) = Self::coerce_from_i64::<T>(v) {
1468                return Some(t);
1469            }
1470            if let Some(t) = self.coerce_via_registry::<T>(&Value::Int(v)) {
1471                return Some(t);
1472            }
1473        }
1474        if let Some(v) = self.get_any::<f64>(port) {
1475            if let Some(t) = Self::coerce_from_f64::<T>(v) {
1476                return Some(t);
1477            }
1478            if let Some(t) = self.coerce_via_registry::<T>(&Value::Float(v)) {
1479                return Some(t);
1480            }
1481        }
1482        if let Some(v) = self.get_any::<bool>(port) {
1483            let any_ref: &dyn Any = &v;
1484            if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1485                return Some(t);
1486            }
1487            if let Some(t) = self.coerce_via_registry::<T>(&Value::Bool(v)) {
1488                return Some(t);
1489            }
1490        }
1491        if let Some(v) = self.get_any::<String>(port) {
1492            let any_ref: &dyn Any = &v;
1493            if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1494                return Some(t);
1495            }
1496            if let Some(t) = self.coerce_via_registry::<T>(&Value::String(v.into())) {
1497                return Some(t);
1498            }
1499        }
        if let Some(v) = self.get_any::<daedalus_data::model::Value>(port)
            && let Some(t) = self.coerce_from_value::<T>(&v)
        {
            return Some(t);
        }

        if let Some(v) = self.get_value(port)
            && let Some(t) = self.coerce_from_value::<T>(v)
        {
            return Some(t);
        }
1509
1510        None
1511    }
1512
1513    fn coerce_from_value<T: Any + Clone>(&self, v: &daedalus_data::model::Value) -> Option<T> {
1514        use daedalus_data::model::Value as V;
1515
1516        let any_ref: &dyn Any = v;
1517        if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1518            return Some(t);
1519        }
1520
1521        if let Some(name) = Self::enum_name_from_index::<T>(v)
1522            && let Some(t) = {
1523                let any_ref: &dyn Any = &name;
1524                any_ref.downcast_ref::<T>().cloned()
1525            }
1526        {
1527            return Some(t);
1528        }
1529
        let fallback: daedalus_data::model::Value = match v {
            V::Int(i) => {
                if let Some(t) = Self::coerce_from_i64::<T>(*i) {
                    return Some(t);
                }
                V::Int(*i)
            }
            V::Float(f) => {
                if let Some(t) = Self::coerce_from_f64::<T>(*f) {
                    return Some(t);
                }
                V::Float(*f)
            }
            V::Bool(b) => {
                let any_ref: &dyn Any = b;
                if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
                    return Some(t);
                }
                V::Bool(*b)
            }
            V::String(s) => {
                let owned = s.clone().into_owned();
                let any_ref: &dyn Any = &owned;
                if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
                    return Some(t);
                }
                // Enum coercion is handled via the registered const coercers.
                V::String(owned.into())
            }
            // Allow enum coercers to handle enum payloads.
            V::Enum(_) => v.clone(),
            other => other.clone(),
        };

        self.coerce_via_registry::<T>(&fallback)
1565    }
1566
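    /// Resolve an integer constant to the enum variant name at that index, using the
    /// enum metadata registered for `T`.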
1567    fn enum_name_from_index<T: Any>(v: &daedalus_data::model::Value) -> Option<String> {
1568        let Value::Int(raw) = v else { return None };
1569        if *raw < 0 {
1570            return None;
1571        }
1572        let idx = *raw as usize;
1573        let expr = typing::override_type_expr::<T>()
1574            .or_else(|| typing::lookup_type_by_rust_name(std::any::type_name::<T>()));
1575        match expr {
1576            Some(TypeExpr::Enum(variants)) => variants.get(idx).map(|ev| ev.name.clone()),
1577            _ => None,
1578        }
1579    }
1580
1581    /// Get a raw `Any` reference for capability-based dispatch.
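    ///
    /// A minimal usage sketch; the concrete downcast type is illustrative:
    ///
    /// ```no_run
    /// use daedalus_runtime::io::NodeIo;
    /// fn handler(io: &NodeIo) {
    ///     if let Some(any) = io.get_any_raw("in") {
    ///         let _ = any.downcast_ref::<i64>();
    ///     }
    /// }
    /// ```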
1582    pub fn get_any_raw(&self, port: &str) -> Option<&dyn Any> {
1583        self.inputs_for(port).find_map(|p| match &p.inner {
1584            EdgePayload::Any(a) => Some(a.as_ref() as &dyn Any),
1585            _ => None,
1586        })
1587    }
1588
1589    #[cfg(feature = "gpu")]
1590    pub fn get_erased_payload(&self, port: &str) -> Option<&daedalus_gpu::ErasedPayload> {
1591        self.inputs_for(port).find_map(|p| match &p.inner {
1592            EdgePayload::Payload(ep) => Some(ep),
1593            _ => None,
1594        })
1595    }
1596
1597    #[cfg(feature = "gpu")]
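    /// Decode a `Payload<T>` input, preferring the GPU representation only when this node
    /// targets the GPU and a context is available; otherwise a CPU value is produced,
    /// downloading from the GPU if necessary.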
1598    pub fn get_payload<T>(&self, port: &str) -> Option<daedalus_gpu::Payload<T>>
1599    where
1600        T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
1601        T::GpuRepr: Clone + Send + Sync + 'static,
1602    {
1603        // Prefer GPU only when:
1604        // - the node declares a GPU affinity, AND
1605        // - a GPU context is actually available in this execution.
1606        //
1607        // This matters for `GpuPreferred` nodes: the executor may fall back to CPU when no GPU
1608        // is available, in which case payload decoding must also accept CPU values.
1609        let wants_gpu = self.gpu.is_some()
1610            && matches!(
1611                self.target_compute,
1612                daedalus_planner::ComputeAffinity::GpuPreferred
1613                    | daedalus_planner::ComputeAffinity::GpuRequired
1614            );
1615        for p in self.inputs_for(port) {
1616            match &p.inner {
1617                EdgePayload::Payload(ep) => {
1618                    if wants_gpu {
1619                        if let Some(g) = ep.clone_gpu::<T>() {
1620                            return Some(daedalus_gpu::Payload::Gpu(g));
1621                        }
1622                        if let Some(cpu) = ep.clone_cpu::<T>()
1623                            && let Some(ctx) = &self.gpu
1624                            && let Ok(handle) = cpu.upload(ctx)
1625                        {
1626                            return Some(daedalus_gpu::Payload::Gpu(handle));
1627                        }
1628                    } else {
1629                        if let Some(cpu) = ep.clone_cpu::<T>() {
1630                            return Some(daedalus_gpu::Payload::Cpu(cpu));
1631                        }
1632                        if let Some(g) = ep.clone_gpu::<T>()
1633                            && let Some(ctx) = &self.gpu
1634                            && let Ok(cpu) = T::download(&g, ctx)
1635                        {
1636                            return Some(daedalus_gpu::Payload::Cpu(cpu));
1637                        }
1638                    }
1639                }
                EdgePayload::Any(a) => {
1641                    // Plugins sometimes pass a `Payload<T>` through `Any` (e.g. when a node
1642                    // signature uses a type alias that the `#[node]` macro can't see through).
1643                    // Accept that representation here so downstream `Payload<T>` inputs can still
1644                    // be satisfied.
                    if let Some(payload_any) =
                        a.downcast_ref::<daedalus_gpu::Payload<T>>().cloned()
                    {
1649                        if wants_gpu {
1650                            match payload_any {
1651                                daedalus_gpu::Payload::Gpu(g) => {
1652                                    return Some(daedalus_gpu::Payload::Gpu(g));
1653                                }
1654                                daedalus_gpu::Payload::Cpu(cpu) => {
1655                                    if let Some(ctx) = &self.gpu
1656                                        && let Ok(handle) = cpu.upload(ctx)
1657                                    {
1658                                        return Some(daedalus_gpu::Payload::Gpu(handle));
1659                                    }
1660                                }
1661                            }
1662                        } else {
1663                            match payload_any {
1664                                daedalus_gpu::Payload::Cpu(cpu) => {
1665                                    return Some(daedalus_gpu::Payload::Cpu(cpu));
1666                                }
1667                                daedalus_gpu::Payload::Gpu(g) => {
1668                                    if let Some(ctx) = &self.gpu
1669                                        && let Ok(cpu) = T::download(&g, ctx)
1670                                    {
1671                                        return Some(daedalus_gpu::Payload::Cpu(cpu));
1672                                    }
1673                                }
1674                            }
1675                        }
1676                    }
1677
1678                    if wants_gpu {
                        if let Some(cpu) = a.downcast_ref::<T>().cloned()
                            && let Some(ctx) = &self.gpu
                            && let Ok(handle) = cpu.upload(ctx)
                        {
                            return Some(daedalus_gpu::Payload::Gpu(handle));
                        }
1688                        if let Some(converted) = crate::convert::convert_arc::<T>(a)
1689                            && let Some(ctx) = &self.gpu
1690                            && let Ok(handle) = converted.upload(ctx)
1691                        {
1692                            return Some(daedalus_gpu::Payload::Gpu(handle));
1693                        }
1694                        if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>()
1695                            && let Ok(uploaded) = ep.upload(self.gpu.as_ref()?)
1696                            && let Some(g) = uploaded.as_gpu::<T>()
1697                        {
1698                            return Some(daedalus_gpu::Payload::Gpu(g.clone()));
1699                        }
1700                    } else {
                        if let Some(cpu) = a.downcast_ref::<T>().cloned() {
                            return Some(daedalus_gpu::Payload::Cpu(cpu));
                        }
1707                        if let Some(converted) = crate::convert::convert_arc::<T>(a) {
1708                            return Some(daedalus_gpu::Payload::Cpu(converted));
1709                        }
                        if let Some(g) = a.downcast_ref::<T::GpuRepr>().cloned()
                            && let Some(ctx) = &self.gpu
                            && let Ok(cpu) = T::download(&g, ctx)
                        {
                            return Some(daedalus_gpu::Payload::Cpu(cpu));
                        }
1719                        if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>()
1720                            && let Ok(downloaded) = ep.download(self.gpu.as_ref()?)
1721                            && let Some(cpu) = downloaded.as_cpu::<T>()
1722                        {
1723                            return Some(daedalus_gpu::Payload::Cpu(cpu.clone()));
1724                        }
1725                    }
1726                }
1727                EdgePayload::GpuImage(h) => {
1728                    if TypeId::of::<T::GpuRepr>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() {
1729                        if wants_gpu {
1730                            let any_ref: &dyn Any = h;
1731                            if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>() {
1732                                return Some(daedalus_gpu::Payload::Gpu(repr.clone()));
1733                            }
1734                        } else if let Some(ctx) = &self.gpu {
1735                            let any_ref: &dyn Any = h;
1736                            if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>()
1737                                && let Ok(cpu) = T::download(repr, ctx)
1738                            {
1739                                return Some(daedalus_gpu::Payload::Cpu(cpu));
1740                            }
1741                        }
1742                    }
1743                }
1744                _ => {}
1745            }
1746        }
1747        None
1748    }
1749
1750    #[cfg(feature = "gpu")]
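    /// Like [`Self::get_payload`], but removes the input from the port so uniquely owned
    /// payloads can be moved without cloning. The input is restored if no conversion applies.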
1751    pub fn get_payload_mut<T>(&mut self, port: &str) -> Option<daedalus_gpu::Payload<T>>
1752    where
1753        T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
1754        T::GpuRepr: Clone + Send + Sync + 'static,
1755    {
1756        let wants_gpu = self.gpu.is_some()
1757            && matches!(
1758                self.target_compute,
1759                daedalus_planner::ComputeAffinity::GpuPreferred
1760                    | daedalus_planner::ComputeAffinity::GpuRequired
1761            );
1762        let (idx, mut payload) = self.take_input(port)?;
1763
1764        let mut out: Option<daedalus_gpu::Payload<T>> = None;
1765        match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
1766            EdgePayload::Payload(ep) => {
1767                let mut ep_opt = Some(ep);
1768                if wants_gpu {
1769                    if let Some(ep) = ep_opt.as_ref() {
1770                        if let Some(g) = ep.clone_gpu::<T>() {
1771                            out = Some(daedalus_gpu::Payload::Gpu(g));
1772                        } else if let Some(cpu) = ep.clone_cpu::<T>()
1773                            && let Some(ctx) = &self.gpu
1774                            && let Ok(handle) = cpu.upload(ctx)
1775                        {
1776                            out = Some(daedalus_gpu::Payload::Gpu(handle));
1777                        }
1778                    }
1779                } else if let Some(ep) = ep_opt.take() {
1780                    match ep.take_cpu::<T>() {
1781                        Ok(cpu) => out = Some(daedalus_gpu::Payload::Cpu(cpu)),
1782                        Err(rest) => {
1783                            ep_opt = Some(rest);
1784                            if let Some(ep) = ep_opt.as_ref() {
1785                                if let Some(cpu) = ep.clone_cpu::<T>() {
1786                                    out = Some(daedalus_gpu::Payload::Cpu(cpu));
1787                                } else if let Some(g) = ep.clone_gpu::<T>()
1788                                    && let Some(ctx) = &self.gpu
1789                                    && let Ok(cpu) = T::download(&g, ctx)
1790                                {
1791                                    out = Some(daedalus_gpu::Payload::Cpu(cpu));
1792                                }
1793                            }
1794                        }
1795                    }
1796                }
1797                if out.is_none()
1798                    && let Some(ep) = ep_opt
1799                {
1800                    payload.inner = EdgePayload::Payload(ep);
1801                }
1802            }
1803            EdgePayload::Any(a) => {
1804                let any = a;
1805                match Arc::downcast::<daedalus_gpu::Payload<T>>(any) {
1806                    Ok(arc) => {
1807                        let payload_any = match Arc::try_unwrap(arc) {
1808                            Ok(v) => v,
1809                            Err(arc) => (*arc).clone(),
1810                        };
1811                        if wants_gpu {
1812                            match payload_any {
1813                                daedalus_gpu::Payload::Gpu(g) => out = Some(daedalus_gpu::Payload::Gpu(g)),
1814                                daedalus_gpu::Payload::Cpu(cpu) => {
1815                                    if let Some(ctx) = &self.gpu
1816                                        && let Ok(handle) = cpu.upload(ctx)
1817                                    {
1818                                        out = Some(daedalus_gpu::Payload::Gpu(handle));
1819                                    }
1820                                }
1821                            }
1822                        } else {
1823                            match payload_any {
1824                                daedalus_gpu::Payload::Cpu(cpu) => out = Some(daedalus_gpu::Payload::Cpu(cpu)),
1825                                daedalus_gpu::Payload::Gpu(g) => {
1826                                    if let Some(ctx) = &self.gpu
1827                                        && let Ok(cpu) = T::download(&g, ctx)
1828                                    {
1829                                        out = Some(daedalus_gpu::Payload::Cpu(cpu));
1830                                    }
1831                                }
1832                            }
1833                        }
1834                    }
1835                    Err(any) => {
1836                        if wants_gpu {
1837                            match Arc::downcast::<T>(any) {
1838                                Ok(arc) => {
1839                                    let cpu = match Arc::try_unwrap(arc) {
1840                                        Ok(v) => v,
1841                                        Err(arc) => (*arc).clone(),
1842                                    };
1843                                    if let Some(ctx) = &self.gpu
1844                                        && let Ok(handle) = cpu.upload(ctx)
1845                                    {
1846                                        out = Some(daedalus_gpu::Payload::Gpu(handle));
1847                                    }
1848                                }
1849                                Err(any) => {
1850                                    payload.inner = EdgePayload::Any(any);
1851                                }
1852                            }
1853                        } else {
1854                            match Arc::downcast::<T>(any) {
1855                                Ok(arc) => {
1856                                    let cpu = match Arc::try_unwrap(arc) {
1857                                        Ok(v) => v,
1858                                        Err(arc) => (*arc).clone(),
1859                                    };
1860                                    out = Some(daedalus_gpu::Payload::Cpu(cpu));
1861                                }
1862                                Err(any) => {
1863                                    payload.inner = EdgePayload::Any(any);
1864                                }
1865                            }
1866                        }
1867                    }
1868                }
1869            }
1870            EdgePayload::GpuImage(h) => {
1871                payload.inner = EdgePayload::GpuImage(h);
1872            }
1873            other => {
1874                payload.inner = other;
1875            }
1876        }
1877
1878        if out.is_none() {
1879            self.restore_input(idx, port, payload);
1880        }
1881        out
1882    }
1883
1884    #[cfg(feature = "gpu")]
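    /// Upload or download a payload crossing a GPU entry/exit edge, recording the transfer
    /// in telemetry. If conversion fails, the payload keeps its original representation.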
1885    fn convert_incoming(
1886        mut payload: CorrelatedPayload,
1887        edge_idx: usize,
1888        entries: &HashSet<usize>,
1889        exits: &HashSet<usize>,
1890        gpu: Option<&daedalus_gpu::GpuContextHandle>,
1891        telemetry: &mut ExecutionTelemetry,
1892    ) -> CorrelatedPayload {
1893        let Some(ctx) = gpu else {
1894            return payload;
1895        };
1896        if entries.contains(&edge_idx) {
1897            telemetry.record_edge_gpu_transfer(edge_idx, true);
1898            payload.inner = match payload.inner {
1899                EdgePayload::Any(ref a) => {
1900                    if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>() {
1901                        ep.upload(ctx)
1902                            .map(EdgePayload::Payload)
1903                            .unwrap_or_else(|_| EdgePayload::Any(a.clone()))
1904                    } else if let Some(img) = a.downcast_ref::<daedalus_gpu::GpuImageHandle>() {
1905                        EdgePayload::GpuImage(img.clone())
1906                    } else {
1907                        EdgePayload::Any(a.clone())
1908                    }
1909                }
1910                EdgePayload::Payload(ref ep) => ep
1911                    .upload(ctx)
1912                    .map(EdgePayload::Payload)
1913                    .unwrap_or_else(|_| EdgePayload::Payload(ep.clone())),
1914                other => other,
1915            };
1916        } else if exits.contains(&edge_idx) {
1917            telemetry.record_edge_gpu_transfer(edge_idx, false);
1918            payload.inner = match payload.inner {
1919                EdgePayload::Any(ref a) => {
1920                    if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>() {
1921                        ep.download(ctx)
1922                            .map(EdgePayload::Payload)
1923                            .unwrap_or_else(|_| EdgePayload::Any(a.clone()))
1924                    } else {
1925                        EdgePayload::Any(a.clone())
1926                    }
1927                }
1928                EdgePayload::Payload(ref ep) => ep
1929                    .download(ctx)
1930                    .map(EdgePayload::Payload)
1931                    .unwrap_or_else(|_| EdgePayload::Payload(ep.clone())),
1932                EdgePayload::GpuImage(h) => EdgePayload::GpuImage(h),
1933                other => other,
1934            };
1935        }
1936        payload
1937    }
1938
    /// Get a structured `Value` payload for a port.
1941    ///
1942    /// ```no_run
1943    /// use daedalus_runtime::io::NodeIo;
1944    /// fn handler(io: &NodeIo) {
1945    ///     let _ = io.get_value("in");
1946    /// }
1947    /// ```
1948    pub fn get_value(&self, port: &str) -> Option<&Value> {
1949        self.inputs_for(port).find_map(|p| match &p.inner {
1950            EdgePayload::Value(v) => Some(v),
1951            _ => None,
1952        })
1953    }
1954
1955    pub fn get_int(&self, port: &str) -> Option<i64> {
1956        self.get_value(port).and_then(|v| match v {
1957            daedalus_data::model::Value::Int(i) => Some(*i),
1958            _ => None,
1959        })
1960    }
1961
1962    pub fn get_float(&self, port: &str) -> Option<f64> {
1963        self.get_value(port).and_then(|v| match v {
1964            daedalus_data::model::Value::Float(f) => Some(*f),
1965            _ => None,
1966        })
1967    }
1968
1969    /// Group inputs by port name (preserves encounter order per port).
1970    pub fn inputs_grouped(&self) -> Vec<(String, Vec<&CorrelatedPayload>)> {
1971        let mut groups: Vec<(String, Vec<&CorrelatedPayload>)> = Vec::new();
1972        for (port, payload) in &self.inputs {
1973            if let Some((_, vec)) = groups.iter_mut().find(|(p, _)| p == port) {
1974                vec.push(payload);
1975            } else {
1976                groups.push((port.clone(), vec![payload]));
1977            }
1978        }
1979        groups
1980    }
1981
    /// Flush is a no-op: outputs are applied immediately when pushed. Kept for API symmetry.
1983    pub fn flush(&mut self) -> Result<(), crate::executor::ExecuteError> {
1984        Ok(())
1985    }
1986}
1987
1988fn edge_payload_desc(payload: &EdgePayload) -> String {
1989    match payload {
1990        EdgePayload::Any(a) => format!("Any({})", std::any::type_name_of_val(a.as_ref())),
1991        #[cfg(feature = "gpu")]
1992        EdgePayload::Payload(ep) => format!("Payload({ep:?})"),
1993        #[cfg(feature = "gpu")]
1994        EdgePayload::GpuImage(_) => "GpuImage".to_string(),
1995        EdgePayload::Value(v) => format!("Value({v:?})"),
1996        EdgePayload::Bytes(_) => "Bytes".to_string(),
1997        EdgePayload::Unit => "Unit".to_string(),
1998    }
1999}
2000
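/// Align drained payloads against the node's sync groups.
///
/// Returns `(inputs, leftovers, ready)`: the payloads the node should fire with, the
/// payloads to requeue on their source edges, and whether every sync group was satisfied.
/// With no sync groups configured, everything drained fires as-is.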
2001fn align_drained_inputs(
2002    drained: Vec<DrainedInput>,
2003    sync_groups: &[SyncGroup],
2004) -> (Vec<(String, CorrelatedPayload)>, Vec<DrainedInput>, bool) {
2005    if sync_groups.is_empty() {
2006        let inputs = drained
2007            .into_iter()
2008            .map(|d| (d.port, d.payload))
2009            .collect::<Vec<_>>();
2010        return (inputs, Vec::new(), true);
2011    }
2012
2013    let mut grouped_ports: std::collections::HashSet<String> = std::collections::HashSet::new();
2014    for group in sync_groups {
2015        if group.ports.is_empty() {
2016            continue;
2017        }
2018        for port in &group.ports {
2019            grouped_ports.insert(port.clone());
2020        }
2021    }
2022
    let mut per_port: HashMap<String, VecDeque<DrainedInput>> = HashMap::new();
2025    for item in drained {
2026        per_port
2027            .entry(item.port.clone())
2028            .or_default()
2029            .push_back(item);
2030    }
2031
2032    let mut selected: Vec<DrainedInput> = Vec::new();
2033    let mut all_groups_ready = true;
2034
2035    for group in sync_groups {
2036        if group.ports.is_empty() {
2037            // An empty sync group is treated as a no-op. This is useful for callers that
2038            // intentionally disable implicit sync behavior without requiring correlation alignment.
2039            continue;
2040        }
2041        match group.policy {
2042            SyncPolicy::Latest => {
2043                // Require every port to have at least one payload. Take the newest and drop older.
2044                for port in &group.ports {
2045                    let Some(q) = per_port.get_mut(port) else {
2046                        all_groups_ready = false;
2047                        break;
2048                    };
2049                    if q.is_empty() {
2050                        all_groups_ready = false;
2051                        break;
2052                    }
2053                }
2054                if !all_groups_ready {
2055                    break;
2056                }
2057                for port in &group.ports {
2058                    if let Some(q) = per_port.get_mut(port)
2059                        && let Some(payload) = q.pop_back()
2060                    {
2061                        q.clear();
2062                        selected.push(payload);
2063                    }
2064                }
2065            }
2066            SyncPolicy::AllReady | SyncPolicy::ZipByTag => {
                // Find the smallest correlation id common to all ports, then take one matching payload per port.
2068                let mut common: Option<std::collections::HashSet<u64>> = None;
2069                for port in &group.ports {
2070                    let Some(q) = per_port.get(port) else {
2071                        all_groups_ready = false;
2072                        break;
2073                    };
2074                    let ids: std::collections::HashSet<u64> =
2075                        q.iter().map(|cp| cp.payload.correlation_id).collect();
2076                    common = match common {
2077                        None => Some(ids),
2078                        Some(mut acc) => {
2079                            acc.retain(|id| ids.contains(id));
2080                            Some(acc)
2081                        }
2082                    };
2083                    if common.as_ref().is_some_and(|acc| acc.is_empty()) {
2084                        all_groups_ready = false;
2085                        break;
2086                    }
2087                }
2088                if !all_groups_ready {
2089                    break;
2090                }
2091                let Some(common) = common else {
2092                    all_groups_ready = false;
2093                    break;
2094                };
2095                let Some(target_id) = common.iter().copied().min() else {
2096                    all_groups_ready = false;
2097                    break;
2098                };
2099
2100                for port in &group.ports {
2101                    if let Some(q) = per_port.get_mut(port)
2102                        && let Some(idx) = q
2103                            .iter()
2104                            .position(|cp| cp.payload.correlation_id == target_id)
2105                    {
2106                        let payload = q.remove(idx).unwrap();
2107                        selected.push(payload);
2108                    } else {
2109                        all_groups_ready = false;
2110                        break;
2111                    }
2112                }
2113                if !all_groups_ready {
2114                    break;
2115                }
2116            }
2117        }
2118    }
2119
2120    if !all_groups_ready {
2121        // Node should not fire: requeue everything we drained and return no inputs.
2122        let mut leftovers: Vec<DrainedInput> = Vec::new();
2123        leftovers.extend(selected);
2124        for (_, mut q) in per_port {
2125            while let Some(item) = q.pop_front() {
2126                leftovers.push(item);
2127            }
2128        }
2129        return (Vec::new(), leftovers, false);
2130    }
2131
2132    // Fire: emit selected group payloads + any ungrouped payloads.
2133    let mut inputs: Vec<(String, CorrelatedPayload)> =
2134        selected.into_iter().map(|d| (d.port, d.payload)).collect();
2135
2136    // Preserve original per-port order for ungrouped payloads.
2137    let mut leftovers: Vec<DrainedInput> = Vec::new();
2138    for (port, mut q) in per_port {
2139        if grouped_ports.contains(&port) {
2140            while let Some(item) = q.pop_front() {
2141                leftovers.push(item);
2142            }
2143            continue;
2144        }
2145        while let Some(item) = q.pop_front() {
2146            inputs.push((item.port, item.payload));
2147        }
2148    }
2149
2150    (inputs, leftovers, true)
2151}
2152
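/// Push leftover payloads back onto their source edges, re-applying each edge's policy
/// and refreshing `enqueued_at` timestamps.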
2153fn requeue_drained(
2154    leftovers: Vec<DrainedInput>,
2155    queues: &Arc<Vec<EdgeStorage>>,
2156    edges: &[EdgeInfo],
2157) {
2158    if leftovers.is_empty() {
2159        return;
2160    }
2161
2162    let mut per_edge: HashMap<usize, Vec<CorrelatedPayload>> = HashMap::new();
2163    for item in leftovers {
2164        per_edge
2165            .entry(item.edge_idx)
2166            .or_default()
2167            .push(item.payload);
2168    }
2169
2170    for (edge_idx, payloads) in per_edge {
2171        let Some(storage) = queues.get(edge_idx) else {
2172            continue;
2173        };
2174        let policy = edges
2175            .get(edge_idx)
2176            .map(|(_, _, _, _, policy)| policy)
2177            .cloned()
2178            .unwrap_or(EdgePolicyKind::Fifo);
2179        match storage {
2180            EdgeStorage::Locked(q_arc) => {
2181                if let Ok(mut q) = q_arc.lock() {
2182                    q.ensure_policy(&policy);
2183                    for mut payload in payloads {
2184                        payload.enqueued_at = Instant::now();
2185                        let _ = q.push(&policy, payload);
2186                    }
2187                }
2188            }
2189            #[cfg(feature = "lockfree-queues")]
2190            EdgeStorage::BoundedLf(q) => {
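                // Best-effort requeue: push a clone so that, if the bounded queue is full,
                // the oldest entry can be evicted and the original payload pushed again.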
2191                for mut payload in payloads {
2192                    payload.enqueued_at = Instant::now();
2193                    if q.push(payload.clone()).is_err() {
2194                        let _ = q.pop();
2195                        let _ = q.push(payload);
2196                    }
2197                }
2198            }
2199        }
2200    }
2201}
2202
2203#[cfg(test)]
2204mod tests {
2205    use super::*;
2206    use crate::executor::ExecutionTelemetry;
2207    use crate::executor::queue::{EdgeQueue, EdgeStorage, RingBuf};
2208    use daedalus_planner::NodeRef;
2209    use serde::Deserialize;
2210    use std::collections::HashSet;
2211
2212    fn payload(v: i32, corr: u64) -> CorrelatedPayload {
2213        CorrelatedPayload {
2214            correlation_id: corr,
2215            inner: EdgePayload::Any(Arc::new(v)),
2216            enqueued_at: std::time::Instant::now(),
2217        }
2218    }
2219
2220    #[test]
2221    fn aligns_all_ready() {
2222        let drained = vec![
2223            DrainedInput {
2224                port: "a".into(),
2225                edge_idx: 0,
2226                payload: payload(1, 1),
2227            },
2228            DrainedInput {
2229                port: "a".into(),
2230                edge_idx: 0,
2231                payload: payload(2, 2),
2232            },
2233            DrainedInput {
2234                port: "b".into(),
2235                edge_idx: 1,
2236                payload: payload(10, 1),
2237            },
2238            DrainedInput {
2239                port: "b".into(),
2240                edge_idx: 1,
2241                payload: payload(20, 2),
2242            },
2243        ];
2244        let group = SyncGroup {
2245            name: "g".into(),
2246            policy: SyncPolicy::AllReady,
2247            backpressure: None,
2248            capacity: None,
2249            ports: vec!["a".into(), "b".into()],
2250        };
2251        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
2252        assert!(ready);
2253        let vals: Vec<(String, i32)> = out
2254            .into_iter()
2255            .map(|(p, pl)| {
2256                (
2257                    p,
2258                    match pl.inner {
2259                        EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
2260                        _ => panic!("expected Any"),
2261                    },
2262                )
2263            })
2264            .collect();
2265        assert_eq!(vals, vec![("a".into(), 1), ("b".into(), 10)]);
2266
2267        let mut left_vals: Vec<(String, i32)> = leftovers
2268            .into_iter()
2269            .map(|d| {
2270                let v = match d.payload.inner {
2271                    EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
2272                    _ => panic!("expected Any"),
2273                };
2274                (d.port, v)
2275            })
2276            .collect();
2277        left_vals.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2278        assert_eq!(left_vals, vec![("a".into(), 2), ("b".into(), 20)]);
2279    }
2280
2281    #[test]
2282    fn aligns_latest() {
2283        let drained = vec![
2284            DrainedInput {
2285                port: "a".into(),
2286                edge_idx: 0,
2287                payload: payload(1, 1),
2288            },
2289            DrainedInput {
2290                port: "a".into(),
2291                edge_idx: 0,
2292                payload: payload(2, 2),
2293            },
2294            DrainedInput {
2295                port: "b".into(),
2296                edge_idx: 1,
2297                payload: payload(10, 3),
2298            },
2299            DrainedInput {
2300                port: "b".into(),
2301                edge_idx: 1,
2302                payload: payload(20, 4),
2303            },
2304        ];
2305        let group = SyncGroup {
2306            name: "g".into(),
2307            policy: SyncPolicy::Latest,
2308            backpressure: None,
2309            capacity: None,
2310            ports: vec!["a".into(), "b".into()],
2311        };
2312        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
2313        assert!(ready);
2314        assert!(leftovers.is_empty());
2315        let vals: Vec<(String, i32)> = out
2316            .into_iter()
2317            .map(|(p, pl)| {
2318                (
2319                    p,
2320                    match pl.inner {
2321                        EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
2322                        _ => panic!("expected Any"),
2323                    },
2324                )
2325            })
2326            .collect();
2327        assert_eq!(vals, vec![("a".into(), 2), ("b".into(), 20)]);
2328    }
2329
2330    #[test]
2331    fn aligns_zip_by_tag() {
2332        use daedalus_data::model::{StructFieldValue, Value};
2333        let tagged = |tag: &str, v: i32, corr: u64| {
2334            let fields = vec![
2335                StructFieldValue {
2336                    name: "tag".to_string(),
2337                    value: Value::String(tag.to_string().into()),
2338                },
2339                StructFieldValue {
2340                    name: "v".to_string(),
2341                    value: Value::Int(i64::from(v)),
2342                },
2343            ];
2344            CorrelatedPayload {
2345                correlation_id: corr,
2346                inner: EdgePayload::Value(Value::Struct(fields)),
2347                enqueued_at: std::time::Instant::now(),
2348            }
2349        };
2350        let inputs = vec![
2351            ("a".into(), tagged("x", 1, 1)),
2352            ("b".into(), tagged("y", 200, 2)),
2353            ("b".into(), tagged("x", 100, 1)),
2354            ("a".into(), tagged("y", 2, 2)),
2355        ];
2356        let group = SyncGroup {
2357            name: "g".into(),
2358            policy: SyncPolicy::ZipByTag,
2359            backpressure: None,
2360            capacity: None,
2361            ports: vec!["a".into(), "b".into()],
2362        };
2363        let drained = inputs
2364            .into_iter()
2365            .enumerate()
2366            .map(|(idx, (port, payload))| DrainedInput {
2367                port,
2368                edge_idx: idx,
2369                payload,
2370            })
2371            .collect::<Vec<_>>();
2372        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
2373        assert!(ready);
2374        assert!(!out.is_empty());
2375        assert!(!leftovers.is_empty());
2376        let tags: Vec<String> = out
2377            .chunks(2)
2378            .map(|chunk| {
2379                chunk
2380                    .iter()
2381                    .map(|(_, p)| match &p.inner {
2382                        EdgePayload::Value(Value::Struct(fields)) => fields
2383                            .iter()
2384                            .find(|f| f.name == "tag")
2385                            .and_then(|f| match &f.value {
2386                                Value::String(s) => Some(s.to_string()),
2387                                _ => None,
2388                            })
2389                            .unwrap(),
2390                        _ => "missing".to_string(),
2391                    })
2392                    .next()
2393                    .unwrap()
2394            })
2395            .collect();
2396        assert_eq!(tags, vec!["x".to_string()]);
2397    }
2398
2399    #[test]
2400    fn not_ready_returns_no_inputs_and_requeues() {
2401        let drained = vec![DrainedInput {
2402            port: "a".into(),
2403            edge_idx: 0,
2404            payload: payload(1, 1),
2405        }];
2406        let group = SyncGroup {
2407            name: "g".into(),
2408            policy: SyncPolicy::AllReady,
2409            backpressure: None,
2410            capacity: None,
2411            ports: vec!["a".into(), "b".into()],
2412        };
2413        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
2414        assert!(!ready);
2415        assert!(out.is_empty());
2416        assert_eq!(leftovers.len(), 1);
2417    }
2418
2419    #[test]
2420    fn port_override_applies_backpressure_and_capacity() {
2421        let queues = Arc::new(vec![EdgeStorage::Locked(Arc::new(std::sync::Mutex::new(
2422            EdgeQueue::Bounded {
2423                ring: RingBuf::new(5),
2424            },
2425        )))]);
2426        let edges = vec![(
2427            NodeRef(0),
2428            "out".to_string(),
2429            NodeRef(1),
2430            "in".to_string(),
2431            EdgePolicyKind::Bounded { cap: 5 },
2432        )];
2433        let sg = SyncGroup {
2434            name: "g".into(),
2435            policy: SyncPolicy::AllReady,
2436            backpressure: Some(BackpressureStrategy::ErrorOnOverflow),
2437            capacity: Some(1),
2438            ports: vec!["out".into()],
2439        };
2440        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
2441        let mut telem = ExecutionTelemetry::default();
2442
2443        #[cfg(feature = "gpu")]
2444        let gpu_entry_edges = HashSet::new();
2445        #[cfg(feature = "gpu")]
2446        let gpu_exit_edges = HashSet::new();
2447        #[cfg(feature = "gpu")]
2448        let payload_edges = HashSet::new();
2449
2450        let mut io = NodeIo::new(
2451            vec![],
2452            vec![0],
2453            &queues,
2454            &warnings,
2455            &edges,
2456            vec![sg],
2457            #[cfg(feature = "gpu")]
2458            &gpu_entry_edges,
2459            #[cfg(feature = "gpu")]
2460            &gpu_exit_edges,
2461            #[cfg(feature = "gpu")]
2462            &payload_edges,
2463            0,
2464            0,
2465            "node".into(),
2466            &mut telem,
2467            BackpressureStrategy::None,
2468            &[],
2469            None,
2470            None,
2471            #[cfg(feature = "gpu")]
2472            None,
2473            #[cfg(feature = "gpu")]
2474            daedalus_planner::ComputeAffinity::CpuOnly,
2475        );
2476
2477        io.push_any(Some("out"), 1i32);
2478        io.push_any(Some("out"), 2i32); // should trigger overflow with cap=1 + ErrorOnOverflow override
2479        assert!(telem.backpressure_events > 0);
2480    }
2481
2482    #[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
2483    #[serde(rename_all = "snake_case")]
2484    enum TestEnum {
2485        Auto,
2486        Cpu,
2487        Gpu,
2488    }
2489
2490    impl TestEnum {
2491        fn from_label(raw: &str) -> Option<Self> {
2492            match raw.trim().to_ascii_lowercase().as_str() {
2493                "auto" => Some(Self::Auto),
2494                "cpu" => Some(Self::Cpu),
2495                "gpu" => Some(Self::Gpu),
2496                _ => None,
2497            }
2498        }
2499    }
2500
2501    fn register_test_enum() {
2502        use daedalus_data::model::Value;
2503        daedalus_data::typing::register_enum::<TestEnum>(["auto", "cpu", "gpu"]);
2504        register_const_coercer::<TestEnum, _>(|v| match v {
2505            Value::Int(_) => NodeIo::enum_name_from_index::<TestEnum>(v)
2506                .and_then(|name| TestEnum::from_label(&name)),
2507            Value::String(s) => TestEnum::from_label(s),
2508            Value::Enum(ev) => TestEnum::from_label(&ev.name),
2509            _ => None,
2510        });
2511    }
2512
2513    #[cfg(feature = "gpu")]
2514    #[derive(Clone, Debug, PartialEq)]
2515    struct DummyPayload {
2516        value: i32,
2517    }
2518
2519    #[cfg(feature = "gpu")]
2520    impl daedalus_gpu::GpuSendable for DummyPayload {
2521        type GpuRepr = ();
2522    }
2523
2524    #[cfg(feature = "gpu")]
2525    fn make_io_with_payload(payload: CorrelatedPayload) -> NodeIo<'static> {
2526        let queues: &'static Arc<Vec<EdgeStorage>> = Box::leak(Box::new(Arc::new(vec![])));
2527        let edges: &'static [EdgeInfo] = Box::leak(Box::new(Vec::new()));
2528        let warnings: &'static Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
2529            Box::leak(Box::new(Arc::new(std::sync::Mutex::new(HashSet::new()))));
2530        let telem: &'static mut ExecutionTelemetry = Box::leak(Box::new(ExecutionTelemetry::default()));
2531        let gpu_entry_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));
2532        let gpu_exit_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));
2533        let payload_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));
2534
2535        let mut io = NodeIo::new(
2536            vec![],
2537            vec![],
2538            queues,
2539            warnings,
2540            edges,
2541            vec![],
2542            gpu_entry_edges,
2543            gpu_exit_edges,
2544            payload_edges,
2545            0,
2546            0,
2547            "node".into(),
2548            telem,
2549            BackpressureStrategy::None,
2550            &[],
2551            None,
2552            None,
2553            None,
2554            daedalus_planner::ComputeAffinity::CpuOnly,
2555        );
2556        io.inputs = vec![("in".to_string(), payload)];
2557        io
2558    }
2559
2560    #[cfg(feature = "gpu")]
2561    #[test]
2562    fn get_any_reads_payload_any_type() {
2563        let payload = CorrelatedPayload {
2564            correlation_id: 1,
2565            inner: EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<DummyPayload>(DummyPayload { value: 42 })),
2566            enqueued_at: std::time::Instant::now(),
2567        };
2568        let io = make_io_with_payload(payload);
2569        let got = io.get_any::<DummyPayload>("in");
2570        assert_eq!(got, Some(DummyPayload { value: 42 }));
2571    }
2572
2573    #[cfg(feature = "gpu")]
2574    #[test]
2575    fn get_typed_mut_moves_payload_any_type() {
2576        let payload = CorrelatedPayload {
2577            correlation_id: 1,
2578            inner: EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<DummyPayload>(DummyPayload { value: 7 })),
2579            enqueued_at: std::time::Instant::now(),
2580        };
2581        let mut io = make_io_with_payload(payload);
2582        let got = io.get_typed_mut::<DummyPayload>("in");
2583        assert_eq!(got, Some(DummyPayload { value: 7 }));
2584        assert!(io.inputs.is_empty());
2585    }
2586
2587    #[test]
2588    fn get_typed_parses_enum_from_value_enum() {
2589        use daedalus_data::model::{EnumValue, Value};
2590        register_test_enum();
2591        let queues = Arc::new(vec![]);
2592        let edges = vec![];
2593        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
2594        let mut telem = ExecutionTelemetry::default();
2595
        #[cfg(feature = "gpu")]
        let gpu_entry_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let gpu_exit_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let payload_edges = HashSet::new();
2600
2601        assert_eq!(
2602            NodeIo::enum_name_from_index::<TestEnum>(&Value::Int(2)),
2603            Some("gpu".to_string())
2604        );
2605
2606        let io = NodeIo::new(
2607            vec![],
2608            vec![],
2609            &queues,
2610            &warnings,
2611            &edges,
2612            vec![],
2613            #[cfg(feature = "gpu")]
2614            &gpu_entry_edges,
2615            #[cfg(feature = "gpu")]
2616            &gpu_exit_edges,
2617            #[cfg(feature = "gpu")]
2618            &payload_edges,
2619            0,
2620            0,
2621            "node".into(),
2622            &mut telem,
2623            BackpressureStrategy::None,
2624            &[(
2625                "mode".to_string(),
2626                Value::Enum(EnumValue {
2627                    name: "gpu".to_string(),
2628                    value: None,
2629                }),
2630            )],
2631            None,
2632            None,
2633            #[cfg(feature = "gpu")]
2634            None,
2635            #[cfg(feature = "gpu")]
2636            daedalus_planner::ComputeAffinity::CpuOnly,
2637        );
2638
2639        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Gpu));
2640    }
2641
2642    #[test]
2643    fn get_typed_parses_enum_from_value_string() {
2644        use daedalus_data::model::Value;
2645        register_test_enum();
2646        let queues = Arc::new(vec![]);
2647        let edges = vec![];
2648        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
2649        let mut telem = ExecutionTelemetry::default();
2650
2651        #[cfg(feature = "gpu")]
2652        let gpu_entry_edges = HashSet::new();
2653        #[cfg(feature = "gpu")]
2654        let gpu_exit_edges = HashSet::new();
2655        #[cfg(feature = "gpu")]
2656        let payload_edges = HashSet::new();
2657
2658        let io = NodeIo::new(
2659            vec![],
2660            vec![],
2661            &queues,
2662            &warnings,
2663            &edges,
2664            vec![],
2665            #[cfg(feature = "gpu")]
2666            &gpu_entry_edges,
2667            #[cfg(feature = "gpu")]
2668            &gpu_exit_edges,
2669            #[cfg(feature = "gpu")]
2670            &payload_edges,
2671            0,
2672            0,
2673            "node".into(),
2674            &mut telem,
2675            BackpressureStrategy::None,
2676            &[("mode".to_string(), Value::String("cpu".into()))],
2677            None,
2678            None,
2679            #[cfg(feature = "gpu")]
2680            None,
2681            #[cfg(feature = "gpu")]
2682            daedalus_planner::ComputeAffinity::CpuOnly,
2683        );
2684
2685        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Cpu));
2686    }
2687
2688    #[test]
2689    fn get_typed_parses_enum_from_value_int_index() {
2690        use daedalus_data::model::Value;
2691        register_test_enum();
2692        assert!(daedalus_data::typing::lookup_type::<TestEnum>().is_some());
2693        let queues = Arc::new(vec![]);
2694        let edges = vec![];
2695        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
2696        let mut telem = ExecutionTelemetry::default();
2697
2698        #[cfg(feature = "gpu")]
2699        let gpu_entry_edges = HashSet::new();
2700        #[cfg(feature = "gpu")]
2701        let gpu_exit_edges = HashSet::new();
2702        #[cfg(feature = "gpu")]
2703        let payload_edges = HashSet::new();
2704
        // TestEnum variants are registered in order [Auto, Cpu, Gpu]; index 2 => Gpu.
2706        let io = NodeIo::new(
2707            vec![],
2708            vec![],
2709            &queues,
2710            &warnings,
2711            &edges,
2712            vec![],
2713            #[cfg(feature = "gpu")]
2714            &gpu_entry_edges,
2715            #[cfg(feature = "gpu")]
2716            &gpu_exit_edges,
2717            #[cfg(feature = "gpu")]
2718            &payload_edges,
2719            0,
2720            0,
2721            "node".into(),
2722            &mut telem,
2723            BackpressureStrategy::None,
2724            &[("mode".to_string(), Value::Int(2))],
2725            None,
2726            None,
2727            #[cfg(feature = "gpu")]
2728            None,
2729            #[cfg(feature = "gpu")]
2730            daedalus_planner::ComputeAffinity::CpuOnly,
2731        );
2732
2733        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Gpu));
2734    }
2735}