1use crate::lagrange::CompositionNode;
2use noether_core::capability::Capability;
3use noether_core::effects::{Effect, EffectKind, EffectSet};
4use noether_core::stage::StageId;
5use noether_core::types::{is_subtype_of, IncompatibilityReason, NType, TypeCompatibility};
6use noether_store::StageStore;
7use std::collections::{BTreeMap, BTreeSet};
8use std::fmt;
9
/// Input/output type pair computed for a composition node by the graph checker.
#[derive(Debug, Clone)]
pub struct ResolvedType {
    /// Type the node accepts.
    pub input: NType,
    /// Type the node produces.
    pub output: NType,
}
16
17#[derive(Debug, Clone, Default)]
24pub struct CapabilityPolicy {
25 pub allowed: BTreeSet<Capability>,
27}
28
29impl CapabilityPolicy {
30 pub fn allow_all() -> Self {
32 Self {
33 allowed: BTreeSet::new(),
34 }
35 }
36
37 pub fn restrict(caps: impl IntoIterator<Item = Capability>) -> Self {
39 Self {
40 allowed: caps.into_iter().collect(),
41 }
42 }
43
44 fn is_allowed(&self, cap: &Capability) -> bool {
45 self.allowed.is_empty() || self.allowed.contains(cap)
46 }
47}
48
49#[derive(Debug, Clone)]
51pub struct CapabilityViolation {
52 pub stage_id: StageId,
53 pub required: Capability,
54 pub message: String,
55}
56
57impl fmt::Display for CapabilityViolation {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 write!(
60 f,
61 "stage {} requires capability {:?} which is not granted",
62 self.stage_id.0, self.required
63 )
64 }
65}
66
67pub fn check_capabilities(
70 node: &CompositionNode,
71 store: &(impl StageStore + ?Sized),
72 policy: &CapabilityPolicy,
73) -> Vec<CapabilityViolation> {
74 let mut violations = Vec::new();
75 collect_capability_violations(node, store, policy, &mut violations);
76 violations
77}
78
79fn collect_capability_violations(
80 node: &CompositionNode,
81 store: &(impl StageStore + ?Sized),
82 policy: &CapabilityPolicy,
83 violations: &mut Vec<CapabilityViolation>,
84) {
85 match node {
86 CompositionNode::Stage { id, .. } => {
87 if let Ok(Some(stage)) = store.get(id) {
88 for cap in &stage.capabilities {
89 if !policy.is_allowed(cap) {
90 violations.push(CapabilityViolation {
91 stage_id: id.clone(),
92 required: cap.clone(),
93 message: format!(
94 "stage '{}' requires {:?}; grant it with --allow-capabilities",
95 stage.description, cap
96 ),
97 });
98 }
99 }
100 }
101 }
102 CompositionNode::RemoteStage { .. } => {} CompositionNode::Const { .. } => {} CompositionNode::Sequential { stages } => {
105 for s in stages {
106 collect_capability_violations(s, store, policy, violations);
107 }
108 }
109 CompositionNode::Parallel { branches } => {
110 for branch in branches.values() {
111 collect_capability_violations(branch, store, policy, violations);
112 }
113 }
114 CompositionNode::Branch {
115 predicate,
116 if_true,
117 if_false,
118 } => {
119 collect_capability_violations(predicate, store, policy, violations);
120 collect_capability_violations(if_true, store, policy, violations);
121 collect_capability_violations(if_false, store, policy, violations);
122 }
123 CompositionNode::Fanout { source, targets } => {
124 collect_capability_violations(source, store, policy, violations);
125 for t in targets {
126 collect_capability_violations(t, store, policy, violations);
127 }
128 }
129 CompositionNode::Merge { sources, target } => {
130 for s in sources {
131 collect_capability_violations(s, store, policy, violations);
132 }
133 collect_capability_violations(target, store, policy, violations);
134 }
135 CompositionNode::Retry { stage, .. } => {
136 collect_capability_violations(stage, store, policy, violations);
137 }
138 CompositionNode::Let { bindings, body } => {
139 for b in bindings.values() {
140 collect_capability_violations(b, store, policy, violations);
141 }
142 collect_capability_violations(body, store, policy, violations);
143 }
144 }
145}
146
147#[derive(Debug, Clone, Default)]
155pub struct EffectPolicy {
156 pub allowed: BTreeSet<EffectKind>,
158}
159
160impl EffectPolicy {
161 pub fn allow_all() -> Self {
163 Self {
164 allowed: BTreeSet::new(),
165 }
166 }
167
168 pub fn restrict(kinds: impl IntoIterator<Item = EffectKind>) -> Self {
170 Self {
171 allowed: kinds.into_iter().collect(),
172 }
173 }
174
175 pub fn is_allowed(&self, kind: &EffectKind) -> bool {
176 self.allowed.is_empty() || self.allowed.contains(kind)
177 }
178}
179
180#[derive(Debug, Clone)]
182pub struct EffectViolation {
183 pub stage_id: StageId,
184 pub effect: Effect,
185 pub message: String,
186}
187
188impl fmt::Display for EffectViolation {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 write!(f, "{}", self.message)
191 }
192}
193
194pub fn infer_effects(node: &CompositionNode, store: &(impl StageStore + ?Sized)) -> EffectSet {
198 let mut effects: BTreeSet<Effect> = BTreeSet::new();
199 collect_effects_inner(node, store, &mut effects);
200 EffectSet::new(effects)
201}
202
203fn collect_effects_inner(
204 node: &CompositionNode,
205 store: &(impl StageStore + ?Sized),
206 effects: &mut BTreeSet<Effect>,
207) {
208 match node {
209 CompositionNode::Stage { id, .. } => match store.get(id) {
210 Ok(Some(stage)) => {
211 for e in stage.signature.effects.iter() {
212 effects.insert(e.clone());
213 }
214 }
215 _ => {
216 effects.insert(Effect::Unknown);
217 }
218 },
219 CompositionNode::RemoteStage { .. } => {
220 effects.insert(Effect::Network);
221 effects.insert(Effect::Fallible);
222 }
223 CompositionNode::Const { .. } => {
224 effects.insert(Effect::Pure);
225 }
226 CompositionNode::Sequential { stages } => {
227 for s in stages {
228 collect_effects_inner(s, store, effects);
229 }
230 }
231 CompositionNode::Parallel { branches } => {
232 for branch in branches.values() {
233 collect_effects_inner(branch, store, effects);
234 }
235 }
236 CompositionNode::Branch {
237 predicate,
238 if_true,
239 if_false,
240 } => {
241 collect_effects_inner(predicate, store, effects);
242 collect_effects_inner(if_true, store, effects);
243 collect_effects_inner(if_false, store, effects);
244 }
245 CompositionNode::Fanout { source, targets } => {
246 collect_effects_inner(source, store, effects);
247 for t in targets {
248 collect_effects_inner(t, store, effects);
249 }
250 }
251 CompositionNode::Merge { sources, target } => {
252 for s in sources {
253 collect_effects_inner(s, store, effects);
254 }
255 collect_effects_inner(target, store, effects);
256 }
257 CompositionNode::Retry { stage, .. } => {
258 collect_effects_inner(stage, store, effects);
259 }
260 CompositionNode::Let { bindings, body } => {
261 for b in bindings.values() {
262 collect_effects_inner(b, store, effects);
263 }
264 collect_effects_inner(body, store, effects);
265 }
266 }
267}
268
269pub fn check_effects(
272 node: &CompositionNode,
273 store: &(impl StageStore + ?Sized),
274 policy: &EffectPolicy,
275) -> Vec<EffectViolation> {
276 let mut violations = Vec::new();
277 collect_effect_violations(node, store, policy, &mut violations);
278 violations
279}
280
281fn collect_effect_violations(
282 node: &CompositionNode,
283 store: &(impl StageStore + ?Sized),
284 policy: &EffectPolicy,
285 violations: &mut Vec<EffectViolation>,
286) {
287 match node {
288 CompositionNode::Stage { id, .. } => match store.get(id) {
289 Ok(Some(stage)) => {
290 for effect in stage.signature.effects.iter() {
291 let kind = effect.kind();
292 if !policy.is_allowed(&kind) {
293 violations.push(EffectViolation {
294 stage_id: id.clone(),
295 effect: effect.clone(),
296 message: format!(
297 "stage '{}' declares effect {kind}; grant it with --allow-effects {kind}",
298 stage.description
299 ),
300 });
301 }
302 }
303 }
304 _ => {
305 let kind = EffectKind::Unknown;
306 if !policy.is_allowed(&kind) {
307 violations.push(EffectViolation {
308 stage_id: id.clone(),
309 effect: Effect::Unknown,
310 message: format!(
311 "stage {} has unknown effects (not in store); grant with --allow-effects unknown",
312 id.0
313 ),
314 });
315 }
316 }
317 },
318 CompositionNode::RemoteStage { .. } => {
319 for effect in &[Effect::Network, Effect::Fallible] {
320 let kind = effect.kind();
321 if !policy.is_allowed(&kind) {
322 violations.push(EffectViolation {
323 stage_id: StageId("remote".into()),
324 effect: effect.clone(),
325 message: format!(
326 "RemoteStage declares implicit effect {kind}; grant with --allow-effects {kind}"
327 ),
328 });
329 }
330 }
331 }
332 CompositionNode::Const { .. } => {}
333 CompositionNode::Sequential { stages } => {
334 for s in stages {
335 collect_effect_violations(s, store, policy, violations);
336 }
337 }
338 CompositionNode::Parallel { branches } => {
339 for branch in branches.values() {
340 collect_effect_violations(branch, store, policy, violations);
341 }
342 }
343 CompositionNode::Branch {
344 predicate,
345 if_true,
346 if_false,
347 } => {
348 collect_effect_violations(predicate, store, policy, violations);
349 collect_effect_violations(if_true, store, policy, violations);
350 collect_effect_violations(if_false, store, policy, violations);
351 }
352 CompositionNode::Fanout { source, targets } => {
353 collect_effect_violations(source, store, policy, violations);
354 for t in targets {
355 collect_effect_violations(t, store, policy, violations);
356 }
357 }
358 CompositionNode::Merge { sources, target } => {
359 for s in sources {
360 collect_effect_violations(s, store, policy, violations);
361 }
362 collect_effect_violations(target, store, policy, violations);
363 }
364 CompositionNode::Retry { stage, .. } => {
365 collect_effect_violations(stage, store, policy, violations);
366 }
367 CompositionNode::Let { bindings, body } => {
368 for b in bindings.values() {
369 collect_effect_violations(b, store, policy, violations);
370 }
371 collect_effect_violations(body, store, policy, violations);
372 }
373 }
374}
375
/// Why a stage failed signature verification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignatureViolationKind {
    /// The stage has no signature or no signer public key.
    Missing,
    /// A signature is present but did not verify or could not be decoded.
    Invalid,
}

impl fmt::Display for SignatureViolationKind {
    /// Writes a short fixed label for the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Missing => "unsigned",
            Self::Invalid => "invalid signature",
        };
        f.write_str(label)
    }
}
395
396#[derive(Debug, Clone)]
398pub struct SignatureViolation {
399 pub stage_id: StageId,
400 pub kind: SignatureViolationKind,
401 pub message: String,
402}
403
404impl fmt::Display for SignatureViolation {
405 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406 write!(f, "stage {} — {}", self.stage_id.0, self.message)
407 }
408}
409
410pub fn verify_signatures(
415 node: &CompositionNode,
416 store: &(impl StageStore + ?Sized),
417) -> Vec<SignatureViolation> {
418 let mut violations = Vec::new();
419 collect_signature_violations(node, store, &mut violations);
420 violations
421}
422
423fn collect_signature_violations(
424 node: &CompositionNode,
425 store: &(impl StageStore + ?Sized),
426 violations: &mut Vec<SignatureViolation>,
427) {
428 match node {
429 CompositionNode::Stage { id, .. } => {
430 if let Ok(Some(stage)) = store.get(id) {
431 match (&stage.ed25519_signature, &stage.signer_public_key) {
432 (None, _) | (_, None) => {
433 violations.push(SignatureViolation {
434 stage_id: id.clone(),
435 kind: SignatureViolationKind::Missing,
436 message: format!(
437 "stage '{}' has no signature — add it via the signing pipeline",
438 stage.description
439 ),
440 });
441 }
442 (Some(sig_hex), Some(pub_hex)) => {
443 match noether_core::stage::verify_stage_signature(id, sig_hex, pub_hex) {
444 Ok(true) => {} Ok(false) => {
446 violations.push(SignatureViolation {
447 stage_id: id.clone(),
448 kind: SignatureViolationKind::Invalid,
449 message: format!(
450 "stage '{}' signature verification failed — possible tampering",
451 stage.description
452 ),
453 });
454 }
455 Err(e) => {
456 violations.push(SignatureViolation {
457 stage_id: id.clone(),
458 kind: SignatureViolationKind::Invalid,
459 message: format!(
460 "stage '{}' signature could not be decoded: {e}",
461 stage.description
462 ),
463 });
464 }
465 }
466 }
467 }
468 }
469 }
472 CompositionNode::Const { .. } => {} CompositionNode::RemoteStage { .. } => {} CompositionNode::Sequential { stages } => {
475 for s in stages {
476 collect_signature_violations(s, store, violations);
477 }
478 }
479 CompositionNode::Parallel { branches } => {
480 for branch in branches.values() {
481 collect_signature_violations(branch, store, violations);
482 }
483 }
484 CompositionNode::Branch {
485 predicate,
486 if_true,
487 if_false,
488 } => {
489 collect_signature_violations(predicate, store, violations);
490 collect_signature_violations(if_true, store, violations);
491 collect_signature_violations(if_false, store, violations);
492 }
493 CompositionNode::Fanout { source, targets } => {
494 collect_signature_violations(source, store, violations);
495 for t in targets {
496 collect_signature_violations(t, store, violations);
497 }
498 }
499 CompositionNode::Merge { sources, target } => {
500 for s in sources {
501 collect_signature_violations(s, store, violations);
502 }
503 collect_signature_violations(target, store, violations);
504 }
505 CompositionNode::Retry { stage, .. } => {
506 collect_signature_violations(stage, store, violations);
507 }
508 CompositionNode::Let { bindings, body } => {
509 for b in bindings.values() {
510 collect_signature_violations(b, store, violations);
511 }
512 collect_signature_violations(body, store, violations);
513 }
514 }
515}
516
517#[derive(Debug, Clone)]
524pub enum EffectWarning {
525 FallibleWithoutRetry { stage_id: StageId },
527 NonDeterministicFeedingPure { from: StageId, to: StageId },
529 CostBudgetExceeded { total_cents: u64, budget_cents: u64 },
531}
532
533impl fmt::Display for EffectWarning {
534 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535 match self {
536 EffectWarning::FallibleWithoutRetry { stage_id } => write!(
537 f,
538 "stage {} is Fallible but has no Retry wrapper; failures will propagate",
539 stage_id.0
540 ),
541 EffectWarning::NonDeterministicFeedingPure { from, to } => write!(
542 f,
543 "stage {} is NonDeterministic but feeds Pure stage {}; Pure caching will be bypassed",
544 from.0, to.0
545 ),
546 EffectWarning::CostBudgetExceeded { total_cents, budget_cents } => write!(
547 f,
548 "estimated composition cost ({total_cents}¢) exceeds budget ({budget_cents}¢)"
549 ),
550 }
551 }
552}
553
/// Successful outcome of `check_graph`.
#[derive(Debug, Clone)]
pub struct CheckResult {
    /// Input/output types resolved for the root node.
    pub resolved: ResolvedType,
    /// Effect warnings collected for the graph; these do not fail the check.
    pub warnings: Vec<EffectWarning>,
}
560
561#[derive(Debug, Clone)]
563pub enum GraphTypeError {
564 StageNotFound {
565 id: StageId,
566 },
567 SequentialTypeMismatch {
568 position: usize,
569 from_output: NType,
570 to_input: NType,
571 reason: IncompatibilityReason,
572 },
573 BranchPredicateNotBool {
574 actual: NType,
575 },
576 BranchOutputMismatch {
577 true_output: NType,
578 false_output: NType,
579 reason: IncompatibilityReason,
580 },
581 FanoutInputMismatch {
582 target_index: usize,
583 source_output: NType,
584 target_input: NType,
585 reason: IncompatibilityReason,
586 },
587 MergeOutputMismatch {
588 merged_type: NType,
589 target_input: NType,
590 reason: IncompatibilityReason,
591 },
592 EmptyNode {
593 operator: String,
594 },
595}
596
597impl fmt::Display for GraphTypeError {
598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599 match self {
600 GraphTypeError::StageNotFound { id } => {
601 write!(f, "stage {} not found in store", id.0)
602 }
603 GraphTypeError::SequentialTypeMismatch {
604 position,
605 from_output,
606 to_input,
607 reason,
608 } => write!(
609 f,
610 "type mismatch at position {position}: output {from_output} is not subtype of input {to_input}: {reason}"
611 ),
612 GraphTypeError::BranchPredicateNotBool { actual } => {
613 write!(f, "branch predicate must produce Bool, got {actual}")
614 }
615 GraphTypeError::BranchOutputMismatch {
616 true_output,
617 false_output,
618 reason,
619 } => write!(
620 f,
621 "branch outputs must be compatible: if_true produces {true_output}, if_false produces {false_output}: {reason}"
622 ),
623 GraphTypeError::FanoutInputMismatch {
624 target_index,
625 source_output,
626 target_input,
627 reason,
628 } => write!(
629 f,
630 "fanout target {target_index}: source output {source_output} is not subtype of target input {target_input}: {reason}"
631 ),
632 GraphTypeError::MergeOutputMismatch {
633 merged_type,
634 target_input,
635 reason,
636 } => write!(
637 f,
638 "merge: merged type {merged_type} is not subtype of target input {target_input}: {reason}"
639 ),
640 GraphTypeError::EmptyNode { operator } => {
641 write!(f, "empty {operator} node")
642 }
643 }
644 }
645}
646
647pub fn check_graph(
652 node: &CompositionNode,
653 store: &(impl StageStore + ?Sized),
654) -> Result<CheckResult, Vec<GraphTypeError>> {
655 let mut errors = Vec::new();
656 let result = check_node(node, store, &mut errors);
657 if errors.is_empty() {
658 let resolved = result.unwrap();
659 let warnings = collect_effect_warnings(node, store, None);
660 Ok(CheckResult { resolved, warnings })
661 } else {
662 Err(errors)
663 }
664}
665
666pub fn collect_effect_warnings(
669 node: &CompositionNode,
670 store: &(impl StageStore + ?Sized),
671 cost_budget_cents: Option<u64>,
672) -> Vec<EffectWarning> {
673 let mut warnings = Vec::new();
674 let mut total_cost: u64 = 0;
675 collect_warnings_inner(node, store, &mut warnings, &mut total_cost, false);
676 if let Some(budget) = cost_budget_cents {
677 if total_cost > budget {
678 warnings.push(EffectWarning::CostBudgetExceeded {
679 total_cents: total_cost,
680 budget_cents: budget,
681 });
682 }
683 }
684 warnings
685}
686
687fn collect_warnings_inner(
688 node: &CompositionNode,
689 store: &(impl StageStore + ?Sized),
690 warnings: &mut Vec<EffectWarning>,
691 total_cost: &mut u64,
692 _parent_is_retry: bool,
693) {
694 match node {
695 CompositionNode::Stage { id, .. } => {
696 if let Ok(Some(stage)) = store.get(id) {
697 for effect in stage.signature.effects.iter() {
699 if let Effect::Cost { cents } = effect {
700 *total_cost = total_cost.saturating_add(*cents);
701 }
702 }
703 }
705 }
706 CompositionNode::RemoteStage { .. } => {} CompositionNode::Const { .. } => {} CompositionNode::Sequential { stages } => {
709 for (i, s) in stages.iter().enumerate() {
710 collect_warnings_inner(s, store, warnings, total_cost, false);
711
712 if let CompositionNode::Stage { id, .. } = s {
714 if let Ok(Some(stage)) = store.get(id) {
715 if stage.signature.effects.contains(&Effect::Fallible) {
716 warnings.push(EffectWarning::FallibleWithoutRetry {
717 stage_id: id.clone(),
718 });
719 }
720 }
721 }
722
723 if i + 1 < stages.len() {
725 if let (
726 CompositionNode::Stage { id: from_id, .. },
727 CompositionNode::Stage { id: to_id, .. },
728 ) = (s, &stages[i + 1])
729 {
730 let from_nd = store
731 .get(from_id)
732 .ok()
733 .flatten()
734 .map(|s| s.signature.effects.contains(&Effect::NonDeterministic))
735 .unwrap_or(false);
736 let to_pure = store
737 .get(to_id)
738 .ok()
739 .flatten()
740 .map(|s| s.signature.effects.contains(&Effect::Pure))
741 .unwrap_or(false);
742
743 if from_nd && to_pure {
744 warnings.push(EffectWarning::NonDeterministicFeedingPure {
745 from: from_id.clone(),
746 to: to_id.clone(),
747 });
748 }
749 }
750 }
751 }
752 }
753 CompositionNode::Parallel { branches } => {
754 for branch in branches.values() {
755 collect_warnings_inner(branch, store, warnings, total_cost, false);
756 }
757 }
758 CompositionNode::Branch {
759 predicate,
760 if_true,
761 if_false,
762 } => {
763 collect_warnings_inner(predicate, store, warnings, total_cost, false);
764 collect_warnings_inner(if_true, store, warnings, total_cost, false);
765 collect_warnings_inner(if_false, store, warnings, total_cost, false);
766 }
767 CompositionNode::Fanout { source, targets } => {
768 collect_warnings_inner(source, store, warnings, total_cost, false);
769 for t in targets {
770 collect_warnings_inner(t, store, warnings, total_cost, false);
771 }
772 }
773 CompositionNode::Merge { sources, target } => {
774 for s in sources {
775 collect_warnings_inner(s, store, warnings, total_cost, false);
776 }
777 collect_warnings_inner(target, store, warnings, total_cost, false);
778 }
779 CompositionNode::Retry { stage, .. } => {
780 collect_warnings_inner(stage, store, warnings, total_cost, true);
782 if let CompositionNode::Stage { id, .. } = stage.as_ref() {
784 warnings.retain(|w| !matches!(w, EffectWarning::FallibleWithoutRetry { stage_id } if stage_id == id));
785 }
786 }
787 CompositionNode::Let { bindings, body } => {
788 for b in bindings.values() {
789 collect_warnings_inner(b, store, warnings, total_cost, false);
790 }
791 collect_warnings_inner(body, store, warnings, total_cost, false);
792 }
793 }
794}
795
796fn check_node(
797 node: &CompositionNode,
798 store: &(impl StageStore + ?Sized),
799 errors: &mut Vec<GraphTypeError>,
800) -> Option<ResolvedType> {
801 match node {
802 CompositionNode::Stage { id, config } => {
803 let resolved = check_stage(id, store, errors)?;
804 if let Some(cfg) = config {
806 if !cfg.is_empty() {
807 if let NType::Record(fields) = &resolved.input {
808 let remaining: std::collections::BTreeMap<String, NType> = fields
809 .iter()
810 .filter(|(name, _)| !cfg.contains_key(*name))
811 .map(|(name, ty)| (name.clone(), ty.clone()))
812 .collect();
813 let effective = if remaining.is_empty() || remaining.len() == 1 {
814 NType::Any
815 } else {
816 NType::Record(remaining)
817 };
818 return Some(ResolvedType {
819 input: effective,
820 output: resolved.output,
821 });
822 }
823 }
824 }
825 Some(resolved)
826 }
827 CompositionNode::RemoteStage { input, output, .. } => Some(ResolvedType {
830 input: input.clone(),
831 output: output.clone(),
832 }),
833 CompositionNode::Const { .. } => Some(ResolvedType {
835 input: NType::Any,
836 output: NType::Any,
837 }),
838 CompositionNode::Sequential { stages } => check_sequential(stages, store, errors),
839 CompositionNode::Parallel { branches } => check_parallel(branches, store, errors),
840 CompositionNode::Branch {
841 predicate,
842 if_true,
843 if_false,
844 } => check_branch(predicate, if_true, if_false, store, errors),
845 CompositionNode::Fanout { source, targets } => check_fanout(source, targets, store, errors),
846 CompositionNode::Merge { sources, target } => check_merge(sources, target, store, errors),
847 CompositionNode::Retry { stage, .. } => check_node(stage, store, errors),
848 CompositionNode::Let { bindings, body } => check_let(bindings, body, store, errors),
849 }
850}
851
852fn check_let(
865 bindings: &BTreeMap<String, CompositionNode>,
866 body: &CompositionNode,
867 store: &(impl StageStore + ?Sized),
868 errors: &mut Vec<GraphTypeError>,
869) -> Option<ResolvedType> {
870 if bindings.is_empty() {
871 errors.push(GraphTypeError::EmptyNode {
872 operator: "Let".into(),
873 });
874 return None;
875 }
876
877 let mut binding_outputs: BTreeMap<String, NType> = BTreeMap::new();
879 let mut required_input: BTreeMap<String, NType> = BTreeMap::new();
880 let mut any_input = false;
881
882 for (name, node) in bindings {
883 let resolved = check_node(node, store, errors)?;
884 binding_outputs.insert(name.clone(), resolved.output);
885 match resolved.input {
886 NType::Record(fields) => {
887 for (f, ty) in fields {
888 required_input.insert(f, ty);
889 }
890 }
891 NType::Any => {
892 any_input = true;
893 }
894 other => {
895 let _ = other;
899 any_input = true;
900 }
901 }
902 }
903
904 let mut body_input_fields = required_input.clone();
907 for (name, out_ty) in &binding_outputs {
908 body_input_fields.insert(name.clone(), out_ty.clone());
909 }
910
911 let body_resolved = check_node(body, store, errors)?;
912
913 if let NType::Record(body_fields) = &body_resolved.input {
919 for (name, expected_ty) in body_fields {
920 let provided = body_input_fields.get(name).cloned();
921 match provided {
922 Some(actual) => {
923 if let TypeCompatibility::Incompatible(reason) =
924 is_subtype_of(&actual, expected_ty)
925 {
926 errors.push(GraphTypeError::SequentialTypeMismatch {
927 position: 0,
928 from_output: actual,
929 to_input: expected_ty.clone(),
930 reason,
931 });
932 }
933 }
934 None => {
935 required_input.insert(name.clone(), expected_ty.clone());
939 }
940 }
941 }
942 }
943
944 let input = if any_input || required_input.is_empty() {
945 NType::Any
946 } else {
947 NType::Record(required_input)
948 };
949
950 Some(ResolvedType {
951 input,
952 output: body_resolved.output,
953 })
954}
955
956fn check_stage(
957 id: &StageId,
958 store: &(impl StageStore + ?Sized),
959 errors: &mut Vec<GraphTypeError>,
960) -> Option<ResolvedType> {
961 match store.get(id) {
962 Ok(Some(stage)) => Some(ResolvedType {
963 input: stage.signature.input.clone(),
964 output: stage.signature.output.clone(),
965 }),
966 _ => {
967 errors.push(GraphTypeError::StageNotFound { id: id.clone() });
968 None
969 }
970 }
971}
972
973fn check_sequential(
974 stages: &[CompositionNode],
975 store: &(impl StageStore + ?Sized),
976 errors: &mut Vec<GraphTypeError>,
977) -> Option<ResolvedType> {
978 if stages.is_empty() {
979 errors.push(GraphTypeError::EmptyNode {
980 operator: "Sequential".into(),
981 });
982 return None;
983 }
984
985 let resolved: Vec<Option<ResolvedType>> = stages
986 .iter()
987 .map(|s| check_node(s, store, errors))
988 .collect();
989
990 for i in 0..resolved.len() - 1 {
992 if let (Some(from), Some(to)) = (&resolved[i], &resolved[i + 1]) {
993 if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&from.output, &to.input)
994 {
995 errors.push(GraphTypeError::SequentialTypeMismatch {
996 position: i,
997 from_output: from.output.clone(),
998 to_input: to.input.clone(),
999 reason,
1000 });
1001 }
1002 }
1003 }
1004
1005 let first_input = resolved
1006 .first()
1007 .and_then(|r| r.as_ref())
1008 .map(|r| r.input.clone());
1009 let last_output = resolved
1010 .last()
1011 .and_then(|r| r.as_ref())
1012 .map(|r| r.output.clone());
1013
1014 match (first_input, last_output) {
1015 (Some(input), Some(output)) => Some(ResolvedType { input, output }),
1016 _ => None,
1017 }
1018}
1019
1020fn check_parallel(
1021 branches: &BTreeMap<String, CompositionNode>,
1022 store: &(impl StageStore + ?Sized),
1023 errors: &mut Vec<GraphTypeError>,
1024) -> Option<ResolvedType> {
1025 if branches.is_empty() {
1026 errors.push(GraphTypeError::EmptyNode {
1027 operator: "Parallel".into(),
1028 });
1029 return None;
1030 }
1031
1032 let mut input_fields = BTreeMap::new();
1033 let mut output_fields = BTreeMap::new();
1034
1035 for (name, node) in branches {
1036 if let Some(resolved) = check_node(node, store, errors) {
1037 input_fields.insert(name.clone(), resolved.input);
1038 output_fields.insert(name.clone(), resolved.output);
1039 }
1040 }
1041
1042 if input_fields.len() == branches.len() {
1043 Some(ResolvedType {
1044 input: NType::Record(input_fields),
1045 output: NType::Record(output_fields),
1046 })
1047 } else {
1048 None
1049 }
1050}
1051
1052fn check_branch(
1053 predicate: &CompositionNode,
1054 if_true: &CompositionNode,
1055 if_false: &CompositionNode,
1056 store: &(impl StageStore + ?Sized),
1057 errors: &mut Vec<GraphTypeError>,
1058) -> Option<ResolvedType> {
1059 let pred = check_node(predicate, store, errors);
1060 let true_branch = check_node(if_true, store, errors);
1061 let false_branch = check_node(if_false, store, errors);
1062
1063 if let Some(ref p) = pred {
1065 if let TypeCompatibility::Incompatible(_) = is_subtype_of(&p.output, &NType::Bool) {
1066 errors.push(GraphTypeError::BranchPredicateNotBool {
1067 actual: p.output.clone(),
1068 });
1069 }
1070 }
1071
1072 match (pred, true_branch, false_branch) {
1076 (Some(p), Some(t), Some(f)) => Some(ResolvedType {
1077 input: p.input,
1078 output: NType::union(vec![t.output, f.output]),
1079 }),
1080 _ => None,
1081 }
1082}
1083
1084fn check_fanout(
1085 source: &CompositionNode,
1086 targets: &[CompositionNode],
1087 store: &(impl StageStore + ?Sized),
1088 errors: &mut Vec<GraphTypeError>,
1089) -> Option<ResolvedType> {
1090 if targets.is_empty() {
1091 errors.push(GraphTypeError::EmptyNode {
1092 operator: "Fanout".into(),
1093 });
1094 return None;
1095 }
1096
1097 let src = check_node(source, store, errors);
1098 let tgts: Vec<Option<ResolvedType>> = targets
1099 .iter()
1100 .map(|t| check_node(t, store, errors))
1101 .collect();
1102
1103 if let Some(ref s) = src {
1105 for (i, t) in tgts.iter().enumerate() {
1106 if let Some(ref t) = t {
1107 if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&s.output, &t.input)
1108 {
1109 errors.push(GraphTypeError::FanoutInputMismatch {
1110 target_index: i,
1111 source_output: s.output.clone(),
1112 target_input: t.input.clone(),
1113 reason,
1114 });
1115 }
1116 }
1117 }
1118 }
1119
1120 let output_types: Vec<NType> = tgts
1121 .iter()
1122 .filter_map(|t| t.as_ref().map(|r| r.output.clone()))
1123 .collect();
1124
1125 match src {
1126 Some(s) if output_types.len() == targets.len() => Some(ResolvedType {
1127 input: s.input,
1128 output: NType::List(Box::new(if output_types.len() == 1 {
1129 output_types.into_iter().next().unwrap()
1130 } else {
1131 NType::union(output_types)
1132 })),
1133 }),
1134 _ => None,
1135 }
1136}
1137
1138fn check_merge(
1139 sources: &[CompositionNode],
1140 target: &CompositionNode,
1141 store: &(impl StageStore + ?Sized),
1142 errors: &mut Vec<GraphTypeError>,
1143) -> Option<ResolvedType> {
1144 if sources.is_empty() {
1145 errors.push(GraphTypeError::EmptyNode {
1146 operator: "Merge".into(),
1147 });
1148 return None;
1149 }
1150
1151 let srcs: Vec<Option<ResolvedType>> = sources
1152 .iter()
1153 .map(|s| check_node(s, store, errors))
1154 .collect();
1155 let tgt = check_node(target, store, errors);
1156
1157 let mut merged_fields = BTreeMap::new();
1159 for (i, s) in srcs.iter().enumerate() {
1160 if let Some(ref r) = s {
1161 merged_fields.insert(format!("source_{i}"), r.output.clone());
1162 }
1163 }
1164 let merged_type = NType::Record(merged_fields);
1165
1166 if let Some(ref t) = tgt {
1168 if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&merged_type, &t.input) {
1169 errors.push(GraphTypeError::MergeOutputMismatch {
1170 merged_type: merged_type.clone(),
1171 target_input: t.input.clone(),
1172 reason,
1173 });
1174 }
1175 }
1176
1177 let mut input_fields = BTreeMap::new();
1179 for (i, s) in srcs.iter().enumerate() {
1180 if let Some(ref r) = s {
1181 input_fields.insert(format!("source_{i}"), r.input.clone());
1182 }
1183 }
1184
1185 match tgt {
1186 Some(t) => Some(ResolvedType {
1187 input: NType::Record(input_fields),
1188 output: t.output,
1189 }),
1190 None => None,
1191 }
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196 use super::*;
1197 use noether_core::capability::Capability;
1198 use noether_core::effects::EffectSet;
1199 use noether_core::stage::{CostEstimate, Stage, StageSignature};
1200 use noether_store::MemoryStore;
1201 use std::collections::BTreeSet;
1202
1203 fn make_stage(id: &str, input: NType, output: NType) -> Stage {
1204 Stage {
1205 id: StageId(id.into()),
1206 canonical_id: None,
1207 signature: StageSignature {
1208 input,
1209 output,
1210 effects: EffectSet::pure(),
1211 implementation_hash: format!("impl_{id}"),
1212 },
1213 capabilities: BTreeSet::new(),
1214 cost: CostEstimate {
1215 time_ms_p50: Some(10),
1216 tokens_est: None,
1217 memory_mb: None,
1218 },
1219 description: format!("test stage {id}"),
1220 examples: vec![],
1221 lifecycle: noether_core::stage::StageLifecycle::Active,
1222 ed25519_signature: None,
1223 signer_public_key: None,
1224 implementation_code: None,
1225 implementation_language: None,
1226 ui_style: None,
1227 tags: vec![],
1228 aliases: vec![],
1229 }
1230 }
1231
1232 fn test_store() -> MemoryStore {
1233 let mut store = MemoryStore::new();
1234 store
1235 .put(make_stage("text_to_num", NType::Text, NType::Number))
1236 .unwrap();
1237 store
1238 .put(make_stage("num_to_bool", NType::Number, NType::Bool))
1239 .unwrap();
1240 store
1241 .put(make_stage("text_to_text", NType::Text, NType::Text))
1242 .unwrap();
1243 store
1244 .put(make_stage("bool_pred", NType::Text, NType::Bool))
1245 .unwrap();
1246 store
1247 .put(make_stage("any_to_text", NType::Any, NType::Text))
1248 .unwrap();
1249 store
1250 }
1251
1252 fn stage(id: &str) -> CompositionNode {
1253 CompositionNode::Stage {
1254 id: StageId(id.into()),
1255 config: None,
1256 }
1257 }
1258
/// A lone stage node resolves to exactly the stage's own input/output types.
#[test]
fn check_single_stage() {
    let store = test_store();
    let checked = check_graph(&stage("text_to_num"), &store).unwrap();

    let resolved = &checked.resolved;
    assert_eq!(resolved.input, NType::Text);
    assert_eq!(resolved.output, NType::Number);
}
1267
/// Referencing a stage id that is not in the store must fail, and the first
/// reported error must be `StageNotFound`.
#[test]
fn check_missing_stage() {
    let store = test_store();

    // `unwrap_err` panics (failing the test) if the check unexpectedly succeeds.
    let errors = check_graph(&stage("nonexistent"), &store).unwrap_err();
    assert!(matches!(errors[0], GraphTypeError::StageNotFound { .. }));
}
1276
/// Text -> Number followed by Number -> Bool type-checks, and the pipeline as
/// a whole resolves to Text -> Bool.
#[test]
fn check_valid_sequential() {
    let store = test_store();
    let pipeline = CompositionNode::Sequential {
        stages: vec![stage("text_to_num"), stage("num_to_bool")],
    };

    let checked = check_graph(&pipeline, &store).unwrap();
    let resolved = &checked.resolved;
    assert_eq!(resolved.input, NType::Text);
    assert_eq!(resolved.output, NType::Bool);
}
1288
/// Chaining Number -> Bool into Text -> Number must be rejected: the first
/// stage's Bool output feeds a stage that expects Text.
#[test]
fn check_invalid_sequential() {
    let store = test_store();
    let pipeline = CompositionNode::Sequential {
        stages: vec![stage("num_to_bool"), stage("text_to_num")],
    };

    // `unwrap_err` panics (failing the test) if the check unexpectedly succeeds.
    let errors = check_graph(&pipeline, &store).unwrap_err();
    assert!(matches!(
        errors[0],
        GraphTypeError::SequentialTypeMismatch { .. }
    ));
}
1304
/// A parallel node with two named branches resolves to record types on both
/// the input and the output side.
#[test]
fn check_parallel() {
    let store = test_store();

    let mut branches = BTreeMap::new();
    branches.insert("nums".into(), stage("text_to_num"));
    branches.insert("bools".into(), stage("bool_pred"));
    let fan_out = CompositionNode::Parallel { branches };

    let checked = check_graph(&fan_out, &store).unwrap();
    // Only the shape is pinned here, not the individual record fields.
    for side in [&checked.resolved.input, &checked.resolved.output] {
        assert!(matches!(side, NType::Record(_)));
    }
}
1321
/// A branch whose predicate and both arms accept Text type-checks and
/// resolves to a Text input.
#[test]
fn check_branch_valid() {
    let store = test_store();

    let predicate = Box::new(stage("bool_pred"));
    let if_true = Box::new(stage("text_to_num"));
    let if_false = Box::new(stage("text_to_text"));
    let branch = CompositionNode::Branch {
        predicate,
        if_true,
        if_false,
    };

    let checked = check_graph(&branch, &store).unwrap();
    // The arms produce different outputs (Number vs Text); this test only
    // pins the input side.
    assert_eq!(checked.resolved.input, NType::Text);
}
1337
/// Wrapping a stage in `Retry` does not change its resolved types.
#[test]
fn check_retry_transparent() {
    let store = test_store();

    let wrapped = Box::new(stage("text_to_num"));
    let retry = CompositionNode::Retry {
        stage: wrapped,
        max_attempts: 3,
        delay_ms: Some(100),
    };

    let checked = check_graph(&retry, &store).unwrap();
    let resolved = &checked.resolved;
    assert_eq!(resolved.input, NType::Text);
    assert_eq!(resolved.output, NType::Number);
}
1351
/// Under `CapabilityPolicy::allow_all`, a stage that requires `Network`
/// produces no violations.
#[test]
fn capability_policy_allow_all_passes() {
    let mut store = test_store();
    let net_stage = {
        let mut fixture = make_stage("net_stage", NType::Text, NType::Text);
        fixture.capabilities.insert(Capability::Network);
        fixture
    };
    store.put(net_stage).unwrap();

    let violations = check_capabilities(
        &stage("net_stage"),
        &store,
        &CapabilityPolicy::allow_all(),
    );
    assert!(violations.is_empty());
}
1363
/// A policy that grants only `FsRead` must report exactly one violation for a
/// stage requiring `Network`, naming `Network` as the missing capability.
#[test]
fn capability_policy_restrict_blocks_network() {
    let mut store = test_store();
    let net_stage = {
        let mut fixture = make_stage("net_stage2", NType::Text, NType::Text);
        fixture.capabilities.insert(Capability::Network);
        fixture
    };
    store.put(net_stage).unwrap();

    let fs_only = CapabilityPolicy::restrict([Capability::FsRead]);
    let violations = check_capabilities(&stage("net_stage2"), &store, &fs_only);

    assert_eq!(violations.len(), 1);
    // Single-element slice pattern: the one violation must be for Network.
    assert!(matches!(
        violations.as_slice(),
        [CapabilityViolation {
            required: Capability::Network,
            ..
        }]
    ));
}
1376
/// A restrictive policy that explicitly grants `Network` lets a
/// `Network`-requiring stage through with no violations.
#[test]
fn capability_policy_restrict_allows_declared() {
    let mut store = test_store();
    let net_stage = {
        let mut fixture = make_stage("net_stage3", NType::Text, NType::Text);
        fixture.capabilities.insert(Capability::Network);
        fixture
    };
    store.put(net_stage).unwrap();

    let net_only = CapabilityPolicy::restrict([Capability::Network]);
    let violations = check_capabilities(&stage("net_stage3"), &store, &net_only);
    assert!(violations.is_empty());
}
1388
/// A `RemoteStage` resolves to the input/output types declared on the node
/// itself.
#[test]
fn remote_stage_resolves_declared_types() {
    let store = test_store();
    let remote = CompositionNode::RemoteStage {
        url: "http://api.example.com".into(),
        input: NType::Text,
        output: NType::Number,
    };

    let checked = check_graph(&remote, &store).unwrap();
    let resolved = &checked.resolved;
    assert_eq!(resolved.input, NType::Text);
    assert_eq!(resolved.output, NType::Number);
}
1401
/// A remote stage's declared output flows into the next stage of a sequence:
/// remote Text -> Number followed by a local Number -> Text stage resolves to
/// Text -> Text overall.
#[test]
fn remote_stage_in_sequential_type_flows() {
    let mut store = test_store();
    store
        .put(make_stage("num_render", NType::Number, NType::Text))
        .unwrap();

    let remote = CompositionNode::RemoteStage {
        url: "http://api:8080".into(),
        input: NType::Text,
        output: NType::Number,
    };
    let node = CompositionNode::Sequential {
        // Local stage nodes go through the shared `stage` helper, like the
        // other tests in this module.
        stages: vec![remote, stage("num_render")],
    };

    let result = check_graph(&node, &store).unwrap();
    assert_eq!(result.resolved.input, NType::Text);
    assert_eq!(result.resolved.output, NType::Text);
}
1427
/// A remote stage declaring a Bool output cannot feed `text_to_num`, which
/// expects Text; the checker must report a `SequentialTypeMismatch` somewhere
/// in its error list.
#[test]
fn remote_stage_type_mismatch_is_detected() {
    let store = test_store();

    let remote = CompositionNode::RemoteStage {
        url: "http://api:8080".into(),
        input: NType::Text,
        output: NType::Bool,
    };
    let node = CompositionNode::Sequential {
        stages: vec![remote, stage("text_to_num")],
    };

    // `unwrap_err` already fails the test if the check succeeds, so no
    // separate `is_err` assertion is needed.
    let errors = check_graph(&node, &store).unwrap_err();
    assert!(errors
        .iter()
        .any(|e| matches!(e, GraphTypeError::SequentialTypeMismatch { .. })));
}
1452
1453 fn make_stage_with_effects(id: &str, effects: EffectSet) -> Stage {
1456 let mut s = make_stage(id, NType::Any, NType::Any);
1457 s.signature.effects = effects;
1458 s
1459 }
1460
/// A single stage declared pure is inferred as `Pure` and does not pick up
/// `Network`.
#[test]
fn infer_effects_pure_stage() {
    let mut store = MemoryStore::new();
    // The fixture is moved straight into the store; it is not needed afterwards.
    store
        .put(make_stage_with_effects("pure1", EffectSet::pure()))
        .unwrap();

    let effects = infer_effects(&stage("pure1"), &store);
    assert!(effects.contains(&Effect::Pure));
    assert!(!effects.contains(&Effect::Network));
}
1474
/// The inferred effects of a sequence contain the effects of every stage in
/// it: a `Pure` stage followed by a `Network` stage yields both.
#[test]
fn infer_effects_union_sequential() {
    let mut store = MemoryStore::new();
    store
        .put(make_stage_with_effects("a", EffectSet::new([Effect::Pure])))
        .unwrap();
    store
        .put(make_stage_with_effects(
            "b",
            EffectSet::new([Effect::Network]),
        ))
        .unwrap();

    let node = CompositionNode::Sequential {
        stages: vec![stage("a"), stage("b")],
    };

    let effects = infer_effects(&node, &store);
    assert!(effects.contains(&Effect::Pure));
    assert!(effects.contains(&Effect::Network));
}
1503
/// A `RemoteStage` is expected to be inferred as both `Network` and
/// `Fallible`, even with an empty store.
#[test]
fn infer_effects_remote_stage_adds_network() {
    let empty_store = MemoryStore::new();
    let remote = CompositionNode::RemoteStage {
        url: "http://localhost:8080".into(),
        input: NType::Any,
        output: NType::Any,
    };

    let inferred = infer_effects(&remote, &empty_store);
    for expected in [Effect::Network, Effect::Fallible] {
        assert!(inferred.contains(&expected));
    }
}
1516
/// A stage id that cannot be found in the store is expected to contribute
/// `Effect::Unknown` to the inferred set.
#[test]
fn infer_effects_missing_stage_adds_unknown() {
    // Deliberately empty: "missing" is never registered.
    let store = MemoryStore::new();

    let effects = infer_effects(&stage("missing"), &store);
    assert!(effects.contains(&Effect::Unknown));
}
1527
/// Under `EffectPolicy::allow_all`, a stage declaring `Network` and
/// `Fallible` effects produces no violations.
#[test]
fn effect_policy_allow_all_never_violates() {
    let mut store = MemoryStore::new();
    store
        .put(make_stage_with_effects(
            "net",
            EffectSet::new([Effect::Network, Effect::Fallible]),
        ))
        .unwrap();

    let policy = EffectPolicy::allow_all();
    assert!(check_effects(&stage("net"), &store, &policy).is_empty());
}
1546
/// A policy restricted to `Pure` must flag a `Network` stage, and the first
/// violation's message must mention "network".
#[test]
fn effect_policy_restrict_blocks_network() {
    let mut store = MemoryStore::new();
    store
        .put(make_stage_with_effects(
            "net",
            EffectSet::new([Effect::Network]),
        ))
        .unwrap();

    let policy = EffectPolicy::restrict([EffectKind::Pure]);
    let violations = check_effects(&stage("net"), &store, &policy);

    assert!(!violations.is_empty());
    assert!(violations[0].message.contains("network"));
}
1565
/// A policy restricted to `EffectKind::Llm` accepts a stage whose only effect
/// is `Effect::Llm` with a model name attached.
#[test]
fn effect_policy_restrict_allows_matching_effect() {
    let mut store = MemoryStore::new();
    store
        .put(make_stage_with_effects(
            "llm",
            EffectSet::new([Effect::Llm {
                model: "gpt-4o".into(),
            }]),
        ))
        .unwrap();

    let policy = EffectPolicy::restrict([EffectKind::Llm]);
    assert!(check_effects(&stage("llm"), &store, &policy).is_empty());
}
1584}