1use daedalus_data::convert::{ConversionProvenance, ConversionResolution, ConverterId};
2use daedalus_data::model::{TypeExpr, Value, ValueType};
3use daedalus_registry::ids::NodeId;
4use daedalus_registry::store::NodeDescriptor;
5use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
6
7use crate::diagnostics::{Diagnostic, DiagnosticCode};
8use crate::graph::NodeInstance;
9use crate::graph::{ComputeAffinity, Edge, ExecutionPlan, Graph, NodeRef, PortRef};
10
/// Node-metadata key under which the typecheck pass stores solved types for
/// generic input ports (a map of lowercased port name -> JSON-encoded type).
11const DYNAMIC_INPUT_TYPES_KEY: &str = "dynamic_input_types";
/// Node-metadata key under which the typecheck pass stores solved types for
/// generic output ports (same layout as the input variant).
12const DYNAMIC_OUTPUT_TYPES_KEY: &str = "dynamic_output_types";
/// Descriptor-metadata key holding a JSON-serialized graph to inline in place
/// of the node.
13const EMBEDDED_GRAPH_KEY: &str = "daedalus.embedded_graph";
/// Descriptor-metadata key naming the label of the embedded graph's host node;
/// consulted only when no node in the embedded graph is flagged `host_bridge`.
14const EMBEDDED_HOST_KEY: &str = "daedalus.embedded_host";
/// Node-metadata key stamped on every node inlined from an embedded graph; the
/// value is the label (or id) of the node that was expanded.
15const EMBEDDED_GROUP_KEY: &str = "daedalus.embedded_group";
16
/// Switches that control a single planner run.
///
/// The derived `Default` disables GPU handling and lints and activates no
/// features.
#[derive(Debug, Clone, Default)]
pub struct PlannerConfig {
    /// Allow GPU execution. When `false`, the GPU pass only reports nodes that
    /// require a GPU, and converter resolution is asked not to use the GPU.
    pub enable_gpu: bool,
    /// Run the lint pass after scheduling.
    pub enable_lints: bool,
    /// Feature names forwarded to converter resolution.
    pub active_features: Vec<String>,
    /// Device capabilities inspected by the GPU pass (only with the `gpu`
    /// cargo feature).
    #[cfg(feature = "gpu")]
    pub gpu_caps: Option<daedalus_gpu::GpuCapabilities>,
}
32
33#[derive(Clone, Debug)]
43pub struct PlannerInput<'a> {
44 pub graph: Graph,
45 pub registry: &'a daedalus_registry::store::Registry,
46}
47
48#[derive(Clone, Debug)]
56pub struct PlannerOutput {
57 pub plan: ExecutionPlan,
58 pub diagnostics: Vec<Diagnostic>,
59}
60
61fn is_host_bridge(node: &NodeInstance) -> bool {
62 matches!(
63 node.metadata.get("host_bridge"),
64 Some(daedalus_data::model::Value::Bool(true))
65 )
66}
67
68fn diagnostic_node_id(node: &NodeInstance) -> String {
69 const UI_NODE_ID_KEY: &str = "helios.ui.node_id";
70 if let Some(daedalus_data::model::Value::String(value)) = node.metadata.get(UI_NODE_ID_KEY) {
71 let trimmed = value.trim();
72 if !trimmed.is_empty() {
73 return trimmed.to_string();
74 }
75 }
76 if let Some(label) = node.label.as_deref() {
77 let trimmed = label.trim();
78 if !trimmed.is_empty() {
79 return trimmed.to_string();
80 }
81 }
82 node.id.0.clone()
83}
84
85fn expand_embedded_graphs(
86 input: &mut PlannerInput<'_>,
87 view: &daedalus_registry::store::RegistryView,
88 diags: &mut Vec<Diagnostic>,
89) {
90 let mut embedded_graphs: HashMap<usize, Graph> = HashMap::new();
91 for (idx, node) in input.graph.nodes.iter().enumerate() {
92 let Some(desc) = latest_node(view, &node.id) else {
93 continue;
94 };
95 let Some(Value::String(raw)) = desc.metadata.get(EMBEDDED_GRAPH_KEY) else {
96 continue;
97 };
98 let parsed: Result<Graph, _> = serde_json::from_str(raw.as_ref());
99 match parsed {
100 Ok(graph) => {
101 embedded_graphs.insert(idx, graph);
102 }
103 Err(err) => {
104 diags.push(
105 Diagnostic::new(
106 DiagnosticCode::NodeMissing,
107 format!("embedded graph parse failed: {err}"),
108 )
109 .in_pass("expand_embedded")
110 .at_node(diagnostic_node_id(node)),
111 );
112 }
113 }
114 }
115
116 if embedded_graphs.is_empty() {
117 return;
118 }
119
120 let mut connected_inputs: HashMap<usize, HashSet<String>> = HashMap::new();
121 for edge in &input.graph.edges {
122 if embedded_graphs.contains_key(&edge.to.node.0) {
123 connected_inputs
124 .entry(edge.to.node.0)
125 .or_default()
126 .insert(edge.to.port.clone());
127 }
128 }
129
130 #[derive(Clone, Debug)]
131 struct EmbeddedMap {
132 inputs: BTreeMap<String, Vec<PortRef>>,
133 outputs: BTreeMap<String, Vec<PortRef>>,
134 }
135
136 let mut new_nodes: Vec<NodeInstance> = Vec::new();
137 let mut embedded_internal_edges: Vec<Edge> = Vec::new();
138 let mut remap: Vec<Option<usize>> = vec![None; input.graph.nodes.len()];
139 let mut embedded_maps: HashMap<usize, EmbeddedMap> = HashMap::new();
140
141 for (idx, node) in input.graph.nodes.iter().enumerate() {
142 let Some(graph) = embedded_graphs.get(&idx) else {
143 let new_idx = new_nodes.len();
144 new_nodes.push(node.clone());
145 remap[idx] = Some(new_idx);
146 continue;
147 };
148
149 let host_index = graph.nodes.iter().position(is_host_bridge).or_else(|| {
150 let host_label = latest_node(view, &node.id)
151 .and_then(|desc| desc.metadata.get(EMBEDDED_HOST_KEY))
152 .and_then(|val| match val {
153 Value::String(s) => {
154 let trimmed = s.trim();
155 if trimmed.is_empty() {
156 None
157 } else {
158 Some(trimmed.to_string())
159 }
160 }
161 _ => None,
162 });
163 host_label.and_then(|label| {
164 graph
165 .nodes
166 .iter()
167 .position(|n| n.label.as_deref() == Some(label.as_str()))
168 })
169 });
170
171 let Some(host_index) = host_index else {
172 diags.push(
173 Diagnostic::new(
174 DiagnosticCode::NodeMissing,
175 "embedded graph missing host bridge".to_string(),
176 )
177 .in_pass("expand_embedded")
178 .at_node(diagnostic_node_id(node)),
179 );
180 let new_idx = new_nodes.len();
181 new_nodes.push(node.clone());
182 remap[idx] = Some(new_idx);
183 continue;
184 };
185
186 let group_label = node
187 .label
188 .clone()
189 .unwrap_or_else(|| node.id.0.clone());
190 let prefix = format!("{group_label}::");
191 let mut index_map: Vec<Option<usize>> = vec![None; graph.nodes.len()];
192
193 for (g_idx, g_node) in graph.nodes.iter().enumerate() {
194 if g_idx == host_index {
195 continue;
196 }
197 let mut cloned = g_node.clone();
198 let base_label = cloned
199 .label
200 .clone()
201 .unwrap_or_else(|| cloned.id.0.clone());
202 cloned.label = Some(format!("{prefix}{base_label}"));
203 cloned.metadata.insert(
204 EMBEDDED_GROUP_KEY.to_string(),
205 Value::String(std::borrow::Cow::from(group_label.clone())),
206 );
207 let new_idx = new_nodes.len();
208 new_nodes.push(cloned);
209 index_map[g_idx] = Some(new_idx);
210 }
211
212 let mut inputs: BTreeMap<String, Vec<PortRef>> = BTreeMap::new();
213 let mut outputs: BTreeMap<String, Vec<PortRef>> = BTreeMap::new();
214
215 for edge in &graph.edges {
216 let from_is_host = edge.from.node.0 == host_index;
217 let to_is_host = edge.to.node.0 == host_index;
218
219 match (from_is_host, to_is_host) {
220 (true, false) => {
221 if let Some(target_idx) = index_map[edge.to.node.0] {
222 inputs
223 .entry(edge.from.port.clone())
224 .or_default()
225 .push(PortRef {
226 node: NodeRef(target_idx),
227 port: edge.to.port.clone(),
228 });
229 }
230 }
231 (false, true) => {
232 if let Some(source_idx) = index_map[edge.from.node.0] {
233 outputs
234 .entry(edge.to.port.clone())
235 .or_default()
236 .push(PortRef {
237 node: NodeRef(source_idx),
238 port: edge.from.port.clone(),
239 });
240 }
241 }
242 (false, false) => {
243 let Some(from_idx) = index_map[edge.from.node.0] else {
244 continue;
245 };
246 let Some(to_idx) = index_map[edge.to.node.0] else {
247 continue;
248 };
249 embedded_internal_edges.push(Edge {
250 from: PortRef {
251 node: NodeRef(from_idx),
252 port: edge.from.port.clone(),
253 },
254 to: PortRef {
255 node: NodeRef(to_idx),
256 port: edge.to.port.clone(),
257 },
258 metadata: edge.metadata.clone(),
259 });
260 }
261 (true, true) => {}
262 }
263 }
264
265 embedded_maps.insert(
266 idx,
267 EmbeddedMap {
268 inputs,
269 outputs,
270 },
271 );
272 }
273
274 let mut new_edges: Vec<Edge> = Vec::new();
275 for edge in &input.graph.edges {
276 let from_map = embedded_maps.get(&edge.from.node.0);
277 let to_map = embedded_maps.get(&edge.to.node.0);
278
279 match (from_map, to_map) {
280 (None, None) => {
281 let Some(from_idx) = remap[edge.from.node.0] else {
282 continue;
283 };
284 let Some(to_idx) = remap[edge.to.node.0] else {
285 continue;
286 };
287 new_edges.push(Edge {
288 from: PortRef {
289 node: NodeRef(from_idx),
290 port: edge.from.port.clone(),
291 },
292 to: PortRef {
293 node: NodeRef(to_idx),
294 port: edge.to.port.clone(),
295 },
296 metadata: edge.metadata.clone(),
297 });
298 }
299 (None, Some(to)) => {
300 let Some(from_idx) = remap[edge.from.node.0] else {
301 continue;
302 };
303 if let Some(targets) = to.inputs.get(&edge.to.port) {
304 for target in targets {
305 new_edges.push(Edge {
306 from: PortRef {
307 node: NodeRef(from_idx),
308 port: edge.from.port.clone(),
309 },
310 to: target.clone(),
311 metadata: edge.metadata.clone(),
312 });
313 }
314 }
315 }
316 (Some(from), None) => {
317 let Some(to_idx) = remap[edge.to.node.0] else {
318 continue;
319 };
320 if let Some(sources) = from.outputs.get(&edge.from.port) {
321 for source in sources {
322 new_edges.push(Edge {
323 from: source.clone(),
324 to: PortRef {
325 node: NodeRef(to_idx),
326 port: edge.to.port.clone(),
327 },
328 metadata: edge.metadata.clone(),
329 });
330 }
331 }
332 }
333 (Some(from), Some(to)) => {
334 let sources = from.outputs.get(&edge.from.port);
335 let targets = to.inputs.get(&edge.to.port);
336 if let (Some(sources), Some(targets)) = (sources, targets) {
337 for source in sources {
338 for target in targets {
339 new_edges.push(Edge {
340 from: source.clone(),
341 to: target.clone(),
342 metadata: edge.metadata.clone(),
343 });
344 }
345 }
346 }
347 }
348 }
349 }
350
351 for (idx, node) in input.graph.nodes.iter().enumerate() {
353 let Some(map) = embedded_maps.get(&idx) else {
354 continue;
355 };
356 let connected = connected_inputs.get(&idx);
357 for (port, value) in &node.const_inputs {
358 if connected
359 .map(|set| set.contains(port))
360 .unwrap_or(false)
361 {
362 continue;
363 }
364 if let Some(targets) = map.inputs.get(port) {
365 for target in targets {
366 if let Some(inner) = new_nodes.get_mut(target.node.0) {
367 inner.const_inputs.retain(|(name, _)| name != &target.port);
368 inner.const_inputs.push((target.port.clone(), value.clone()));
369 }
370 }
371 }
372 }
373 }
374
375 new_edges.extend(embedded_internal_edges);
376
377 input.graph.nodes = new_nodes;
378 input.graph.edges = new_edges;
379}
380
381fn apply_descriptor_defaults(graph: &mut Graph, view: &daedalus_registry::store::RegistryView) {
382 for node in &mut graph.nodes {
383 let Some(desc) = latest_node(view, &node.id) else {
384 continue;
385 };
386 for port in &desc.inputs {
387 let Some(value) = &port.const_value else {
388 continue;
389 };
390 if node.const_inputs.iter().any(|(name, _)| name == &port.name) {
391 continue;
392 }
393 node.const_inputs.push((port.name.clone(), value.clone()));
394 }
395 }
396}
397
398pub fn build_plan(mut input: PlannerInput<'_>, config: PlannerConfig) -> PlannerOutput {
410 let mut diags = Vec::new();
411 let view = input.registry.view();
412
413 for node in &mut input.graph.nodes {
416 node.metadata.remove(DYNAMIC_INPUT_TYPES_KEY);
417 node.metadata.remove(DYNAMIC_OUTPUT_TYPES_KEY);
418 node.metadata.remove("dynamic_inputs");
419 node.metadata.remove("dynamic_outputs");
420 }
421
422 expand_embedded_graphs(&mut input, &view, &mut diags);
424 apply_descriptor_defaults(&mut input.graph, &view);
425 hydrate_registry(&input, &view, &mut diags);
426 typecheck(&mut input.graph, &view, &mut diags);
427 convert(&mut input.graph, input.registry, &view, &mut diags, &config);
428 align(&mut input.graph, &mut diags);
429 gpu(&mut input.graph, &config, &mut diags);
430 schedule(&mut input.graph, &mut diags);
431 if config.enable_lints {
432 lint(&input, &mut diags);
433 }
434
435 let plan = ExecutionPlan::new(input.graph.clone(), diags.clone());
436 PlannerOutput {
437 plan,
438 diagnostics: diags,
439 }
440}
441
442fn latest_node<'a>(
443 view: &'a daedalus_registry::store::RegistryView,
444 id: &NodeId,
445) -> Option<&'a NodeDescriptor> {
446 view.nodes.get(id)
447}
448
449fn hydrate_registry(
450 input: &PlannerInput<'_>,
451 view: &daedalus_registry::store::RegistryView,
452 diags: &mut Vec<Diagnostic>,
453) {
454 for node in &input.graph.nodes {
455 if latest_node(view, &node.id).is_none() {
456 diags.push(
457 Diagnostic::new(
458 DiagnosticCode::NodeMissing,
459 format!("node {} not found in registry", node.id.0),
460 )
461 .in_pass("hydrate_registry")
462 .at_node(diagnostic_node_id(node)),
463 );
464 }
465 }
466}
467
468fn typecheck(
469 graph: &mut Graph,
470 view: &daedalus_registry::store::RegistryView,
471 diags: &mut Vec<Diagnostic>,
472) {
473 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
474 struct TypeVarKey {
475 node: usize,
476 is_input: bool,
477 port: String,
478 }
479
480 #[derive(Clone, Debug)]
481 struct Dsu {
482 parent: Vec<usize>,
483 rank: Vec<u8>,
484 binding: Vec<Option<TypeExpr>>,
485 }
486
487 impl Dsu {
488 fn new() -> Self {
489 Self {
490 parent: Vec::new(),
491 rank: Vec::new(),
492 binding: Vec::new(),
493 }
494 }
495
496 fn make_set(&mut self) -> usize {
497 let id = self.parent.len();
498 self.parent.push(id);
499 self.rank.push(0);
500 self.binding.push(None);
501 id
502 }
503
504 fn find(&mut self, x: usize) -> usize {
505 if self.parent[x] != x {
506 let p = self.parent[x];
507 self.parent[x] = self.find(p);
508 }
509 self.parent[x]
510 }
511
512 fn union(&mut self, a: usize, b: usize) -> Result<usize, (TypeExpr, TypeExpr)> {
513 let mut ra = self.find(a);
514 let mut rb = self.find(b);
515 if ra == rb {
516 return Ok(ra);
517 }
518 if self.rank[ra] < self.rank[rb] {
519 std::mem::swap(&mut ra, &mut rb);
520 }
521 self.parent[rb] = ra;
522 if self.rank[ra] == self.rank[rb] {
523 self.rank[ra] = self.rank[ra].saturating_add(1);
524 }
525
526 match (&self.binding[ra], &self.binding[rb]) {
527 (Some(a), Some(b)) if a != b => return Err((a.clone(), b.clone())),
528 (None, Some(b)) => self.binding[ra] = Some(b.clone()),
529 _ => {}
530 }
531 Ok(ra)
532 }
533
534 fn bind(&mut self, var: usize, ty: TypeExpr) -> Result<(), (TypeExpr, TypeExpr)> {
535 let r = self.find(var);
536 if let Some(existing) = &self.binding[r] {
537 if existing != &ty {
538 return Err((existing.clone(), ty));
539 }
540 return Ok(());
541 }
542 self.binding[r] = Some(ty);
543 Ok(())
544 }
545
546 fn bound_type(&mut self, var: usize) -> Option<TypeExpr> {
547 let r = self.find(var);
548 self.binding[r].clone()
549 }
550 }
551
552 fn is_generic_marker(ty: &TypeExpr) -> bool {
553 matches!(ty, TypeExpr::Opaque(value) if value.eq_ignore_ascii_case("generic"))
554 }
555
556 fn upsert_string_map(meta: &mut BTreeMap<String, Value>, key: &str, port: &str, value: String) {
557 let entry = meta
558 .entry(key.to_string())
559 .or_insert_with(|| Value::Map(Vec::new()));
560 if !matches!(entry, Value::Map(_)) {
561 *entry = Value::Map(Vec::new());
562 }
563 let Value::Map(entries) = entry else { return };
564 let port_lc = port.to_ascii_lowercase();
565 let mut replaced = false;
566 for (k, v) in entries.iter_mut() {
567 if matches!(k, Value::String(s) if s.eq_ignore_ascii_case(&port_lc)) {
568 *v = Value::String(std::borrow::Cow::Owned(value.clone()));
569 replaced = true;
570 break;
571 }
572 }
573 if !replaced {
574 entries.push((
575 Value::String(std::borrow::Cow::Owned(port_lc)),
576 Value::String(std::borrow::Cow::Owned(value)),
577 ));
578 }
579 entries.sort_by(|(ak, _), (bk, _)| {
580 let a = match ak {
581 Value::String(s) => s.as_ref(),
582 _ => "",
583 };
584 let b = match bk {
585 Value::String(s) => s.as_ref(),
586 _ => "",
587 };
588 a.cmp(b)
589 });
590 }
591
592 let mut vars: BTreeMap<TypeVarKey, usize> = BTreeMap::new();
593 let mut dsu = Dsu::new();
594
595 for edge in &graph.edges {
596 let from_node = match graph.nodes.get(edge.from.node.0) {
597 Some(n) => n,
598 None => continue,
599 };
600 let to_node = match graph.nodes.get(edge.to.node.0) {
601 Some(n) => n,
602 None => continue,
603 };
604 let from_desc = latest_node(view, &from_node.id);
605 let to_desc = latest_node(view, &to_node.id);
606
607 let from_ty = from_desc.and_then(|d| port_type(from_node, d, &edge.from.port, false));
608 let to_ty = to_desc.and_then(|d| port_type(to_node, d, &edge.to.port, true));
609
610 if from_desc.is_none() {
611 diags.push(
612 Diagnostic::new(
613 DiagnosticCode::NodeMissing,
614 format!("node {} not found in registry", from_node.id.0),
615 )
616 .in_pass("typecheck")
617 .at_node(diagnostic_node_id(from_node)),
618 );
619 continue;
620 }
621 if to_desc.is_none() {
622 diags.push(
623 Diagnostic::new(
624 DiagnosticCode::NodeMissing,
625 format!("node {} not found in registry", to_node.id.0),
626 )
627 .in_pass("typecheck")
628 .at_node(diagnostic_node_id(to_node)),
629 );
630 continue;
631 }
632
633 if from_ty.is_none() {
634 diags.push(
635 Diagnostic::new(
636 DiagnosticCode::PortMissing,
637 format!(
638 "output port `{}` not found on node {}",
639 edge.from.port, from_node.id.0
640 ),
641 )
642 .in_pass("typecheck")
643 .at_node(diagnostic_node_id(from_node))
644 .at_port(edge.from.port.clone()),
645 );
646 }
647 if to_ty.is_none() {
648 diags.push(
649 Diagnostic::new(
650 DiagnosticCode::PortMissing,
651 format!(
652 "input port `{}` not found on node {}",
653 edge.to.port, to_node.id.0
654 ),
655 )
656 .in_pass("typecheck")
657 .at_node(diagnostic_node_id(to_node))
658 .at_port(edge.to.port.clone()),
659 );
660 }
661
662 let (Some(from_ty), Some(to_ty)) = (from_ty, to_ty) else {
663 continue;
664 };
665
666 let from_term = if is_generic_marker(&from_ty) {
668 let key = TypeVarKey {
669 node: edge.from.node.0,
670 is_input: false,
671 port: edge.from.port.clone(),
672 };
673 let id = *vars.entry(key).or_insert_with(|| dsu.make_set());
674 Some(id)
675 } else {
676 None
677 };
678 let to_term = if is_generic_marker(&to_ty) {
679 let key = TypeVarKey {
680 node: edge.to.node.0,
681 is_input: true,
682 port: edge.to.port.clone(),
683 };
684 let id = *vars.entry(key).or_insert_with(|| dsu.make_set());
685 Some(id)
686 } else {
687 None
688 };
689
690 let conflict = match (from_term, to_term) {
691 (Some(var), None) => dsu.bind(var, to_ty.clone()).err(),
692 (None, Some(var)) => dsu.bind(var, from_ty.clone()).err(),
693 (Some(a), Some(b)) => dsu.union(a, b).err(),
694 (None, None) => None,
695 };
696
697 if let Some((a, b)) = conflict {
698 let host = if is_generic_marker(&from_ty) {
699 from_node
700 } else {
701 to_node
702 };
703 let port = if is_generic_marker(&from_ty) {
704 edge.from.port.clone()
705 } else {
706 edge.to.port.clone()
707 };
708 diags.push(
709 Diagnostic::new(
710 DiagnosticCode::TypeMismatch,
711 format!(
712 "generic port `{}` inferred conflicting types: {:?} vs {:?} (edge {}.{} -> {}.{})",
713 port,
714 a,
715 b,
716 from_node.id.0,
717 edge.from.port,
718 to_node.id.0,
719 edge.to.port
720 ),
721 )
722 .in_pass("typecheck")
723 .at_node(diagnostic_node_id(host))
724 .at_port(port),
725 );
726 }
727 }
728
729 for (key, var) in vars {
731 let Some(ty) = dsu.bound_type(var) else {
732 continue;
733 };
734 let Some(node) = graph.nodes.get_mut(key.node) else {
735 continue;
736 };
737 let json = match serde_json::to_string(&ty) {
738 Ok(v) => v,
739 Err(_) => continue,
740 };
741 let meta_key = if key.is_input {
742 DYNAMIC_INPUT_TYPES_KEY
743 } else {
744 DYNAMIC_OUTPUT_TYPES_KEY
745 };
746 upsert_string_map(&mut node.metadata, meta_key, &key.port, json);
747 }
748}
749
750fn convert(
751 graph: &mut Graph,
752 registry: &daedalus_registry::store::Registry,
753 view: &daedalus_registry::store::RegistryView,
754 diags: &mut Vec<Diagnostic>,
755 config: &PlannerConfig,
756) {
757 fn collect_structural_steps(
758 registry: &daedalus_registry::store::Registry,
759 from: &TypeExpr,
760 to: &TypeExpr,
761 active_features: &[String],
762 allow_gpu: bool,
763 steps: &mut BTreeSet<ConverterId>,
764 depth: usize,
765 ) -> bool {
766 if from == to {
767 return true;
768 }
769 if depth > 64 {
770 return false;
771 }
772 if let Ok(res) =
773 registry.resolve_converter_with_context(from, to, active_features, allow_gpu)
774 {
775 for step in res.provenance.steps {
776 steps.insert(step);
777 }
778 return true;
779 }
780 match (from, to) {
781 (
782 TypeExpr::Scalar(ValueType::I32 | ValueType::U32),
783 TypeExpr::Scalar(ValueType::Int),
784 )
785 | (
786 TypeExpr::Scalar(ValueType::Int),
787 TypeExpr::Scalar(ValueType::I32 | ValueType::U32),
788 )
789 | (TypeExpr::Scalar(ValueType::F32), TypeExpr::Scalar(ValueType::Float))
790 | (TypeExpr::Scalar(ValueType::Float), TypeExpr::Scalar(ValueType::F32)) => true,
791 (TypeExpr::Optional(a), TypeExpr::Optional(b)) => collect_structural_steps(
792 registry,
793 a,
794 b,
795 active_features,
796 allow_gpu,
797 steps,
798 depth + 1,
799 ),
800 (TypeExpr::List(a), TypeExpr::List(b)) => collect_structural_steps(
801 registry,
802 a,
803 b,
804 active_features,
805 allow_gpu,
806 steps,
807 depth + 1,
808 ),
809 (TypeExpr::Map(ak, av), TypeExpr::Map(bk, bv)) => {
810 collect_structural_steps(
811 registry,
812 ak,
813 bk,
814 active_features,
815 allow_gpu,
816 steps,
817 depth + 1,
818 ) && collect_structural_steps(
819 registry,
820 av,
821 bv,
822 active_features,
823 allow_gpu,
824 steps,
825 depth + 1,
826 )
827 }
828 (TypeExpr::Tuple(a), TypeExpr::Tuple(b)) => {
829 if a.len() != b.len() {
830 return false;
831 }
832 a.iter().zip(b.iter()).all(|(ai, bi)| {
833 collect_structural_steps(
834 registry,
835 ai,
836 bi,
837 active_features,
838 allow_gpu,
839 steps,
840 depth + 1,
841 )
842 })
843 }
844 (TypeExpr::Struct(a_fields), TypeExpr::Struct(b_fields)) => {
845 if a_fields.len() != b_fields.len() {
846 return false;
847 }
848 for bf in b_fields {
849 let Some(af) = a_fields.iter().find(|af| af.name == bf.name) else {
850 return false;
851 };
852 if !collect_structural_steps(
853 registry,
854 &af.ty,
855 &bf.ty,
856 active_features,
857 allow_gpu,
858 steps,
859 depth + 1,
860 ) {
861 return false;
862 }
863 }
864 true
865 }
866 (TypeExpr::Enum(a_vars), TypeExpr::Enum(b_vars)) => {
867 if a_vars.len() != b_vars.len() {
868 return false;
869 }
870 for bv in b_vars {
871 let Some(av) = a_vars.iter().find(|av| av.name == bv.name) else {
872 return false;
873 };
874 match (&av.ty, &bv.ty) {
875 (None, None) => {}
876 (Some(at), Some(bt)) => {
877 if !collect_structural_steps(
878 registry,
879 at,
880 bt,
881 active_features,
882 allow_gpu,
883 steps,
884 depth + 1,
885 ) {
886 return false;
887 }
888 }
889 _ => return false,
890 }
891 }
892 true
893 }
894 _ => false,
895 }
896 }
897
898 fn resolve_structural(
899 registry: &daedalus_registry::store::Registry,
900 from: &TypeExpr,
901 to: &TypeExpr,
902 active_features: &[String],
903 allow_gpu: bool,
904 ) -> Option<Vec<ConverterId>> {
905 let mut steps = BTreeSet::new();
906 if !collect_structural_steps(
907 registry,
908 from,
909 to,
910 active_features,
911 allow_gpu,
912 &mut steps,
913 0,
914 ) {
915 return None;
916 }
917 Some(steps.into_iter().collect())
918 }
919
920 let mut converter_metadata: Vec<(String, String)> = Vec::new();
921 for edge in &graph.edges {
922 let from_node = match graph.nodes.get(edge.from.node.0) {
923 Some(n) => n,
924 None => continue,
925 };
926 let to_node = match graph.nodes.get(edge.to.node.0) {
927 Some(n) => n,
928 None => continue,
929 };
930 let from_desc = latest_node(view, &from_node.id);
931 let to_desc = latest_node(view, &to_node.id);
932 let from_ty = from_desc.and_then(|d| port_type(from_node, d, &edge.from.port, false));
933 let to_ty = to_desc.and_then(|d| port_type(to_node, d, &edge.to.port, true));
934 let (Some(out_ty), Some(in_ty)) = (from_ty, to_ty) else {
935 continue;
936 };
937 if out_ty == in_ty {
938 continue;
939 }
940 let allow_gpu = config.enable_gpu;
941 let features: Vec<String> = config.active_features.clone();
942 let result: Result<ConversionResolution, ()> =
943 resolve_structural(registry, &out_ty, &in_ty, &features, allow_gpu)
944 .map(|steps| ConversionResolution {
945 provenance: ConversionProvenance {
946 steps,
947 total_cost: 0,
948 skipped_cycles: vec![],
949 skipped_gpu: vec![],
950 skipped_features: vec![],
951 },
952 })
953 .ok_or(());
954 match result {
955 Ok(res) => {
956 let mut feats = features.clone();
957 feats.sort();
958 let feats_str = if feats.is_empty() {
959 "none".to_string()
960 } else {
961 feats.join(",")
962 };
963 let key = format!(
964 "converter:{}->{}:{}->{}",
965 from_node.id.0, to_node.id.0, edge.from.port, edge.to.port
966 );
967 let mut path: Vec<String> =
968 res.provenance.steps.iter().map(|c| c.0.clone()).collect();
969 if path.is_empty() {
970 path.push("identity".into());
971 }
972 let value = format!(
973 "cost={};path={};features={};gpu={};skipped_gpu={:?};skipped_features={:?}",
974 res.provenance.total_cost,
975 path.join(">"),
976 feats_str,
977 allow_gpu,
978 res.provenance.skipped_gpu,
979 res.provenance.skipped_features
980 );
981 converter_metadata.push((key, value));
982 }
983 Err(_) => {
984 let mut feats = features.clone();
985 feats.sort();
986 let feats_str = if feats.is_empty() {
987 "none".to_string()
988 } else {
989 feats.join(",")
990 };
991 diags.push(
992 Diagnostic::new(
993 DiagnosticCode::ConverterMissing,
994 format!(
995 "no converter from {:?} to {:?} for edge {}.{} -> {}.{} [features: {}; gpu: {}]",
996 out_ty,
997 in_ty,
998 from_node.id.0,
999 edge.from.port,
1000 to_node.id.0,
1001 edge.to.port,
1002 feats_str,
1003 allow_gpu
1004 ),
1005 )
1006 .in_pass("convert")
1007 .at_node(diagnostic_node_id(to_node))
1008 .at_port(edge.to.port.clone()),
1009 );
1010 }
1011 }
1012 }
1013 converter_metadata.sort_by(|a, b| a.0.cmp(&b.0));
1014 for (k, v) in converter_metadata {
1015 graph.metadata.insert(k, v);
1016 }
1017}
1018fn align(graph: &mut Graph, diags: &mut Vec<Diagnostic>) {
1019 let n = graph.nodes.len();
1021 let mut indegree = vec![0usize; n];
1022 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
1023 for edge in &graph.edges {
1024 if edge.from.node.0 < n
1025 && edge.to.node.0 < n
1026 && !is_host_bridge(&graph.nodes[edge.from.node.0])
1027 && !is_host_bridge(&graph.nodes[edge.to.node.0])
1028 {
1029 adj[edge.from.node.0].push(edge.to.node.0);
1030 indegree[edge.to.node.0] += 1;
1031 }
1032 }
1033 let mut queue: Vec<usize> = indegree
1034 .iter()
1035 .enumerate()
1036 .filter(|(_, d)| **d == 0)
1037 .map(|(i, _)| i)
1038 .collect();
1039 let mut order = Vec::new();
1040 while let Some(v) = queue.pop() {
1041 order.push(v);
1042 for &nxt in &adj[v] {
1043 indegree[nxt] -= 1;
1044 if indegree[nxt] == 0 {
1045 queue.push(nxt);
1046 }
1047 }
1048 }
1049 if order.len() != n {
1050 let mut cyc_nodes: Vec<String> = indegree
1052 .iter()
1053 .enumerate()
1054 .filter(|(_, d)| **d > 0)
1055 .map(|(i, _)| graph.nodes[i].id.0.clone())
1056 .collect();
1057 cyc_nodes.sort();
1058 diags.push(
1059 Diagnostic::new(
1060 DiagnosticCode::ScheduleConflict,
1061 format!(
1062 "graph contains a cycle involving nodes: {}",
1063 cyc_nodes.join(",")
1064 ),
1065 )
1066 .in_pass("align"),
1067 );
1068 } else {
1069 let value = order
1070 .iter()
1071 .map(|&idx| graph.nodes[idx].id.0.clone())
1072 .collect::<Vec<_>>()
1073 .join(",");
1074 graph.metadata.insert("topo_order".into(), value);
1075 }
1076}
1077fn gpu(graph: &mut Graph, config: &PlannerConfig, diags: &mut Vec<Diagnostic>) {
1078 let mut gpu_reasons: Vec<String> = Vec::new();
1079 if !config.enable_gpu {
1081 gpu_reasons.push("gpu-disabled".into());
1082 let mut gpu_nodes: Vec<String> = Vec::new();
1083 for node in &graph.nodes {
1084 if matches!(node.compute, ComputeAffinity::GpuRequired) {
1085 gpu_nodes.push(node.id.0.clone());
1086 diags.push(
1087 Diagnostic::new(
1088 DiagnosticCode::GpuUnsupported,
1089 format!("node {} requires GPU but GPU is disabled", node.id.0),
1090 )
1091 .in_pass("gpu")
1092 .at_node(diagnostic_node_id(node)),
1093 );
1094 }
1095 }
1096 if !gpu_nodes.is_empty() {
1097 graph
1098 .metadata
1099 .insert("gpu_segments".into(), gpu_nodes.join(","));
1100 graph
1101 .metadata
1102 .insert("gpu_why".into(), gpu_reasons.join(","));
1103 }
1104 return;
1105 }
1106
1107 #[cfg(feature = "gpu")]
1109 if let Some(caps) = &config.gpu_caps {
1110 let require_format = daedalus_gpu::GpuFormat::Rgba8Unorm;
1111 let mut ok = true;
1112 let has_format = caps
1113 .format_features
1114 .iter()
1115 .find(|f| f.format == require_format && f.sampleable);
1116 if caps.queue_count == 0 || !caps.has_transfer_queue {
1117 ok = false;
1118 }
1119 if has_format.is_none() {
1120 ok = false;
1121 }
1122 if !ok {
1123 gpu_reasons.push(format!(
1124 "insufficient-caps:queues={} transfer={} format_sampleable={}",
1125 caps.queue_count,
1126 caps.has_transfer_queue,
1127 has_format.is_some()
1128 ));
1129 for node in &graph.nodes {
1130 if matches!(
1131 node.compute,
1132 ComputeAffinity::GpuRequired | ComputeAffinity::GpuPreferred
1133 ) {
1134 diags.push(
1135 Diagnostic::new(
1136 DiagnosticCode::GpuUnsupported,
1137 format!(
1138 "node {} cannot run on GPU: insufficient caps (queues={}, transfer={}, format={:?} sampleable={})",
1139 node.id.0,
1140 caps.queue_count,
1141 caps.has_transfer_queue,
1142 require_format,
1143 has_format.is_some()
1144 ),
1145 )
1146 .in_pass("gpu")
1147 .at_node(diagnostic_node_id(node)),
1148 );
1149 }
1150 }
1151 }
1152 }
1153
1154 let mut segments: Vec<Vec<String>> = Vec::new();
1156 let mut current: Vec<String> = Vec::new();
1157 for node in &graph.nodes {
1158 match node.compute {
1159 ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired => {
1160 current.push(node.id.0.clone());
1161 }
1162 ComputeAffinity::CpuOnly => {
1163 if !current.is_empty() {
1164 segments.push(current);
1165 current = Vec::new();
1166 }
1167 }
1168 }
1169 }
1170 if !current.is_empty() {
1171 segments.push(current);
1172 }
1173 if !segments.is_empty() {
1174 let seg_strs: Vec<String> = segments.into_iter().map(|seg| seg.join("->")).collect();
1175 graph
1176 .metadata
1177 .insert("gpu_segments".into(), seg_strs.join("|"));
1178 }
1179 if !gpu_reasons.is_empty() {
1180 gpu_reasons.sort();
1181 gpu_reasons.dedup();
1182 graph
1183 .metadata
1184 .insert("gpu_why".into(), gpu_reasons.join(";"));
1185 }
1186}
1187fn schedule(graph: &mut Graph, _diags: &mut Vec<Diagnostic>) {
1188 let order = graph
1190 .metadata
1191 .get("topo_order")
1192 .cloned()
1193 .unwrap_or_else(|| {
1194 graph
1195 .nodes
1196 .iter()
1197 .map(|n| n.id.0.clone())
1198 .collect::<Vec<_>>()
1199 .join(",")
1200 });
1201 graph.metadata.insert("schedule_order".into(), order);
1202
1203 let mut priorities: Vec<(String, u8)> = graph
1205 .nodes
1206 .iter()
1207 .map(|n| {
1208 let p = match n.compute {
1209 ComputeAffinity::GpuPreferred => 1,
1210 ComputeAffinity::GpuRequired | ComputeAffinity::CpuOnly => 2,
1211 };
1212 (n.id.0.clone(), p)
1213 })
1214 .collect();
1215 priorities.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
1216 let pr_str = priorities
1217 .into_iter()
1218 .map(|(id, p)| format!("{id}:{p}"))
1219 .collect::<Vec<_>>()
1220 .join(",");
1221 graph.metadata.insert("schedule_priority".into(), pr_str);
1222}
1223fn lint(input: &PlannerInput<'_>, diags: &mut Vec<Diagnostic>) {
1224 let n = input.graph.nodes.len();
1225 let mut incoming: Vec<usize> = vec![0; n];
1226 let mut outgoing: Vec<usize> = vec![0; n];
1227 for e in &input.graph.edges {
1228 if e.from.node.0 < n {
1229 outgoing[e.from.node.0] += 1;
1230 }
1231 if e.to.node.0 < n {
1232 incoming[e.to.node.0] += 1;
1233 }
1234 }
1235
1236 for (idx, node) in input.graph.nodes.iter().enumerate() {
1237 if incoming[idx] == 0 && !node.inputs.is_empty() {
1238 diags.push(
1239 Diagnostic::new(
1240 DiagnosticCode::LintWarning,
1241 format!(
1242 "node {} has unconnected inputs: {}",
1243 node.id.0,
1244 node.inputs.join(",")
1245 ),
1246 )
1247 .in_pass("lint")
1248 .at_node(diagnostic_node_id(node)),
1249 );
1250 }
1251 if outgoing[idx] == 0 && !node.outputs.is_empty() {
1252 diags.push(
1253 Diagnostic::new(
1254 DiagnosticCode::LintWarning,
1255 format!(
1256 "node {} has unused outputs: {}",
1257 node.id.0,
1258 node.outputs.join(",")
1259 ),
1260 )
1261 .in_pass("lint")
1262 .at_node(diagnostic_node_id(node)),
1263 );
1264 }
1265 }
1266}
1267
1268fn port_type(
1269 node: &NodeInstance,
1270 desc: &NodeDescriptor,
1271 name: &str,
1272 is_input: bool,
1273) -> Option<TypeExpr> {
1274 fn is_generic_marker(ty: &TypeExpr) -> bool {
1275 matches!(ty, TypeExpr::Opaque(value) if value.eq_ignore_ascii_case("generic"))
1276 }
1277
1278 fn resolve_override(
1279 meta: &std::collections::BTreeMap<String, Value>,
1280 key: &str,
1281 port: &str,
1282 ) -> Option<TypeExpr> {
1283 let Value::Map(entries) = meta.get(key)? else {
1284 return None;
1285 };
1286 let port_lc = port.to_ascii_lowercase();
1287 let (_, value) = entries
1288 .iter()
1289 .find(|(k, _)| matches!(k, Value::String(s) if s.eq_ignore_ascii_case(&port_lc)))?;
1290 let Value::String(json) = value else {
1291 return None;
1292 };
1293 serde_json::from_str::<TypeExpr>(json).ok()
1294 }
1295
1296 if is_input {
1297 if let Some(ty) = desc.input_ty_for(name) {
1298 if is_generic_marker(ty)
1299 && let Some(solved) =
1300 resolve_override(&node.metadata, DYNAMIC_INPUT_TYPES_KEY, name)
1301 {
1302 return Some(solved);
1303 }
1304 return Some(ty.clone());
1305 }
1306 if let Some(ty) = resolve_override(&node.metadata, DYNAMIC_INPUT_TYPES_KEY, name) {
1307 return Some(ty);
1308 }
1309 } else if let Some(port) = desc.outputs.iter().find(|p| p.name == name) {
1310 if is_generic_marker(&port.ty)
1311 && let Some(solved) = resolve_override(&node.metadata, DYNAMIC_OUTPUT_TYPES_KEY, name)
1312 {
1313 return Some(solved);
1314 }
1315 return Some(port.ty.clone());
1316 } else if let Some(ty) = resolve_override(&node.metadata, DYNAMIC_OUTPUT_TYPES_KEY, name) {
1317 return Some(ty);
1318 }
1319 let key = if is_input {
1320 "dynamic_inputs"
1321 } else {
1322 "dynamic_outputs"
1323 };
1324 let resolve_meta = |meta: &std::collections::BTreeMap<String, Value>| match meta.get(key) {
1325 Some(Value::String(value)) if !value.trim().is_empty() => {
1326 Some(TypeExpr::Opaque(value.trim().to_string()))
1327 }
1328 _ => None,
1329 };
1330 resolve_meta(&desc.metadata)
1333}