Skip to main content

noether_engine/
checker.rs

1use crate::lagrange::CompositionNode;
2use noether_core::capability::Capability;
3use noether_core::effects::{Effect, EffectKind, EffectSet};
4use noether_core::stage::StageId;
5use noether_core::types::{is_subtype_of, IncompatibilityReason, NType, TypeCompatibility};
6use noether_store::StageStore;
7use std::collections::{BTreeMap, BTreeSet};
8use std::fmt;
9
10/// The resolved input/output types of a composition node.
11#[derive(Debug, Clone)]
12pub struct ResolvedType {
13    pub input: NType,
14    pub output: NType,
15}
16
17// ── Capability enforcement ─────────────────────────────────────────────────
18
19/// Policy controlling which capabilities a composition is allowed to use.
20///
21/// `allowed` is empty → all capabilities permitted (default / backward-compatible).
22/// `allowed` is non-empty → only the listed capabilities are permitted.
23#[derive(Debug, Clone, Default)]
24pub struct CapabilityPolicy {
25    /// Capabilities the caller grants. Empty set = allow all.
26    pub allowed: BTreeSet<Capability>,
27}
28
29impl CapabilityPolicy {
30    /// A policy that allows every capability.
31    pub fn allow_all() -> Self {
32        Self {
33            allowed: BTreeSet::new(),
34        }
35    }
36
37    /// A policy that permits only the listed capabilities.
38    pub fn restrict(caps: impl IntoIterator<Item = Capability>) -> Self {
39        Self {
40            allowed: caps.into_iter().collect(),
41        }
42    }
43
44    fn is_allowed(&self, cap: &Capability) -> bool {
45        self.allowed.is_empty() || self.allowed.contains(cap)
46    }
47}
48
49/// A single capability violation found during pre-flight checking.
50#[derive(Debug, Clone)]
51pub struct CapabilityViolation {
52    pub stage_id: StageId,
53    pub required: Capability,
54    pub message: String,
55}
56
57impl fmt::Display for CapabilityViolation {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        write!(
60            f,
61            "stage {} requires capability {:?} which is not granted",
62            self.stage_id.0, self.required
63        )
64    }
65}
66
67/// Pre-flight check: walk the graph and verify every stage's declared capabilities
68/// are within the granted policy. Returns an empty vec when all capabilities pass.
69pub fn check_capabilities(
70    node: &CompositionNode,
71    store: &(impl StageStore + ?Sized),
72    policy: &CapabilityPolicy,
73) -> Vec<CapabilityViolation> {
74    let mut violations = Vec::new();
75    collect_capability_violations(node, store, policy, &mut violations);
76    violations
77}
78
79fn collect_capability_violations(
80    node: &CompositionNode,
81    store: &(impl StageStore + ?Sized),
82    policy: &CapabilityPolicy,
83    violations: &mut Vec<CapabilityViolation>,
84) {
85    match node {
86        CompositionNode::Stage { id, .. } => {
87            if let Ok(Some(stage)) = store.get(id) {
88                for cap in &stage.capabilities {
89                    if !policy.is_allowed(cap) {
90                        violations.push(CapabilityViolation {
91                            stage_id: id.clone(),
92                            required: cap.clone(),
93                            message: format!(
94                                "stage '{}' requires {:?}; grant it with --allow-capabilities",
95                                stage.description, cap
96                            ),
97                        });
98                    }
99                }
100            }
101        }
102        CompositionNode::RemoteStage { .. } => {} // remote stages have no local capabilities
103        CompositionNode::Const { .. } => {}       // no capabilities in a constant
104        CompositionNode::Sequential { stages } => {
105            for s in stages {
106                collect_capability_violations(s, store, policy, violations);
107            }
108        }
109        CompositionNode::Parallel { branches } => {
110            for branch in branches.values() {
111                collect_capability_violations(branch, store, policy, violations);
112            }
113        }
114        CompositionNode::Branch {
115            predicate,
116            if_true,
117            if_false,
118        } => {
119            collect_capability_violations(predicate, store, policy, violations);
120            collect_capability_violations(if_true, store, policy, violations);
121            collect_capability_violations(if_false, store, policy, violations);
122        }
123        CompositionNode::Fanout { source, targets } => {
124            collect_capability_violations(source, store, policy, violations);
125            for t in targets {
126                collect_capability_violations(t, store, policy, violations);
127            }
128        }
129        CompositionNode::Merge { sources, target } => {
130            for s in sources {
131                collect_capability_violations(s, store, policy, violations);
132            }
133            collect_capability_violations(target, store, policy, violations);
134        }
135        CompositionNode::Retry { stage, .. } => {
136            collect_capability_violations(stage, store, policy, violations);
137        }
138        CompositionNode::Let { bindings, body } => {
139            for b in bindings.values() {
140                collect_capability_violations(b, store, policy, violations);
141            }
142            collect_capability_violations(body, store, policy, violations);
143        }
144    }
145}
146
147// ── Effect inference & enforcement ────────────────────────────────────────
148
149/// Policy controlling which effect kinds a composition is allowed to declare.
150///
151/// `allowed` is empty → all effects permitted (default / backward-compatible).
152/// `allowed` is non-empty → only the listed effect kinds are permitted; others
153/// produce an [`EffectViolation`].
154#[derive(Debug, Clone, Default)]
155pub struct EffectPolicy {
156    /// Effect kinds the caller grants. Empty set = allow all.
157    pub allowed: BTreeSet<EffectKind>,
158}
159
160impl EffectPolicy {
161    /// A policy that allows every effect (default).
162    pub fn allow_all() -> Self {
163        Self {
164            allowed: BTreeSet::new(),
165        }
166    }
167
168    /// A policy that permits only the listed effect kinds.
169    pub fn restrict(kinds: impl IntoIterator<Item = EffectKind>) -> Self {
170        Self {
171            allowed: kinds.into_iter().collect(),
172        }
173    }
174
175    pub fn is_allowed(&self, kind: &EffectKind) -> bool {
176        self.allowed.is_empty() || self.allowed.contains(kind)
177    }
178}
179
180/// A single effect violation found during pre-flight checking.
181#[derive(Debug, Clone)]
182pub struct EffectViolation {
183    pub stage_id: StageId,
184    pub effect: Effect,
185    pub message: String,
186}
187
188impl fmt::Display for EffectViolation {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        write!(f, "{}", self.message)
191    }
192}
193
194/// Walk the composition graph and return the union of all effects declared by
195/// every stage. `RemoteStage` nodes always contribute `Effect::Network`.
196/// Stages not found in the store contribute `Effect::Unknown`.
197pub fn infer_effects(node: &CompositionNode, store: &(impl StageStore + ?Sized)) -> EffectSet {
198    let mut effects: BTreeSet<Effect> = BTreeSet::new();
199    collect_effects_inner(node, store, &mut effects);
200    EffectSet::new(effects)
201}
202
203fn collect_effects_inner(
204    node: &CompositionNode,
205    store: &(impl StageStore + ?Sized),
206    effects: &mut BTreeSet<Effect>,
207) {
208    match node {
209        CompositionNode::Stage { id, .. } => match store.get(id) {
210            Ok(Some(stage)) => {
211                for e in stage.signature.effects.iter() {
212                    effects.insert(e.clone());
213                }
214            }
215            _ => {
216                effects.insert(Effect::Unknown);
217            }
218        },
219        CompositionNode::RemoteStage { .. } => {
220            effects.insert(Effect::Network);
221            effects.insert(Effect::Fallible);
222        }
223        CompositionNode::Const { .. } => {
224            effects.insert(Effect::Pure);
225        }
226        CompositionNode::Sequential { stages } => {
227            for s in stages {
228                collect_effects_inner(s, store, effects);
229            }
230        }
231        CompositionNode::Parallel { branches } => {
232            for branch in branches.values() {
233                collect_effects_inner(branch, store, effects);
234            }
235        }
236        CompositionNode::Branch {
237            predicate,
238            if_true,
239            if_false,
240        } => {
241            collect_effects_inner(predicate, store, effects);
242            collect_effects_inner(if_true, store, effects);
243            collect_effects_inner(if_false, store, effects);
244        }
245        CompositionNode::Fanout { source, targets } => {
246            collect_effects_inner(source, store, effects);
247            for t in targets {
248                collect_effects_inner(t, store, effects);
249            }
250        }
251        CompositionNode::Merge { sources, target } => {
252            for s in sources {
253                collect_effects_inner(s, store, effects);
254            }
255            collect_effects_inner(target, store, effects);
256        }
257        CompositionNode::Retry { stage, .. } => {
258            collect_effects_inner(stage, store, effects);
259        }
260        CompositionNode::Let { bindings, body } => {
261            for b in bindings.values() {
262                collect_effects_inner(b, store, effects);
263            }
264            collect_effects_inner(body, store, effects);
265        }
266    }
267}
268
269/// Pre-flight check: walk the graph and verify every stage's declared effects
270/// are within the granted policy. Returns an empty vec when all effects are allowed.
271pub fn check_effects(
272    node: &CompositionNode,
273    store: &(impl StageStore + ?Sized),
274    policy: &EffectPolicy,
275) -> Vec<EffectViolation> {
276    let mut violations = Vec::new();
277    collect_effect_violations(node, store, policy, &mut violations);
278    violations
279}
280
281fn collect_effect_violations(
282    node: &CompositionNode,
283    store: &(impl StageStore + ?Sized),
284    policy: &EffectPolicy,
285    violations: &mut Vec<EffectViolation>,
286) {
287    match node {
288        CompositionNode::Stage { id, .. } => match store.get(id) {
289            Ok(Some(stage)) => {
290                for effect in stage.signature.effects.iter() {
291                    let kind = effect.kind();
292                    if !policy.is_allowed(&kind) {
293                        violations.push(EffectViolation {
294                            stage_id: id.clone(),
295                            effect: effect.clone(),
296                            message: format!(
297                                "stage '{}' declares effect {kind}; grant it with --allow-effects {kind}",
298                                stage.description
299                            ),
300                        });
301                    }
302                }
303            }
304            _ => {
305                let kind = EffectKind::Unknown;
306                if !policy.is_allowed(&kind) {
307                    violations.push(EffectViolation {
308                        stage_id: id.clone(),
309                        effect: Effect::Unknown,
310                        message: format!(
311                            "stage {} has unknown effects (not in store); grant with --allow-effects unknown",
312                            id.0
313                        ),
314                    });
315                }
316            }
317        },
318        CompositionNode::RemoteStage { .. } => {
319            for effect in &[Effect::Network, Effect::Fallible] {
320                let kind = effect.kind();
321                if !policy.is_allowed(&kind) {
322                    violations.push(EffectViolation {
323                        stage_id: StageId("remote".into()),
324                        effect: effect.clone(),
325                        message: format!(
326                            "RemoteStage declares implicit effect {kind}; grant with --allow-effects {kind}"
327                        ),
328                    });
329                }
330            }
331        }
332        CompositionNode::Const { .. } => {}
333        CompositionNode::Sequential { stages } => {
334            for s in stages {
335                collect_effect_violations(s, store, policy, violations);
336            }
337        }
338        CompositionNode::Parallel { branches } => {
339            for branch in branches.values() {
340                collect_effect_violations(branch, store, policy, violations);
341            }
342        }
343        CompositionNode::Branch {
344            predicate,
345            if_true,
346            if_false,
347        } => {
348            collect_effect_violations(predicate, store, policy, violations);
349            collect_effect_violations(if_true, store, policy, violations);
350            collect_effect_violations(if_false, store, policy, violations);
351        }
352        CompositionNode::Fanout { source, targets } => {
353            collect_effect_violations(source, store, policy, violations);
354            for t in targets {
355                collect_effect_violations(t, store, policy, violations);
356            }
357        }
358        CompositionNode::Merge { sources, target } => {
359            for s in sources {
360                collect_effect_violations(s, store, policy, violations);
361            }
362            collect_effect_violations(target, store, policy, violations);
363        }
364        CompositionNode::Retry { stage, .. } => {
365            collect_effect_violations(stage, store, policy, violations);
366        }
367        CompositionNode::Let { bindings, body } => {
368            for b in bindings.values() {
369                collect_effect_violations(b, store, policy, violations);
370            }
371            collect_effect_violations(body, store, policy, violations);
372        }
373    }
374}
375
376// ── Signature verification ─────────────────────────────────────────────────
377
/// Reason a stage failed the signature check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignatureViolationKind {
    /// The stage lacks an `ed25519_signature` or a `signer_public_key`,
    /// i.e. it was built unsigned.
    Missing,
    /// A signature is present but did not verify (tampered stage).
    Invalid,
}

impl fmt::Display for SignatureViolationKind {
    /// Writes a short lowercase label for the kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Missing => "unsigned",
            Self::Invalid => "invalid signature",
        };
        f.write_str(label)
    }
}
395
396/// A single signature violation found during pre-flight checking.
397#[derive(Debug, Clone)]
398pub struct SignatureViolation {
399    pub stage_id: StageId,
400    pub kind: SignatureViolationKind,
401    pub message: String,
402}
403
404impl fmt::Display for SignatureViolation {
405    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406        write!(f, "stage {} — {}", self.stage_id.0, self.message)
407    }
408}
409
410/// Pre-flight check: walk the graph and verify every stage's Ed25519 signature.
411///
412/// Returns an empty vec when all signatures pass. Stages with a missing
413/// signature OR an invalid signature are both reported as violations.
414pub fn verify_signatures(
415    node: &CompositionNode,
416    store: &(impl StageStore + ?Sized),
417) -> Vec<SignatureViolation> {
418    let mut violations = Vec::new();
419    collect_signature_violations(node, store, &mut violations);
420    violations
421}
422
423fn collect_signature_violations(
424    node: &CompositionNode,
425    store: &(impl StageStore + ?Sized),
426    violations: &mut Vec<SignatureViolation>,
427) {
428    match node {
429        CompositionNode::Stage { id, .. } => {
430            if let Ok(Some(stage)) = store.get(id) {
431                match (&stage.ed25519_signature, &stage.signer_public_key) {
432                    (None, _) | (_, None) => {
433                        violations.push(SignatureViolation {
434                            stage_id: id.clone(),
435                            kind: SignatureViolationKind::Missing,
436                            message: format!(
437                                "stage '{}' has no signature — add it via the signing pipeline",
438                                stage.description
439                            ),
440                        });
441                    }
442                    (Some(sig_hex), Some(pub_hex)) => {
443                        match noether_core::stage::verify_stage_signature(id, sig_hex, pub_hex) {
444                            Ok(true) => {} // valid
445                            Ok(false) => {
446                                violations.push(SignatureViolation {
447                                    stage_id: id.clone(),
448                                    kind: SignatureViolationKind::Invalid,
449                                    message: format!(
450                                        "stage '{}' signature verification failed — possible tampering",
451                                        stage.description
452                                    ),
453                                });
454                            }
455                            Err(e) => {
456                                violations.push(SignatureViolation {
457                                    stage_id: id.clone(),
458                                    kind: SignatureViolationKind::Invalid,
459                                    message: format!(
460                                        "stage '{}' signature could not be decoded: {e}",
461                                        stage.description
462                                    ),
463                                });
464                            }
465                        }
466                    }
467                }
468            }
469            // If the stage is not in the store, the type-checker will already
470            // have reported an unknown-stage error; skip here.
471        }
472        CompositionNode::Const { .. } => {} // constants have no signature to verify
473        CompositionNode::RemoteStage { .. } => {} // remote stages have no local signature to verify
474        CompositionNode::Sequential { stages } => {
475            for s in stages {
476                collect_signature_violations(s, store, violations);
477            }
478        }
479        CompositionNode::Parallel { branches } => {
480            for branch in branches.values() {
481                collect_signature_violations(branch, store, violations);
482            }
483        }
484        CompositionNode::Branch {
485            predicate,
486            if_true,
487            if_false,
488        } => {
489            collect_signature_violations(predicate, store, violations);
490            collect_signature_violations(if_true, store, violations);
491            collect_signature_violations(if_false, store, violations);
492        }
493        CompositionNode::Fanout { source, targets } => {
494            collect_signature_violations(source, store, violations);
495            for t in targets {
496                collect_signature_violations(t, store, violations);
497            }
498        }
499        CompositionNode::Merge { sources, target } => {
500            for s in sources {
501                collect_signature_violations(s, store, violations);
502            }
503            collect_signature_violations(target, store, violations);
504        }
505        CompositionNode::Retry { stage, .. } => {
506            collect_signature_violations(stage, store, violations);
507        }
508        CompositionNode::Let { bindings, body } => {
509            for b in bindings.values() {
510                collect_signature_violations(b, store, violations);
511            }
512            collect_signature_violations(body, store, violations);
513        }
514    }
515}
516
517// ── Effect warnings ────────────────────────────────────────────────────────
518
519/// Warnings about effect usage detected during graph type-checking.
520///
521/// These are soft issues — the graph is structurally valid but may have
522/// surprising runtime behaviour. Callers decide whether to block or surface them.
523#[derive(Debug, Clone)]
524pub enum EffectWarning {
525    /// A `Fallible` stage is not wrapped in a `Retry` node. Failures propagate.
526    FallibleWithoutRetry { stage_id: StageId },
527    /// A `NonDeterministic` stage's output feeds a `Pure` stage.
528    NonDeterministicFeedingPure { from: StageId, to: StageId },
529    /// The sum of declared `Cost` effects exceeds the given budget (in cents).
530    CostBudgetExceeded { total_cents: u64, budget_cents: u64 },
531}
532
533impl fmt::Display for EffectWarning {
534    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535        match self {
536            EffectWarning::FallibleWithoutRetry { stage_id } => write!(
537                f,
538                "stage {} is Fallible but has no Retry wrapper; failures will propagate",
539                stage_id.0
540            ),
541            EffectWarning::NonDeterministicFeedingPure { from, to } => write!(
542                f,
543                "stage {} is NonDeterministic but feeds Pure stage {}; Pure caching will be bypassed",
544                from.0, to.0
545            ),
546            EffectWarning::CostBudgetExceeded { total_cents, budget_cents } => write!(
547                f,
548                "estimated composition cost ({total_cents}¢) exceeds budget ({budget_cents}¢)"
549            ),
550        }
551    }
552}
553
554/// The result of a successful graph type-check: resolved types plus any effect warnings.
555#[derive(Debug, Clone)]
556pub struct CheckResult {
557    pub resolved: ResolvedType,
558    pub warnings: Vec<EffectWarning>,
559}
560
561// ── Errors detected during graph type checking ────────────────────────────
562#[derive(Debug, Clone)]
563pub enum GraphTypeError {
564    StageNotFound {
565        id: StageId,
566    },
567    SequentialTypeMismatch {
568        position: usize,
569        from_output: NType,
570        to_input: NType,
571        reason: IncompatibilityReason,
572    },
573    BranchPredicateNotBool {
574        actual: NType,
575    },
576    BranchOutputMismatch {
577        true_output: NType,
578        false_output: NType,
579        reason: IncompatibilityReason,
580    },
581    FanoutInputMismatch {
582        target_index: usize,
583        source_output: NType,
584        target_input: NType,
585        reason: IncompatibilityReason,
586    },
587    MergeOutputMismatch {
588        merged_type: NType,
589        target_input: NType,
590        reason: IncompatibilityReason,
591    },
592    EmptyNode {
593        operator: String,
594    },
595}
596
597impl fmt::Display for GraphTypeError {
598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599        match self {
600            GraphTypeError::StageNotFound { id } => {
601                write!(f, "stage {} not found in store", id.0)
602            }
603            GraphTypeError::SequentialTypeMismatch {
604                position,
605                from_output,
606                to_input,
607                reason,
608            } => write!(
609                f,
610                "type mismatch at position {position}: output {from_output} is not subtype of input {to_input}: {reason}"
611            ),
612            GraphTypeError::BranchPredicateNotBool { actual } => {
613                write!(f, "branch predicate must produce Bool, got {actual}")
614            }
615            GraphTypeError::BranchOutputMismatch {
616                true_output,
617                false_output,
618                reason,
619            } => write!(
620                f,
621                "branch outputs must be compatible: if_true produces {true_output}, if_false produces {false_output}: {reason}"
622            ),
623            GraphTypeError::FanoutInputMismatch {
624                target_index,
625                source_output,
626                target_input,
627                reason,
628            } => write!(
629                f,
630                "fanout target {target_index}: source output {source_output} is not subtype of target input {target_input}: {reason}"
631            ),
632            GraphTypeError::MergeOutputMismatch {
633                merged_type,
634                target_input,
635                reason,
636            } => write!(
637                f,
638                "merge: merged type {merged_type} is not subtype of target input {target_input}: {reason}"
639            ),
640            GraphTypeError::EmptyNode { operator } => {
641                write!(f, "empty {operator} node")
642            }
643        }
644    }
645}
646
647/// Type-check a composition graph against the stage store.
648///
649/// Returns `CheckResult` (resolved types + effect warnings) on success,
650/// or a list of hard type errors on failure.
651pub fn check_graph(
652    node: &CompositionNode,
653    store: &(impl StageStore + ?Sized),
654) -> Result<CheckResult, Vec<GraphTypeError>> {
655    let mut errors = Vec::new();
656    let result = check_node(node, store, &mut errors);
657    if errors.is_empty() {
658        let resolved = result.unwrap();
659        let warnings = collect_effect_warnings(node, store, None);
660        Ok(CheckResult { resolved, warnings })
661    } else {
662        Err(errors)
663    }
664}
665
666/// Collect effect warnings by walking the graph.
667/// `cost_budget_cents` — pass `Some(n)` to enable budget enforcement.
668pub fn collect_effect_warnings(
669    node: &CompositionNode,
670    store: &(impl StageStore + ?Sized),
671    cost_budget_cents: Option<u64>,
672) -> Vec<EffectWarning> {
673    let mut warnings = Vec::new();
674    let mut total_cost: u64 = 0;
675    collect_warnings_inner(node, store, &mut warnings, &mut total_cost, false);
676    if let Some(budget) = cost_budget_cents {
677        if total_cost > budget {
678            warnings.push(EffectWarning::CostBudgetExceeded {
679                total_cents: total_cost,
680                budget_cents: budget,
681            });
682        }
683    }
684    warnings
685}
686
687fn collect_warnings_inner(
688    node: &CompositionNode,
689    store: &(impl StageStore + ?Sized),
690    warnings: &mut Vec<EffectWarning>,
691    total_cost: &mut u64,
692    _parent_is_retry: bool,
693) {
694    match node {
695        CompositionNode::Stage { id, .. } => {
696            if let Ok(Some(stage)) = store.get(id) {
697                // Accumulate cost
698                for effect in stage.signature.effects.iter() {
699                    if let Effect::Cost { cents } = effect {
700                        *total_cost = total_cost.saturating_add(*cents);
701                    }
702                }
703                // Fallible without retry is handled at the parent sequential level
704            }
705        }
706        CompositionNode::RemoteStage { .. } => {} // remote calls have no local effects to warn about
707        CompositionNode::Const { .. } => {}       // no effects in a constant
708        CompositionNode::Sequential { stages } => {
709            for (i, s) in stages.iter().enumerate() {
710                collect_warnings_inner(s, store, warnings, total_cost, false);
711
712                // Rule: Fallible stage not wrapped in Retry
713                if let CompositionNode::Stage { id, .. } = s {
714                    if let Ok(Some(stage)) = store.get(id) {
715                        if stage.signature.effects.contains(&Effect::Fallible) {
716                            warnings.push(EffectWarning::FallibleWithoutRetry {
717                                stage_id: id.clone(),
718                            });
719                        }
720                    }
721                }
722
723                // Rule: NonDeterministic output → Pure input
724                if i + 1 < stages.len() {
725                    if let (
726                        CompositionNode::Stage { id: from_id, .. },
727                        CompositionNode::Stage { id: to_id, .. },
728                    ) = (s, &stages[i + 1])
729                    {
730                        let from_nd = store
731                            .get(from_id)
732                            .ok()
733                            .flatten()
734                            .map(|s| s.signature.effects.contains(&Effect::NonDeterministic))
735                            .unwrap_or(false);
736                        let to_pure = store
737                            .get(to_id)
738                            .ok()
739                            .flatten()
740                            .map(|s| s.signature.effects.contains(&Effect::Pure))
741                            .unwrap_or(false);
742
743                        if from_nd && to_pure {
744                            warnings.push(EffectWarning::NonDeterministicFeedingPure {
745                                from: from_id.clone(),
746                                to: to_id.clone(),
747                            });
748                        }
749                    }
750                }
751            }
752        }
753        CompositionNode::Parallel { branches } => {
754            for branch in branches.values() {
755                collect_warnings_inner(branch, store, warnings, total_cost, false);
756            }
757        }
758        CompositionNode::Branch {
759            predicate,
760            if_true,
761            if_false,
762        } => {
763            collect_warnings_inner(predicate, store, warnings, total_cost, false);
764            collect_warnings_inner(if_true, store, warnings, total_cost, false);
765            collect_warnings_inner(if_false, store, warnings, total_cost, false);
766        }
767        CompositionNode::Fanout { source, targets } => {
768            collect_warnings_inner(source, store, warnings, total_cost, false);
769            for t in targets {
770                collect_warnings_inner(t, store, warnings, total_cost, false);
771            }
772        }
773        CompositionNode::Merge { sources, target } => {
774            for s in sources {
775                collect_warnings_inner(s, store, warnings, total_cost, false);
776            }
777            collect_warnings_inner(target, store, warnings, total_cost, false);
778        }
779        CompositionNode::Retry { stage, .. } => {
780            // Retry wraps Fallible — suppress FallibleWithoutRetry for direct child
781            collect_warnings_inner(stage, store, warnings, total_cost, true);
782            // Remove any FallibleWithoutRetry that was just added for the immediate child
783            if let CompositionNode::Stage { id, .. } = stage.as_ref() {
784                warnings.retain(|w| !matches!(w, EffectWarning::FallibleWithoutRetry { stage_id } if stage_id == id));
785            }
786        }
787        CompositionNode::Let { bindings, body } => {
788            for b in bindings.values() {
789                collect_warnings_inner(b, store, warnings, total_cost, false);
790            }
791            collect_warnings_inner(body, store, warnings, total_cost, false);
792        }
793    }
794}
795
796fn check_node(
797    node: &CompositionNode,
798    store: &(impl StageStore + ?Sized),
799    errors: &mut Vec<GraphTypeError>,
800) -> Option<ResolvedType> {
801    match node {
802        CompositionNode::Stage { id, config } => {
803            let resolved = check_stage(id, store, errors)?;
804            // When config provides fields, reduce the effective input type
805            if let Some(cfg) = config {
806                if !cfg.is_empty() {
807                    if let NType::Record(fields) = &resolved.input {
808                        let remaining: std::collections::BTreeMap<String, NType> = fields
809                            .iter()
810                            .filter(|(name, _)| !cfg.contains_key(*name))
811                            .map(|(name, ty)| (name.clone(), ty.clone()))
812                            .collect();
813                        let effective = if remaining.is_empty() || remaining.len() == 1 {
814                            NType::Any
815                        } else {
816                            NType::Record(remaining)
817                        };
818                        return Some(ResolvedType {
819                            input: effective,
820                            output: resolved.output,
821                        });
822                    }
823                }
824            }
825            Some(resolved)
826        }
827        // RemoteStage: types are declared inline — no store lookup needed.
828        // The type checker trusts the declared input/output types.
829        CompositionNode::RemoteStage { input, output, .. } => Some(ResolvedType {
830            input: input.clone(),
831            output: output.clone(),
832        }),
833        // Const: accepts Any input, emits Any output (actual type is inferred from value at runtime)
834        CompositionNode::Const { .. } => Some(ResolvedType {
835            input: NType::Any,
836            output: NType::Any,
837        }),
838        CompositionNode::Sequential { stages } => check_sequential(stages, store, errors),
839        CompositionNode::Parallel { branches } => check_parallel(branches, store, errors),
840        CompositionNode::Branch {
841            predicate,
842            if_true,
843            if_false,
844        } => check_branch(predicate, if_true, if_false, store, errors),
845        CompositionNode::Fanout { source, targets } => check_fanout(source, targets, store, errors),
846        CompositionNode::Merge { sources, target } => check_merge(sources, target, store, errors),
847        CompositionNode::Retry { stage, .. } => check_node(stage, store, errors),
848        CompositionNode::Let { bindings, body } => check_let(bindings, body, store, errors),
849    }
850}
851
852/// Type-check a `Let` node.
853///
854/// Each binding sees the **outer Let input**. The body sees an augmented
855/// record `{ ...outer-input fields, <binding>: <binding-output> }`. The
856/// Let's overall input requirement is the union of:
857///   - every binding's input field requirements (each binding sees the same
858///     outer input), and
859///   - any field the body's input requires that is *not* satisfied by a
860///     binding (those must come through from the outer input).
861///
862/// The Let's output is the body's output. When inputs are not Records (e.g.
863/// `Any`), we conservatively widen to `NType::Any` rather than failing.
864fn check_let(
865    bindings: &BTreeMap<String, CompositionNode>,
866    body: &CompositionNode,
867    store: &(impl StageStore + ?Sized),
868    errors: &mut Vec<GraphTypeError>,
869) -> Option<ResolvedType> {
870    if bindings.is_empty() {
871        errors.push(GraphTypeError::EmptyNode {
872            operator: "Let".into(),
873        });
874        return None;
875    }
876
877    // Resolve every binding's types.
878    let mut binding_outputs: BTreeMap<String, NType> = BTreeMap::new();
879    let mut required_input: BTreeMap<String, NType> = BTreeMap::new();
880    let mut any_input = false;
881
882    for (name, node) in bindings {
883        let resolved = check_node(node, store, errors)?;
884        binding_outputs.insert(name.clone(), resolved.output);
885        match resolved.input {
886            NType::Record(fields) => {
887                for (f, ty) in fields {
888                    required_input.insert(f, ty);
889                }
890            }
891            NType::Any => {
892                any_input = true;
893            }
894            other => {
895                // A binding that wants a non-Record, non-Any input doesn't
896                // compose cleanly with the Let's record-shaped input. We
897                // conservatively require the outer input to be Any.
898                let _ = other;
899                any_input = true;
900            }
901        }
902    }
903
904    // Build the body's input record by merging outer-input requirements with
905    // the binding outputs (bindings shadow outer fields with the same name).
906    let mut body_input_fields = required_input.clone();
907    for (name, out_ty) in &binding_outputs {
908        body_input_fields.insert(name.clone(), out_ty.clone());
909    }
910
911    let body_resolved = check_node(body, store, errors)?;
912
913    // Verify the body's input is satisfied by the augmented record. For each
914    // field the body requires, either it must come from a binding output (in
915    // which case the binding's output must be a subtype of the expected
916    // field) or from the outer input — in which case we add it to the Let's
917    // overall input requirement.
918    if let NType::Record(body_fields) = &body_resolved.input {
919        for (name, expected_ty) in body_fields {
920            let provided = body_input_fields.get(name).cloned();
921            match provided {
922                Some(actual) => {
923                    if let TypeCompatibility::Incompatible(reason) =
924                        is_subtype_of(&actual, expected_ty)
925                    {
926                        errors.push(GraphTypeError::SequentialTypeMismatch {
927                            position: 0,
928                            from_output: actual,
929                            to_input: expected_ty.clone(),
930                            reason,
931                        });
932                    }
933                }
934                None => {
935                    // Body needs a field neither bindings nor known outer
936                    // requirements provide. Mark it as required from outer
937                    // input.
938                    required_input.insert(name.clone(), expected_ty.clone());
939                }
940            }
941        }
942    }
943
944    let input = if any_input || required_input.is_empty() {
945        NType::Any
946    } else {
947        NType::Record(required_input)
948    };
949
950    Some(ResolvedType {
951        input,
952        output: body_resolved.output,
953    })
954}
955
956fn check_stage(
957    id: &StageId,
958    store: &(impl StageStore + ?Sized),
959    errors: &mut Vec<GraphTypeError>,
960) -> Option<ResolvedType> {
961    match store.get(id) {
962        Ok(Some(stage)) => Some(ResolvedType {
963            input: stage.signature.input.clone(),
964            output: stage.signature.output.clone(),
965        }),
966        _ => {
967            errors.push(GraphTypeError::StageNotFound { id: id.clone() });
968            None
969        }
970    }
971}
972
973fn check_sequential(
974    stages: &[CompositionNode],
975    store: &(impl StageStore + ?Sized),
976    errors: &mut Vec<GraphTypeError>,
977) -> Option<ResolvedType> {
978    if stages.is_empty() {
979        errors.push(GraphTypeError::EmptyNode {
980            operator: "Sequential".into(),
981        });
982        return None;
983    }
984
985    let resolved: Vec<Option<ResolvedType>> = stages
986        .iter()
987        .map(|s| check_node(s, store, errors))
988        .collect();
989
990    // Check consecutive pairs
991    for i in 0..resolved.len() - 1 {
992        if let (Some(from), Some(to)) = (&resolved[i], &resolved[i + 1]) {
993            if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&from.output, &to.input)
994            {
995                errors.push(GraphTypeError::SequentialTypeMismatch {
996                    position: i,
997                    from_output: from.output.clone(),
998                    to_input: to.input.clone(),
999                    reason,
1000                });
1001            }
1002        }
1003    }
1004
1005    let first_input = resolved
1006        .first()
1007        .and_then(|r| r.as_ref())
1008        .map(|r| r.input.clone());
1009    let last_output = resolved
1010        .last()
1011        .and_then(|r| r.as_ref())
1012        .map(|r| r.output.clone());
1013
1014    match (first_input, last_output) {
1015        (Some(input), Some(output)) => Some(ResolvedType { input, output }),
1016        _ => None,
1017    }
1018}
1019
1020fn check_parallel(
1021    branches: &BTreeMap<String, CompositionNode>,
1022    store: &(impl StageStore + ?Sized),
1023    errors: &mut Vec<GraphTypeError>,
1024) -> Option<ResolvedType> {
1025    if branches.is_empty() {
1026        errors.push(GraphTypeError::EmptyNode {
1027            operator: "Parallel".into(),
1028        });
1029        return None;
1030    }
1031
1032    let mut input_fields = BTreeMap::new();
1033    let mut output_fields = BTreeMap::new();
1034
1035    for (name, node) in branches {
1036        if let Some(resolved) = check_node(node, store, errors) {
1037            input_fields.insert(name.clone(), resolved.input);
1038            output_fields.insert(name.clone(), resolved.output);
1039        }
1040    }
1041
1042    if input_fields.len() == branches.len() {
1043        Some(ResolvedType {
1044            input: NType::Record(input_fields),
1045            output: NType::Record(output_fields),
1046        })
1047    } else {
1048        None
1049    }
1050}
1051
1052fn check_branch(
1053    predicate: &CompositionNode,
1054    if_true: &CompositionNode,
1055    if_false: &CompositionNode,
1056    store: &(impl StageStore + ?Sized),
1057    errors: &mut Vec<GraphTypeError>,
1058) -> Option<ResolvedType> {
1059    let pred = check_node(predicate, store, errors);
1060    let true_branch = check_node(if_true, store, errors);
1061    let false_branch = check_node(if_false, store, errors);
1062
1063    // Check predicate output is Bool
1064    if let Some(ref p) = pred {
1065        if let TypeCompatibility::Incompatible(_) = is_subtype_of(&p.output, &NType::Bool) {
1066            errors.push(GraphTypeError::BranchPredicateNotBool {
1067                actual: p.output.clone(),
1068            });
1069        }
1070    }
1071
1072    // Branch outputs are unioned — both paths are valid return types.
1073    // No compatibility check required between branches; the consumer
1074    // of the branch output must handle the union type.
1075    match (pred, true_branch, false_branch) {
1076        (Some(p), Some(t), Some(f)) => Some(ResolvedType {
1077            input: p.input,
1078            output: NType::union(vec![t.output, f.output]),
1079        }),
1080        _ => None,
1081    }
1082}
1083
1084fn check_fanout(
1085    source: &CompositionNode,
1086    targets: &[CompositionNode],
1087    store: &(impl StageStore + ?Sized),
1088    errors: &mut Vec<GraphTypeError>,
1089) -> Option<ResolvedType> {
1090    if targets.is_empty() {
1091        errors.push(GraphTypeError::EmptyNode {
1092            operator: "Fanout".into(),
1093        });
1094        return None;
1095    }
1096
1097    let src = check_node(source, store, errors);
1098    let tgts: Vec<Option<ResolvedType>> = targets
1099        .iter()
1100        .map(|t| check_node(t, store, errors))
1101        .collect();
1102
1103    // Check source output is subtype of each target input
1104    if let Some(ref s) = src {
1105        for (i, t) in tgts.iter().enumerate() {
1106            if let Some(ref t) = t {
1107                if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&s.output, &t.input)
1108                {
1109                    errors.push(GraphTypeError::FanoutInputMismatch {
1110                        target_index: i,
1111                        source_output: s.output.clone(),
1112                        target_input: t.input.clone(),
1113                        reason,
1114                    });
1115                }
1116            }
1117        }
1118    }
1119
1120    let output_types: Vec<NType> = tgts
1121        .iter()
1122        .filter_map(|t| t.as_ref().map(|r| r.output.clone()))
1123        .collect();
1124
1125    match src {
1126        Some(s) if output_types.len() == targets.len() => Some(ResolvedType {
1127            input: s.input,
1128            output: NType::List(Box::new(if output_types.len() == 1 {
1129                output_types.into_iter().next().unwrap()
1130            } else {
1131                NType::union(output_types)
1132            })),
1133        }),
1134        _ => None,
1135    }
1136}
1137
1138fn check_merge(
1139    sources: &[CompositionNode],
1140    target: &CompositionNode,
1141    store: &(impl StageStore + ?Sized),
1142    errors: &mut Vec<GraphTypeError>,
1143) -> Option<ResolvedType> {
1144    if sources.is_empty() {
1145        errors.push(GraphTypeError::EmptyNode {
1146            operator: "Merge".into(),
1147        });
1148        return None;
1149    }
1150
1151    let srcs: Vec<Option<ResolvedType>> = sources
1152        .iter()
1153        .map(|s| check_node(s, store, errors))
1154        .collect();
1155    let tgt = check_node(target, store, errors);
1156
1157    // Build merged output record from sources
1158    let mut merged_fields = BTreeMap::new();
1159    for (i, s) in srcs.iter().enumerate() {
1160        if let Some(ref r) = s {
1161            merged_fields.insert(format!("source_{i}"), r.output.clone());
1162        }
1163    }
1164    let merged_type = NType::Record(merged_fields);
1165
1166    // Check merged type is subtype of target input
1167    if let Some(ref t) = tgt {
1168        if let TypeCompatibility::Incompatible(reason) = is_subtype_of(&merged_type, &t.input) {
1169            errors.push(GraphTypeError::MergeOutputMismatch {
1170                merged_type: merged_type.clone(),
1171                target_input: t.input.clone(),
1172                reason,
1173            });
1174        }
1175    }
1176
1177    // Overall: input is record of source inputs, output is target output
1178    let mut input_fields = BTreeMap::new();
1179    for (i, s) in srcs.iter().enumerate() {
1180        if let Some(ref r) = s {
1181            input_fields.insert(format!("source_{i}"), r.input.clone());
1182        }
1183    }
1184
1185    match tgt {
1186        Some(t) => Some(ResolvedType {
1187            input: NType::Record(input_fields),
1188            output: t.output,
1189        }),
1190        None => None,
1191    }
1192}
1193
#[cfg(test)]
mod tests {
    use super::*;
    use noether_core::capability::Capability;
    use noether_core::effects::EffectSet;
    use noether_core::stage::{CostEstimate, Stage, StageSignature};
    use noether_store::MemoryStore;
    use std::collections::BTreeSet;

    /// Build a pure, active stage with the given id and signature types, no
    /// capabilities, and otherwise empty metadata.
    fn make_stage(id: &str, input: NType, output: NType) -> Stage {
        Stage {
            id: StageId(id.into()),
            canonical_id: None,
            signature: StageSignature {
                input,
                output,
                effects: EffectSet::pure(),
                implementation_hash: format!("impl_{id}"),
            },
            capabilities: BTreeSet::new(),
            cost: CostEstimate {
                time_ms_p50: Some(10),
                tokens_est: None,
                memory_mb: None,
            },
            description: format!("test stage {id}"),
            examples: vec![],
            lifecycle: noether_core::stage::StageLifecycle::Active,
            ed25519_signature: None,
            signer_public_key: None,
            implementation_code: None,
            implementation_language: None,
            ui_style: None,
            tags: vec![],
            aliases: vec![],
        }
    }

    /// A store pre-populated with the small set of stages the type-checking
    /// tests compose.
    fn test_store() -> MemoryStore {
        let mut store = MemoryStore::new();
        store
            .put(make_stage("text_to_num", NType::Text, NType::Number))
            .unwrap();
        store
            .put(make_stage("num_to_bool", NType::Number, NType::Bool))
            .unwrap();
        store
            .put(make_stage("text_to_text", NType::Text, NType::Text))
            .unwrap();
        store
            .put(make_stage("bool_pred", NType::Text, NType::Bool))
            .unwrap();
        store
            .put(make_stage("any_to_text", NType::Any, NType::Text))
            .unwrap();
        store
    }

    /// A `Stage` node referring to `id`, without config.
    fn stage(id: &str) -> CompositionNode {
        CompositionNode::Stage {
            id: StageId(id.into()),
            config: None,
        }
    }

    #[test]
    fn check_single_stage() {
        let store = test_store();
        let result = check_graph(&stage("text_to_num"), &store);
        let check = result.unwrap();
        assert_eq!(check.resolved.input, NType::Text);
        assert_eq!(check.resolved.output, NType::Number);
    }

    #[test]
    fn check_missing_stage() {
        let store = test_store();
        let result = check_graph(&stage("nonexistent"), &store);
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert!(matches!(errors[0], GraphTypeError::StageNotFound { .. }));
    }

    #[test]
    fn check_valid_sequential() {
        let store = test_store();
        let node = CompositionNode::Sequential {
            stages: vec![stage("text_to_num"), stage("num_to_bool")],
        };
        let result = check_graph(&node, &store);
        let check = result.unwrap();
        assert_eq!(check.resolved.input, NType::Text);
        assert_eq!(check.resolved.output, NType::Bool);
    }

    #[test]
    fn check_invalid_sequential() {
        let store = test_store();
        // Bool output cannot feed Text input
        let node = CompositionNode::Sequential {
            stages: vec![stage("num_to_bool"), stage("text_to_num")],
        };
        let result = check_graph(&node, &store);
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert!(matches!(
            errors[0],
            GraphTypeError::SequentialTypeMismatch { .. }
        ));
    }

    #[test]
    fn check_parallel() {
        let store = test_store();
        let node = CompositionNode::Parallel {
            branches: BTreeMap::from([
                ("nums".into(), stage("text_to_num")),
                ("bools".into(), stage("bool_pred")),
            ]),
        };
        let result = check_graph(&node, &store);
        let check = result.unwrap();
        // Input is Record { bools: Text, nums: Text }
        assert_eq!(
            check.resolved.input,
            NType::Record(BTreeMap::from([
                ("bools".to_string(), NType::Text),
                ("nums".to_string(), NType::Text),
            ]))
        );
        // Output is Record { bools: Bool, nums: Number }
        assert_eq!(
            check.resolved.output,
            NType::Record(BTreeMap::from([
                ("bools".to_string(), NType::Bool),
                ("nums".to_string(), NType::Number),
            ]))
        );
    }

    #[test]
    fn check_branch_valid() {
        let store = test_store();
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("bool_pred")),
            if_true: Box::new(stage("text_to_num")),
            if_false: Box::new(stage("text_to_text")),
        };
        // Predicate: Text -> Bool, so the Branch input is the predicate's Text.
        // Arm outputs are Number and Text, which union into Number | Text.
        let result = check_graph(&node, &store);
        let check = result.unwrap();
        assert_eq!(check.resolved.input, NType::Text);
        assert_eq!(
            check.resolved.output,
            NType::union(vec![NType::Number, NType::Text])
        );
    }

    #[test]
    fn check_retry_transparent() {
        let store = test_store();
        let node = CompositionNode::Retry {
            stage: Box::new(stage("text_to_num")),
            max_attempts: 3,
            delay_ms: Some(100),
        };
        let result = check_graph(&node, &store);
        let check = result.unwrap();
        assert_eq!(check.resolved.input, NType::Text);
        assert_eq!(check.resolved.output, NType::Number);
    }

    #[test]
    fn capability_policy_allow_all_passes() {
        let mut store = test_store();
        let mut stage_net = make_stage("net_stage", NType::Text, NType::Text);
        stage_net.capabilities.insert(Capability::Network);
        store.put(stage_net).unwrap();

        let policy = CapabilityPolicy::allow_all();
        let violations = check_capabilities(&stage("net_stage"), &store, &policy);
        assert!(violations.is_empty());
    }

    #[test]
    fn capability_policy_restrict_blocks_network() {
        let mut store = test_store();
        let mut stage_net = make_stage("net_stage2", NType::Text, NType::Text);
        stage_net.capabilities.insert(Capability::Network);
        store.put(stage_net).unwrap();

        let policy = CapabilityPolicy::restrict([Capability::FsRead]);
        let violations = check_capabilities(&stage("net_stage2"), &store, &policy);
        assert_eq!(violations.len(), 1);
        assert!(matches!(violations[0].required, Capability::Network));
    }

    #[test]
    fn capability_policy_restrict_allows_declared() {
        let mut store = test_store();
        let mut stage_net = make_stage("net_stage3", NType::Text, NType::Text);
        stage_net.capabilities.insert(Capability::Network);
        store.put(stage_net).unwrap();

        let policy = CapabilityPolicy::restrict([Capability::Network]);
        let violations = check_capabilities(&stage("net_stage3"), &store, &policy);
        assert!(violations.is_empty());
    }

    #[test]
    fn remote_stage_resolves_declared_types() {
        let store = test_store();
        let node = CompositionNode::RemoteStage {
            url: "http://api.example.com".into(),
            input: NType::Text,
            output: NType::Number,
        };
        let result = check_graph(&node, &store).unwrap();
        assert_eq!(result.resolved.input, NType::Text);
        assert_eq!(result.resolved.output, NType::Number);
    }

    #[test]
    fn remote_stage_in_sequential_type_flows() {
        let mut store = test_store();
        store
            .put(make_stage("num_render", NType::Number, NType::Text))
            .unwrap();

        // Text -> RemoteStage(Text->Number) -> num_render(Number->Text) = Text->Text
        let node = CompositionNode::Sequential {
            stages: vec![
                CompositionNode::RemoteStage {
                    url: "http://api:8080".into(),
                    input: NType::Text,
                    output: NType::Number,
                },
                stage("num_render"),
            ],
        };
        let result = check_graph(&node, &store).unwrap();
        assert_eq!(result.resolved.input, NType::Text);
        assert_eq!(result.resolved.output, NType::Text);
    }

    #[test]
    fn remote_stage_type_mismatch_is_detected() {
        let store = test_store();
        // RemoteStage outputs Bool, but next stage expects Text
        let node = CompositionNode::Sequential {
            stages: vec![
                CompositionNode::RemoteStage {
                    url: "http://api:8080".into(),
                    input: NType::Text,
                    output: NType::Bool,
                },
                stage("text_to_num"),
            ],
        };
        let result = check_graph(&node, &store);
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert!(errors
            .iter()
            .any(|e| matches!(e, GraphTypeError::SequentialTypeMismatch { .. })));
    }

    // ── Effect inference ────────────────────────────────────────────────────

    /// An `Any -> Any` stage whose signature declares the given effects.
    fn make_stage_with_effects(id: &str, effects: EffectSet) -> Stage {
        let mut s = make_stage(id, NType::Any, NType::Any);
        s.signature.effects = effects;
        s
    }

    #[test]
    fn infer_effects_pure_stage() {
        let mut store = MemoryStore::new();
        store
            .put(make_stage_with_effects("pure1", EffectSet::pure()))
            .unwrap();
        let effects = infer_effects(&stage("pure1"), &store);
        assert!(effects.contains(&Effect::Pure));
        assert!(!effects.contains(&Effect::Network));
    }

    #[test]
    fn infer_effects_union_sequential() {
        let mut store = MemoryStore::new();
        store
            .put(make_stage_with_effects("a", EffectSet::new([Effect::Pure])))
            .unwrap();
        store
            .put(make_stage_with_effects(
                "b",
                EffectSet::new([Effect::Network]),
            ))
            .unwrap();
        let node = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b")],
        };
        let effects = infer_effects(&node, &store);
        assert!(effects.contains(&Effect::Pure));
        assert!(effects.contains(&Effect::Network));
    }

    #[test]
    fn infer_effects_remote_stage_adds_network() {
        let store = MemoryStore::new();
        let node = CompositionNode::RemoteStage {
            url: "http://localhost:8080".into(),
            input: NType::Any,
            output: NType::Any,
        };
        let effects = infer_effects(&node, &store);
        assert!(effects.contains(&Effect::Network));
        assert!(effects.contains(&Effect::Fallible));
    }

    #[test]
    fn infer_effects_missing_stage_adds_unknown() {
        let store = MemoryStore::new();
        let effects = infer_effects(&stage("missing"), &store);
        assert!(effects.contains(&Effect::Unknown));
    }

    // ── Effect policy ───────────────────────────────────────────────────────

    #[test]
    fn effect_policy_allow_all_never_violates() {
        let mut store = MemoryStore::new();
        store
            .put(make_stage_with_effects(
                "net",
                EffectSet::new([Effect::Network, Effect::Fallible]),
            ))
            .unwrap();
        let policy = EffectPolicy::allow_all();
        assert!(check_effects(&stage("net"), &store, &policy).is_empty());
    }

    #[test]
    fn effect_policy_restrict_blocks_network() {
        let mut store = MemoryStore::new();
        store
            .put(make_stage_with_effects(
                "net",
                EffectSet::new([Effect::Network]),
            ))
            .unwrap();
        let policy = EffectPolicy::restrict([EffectKind::Pure]);
        let violations = check_effects(&stage("net"), &store, &policy);
        assert!(!violations.is_empty());
        assert!(violations[0].message.contains("network"));
    }

    #[test]
    fn effect_policy_restrict_allows_matching_effect() {
        let mut store = MemoryStore::new();
        store
            .put(make_stage_with_effects(
                "llm",
                EffectSet::new([Effect::Llm {
                    model: "gpt-4o".into(),
                }]),
            ))
            .unwrap();
        let policy = EffectPolicy::restrict([EffectKind::Llm]);
        assert!(check_effects(&stage("llm"), &store, &policy).is_empty());
    }
}