1use std::borrow::Cow;
2use std::collections::{HashMap, HashSet};
3
4use crate::common::{doc_anchor_for_rule, ErrorInfo, Phase};
5use semver::{Version as SemverVersion, VersionReq};
6
/// Version string for a cluster or primitive. In this module it carries either an
/// exact semver version or a semver constraint (see `resolve_version_selector`).
pub type Version = String;
/// Identifier of a node inside a cluster definition.
pub type NodeId = String;
9
/// Authoring-time description of a cluster: a named, versioned graph of nodes
/// together with its boundary ports and parameters.
///
/// This is the input of [`expand`], which flattens it into an [`ExpandedGraph`].
#[derive(Debug, Clone, PartialEq)]
pub struct ClusterDefinition {
    /// Identifier of the cluster; it is echoed in error values and authoring paths.
    pub id: String,
    /// Version of this definition.
    pub version: Version,
    /// Nodes of the cluster keyed by node id. Expansion fails with
    /// `ExpandError::EmptyCluster` when this map is empty.
    pub nodes: HashMap<NodeId, NodeInstance>,
    /// Connections between node ports.
    pub edges: Vec<Edge>,
    /// Boundary input ports; names must be unique.
    pub input_ports: Vec<InputPortSpec>,
    /// Boundary output ports; names must be unique.
    pub output_ports: Vec<OutputPortSpec>,
    /// Parameters the cluster declares; names must be unique.
    pub parameters: Vec<ParameterSpec>,
    /// Optional author-declared signature. When present, [`expand`] infers the
    /// real signature and validates the declared one against it.
    pub declared_signature: Option<Signature>,
}
21
/// A single node placed inside a [`ClusterDefinition`].
#[derive(Debug, Clone, PartialEq)]
pub struct NodeInstance {
    /// Id of the node within its cluster.
    pub id: NodeId,
    /// What the node instantiates: a primitive implementation or a nested cluster.
    pub kind: NodeKind,
    /// Parameter bindings supplied for this node, keyed by parameter name.
    pub parameter_bindings: HashMap<String, ParameterBinding>,
}
28
/// The thing a [`NodeInstance`] refers to.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeKind {
    /// A primitive implementation. `version` is a selector that expansion
    /// resolves against the catalog's available versions.
    Impl {
        impl_id: String,
        version: Version,
    },
    /// A nested cluster.
    // NOTE(review): `version` is presumably resolved the same way as for `Impl`
    // (via `resolve_cluster_version`) — confirm in `expand_with_context`.
    Cluster {
        cluster_id: String,
        version: Version,
    },
}
40
/// A directed connection from a node output to a node input inside a cluster.
#[derive(Debug, Clone, PartialEq)]
pub struct Edge {
    /// Producing endpoint.
    pub from: OutputRef,
    /// Consuming endpoint.
    pub to: InputRef,
}
46
/// Reference to a named output port of a node.
#[derive(Debug, Clone, PartialEq)]
pub struct OutputRef {
    /// Id of the node that owns the port.
    pub node_id: NodeId,
    /// Name of the output port on that node.
    pub port_name: String,
}
52
/// Reference to a named input port of a node.
#[derive(Debug, Clone, PartialEq)]
pub struct InputRef {
    /// Id of the node that owns the port.
    pub node_id: NodeId,
    /// Name of the input port on that node.
    pub port_name: String,
}
58
/// A boundary input port of a cluster.
#[derive(Debug, Clone, PartialEq)]
pub struct InputPortSpec {
    /// Port name; must be unique among the cluster's input ports.
    pub name: String,
    /// Placeholder describing the external value this port stands for.
    /// Its `ty` becomes the port type during signature inference.
    pub maps_to: GraphInputPlaceholder,
}
64
/// A boundary output port of a cluster.
#[derive(Debug, Clone, PartialEq)]
pub struct OutputPortSpec {
    /// Port name; must be unique among the cluster's output ports.
    pub name: String,
    /// The node output whose value is exposed through this port.
    pub maps_to: OutputRef,
}
70
/// Placeholder for a value supplied from outside the graph.
#[derive(Debug, Clone, PartialEq)]
pub struct GraphInputPlaceholder {
    /// Name of the placeholder.
    pub name: String,
    /// Type of the value expected at this input.
    pub ty: ValueType,
    /// Whether the input is marked as required.
    // NOTE(review): not read anywhere in the visible part of this file —
    // confirm where the flag is enforced.
    pub required: bool,
}
77
/// Default value declared for a cluster parameter.
#[derive(Debug, Clone, PartialEq)]
pub enum ParameterDefault {
    /// A literal value; its type must equal the parameter's declared type.
    Literal(ParameterValue),
    /// A derived key. Validation only accepts this default on `String`-typed
    /// parameters and requires a non-empty `slot_name`.
    DeriveKey { slot_name: String },
}
83
/// Declaration of a parameter on a cluster.
#[derive(Debug, Clone, PartialEq)]
pub struct ParameterSpec {
    /// Parameter name; must be unique within the cluster.
    pub name: String,
    /// Declared type of the parameter.
    pub ty: ParameterType,
    /// Optional default used when no binding is supplied.
    pub default: Option<ParameterDefault>,
    /// When `true` and no default exists, a binding must be supplied.
    pub required: bool,
}
91
/// How a node supplies a value for one of its parameters.
#[derive(Debug, Clone, PartialEq)]
pub enum ParameterBinding {
    /// A concrete value given in place.
    Literal { value: ParameterValue },
    /// Forwarded from the enclosing cluster's parameter named `parent_param`.
    Exposed { parent_param: String },
}
97
/// Boundary signature of a cluster, either declared by the author or produced
/// by [`infer_signature`].
#[derive(Debug, Clone, PartialEq)]
pub struct Signature {
    /// Coarse classification of the cluster boundary.
    pub kind: BoundaryKind,
    /// Boundary input ports.
    pub inputs: Vec<PortSpec>,
    /// Boundary output ports.
    pub outputs: Vec<PortSpec>,
    /// Inference sets this when any node of the graph is an `Action` primitive.
    pub has_side_effects: bool,
    /// Inference sets this when the graph has no boundary inputs and every
    /// node without an incoming node-to-node edge is a `Source` primitive.
    pub is_origin: bool,
}
106
/// One port of a [`Signature`].
#[derive(Debug, Clone, PartialEq)]
pub struct PortSpec {
    /// Port name.
    pub name: String,
    /// Type of the value carried by the port.
    pub ty: ValueType,
    /// Whether the port carries one value or several.
    pub cardinality: Cardinality,
    /// Whether the port may be wired. Inference marks inputs as not wireable
    /// and outputs as wireable unless they come from an `Action` primitive.
    pub wireable: bool,
}
114
/// Classification of a cluster boundary as computed by [`infer_signature`].
#[derive(Debug, Clone, PartialEq)]
pub enum BoundaryKind {
    /// No boundary inputs, and every wireable output is a non-event value.
    SourceLike,
    /// Has wireable outputs but matches neither `SourceLike` nor `TriggerLike`.
    ComputeLike,
    /// Not `SourceLike`, and at least one wireable output is an `Event`.
    TriggerLike,
    /// No wireable outputs at all.
    ActionLike,
}
122
/// Type of a value flowing through a port.
#[derive(Debug, Clone, PartialEq)]
pub enum ValueType {
    Number,
    Series,
    Bool,
    /// Event values; a wireable `Event` output can make a boundary `TriggerLike`.
    Event,
    String,
}
131
/// Number of values a port carries.
#[derive(Debug, Clone, PartialEq)]
pub enum Cardinality {
    /// Exactly one value; inferred boundary inputs always use this.
    Single,
    /// More than one value.
    Multiple,
}
137
/// Declared type of a parameter. Each variant corresponds to the
/// [`ParameterValue`] variant of the same name.
#[derive(Debug, Clone, PartialEq)]
pub enum ParameterType {
    Int,
    Number,
    Bool,
    String,
    Enum,
}
146
/// A concrete parameter value. The variant determines the value's
/// [`ParameterType`].
#[derive(Debug, Clone, PartialEq)]
pub enum ParameterValue {
    Int(i64),
    Number(f64),
    Bool(bool),
    String(String),
    /// An enum member, carried as its string form.
    Enum(String),
}
155
/// Category of a primitive implementation.
#[derive(Debug, Clone, PartialEq)]
pub enum PrimitiveKind {
    /// Counted as a valid root when deciding `Signature::is_origin`.
    Source,
    Compute,
    Trigger,
    /// Marks the graph as having side effects; its outputs are not wireable.
    Action,
}
163
/// Catalog description of one output of a primitive.
#[derive(Debug, Clone, PartialEq)]
pub struct OutputMetadata {
    /// Type of the produced value.
    pub value_type: ValueType,
    /// Whether the output produces one value or several.
    pub cardinality: Cardinality,
}
169
/// Catalog description of a primitive implementation, as returned by
/// [`PrimitiveCatalog::get`].
#[derive(Debug, Clone, PartialEq)]
pub struct PrimitiveMetadata {
    /// Category of the primitive.
    pub kind: PrimitiveKind,
    /// Declared inputs.
    pub inputs: Vec<InputMetadata>,
    /// Declared outputs keyed by output port name.
    pub outputs: HashMap<String, OutputMetadata>,
    /// Declared parameters.
    pub parameters: Vec<ParameterMetadata>,
}
178
/// Catalog description of one input of a primitive.
#[derive(Debug, Clone, PartialEq)]
pub struct InputMetadata {
    /// Input port name.
    pub name: String,
    /// Type of the accepted value.
    pub value_type: ValueType,
    /// Whether the input is marked as required.
    pub required: bool,
}
185
/// Catalog description of one parameter of a primitive.
#[derive(Debug, Clone, PartialEq)]
pub struct ParameterMetadata {
    /// Parameter name.
    pub name: String,
    /// Declared type.
    pub ty: ParameterType,
    /// Optional literal default.
    pub default: Option<ParameterValue>,
    /// Whether the parameter is marked as required.
    pub required: bool,
}
195
/// Flattened graph produced by [`expand`].
#[derive(Debug, Clone, PartialEq)]
pub struct ExpandedGraph {
    /// Nodes of the flattened graph. Keys are the ids that
    /// `ExpandedEndpoint::NodePort::node_id` refers to.
    // NOTE(review): keys are presumably the nodes' `runtime_id` — confirm in
    // `expand_with_context`.
    pub nodes: HashMap<String, ExpandedNode>,
    /// Edges between expanded endpoints.
    pub edges: Vec<ExpandedEdge>,
    /// Boundary inputs; `expand` copies them from the root cluster definition.
    pub boundary_inputs: Vec<InputPortSpec>,
    /// Boundary outputs; `expand` fills these from the root definition's output
    /// ports via `map_boundary_outputs`.
    pub boundary_outputs: Vec<OutputPortSpec>,
}
206
/// A node of the flattened graph.
#[derive(Debug, Clone, PartialEq)]
pub struct ExpandedNode {
    /// Id of the node in the flattened graph.
    pub runtime_id: String,
    /// Trail of `(cluster id, node id)` pairs locating the node in the
    /// authored, possibly nested, cluster definitions.
    pub authoring_path: Vec<(String, NodeId)>,
    /// The primitive implementation this node runs.
    pub implementation: ImplementationInstance,
    /// Concrete parameter values keyed by parameter name.
    pub parameters: HashMap<String, ParameterValue>,
}
218
/// Identifies the primitive implementation backing an [`ExpandedNode`].
#[derive(Debug, Clone, PartialEq)]
pub struct ImplementationInstance {
    /// Id of the primitive implementation.
    pub impl_id: String,
    // NOTE(review): presumably the version selector as authored, before
    // resolution — confirm in `expand_with_context`.
    pub requested_version: Version,
    /// Version used for catalog lookups (see `infer_signature`).
    pub version: Version,
}
228
/// A directed edge of the flattened graph.
#[derive(Debug, Clone, PartialEq)]
pub struct ExpandedEdge {
    /// Producing endpoint.
    pub from: ExpandedEndpoint,
    /// Consuming endpoint. `expand` rejects graphs in which this is an
    /// `ExternalInput`.
    pub to: ExpandedEndpoint,
}
234
/// One end of an [`ExpandedEdge`].
#[derive(Debug, Clone, PartialEq)]
pub enum ExpandedEndpoint {
    /// A named port on a node of the flattened graph.
    NodePort { node_id: String, port_name: String },
    /// A value supplied from outside the graph; only valid as an edge source.
    ExternalInput { name: String },
}
240
/// Errors reported while validating and expanding a cluster definition.
///
/// Rule ids, summaries, paths and fix hints for each variant are provided by the
/// `ErrorInfo` implementation for this type.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ExpandError {
    /// An internal invariant did not hold; the payload is the full message.
    InvariantViolation(String),
    /// The cluster contains no nodes.
    EmptyCluster,
    /// A referenced cluster could not be found.
    MissingCluster {
        id: String,
        version: Version,
    },
    /// The selector is neither an exact semver version nor a semver constraint.
    InvalidVersionSelector {
        target_kind: VersionTargetKind,
        id: String,
        selector: Version,
    },
    /// No registered version satisfies the selector. `available_versions`
    /// lists the registered versions in ascending semver order.
    UnsatisfiedVersionConstraint {
        target_kind: VersionTargetKind,
        id: String,
        selector: Version,
        available_versions: Vec<Version>,
    },
    /// A registered version string is not valid semver.
    InvalidAvailableVersion {
        target_kind: VersionTargetKind,
        id: String,
        version: Version,
    },
    /// Two input ports share a name.
    DuplicateInputPort {
        name: String,
    },
    /// Two output ports share a name.
    DuplicateOutputPort {
        name: String,
    },
    /// Two parameters share a name.
    DuplicateParameter {
        name: String,
    },
    /// A parameter default does not have the parameter's declared type.
    /// `expected` is the declared type, `got` the type of the default.
    ParameterDefaultTypeMismatch {
        name: String,
        expected: ParameterType,
        got: ParameterType,
    },
    /// A `DeriveKey` default has an empty `slot_name`.
    InvalidDeriveKeySlot {
        parameter: String,
    },
    /// Signature inference failed; wraps the underlying error.
    SignatureInferenceFailed(SignatureInferenceError),
    /// The declared signature is not compatible with the inferred one.
    DeclaredSignatureInvalid(ClusterValidationError),
    /// A required parameter without default has no binding.
    MissingRequiredParameter {
        cluster_id: String,
        parameter: String,
    },
    /// A literal binding has the wrong type for the parameter.
    ParameterBindingTypeMismatch {
        cluster_id: String,
        parameter: String,
        expected: ParameterType,
        got: ParameterType,
    },
    /// An exposed binding names a parent parameter that does not exist.
    ExposedParameterNotFound {
        cluster_id: String,
        parameter: String,
        referenced: String,
    },
    /// An exposed binding refers to a parent parameter of a different type.
    ExposedParameterTypeMismatch {
        cluster_id: String,
        parameter: String,
        expected: ParameterType,
        got: ParameterType,
    },
    /// An exposed binding could not be resolved to a concrete value.
    UnresolvedExposedBinding {
        node_id: String,
        parameter: String,
        referenced: String,
    },
    /// A binding names a parameter that the target does not declare.
    UndeclaredParameter {
        node_id: String,
        parameter: String,
    },
    /// A boundary output maps to a node output that does not exist.
    UnmappedBoundaryOutput {
        port_name: String,
        node_id: String,
    },
    /// An output of a nested cluster maps to a node output that does not exist.
    UnmappedNestedOutput {
        cluster_id: String,
        port_name: String,
    },
}
336
/// Errors produced by [`infer_signature`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum SignatureInferenceError {
    /// The catalog has no metadata for a node's implementation id and version.
    MissingPrimitive {
        id: String,
        version: Version,
    },
    /// A boundary output refers to a node id that is not part of the graph.
    MissingNode(String),
    /// A boundary output refers to an output port the primitive does not declare.
    MissingOutput {
        impl_id: String,
        version: Version,
        output: String,
    },
}
351
/// Kind of artifact a version selector is being resolved for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionTargetKind {
    /// A primitive implementation registered in the catalog.
    Primitive,
    /// A cluster definition provided by the loader.
    Cluster,
}

impl VersionTargetKind {
    /// Lower-case noun used when this kind is embedded in error messages.
    fn label(self) -> &'static str {
        match self {
            VersionTargetKind::Cluster => "cluster",
            VersionTargetKind::Primitive => "primitive",
        }
    }
}
366
/// Errors produced by [`validate_declared_signature`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ClusterValidationError {
    /// A port is declared wireable although the inferred signature marks the
    /// port of the same name as not wireable.
    WireabilityExceedsInferred { port_name: String },
}
374
375impl ErrorInfo for SignatureInferenceError {
376 fn rule_id(&self) -> &'static str {
377 "D.4"
378 }
379
380 fn phase(&self) -> Phase {
381 Phase::Composition
382 }
383
384 fn doc_anchor(&self) -> &'static str {
385 doc_anchor_for_rule(self.rule_id())
386 }
387
388 fn summary(&self) -> Cow<'static, str> {
389 match self {
390 Self::MissingPrimitive { id, version } => {
391 Cow::Owned(format!("Missing primitive '{}@{}'", id, version))
392 }
393 Self::MissingNode(node) => Cow::Owned(format!("Missing node '{}'", node)),
394 Self::MissingOutput {
395 impl_id,
396 version,
397 output,
398 } => Cow::Owned(format!(
399 "Missing output '{}' on primitive '{}@{}'",
400 output, impl_id, version
401 )),
402 }
403 }
404
405 fn path(&self) -> Option<Cow<'static, str>> {
406 Some(Cow::Borrowed("$.output_ports"))
407 }
408
409 fn fix(&self) -> Option<Cow<'static, str>> {
410 Some(Cow::Borrowed(
411 "Ensure all output ports map to existing node outputs",
412 ))
413 }
414}
415
416impl ErrorInfo for ClusterValidationError {
417 fn rule_id(&self) -> &'static str {
418 match self {
419 Self::WireabilityExceedsInferred { .. } => "D.11",
420 }
421 }
422
423 fn phase(&self) -> Phase {
424 Phase::Composition
425 }
426
427 fn doc_anchor(&self) -> &'static str {
428 doc_anchor_for_rule(self.rule_id())
429 }
430
431 fn summary(&self) -> Cow<'static, str> {
432 match self {
433 Self::WireabilityExceedsInferred { port_name } => Cow::Owned(format!(
434 "Declared wireability exceeds inferred for port '{}'",
435 port_name
436 )),
437 }
438 }
439
440 fn path(&self) -> Option<Cow<'static, str>> {
441 Some(Cow::Borrowed("$.declared_signature"))
442 }
443
444 fn fix(&self) -> Option<Cow<'static, str>> {
445 Some(Cow::Borrowed(
446 "Adjust declared wireability to be <= inferred wireability",
447 ))
448 }
449}
450
451impl ErrorInfo for ExpandError {
452 fn rule_id(&self) -> &'static str {
453 match self {
454 Self::InvariantViolation(_) => "E.3",
455 Self::EmptyCluster => "D.1",
456 Self::DuplicateInputPort { .. } => "D.5",
457 Self::DuplicateOutputPort { .. } => "D.6",
458 Self::DuplicateParameter { .. } => "D.9",
459 Self::ParameterDefaultTypeMismatch { .. } => "D.8",
460 Self::InvalidDeriveKeySlot { .. } => "D.8",
461 Self::SignatureInferenceFailed(_) => "D.4",
462 Self::DeclaredSignatureInvalid(_) => "D.10",
463 Self::MissingCluster { .. } => "E.9",
464 Self::InvalidVersionSelector { .. }
465 | Self::UnsatisfiedVersionConstraint { .. }
466 | Self::InvalidAvailableVersion { .. } => "I.6",
467 Self::MissingRequiredParameter { .. } | Self::UnresolvedExposedBinding { .. } => "I.3",
468 Self::ParameterBindingTypeMismatch { .. }
469 | Self::ExposedParameterTypeMismatch { .. } => "I.4",
470 Self::ExposedParameterNotFound { .. } => "I.5",
471 Self::UndeclaredParameter { .. } => "I.7",
472 Self::UnmappedBoundaryOutput { .. } | Self::UnmappedNestedOutput { .. } => "D.4",
473 }
474 }
475
476 fn phase(&self) -> Phase {
477 Phase::Composition
478 }
479
480 fn doc_anchor(&self) -> &'static str {
481 doc_anchor_for_rule(self.rule_id())
482 }
483
484 fn summary(&self) -> Cow<'static, str> {
485 match self {
486 Self::InvariantViolation(msg) => Cow::Owned(msg.clone()),
487 Self::EmptyCluster => Cow::Borrowed("Cluster contains no nodes"),
488 Self::MissingCluster { id, version } => {
489 Cow::Owned(format!("Missing cluster '{}@{}'", id, version))
490 }
491 Self::InvalidVersionSelector {
492 target_kind,
493 id,
494 selector,
495 } => Cow::Owned(format!(
496 "Invalid {} version selector '{}@{}' (expected exact semver or semver constraint)",
497 target_kind.label(),
498 id,
499 selector
500 )),
501 Self::UnsatisfiedVersionConstraint {
502 target_kind,
503 id,
504 selector,
505 available_versions,
506 } => Cow::Owned(format!(
507 "No available {} version for '{}' satisfies selector '{}' (available: {})",
508 target_kind.label(),
509 id,
510 selector,
511 if available_versions.is_empty() {
512 "<none>".to_string()
513 } else {
514 available_versions.join(", ")
515 }
516 )),
517 Self::InvalidAvailableVersion {
518 target_kind,
519 id,
520 version,
521 } => Cow::Owned(format!(
522 "Registered {} version '{}@{}' is not valid semver",
523 target_kind.label(),
524 id,
525 version
526 )),
527 Self::DuplicateInputPort { name } => {
528 Cow::Owned(format!("Duplicate input port name: '{}'", name))
529 }
530 Self::DuplicateOutputPort { name } => {
531 Cow::Owned(format!("Duplicate output port name: '{}'", name))
532 }
533 Self::DuplicateParameter { name } => {
534 Cow::Owned(format!("Duplicate parameter name: '{}'", name))
535 }
536 Self::ParameterDefaultTypeMismatch {
537 name,
538 expected,
539 got,
540 } => Cow::Owned(format!(
541 "Parameter '{}' default has wrong type (expected {:?}, got {:?})",
542 name, expected, got
543 )),
544 Self::InvalidDeriveKeySlot { parameter } => Cow::Owned(format!(
545 "Parameter '{}' has derive_key default with empty slot_name",
546 parameter
547 )),
548 Self::SignatureInferenceFailed(inner) => inner.summary(),
549 Self::DeclaredSignatureInvalid(inner) => inner.summary(),
550 Self::MissingRequiredParameter {
551 cluster_id,
552 parameter,
553 } => Cow::Owned(format!(
554 "Missing required parameter '{}' for cluster '{}'",
555 parameter, cluster_id
556 )),
557 Self::ParameterBindingTypeMismatch {
558 cluster_id,
559 parameter,
560 expected,
561 got,
562 } => Cow::Owned(format!(
563 "Parameter '{}' on cluster '{}' has wrong type (expected {:?}, got {:?})",
564 parameter, cluster_id, expected, got
565 )),
566 Self::ExposedParameterNotFound {
567 cluster_id,
568 parameter,
569 referenced,
570 } => Cow::Owned(format!(
571 "Exposed parameter '{}' on cluster '{}' references missing '{}'",
572 parameter, cluster_id, referenced
573 )),
574 Self::ExposedParameterTypeMismatch {
575 cluster_id,
576 parameter,
577 expected,
578 got,
579 } => Cow::Owned(format!(
580 "Exposed parameter '{}' on cluster '{}' has wrong type (expected {:?}, got {:?})",
581 parameter, cluster_id, expected, got
582 )),
583 Self::UnresolvedExposedBinding {
584 node_id,
585 parameter,
586 referenced,
587 } => Cow::Owned(format!(
588 "Unresolved exposed binding '{}' for parameter '{}' on node '{}'",
589 referenced, parameter, node_id
590 )),
591 Self::UndeclaredParameter { node_id, parameter } => Cow::Owned(format!(
592 "Undeclared parameter '{}' on node '{}' (not in manifest)",
593 parameter, node_id
594 )),
595 Self::UnmappedBoundaryOutput { port_name, .. } => Cow::Owned(format!(
596 "Boundary output '{}' maps to a missing node output",
597 port_name
598 )),
599 Self::UnmappedNestedOutput {
600 cluster_id,
601 port_name,
602 } => Cow::Owned(format!(
603 "Nested output '{}' in cluster '{}' maps to a missing node output",
604 port_name, cluster_id
605 )),
606 }
607 }
608
609 fn path(&self) -> Option<Cow<'static, str>> {
610 match self {
611 Self::InvariantViolation(_) => Some(Cow::Borrowed("$.edges")),
612 Self::EmptyCluster => Some(Cow::Borrowed("$.nodes")),
613 Self::DuplicateInputPort { .. } => Some(Cow::Borrowed("$.input_ports")),
614 Self::DuplicateOutputPort { .. } => Some(Cow::Borrowed("$.output_ports")),
615 Self::DuplicateParameter { .. } => Some(Cow::Borrowed("$.parameters")),
616 Self::ParameterDefaultTypeMismatch { .. } => Some(Cow::Borrowed("$.parameters")),
617 Self::InvalidDeriveKeySlot { .. } => Some(Cow::Borrowed("$.parameters")),
618 Self::SignatureInferenceFailed(_) => Some(Cow::Borrowed("$.output_ports")),
619 Self::DeclaredSignatureInvalid(_) => Some(Cow::Borrowed("$.declared_signature")),
620 Self::InvalidVersionSelector { .. }
621 | Self::UnsatisfiedVersionConstraint { .. }
622 | Self::InvalidAvailableVersion { .. } => Some(Cow::Borrowed("$.nodes")),
623 Self::MissingRequiredParameter { .. }
624 | Self::ParameterBindingTypeMismatch { .. }
625 | Self::ExposedParameterNotFound { .. }
626 | Self::ExposedParameterTypeMismatch { .. }
627 | Self::UnresolvedExposedBinding { .. }
628 | Self::UndeclaredParameter { .. } => Some(Cow::Borrowed("$.nodes")),
629 Self::UnmappedBoundaryOutput { .. } => Some(Cow::Borrowed("$.output_ports")),
630 Self::UnmappedNestedOutput { .. } => Some(Cow::Borrowed("$.nodes")),
631 Self::MissingCluster { .. } => Some(Cow::Borrowed("$.nodes")),
632 }
633 }
634
635 fn fix(&self) -> Option<Cow<'static, str>> {
636 match self {
637 Self::InvariantViolation(_) => None,
638 Self::EmptyCluster => Some(Cow::Borrowed("Add at least one node to the cluster")),
639 Self::MissingCluster { .. } => Some(Cow::Borrowed(
640 "Ensure referenced cluster ID and version exist",
641 )),
642 Self::InvalidVersionSelector { .. } => Some(Cow::Borrowed(
643 "Use strict semver (e.g. '1.2.3') or a semver constraint (e.g. '^1.2')",
644 )),
645 Self::UnsatisfiedVersionConstraint { .. } => Some(Cow::Borrowed(
646 "Publish or reference a version that satisfies the selector",
647 )),
648 Self::InvalidAvailableVersion { .. } => Some(Cow::Borrowed(
649 "Register only strict semver versions in the catalog/cluster loader",
650 )),
651 Self::DuplicateInputPort { name } => Some(Cow::Owned(format!(
652 "Rename input port '{}' to a unique name",
653 name
654 ))),
655 Self::DuplicateOutputPort { name } => Some(Cow::Owned(format!(
656 "Rename output port '{}' to a unique name",
657 name
658 ))),
659 Self::DuplicateParameter { name } => Some(Cow::Owned(format!(
660 "Rename parameter '{}' to a unique name",
661 name
662 ))),
663 Self::ParameterDefaultTypeMismatch { name, expected, .. } => Some(Cow::Owned(format!(
664 "Set default for '{}' to type {:?}",
665 name, expected
666 ))),
667 Self::InvalidDeriveKeySlot { parameter } => Some(Cow::Owned(format!(
668 "Provide a non-empty slot_name for derive_key on parameter '{}'",
669 parameter
670 ))),
671 Self::SignatureInferenceFailed(_) => Some(Cow::Borrowed(
672 "Ensure output ports map to valid node outputs",
673 )),
674 Self::DeclaredSignatureInvalid(_) => Some(Cow::Borrowed(
675 "Align declared signature with the inferred signature",
676 )),
677 Self::MissingRequiredParameter { parameter, .. } => Some(Cow::Owned(format!(
678 "Bind required parameter '{}' or provide a default",
679 parameter
680 ))),
681 Self::ParameterBindingTypeMismatch {
682 parameter,
683 expected,
684 ..
685 } => Some(Cow::Owned(format!(
686 "Bind parameter '{}' with type {:?}",
687 parameter, expected
688 ))),
689 Self::ExposedParameterNotFound { referenced, .. } => Some(Cow::Owned(format!(
690 "Expose an existing parent parameter '{}'",
691 referenced
692 ))),
693 Self::ExposedParameterTypeMismatch {
694 parameter,
695 expected,
696 ..
697 } => Some(Cow::Owned(format!(
698 "Match exposed parameter '{}' type to {:?}",
699 parameter, expected
700 ))),
701 Self::UnresolvedExposedBinding { referenced, .. } => Some(Cow::Owned(format!(
702 "Provide a value for exposed parameter '{}'",
703 referenced
704 ))),
705 Self::UndeclaredParameter { parameter, .. } => Some(Cow::Owned(format!(
706 "Remove binding '{}' or add it to the primitive's manifest parameters",
707 parameter
708 ))),
709 Self::UnmappedBoundaryOutput { port_name, .. } => Some(Cow::Owned(format!(
710 "Map output port '{}' to a valid node output",
711 port_name
712 ))),
713 Self::UnmappedNestedOutput { port_name, .. } => Some(Cow::Owned(format!(
714 "Map nested output '{}' to a valid node output",
715 port_name
716 ))),
717 }
718 }
719}
720
/// Source of cluster definitions referenced by id and version.
pub trait ClusterLoader {
    /// Returns the definition registered for `id` at `version`, or `None` when
    /// there is no such definition.
    fn load(&self, id: &str, version: &Version) -> Option<ClusterDefinition>;
}
724
/// Lists the versions known for a cluster id; used to resolve version selectors.
pub trait ClusterVersionIndex {
    /// Returns every registered version string for cluster `id`. The strings
    /// are expected to be strict semver; an empty list means nothing is
    /// registered for `id`.
    fn available_versions(&self, id: &str) -> Vec<Version>;
}
728
/// Source of metadata for primitive implementations.
pub trait PrimitiveCatalog {
    /// Returns the metadata of primitive `id` at `version`, or `None` when the
    /// catalog has no such entry.
    fn get(&self, id: &str, version: &Version) -> Option<PrimitiveMetadata>;
}
732
/// Lists the versions known for a primitive id; used to resolve version selectors.
pub trait PrimitiveVersionIndex {
    /// Returns every registered version string for primitive `id`. The strings
    /// are expected to be strict semver; an empty list means nothing is
    /// registered for `id`.
    fn available_versions(&self, id: &str) -> Vec<Version>;
}
736
737pub fn expand<L, C>(
738 cluster_def: &ClusterDefinition,
739 loader: &L,
740 catalog: &C,
741) -> Result<ExpandedGraph, ExpandError>
742where
743 L: ClusterLoader + ClusterVersionIndex,
744 C: PrimitiveCatalog + PrimitiveVersionIndex,
745{
746 validate_cluster_definition(cluster_def)?;
747
748 let mut ctx = ExpandContext::new();
749 let build = expand_with_context(cluster_def, loader, catalog, &mut ctx, &[], &HashMap::new())?;
750
751 let mut graph = build.graph;
752 graph.boundary_inputs = cluster_def.input_ports.clone();
753 graph.boundary_outputs = map_boundary_outputs(
754 &cluster_def.output_ports,
755 &build.node_mapping,
756 &build.cluster_output_map,
757 )?;
758
759 for edge in &graph.edges {
761 if let ExpandedEndpoint::ExternalInput { name } = &edge.to {
762 return Err(ExpandError::InvariantViolation(format!(
763 "E.3: ExternalInput '{}' cannot be edge sink after expansion",
764 name
765 )));
766 }
767 }
768
769 if let Some(declared) = &cluster_def.declared_signature {
770 let inferred =
771 infer_signature(&graph, catalog).map_err(ExpandError::SignatureInferenceFailed)?;
772 validate_declared_signature(declared, &inferred)
773 .map_err(ExpandError::DeclaredSignatureInvalid)?;
774 }
775
776 Ok(graph)
777}
778
779fn parse_available_versions(
780 target_kind: VersionTargetKind,
781 id: &str,
782 available_versions: Vec<Version>,
783) -> Result<Vec<(SemverVersion, Version)>, ExpandError> {
784 let mut parsed = Vec::with_capacity(available_versions.len());
785 for version in available_versions {
786 let semver =
787 SemverVersion::parse(&version).map_err(|_| ExpandError::InvalidAvailableVersion {
788 target_kind,
789 id: id.to_string(),
790 version: version.clone(),
791 })?;
792 parsed.push((semver, version));
793 }
794 parsed.sort_by(|a, b| a.0.cmp(&b.0));
795 Ok(parsed)
796}
797
798fn normalize_available_versions(parsed: &[(SemverVersion, Version)]) -> Vec<Version> {
799 parsed.iter().map(|(_, raw)| raw.clone()).collect()
800}
801
802fn resolve_version_selector(
803 target_kind: VersionTargetKind,
804 id: &str,
805 selector: &Version,
806 available_versions: Vec<Version>,
807) -> Result<Version, ExpandError> {
808 if let Ok(exact) = SemverVersion::parse(selector) {
809 if available_versions.is_empty() {
810 return Ok(exact.to_string());
812 }
813
814 let parsed = parse_available_versions(target_kind, id, available_versions)?;
815 if let Some((matched, _)) = parsed.iter().find(|(candidate, _)| *candidate == exact) {
816 return Ok(matched.to_string());
817 }
818
819 return Err(ExpandError::UnsatisfiedVersionConstraint {
820 target_kind,
821 id: id.to_string(),
822 selector: selector.clone(),
823 available_versions: normalize_available_versions(&parsed),
824 });
825 }
826
827 let req = VersionReq::parse(selector).map_err(|_| ExpandError::InvalidVersionSelector {
828 target_kind,
829 id: id.to_string(),
830 selector: selector.clone(),
831 })?;
832
833 let parsed = parse_available_versions(target_kind, id, available_versions)?;
834 if let Some((matched, _)) = parsed
835 .iter()
836 .rev()
837 .find(|(candidate, _)| req.matches(candidate))
838 {
839 return Ok(matched.to_string());
840 }
841
842 Err(ExpandError::UnsatisfiedVersionConstraint {
843 target_kind,
844 id: id.to_string(),
845 selector: selector.clone(),
846 available_versions: normalize_available_versions(&parsed),
847 })
848}
849
850fn resolve_primitive_version<C: PrimitiveVersionIndex>(
851 catalog: &C,
852 impl_id: &str,
853 selector: &Version,
854) -> Result<Version, ExpandError> {
855 resolve_version_selector(
856 VersionTargetKind::Primitive,
857 impl_id,
858 selector,
859 catalog.available_versions(impl_id),
860 )
861}
862
863fn resolve_cluster_version<L: ClusterVersionIndex>(
864 loader: &L,
865 cluster_id: &str,
866 selector: &Version,
867) -> Result<Version, ExpandError> {
868 resolve_version_selector(
869 VersionTargetKind::Cluster,
870 cluster_id,
871 selector,
872 loader.available_versions(cluster_id),
873 )
874}
875
876fn validate_cluster_definition(cluster_def: &ClusterDefinition) -> Result<(), ExpandError> {
877 let mut input_names = HashSet::new();
878 for input in &cluster_def.input_ports {
879 if !input_names.insert(input.name.clone()) {
880 return Err(ExpandError::DuplicateInputPort {
881 name: input.name.clone(),
882 });
883 }
884 }
885
886 let mut output_names = HashSet::new();
887 for output in &cluster_def.output_ports {
888 if !output_names.insert(output.name.clone()) {
889 return Err(ExpandError::DuplicateOutputPort {
890 name: output.name.clone(),
891 });
892 }
893 }
894
895 let mut parameter_names = HashSet::new();
896 for param in &cluster_def.parameters {
897 if !parameter_names.insert(param.name.clone()) {
898 return Err(ExpandError::DuplicateParameter {
899 name: param.name.clone(),
900 });
901 }
902
903 if let Some(default) = ¶m.default {
904 match default {
905 ParameterDefault::Literal(v) => {
906 let got = parameter_value_type(v);
907 if got != param.ty {
908 return Err(ExpandError::ParameterDefaultTypeMismatch {
909 name: param.name.clone(),
910 expected: param.ty.clone(),
911 got,
912 });
913 }
914 }
915 ParameterDefault::DeriveKey { slot_name } => {
916 if param.ty != ParameterType::String {
917 return Err(ExpandError::ParameterDefaultTypeMismatch {
918 name: param.name.clone(),
919 expected: param.ty.clone(),
920 got: ParameterType::String,
921 });
922 }
923 if slot_name.is_empty() {
924 return Err(ExpandError::InvalidDeriveKeySlot {
925 parameter: param.name.clone(),
926 });
927 }
928 }
929 }
930 }
931 }
932
933 Ok(())
934}
935
936fn parameter_value_type(value: &ParameterValue) -> ParameterType {
937 match value {
938 ParameterValue::Int(_) => ParameterType::Int,
939 ParameterValue::Number(_) => ParameterType::Number,
940 ParameterValue::Bool(_) => ParameterType::Bool,
941 ParameterValue::String(_) => ParameterType::String,
942 ParameterValue::Enum(_) => ParameterType::Enum,
943 }
944}
945
946pub fn infer_signature<C: PrimitiveCatalog>(
956 graph: &ExpandedGraph,
957 catalog: &C,
958) -> Result<Signature, SignatureInferenceError> {
959 let mut node_meta: HashMap<String, PrimitiveMetadata> = HashMap::new();
960 let mut has_side_effects = false;
961
962 for (node_id, node) in &graph.nodes {
963 let meta = catalog
964 .get(&node.implementation.impl_id, &node.implementation.version)
965 .ok_or_else(|| SignatureInferenceError::MissingPrimitive {
966 id: node.implementation.impl_id.clone(),
967 version: node.implementation.version.clone(),
968 })?;
969 if meta.kind == PrimitiveKind::Action {
970 has_side_effects = true;
971 }
972 node_meta.insert(node_id.clone(), meta);
973 }
974
975 let mut inputs: Vec<PortSpec> = Vec::new();
976 for input in &graph.boundary_inputs {
977 let port = PortSpec {
978 name: input.name.clone(),
979 ty: input.maps_to.ty.clone(),
980 cardinality: Cardinality::Single,
981 wireable: false, };
983 debug_assert!(
985 !port.wireable,
986 "Invariant F.1 violated: input port '{}' must not be wireable",
987 port.name
988 );
989 inputs.push(port);
990 }
991
992 let mut outputs: Vec<PortSpec> = Vec::new();
993 let mut has_wireable_outputs = false;
994 let mut wireable_out_types: Vec<ValueType> = Vec::new();
995
996 for output in &graph.boundary_outputs {
997 let meta = node_meta
998 .get(&output.maps_to.node_id)
999 .ok_or_else(|| SignatureInferenceError::MissingNode(output.maps_to.node_id.clone()))?;
1000
1001 let out_meta = meta.outputs.get(&output.maps_to.port_name).ok_or_else(|| {
1002 SignatureInferenceError::MissingOutput {
1003 impl_id: graph
1004 .nodes
1005 .get(&output.maps_to.node_id)
1006 .map(|n| n.implementation.impl_id.clone())
1007 .unwrap_or_default(),
1008 version: graph
1009 .nodes
1010 .get(&output.maps_to.node_id)
1011 .map(|n| n.implementation.version.clone())
1012 .unwrap_or_default(),
1013 output: output.maps_to.port_name.clone(),
1014 }
1015 })?;
1016
1017 let wireable = meta.kind != PrimitiveKind::Action;
1018 if wireable {
1019 has_wireable_outputs = true;
1020 wireable_out_types.push(out_meta.value_type.clone());
1021 }
1022
1023 outputs.push(PortSpec {
1024 name: output.name.clone(),
1025 ty: out_meta.value_type.clone(),
1026 cardinality: out_meta.cardinality.clone(),
1027 wireable,
1028 });
1029 }
1030
1031 let has_wireable_event_out = wireable_out_types
1032 .iter()
1033 .any(|t| matches!(t, ValueType::Event));
1034
1035 let kind = if !has_wireable_outputs {
1036 BoundaryKind::ActionLike
1037 } else if graph.boundary_inputs.is_empty()
1038 && wireable_out_types.iter().all(|t| {
1039 matches!(
1040 t,
1041 ValueType::Number | ValueType::Series | ValueType::Bool | ValueType::String
1042 )
1043 })
1044 {
1045 BoundaryKind::SourceLike
1046 } else if has_wireable_event_out {
1047 BoundaryKind::TriggerLike
1048 } else {
1049 BoundaryKind::ComputeLike
1050 };
1051
1052 let is_origin = graph.boundary_inputs.is_empty() && roots_are_sources(graph, &node_meta);
1053
1054 Ok(Signature {
1055 kind,
1056 inputs,
1057 outputs,
1058 has_side_effects,
1059 is_origin,
1060 })
1061}
1062
1063pub fn validate_declared_signature(
1066 declared: &Signature,
1067 inferred: &Signature,
1068) -> Result<(), ClusterValidationError> {
1069 for declared_port in &declared.outputs {
1071 if let Some(inferred_port) = inferred
1072 .outputs
1073 .iter()
1074 .find(|p| p.name == declared_port.name)
1075 {
1076 if declared_port.wireable && !inferred_port.wireable {
1078 return Err(ClusterValidationError::WireabilityExceedsInferred {
1079 port_name: declared_port.name.clone(),
1080 });
1081 }
1082 }
1083 }
1084
1085 for declared_port in &declared.inputs {
1088 if let Some(inferred_port) = inferred
1089 .inputs
1090 .iter()
1091 .find(|p| p.name == declared_port.name)
1092 {
1093 if declared_port.wireable && !inferred_port.wireable {
1094 return Err(ClusterValidationError::WireabilityExceedsInferred {
1095 port_name: declared_port.name.clone(),
1096 });
1097 }
1098 }
1099 }
1100
1101 Ok(())
1102}
1103
1104fn roots_are_sources(graph: &ExpandedGraph, meta: &HashMap<String, PrimitiveMetadata>) -> bool {
1105 let mut incoming: HashSet<&String> = HashSet::new();
1106 for edge in &graph.edges {
1107 if let (
1108 ExpandedEndpoint::NodePort { node_id: _from, .. },
1109 ExpandedEndpoint::NodePort { node_id: to, .. },
1110 ) = (&edge.from, &edge.to)
1111 {
1112 incoming.insert(to);
1113 }
1114 }
1115
1116 for node_id in graph.nodes.keys() {
1117 if !incoming.contains(node_id) {
1118 if let Some(m) = meta.get(node_id) {
1119 if m.kind != PrimitiveKind::Source {
1120 return false;
1121 }
1122 } else {
1123 return false;
1124 }
1125 }
1126 }
1127
1128 true
1129}
1130
/// Mutable state shared across one expansion run: a counter from which
/// runtime node ids are generated.
#[derive(Debug)]
struct ExpandContext {
    /// Ordinal used for the next generated id.
    next_id: usize,
}

impl ExpandContext {
    /// Creates a context whose first generated id is `n0`.
    fn new() -> Self {
        ExpandContext { next_id: 0 }
    }

    /// Returns the next id (`n0`, `n1`, ...) and advances the counter.
    fn next_runtime_id(&mut self) -> String {
        let ordinal = self.next_id;
        self.next_id = ordinal + 1;
        format!("n{}", ordinal)
    }
}
1147
1148fn validate_parameter_bindings(
1150 nested_def: &ClusterDefinition,
1151 bindings: &HashMap<String, ParameterBinding>,
1152 parent_parameters: &[ParameterSpec],
1153) -> Result<(), ExpandError> {
1154 let spec_names: std::collections::HashSet<&str> = nested_def
1156 .parameters
1157 .iter()
1158 .map(|s| s.name.as_str())
1159 .collect();
1160 for key in bindings.keys() {
1161 if !spec_names.contains(key.as_str()) {
1162 return Err(ExpandError::UndeclaredParameter {
1163 node_id: nested_def.id.clone(),
1164 parameter: key.clone(),
1165 });
1166 }
1167 }
1168
1169 for param_spec in &nested_def.parameters {
1170 match bindings.get(¶m_spec.name) {
1171 None => {
1172 if param_spec.required && param_spec.default.is_none() {
1174 return Err(ExpandError::MissingRequiredParameter {
1175 cluster_id: nested_def.id.clone(),
1176 parameter: param_spec.name.clone(),
1177 });
1178 }
1179 }
1180 Some(ParameterBinding::Literal { value }) => {
1181 let got = parameter_value_type(value);
1183 if got != param_spec.ty {
1184 return Err(ExpandError::ParameterBindingTypeMismatch {
1185 cluster_id: nested_def.id.clone(),
1186 parameter: param_spec.name.clone(),
1187 expected: param_spec.ty.clone(),
1188 got,
1189 });
1190 }
1191 }
1192 Some(ParameterBinding::Exposed { parent_param }) => {
1193 let parent_spec = parent_parameters.iter().find(|p| &p.name == parent_param);
1196 match parent_spec {
1197 None => {
1198 return Err(ExpandError::ExposedParameterNotFound {
1199 cluster_id: nested_def.id.clone(),
1200 parameter: param_spec.name.clone(),
1201 referenced: parent_param.clone(),
1202 });
1203 }
1204 Some(spec) if spec.ty != param_spec.ty => {
1205 return Err(ExpandError::ExposedParameterTypeMismatch {
1206 cluster_id: nested_def.id.clone(),
1207 parameter: param_spec.name.clone(),
1208 expected: param_spec.ty.clone(),
1209 got: spec.ty.clone(),
1210 });
1211 }
1212 Some(_) => {} }
1214 }
1215 }
1216 }
1217 Ok(())
1218}
1219
/// Intermediate result of expanding a single cluster definition.
#[derive(Debug, Clone)]
struct ExpandBuild {
    /// Expanded nodes and edges collected for this cluster, including those
    /// merged in from nested clusters.
    graph: ExpandedGraph,
    /// Authoring node id -> runtime id of the expanded node.
    node_mapping: HashMap<NodeId, String>,
    /// Input placeholder name -> external key used for `ExternalInput` endpoints.
    placeholder_map: HashMap<String, String>,
    /// Nested-cluster node id -> (output port name -> resolved endpoint).
    cluster_output_map: HashMap<NodeId, HashMap<String, ExpandedEndpoint>>,
}
1227
/// Recursively expands `cluster_def` into a flat graph of implementation nodes.
///
/// `Impl` nodes become `ExpandedNode`s with a fresh runtime id drawn from `ctx`;
/// `Cluster` nodes are loaded through `loader`, validated, expanded by a
/// recursive call and merged into the graph being built. The cluster's edges
/// are then resolved to expanded endpoints.
///
/// * `authoring_prefix` - the `(cluster id, node id)` pairs leading to this cluster.
/// * `resolved_params` - values of this cluster's own parameters, used to
///   resolve `Exposed` bindings on its nodes.
///
/// The returned graph's `boundary_inputs` and `boundary_outputs` are left empty
/// by this function.
///
/// # Errors
///
/// Returns `EmptyCluster` when the cluster has no nodes, `MissingCluster` when
/// a nested cluster cannot be loaded, `UnmappedNestedOutput` when a nested
/// output port does not resolve to a node port, and propagates any error from
/// version resolution, binding validation and parameter resolution.
fn expand_with_context<L, C>(
    cluster_def: &ClusterDefinition,
    loader: &L,
    catalog: &C,
    ctx: &mut ExpandContext,
    authoring_prefix: &[(String, NodeId)],
    resolved_params: &HashMap<String, ParameterValue>,
) -> Result<ExpandBuild, ExpandError>
where
    L: ClusterLoader + ClusterVersionIndex,
    C: PrimitiveCatalog + PrimitiveVersionIndex,
{
    if cluster_def.nodes.is_empty() {
        return Err(ExpandError::EmptyCluster);
    }

    // Placeholder name -> external key for this cluster's declared input ports.
    let placeholder_map =
        build_placeholder_map(authoring_prefix, &cluster_def.id, &cluster_def.input_ports);

    let mut graph = ExpandedGraph {
        nodes: HashMap::new(),
        edges: Vec::new(),
        boundary_inputs: Vec::new(),
        boundary_outputs: Vec::new(),
    };
    // Authoring node id -> runtime id (also receives the mappings of nested clusters).
    let mut node_mapping: HashMap<NodeId, String> = HashMap::new();
    // Nested-cluster node id -> output port name -> endpoint inside the expansion.
    let mut cluster_output_map: HashMap<NodeId, HashMap<String, ExpandedEndpoint>> = HashMap::new();
    // Nested-cluster node id -> input port name -> external key of its placeholder.
    let mut cluster_input_map: HashMap<NodeId, HashMap<String, String>> = HashMap::new();

    // Visit nodes in sorted id order rather than HashMap iteration order;
    // presumably so runtime ids are assigned deterministically.
    let mut sorted_node_ids: Vec<_> = cluster_def.nodes.keys().collect();
    sorted_node_ids.sort();
    for node_id in sorted_node_ids {
        // The key was just taken from this map, so the lookup cannot fail.
        let node = cluster_def.nodes.get(node_id).unwrap();
        match &node.kind {
            NodeKind::Impl { impl_id, version } => {
                let runtime_id = ctx.next_runtime_id();
                let mut authoring_path = authoring_prefix.to_vec();
                authoring_path.push((cluster_def.id.clone(), node.id.clone()));

                let resolved_version = resolve_primitive_version(catalog, impl_id, version)?;
                let primitive_meta = catalog.get(impl_id, &resolved_version);

                // With catalog metadata the bindings are resolved against the
                // declared parameter specs; without it they are resolved as given.
                let resolved_bindings = if let Some(ref meta) = primitive_meta {
                    resolve_impl_parameters(
                        &node.id,
                        &meta.parameters,
                        &node.parameter_bindings,
                        resolved_params,
                    )?
                } else {
                    resolve_bindings_with_context(
                        &node.id,
                        &node.parameter_bindings,
                        resolved_params,
                    )?
                };

                graph.nodes.insert(
                    runtime_id.clone(),
                    ExpandedNode {
                        runtime_id: runtime_id.clone(),
                        authoring_path,
                        implementation: ImplementationInstance {
                            impl_id: impl_id.clone(),
                            requested_version: version.clone(),
                            version: resolved_version,
                        },
                        parameters: resolved_bindings,
                    },
                );

                node_mapping.insert(node.id.clone(), runtime_id);
            }
            NodeKind::Cluster {
                cluster_id,
                version,
            } => {
                let resolved_cluster_version =
                    resolve_cluster_version(loader, cluster_id, version)?;
                let nested_def = loader
                    .load(cluster_id, &resolved_cluster_version)
                    .ok_or_else(|| ExpandError::MissingCluster {
                        id: cluster_id.clone(),
                        version: resolved_cluster_version.clone(),
                    })?;

                // Check this node's bindings against the nested cluster's
                // declared parameters and this cluster's own parameters.
                validate_parameter_bindings(
                    &nested_def,
                    &node.parameter_bindings,
                    &cluster_def.parameters,
                )?;

                // Copy of the nested definition with literal bindings from this
                // node substituted into its nodes' exposed bindings.
                let bound_nested = apply_literal_bindings(&nested_def, &node.parameter_bindings);

                let mut nested_prefix = authoring_prefix.to_vec();
                nested_prefix.push((cluster_def.id.clone(), node.id.clone()));

                // Parameter values visible to the nested cluster's own nodes.
                let nested_resolved_params = build_resolved_params(
                    &nested_def.id,
                    &nested_def.parameters,
                    &node.parameter_bindings,
                    resolved_params,
                    &nested_prefix,
                )?;

                // Recurse with the same `ctx` so runtime ids keep counting up.
                let nested_build = expand_with_context(
                    &bound_nested,
                    loader,
                    catalog,
                    ctx,
                    &nested_prefix,
                    &nested_resolved_params,
                )?;

                merge_graph(&mut graph, nested_build.graph);

                // Record where each nested input port lands: input port name ->
                // external key of the placeholder it maps to.
                let mut input_map: HashMap<String, String> = HashMap::new();
                for input_port in &bound_nested.input_ports {
                    if let Some(mapped) = nested_build.placeholder_map.get(&input_port.maps_to.name)
                    {
                        input_map.insert(input_port.name.clone(), mapped.clone());
                    }
                }
                cluster_input_map.insert(node.id.clone(), input_map);

                // Record which expanded node port backs each nested output port.
                let mut output_map: HashMap<String, ExpandedEndpoint> = HashMap::new();
                for output_port in &bound_nested.output_ports {
                    let mapped_output = resolve_mapped_output(
                        &output_port.maps_to,
                        &nested_build.node_mapping,
                        &nested_build.cluster_output_map,
                    )
                    .ok_or_else(|| ExpandError::UnmappedNestedOutput {
                        cluster_id: node.id.clone(),
                        port_name: output_port.name.clone(),
                    })?;
                    // Only a concrete node port is accepted as an output source.
                    let ExpandedEndpoint::NodePort { node_id, port_name } = mapped_output else {
                        return Err(ExpandError::UnmappedNestedOutput {
                            cluster_id: node.id.clone(),
                            port_name: output_port.name.clone(),
                        });
                    };
                    output_map.insert(
                        output_port.name.clone(),
                        ExpandedEndpoint::NodePort { node_id, port_name },
                    );
                }
                cluster_output_map.insert(node.id.clone(), output_map);

                // Fold the nested mappings into this level's mapping; an entry
                // with the same authoring id is overwritten.
                for (k, v) in nested_build.node_mapping {
                    node_mapping.insert(k, v);
                }
            }
        }
    }

    for edge in &cluster_def.edges {
        let from = resolve_output_endpoint(
            &edge.from,
            &node_mapping,
            &cluster_output_map,
            authoring_prefix,
            &cluster_def.id,
        );
        let to = resolve_input_endpoint(
            &edge.to,
            &node_mapping,
            &cluster_input_map,
            &placeholder_map,
            authoring_prefix,
            &cluster_def.id,
        );

        if let ExpandedEndpoint::ExternalInput { name } = &to {
            // The target is a placeholder: rewire every existing edge that reads
            // from it so it reads from `from` instead. The edge itself is only
            // kept when nothing was rewired.
            let replaced = redirect_placeholder_edges(&mut graph.edges, name, &from);
            if !replaced {
                graph.edges.push(ExpandedEdge {
                    from: from.clone(),
                    to: to.clone(),
                });
            }
        } else {
            graph.edges.push(ExpandedEdge { from, to });
        }
    }

    Ok(ExpandBuild {
        graph,
        node_mapping,
        placeholder_map,
        cluster_output_map,
    })
}
1436
1437fn build_placeholder_map(
1438 authoring_prefix: &[(String, NodeId)],
1439 cluster_id: &str,
1440 input_ports: &[InputPortSpec],
1441) -> HashMap<String, String> {
1442 let mut map = HashMap::new();
1443 for input in input_ports {
1444 let key = external_key(authoring_prefix, cluster_id, &input.maps_to.name);
1445 map.insert(input.maps_to.name.clone(), key);
1446 }
1447 map
1448}
1449
1450fn external_key(authoring_prefix: &[(String, NodeId)], cluster_id: &str, name: &str) -> String {
1451 let mut parts: Vec<String> = authoring_prefix
1452 .iter()
1453 .map(|(c, n)| format!("{}:{}", c, n))
1454 .collect();
1455 parts.push(cluster_id.to_string());
1456 parts.push(name.to_string());
1457 parts.join("/")
1458}
1459
1460fn merge_graph(target: &mut ExpandedGraph, nested: ExpandedGraph) {
1461 for (id, node) in nested.nodes {
1462 target.nodes.insert(id, node);
1463 }
1464 target.edges.extend(nested.edges);
1465}
1466
1467fn resolve_output_endpoint(
1468 output: &OutputRef,
1469 node_mapping: &HashMap<NodeId, String>,
1470 cluster_output_map: &HashMap<NodeId, HashMap<String, ExpandedEndpoint>>,
1471 authoring_prefix: &[(String, NodeId)],
1472 cluster_id: &str,
1473) -> ExpandedEndpoint {
1474 if let Some(node_id) = node_mapping.get(&output.node_id) {
1475 return ExpandedEndpoint::NodePort {
1476 node_id: node_id.clone(),
1477 port_name: output.port_name.clone(),
1478 };
1479 }
1480
1481 if let Some(map) = cluster_output_map.get(&output.node_id) {
1482 if let Some(ep) = map.get(&output.port_name) {
1483 return ep.clone();
1484 }
1485 }
1486
1487 ExpandedEndpoint::ExternalInput {
1488 name: external_key(authoring_prefix, cluster_id, &output.node_id),
1489 }
1490}
1491
1492fn resolve_mapped_output(
1493 output: &OutputRef,
1494 node_mapping: &HashMap<NodeId, String>,
1495 cluster_output_map: &HashMap<NodeId, HashMap<String, ExpandedEndpoint>>,
1496) -> Option<ExpandedEndpoint> {
1497 if let Some(node_id) = node_mapping.get(&output.node_id) {
1498 return Some(ExpandedEndpoint::NodePort {
1499 node_id: node_id.clone(),
1500 port_name: output.port_name.clone(),
1501 });
1502 }
1503
1504 cluster_output_map
1505 .get(&output.node_id)
1506 .and_then(|map| map.get(&output.port_name).cloned())
1507}
1508
1509fn resolve_input_endpoint(
1510 input: &InputRef,
1511 node_mapping: &HashMap<NodeId, String>,
1512 cluster_input_map: &HashMap<NodeId, HashMap<String, String>>,
1513 placeholder_map: &HashMap<String, String>,
1514 authoring_prefix: &[(String, NodeId)],
1515 cluster_id: &str,
1516) -> ExpandedEndpoint {
1517 if let Some(node_id) = node_mapping.get(&input.node_id) {
1518 return ExpandedEndpoint::NodePort {
1519 node_id: node_id.clone(),
1520 port_name: input.port_name.clone(),
1521 };
1522 }
1523
1524 if let Some(map) = cluster_input_map.get(&input.node_id) {
1525 if let Some(name) = map.get(&input.port_name) {
1526 return ExpandedEndpoint::ExternalInput { name: name.clone() };
1527 }
1528 }
1529
1530 if let Some(name) = placeholder_map.get(&input.node_id) {
1531 return ExpandedEndpoint::ExternalInput { name: name.clone() };
1532 }
1533
1534 ExpandedEndpoint::ExternalInput {
1535 name: external_key(authoring_prefix, cluster_id, &input.node_id),
1536 }
1537}
1538
1539fn redirect_placeholder_edges(
1540 edges: &mut [ExpandedEdge],
1541 placeholder: &str,
1542 source: &ExpandedEndpoint,
1543) -> bool {
1544 let mut replaced = false;
1545 for edge in edges.iter_mut() {
1546 if let ExpandedEndpoint::ExternalInput { name } = &edge.from {
1547 if name == placeholder {
1548 edge.from = source.clone();
1549 replaced = true;
1550 }
1551 }
1552 }
1553 replaced
1554}
1555
1556fn apply_literal_bindings(
1557 cluster_def: &ClusterDefinition,
1558 bindings: &HashMap<String, ParameterBinding>,
1559) -> ClusterDefinition {
1560 let mut updated = cluster_def.clone();
1562 for node in updated.nodes.values_mut() {
1563 for binding in node.parameter_bindings.values_mut() {
1564 if let ParameterBinding::Exposed { parent_param } = binding {
1565 if let Some(ParameterBinding::Literal { value }) = bindings.get(parent_param) {
1566 *binding = ParameterBinding::Literal {
1567 value: value.clone(),
1568 };
1569 }
1570 }
1571 }
1572 }
1573 updated
1574}
1575
1576fn resolve_bindings_with_context(
1581 node_id: &str,
1582 bindings: &HashMap<String, ParameterBinding>,
1583 resolved_params: &HashMap<String, ParameterValue>,
1584) -> Result<HashMap<String, ParameterValue>, ExpandError> {
1585 let mut result = HashMap::new();
1586 for (name, binding) in bindings {
1587 match binding {
1588 ParameterBinding::Literal { value } => {
1589 result.insert(name.clone(), value.clone());
1590 }
1591 ParameterBinding::Exposed { parent_param } => {
1592 if let Some(value) = resolved_params.get(parent_param) {
1594 result.insert(name.clone(), value.clone());
1595 } else {
1596 return Err(ExpandError::UnresolvedExposedBinding {
1597 node_id: node_id.to_string(),
1598 parameter: name.clone(),
1599 referenced: parent_param.clone(),
1600 });
1601 }
1602 }
1603 }
1604 }
1605 Ok(result)
1606}
1607
1608fn resolve_impl_parameters(
1613 node_id: &str,
1614 specs: &[ParameterMetadata],
1615 bindings: &HashMap<String, ParameterBinding>,
1616 parent_resolved: &HashMap<String, ParameterValue>,
1617) -> Result<HashMap<String, ParameterValue>, ExpandError> {
1618 let spec_names: std::collections::HashSet<&str> =
1620 specs.iter().map(|s| s.name.as_str()).collect();
1621 for key in bindings.keys() {
1622 if !spec_names.contains(key.as_str()) {
1623 return Err(ExpandError::UndeclaredParameter {
1624 node_id: node_id.to_string(),
1625 parameter: key.clone(),
1626 });
1627 }
1628 }
1629
1630 let mut result = HashMap::new();
1631
1632 for spec in specs {
1633 match bindings.get(&spec.name) {
1634 Some(ParameterBinding::Literal { value }) => {
1635 result.insert(spec.name.clone(), value.clone());
1636 }
1637 Some(ParameterBinding::Exposed { parent_param }) => {
1638 if let Some(value) = parent_resolved.get(parent_param) {
1639 result.insert(spec.name.clone(), value.clone());
1640 } else {
1641 return Err(ExpandError::UnresolvedExposedBinding {
1642 node_id: node_id.to_string(),
1643 parameter: spec.name.clone(),
1644 referenced: parent_param.clone(),
1645 });
1646 }
1647 }
1648 None => {
1649 if let Some(default) = &spec.default {
1651 result.insert(spec.name.clone(), default.clone());
1652 } else if spec.required {
1653 return Err(ExpandError::MissingRequiredParameter {
1654 cluster_id: node_id.to_string(),
1655 parameter: spec.name.clone(),
1656 });
1657 }
1658 }
1660 }
1661 }
1662
1663 Ok(result)
1664}
1665
1666fn build_resolved_params(
1675 cluster_id: &str,
1676 specs: &[ParameterSpec],
1677 bindings: &HashMap<String, ParameterBinding>,
1678 resolved_params: &HashMap<String, ParameterValue>,
1679 authoring_path: &[(String, NodeId)],
1680) -> Result<HashMap<String, ParameterValue>, ExpandError> {
1681 let spec_names: std::collections::HashSet<&str> =
1683 specs.iter().map(|s| s.name.as_str()).collect();
1684 for key in bindings.keys() {
1685 if !spec_names.contains(key.as_str()) {
1686 return Err(ExpandError::UndeclaredParameter {
1687 node_id: cluster_id.to_string(),
1688 parameter: key.clone(),
1689 });
1690 }
1691 }
1692
1693 let mut result = HashMap::new();
1694
1695 for spec in specs {
1696 match bindings.get(&spec.name) {
1697 Some(ParameterBinding::Literal { value }) => {
1698 result.insert(spec.name.clone(), value.clone());
1699 }
1700 Some(ParameterBinding::Exposed { parent_param }) => {
1701 if let Some(value) = resolved_params.get(parent_param) {
1702 result.insert(spec.name.clone(), value.clone());
1703 } else {
1704 return Err(ExpandError::UnresolvedExposedBinding {
1705 node_id: cluster_id.to_string(),
1706 parameter: spec.name.clone(),
1707 referenced: parent_param.clone(),
1708 });
1709 }
1710 }
1711 None => {
1712 if let Some(default) = &spec.default {
1714 match default {
1715 ParameterDefault::Literal(v) => {
1716 result.insert(spec.name.clone(), v.clone());
1717 }
1718 ParameterDefault::DeriveKey { slot_name } => {
1719 result.insert(
1720 spec.name.clone(),
1721 ParameterValue::String(derive_key(authoring_path, slot_name)),
1722 );
1723 }
1724 }
1725 } else if spec.required {
1726 return Err(ExpandError::MissingRequiredParameter {
1727 cluster_id: cluster_id.to_string(),
1728 parameter: spec.name.clone(),
1729 });
1730 }
1731 }
1733 }
1734 }
1735
1736 Ok(result)
1737}
1738
1739fn map_boundary_outputs(
1741 outputs: &[OutputPortSpec],
1742 mapping: &HashMap<NodeId, String>,
1743 cluster_output_map: &HashMap<NodeId, HashMap<String, ExpandedEndpoint>>,
1744) -> Result<Vec<OutputPortSpec>, ExpandError> {
1745 let mut result = Vec::with_capacity(outputs.len());
1746 for o in outputs {
1747 let mapped_output = resolve_mapped_output(&o.maps_to, mapping, cluster_output_map)
1748 .ok_or_else(|| ExpandError::UnmappedBoundaryOutput {
1749 port_name: o.name.clone(),
1750 node_id: o.maps_to.node_id.clone(),
1751 })?;
1752 let ExpandedEndpoint::NodePort { node_id, port_name } = mapped_output else {
1753 return Err(ExpandError::UnmappedBoundaryOutput {
1754 port_name: o.name.clone(),
1755 node_id: o.maps_to.node_id.clone(),
1756 });
1757 };
1758 result.push(OutputPortSpec {
1759 name: o.name.clone(),
1760 maps_to: OutputRef { node_id, port_name },
1761 });
1762 }
1763 Ok(result)
1764}
1765
1766pub fn derive_key(authoring_path: &[(String, NodeId)], slot_name: &str) -> String {
1776 let mut parts = Vec::new();
1777 for (cluster_id, node_id) in authoring_path {
1778 parts.push(format!("{}:{}", cluster_id.len(), cluster_id));
1779 parts.push(format!("{}:{}", node_id.len(), node_id));
1780 }
1781 parts.push(format!("{}:{}", slot_name.len(), slot_name));
1782 format!("__ergo/{}", parts.join("/"))
1783}
1784
1785#[cfg(test)]
1786mod tests;