1use std::any::{Any, TypeId};
2use std::collections::BTreeMap;
3use std::collections::HashMap;
4#[cfg(feature = "gpu")]
5use std::collections::HashSet;
6use std::collections::VecDeque;
use std::sync::{Arc, OnceLock, RwLock};
use std::time::Instant;
10
11use daedalus_data::model::{TypeExpr, Value};
12use daedalus_data::typing;
13#[cfg(feature = "gpu")]
14use image::{DynamicImage, GrayAlphaImage, GrayImage, RgbImage, RgbaImage};
15
16use crate::executor::queue::{apply_policy, apply_policy_owned, ApplyPolicyOwnedArgs};
17use crate::executor::{
18 CorrelatedPayload, EdgePayload, EdgeStorage, ExecutionTelemetry, next_correlation_id,
19};
20use crate::fanin::parse_indexed_port;
21#[allow(unused_imports)]
22use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimeNode};
23use daedalus_core::sync::{SyncGroup, SyncPolicy};
24use daedalus_planner::NodeRef;
25
26type EdgeInfo = (NodeRef, String, NodeRef, String, EdgePolicyKind);
27
28#[derive(Clone)]
29struct DrainedInput {
30 port: String,
31 edge_idx: usize,
32 payload: CorrelatedPayload,
33}
34
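/// Type-erased coercion from a graph constant (`Value`) into a boxed concrete input.
/// Returning `None` means the coercer does not handle that value.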
35pub type ConstCoercer = Box<
36 dyn Fn(&daedalus_data::model::Value) -> Option<Box<dyn Any + Send + Sync>>
37 + Send
38 + Sync
39 + 'static,
40>;
41
42pub type ConstCoercerMap = Arc<RwLock<HashMap<&'static str, ConstCoercer>>>;
43
44static GLOBAL_CONST_COERCERS: OnceLock<ConstCoercerMap> = OnceLock::new();
45type OutputMover = Box<
46 dyn Fn(Box<dyn Any + Send + Sync>) -> EdgePayload + Send + Sync + 'static
47>;
48pub type OutputMoverMap = Arc<RwLock<HashMap<TypeId, OutputMover>>>;
49static OUTPUT_MOVERS: OnceLock<OutputMoverMap> = OnceLock::new();
50
51fn output_movers() -> &'static OutputMoverMap {
52 OUTPUT_MOVERS.get_or_init(|| Arc::new(RwLock::new(HashMap::new())))
53}
54
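/// Creates an empty, shareable output-mover map, e.g. for a registry scoped to a single
/// executor instead of the process-wide `OUTPUT_MOVERS` registry.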
55pub fn new_output_mover_map() -> OutputMoverMap {
63 Arc::new(RwLock::new(HashMap::new()))
64}
65
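/// Registers a mover for values of type `T` in the given map. A mover converts an owned
/// `T` pushed via `push_typed` into an `EdgePayload`, instead of the `EdgePayload::Any`
/// fallback.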
66pub fn register_output_mover_in<T, F>(map: &OutputMoverMap, mover: F)
70where
71 T: Any + Send + Sync + 'static,
72 F: Fn(T) -> EdgePayload + Send + Sync + 'static,
73{
74 let mut guard = map.write().expect("OUTPUT_MOVERS lock poisoned");
75 guard.insert(
76 TypeId::of::<T>(),
77 Box::new(move |any| {
78 let boxed = any.downcast::<T>().expect("output mover type mismatch");
79 mover(*boxed)
80 }),
81 );
82}
83
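/// Registers a mover for `T` in the process-wide registry.
///
/// ```ignore
/// // Illustrative sketch; `MyFrame` is a hypothetical payload type.
/// register_output_mover::<MyFrame, _>(|frame| EdgePayload::Any(std::sync::Arc::new(frame)));
/// ```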
84pub fn register_output_mover<T, F>(mover: F)
86where
87 T: Any + Send + Sync + 'static,
88 F: Fn(T) -> EdgePayload + Send + Sync + 'static,
89{
90 register_output_mover_in(output_movers(), mover);
91}
92
93fn try_move_output<T>(movers: Option<&OutputMoverMap>, value: T) -> Result<EdgePayload, T>
94where
95 T: Any + Send + Sync + 'static,
96{
97 let map = match movers {
98 Some(map) => map,
99 None => return Err(value),
100 };
101 let guard = match map.read() {
102 Ok(guard) => guard,
103 Err(_) => return Err(value),
104 };
105 let mover = match guard.get(&TypeId::of::<T>()) {
106 Some(mover) => mover,
107 None => return Err(value),
108 };
109 let boxed: Box<dyn Any + Send + Sync> = Box::new(value);
110 Ok(mover(boxed))
111}
112
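/// Rewraps `EdgePayload::Any` values that hold an `ErasedPayload` or a known image type
/// (dynamic, gray, gray-alpha, RGB, or RGBA, plain or `Arc`-wrapped) as
/// `EdgePayload::Payload`; anything else passes through unchanged. Used for outgoing edges
/// marked as requiring the erased payload representation (see `payload_edges`).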
113#[cfg(feature = "gpu")]
114fn promote_payload_for_host(payload: EdgePayload) -> EdgePayload {
115 use daedalus_gpu::{ErasedPayload, Payload};
116
117 match payload {
118 EdgePayload::Any(a) => {
119 if let Some(ep) = a.downcast_ref::<ErasedPayload>() {
120 return EdgePayload::Payload(ep.clone());
121 }
122 if let Some(p) = a.downcast_ref::<Payload<DynamicImage>>() {
123 return match p.clone() {
124 Payload::Cpu(img) => EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(img)),
125 Payload::Gpu(g) => EdgePayload::Payload(ErasedPayload::from_gpu::<DynamicImage>(g)),
126 };
127 }
128 if let Some(p) = a.downcast_ref::<Payload<GrayImage>>() {
129 return match p.clone() {
130 Payload::Cpu(img) => EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>(img)),
131 Payload::Gpu(g) => EdgePayload::Payload(ErasedPayload::from_gpu::<GrayImage>(g)),
132 };
133 }
134 if let Some(img) = a.downcast_ref::<DynamicImage>() {
135 return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(img.clone()));
136 }
137 if let Some(img) = a.downcast_ref::<Arc<DynamicImage>>() {
138 return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>((**img).clone()));
139 }
140 if let Some(img) = a.downcast_ref::<GrayImage>() {
141 return EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>(img.clone()));
142 }
143 if let Some(img) = a.downcast_ref::<Arc<GrayImage>>() {
144 return EdgePayload::Payload(ErasedPayload::from_cpu::<GrayImage>((**img).clone()));
145 }
146 if let Some(img) = a.downcast_ref::<RgbImage>() {
147 return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbImage>(img.clone()));
148 }
149 if let Some(img) = a.downcast_ref::<Arc<RgbImage>>() {
150 return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbImage>((**img).clone()));
151 }
152 if let Some(img) = a.downcast_ref::<RgbaImage>() {
153 return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbaImage>(img.clone()));
154 }
155 if let Some(img) = a.downcast_ref::<Arc<RgbaImage>>() {
156 return EdgePayload::Payload(ErasedPayload::from_cpu::<RgbaImage>((**img).clone()));
157 }
158 if let Some(img) = a.downcast_ref::<GrayAlphaImage>() {
159 let dyn_img = DynamicImage::ImageLumaA8(img.clone());
160 return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(dyn_img));
161 }
162 if let Some(img) = a.downcast_ref::<Arc<GrayAlphaImage>>() {
163 let dyn_img = DynamicImage::ImageLumaA8((**img).clone());
164 return EdgePayload::Payload(ErasedPayload::from_cpu::<DynamicImage>(dyn_img));
165 }
166 EdgePayload::Any(a)
167 }
168 other => other,
169 }
170}
171
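/// Creates an empty, shareable const-coercer map keyed by Rust type name.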
172pub fn new_const_coercer_map() -> ConstCoercerMap {
180 Arc::new(RwLock::new(HashMap::new()))
181}
182
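/// Registers a coercer that turns a constant `Value` input into a `T`, stored in the
/// process-wide registry under `type_name::<T>()`.
///
/// ```ignore
/// // Illustrative sketch; `Threshold` is a hypothetical newtype over f32.
/// register_const_coercer::<Threshold, _>(|v| match v {
///     daedalus_data::model::Value::Float(f) => Some(Threshold(*f as f32)),
///     _ => None,
/// });
/// ```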
183pub fn register_const_coercer<T, F>(coercer: F)
202where
203 T: Any + Send + Sync + 'static,
204 F: Fn(&daedalus_data::model::Value) -> Option<T> + Send + Sync + 'static,
205{
206 let key = std::any::type_name::<T>();
207 let map = GLOBAL_CONST_COERCERS.get_or_init(new_const_coercer_map);
208 let mut guard = map.write().expect("GLOBAL_CONST_COERCERS lock poisoned");
209 guard.insert(
210 key,
211 Box::new(move |v| coercer(v).map(|t| Box::new(t) as Box<dyn Any + Send + Sync>)),
212 );
213}
214
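/// Per-invocation view of one node's inputs and outputs. Construction drains the node's
/// incoming edge queues, aligns payloads against sync groups, and fills in const inputs;
/// the `get_*` accessors coerce inputs to the requested type and the `push_*` methods
/// route outputs onto outgoing edges according to edge policy and backpressure.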
215pub struct NodeIo<'a> {
226 inputs: Vec<(String, CorrelatedPayload)>,
227 borrowed_cache: std::cell::UnsafeCell<Vec<Box<dyn Any + Send + Sync>>>,
228 sync_groups: Vec<SyncGroup>,
229 port_overrides: HashMap<String, (Option<BackpressureStrategy>, Option<usize>)>,
230 current_corr_id: u64,
231 outgoing: Vec<usize>,
232 has_incoming_edges: bool,
233 queues: &'a Arc<Vec<EdgeStorage>>,
234 telemetry: &'a mut ExecutionTelemetry,
235 edges: &'a [EdgeInfo],
236 #[allow(dead_code)]
237 seg_idx: usize,
238 node_idx: usize,
239 node_id: String,
240 warnings_seen: &'a std::sync::Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
241 backpressure: BackpressureStrategy,
242 #[cfg(feature = "gpu")]
243 gpu: Option<daedalus_gpu::GpuContextHandle>,
244 #[cfg(feature = "gpu")]
245 target_compute: daedalus_planner::ComputeAffinity,
246 #[cfg(feature = "gpu")]
247 payload_edges: &'a HashSet<usize>,
248 const_coercers: Option<ConstCoercerMap>,
249 output_movers: Option<OutputMoverMap>,
250}
251
252impl<'a> NodeIo<'a> {
253 #[cfg(feature = "gpu")]
254 fn dynamic_image_to_t<T: Any + Clone>(img: DynamicImage) -> Option<T> {
255 let want = TypeId::of::<T>();
256 if want == TypeId::of::<DynamicImage>() {
257 let any_ref: &dyn Any = &img;
258 return any_ref.downcast_ref::<T>().cloned();
259 }
260 if want == TypeId::of::<GrayImage>() {
261 let gray = img.to_luma8();
262 let any_ref: &dyn Any = &gray;
263 return any_ref.downcast_ref::<T>().cloned();
264 }
265 if want == TypeId::of::<GrayAlphaImage>() {
266 let gray = img.to_luma_alpha8();
267 let any_ref: &dyn Any = &gray;
268 return any_ref.downcast_ref::<T>().cloned();
269 }
270 if want == TypeId::of::<RgbImage>() {
271 let rgb = img.to_rgb8();
272 let any_ref: &dyn Any = &rgb;
273 return any_ref.downcast_ref::<T>().cloned();
274 }
275 if want == TypeId::of::<RgbaImage>() {
276 let rgba = img.to_rgba8();
277 let any_ref: &dyn Any = &rgba;
278 return any_ref.downcast_ref::<T>().cloned();
279 }
280 None
281 }
282
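    /// Builds the I/O view for a single node firing: drains incoming queues, applies GPU
    /// entry/exit conversions when enabled, aligns inputs by sync group, injects const
    /// inputs for ports without a live payload, and requeues anything left unconsumed.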
283 #[allow(clippy::too_many_arguments)]
284 pub fn new(
286 incoming_edges: Vec<usize>,
287 outgoing_edges: Vec<usize>,
288 queues: &'a Arc<Vec<EdgeStorage>>,
289 warnings_seen: &'a Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
290 edges: &'a [EdgeInfo],
291 mut sync_groups: Vec<SyncGroup>,
292 #[cfg(feature = "gpu")] gpu_entry_edges: &'a HashSet<usize>,
293 #[cfg(feature = "gpu")] gpu_exit_edges: &'a HashSet<usize>,
294 #[cfg(feature = "gpu")] payload_edges: &'a HashSet<usize>,
295 seg_idx: usize,
296 node_idx: usize,
297 node_id: String,
298 telemetry: &'a mut ExecutionTelemetry,
299 backpressure: BackpressureStrategy,
300 const_inputs: &[(String, daedalus_data::model::Value)],
301 const_coercers: Option<ConstCoercerMap>,
302 output_movers: Option<OutputMoverMap>,
303 #[cfg(feature = "gpu")] gpu: Option<daedalus_gpu::GpuContextHandle>,
304 #[cfg(feature = "gpu")] target_compute: daedalus_planner::ComputeAffinity,
305 ) -> Self {
306 let is_host_bridge =
307 node_id.ends_with("io.host_bridge") || node_id.ends_with("io.host_output");
308 let has_incoming_edges = !incoming_edges.is_empty();
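        // Nodes with multiple distinct input ports and no explicit sync groups get an
        // implicit all-ready group, so each firing consumes one payload per port;
        // host-bridge/host-output nodes are exempt.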
309 if sync_groups.is_empty() && !is_host_bridge {
310 let mut ports: Vec<String> = incoming_edges
314 .iter()
315 .filter_map(|edge_idx| edges.get(*edge_idx).map(|(_, _, _, to_port, _)| to_port))
316 .cloned()
317 .collect();
318 ports.sort();
319 ports.dedup();
320 if ports.len() > 1 {
321 sync_groups.push(SyncGroup {
322 name: "__implicit_all_ready".into(),
323 policy: SyncPolicy::AllReady,
324 backpressure: None,
325 capacity: None,
326 ports,
327 });
328 }
329 }
330
331 let mut drained: Vec<DrainedInput> = Vec::new();
332 for edge_idx in &incoming_edges {
333 if let Some(storage) = queues.get(*edge_idx) {
334 match storage {
335 EdgeStorage::Locked(q_arc) => {
336 if let Ok(mut q) = q_arc.lock() {
337 while let Some(payload) = q.pop_front() {
338 #[allow(unused_mut)]
339 let mut payload = payload;
340 let now = Instant::now();
341 telemetry
342 .record_edge_wait(*edge_idx, now.saturating_duration_since(payload.enqueued_at));
343 let payload_bytes = if cfg!(feature = "metrics")
344 && telemetry.metrics_level.is_detailed()
345 {
346 crate::executor::payload_size_bytes(&payload.inner)
347 } else {
348 None
349 };
350 telemetry.record_node_payload_in(node_idx, payload_bytes);
351 let port = edges
352 .get(*edge_idx)
353 .map(|(_, _, _, to_port, _)| to_port.clone())
354 .unwrap_or_default();
355 #[cfg(feature = "gpu")]
356 {
357 payload = Self::convert_incoming(
358 payload,
359 *edge_idx,
360 gpu_entry_edges,
361 gpu_exit_edges,
362 gpu.as_ref(),
363 telemetry,
364 );
365 }
366 drained.push(DrainedInput {
367 port,
368 edge_idx: *edge_idx,
369 payload,
370 });
371 }
372 }
373 }
374 #[cfg(feature = "lockfree-queues")]
375 EdgeStorage::BoundedLf(q) => {
376 while let Some(payload) = q.pop() {
377 #[allow(unused_mut)]
378 let mut payload = payload;
379 let now = Instant::now();
380 telemetry
381 .record_edge_wait(*edge_idx, now.saturating_duration_since(payload.enqueued_at));
382 let payload_bytes = if cfg!(feature = "metrics")
383 && telemetry.metrics_level.is_detailed()
384 {
385 crate::executor::payload_size_bytes(&payload.inner)
386 } else {
387 None
388 };
389 telemetry.record_node_payload_in(node_idx, payload_bytes);
390 let port = edges
391 .get(*edge_idx)
392 .map(|(_, _, _, to_port, _)| to_port.clone())
393 .unwrap_or_default();
394 #[cfg(feature = "gpu")]
395 {
396 payload = Self::convert_incoming(
397 payload,
398 *edge_idx,
399 gpu_entry_edges,
400 gpu_exit_edges,
401 gpu.as_ref(),
402 telemetry,
403 );
404 }
405 drained.push(DrainedInput {
406 port,
407 edge_idx: *edge_idx,
408 payload,
409 });
410 }
411 }
412 }
413 }
414 }
415 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
416 for item in &drained {
417 log::debug!(
418 "node input drained node={} port={} edge_idx={} payload={}",
419 node_id,
420 item.port,
421 item.edge_idx,
422 edge_payload_desc(&item.payload.inner)
423 );
424 }
425 }
426 if log::log_enabled!(log::Level::Debug) && drained.is_empty() {
427 let ports: Vec<String> = incoming_edges
428 .iter()
429 .filter_map(|edge_idx| edges.get(*edge_idx).map(|(_, _, _, to_port, _)| to_port.clone()))
430 .collect();
431 if !ports.is_empty() {
432 log::debug!("node inputs empty node={} ports={:?}", node_id, ports);
433 }
434 }
435
436 let has_drained = !drained.is_empty();
437 let mut const_payloads: Vec<(String, CorrelatedPayload)> = Vec::new();
438 for (port, value) in const_inputs {
440 if drained.iter().any(|p| p.port == *port) {
441 continue;
442 }
443 let payload = match value {
444 daedalus_data::model::Value::Int(v) => EdgePayload::Any(Arc::new(*v)),
447 daedalus_data::model::Value::Float(f) => EdgePayload::Any(Arc::new(*f)),
448 daedalus_data::model::Value::Bool(b) => EdgePayload::Any(Arc::new(*b)),
449 daedalus_data::model::Value::String(s) => {
451 EdgePayload::Any(Arc::new(s.to_string()))
452 }
453 other => EdgePayload::Value(other.clone()),
454 };
455 const_payloads.push((port.clone(), CorrelatedPayload::from_edge(payload)));
456 }
457
458 let (mut aligned_inputs, leftovers, ready) = align_drained_inputs(drained, &sync_groups);
459 if log::log_enabled!(log::Level::Debug) && !sync_groups.is_empty() && !ready {
460 log::debug!(
461 "node sync groups not ready node={} groups={:?}",
462 node_id,
463 sync_groups
464 );
465 }
466 if !sync_groups.is_empty() && !ready {
467 aligned_inputs.clear();
468 } else if has_drained || !has_incoming_edges {
469 aligned_inputs.extend(const_payloads);
470 }
471 requeue_drained(leftovers, queues, edges);
472
473 let current_corr_id = aligned_inputs
474 .first()
475 .map(|(_, cp)| cp.correlation_id)
476 .unwrap_or_else(next_correlation_id);
477
478 let mut port_overrides = HashMap::new();
479 for group in &sync_groups {
480 for port in &group.ports {
481 port_overrides.insert(port.clone(), (group.backpressure.clone(), group.capacity));
482 }
483 }
484
485 Self {
486 inputs: aligned_inputs,
487 borrowed_cache: std::cell::UnsafeCell::new(Vec::new()),
488 sync_groups,
489 port_overrides,
490 current_corr_id,
491 outgoing: outgoing_edges,
492 has_incoming_edges,
493 queues,
494 telemetry,
495 edges,
496 seg_idx,
497 node_idx,
498 node_id,
499 warnings_seen,
500 backpressure,
501 #[cfg(feature = "gpu")]
502 gpu,
503 #[cfg(feature = "gpu")]
504 target_compute,
505 #[cfg(feature = "gpu")]
506 payload_edges,
507 const_coercers,
508 output_movers,
509 }
510 }
511
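    /// The inputs selected for this invocation, as `(port, payload)` pairs.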
512 pub fn inputs(&self) -> &[(String, CorrelatedPayload)] {
522 &self.inputs
523 }
524
525 pub fn has_incoming_edges(&self) -> bool {
527 self.has_incoming_edges
528 }
529
530 fn take_input(&mut self, port: &str) -> Option<(usize, CorrelatedPayload)> {
531 let idx = match self.inputs.iter().position(|(p, _)| p == port) {
532 Some(idx) => idx,
533 None => {
534 if std::env::var_os("DAEDALUS_TRACE_MISSING_INPUTS").is_some() {
535 let ports: Vec<&str> = self.inputs.iter().map(|(p, _)| p.as_str()).collect();
536 eprintln!(
537 "daedalus-runtime: missing input node={} port={} available_ports={:?}",
538 self.node_id,
539 port,
540 ports
541 );
542 }
543 return None;
544 }
545 };
546 let payload = self.inputs.remove(idx).1;
547 Some((idx, payload))
548 }
549
550 fn restore_input(&mut self, idx: usize, port: &str, payload: CorrelatedPayload) {
551 if idx <= self.inputs.len() {
552 self.inputs.insert(idx, (port.to_string(), payload));
553 } else {
554 self.inputs.push((port.to_string(), payload));
555 }
556 }
557
558 fn cache_borrowed<T: Any + Send + Sync>(&self, value: T) -> &T {
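        // Values are only appended, and the `Box` indirection keeps each value at a stable
        // address, so references returned from earlier calls stay valid while `self` lives.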
559 let cache = unsafe { &mut *self.borrowed_cache.get() };
560 cache.push(Box::new(value));
561 cache
562 .last()
563 .and_then(|boxed| boxed.downcast_ref::<T>())
564 .expect("borrowed cache type mismatch")
565 }
566
567 pub fn sync_groups(&self) -> &[SyncGroup] {
577 &self.sync_groups
578 }
579
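    /// Pushes a payload onto every outgoing edge whose source port matches `port`
    /// (case-insensitively), or onto all outgoing edges when `port` is `None`,
    /// tagged with the current correlation id.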
580 pub fn push_output(&mut self, port: Option<&str>, payload: EdgePayload) {
591 let correlated = CorrelatedPayload {
592 correlation_id: self.current_corr_id,
593 inner: payload,
594 enqueued_at: Instant::now(),
595 };
596 self.push_correlated(port, correlated);
597 }
598
599 pub fn push_correlated_payload(&mut self, port: Option<&str>, correlated: CorrelatedPayload) {
601 self.push_correlated(port, correlated);
602 }
603
604 fn push_correlated(&mut self, port: Option<&str>, correlated: CorrelatedPayload) {
605 #[cfg(feature = "gpu")]
606 let mut matches: Vec<(usize, String, EdgePolicyKind, BackpressureStrategy, Option<usize>, bool)> = Vec::new();
607 #[cfg(not(feature = "gpu"))]
608 let mut matches: Vec<(usize, String, EdgePolicyKind, BackpressureStrategy, Option<usize>)> = Vec::new();
609 for edge_idx in &self.outgoing {
610 if let Some((_, from_port, _, _, policy)) = self.edges.get(*edge_idx) {
611 if let Some(p) = port
612 && !p.eq_ignore_ascii_case(from_port)
613 {
614 continue;
615 }
616 let (bp_override, cap_override) = self
617 .port_overrides
618 .get(from_port)
619 .cloned()
620 .unwrap_or((None, None));
621 let bp = bp_override.unwrap_or(self.backpressure.clone());
622 #[cfg(feature = "gpu")]
623 {
624 let needs_payload = self.payload_edges.contains(edge_idx);
625 matches.push((*edge_idx, from_port.clone(), policy.clone(), bp, cap_override, needs_payload));
626 }
627 #[cfg(not(feature = "gpu"))]
628 {
629 matches.push((*edge_idx, from_port.clone(), policy.clone(), bp, cap_override));
630 }
631 }
632 }
633
634 if matches.len() == 1 {
635 #[cfg(feature = "gpu")]
636 let (edge_idx, from_port, policy, bp, cap_override, needs_payload) = matches.remove(0);
637 #[cfg(not(feature = "gpu"))]
638 let (edge_idx, from_port, policy, bp, cap_override) = matches.remove(0);
639 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
640 log::warn!(
641 "node output enqueue node={} port={} edge_idx={} payload={}",
642 self.node_id,
643 from_port,
644 edge_idx,
645 edge_payload_desc(&correlated.inner)
646 );
647 }
648 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
649 eprintln!(
650 "node output enqueue node={} port={} edge_idx={} payload={}",
651 self.node_id,
652 from_port,
653 edge_idx,
654 edge_payload_desc(&correlated.inner)
655 );
656 }
657 let mut effective_policy = policy;
658 if let Some(cap) = cap_override {
659 effective_policy = EdgePolicyKind::Bounded { cap };
660 }
661 #[cfg(feature = "gpu")]
662 let correlated = if needs_payload {
663 let mut updated = correlated;
664 updated.inner = promote_payload_for_host(updated.inner);
665 updated
666 } else {
667 correlated
668 };
669 let payload_bytes = if cfg!(feature = "metrics")
670 && self.telemetry.metrics_level.is_detailed()
671 {
672 crate::executor::payload_size_bytes(&correlated.inner)
673 } else {
674 None
675 };
676 self.telemetry
677 .record_node_payload_out(self.node_idx, payload_bytes);
678 apply_policy_owned(ApplyPolicyOwnedArgs {
679 edge_idx,
680 policy: &effective_policy,
681 payload: correlated,
682 queues: self.queues,
683 warnings_seen: self.warnings_seen,
684 telem: self.telemetry,
685 warning_label: Some(format!("edge_{}_{}", self.node_id, from_port)),
686 backpressure: bp,
687 });
688 return;
689 }
690
691 #[cfg(feature = "gpu")]
692 for (edge_idx, from_port, mut policy, bp, cap_override, needs_payload) in matches {
693 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
694 log::warn!(
695 "node output enqueue node={} port={} edge_idx={} payload={}",
696 self.node_id,
697 from_port,
698 edge_idx,
699 edge_payload_desc(&correlated.inner)
700 );
701 }
702 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
703 eprintln!(
704 "node output enqueue node={} port={} edge_idx={} payload={}",
705 self.node_id,
706 from_port,
707 edge_idx,
708 edge_payload_desc(&correlated.inner)
709 );
710 }
711 if let Some(cap) = cap_override {
712 policy = EdgePolicyKind::Bounded { cap };
713 }
714 let mut payload = correlated.clone();
715 if needs_payload {
716 payload.inner = promote_payload_for_host(payload.inner);
717 }
718 let payload_bytes = if cfg!(feature = "metrics")
719 && self.telemetry.metrics_level.is_detailed()
720 {
721 crate::executor::payload_size_bytes(&payload.inner)
722 } else {
723 None
724 };
725 self.telemetry
726 .record_node_payload_out(self.node_idx, payload_bytes);
727 apply_policy(
728 edge_idx,
729 &policy,
730 &payload,
731 self.queues,
732 self.warnings_seen,
733 self.telemetry,
734 Some(format!("edge_{}_{}", self.node_id, from_port)),
735 bp,
736 );
737 }
738 #[cfg(not(feature = "gpu"))]
739 for (edge_idx, from_port, mut policy, bp, cap_override) in matches {
740 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
741 log::warn!(
742 "node output enqueue node={} port={} edge_idx={} payload={}",
743 self.node_id,
744 from_port,
745 edge_idx,
746 edge_payload_desc(&correlated.inner)
747 );
748 }
749 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
750 eprintln!(
751 "node output enqueue node={} port={} edge_idx={} payload={}",
752 self.node_id,
753 from_port,
754 edge_idx,
755 edge_payload_desc(&correlated.inner)
756 );
757 }
758 if let Some(cap) = cap_override {
759 policy = EdgePolicyKind::Bounded { cap };
760 }
761 let payload_bytes = if cfg!(feature = "metrics")
762 && self.telemetry.metrics_level.is_detailed()
763 {
764 crate::executor::payload_size_bytes(&correlated.inner)
765 } else {
766 None
767 };
768 self.telemetry
769 .record_node_payload_out(self.node_idx, payload_bytes);
770 apply_policy(
771 edge_idx,
772 &policy,
773 &correlated,
774 self.queues,
775 self.warnings_seen,
776 self.telemetry,
777 Some(format!("edge_{}_{}", self.node_id, from_port)),
778 bp,
779 );
780 }
781 }
782
783 #[cfg(feature = "gpu")]
784 pub fn push_payload<T>(&mut self, port: Option<&str>, value: daedalus_gpu::Payload<T>)
785 where
786 T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
787 T::GpuRepr: Clone + Send + Sync + 'static,
788 {
789 match value {
790 daedalus_gpu::Payload::Cpu(v) => {
791 let payload = EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<T>(v));
792 self.push_output(port, payload);
793 }
794 daedalus_gpu::Payload::Gpu(g) => {
795 let payload = EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_gpu::<T>(g));
796 self.push_output(port, payload);
797 }
798 }
799 }
800
801 pub fn push_any<T: Any + Send + Sync + 'static>(&mut self, port: Option<&str>, value: T) {
802 self.push_output(port, EdgePayload::Any(Arc::new(value)));
803 }
804
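    /// Pushes a typed value, preferring a registered output mover and falling back to
    /// `EdgePayload::Any` when none matches.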
805 pub fn push_typed<T>(&mut self, port: Option<&str>, value: T)
806 where
807 T: Any + Clone + Send + Sync + 'static,
808 {
809 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO").is_some() {
810 let port_name = port.unwrap_or("<all>");
811 log::warn!(
812 "node output prepare node={} port={} type={}",
813 self.node_id,
814 port_name,
815 std::any::type_name::<T>()
816 );
817 }
818 if std::env::var_os("DAEDALUS_TRACE_EDGE_IO_STDERR").is_some() {
819 let port_name = port.unwrap_or("<all>");
820 eprintln!(
821 "node output prepare node={} port={} type={}",
822 self.node_id,
823 port_name,
824 std::any::type_name::<T>()
825 );
826 }
827 match try_move_output(self.output_movers.as_ref(), value) {
828 Ok(payload) => self.push_output(port, payload),
829 Err(value) => {
830 self.push_any(port, value);
831 }
832 }
833 }
834
835 pub fn push_value(&mut self, port: Option<&str>, value: daedalus_data::model::Value) {
845 self.push_output(port, EdgePayload::Value(value));
846 }
847
848 pub fn inputs_for<'b>(&'b self, port: &str) -> impl Iterator<Item = &'b CorrelatedPayload> {
858 self.inputs
859 .iter()
860 .filter(move |(p, _)| p == port)
861 .map(|(_, payload)| payload)
862 }
863
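    /// Returns a clone of the first payload on `port` that is, or can be converted to, `T`
    /// (including const coercions and, with the `gpu` feature, CPU image conversions).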
864 pub fn get_any<T: Any + Clone + Send + Sync>(&self, port: &str) -> Option<T> {
866 #[cfg(feature = "gpu")]
867 let want = TypeId::of::<T>();
868 self.inputs_for(port).find_map(|p| match &p.inner {
869 EdgePayload::Any(a) => {
870 a.downcast_ref::<T>()
871 .cloned()
872 .or_else(|| self.coerce_const_any::<T>(a.as_ref()))
873 }
874 #[cfg(feature = "gpu")]
875 EdgePayload::Payload(ep) => {
876 if let Some(v) = ep.try_downcast_cpu_any::<T>() {
877 return Some(v);
878 }
879 if want == TypeId::of::<DynamicImage>() {
880 return ep.clone_cpu::<DynamicImage>().and_then(Self::dynamic_image_to_t::<T>);
881 }
882 if want == TypeId::of::<GrayImage>()
883 && let Some(gray) = ep.clone_cpu::<GrayImage>()
884 {
885 let any_ref: &dyn Any = &gray;
886 return any_ref.downcast_ref::<T>().cloned();
887 }
888 if want == TypeId::of::<RgbImage>()
889 && let Some(rgb) = ep.clone_cpu::<RgbImage>()
890 {
891 let any_ref: &dyn Any = &rgb;
892 return any_ref.downcast_ref::<T>().cloned();
893 }
894 if want == TypeId::of::<RgbaImage>()
895 && let Some(rgba) = ep.clone_cpu::<RgbaImage>()
896 {
897 let any_ref: &dyn Any = &rgba;
898 return any_ref.downcast_ref::<T>().cloned();
899 }
900 ep.clone_cpu::<DynamicImage>().and_then(|img| {
901 if want == TypeId::of::<DynamicImage>()
902 || want == TypeId::of::<GrayImage>()
903 || want == TypeId::of::<GrayAlphaImage>()
904 || want == TypeId::of::<RgbImage>()
905 || want == TypeId::of::<RgbaImage>()
906 {
907 return Self::dynamic_image_to_t::<T>(img);
908 }
909 None
910 }).or_else(|| {
911 if log::log_enabled!(log::Level::Debug)
912 && (want == TypeId::of::<DynamicImage>()
913 || want == TypeId::of::<GrayImage>()
914 || want == TypeId::of::<GrayAlphaImage>()
915 || want == TypeId::of::<RgbImage>()
916 || want == TypeId::of::<RgbaImage>())
917 {
918 log::debug!("payload type mismatch port={} payload={:?}", port, ep);
919 }
920 None
921 })
922 },
923 #[cfg(feature = "gpu")]
924 EdgePayload::GpuImage(h)
925 if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
926 {
927 let any_ref: &dyn Any = h;
928 any_ref.downcast_ref::<T>().cloned()
929 }
930 _ => None,
931 })
932 }
933
934 pub fn get_any_ref<T: Any + Send + Sync>(&self, port: &str) -> Option<&T> {
936 self.inputs_for(port).find_map(|p| match &p.inner {
937 EdgePayload::Any(a) => a.downcast_ref::<T>(),
938 #[cfg(feature = "gpu")]
939 EdgePayload::GpuImage(h)
940 if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
941 {
942 let any_ref: &dyn Any = h;
943 any_ref.downcast_ref::<T>()
944 }
945 _ => None,
946 })
947 }
948
949 pub fn get_typed_ref<T>(&self, port: &str) -> Option<&T>
951 where
952 T: Any + Clone + Send + Sync,
953 {
954 if let Some(v) = self.get_any_ref::<T>(port) {
955 return Some(v);
956 }
957 if let Some(v) = self.get_any::<T>(port) {
958 return Some(self.cache_borrowed(v));
959 }
960
961 if let Some(v) = self.get_any_ref::<i64>(port) {
962 if let Some(t) = Self::coerce_from_i64::<T>(*v) {
963 return Some(self.cache_borrowed(t));
964 }
965 if let Some(t) = self.coerce_via_registry::<T>(&Value::Int(*v)) {
966 return Some(self.cache_borrowed(t));
967 }
968 }
969 if let Some(v) = self.get_any_ref::<f64>(port) {
970 if let Some(t) = Self::coerce_from_f64::<T>(*v) {
971 return Some(self.cache_borrowed(t));
972 }
973 if let Some(t) = self.coerce_via_registry::<T>(&Value::Float(*v)) {
974 return Some(self.cache_borrowed(t));
975 }
976 }
977 if let Some(v) = self.get_any_ref::<bool>(port) {
978 let any_ref: &dyn Any = v;
979 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
980 return Some(self.cache_borrowed(t));
981 }
982 if let Some(t) = self.coerce_via_registry::<T>(&Value::Bool(*v)) {
983 return Some(self.cache_borrowed(t));
984 }
985 }
986 if let Some(v) = self.get_any_ref::<String>(port) {
987 let any_ref: &dyn Any = v;
988 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
989 return Some(self.cache_borrowed(t));
990 }
991 if let Some(t) = self.coerce_via_registry::<T>(&Value::String(v.clone().into())) {
992 return Some(self.cache_borrowed(t));
993 }
994 }
995 if let Some(v) = self.get_value(port)
996 && let Some(t) = self.coerce_from_value::<T>(v)
997 {
998 return Some(self.cache_borrowed(t));
999 }
1000
1001 None
1002 }
1003
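    /// Removes the first payload on `port` and returns it by value if it is an
    /// `EdgePayload::Any` of type `T`; otherwise the payload is put back and `None` is
    /// returned.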
1004 pub fn get_any_mut<T>(&mut self, port: &str) -> Option<T>
1006 where
1007 T: Any + Clone + Send + Sync,
1008 {
1009 let (idx, payload) = self.take_input(port)?;
1010 let mut handled = None;
1011 let mut payload = payload;
1012 match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
1013 EdgePayload::Any(a) => match Arc::downcast::<T>(a) {
1014 Ok(arc) => {
1015 handled = Some(match Arc::try_unwrap(arc) {
1016 Ok(v) => v,
1017 Err(arc) => (*arc).clone(),
1018 });
1019 }
1020 Err(a) => {
1021 payload.inner = EdgePayload::Any(a);
1022 }
1023 },
1024 other => {
1025 payload.inner = other;
1026 }
1027 }
1028 if handled.is_none() {
1029 self.restore_input(idx, port, payload);
1030 }
1031 handled
1032 }
1033
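    /// Owned variant of `get_typed`: takes the payload off the port, unwrapping or cloning
    /// it out of its `Arc`/payload wrapper and applying the usual coercions; the input is
    /// put back on the port when no conversion succeeds.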
1034 pub fn get_typed_mut<T>(&mut self, port: &str) -> Option<T>
1036 where
1037 T: Any + Clone + Send + Sync,
1038 {
1039 #[cfg(feature = "gpu")]
1040 let want = TypeId::of::<T>();
1041 let (idx, mut payload) = self.take_input(port)?;
1042 let mut out: Option<T> = None;
1043 match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
1044 #[cfg(feature = "gpu")]
1045 EdgePayload::Payload(ep) => {
1046 let mut ep_opt = Some(ep);
1047 let downcast_owned = |value: Box<dyn Any + Send + Sync>| {
1048 value.downcast::<T>().ok().map(|boxed| *boxed)
1049 };
1050 if std::env::var_os("DAEDALUS_TRACE_PAYLOAD_TAKE").is_some()
1051 && let Some(ep) = ep_opt.as_ref()
1052 {
1053 let clone_ok = if want == TypeId::of::<DynamicImage>() {
1054 ep.clone_cpu::<DynamicImage>().is_some()
1055 } else if want == TypeId::of::<GrayImage>() {
1056 ep.clone_cpu::<GrayImage>().is_some()
1057 } else if want == TypeId::of::<RgbImage>() {
1058 ep.clone_cpu::<RgbImage>().is_some()
1059 } else if want == TypeId::of::<RgbaImage>() {
1060 ep.clone_cpu::<RgbaImage>().is_some()
1061 } else {
1062 false
1063 };
1064 eprintln!(
1065 "daedalus-runtime: payload probe node={} port={} type={} clone_cpu={}",
1066 self.node_id,
1067 port,
1068 std::any::type_name::<T>(),
1069 clone_ok
1070 );
1071 }
1072 if want == TypeId::of::<DynamicImage>() {
1073 if let Some(ep) = ep_opt.take() {
1074 match ep.take_cpu::<DynamicImage>() {
1075 Ok(cpu) => out = downcast_owned(Box::new(cpu)),
1076 Err(rest) => {
1077 if let Some(cpu) = rest.clone_cpu::<DynamicImage>() {
1078 out = downcast_owned(Box::new(cpu));
1079 } else {
1080 ep_opt = Some(rest);
1081 }
1082 }
1083 }
1084 }
1085 } else if want == TypeId::of::<GrayImage>() {
1086 if let Some(ep) = ep_opt.take() {
1087 match ep.take_cpu::<GrayImage>() {
1088 Ok(cpu) => out = downcast_owned(Box::new(cpu)),
1089 Err(rest) => {
1090 if let Some(cpu) = rest.clone_cpu::<GrayImage>() {
1091 out = downcast_owned(Box::new(cpu));
1092 } else {
1093 ep_opt = Some(rest);
1094 }
1095 }
1096 }
1097 }
1098 } else if want == TypeId::of::<RgbImage>() {
1099 if let Some(ep) = ep_opt.take() {
1100 match ep.take_cpu::<RgbImage>() {
1101 Ok(cpu) => out = downcast_owned(Box::new(cpu)),
1102 Err(rest) => {
1103 if let Some(cpu) = rest.clone_cpu::<RgbImage>() {
1104 out = downcast_owned(Box::new(cpu));
1105 } else {
1106 ep_opt = Some(rest);
1107 }
1108 }
1109 }
1110 }
1111 } else if want == TypeId::of::<RgbaImage>() && let Some(ep) = ep_opt.take() {
1112 match ep.take_cpu::<RgbaImage>() {
1113 Ok(cpu) => out = downcast_owned(Box::new(cpu)),
1114 Err(rest) => {
1115 if let Some(cpu) = rest.clone_cpu::<RgbaImage>() {
1116 out = downcast_owned(Box::new(cpu));
1117 } else {
1118 ep_opt = Some(rest);
1119 }
1120 }
1121 }
1122 }
1123
1124 if out.is_none()
1125 && let Some(ep) = ep_opt.as_ref()
1126 && (want == TypeId::of::<DynamicImage>()
1127 || want == TypeId::of::<GrayImage>()
1128 || want == TypeId::of::<GrayAlphaImage>()
1129 || want == TypeId::of::<RgbImage>()
1130 || want == TypeId::of::<RgbaImage>())
1131 && let Some(img) = ep.clone_cpu::<DynamicImage>()
1132 {
1133 out = Self::dynamic_image_to_t::<T>(img);
1134 }
1135 if out.is_none()
1136 && let Some(ep) = ep_opt
1137 {
1138 payload.inner = EdgePayload::Payload(ep);
1139 }
1140 }
1141 EdgePayload::Any(a) => {
1142 let any = a;
1143 match Arc::downcast::<T>(any) {
1144 Ok(arc) => {
1145 out = Some(match Arc::try_unwrap(arc) {
1146 Ok(v) => v,
1147 Err(arc) => (*arc).clone(),
1148 });
1149 }
1150 Err(any) => match Arc::downcast::<i64>(any) {
1151 Ok(arc) => {
1152 let v = match Arc::try_unwrap(arc) {
1153 Ok(v) => v,
1154 Err(arc) => *arc,
1155 };
1156 out = Self::coerce_from_i64::<T>(v)
1157 .or_else(|| self.coerce_via_registry::<T>(&Value::Int(v)));
1158 }
1159 Err(any) => match Arc::downcast::<f64>(any) {
1160 Ok(arc) => {
1161 let v = match Arc::try_unwrap(arc) {
1162 Ok(v) => v,
1163 Err(arc) => *arc,
1164 };
1165 out = Self::coerce_from_f64::<T>(v)
1166 .or_else(|| self.coerce_via_registry::<T>(&Value::Float(v)));
1167 }
1168 Err(any) => match Arc::downcast::<bool>(any) {
1169 Ok(arc) => {
1170 let v = match Arc::try_unwrap(arc) {
1171 Ok(v) => v,
1172 Err(arc) => *arc,
1173 };
1174 let any_ref: &dyn Any = &v;
1175 out = any_ref
1176 .downcast_ref::<T>()
1177 .cloned()
1178 .or_else(|| self.coerce_via_registry::<T>(&Value::Bool(v)));
1179 }
1180 Err(any) => match Arc::downcast::<String>(any) {
1181 Ok(arc) => {
1182 let v = match Arc::try_unwrap(arc) {
1183 Ok(v) => v,
1184 Err(arc) => (*arc).clone(),
1185 };
1186 let any_ref: &dyn Any = &v;
1187 out = any_ref
1188 .downcast_ref::<T>()
1189 .cloned()
1190 .or_else(|| self.coerce_via_registry::<T>(&Value::String(v.into())));
1191 }
1192 Err(any) => match Arc::downcast::<daedalus_data::model::Value>(any) {
1193 Ok(arc) => {
1194 let v = match Arc::try_unwrap(arc) {
1195 Ok(v) => v,
1196 Err(arc) => (*arc).clone(),
1197 };
1198 out = self.coerce_from_value::<T>(&v);
1199 }
1200 Err(any) => {
1201 payload.inner = EdgePayload::Any(any);
1202 }
1203 },
1204 },
1205 },
1206 },
1207 },
1208 }
1209 }
1210 EdgePayload::Value(v) => {
1211 out = self.coerce_from_value::<T>(&v);
1212 payload.inner = EdgePayload::Value(v);
1213 }
1214 other => {
1215 payload.inner = other;
1216 }
1217 }
1218
1219 if out.is_none() {
1220 if std::env::var_os("DAEDALUS_TRACE_MISSING_INPUTS").is_some() {
1221 let desc = match &payload.inner {
1222 EdgePayload::Any(a) => format!("Any({})", std::any::type_name_of_val(a.as_ref())),
1223 #[cfg(feature = "gpu")]
1224 EdgePayload::Payload(ep) => format!("Payload({ep:?})"),
1225 #[cfg(feature = "gpu")]
1226 EdgePayload::GpuImage(_) => "GpuImage".to_string(),
1227 EdgePayload::Value(v) => format!("Value({v:?})"),
1228 EdgePayload::Bytes(_) => "Bytes".to_string(),
1229 EdgePayload::Unit => "Unit".to_string(),
1230 };
1231 eprintln!(
1232 "daedalus-runtime: input mismatch node={} port={} expected={} payload={}",
1233 self.node_id,
1234 port,
1235 std::any::type_name::<T>(),
1236 desc
1237 );
1238 }
1239 self.restore_input(idx, port, payload);
1240 }
1241 out
1242 }
1243
1244 pub fn get_any_all<T: Any + Clone + Send + Sync>(&self, port: &str) -> Vec<T> {
1246 #[cfg(feature = "gpu")]
1247 let want = TypeId::of::<T>();
1248 let mut out: Vec<T> = Vec::new();
1249 for p in self.inputs_for(port) {
1250 match &p.inner {
1251 EdgePayload::Any(a) => {
1252 if let Some(v) = a
1253 .downcast_ref::<T>()
1254 .cloned()
1255 .or_else(|| self.coerce_const_any::<T>(a.as_ref()))
1256 {
1257 out.push(v);
1258 }
1259 }
1260 #[cfg(feature = "gpu")]
1261 EdgePayload::Payload(ep) => {
1262 if let Some(img) = ep.clone_cpu::<DynamicImage>()
1263 && (want == TypeId::of::<DynamicImage>()
1264 || want == TypeId::of::<GrayImage>()
1265 || want == TypeId::of::<GrayAlphaImage>()
1266 || want == TypeId::of::<RgbImage>()
1267 || want == TypeId::of::<RgbaImage>())
1268 && let Some(v) = Self::dynamic_image_to_t::<T>(img)
1269 {
1270 out.push(v);
1271 }
1272 }
1273 #[cfg(feature = "gpu")]
1274 EdgePayload::GpuImage(h)
1275 if TypeId::of::<T>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() =>
1276 {
1277 let any_ref: &dyn Any = h;
1278 if let Some(v) = any_ref.downcast_ref::<T>().cloned() {
1279 out.push(v);
1280 }
1281 }
1282 _ => {}
1283 }
1284 }
1285 out
1286 }
1287
1288 pub fn get_any_all_fanin<T: Any + Clone + Send + Sync>(&self, prefix: &str) -> Vec<T> {
1292 self.get_any_all_fanin_indexed::<T>(prefix)
1293 .into_iter()
1294 .map(|(_, v)| v)
1295 .collect()
1296 }
1297
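    /// Collects fan-in inputs whose port names carry an index recognized by
    /// `parse_indexed_port` for the given prefix, returning `(index, value)` pairs in
    /// ascending index order.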
1298 pub fn get_any_all_fanin_indexed<T: Any + Clone + Send + Sync>(&self, prefix: &str) -> Vec<(u32, T)> {
1301 let mut ports: BTreeMap<u32, String> = BTreeMap::new();
1302 for (port, _) in &self.inputs {
1303 if let Some(idx) = parse_indexed_port(prefix, port) {
1304 ports.entry(idx).or_insert_with(|| port.clone());
1305 }
1306 }
1307 let mut out = Vec::with_capacity(ports.len());
1308 for (idx, port) in ports {
1309 if let Some(v) = self.get_any::<T>(&port) {
1310 out.push((idx, v));
1311 }
1312 }
1313 out
1314 }
1315
1316 fn coerce_const_any<T: Any + Clone>(&self, v: &dyn Any) -> Option<T> {
1317 use daedalus_data::model::Value as V;
1318
1319 if let Some(i) = v
1320 .downcast_ref::<i64>()
1321 .copied()
1322 {
1323 return Self::coerce_from_i64::<T>(i)
1324 .or_else(|| self.coerce_via_registry::<T>(&V::Int(i)));
1325 }
1326
1327 if let Some(f) = v
1328 .downcast_ref::<f64>()
1329 .copied()
1330 {
1331 return Self::coerce_from_f64::<T>(f)
1332 .or_else(|| self.coerce_via_registry::<T>(&V::Float(f)));
1333 }
1334
1335 if let Some(b) = v
1336 .downcast_ref::<bool>()
1337 .copied()
1338 {
1339 let any_ref: &dyn Any = &b;
1340 return any_ref
1341 .downcast_ref::<T>()
1342 .cloned()
1343 .or_else(|| self.coerce_via_registry::<T>(&V::Bool(b)));
1344 }
1345
1346 if let Some(s) = v
1347 .downcast_ref::<String>()
1348 .cloned()
1349 {
1350 let any_ref: &dyn Any = &s;
1351 return any_ref
1352 .downcast_ref::<T>()
1353 .cloned()
1354 .or_else(|| self.coerce_via_registry::<T>(&V::String(s.into())));
1355 }
1356
1357 if let Some(val) = v
1358 .downcast_ref::<daedalus_data::model::Value>()
1359 .cloned()
1360 {
1361 return self.coerce_from_value::<T>(&val);
1362 }
1363
1364 None
1365 }
1366
1367 fn coerce_via_registry<T: Any + Clone>(&self, v: &daedalus_data::model::Value) -> Option<T> {
1368 let key = std::any::type_name::<T>();
1369 let global = GLOBAL_CONST_COERCERS.get_or_init(new_const_coercer_map);
1370 let map = self.const_coercers.as_ref().unwrap_or(global);
1371 let guard = map.read().ok()?;
1372 let coercer = guard.get(key)?;
1373 let out = coercer(v)?;
1374 out.downcast::<T>().ok().map(|b| (*b).clone())
1375 }
1376
1377 fn coerce_from_i64<T: Any + Clone>(v: i64) -> Option<T> {
1378 use std::any::TypeId;
1379 let want = TypeId::of::<T>();
1380
1381 macro_rules! cast_int {
1382 ($t:ty) => {{
1383 if want == TypeId::of::<$t>() {
1384 let out: $t = <$t>::try_from(v).ok()?;
1385 let any_ref: &dyn Any = &out;
1386 return any_ref.downcast_ref::<T>().cloned();
1387 }
1388 }};
1389 }
1390
1391 cast_int!(i8);
1392 cast_int!(i16);
1393 cast_int!(i32);
1394 cast_int!(i64);
1395 cast_int!(isize);
1396 cast_int!(u8);
1397 cast_int!(u16);
1398 cast_int!(u32);
1399 cast_int!(u64);
1400 cast_int!(usize);
1401
1402 if want == TypeId::of::<f32>() {
1403 let out = v as f32;
1404 let any_ref: &dyn Any = &out;
1405 return any_ref.downcast_ref::<T>().cloned();
1406 }
1407 if want == TypeId::of::<f64>() {
1408 let out = v as f64;
1409 let any_ref: &dyn Any = &out;
1410 return any_ref.downcast_ref::<T>().cloned();
1411 }
1412
1413 None
1414 }
1415
1416 fn coerce_from_f64<T: Any + Clone>(v: f64) -> Option<T> {
1417 use std::any::TypeId;
1418 let want = TypeId::of::<T>();
1419
1420 if want == TypeId::of::<f32>() {
1421 let out = v as f32;
1422 let any_ref: &dyn Any = &out;
1423 return any_ref.downcast_ref::<T>().cloned();
1424 }
1425 if want == TypeId::of::<f64>() {
1426 let out = v;
1427 let any_ref: &dyn Any = &out;
1428 return any_ref.downcast_ref::<T>().cloned();
1429 }
1430
1431 if v.fract() == 0.0 {
1433 let as_i = v as i64;
1434 return Self::coerce_from_i64::<T>(as_i);
1435 }
1436
1437 None
1438 }
1439
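    /// Typed accessor with the full coercion ladder: exact match, numeric conversion from
    /// `i64`/`f64`, `bool`/`String` pass-through, registered const coercers, and finally
    /// `Value` conversion.
    ///
    /// ```ignore
    /// // Illustrative sketch inside a node body; `io` and the "radius" port are hypothetical.
    /// let radius: f32 = io.get_typed("radius").unwrap_or(1.0);
    /// ```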
1440 pub fn get_typed<T>(&self, port: &str) -> Option<T>
1449 where
1450 T: Any + Clone + Send + Sync,
1451 {
1452 let want = std::any::TypeId::of::<T>();
1453 let initial = self.get_any::<T>(port);
1454 if log::log_enabled!(log::Level::Debug) && want == std::any::TypeId::of::<image::DynamicImage>() {
1455 log::debug!(
1456 "get_typed dynamic_image port={} has_any={}",
1457 port,
1458 initial.is_some()
1459 );
1460 }
1461 if let Some(v) = initial {
1462 return Some(v);
1463 }
1464
1465 if let Some(v) = self.get_any::<i64>(port) {
1467 if let Some(t) = Self::coerce_from_i64::<T>(v) {
1468 return Some(t);
1469 }
1470 if let Some(t) = self.coerce_via_registry::<T>(&Value::Int(v)) {
1471 return Some(t);
1472 }
1473 }
1474 if let Some(v) = self.get_any::<f64>(port) {
1475 if let Some(t) = Self::coerce_from_f64::<T>(v) {
1476 return Some(t);
1477 }
1478 if let Some(t) = self.coerce_via_registry::<T>(&Value::Float(v)) {
1479 return Some(t);
1480 }
1481 }
1482 if let Some(v) = self.get_any::<bool>(port) {
1483 let any_ref: &dyn Any = &v;
1484 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1485 return Some(t);
1486 }
1487 if let Some(t) = self.coerce_via_registry::<T>(&Value::Bool(v)) {
1488 return Some(t);
1489 }
1490 }
1491 if let Some(v) = self.get_any::<String>(port) {
1492 let any_ref: &dyn Any = &v;
1493 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1494 return Some(t);
1495 }
1496 if let Some(t) = self.coerce_via_registry::<T>(&Value::String(v.into())) {
1497 return Some(t);
1498 }
1499 }
1500 if let Some(v) = self.get_any::<daedalus_data::model::Value>(port)
1501 && let Some(t) = self.coerce_from_value::<T>(&v) {
1502 return Some(t);
1503 }
1504
1505 if let Some(v) = self.get_value(port)
1506 && let Some(t) = self.coerce_from_value::<T>(v) {
1507 return Some(t);
1508 }
1509
1510 None
1511 }
1512
1513 fn coerce_from_value<T: Any + Clone>(&self, v: &daedalus_data::model::Value) -> Option<T> {
1514 use daedalus_data::model::Value as V;
1515
1516 let any_ref: &dyn Any = v;
1517 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1518 return Some(t);
1519 }
1520
1521 if let Some(name) = Self::enum_name_from_index::<T>(v)
1522 && let Some(t) = {
1523 let any_ref: &dyn Any = &name;
1524 any_ref.downcast_ref::<T>().cloned()
1525 }
1526 {
1527 return Some(t);
1528 }
1529
1530 let j: Option<daedalus_data::model::Value> = match v {
1531 V::Int(i) => {
1532 if let Some(t) = Self::coerce_from_i64::<T>(*i) {
1533 return Some(t);
1534 }
1535 Some(V::Int(*i))
1536 }
1537 V::Float(f) => {
1538 if let Some(t) = Self::coerce_from_f64::<T>(*f) {
1539 return Some(t);
1540 }
1541 Some(V::Float(*f))
1542 }
1543 V::Bool(b) => {
1544 let any_ref: &dyn Any = b;
1545 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1546 return Some(t);
1547 }
1548 Some(V::Bool(*b))
1549 }
1550 V::String(s) => {
1551 let owned = s.clone().into_owned();
1552 let any_ref: &dyn Any = &owned;
1553 if let Some(t) = any_ref.downcast_ref::<T>().cloned() {
1554 return Some(t);
1555 }
1556 Some(V::String(owned.into()))
1558 }
1559 V::Enum(_) => Some(v.clone()),
1561 other => Some(other.clone()),
1562 };
1563
1564 j.as_ref().and_then(|v| self.coerce_via_registry::<T>(v))
1565 }
1566
1567 fn enum_name_from_index<T: Any>(v: &daedalus_data::model::Value) -> Option<String> {
1568 let Value::Int(raw) = v else { return None };
1569 if *raw < 0 {
1570 return None;
1571 }
1572 let idx = *raw as usize;
1573 let expr = typing::override_type_expr::<T>()
1574 .or_else(|| typing::lookup_type_by_rust_name(std::any::type_name::<T>()));
1575 match expr {
1576 Some(TypeExpr::Enum(variants)) => variants.get(idx).map(|ev| ev.name.clone()),
1577 _ => None,
1578 }
1579 }
1580
1581 pub fn get_any_raw(&self, port: &str) -> Option<&dyn Any> {
1583 self.inputs_for(port).find_map(|p| match &p.inner {
1584 EdgePayload::Any(a) => Some(a.as_ref() as &dyn Any),
1585 _ => None,
1586 })
1587 }
1588
1589 #[cfg(feature = "gpu")]
1590 pub fn get_erased_payload(&self, port: &str) -> Option<&daedalus_gpu::ErasedPayload> {
1591 self.inputs_for(port).find_map(|p| match &p.inner {
1592 EdgePayload::Payload(ep) => Some(ep),
1593 _ => None,
1594 })
1595 }
1596
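    /// Returns the payload on `port` in the representation matching the node's compute
    /// affinity: GPU-preferring nodes get (or upload to) a GPU handle, otherwise the CPU
    /// value is returned, downloading from the GPU when a context is available.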
1597 #[cfg(feature = "gpu")]
1598 pub fn get_payload<T>(&self, port: &str) -> Option<daedalus_gpu::Payload<T>>
1599 where
1600 T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
1601 T::GpuRepr: Clone + Send + Sync + 'static,
1602 {
1603 let wants_gpu = self.gpu.is_some()
1610 && matches!(
1611 self.target_compute,
1612 daedalus_planner::ComputeAffinity::GpuPreferred
1613 | daedalus_planner::ComputeAffinity::GpuRequired
1614 );
1615 for p in self.inputs_for(port) {
1616 match &p.inner {
1617 EdgePayload::Payload(ep) => {
1618 if wants_gpu {
1619 if let Some(g) = ep.clone_gpu::<T>() {
1620 return Some(daedalus_gpu::Payload::Gpu(g));
1621 }
1622 if let Some(cpu) = ep.clone_cpu::<T>()
1623 && let Some(ctx) = &self.gpu
1624 && let Ok(handle) = cpu.upload(ctx)
1625 {
1626 return Some(daedalus_gpu::Payload::Gpu(handle));
1627 }
1628 } else {
1629 if let Some(cpu) = ep.clone_cpu::<T>() {
1630 return Some(daedalus_gpu::Payload::Cpu(cpu));
1631 }
1632 if let Some(g) = ep.clone_gpu::<T>()
1633 && let Some(ctx) = &self.gpu
1634 && let Ok(cpu) = T::download(&g, ctx)
1635 {
1636 return Some(daedalus_gpu::Payload::Cpu(cpu));
1637 }
1638 }
1639 }
1640 EdgePayload::Any(a) => {
1641 let payload_any: Option<daedalus_gpu::Payload<T>> = a
1646 .downcast_ref::<daedalus_gpu::Payload<T>>()
1647 .cloned();
1648 if let Some(payload_any) = payload_any {
1649 if wants_gpu {
1650 match payload_any {
1651 daedalus_gpu::Payload::Gpu(g) => {
1652 return Some(daedalus_gpu::Payload::Gpu(g));
1653 }
1654 daedalus_gpu::Payload::Cpu(cpu) => {
1655 if let Some(ctx) = &self.gpu
1656 && let Ok(handle) = cpu.upload(ctx)
1657 {
1658 return Some(daedalus_gpu::Payload::Gpu(handle));
1659 }
1660 }
1661 }
1662 } else {
1663 match payload_any {
1664 daedalus_gpu::Payload::Cpu(cpu) => {
1665 return Some(daedalus_gpu::Payload::Cpu(cpu));
1666 }
1667 daedalus_gpu::Payload::Gpu(g) => {
1668 if let Some(ctx) = &self.gpu
1669 && let Ok(cpu) = T::download(&g, ctx)
1670 {
1671 return Some(daedalus_gpu::Payload::Cpu(cpu));
1672 }
1673 }
1674 }
1675 }
1676 }
1677
1678 if wants_gpu {
1679 let cpu = a
1680 .downcast_ref::<T>()
1681 .cloned();
1682 if let Some(cpu) = cpu
1683 && let Some(ctx) = &self.gpu
1684 && let Ok(handle) = cpu.upload(ctx)
1685 {
1686 return Some(daedalus_gpu::Payload::Gpu(handle));
1687 }
1688 if let Some(converted) = crate::convert::convert_arc::<T>(a)
1689 && let Some(ctx) = &self.gpu
1690 && let Ok(handle) = converted.upload(ctx)
1691 {
1692 return Some(daedalus_gpu::Payload::Gpu(handle));
1693 }
1694 if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>()
1695 && let Ok(uploaded) = ep.upload(self.gpu.as_ref()?)
1696 && let Some(g) = uploaded.as_gpu::<T>()
1697 {
1698 return Some(daedalus_gpu::Payload::Gpu(g.clone()));
1699 }
1700 } else {
1701 let cpu = a
1702 .downcast_ref::<T>()
1703 .cloned();
1704 if let Some(cpu) = cpu {
1705 return Some(daedalus_gpu::Payload::Cpu(cpu));
1706 }
1707 if let Some(converted) = crate::convert::convert_arc::<T>(a) {
1708 return Some(daedalus_gpu::Payload::Cpu(converted));
1709 }
1710 let g = a
1711 .downcast_ref::<T::GpuRepr>()
1712 .cloned();
1713 if let Some(g) = g
1714 && let Some(ctx) = &self.gpu
1715 && let Ok(cpu) = T::download(&g, ctx)
1716 {
1717 return Some(daedalus_gpu::Payload::Cpu(cpu));
1718 }
1719 if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>()
1720 && let Ok(downloaded) = ep.download(self.gpu.as_ref()?)
1721 && let Some(cpu) = downloaded.as_cpu::<T>()
1722 {
1723 return Some(daedalus_gpu::Payload::Cpu(cpu.clone()));
1724 }
1725 }
1726 }
1727 EdgePayload::GpuImage(h) => {
1728 if TypeId::of::<T::GpuRepr>() == TypeId::of::<daedalus_gpu::GpuImageHandle>() {
1729 if wants_gpu {
1730 let any_ref: &dyn Any = h;
1731 if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>() {
1732 return Some(daedalus_gpu::Payload::Gpu(repr.clone()));
1733 }
1734 } else if let Some(ctx) = &self.gpu {
1735 let any_ref: &dyn Any = h;
1736 if let Some(repr) = any_ref.downcast_ref::<T::GpuRepr>()
1737 && let Ok(cpu) = T::download(repr, ctx)
1738 {
1739 return Some(daedalus_gpu::Payload::Cpu(cpu));
1740 }
1741 }
1742 }
1743 }
1744 _ => {}
1745 }
1746 }
1747 None
1748 }
1749
1750 #[cfg(feature = "gpu")]
1751 pub fn get_payload_mut<T>(&mut self, port: &str) -> Option<daedalus_gpu::Payload<T>>
1752 where
1753 T: daedalus_gpu::GpuSendable + Clone + Send + Sync + 'static,
1754 T::GpuRepr: Clone + Send + Sync + 'static,
1755 {
1756 let wants_gpu = self.gpu.is_some()
1757 && matches!(
1758 self.target_compute,
1759 daedalus_planner::ComputeAffinity::GpuPreferred
1760 | daedalus_planner::ComputeAffinity::GpuRequired
1761 );
1762 let (idx, mut payload) = self.take_input(port)?;
1763
1764 let mut out: Option<daedalus_gpu::Payload<T>> = None;
1765 match std::mem::replace(&mut payload.inner, EdgePayload::Unit) {
1766 EdgePayload::Payload(ep) => {
1767 let mut ep_opt = Some(ep);
1768 if wants_gpu {
1769 if let Some(ep) = ep_opt.as_ref() {
1770 if let Some(g) = ep.clone_gpu::<T>() {
1771 out = Some(daedalus_gpu::Payload::Gpu(g));
1772 } else if let Some(cpu) = ep.clone_cpu::<T>()
1773 && let Some(ctx) = &self.gpu
1774 && let Ok(handle) = cpu.upload(ctx)
1775 {
1776 out = Some(daedalus_gpu::Payload::Gpu(handle));
1777 }
1778 }
1779 } else if let Some(ep) = ep_opt.take() {
1780 match ep.take_cpu::<T>() {
1781 Ok(cpu) => out = Some(daedalus_gpu::Payload::Cpu(cpu)),
1782 Err(rest) => {
1783 ep_opt = Some(rest);
1784 if let Some(ep) = ep_opt.as_ref() {
1785 if let Some(cpu) = ep.clone_cpu::<T>() {
1786 out = Some(daedalus_gpu::Payload::Cpu(cpu));
1787 } else if let Some(g) = ep.clone_gpu::<T>()
1788 && let Some(ctx) = &self.gpu
1789 && let Ok(cpu) = T::download(&g, ctx)
1790 {
1791 out = Some(daedalus_gpu::Payload::Cpu(cpu));
1792 }
1793 }
1794 }
1795 }
1796 }
1797 if out.is_none()
1798 && let Some(ep) = ep_opt
1799 {
1800 payload.inner = EdgePayload::Payload(ep);
1801 }
1802 }
1803 EdgePayload::Any(a) => {
1804 let any = a;
1805 match Arc::downcast::<daedalus_gpu::Payload<T>>(any) {
1806 Ok(arc) => {
1807 let payload_any = match Arc::try_unwrap(arc) {
1808 Ok(v) => v,
1809 Err(arc) => (*arc).clone(),
1810 };
1811 if wants_gpu {
1812 match payload_any {
1813 daedalus_gpu::Payload::Gpu(g) => out = Some(daedalus_gpu::Payload::Gpu(g)),
1814 daedalus_gpu::Payload::Cpu(cpu) => {
1815 if let Some(ctx) = &self.gpu
1816 && let Ok(handle) = cpu.upload(ctx)
1817 {
1818 out = Some(daedalus_gpu::Payload::Gpu(handle));
1819 }
1820 }
1821 }
1822 } else {
1823 match payload_any {
1824 daedalus_gpu::Payload::Cpu(cpu) => out = Some(daedalus_gpu::Payload::Cpu(cpu)),
1825 daedalus_gpu::Payload::Gpu(g) => {
1826 if let Some(ctx) = &self.gpu
1827 && let Ok(cpu) = T::download(&g, ctx)
1828 {
1829 out = Some(daedalus_gpu::Payload::Cpu(cpu));
1830 }
1831 }
1832 }
1833 }
1834 }
1835 Err(any) => {
1836 if wants_gpu {
1837 match Arc::downcast::<T>(any) {
1838 Ok(arc) => {
1839 let cpu = match Arc::try_unwrap(arc) {
1840 Ok(v) => v,
1841 Err(arc) => (*arc).clone(),
1842 };
1843 if let Some(ctx) = &self.gpu
1844 && let Ok(handle) = cpu.upload(ctx)
1845 {
1846 out = Some(daedalus_gpu::Payload::Gpu(handle));
1847 }
1848 }
1849 Err(any) => {
1850 payload.inner = EdgePayload::Any(any);
1851 }
1852 }
1853 } else {
1854 match Arc::downcast::<T>(any) {
1855 Ok(arc) => {
1856 let cpu = match Arc::try_unwrap(arc) {
1857 Ok(v) => v,
1858 Err(arc) => (*arc).clone(),
1859 };
1860 out = Some(daedalus_gpu::Payload::Cpu(cpu));
1861 }
1862 Err(any) => {
1863 payload.inner = EdgePayload::Any(any);
1864 }
1865 }
1866 }
1867 }
1868 }
1869 }
1870 EdgePayload::GpuImage(h) => {
1871 payload.inner = EdgePayload::GpuImage(h);
1872 }
1873 other => {
1874 payload.inner = other;
1875 }
1876 }
1877
1878 if out.is_none() {
1879 self.restore_input(idx, port, payload);
1880 }
1881 out
1882 }
1883
1884 #[cfg(feature = "gpu")]
1885 fn convert_incoming(
1886 mut payload: CorrelatedPayload,
1887 edge_idx: usize,
1888 entries: &HashSet<usize>,
1889 exits: &HashSet<usize>,
1890 gpu: Option<&daedalus_gpu::GpuContextHandle>,
1891 telemetry: &mut ExecutionTelemetry,
1892 ) -> CorrelatedPayload {
1893 let Some(ctx) = gpu else {
1894 return payload;
1895 };
1896 if entries.contains(&edge_idx) {
1897 telemetry.record_edge_gpu_transfer(edge_idx, true);
1898 payload.inner = match payload.inner {
1899 EdgePayload::Any(ref a) => {
1900 if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>() {
1901 ep.upload(ctx)
1902 .map(EdgePayload::Payload)
1903 .unwrap_or_else(|_| EdgePayload::Any(a.clone()))
1904 } else if let Some(img) = a.downcast_ref::<daedalus_gpu::GpuImageHandle>() {
1905 EdgePayload::GpuImage(img.clone())
1906 } else {
1907 EdgePayload::Any(a.clone())
1908 }
1909 }
1910 EdgePayload::Payload(ref ep) => ep
1911 .upload(ctx)
1912 .map(EdgePayload::Payload)
1913 .unwrap_or_else(|_| EdgePayload::Payload(ep.clone())),
1914 other => other,
1915 };
1916 } else if exits.contains(&edge_idx) {
1917 telemetry.record_edge_gpu_transfer(edge_idx, false);
1918 payload.inner = match payload.inner {
1919 EdgePayload::Any(ref a) => {
1920 if let Some(ep) = a.downcast_ref::<daedalus_gpu::ErasedPayload>() {
1921 ep.download(ctx)
1922 .map(EdgePayload::Payload)
1923 .unwrap_or_else(|_| EdgePayload::Any(a.clone()))
1924 } else {
1925 EdgePayload::Any(a.clone())
1926 }
1927 }
1928 EdgePayload::Payload(ref ep) => ep
1929 .download(ctx)
1930 .map(EdgePayload::Payload)
1931 .unwrap_or_else(|_| EdgePayload::Payload(ep.clone())),
1932 EdgePayload::GpuImage(h) => EdgePayload::GpuImage(h),
1933 other => other,
1934 };
1935 }
1936 payload
1937 }
1938
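    /// Returns the first `EdgePayload::Value` on `port`, if any.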
1939 pub fn get_value(&self, port: &str) -> Option<&Value> {
1949 self.inputs_for(port).find_map(|p| match &p.inner {
1950 EdgePayload::Value(v) => Some(v),
1951 _ => None,
1952 })
1953 }
1954
1955 pub fn get_int(&self, port: &str) -> Option<i64> {
1956 self.get_value(port).and_then(|v| match v {
1957 daedalus_data::model::Value::Int(i) => Some(*i),
1958 _ => None,
1959 })
1960 }
1961
1962 pub fn get_float(&self, port: &str) -> Option<f64> {
1963 self.get_value(port).and_then(|v| match v {
1964 daedalus_data::model::Value::Float(f) => Some(*f),
1965 _ => None,
1966 })
1967 }
1968
1969 pub fn inputs_grouped(&self) -> Vec<(String, Vec<&CorrelatedPayload>)> {
1971 let mut groups: Vec<(String, Vec<&CorrelatedPayload>)> = Vec::new();
1972 for (port, payload) in &self.inputs {
1973 if let Some((_, vec)) = groups.iter_mut().find(|(p, _)| p == port) {
1974 vec.push(payload);
1975 } else {
1976 groups.push((port.clone(), vec![payload]));
1977 }
1978 }
1979 groups
1980 }
1981
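    /// Currently a no-op hook; outputs are enqueued eagerly by the `push_*` methods.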
1982 pub fn flush(&mut self) -> Result<(), crate::executor::ExecuteError> {
1984 Ok(())
1985 }
1986}
1987
1988fn edge_payload_desc(payload: &EdgePayload) -> String {
1989 match payload {
1990 EdgePayload::Any(a) => format!("Any({})", std::any::type_name_of_val(a.as_ref())),
1991 #[cfg(feature = "gpu")]
1992 EdgePayload::Payload(ep) => format!("Payload({ep:?})"),
1993 #[cfg(feature = "gpu")]
1994 EdgePayload::GpuImage(_) => "GpuImage".to_string(),
1995 EdgePayload::Value(v) => format!("Value({v:?})"),
1996 EdgePayload::Bytes(_) => "Bytes".to_string(),
1997 EdgePayload::Unit => "Unit".to_string(),
1998 }
1999}
2000
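/// Selects which drained payloads a node may consume this firing. Returns the aligned
/// `(port, payload)` inputs, the leftovers that must be requeued, and whether every
/// sync group was ready.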
2001fn align_drained_inputs(
2002 drained: Vec<DrainedInput>,
2003 sync_groups: &[SyncGroup],
2004) -> (Vec<(String, CorrelatedPayload)>, Vec<DrainedInput>, bool) {
2005 if sync_groups.is_empty() {
2006 let inputs = drained
2007 .into_iter()
2008 .map(|d| (d.port, d.payload))
2009 .collect::<Vec<_>>();
2010 return (inputs, Vec::new(), true);
2011 }
2012
2013 let mut grouped_ports: std::collections::HashSet<String> = std::collections::HashSet::new();
2014 for group in sync_groups {
2015 if group.ports.is_empty() {
2016 continue;
2017 }
2018 for port in &group.ports {
2019 grouped_ports.insert(port.clone());
2020 }
2021 }
2022
2023 let mut per_port: std::collections::HashMap<String, VecDeque<DrainedInput>> =
2024 std::collections::HashMap::new();
2025 for item in drained {
2026 per_port
2027 .entry(item.port.clone())
2028 .or_default()
2029 .push_back(item);
2030 }
2031
2032 let mut selected: Vec<DrainedInput> = Vec::new();
2033 let mut all_groups_ready = true;
2034
2035 for group in sync_groups {
2036 if group.ports.is_empty() {
2037 continue;
2040 }
2041 match group.policy {
2042 SyncPolicy::Latest => {
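                // Latest: every port in the group must have something queued; the newest
                // payload per port is taken and older ones are discarded.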
2043 for port in &group.ports {
2045 let Some(q) = per_port.get_mut(port) else {
2046 all_groups_ready = false;
2047 break;
2048 };
2049 if q.is_empty() {
2050 all_groups_ready = false;
2051 break;
2052 }
2053 }
2054 if !all_groups_ready {
2055 break;
2056 }
2057 for port in &group.ports {
2058 if let Some(q) = per_port.get_mut(port)
2059 && let Some(payload) = q.pop_back()
2060 {
2061 q.clear();
2062 selected.push(payload);
2063 }
2064 }
2065 }
2066 SyncPolicy::AllReady | SyncPolicy::ZipByTag => {
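                // Intersect the correlation ids present on every port of the group and
                // consume the smallest common id from each port, keeping payloads zipped.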
2067 let mut common: Option<std::collections::HashSet<u64>> = None;
2069 for port in &group.ports {
2070 let Some(q) = per_port.get(port) else {
2071 all_groups_ready = false;
2072 break;
2073 };
2074 let ids: std::collections::HashSet<u64> =
2075 q.iter().map(|cp| cp.payload.correlation_id).collect();
2076 common = match common {
2077 None => Some(ids),
2078 Some(mut acc) => {
2079 acc.retain(|id| ids.contains(id));
2080 Some(acc)
2081 }
2082 };
2083 if common.as_ref().is_some_and(|acc| acc.is_empty()) {
2084 all_groups_ready = false;
2085 break;
2086 }
2087 }
2088 if !all_groups_ready {
2089 break;
2090 }
2091 let Some(common) = common else {
2092 all_groups_ready = false;
2093 break;
2094 };
2095 let Some(target_id) = common.iter().copied().min() else {
2096 all_groups_ready = false;
2097 break;
2098 };
2099
2100 for port in &group.ports {
2101 if let Some(q) = per_port.get_mut(port)
2102 && let Some(idx) = q
2103 .iter()
2104 .position(|cp| cp.payload.correlation_id == target_id)
2105 {
2106 let payload = q.remove(idx).unwrap();
2107 selected.push(payload);
2108 } else {
2109 all_groups_ready = false;
2110 break;
2111 }
2112 }
2113 if !all_groups_ready {
2114 break;
2115 }
2116 }
2117 }
2118 }
2119
2120 if !all_groups_ready {
2121 let mut leftovers: Vec<DrainedInput> = Vec::new();
2123 leftovers.extend(selected);
2124 for (_, mut q) in per_port {
2125 while let Some(item) = q.pop_front() {
2126 leftovers.push(item);
2127 }
2128 }
2129 return (Vec::new(), leftovers, false);
2130 }
2131
2132 let mut inputs: Vec<(String, CorrelatedPayload)> =
2134 selected.into_iter().map(|d| (d.port, d.payload)).collect();
2135
2136 let mut leftovers: Vec<DrainedInput> = Vec::new();
2138 for (port, mut q) in per_port {
2139 if grouped_ports.contains(&port) {
2140 while let Some(item) = q.pop_front() {
2141 leftovers.push(item);
2142 }
2143 continue;
2144 }
2145 while let Some(item) = q.pop_front() {
2146 inputs.push((item.port, item.payload));
2147 }
2148 }
2149
2150 (inputs, leftovers, true)
2151}
2152
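/// Pushes leftover drained payloads back onto their originating edge queues,
/// refreshing `enqueued_at` and re-applying the edge's policy. For bounded
/// lock-free queues the oldest entry is dropped when the push would overflow.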
fn requeue_drained(
    leftovers: Vec<DrainedInput>,
    queues: &Arc<Vec<EdgeStorage>>,
    edges: &[EdgeInfo],
) {
    if leftovers.is_empty() {
        return;
    }

    let mut per_edge: HashMap<usize, Vec<CorrelatedPayload>> = HashMap::new();
    for item in leftovers {
        per_edge
            .entry(item.edge_idx)
            .or_default()
            .push(item.payload);
    }

    for (edge_idx, payloads) in per_edge {
        let Some(storage) = queues.get(edge_idx) else {
            continue;
        };
        let policy = edges
            .get(edge_idx)
            .map(|(_, _, _, _, policy)| policy)
            .cloned()
            .unwrap_or(EdgePolicyKind::Fifo);
        match storage {
            EdgeStorage::Locked(q_arc) => {
                if let Ok(mut q) = q_arc.lock() {
                    q.ensure_policy(&policy);
                    for mut payload in payloads {
                        payload.enqueued_at = Instant::now();
                        let _ = q.push(&policy, payload);
                    }
                }
            }
            #[cfg(feature = "lockfree-queues")]
            EdgeStorage::BoundedLf(q) => {
                for mut payload in payloads {
                    payload.enqueued_at = Instant::now();
                    if q.push(payload.clone()).is_err() {
                        let _ = q.pop();
                        let _ = q.push(payload);
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::ExecutionTelemetry;
    use crate::executor::queue::{EdgeQueue, EdgeStorage, RingBuf};
    use daedalus_planner::NodeRef;
    use serde::Deserialize;
    use std::collections::HashSet;

    fn payload(v: i32, corr: u64) -> CorrelatedPayload {
        CorrelatedPayload {
            correlation_id: corr,
            inner: EdgePayload::Any(Arc::new(v)),
            enqueued_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn aligns_all_ready() {
        let drained = vec![
            DrainedInput {
                port: "a".into(),
                edge_idx: 0,
                payload: payload(1, 1),
            },
            DrainedInput {
                port: "a".into(),
                edge_idx: 0,
                payload: payload(2, 2),
            },
            DrainedInput {
                port: "b".into(),
                edge_idx: 1,
                payload: payload(10, 1),
            },
            DrainedInput {
                port: "b".into(),
                edge_idx: 1,
                payload: payload(20, 2),
            },
        ];
        let group = SyncGroup {
            name: "g".into(),
            policy: SyncPolicy::AllReady,
            backpressure: None,
            capacity: None,
            ports: vec!["a".into(), "b".into()],
        };
        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
        assert!(ready);
        let vals: Vec<(String, i32)> = out
            .into_iter()
            .map(|(p, pl)| {
                (
                    p,
                    match pl.inner {
                        EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
                        _ => panic!("expected Any"),
                    },
                )
            })
            .collect();
        assert_eq!(vals, vec![("a".into(), 1), ("b".into(), 10)]);

        let mut left_vals: Vec<(String, i32)> = leftovers
            .into_iter()
            .map(|d| {
                let v = match d.payload.inner {
                    EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
                    _ => panic!("expected Any"),
                };
                (d.port, v)
            })
            .collect();
        left_vals.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
        assert_eq!(left_vals, vec![("a".into(), 2), ("b".into(), 20)]);
    }

    #[test]
    fn aligns_latest() {
        let drained = vec![
            DrainedInput {
                port: "a".into(),
                edge_idx: 0,
                payload: payload(1, 1),
            },
            DrainedInput {
                port: "a".into(),
                edge_idx: 0,
                payload: payload(2, 2),
            },
            DrainedInput {
                port: "b".into(),
                edge_idx: 1,
                payload: payload(10, 3),
            },
            DrainedInput {
                port: "b".into(),
                edge_idx: 1,
                payload: payload(20, 4),
            },
        ];
        let group = SyncGroup {
            name: "g".into(),
            policy: SyncPolicy::Latest,
            backpressure: None,
            capacity: None,
            ports: vec!["a".into(), "b".into()],
        };
        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
        assert!(ready);
        assert!(leftovers.is_empty());
        let vals: Vec<(String, i32)> = out
            .into_iter()
            .map(|(p, pl)| {
                (
                    p,
                    match pl.inner {
                        EdgePayload::Any(ref a) => *a.downcast_ref::<i32>().unwrap(),
                        _ => panic!("expected Any"),
                    },
                )
            })
            .collect();
        assert_eq!(vals, vec![("a".into(), 2), ("b".into(), 20)]);
    }

    #[test]
    fn aligns_zip_by_tag() {
        use daedalus_data::model::{StructFieldValue, Value};
        let tagged = |tag: &str, v: i32, corr: u64| {
            let fields = vec![
                StructFieldValue {
                    name: "tag".to_string(),
                    value: Value::String(tag.to_string().into()),
                },
                StructFieldValue {
                    name: "v".to_string(),
                    value: Value::Int(i64::from(v)),
                },
            ];
            CorrelatedPayload {
                correlation_id: corr,
                inner: EdgePayload::Value(Value::Struct(fields)),
                enqueued_at: std::time::Instant::now(),
            }
        };
        let inputs = vec![
            ("a".into(), tagged("x", 1, 1)),
            ("b".into(), tagged("y", 200, 2)),
            ("b".into(), tagged("x", 100, 1)),
            ("a".into(), tagged("y", 2, 2)),
        ];
        let group = SyncGroup {
            name: "g".into(),
            policy: SyncPolicy::ZipByTag,
            backpressure: None,
            capacity: None,
            ports: vec!["a".into(), "b".into()],
        };
        let drained = inputs
            .into_iter()
            .enumerate()
            .map(|(idx, (port, payload))| DrainedInput {
                port,
                edge_idx: idx,
                payload,
            })
            .collect::<Vec<_>>();
        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
        assert!(ready);
        assert!(!out.is_empty());
        assert!(!leftovers.is_empty());
        let tags: Vec<String> = out
            .chunks(2)
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|(_, p)| match &p.inner {
                        EdgePayload::Value(Value::Struct(fields)) => fields
                            .iter()
                            .find(|f| f.name == "tag")
                            .and_then(|f| match &f.value {
                                Value::String(s) => Some(s.to_string()),
                                _ => None,
                            })
                            .unwrap(),
                        _ => "missing".to_string(),
                    })
                    .next()
                    .unwrap()
            })
            .collect();
        assert_eq!(tags, vec!["x".to_string()]);
    }

    #[test]
    fn not_ready_returns_no_inputs_and_requeues() {
        let drained = vec![DrainedInput {
            port: "a".into(),
            edge_idx: 0,
            payload: payload(1, 1),
        }];
        let group = SyncGroup {
            name: "g".into(),
            policy: SyncPolicy::AllReady,
            backpressure: None,
            capacity: None,
            ports: vec!["a".into(), "b".into()],
        };
        let (out, leftovers, ready) = align_drained_inputs(drained, &[group]);
        assert!(!ready);
        assert!(out.is_empty());
        assert_eq!(leftovers.len(), 1);
    }

    #[test]
    fn port_override_applies_backpressure_and_capacity() {
        let queues = Arc::new(vec![EdgeStorage::Locked(Arc::new(std::sync::Mutex::new(
            EdgeQueue::Bounded {
                ring: RingBuf::new(5),
            },
        )))]);
        let edges = vec![(
            NodeRef(0),
            "out".to_string(),
            NodeRef(1),
            "in".to_string(),
            EdgePolicyKind::Bounded { cap: 5 },
        )];
        let sg = SyncGroup {
            name: "g".into(),
            policy: SyncPolicy::AllReady,
            backpressure: Some(BackpressureStrategy::ErrorOnOverflow),
            capacity: Some(1),
            ports: vec!["out".into()],
        };
        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
        let mut telem = ExecutionTelemetry::default();

        #[cfg(feature = "gpu")]
        let gpu_entry_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let gpu_exit_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let payload_edges = HashSet::new();

        let mut io = NodeIo::new(
            vec![],
            vec![0],
            &queues,
            &warnings,
            &edges,
            vec![sg],
            #[cfg(feature = "gpu")]
            &gpu_entry_edges,
            #[cfg(feature = "gpu")]
            &gpu_exit_edges,
            #[cfg(feature = "gpu")]
            &payload_edges,
            0,
            0,
            "node".into(),
            &mut telem,
            BackpressureStrategy::None,
            &[],
            None,
            None,
            #[cfg(feature = "gpu")]
            None,
            #[cfg(feature = "gpu")]
            daedalus_planner::ComputeAffinity::CpuOnly,
        );

        io.push_any(Some("out"), 1i32);
        io.push_any(Some("out"), 2i32);
        assert!(telem.backpressure_events > 0);
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
    #[serde(rename_all = "snake_case")]
    enum TestEnum {
        Auto,
        Cpu,
        Gpu,
    }

    impl TestEnum {
        fn from_label(raw: &str) -> Option<Self> {
            match raw.trim().to_ascii_lowercase().as_str() {
                "auto" => Some(Self::Auto),
                "cpu" => Some(Self::Cpu),
                "gpu" => Some(Self::Gpu),
                _ => None,
            }
        }
    }

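    // Registers `TestEnum` with the typing registry and installs a const
    // coercer so `get_typed` can resolve it from an `Int` index, a `String`
    // label, or an `Enum` value.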
    fn register_test_enum() {
        use daedalus_data::model::Value;
        daedalus_data::typing::register_enum::<TestEnum>(["auto", "cpu", "gpu"]);
        register_const_coercer::<TestEnum, _>(|v| match v {
            Value::Int(_) => NodeIo::enum_name_from_index::<TestEnum>(v)
                .and_then(|name| TestEnum::from_label(&name)),
            Value::String(s) => TestEnum::from_label(s),
            Value::Enum(ev) => TestEnum::from_label(&ev.name),
            _ => None,
        });
    }

    #[cfg(feature = "gpu")]
    #[derive(Clone, Debug, PartialEq)]
    struct DummyPayload {
        value: i32,
    }

    #[cfg(feature = "gpu")]
    impl daedalus_gpu::GpuSendable for DummyPayload {
        type GpuRepr = ();
    }

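    // Builds a `NodeIo` holding a single payload on port "in". The backing
    // collections are leaked via `Box::leak` so the borrows satisfy the
    // `'static` lifetime these tests need.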
    #[cfg(feature = "gpu")]
    fn make_io_with_payload(payload: CorrelatedPayload) -> NodeIo<'static> {
        let queues: &'static Arc<Vec<EdgeStorage>> = Box::leak(Box::new(Arc::new(vec![])));
        let edges: &'static [EdgeInfo] = Box::leak(Box::new(Vec::new()));
        let warnings: &'static Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
            Box::leak(Box::new(Arc::new(std::sync::Mutex::new(HashSet::new()))));
        let telem: &'static mut ExecutionTelemetry =
            Box::leak(Box::new(ExecutionTelemetry::default()));
        let gpu_entry_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));
        let gpu_exit_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));
        let payload_edges: &'static HashSet<usize> = Box::leak(Box::new(HashSet::new()));

        let mut io = NodeIo::new(
            vec![],
            vec![],
            queues,
            warnings,
            edges,
            vec![],
            gpu_entry_edges,
            gpu_exit_edges,
            payload_edges,
            0,
            0,
            "node".into(),
            telem,
            BackpressureStrategy::None,
            &[],
            None,
            None,
            None,
            daedalus_planner::ComputeAffinity::CpuOnly,
        );
        io.inputs = vec![("in".to_string(), payload)];
        io
    }

    #[cfg(feature = "gpu")]
    #[test]
    fn get_any_reads_payload_any_type() {
        let payload = CorrelatedPayload {
            correlation_id: 1,
            inner: EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<DummyPayload>(
                DummyPayload { value: 42 },
            )),
            enqueued_at: std::time::Instant::now(),
        };
        let io = make_io_with_payload(payload);
        let got = io.get_any::<DummyPayload>("in");
        assert_eq!(got, Some(DummyPayload { value: 42 }));
    }

    #[cfg(feature = "gpu")]
    #[test]
    fn get_typed_mut_moves_payload_any_type() {
        let payload = CorrelatedPayload {
            correlation_id: 1,
            inner: EdgePayload::Payload(daedalus_gpu::ErasedPayload::from_cpu::<DummyPayload>(
                DummyPayload { value: 7 },
            )),
            enqueued_at: std::time::Instant::now(),
        };
        let mut io = make_io_with_payload(payload);
        let got = io.get_typed_mut::<DummyPayload>("in");
        assert_eq!(got, Some(DummyPayload { value: 7 }));
        assert!(io.inputs.is_empty());
    }

    #[test]
    fn get_typed_parses_enum_from_value_enum() {
        use daedalus_data::model::{EnumValue, Value};
        register_test_enum();
        let queues = Arc::new(vec![]);
        let edges = vec![];
        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
        let mut telem = ExecutionTelemetry::default();

        #[cfg(feature = "gpu")]
        let gpu_entry_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let gpu_exit_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let payload_edges = HashSet::new();

        assert_eq!(
            NodeIo::enum_name_from_index::<TestEnum>(&Value::Int(2)),
            Some("gpu".to_string())
        );

        let io = NodeIo::new(
            vec![],
            vec![],
            &queues,
            &warnings,
            &edges,
            vec![],
            #[cfg(feature = "gpu")]
            &gpu_entry_edges,
            #[cfg(feature = "gpu")]
            &gpu_exit_edges,
            #[cfg(feature = "gpu")]
            &payload_edges,
            0,
            0,
            "node".into(),
            &mut telem,
            BackpressureStrategy::None,
            &[(
                "mode".to_string(),
                Value::Enum(EnumValue {
                    name: "gpu".to_string(),
                    value: None,
                }),
            )],
            None,
            None,
            #[cfg(feature = "gpu")]
            None,
            #[cfg(feature = "gpu")]
            daedalus_planner::ComputeAffinity::CpuOnly,
        );

        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Gpu));
    }

    #[test]
    fn get_typed_parses_enum_from_value_string() {
        use daedalus_data::model::Value;
        register_test_enum();
        let queues = Arc::new(vec![]);
        let edges = vec![];
        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
        let mut telem = ExecutionTelemetry::default();

        #[cfg(feature = "gpu")]
        let gpu_entry_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let gpu_exit_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let payload_edges = HashSet::new();

        let io = NodeIo::new(
            vec![],
            vec![],
            &queues,
            &warnings,
            &edges,
            vec![],
            #[cfg(feature = "gpu")]
            &gpu_entry_edges,
            #[cfg(feature = "gpu")]
            &gpu_exit_edges,
            #[cfg(feature = "gpu")]
            &payload_edges,
            0,
            0,
            "node".into(),
            &mut telem,
            BackpressureStrategy::None,
            &[("mode".to_string(), Value::String("cpu".into()))],
            None,
            None,
            #[cfg(feature = "gpu")]
            None,
            #[cfg(feature = "gpu")]
            daedalus_planner::ComputeAffinity::CpuOnly,
        );

        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Cpu));
    }

    #[test]
    fn get_typed_parses_enum_from_value_int_index() {
        use daedalus_data::model::Value;
        register_test_enum();
        assert!(daedalus_data::typing::lookup_type::<TestEnum>().is_some());
        let queues = Arc::new(vec![]);
        let edges = vec![];
        let warnings = Arc::new(std::sync::Mutex::new(HashSet::new()));
        let mut telem = ExecutionTelemetry::default();

        #[cfg(feature = "gpu")]
        let gpu_entry_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let gpu_exit_edges = HashSet::new();
        #[cfg(feature = "gpu")]
        let payload_edges = HashSet::new();

        let io = NodeIo::new(
            vec![],
            vec![],
            &queues,
            &warnings,
            &edges,
            vec![],
            #[cfg(feature = "gpu")]
            &gpu_entry_edges,
            #[cfg(feature = "gpu")]
            &gpu_exit_edges,
            #[cfg(feature = "gpu")]
            &payload_edges,
            0,
            0,
            "node".into(),
            &mut telem,
            BackpressureStrategy::None,
            &[("mode".to_string(), Value::Int(2))],
            None,
            None,
            #[cfg(feature = "gpu")]
            None,
            #[cfg(feature = "gpu")]
            daedalus_planner::ComputeAffinity::CpuOnly,
        );

        assert_eq!(io.get_typed::<TestEnum>("mode"), Some(TestEnum::Gpu));
    }
}