daedalus_planner/passes/
mod.rs

1use daedalus_data::convert::{ConversionProvenance, ConversionResolution, ConverterId};
2use daedalus_data::model::{TypeExpr, Value, ValueType};
3use daedalus_registry::ids::NodeId;
4use daedalus_registry::store::NodeDescriptor;
5use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
6
7use crate::diagnostics::{Diagnostic, DiagnosticCode};
8use crate::graph::NodeInstance;
9use crate::graph::{ComputeAffinity, Edge, ExecutionPlan, Graph, NodeRef, PortRef};
10
/// Node-metadata key under which `typecheck` stores the resolved types of generic input ports.
/// `build_plan` strips it from incoming graphs because it is planner-owned.
const DYNAMIC_INPUT_TYPES_KEY: &str = "dynamic_input_types";
/// Node-metadata key under which `typecheck` stores the resolved types of generic output ports.
/// `build_plan` strips it from incoming graphs because it is planner-owned.
const DYNAMIC_OUTPUT_TYPES_KEY: &str = "dynamic_output_types";
/// Descriptor-metadata key whose string value is a JSON-serialized `Graph` to inline.
const EMBEDDED_GRAPH_KEY: &str = "daedalus.embedded_graph";
/// Descriptor-metadata key naming, by label, the host node inside an embedded graph.
/// Only consulted when no inner node carries the `host_bridge` flag.
const EMBEDDED_HOST_KEY: &str = "daedalus.embedded_host";
/// Node-metadata key stamped on every node inlined from an embedded graph; the value is the
/// label (or id) of the node that was expanded.
const EMBEDDED_GROUP_KEY: &str = "daedalus.embedded_group";
16
/// Switches that select which optional planner passes and capabilities are active.
///
/// The `Default` value turns every switch off and leaves the feature list empty.
///
/// ```
/// use daedalus_planner::PlannerConfig;
/// let cfg = PlannerConfig::default();
/// assert!(!cfg.enable_gpu);
/// ```
#[derive(Clone, Debug, Default)]
pub struct PlannerConfig {
    // NOTE(review): presumably gates GPU-aware planning in the `convert`/`gpu` passes, which
    // receive this config — confirm against those passes.
    pub enable_gpu: bool,
    // When `true`, `build_plan` runs the `lint` pass after scheduling.
    pub enable_lints: bool,
    // NOTE(review): presumably the feature names forwarded to converter resolution — confirm
    // against `convert`.
    pub active_features: Vec<String>,
    // Only present when the crate is built with the `gpu` feature.
    #[cfg(feature = "gpu")]
    pub gpu_caps: Option<daedalus_gpu::GpuCapabilities>,
}
32
33/// Input to the planner: a graph plus registry reference.
34///
35/// ```
36/// use daedalus_planner::{PlannerInput, Graph};
37/// use daedalus_registry::store::Registry;
38/// let registry = Registry::new();
39/// let input = PlannerInput { graph: Graph::default(), registry: &registry };
40/// assert_eq!(input.graph.nodes.len(), 0);
41/// ```
42#[derive(Clone, Debug)]
43pub struct PlannerInput<'a> {
44    pub graph: Graph,
45    pub registry: &'a daedalus_registry::store::Registry,
46}
47
48/// Planner output: final plan and any diagnostics.
49///
50/// ```
51/// use daedalus_planner::{PlannerOutput, ExecutionPlan, Graph};
52/// let out = PlannerOutput { plan: ExecutionPlan::new(Graph::default(), vec![]), diagnostics: vec![] };
53/// assert!(out.diagnostics.is_empty());
54/// ```
55#[derive(Clone, Debug)]
56pub struct PlannerOutput {
57    pub plan: ExecutionPlan,
58    pub diagnostics: Vec<Diagnostic>,
59}
60
61fn is_host_bridge(node: &NodeInstance) -> bool {
62    matches!(
63        node.metadata.get("host_bridge"),
64        Some(daedalus_data::model::Value::Bool(true))
65    )
66}
67
68fn diagnostic_node_id(node: &NodeInstance) -> String {
69    const UI_NODE_ID_KEY: &str = "helios.ui.node_id";
70    if let Some(daedalus_data::model::Value::String(value)) = node.metadata.get(UI_NODE_ID_KEY) {
71        let trimmed = value.trim();
72        if !trimmed.is_empty() {
73            return trimmed.to_string();
74        }
75    }
76    if let Some(label) = node.label.as_deref() {
77        let trimmed = label.trim();
78        if !trimmed.is_empty() {
79            return trimmed.to_string();
80        }
81    }
82    node.id.0.clone()
83}
84
85fn expand_embedded_graphs(
86    input: &mut PlannerInput<'_>,
87    view: &daedalus_registry::store::RegistryView,
88    diags: &mut Vec<Diagnostic>,
89) {
90    let mut embedded_graphs: HashMap<usize, Graph> = HashMap::new();
91    for (idx, node) in input.graph.nodes.iter().enumerate() {
92        let Some(desc) = latest_node(view, &node.id) else {
93            continue;
94        };
95        let Some(Value::String(raw)) = desc.metadata.get(EMBEDDED_GRAPH_KEY) else {
96            continue;
97        };
98        let parsed: Result<Graph, _> = serde_json::from_str(raw.as_ref());
99        match parsed {
100            Ok(graph) => {
101                embedded_graphs.insert(idx, graph);
102            }
103            Err(err) => {
104                diags.push(
105                    Diagnostic::new(
106                        DiagnosticCode::NodeMissing,
107                        format!("embedded graph parse failed: {err}"),
108                    )
109                    .in_pass("expand_embedded")
110                    .at_node(diagnostic_node_id(node)),
111                );
112            }
113        }
114    }
115
116    if embedded_graphs.is_empty() {
117        return;
118    }
119
120    let mut connected_inputs: HashMap<usize, HashSet<String>> = HashMap::new();
121    for edge in &input.graph.edges {
122        if embedded_graphs.contains_key(&edge.to.node.0) {
123            connected_inputs
124                .entry(edge.to.node.0)
125                .or_default()
126                .insert(edge.to.port.clone());
127        }
128    }
129
130    #[derive(Clone, Debug)]
131    struct EmbeddedMap {
132        inputs: BTreeMap<String, Vec<PortRef>>,
133        outputs: BTreeMap<String, Vec<PortRef>>,
134    }
135
136    let mut new_nodes: Vec<NodeInstance> = Vec::new();
137    let mut embedded_internal_edges: Vec<Edge> = Vec::new();
138    let mut remap: Vec<Option<usize>> = vec![None; input.graph.nodes.len()];
139    let mut embedded_maps: HashMap<usize, EmbeddedMap> = HashMap::new();
140
141    for (idx, node) in input.graph.nodes.iter().enumerate() {
142        let Some(graph) = embedded_graphs.get(&idx) else {
143            let new_idx = new_nodes.len();
144            new_nodes.push(node.clone());
145            remap[idx] = Some(new_idx);
146            continue;
147        };
148
149        let host_index = graph.nodes.iter().position(is_host_bridge).or_else(|| {
150            let host_label = latest_node(view, &node.id)
151                .and_then(|desc| desc.metadata.get(EMBEDDED_HOST_KEY))
152                .and_then(|val| match val {
153                    Value::String(s) => {
154                        let trimmed = s.trim();
155                        if trimmed.is_empty() {
156                            None
157                        } else {
158                            Some(trimmed.to_string())
159                        }
160                    }
161                    _ => None,
162                });
163            host_label.and_then(|label| {
164                graph
165                    .nodes
166                    .iter()
167                    .position(|n| n.label.as_deref() == Some(label.as_str()))
168            })
169        });
170
171        let Some(host_index) = host_index else {
172            diags.push(
173                Diagnostic::new(
174                    DiagnosticCode::NodeMissing,
175                    "embedded graph missing host bridge".to_string(),
176                )
177                .in_pass("expand_embedded")
178                .at_node(diagnostic_node_id(node)),
179            );
180            let new_idx = new_nodes.len();
181            new_nodes.push(node.clone());
182            remap[idx] = Some(new_idx);
183            continue;
184        };
185
186        let group_label = node
187            .label
188            .clone()
189            .unwrap_or_else(|| node.id.0.clone());
190        let prefix = format!("{group_label}::");
191        let mut index_map: Vec<Option<usize>> = vec![None; graph.nodes.len()];
192
193        for (g_idx, g_node) in graph.nodes.iter().enumerate() {
194            if g_idx == host_index {
195                continue;
196            }
197            let mut cloned = g_node.clone();
198            let base_label = cloned
199                .label
200                .clone()
201                .unwrap_or_else(|| cloned.id.0.clone());
202            cloned.label = Some(format!("{prefix}{base_label}"));
203            cloned.metadata.insert(
204                EMBEDDED_GROUP_KEY.to_string(),
205                Value::String(std::borrow::Cow::from(group_label.clone())),
206            );
207            let new_idx = new_nodes.len();
208            new_nodes.push(cloned);
209            index_map[g_idx] = Some(new_idx);
210        }
211
212        let mut inputs: BTreeMap<String, Vec<PortRef>> = BTreeMap::new();
213        let mut outputs: BTreeMap<String, Vec<PortRef>> = BTreeMap::new();
214
215        for edge in &graph.edges {
216            let from_is_host = edge.from.node.0 == host_index;
217            let to_is_host = edge.to.node.0 == host_index;
218
219            match (from_is_host, to_is_host) {
220                (true, false) => {
221                    if let Some(target_idx) = index_map[edge.to.node.0] {
222                        inputs
223                            .entry(edge.from.port.clone())
224                            .or_default()
225                            .push(PortRef {
226                                node: NodeRef(target_idx),
227                                port: edge.to.port.clone(),
228                            });
229                    }
230                }
231                (false, true) => {
232                    if let Some(source_idx) = index_map[edge.from.node.0] {
233                        outputs
234                            .entry(edge.to.port.clone())
235                            .or_default()
236                            .push(PortRef {
237                                node: NodeRef(source_idx),
238                                port: edge.from.port.clone(),
239                            });
240                    }
241                }
242                (false, false) => {
243                    let Some(from_idx) = index_map[edge.from.node.0] else {
244                        continue;
245                    };
246                    let Some(to_idx) = index_map[edge.to.node.0] else {
247                        continue;
248                    };
249                    embedded_internal_edges.push(Edge {
250                        from: PortRef {
251                            node: NodeRef(from_idx),
252                            port: edge.from.port.clone(),
253                        },
254                        to: PortRef {
255                            node: NodeRef(to_idx),
256                            port: edge.to.port.clone(),
257                        },
258                        metadata: edge.metadata.clone(),
259                    });
260                }
261                (true, true) => {}
262            }
263        }
264
265        embedded_maps.insert(
266            idx,
267            EmbeddedMap {
268                inputs,
269                outputs,
270            },
271        );
272    }
273
274    let mut new_edges: Vec<Edge> = Vec::new();
275    for edge in &input.graph.edges {
276        let from_map = embedded_maps.get(&edge.from.node.0);
277        let to_map = embedded_maps.get(&edge.to.node.0);
278
279        match (from_map, to_map) {
280            (None, None) => {
281                let Some(from_idx) = remap[edge.from.node.0] else {
282                    continue;
283                };
284                let Some(to_idx) = remap[edge.to.node.0] else {
285                    continue;
286                };
287                new_edges.push(Edge {
288                    from: PortRef {
289                        node: NodeRef(from_idx),
290                        port: edge.from.port.clone(),
291                    },
292                    to: PortRef {
293                        node: NodeRef(to_idx),
294                        port: edge.to.port.clone(),
295                    },
296                    metadata: edge.metadata.clone(),
297                });
298            }
299            (None, Some(to)) => {
300                let Some(from_idx) = remap[edge.from.node.0] else {
301                    continue;
302                };
303                if let Some(targets) = to.inputs.get(&edge.to.port) {
304                    for target in targets {
305                        new_edges.push(Edge {
306                            from: PortRef {
307                                node: NodeRef(from_idx),
308                                port: edge.from.port.clone(),
309                            },
310                            to: target.clone(),
311                            metadata: edge.metadata.clone(),
312                        });
313                    }
314                }
315            }
316            (Some(from), None) => {
317                let Some(to_idx) = remap[edge.to.node.0] else {
318                    continue;
319                };
320                if let Some(sources) = from.outputs.get(&edge.from.port) {
321                    for source in sources {
322                        new_edges.push(Edge {
323                            from: source.clone(),
324                            to: PortRef {
325                                node: NodeRef(to_idx),
326                                port: edge.to.port.clone(),
327                            },
328                            metadata: edge.metadata.clone(),
329                        });
330                    }
331                }
332            }
333            (Some(from), Some(to)) => {
334                let sources = from.outputs.get(&edge.from.port);
335                let targets = to.inputs.get(&edge.to.port);
336                if let (Some(sources), Some(targets)) = (sources, targets) {
337                    for source in sources {
338                        for target in targets {
339                            new_edges.push(Edge {
340                                from: source.clone(),
341                                to: target.clone(),
342                                metadata: edge.metadata.clone(),
343                            });
344                        }
345                    }
346                }
347            }
348        }
349    }
350
351    // Apply const inputs from embedded nodes when there is no incoming edge.
352    for (idx, node) in input.graph.nodes.iter().enumerate() {
353        let Some(map) = embedded_maps.get(&idx) else {
354            continue;
355        };
356        let connected = connected_inputs.get(&idx);
357        for (port, value) in &node.const_inputs {
358            if connected
359                .map(|set| set.contains(port))
360                .unwrap_or(false)
361            {
362                continue;
363            }
364            if let Some(targets) = map.inputs.get(port) {
365                for target in targets {
366                    if let Some(inner) = new_nodes.get_mut(target.node.0) {
367                        inner.const_inputs.retain(|(name, _)| name != &target.port);
368                        inner.const_inputs.push((target.port.clone(), value.clone()));
369                    }
370                }
371            }
372        }
373    }
374
375    new_edges.extend(embedded_internal_edges);
376
377    input.graph.nodes = new_nodes;
378    input.graph.edges = new_edges;
379}
380
381fn apply_descriptor_defaults(graph: &mut Graph, view: &daedalus_registry::store::RegistryView) {
382    for node in &mut graph.nodes {
383        let Some(desc) = latest_node(view, &node.id) else {
384            continue;
385        };
386        for port in &desc.inputs {
387            let Some(value) = &port.const_value else {
388                continue;
389            };
390            if node.const_inputs.iter().any(|(name, _)| name == &port.name) {
391                continue;
392            }
393            node.const_inputs.push((port.name.clone(), value.clone()));
394        }
395    }
396}
397
398/// Build an execution plan by running the ordered pass pipeline.
399/// Currently stubs; contracts are enforced via deterministic diagnostics ordering.
400/// Build an execution plan from a graph and registry.
401///
402/// ```
403/// use daedalus_planner::{build_plan, PlannerConfig, PlannerInput, Graph};
404/// use daedalus_registry::store::Registry;
405/// let registry = Registry::new();
406/// let out = build_plan(PlannerInput { graph: Graph::default(), registry: &registry }, PlannerConfig::default());
407/// assert_eq!(out.plan.graph.nodes.len(), 0);
408/// ```
409pub fn build_plan(mut input: PlannerInput<'_>, config: PlannerConfig) -> PlannerOutput {
410    let mut diags = Vec::new();
411    let view = input.registry.view();
412
413    // Security/integrity: clients can attach arbitrary node metadata in Graph JSON. These keys are
414    // planner-owned and must not be accepted as inputs, otherwise a client can "force" types.
415    for node in &mut input.graph.nodes {
416        node.metadata.remove(DYNAMIC_INPUT_TYPES_KEY);
417        node.metadata.remove(DYNAMIC_OUTPUT_TYPES_KEY);
418        node.metadata.remove("dynamic_inputs");
419        node.metadata.remove("dynamic_outputs");
420    }
421
422    // Placeholder passes; extend with real logic per PLAN.md.
423    expand_embedded_graphs(&mut input, &view, &mut diags);
424    apply_descriptor_defaults(&mut input.graph, &view);
425    hydrate_registry(&input, &view, &mut diags);
426    typecheck(&mut input.graph, &view, &mut diags);
427    convert(&mut input.graph, input.registry, &view, &mut diags, &config);
428    align(&mut input.graph, &mut diags);
429    gpu(&mut input.graph, &config, &mut diags);
430    schedule(&mut input.graph, &mut diags);
431    if config.enable_lints {
432        lint(&input, &mut diags);
433    }
434
435    let plan = ExecutionPlan::new(input.graph.clone(), diags.clone());
436    PlannerOutput {
437        plan,
438        diagnostics: diags,
439    }
440}
441
442fn latest_node<'a>(
443    view: &'a daedalus_registry::store::RegistryView,
444    id: &NodeId,
445) -> Option<&'a NodeDescriptor> {
446    view.nodes.get(id)
447}
448
449fn hydrate_registry(
450    input: &PlannerInput<'_>,
451    view: &daedalus_registry::store::RegistryView,
452    diags: &mut Vec<Diagnostic>,
453) {
454    for node in &input.graph.nodes {
455        if latest_node(view, &node.id).is_none() {
456            diags.push(
457                Diagnostic::new(
458                    DiagnosticCode::NodeMissing,
459                    format!("node {} not found in registry", node.id.0),
460                )
461                .in_pass("hydrate_registry")
462                .at_node(diagnostic_node_id(node)),
463            );
464        }
465    }
466}
467
468fn typecheck(
469    graph: &mut Graph,
470    view: &daedalus_registry::store::RegistryView,
471    diags: &mut Vec<Diagnostic>,
472) {
473    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
474    struct TypeVarKey {
475        node: usize,
476        is_input: bool,
477        port: String,
478    }
479
480    #[derive(Clone, Debug)]
481    struct Dsu {
482        parent: Vec<usize>,
483        rank: Vec<u8>,
484        binding: Vec<Option<TypeExpr>>,
485    }
486
487    impl Dsu {
488        fn new() -> Self {
489            Self {
490                parent: Vec::new(),
491                rank: Vec::new(),
492                binding: Vec::new(),
493            }
494        }
495
496        fn make_set(&mut self) -> usize {
497            let id = self.parent.len();
498            self.parent.push(id);
499            self.rank.push(0);
500            self.binding.push(None);
501            id
502        }
503
504        fn find(&mut self, x: usize) -> usize {
505            if self.parent[x] != x {
506                let p = self.parent[x];
507                self.parent[x] = self.find(p);
508            }
509            self.parent[x]
510        }
511
512        fn union(&mut self, a: usize, b: usize) -> Result<usize, (TypeExpr, TypeExpr)> {
513            let mut ra = self.find(a);
514            let mut rb = self.find(b);
515            if ra == rb {
516                return Ok(ra);
517            }
518            if self.rank[ra] < self.rank[rb] {
519                std::mem::swap(&mut ra, &mut rb);
520            }
521            self.parent[rb] = ra;
522            if self.rank[ra] == self.rank[rb] {
523                self.rank[ra] = self.rank[ra].saturating_add(1);
524            }
525
526            match (&self.binding[ra], &self.binding[rb]) {
527                (Some(a), Some(b)) if a != b => return Err((a.clone(), b.clone())),
528                (None, Some(b)) => self.binding[ra] = Some(b.clone()),
529                _ => {}
530            }
531            Ok(ra)
532        }
533
534        fn bind(&mut self, var: usize, ty: TypeExpr) -> Result<(), (TypeExpr, TypeExpr)> {
535            let r = self.find(var);
536            if let Some(existing) = &self.binding[r] {
537                if existing != &ty {
538                    return Err((existing.clone(), ty));
539                }
540                return Ok(());
541            }
542            self.binding[r] = Some(ty);
543            Ok(())
544        }
545
546        fn bound_type(&mut self, var: usize) -> Option<TypeExpr> {
547            let r = self.find(var);
548            self.binding[r].clone()
549        }
550    }
551
552    fn is_generic_marker(ty: &TypeExpr) -> bool {
553        matches!(ty, TypeExpr::Opaque(value) if value.eq_ignore_ascii_case("generic"))
554    }
555
556    fn upsert_string_map(meta: &mut BTreeMap<String, Value>, key: &str, port: &str, value: String) {
557        let entry = meta
558            .entry(key.to_string())
559            .or_insert_with(|| Value::Map(Vec::new()));
560        if !matches!(entry, Value::Map(_)) {
561            *entry = Value::Map(Vec::new());
562        }
563        let Value::Map(entries) = entry else { return };
564        let port_lc = port.to_ascii_lowercase();
565        let mut replaced = false;
566        for (k, v) in entries.iter_mut() {
567            if matches!(k, Value::String(s) if s.eq_ignore_ascii_case(&port_lc)) {
568                *v = Value::String(std::borrow::Cow::Owned(value.clone()));
569                replaced = true;
570                break;
571            }
572        }
573        if !replaced {
574            entries.push((
575                Value::String(std::borrow::Cow::Owned(port_lc)),
576                Value::String(std::borrow::Cow::Owned(value)),
577            ));
578        }
579        entries.sort_by(|(ak, _), (bk, _)| {
580            let a = match ak {
581                Value::String(s) => s.as_ref(),
582                _ => "",
583            };
584            let b = match bk {
585                Value::String(s) => s.as_ref(),
586                _ => "",
587            };
588            a.cmp(b)
589        });
590    }
591
592    let mut vars: BTreeMap<TypeVarKey, usize> = BTreeMap::new();
593    let mut dsu = Dsu::new();
594
595    for edge in &graph.edges {
596        let from_node = match graph.nodes.get(edge.from.node.0) {
597            Some(n) => n,
598            None => continue,
599        };
600        let to_node = match graph.nodes.get(edge.to.node.0) {
601            Some(n) => n,
602            None => continue,
603        };
604        let from_desc = latest_node(view, &from_node.id);
605        let to_desc = latest_node(view, &to_node.id);
606
607        let from_ty = from_desc.and_then(|d| port_type(from_node, d, &edge.from.port, false));
608        let to_ty = to_desc.and_then(|d| port_type(to_node, d, &edge.to.port, true));
609
610        if from_desc.is_none() {
611            diags.push(
612                Diagnostic::new(
613                    DiagnosticCode::NodeMissing,
614                    format!("node {} not found in registry", from_node.id.0),
615                )
616                .in_pass("typecheck")
617                .at_node(diagnostic_node_id(from_node)),
618            );
619            continue;
620        }
621        if to_desc.is_none() {
622            diags.push(
623                Diagnostic::new(
624                    DiagnosticCode::NodeMissing,
625                    format!("node {} not found in registry", to_node.id.0),
626                )
627                .in_pass("typecheck")
628                .at_node(diagnostic_node_id(to_node)),
629            );
630            continue;
631        }
632
633        if from_ty.is_none() {
634            diags.push(
635                Diagnostic::new(
636                    DiagnosticCode::PortMissing,
637                    format!(
638                        "output port `{}` not found on node {}",
639                        edge.from.port, from_node.id.0
640                    ),
641                )
642                .in_pass("typecheck")
643                .at_node(diagnostic_node_id(from_node))
644                .at_port(edge.from.port.clone()),
645            );
646        }
647        if to_ty.is_none() {
648            diags.push(
649                Diagnostic::new(
650                    DiagnosticCode::PortMissing,
651                    format!(
652                        "input port `{}` not found on node {}",
653                        edge.to.port, to_node.id.0
654                    ),
655                )
656                .in_pass("typecheck")
657                .at_node(diagnostic_node_id(to_node))
658                .at_port(edge.to.port.clone()),
659            );
660        }
661
662        let (Some(from_ty), Some(to_ty)) = (from_ty, to_ty) else {
663            continue;
664        };
665
666        // Resolve `Opaque("generic")` as a proper type variable: graph edges constrain it.
667        let from_term = if is_generic_marker(&from_ty) {
668            let key = TypeVarKey {
669                node: edge.from.node.0,
670                is_input: false,
671                port: edge.from.port.clone(),
672            };
673            let id = *vars.entry(key).or_insert_with(|| dsu.make_set());
674            Some(id)
675        } else {
676            None
677        };
678        let to_term = if is_generic_marker(&to_ty) {
679            let key = TypeVarKey {
680                node: edge.to.node.0,
681                is_input: true,
682                port: edge.to.port.clone(),
683            };
684            let id = *vars.entry(key).or_insert_with(|| dsu.make_set());
685            Some(id)
686        } else {
687            None
688        };
689
690        let conflict = match (from_term, to_term) {
691            (Some(var), None) => dsu.bind(var, to_ty.clone()).err(),
692            (None, Some(var)) => dsu.bind(var, from_ty.clone()).err(),
693            (Some(a), Some(b)) => dsu.union(a, b).err(),
694            (None, None) => None,
695        };
696
697        if let Some((a, b)) = conflict {
698            let host = if is_generic_marker(&from_ty) {
699                from_node
700            } else {
701                to_node
702            };
703            let port = if is_generic_marker(&from_ty) {
704                edge.from.port.clone()
705            } else {
706                edge.to.port.clone()
707            };
708            diags.push(
709                Diagnostic::new(
710                    DiagnosticCode::TypeMismatch,
711                    format!(
712                        "generic port `{}` inferred conflicting types: {:?} vs {:?} (edge {}.{} -> {}.{})",
713                        port,
714                        a,
715                        b,
716                        from_node.id.0,
717                        edge.from.port,
718                        to_node.id.0,
719                        edge.to.port
720                    ),
721                )
722                .in_pass("typecheck")
723                .at_node(diagnostic_node_id(host))
724                .at_port(port),
725            );
726        }
727    }
728
729    // Apply solved generic types back onto node metadata so later passes/runtime can consume them.
730    for (key, var) in vars {
731        let Some(ty) = dsu.bound_type(var) else {
732            continue;
733        };
734        let Some(node) = graph.nodes.get_mut(key.node) else {
735            continue;
736        };
737        let json = match serde_json::to_string(&ty) {
738            Ok(v) => v,
739            Err(_) => continue,
740        };
741        let meta_key = if key.is_input {
742            DYNAMIC_INPUT_TYPES_KEY
743        } else {
744            DYNAMIC_OUTPUT_TYPES_KEY
745        };
746        upsert_string_map(&mut node.metadata, meta_key, &key.port, json);
747    }
748}
749
750fn convert(
751    graph: &mut Graph,
752    registry: &daedalus_registry::store::Registry,
753    view: &daedalus_registry::store::RegistryView,
754    diags: &mut Vec<Diagnostic>,
755    config: &PlannerConfig,
756) {
757    fn collect_structural_steps(
758        registry: &daedalus_registry::store::Registry,
759        from: &TypeExpr,
760        to: &TypeExpr,
761        active_features: &[String],
762        allow_gpu: bool,
763        steps: &mut BTreeSet<ConverterId>,
764        depth: usize,
765    ) -> bool {
766        if from == to {
767            return true;
768        }
769        if depth > 64 {
770            return false;
771        }
772        if let Ok(res) =
773            registry.resolve_converter_with_context(from, to, active_features, allow_gpu)
774        {
775            for step in res.provenance.steps {
776                steps.insert(step);
777            }
778            return true;
779        }
780        match (from, to) {
781            (
782                TypeExpr::Scalar(ValueType::I32 | ValueType::U32),
783                TypeExpr::Scalar(ValueType::Int),
784            )
785            | (
786                TypeExpr::Scalar(ValueType::Int),
787                TypeExpr::Scalar(ValueType::I32 | ValueType::U32),
788            )
789            | (TypeExpr::Scalar(ValueType::F32), TypeExpr::Scalar(ValueType::Float))
790            | (TypeExpr::Scalar(ValueType::Float), TypeExpr::Scalar(ValueType::F32)) => true,
791            (TypeExpr::Optional(a), TypeExpr::Optional(b)) => collect_structural_steps(
792                registry,
793                a,
794                b,
795                active_features,
796                allow_gpu,
797                steps,
798                depth + 1,
799            ),
800            (TypeExpr::List(a), TypeExpr::List(b)) => collect_structural_steps(
801                registry,
802                a,
803                b,
804                active_features,
805                allow_gpu,
806                steps,
807                depth + 1,
808            ),
809            (TypeExpr::Map(ak, av), TypeExpr::Map(bk, bv)) => {
810                collect_structural_steps(
811                    registry,
812                    ak,
813                    bk,
814                    active_features,
815                    allow_gpu,
816                    steps,
817                    depth + 1,
818                ) && collect_structural_steps(
819                    registry,
820                    av,
821                    bv,
822                    active_features,
823                    allow_gpu,
824                    steps,
825                    depth + 1,
826                )
827            }
828            (TypeExpr::Tuple(a), TypeExpr::Tuple(b)) => {
829                if a.len() != b.len() {
830                    return false;
831                }
832                a.iter().zip(b.iter()).all(|(ai, bi)| {
833                    collect_structural_steps(
834                        registry,
835                        ai,
836                        bi,
837                        active_features,
838                        allow_gpu,
839                        steps,
840                        depth + 1,
841                    )
842                })
843            }
844            (TypeExpr::Struct(a_fields), TypeExpr::Struct(b_fields)) => {
845                if a_fields.len() != b_fields.len() {
846                    return false;
847                }
848                for bf in b_fields {
849                    let Some(af) = a_fields.iter().find(|af| af.name == bf.name) else {
850                        return false;
851                    };
852                    if !collect_structural_steps(
853                        registry,
854                        &af.ty,
855                        &bf.ty,
856                        active_features,
857                        allow_gpu,
858                        steps,
859                        depth + 1,
860                    ) {
861                        return false;
862                    }
863                }
864                true
865            }
866            (TypeExpr::Enum(a_vars), TypeExpr::Enum(b_vars)) => {
867                if a_vars.len() != b_vars.len() {
868                    return false;
869                }
870                for bv in b_vars {
871                    let Some(av) = a_vars.iter().find(|av| av.name == bv.name) else {
872                        return false;
873                    };
874                    match (&av.ty, &bv.ty) {
875                        (None, None) => {}
876                        (Some(at), Some(bt)) => {
877                            if !collect_structural_steps(
878                                registry,
879                                at,
880                                bt,
881                                active_features,
882                                allow_gpu,
883                                steps,
884                                depth + 1,
885                            ) {
886                                return false;
887                            }
888                        }
889                        _ => return false,
890                    }
891                }
892                true
893            }
894            _ => false,
895        }
896    }
897
898    fn resolve_structural(
899        registry: &daedalus_registry::store::Registry,
900        from: &TypeExpr,
901        to: &TypeExpr,
902        active_features: &[String],
903        allow_gpu: bool,
904    ) -> Option<Vec<ConverterId>> {
905        let mut steps = BTreeSet::new();
906        if !collect_structural_steps(
907            registry,
908            from,
909            to,
910            active_features,
911            allow_gpu,
912            &mut steps,
913            0,
914        ) {
915            return None;
916        }
917        Some(steps.into_iter().collect())
918    }
919
920    let mut converter_metadata: Vec<(String, String)> = Vec::new();
921    for edge in &graph.edges {
922        let from_node = match graph.nodes.get(edge.from.node.0) {
923            Some(n) => n,
924            None => continue,
925        };
926        let to_node = match graph.nodes.get(edge.to.node.0) {
927            Some(n) => n,
928            None => continue,
929        };
930        let from_desc = latest_node(view, &from_node.id);
931        let to_desc = latest_node(view, &to_node.id);
932        let from_ty = from_desc.and_then(|d| port_type(from_node, d, &edge.from.port, false));
933        let to_ty = to_desc.and_then(|d| port_type(to_node, d, &edge.to.port, true));
934        let (Some(out_ty), Some(in_ty)) = (from_ty, to_ty) else {
935            continue;
936        };
937        if out_ty == in_ty {
938            continue;
939        }
940        let allow_gpu = config.enable_gpu;
941        let features: Vec<String> = config.active_features.clone();
942        let result: Result<ConversionResolution, ()> =
943            resolve_structural(registry, &out_ty, &in_ty, &features, allow_gpu)
944                .map(|steps| ConversionResolution {
945                    provenance: ConversionProvenance {
946                        steps,
947                        total_cost: 0,
948                        skipped_cycles: vec![],
949                        skipped_gpu: vec![],
950                        skipped_features: vec![],
951                    },
952                })
953                .ok_or(());
954        match result {
955            Ok(res) => {
956                let mut feats = features.clone();
957                feats.sort();
958                let feats_str = if feats.is_empty() {
959                    "none".to_string()
960                } else {
961                    feats.join(",")
962                };
963                let key = format!(
964                    "converter:{}->{}:{}->{}",
965                    from_node.id.0, to_node.id.0, edge.from.port, edge.to.port
966                );
967                let mut path: Vec<String> =
968                    res.provenance.steps.iter().map(|c| c.0.clone()).collect();
969                if path.is_empty() {
970                    path.push("identity".into());
971                }
972                let value = format!(
973                    "cost={};path={};features={};gpu={};skipped_gpu={:?};skipped_features={:?}",
974                    res.provenance.total_cost,
975                    path.join(">"),
976                    feats_str,
977                    allow_gpu,
978                    res.provenance.skipped_gpu,
979                    res.provenance.skipped_features
980                );
981                converter_metadata.push((key, value));
982            }
983            Err(_) => {
984                let mut feats = features.clone();
985                feats.sort();
986                let feats_str = if feats.is_empty() {
987                    "none".to_string()
988                } else {
989                    feats.join(",")
990                };
991                diags.push(
992                    Diagnostic::new(
993                        DiagnosticCode::ConverterMissing,
994                        format!(
995                            "no converter from {:?} to {:?} for edge {}.{} -> {}.{} [features: {}; gpu: {}]",
996                            out_ty,
997                            in_ty,
998                            from_node.id.0,
999                            edge.from.port,
1000                            to_node.id.0,
1001                            edge.to.port,
1002                            feats_str,
1003                            allow_gpu
1004                        ),
1005                    )
1006                    .in_pass("convert")
1007                    .at_node(diagnostic_node_id(to_node))
1008                    .at_port(edge.to.port.clone()),
1009                );
1010            }
1011        }
1012    }
1013    converter_metadata.sort_by(|a, b| a.0.cmp(&b.0));
1014    for (k, v) in converter_metadata {
1015        graph.metadata.insert(k, v);
1016    }
1017}
1018fn align(graph: &mut Graph, diags: &mut Vec<Diagnostic>) {
1019    // Kahn topo sort to detect cycles and emit ordering metadata.
1020    let n = graph.nodes.len();
1021    let mut indegree = vec![0usize; n];
1022    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
1023    for edge in &graph.edges {
1024        if edge.from.node.0 < n
1025            && edge.to.node.0 < n
1026            && !is_host_bridge(&graph.nodes[edge.from.node.0])
1027            && !is_host_bridge(&graph.nodes[edge.to.node.0])
1028        {
1029            adj[edge.from.node.0].push(edge.to.node.0);
1030            indegree[edge.to.node.0] += 1;
1031        }
1032    }
1033    let mut queue: Vec<usize> = indegree
1034        .iter()
1035        .enumerate()
1036        .filter(|(_, d)| **d == 0)
1037        .map(|(i, _)| i)
1038        .collect();
1039    let mut order = Vec::new();
1040    while let Some(v) = queue.pop() {
1041        order.push(v);
1042        for &nxt in &adj[v] {
1043            indegree[nxt] -= 1;
1044            if indegree[nxt] == 0 {
1045                queue.push(nxt);
1046            }
1047        }
1048    }
1049    if order.len() != n {
1050        // Cycle: collect nodes with indegree > 0 for deterministic message.
1051        let mut cyc_nodes: Vec<String> = indegree
1052            .iter()
1053            .enumerate()
1054            .filter(|(_, d)| **d > 0)
1055            .map(|(i, _)| graph.nodes[i].id.0.clone())
1056            .collect();
1057        cyc_nodes.sort();
1058        diags.push(
1059            Diagnostic::new(
1060                DiagnosticCode::ScheduleConflict,
1061                format!(
1062                    "graph contains a cycle involving nodes: {}",
1063                    cyc_nodes.join(",")
1064                ),
1065            )
1066            .in_pass("align"),
1067        );
1068    } else {
1069        let value = order
1070            .iter()
1071            .map(|&idx| graph.nodes[idx].id.0.clone())
1072            .collect::<Vec<_>>()
1073            .join(",");
1074        graph.metadata.insert("topo_order".into(), value);
1075    }
1076}
1077fn gpu(graph: &mut Graph, config: &PlannerConfig, diags: &mut Vec<Diagnostic>) {
1078    let mut gpu_reasons: Vec<String> = Vec::new();
1079    // If GPU is disabled, flag required nodes.
1080    if !config.enable_gpu {
1081        gpu_reasons.push("gpu-disabled".into());
1082        let mut gpu_nodes: Vec<String> = Vec::new();
1083        for node in &graph.nodes {
1084            if matches!(node.compute, ComputeAffinity::GpuRequired) {
1085                gpu_nodes.push(node.id.0.clone());
1086                diags.push(
1087                    Diagnostic::new(
1088                        DiagnosticCode::GpuUnsupported,
1089                        format!("node {} requires GPU but GPU is disabled", node.id.0),
1090                    )
1091                    .in_pass("gpu")
1092                    .at_node(diagnostic_node_id(node)),
1093                );
1094            }
1095        }
1096        if !gpu_nodes.is_empty() {
1097            graph
1098                .metadata
1099                .insert("gpu_segments".into(), gpu_nodes.join(","));
1100            graph
1101                .metadata
1102                .insert("gpu_why".into(), gpu_reasons.join(","));
1103        }
1104        return;
1105    }
1106
1107    // If caps are provided, validate support.
1108    #[cfg(feature = "gpu")]
1109    if let Some(caps) = &config.gpu_caps {
1110        let require_format = daedalus_gpu::GpuFormat::Rgba8Unorm;
1111        let mut ok = true;
1112        let has_format = caps
1113            .format_features
1114            .iter()
1115            .find(|f| f.format == require_format && f.sampleable);
1116        if caps.queue_count == 0 || !caps.has_transfer_queue {
1117            ok = false;
1118        }
1119        if has_format.is_none() {
1120            ok = false;
1121        }
1122        if !ok {
1123            gpu_reasons.push(format!(
1124                "insufficient-caps:queues={} transfer={} format_sampleable={}",
1125                caps.queue_count,
1126                caps.has_transfer_queue,
1127                has_format.is_some()
1128            ));
1129            for node in &graph.nodes {
1130                if matches!(
1131                    node.compute,
1132                    ComputeAffinity::GpuRequired | ComputeAffinity::GpuPreferred
1133                ) {
1134                    diags.push(
1135                        Diagnostic::new(
1136                            DiagnosticCode::GpuUnsupported,
1137                            format!(
1138                                "node {} cannot run on GPU: insufficient caps (queues={}, transfer={}, format={:?} sampleable={})",
1139                                node.id.0,
1140                                caps.queue_count,
1141                                caps.has_transfer_queue,
1142                                require_format,
1143                                has_format.is_some()
1144                            ),
1145                        )
1146                        .in_pass("gpu")
1147                        .at_node(diagnostic_node_id(node)),
1148                    );
1149                }
1150            }
1151        }
1152    }
1153
1154    // Record GPU segments metadata (simple grouping of contiguous GPU-pref/required nodes).
1155    let mut segments: Vec<Vec<String>> = Vec::new();
1156    let mut current: Vec<String> = Vec::new();
1157    for node in &graph.nodes {
1158        match node.compute {
1159            ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired => {
1160                current.push(node.id.0.clone());
1161            }
1162            ComputeAffinity::CpuOnly => {
1163                if !current.is_empty() {
1164                    segments.push(current);
1165                    current = Vec::new();
1166                }
1167            }
1168        }
1169    }
1170    if !current.is_empty() {
1171        segments.push(current);
1172    }
1173    if !segments.is_empty() {
1174        let seg_strs: Vec<String> = segments.into_iter().map(|seg| seg.join("->")).collect();
1175        graph
1176            .metadata
1177            .insert("gpu_segments".into(), seg_strs.join("|"));
1178    }
1179    if !gpu_reasons.is_empty() {
1180        gpu_reasons.sort();
1181        gpu_reasons.dedup();
1182        graph
1183            .metadata
1184            .insert("gpu_why".into(), gpu_reasons.join(";"));
1185    }
1186}
1187fn schedule(graph: &mut Graph, _diags: &mut Vec<Diagnostic>) {
1188    // If topo_order exists, use it; else declared order. Attach basic priority info.
1189    let order = graph
1190        .metadata
1191        .get("topo_order")
1192        .cloned()
1193        .unwrap_or_else(|| {
1194            graph
1195                .nodes
1196                .iter()
1197                .map(|n| n.id.0.clone())
1198                .collect::<Vec<_>>()
1199                .join(",")
1200        });
1201    graph.metadata.insert("schedule_order".into(), order);
1202
1203    // Prefer GPU-required nodes first within same topo layer (simple heuristic).
1204    let mut priorities: Vec<(String, u8)> = graph
1205        .nodes
1206        .iter()
1207        .map(|n| {
1208            let p = match n.compute {
1209                ComputeAffinity::GpuPreferred => 1,
1210                ComputeAffinity::GpuRequired | ComputeAffinity::CpuOnly => 2,
1211            };
1212            (n.id.0.clone(), p)
1213        })
1214        .collect();
1215    priorities.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
1216    let pr_str = priorities
1217        .into_iter()
1218        .map(|(id, p)| format!("{id}:{p}"))
1219        .collect::<Vec<_>>()
1220        .join(",");
1221    graph.metadata.insert("schedule_priority".into(), pr_str);
1222}
1223fn lint(input: &PlannerInput<'_>, diags: &mut Vec<Diagnostic>) {
1224    let n = input.graph.nodes.len();
1225    let mut incoming: Vec<usize> = vec![0; n];
1226    let mut outgoing: Vec<usize> = vec![0; n];
1227    for e in &input.graph.edges {
1228        if e.from.node.0 < n {
1229            outgoing[e.from.node.0] += 1;
1230        }
1231        if e.to.node.0 < n {
1232            incoming[e.to.node.0] += 1;
1233        }
1234    }
1235
1236    for (idx, node) in input.graph.nodes.iter().enumerate() {
1237        if incoming[idx] == 0 && !node.inputs.is_empty() {
1238            diags.push(
1239                Diagnostic::new(
1240                    DiagnosticCode::LintWarning,
1241                    format!(
1242                        "node {} has unconnected inputs: {}",
1243                        node.id.0,
1244                        node.inputs.join(",")
1245                    ),
1246                )
1247                .in_pass("lint")
1248                .at_node(diagnostic_node_id(node)),
1249            );
1250        }
1251        if outgoing[idx] == 0 && !node.outputs.is_empty() {
1252            diags.push(
1253                Diagnostic::new(
1254                    DiagnosticCode::LintWarning,
1255                    format!(
1256                        "node {} has unused outputs: {}",
1257                        node.id.0,
1258                        node.outputs.join(",")
1259                    ),
1260                )
1261                .in_pass("lint")
1262                .at_node(diagnostic_node_id(node)),
1263            );
1264        }
1265    }
1266}
1267
1268fn port_type(
1269    node: &NodeInstance,
1270    desc: &NodeDescriptor,
1271    name: &str,
1272    is_input: bool,
1273) -> Option<TypeExpr> {
1274    fn is_generic_marker(ty: &TypeExpr) -> bool {
1275        matches!(ty, TypeExpr::Opaque(value) if value.eq_ignore_ascii_case("generic"))
1276    }
1277
1278    fn resolve_override(
1279        meta: &std::collections::BTreeMap<String, Value>,
1280        key: &str,
1281        port: &str,
1282    ) -> Option<TypeExpr> {
1283        let Value::Map(entries) = meta.get(key)? else {
1284            return None;
1285        };
1286        let port_lc = port.to_ascii_lowercase();
1287        let (_, value) = entries
1288            .iter()
1289            .find(|(k, _)| matches!(k, Value::String(s) if s.eq_ignore_ascii_case(&port_lc)))?;
1290        let Value::String(json) = value else {
1291            return None;
1292        };
1293        serde_json::from_str::<TypeExpr>(json).ok()
1294    }
1295
1296    if is_input {
1297        if let Some(ty) = desc.input_ty_for(name) {
1298            if is_generic_marker(ty)
1299                && let Some(solved) =
1300                    resolve_override(&node.metadata, DYNAMIC_INPUT_TYPES_KEY, name)
1301            {
1302                return Some(solved);
1303            }
1304            return Some(ty.clone());
1305        }
1306        if let Some(ty) = resolve_override(&node.metadata, DYNAMIC_INPUT_TYPES_KEY, name) {
1307            return Some(ty);
1308        }
1309    } else if let Some(port) = desc.outputs.iter().find(|p| p.name == name) {
1310        if is_generic_marker(&port.ty)
1311            && let Some(solved) = resolve_override(&node.metadata, DYNAMIC_OUTPUT_TYPES_KEY, name)
1312        {
1313            return Some(solved);
1314        }
1315        return Some(port.ty.clone());
1316    } else if let Some(ty) = resolve_override(&node.metadata, DYNAMIC_OUTPUT_TYPES_KEY, name) {
1317        return Some(ty);
1318    }
1319    let key = if is_input {
1320        "dynamic_inputs"
1321    } else {
1322        "dynamic_outputs"
1323    };
1324    let resolve_meta = |meta: &std::collections::BTreeMap<String, Value>| match meta.get(key) {
1325        Some(Value::String(value)) if !value.trim().is_empty() => {
1326            Some(TypeExpr::Opaque(value.trim().to_string()))
1327        }
1328        _ => None,
1329    };
1330    // Dynamic port declarations are trusted only from the registry descriptor, not per-node
1331    // graph metadata (which may come from untrusted UI/clients).
1332    resolve_meta(&desc.metadata)
1333}