Skip to main content

noether_engine/lagrange/
resolver.rs

1//! Pinning resolution pass.
2//!
3//! Rewrites a `CompositionNode` tree so every `Stage` node's `id` field
4//! holds a concrete [`noether_core::stage::StageId`]
5//! (implementation-level hash). After this pass runs, downstream
6//! passes — effect inference, `--allow-effects` enforcement, Ed25519
7//! verification, planner cost/parallel grouping, budget collection,
8//! grid-broker splitter — can look up stages via `store.get(id)`
9//! without regard for the original pinning.
10//!
11//! ## Why a separate pass?
12//!
13//! M2 introduced [`crate::lagrange::Pinning`]: a `Stage` node's `id`
14//! is either a `SignatureId` (resolve to the current Active impl) or
15//! an `ImplementationId` (bit-exact lookup). Teaching every downstream
16//! pass about pinning would have been a dozen file changes, each of
17//! them easy to get wrong.
18//!
19//! The pass approach is:
20//!
21//! 1. Call `resolve_pinning(&mut graph, &store)` once, after graph
22//!    construction and after any prefix/name resolution.
23//! 2. Every subsequent pass works on the mutated graph, where `id`
24//!    is guaranteed to be an `ImplementationId` that exists in the
25//!    store.
26//!
27//! This commits to "resolve once per execution". If the store changes
28//! between resolution and execution, the resolved graph keeps
29//! referring to the old implementation — a feature, not a bug: we
30//! want a single execution to see a consistent snapshot.
31//!
32//! ## What the pass does NOT do
33//!
34//! - It does not change the `pinning` field. A node that was declared
35//!   `Pinning::Signature` keeps that label even after its `id` has
36//!   been rewritten to an implementation hash. Consumers that
37//!   re-serialise the graph preserve the user's original intent (the
38//!   wire format's `pinning: "signature"` still means "signature" on
39//!   a future execution, not "both").
40//! - It does not walk `RemoteStage` — that's resolved at call-time
41//!   over HTTP, not via the local store.
42
43use crate::lagrange::ast::{CompositionNode, Pinning};
44use noether_core::stage::{SignatureId, StageId, StageLifecycle};
45use noether_store::StageStore;
46
47/// Error raised when a `Stage` node's reference cannot be resolved
48/// against the store.
49#[derive(Debug, Clone, PartialEq, thiserror::Error)]
50pub enum ResolutionError {
51    #[error(
52        "stage node with pinning=signature has id `{signature_id}` — \
53         no Active stage in the store matches that signature"
54    )]
55    SignatureNotFound { signature_id: String },
56
57    #[error(
58        "stage node with pinning=both has id `{implementation_id}` — \
59         no stage in the store has that implementation ID"
60    )]
61    ImplementationNotFound { implementation_id: String },
62
63    #[error(
64        "stage node with pinning=both has id `{implementation_id}` — \
65         the stage exists but its lifecycle is {lifecycle:?}; only \
66         Active stages may be referenced"
67    )]
68    ImplementationNotActive {
69        implementation_id: String,
70        lifecycle: StageLifecycle,
71    },
72}
73
74/// Walk a composition tree and rewrite every `Stage` node's `id`
75/// field to a concrete, in-store [`StageId`]. See the module doc for
76/// rationale and invariants.
77///
78/// Returns the list of rewrites and diagnostics performed on success,
79/// or the first [`ResolutionError`] on failure. The graph is left
80/// partially-rewritten on error; callers that want atomic behaviour
81/// should clone before calling.
82pub fn resolve_pinning<S>(
83    node: &mut CompositionNode,
84    store: &S,
85) -> Result<ResolutionReport, ResolutionError>
86where
87    S: StageStore + ?Sized,
88{
89    let mut report = ResolutionReport::default();
90    resolve_recursive(node, store, &mut report)?;
91    Ok(report)
92}
93
94/// Output of a successful [`resolve_pinning`] pass.
95#[derive(Debug, Clone, PartialEq, Default)]
96pub struct ResolutionReport {
97    /// One entry per Stage node whose `id` was changed by the pass.
98    pub rewrites: Vec<Rewrite>,
99    /// One entry per signature-pinned node where more than one Active
100    /// implementation matched — a "≤1 Active per signature" invariant
101    /// violation the CLI surfaces to the user. The pass still picks a
102    /// deterministic winner via [`noether_store::StageStore::get_by_signature`];
103    /// the warning exists so the user notices and fixes the store.
104    pub warnings: Vec<MultiActiveWarning>,
105}
106
107/// Record of one rewrite the pass performed. Useful for tracing:
108/// `noether run --trace-resolution` can print the before/after pairs.
109#[derive(Debug, Clone, PartialEq)]
110pub struct Rewrite {
111    pub before: String,
112    pub after: String,
113    pub pinning: Pinning,
114}
115
116/// Diagnostic raised when a signature-pinned ref matches more than
117/// one Active implementation. See [`ResolutionReport::warnings`].
118#[derive(Debug, Clone, PartialEq)]
119pub struct MultiActiveWarning {
120    pub signature_id: String,
121    pub active_implementation_ids: Vec<String>,
122    pub chosen: String,
123}
124
125fn resolve_recursive<S>(
126    node: &mut CompositionNode,
127    store: &S,
128    report: &mut ResolutionReport,
129) -> Result<(), ResolutionError>
130where
131    S: StageStore + ?Sized,
132{
133    match node {
134        CompositionNode::Stage { id, pinning, .. } => {
135            let before = id.0.clone();
136            // Diagnostic check for signature pinning: emit a warning if
137            // more than one Active impl matches.
138            if matches!(*pinning, Pinning::Signature) {
139                let sig = SignatureId(id.0.clone());
140                let matches = store.active_stages_with_signature(&sig);
141                if matches.len() > 1 {
142                    report.warnings.push(MultiActiveWarning {
143                        signature_id: id.0.clone(),
144                        active_implementation_ids: matches.iter().map(|s| s.id.0.clone()).collect(),
145                        chosen: matches[0].id.0.clone(),
146                    });
147                }
148            }
149            let resolved = resolve_single(id, *pinning, store)?;
150            if resolved.0 != before {
151                report.rewrites.push(Rewrite {
152                    before,
153                    after: resolved.0.clone(),
154                    pinning: *pinning,
155                });
156                *id = resolved;
157            }
158            Ok(())
159        }
160        // RemoteStage is resolved at call time over HTTP. Const has no
161        // stage ID.
162        CompositionNode::RemoteStage { .. } | CompositionNode::Const { .. } => Ok(()),
163        CompositionNode::Sequential { stages } => {
164            for s in stages {
165                resolve_recursive(s, store, report)?;
166            }
167            Ok(())
168        }
169        CompositionNode::Parallel { branches } => {
170            for b in branches.values_mut() {
171                resolve_recursive(b, store, report)?;
172            }
173            Ok(())
174        }
175        CompositionNode::Branch {
176            predicate,
177            if_true,
178            if_false,
179        } => {
180            resolve_recursive(predicate, store, report)?;
181            resolve_recursive(if_true, store, report)?;
182            resolve_recursive(if_false, store, report)?;
183            Ok(())
184        }
185        CompositionNode::Fanout { source, targets } => {
186            resolve_recursive(source, store, report)?;
187            for t in targets {
188                resolve_recursive(t, store, report)?;
189            }
190            Ok(())
191        }
192        CompositionNode::Merge { sources, target } => {
193            for s in sources {
194                resolve_recursive(s, store, report)?;
195            }
196            resolve_recursive(target, store, report)?;
197            Ok(())
198        }
199        CompositionNode::Retry { stage, .. } => resolve_recursive(stage, store, report),
200        CompositionNode::Let { bindings, body } => {
201            for b in bindings.values_mut() {
202                resolve_recursive(b, store, report)?;
203            }
204            resolve_recursive(body, store, report)
205        }
206    }
207}
208
209fn resolve_single<S>(id: &StageId, pinning: Pinning, store: &S) -> Result<StageId, ResolutionError>
210where
211    S: StageStore + ?Sized,
212{
213    match pinning {
214        Pinning::Signature => {
215            // First: treat id as a SignatureId and look up the Active
216            // implementation.
217            let sig = SignatureId(id.0.clone());
218            if let Some(stage) = store.get_by_signature(&sig) {
219                return Ok(stage.id.clone());
220            }
221            // Fallback: a name- or prefix-resolver pass may have
222            // already rewritten id into an implementation hash. Accept
223            // that lookup only if it points at an Active stage.
224            if let Ok(Some(stage)) = store.get(id) {
225                if matches!(stage.lifecycle, StageLifecycle::Active) {
226                    return Ok(stage.id.clone());
227                }
228            }
229            Err(ResolutionError::SignatureNotFound {
230                signature_id: id.0.clone(),
231            })
232        }
233        Pinning::Both => match store.get(id) {
234            Ok(Some(stage)) => match &stage.lifecycle {
235                StageLifecycle::Active => Ok(stage.id.clone()),
236                other => Err(ResolutionError::ImplementationNotActive {
237                    implementation_id: id.0.clone(),
238                    lifecycle: other.clone(),
239                }),
240            },
241            _ => Err(ResolutionError::ImplementationNotFound {
242                implementation_id: id.0.clone(),
243            }),
244        },
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use noether_core::effects::EffectSet;
252    use noether_core::stage::{CostEstimate, SignatureId, Stage, StageSignature};
253    use noether_core::types::NType;
254    use noether_store::MemoryStore;
255    use std::collections::{BTreeMap, BTreeSet};
256
257    fn make_stage(impl_id: &str, sig_id: Option<&str>, lifecycle: StageLifecycle) -> Stage {
258        Stage {
259            id: StageId(impl_id.into()),
260            signature_id: sig_id.map(|s| SignatureId(s.into())),
261            signature: StageSignature {
262                input: NType::Text,
263                output: NType::Number,
264                effects: EffectSet::pure(),
265                implementation_hash: format!("impl_{impl_id}"),
266            },
267            capabilities: BTreeSet::new(),
268            cost: CostEstimate {
269                time_ms_p50: None,
270                tokens_est: None,
271                memory_mb: None,
272            },
273            description: "test".into(),
274            examples: vec![],
275            lifecycle,
276            ed25519_signature: None,
277            signer_public_key: None,
278            implementation_code: None,
279            implementation_language: None,
280            ui_style: None,
281            tags: vec![],
282            aliases: vec![],
283            name: None,
284            properties: vec![],
285        }
286    }
287
288    fn store_with_impl(impl_id: &str, sig_id: &str) -> MemoryStore {
289        let mut store = MemoryStore::new();
290        store
291            .put(make_stage(impl_id, Some(sig_id), StageLifecycle::Active))
292            .unwrap();
293        store
294    }
295
296    #[test]
297    fn signature_pinning_rewrites_to_impl_id() {
298        let store = store_with_impl("impl_abc", "sig_xyz");
299
300        let mut node = CompositionNode::Stage {
301            id: StageId("sig_xyz".into()),
302            pinning: Pinning::Signature,
303            config: None,
304        };
305        let report = resolve_pinning(&mut node, &store).unwrap();
306
307        match &node {
308            CompositionNode::Stage { id, pinning, .. } => {
309                assert_eq!(id.0, "impl_abc", "id should be rewritten to impl hash");
310                // Pinning label is preserved — re-serialisation keeps user intent.
311                assert_eq!(*pinning, Pinning::Signature);
312            }
313            _ => panic!("expected Stage"),
314        }
315        assert_eq!(report.rewrites.len(), 1);
316        assert_eq!(report.rewrites[0].before, "sig_xyz");
317        assert_eq!(report.rewrites[0].after, "impl_abc");
318    }
319
320    #[test]
321    fn both_pinning_accepts_matching_impl_id() {
322        let store = store_with_impl("impl_abc", "sig_xyz");
323
324        let mut node = CompositionNode::Stage {
325            id: StageId("impl_abc".into()),
326            pinning: Pinning::Both,
327            config: None,
328        };
329        let report = resolve_pinning(&mut node, &store).unwrap();
330
331        // No rewrite — it already held a valid impl_id.
332        assert!(report.rewrites.is_empty());
333    }
334
335    #[test]
336    fn both_pinning_rejects_missing_impl() {
337        let store = store_with_impl("impl_abc", "sig_xyz");
338
339        let mut node = CompositionNode::Stage {
340            id: StageId("impl_does_not_exist".into()),
341            pinning: Pinning::Both,
342            config: None,
343        };
344        let err = resolve_pinning(&mut node, &store).unwrap_err();
345        assert!(matches!(
346            err,
347            ResolutionError::ImplementationNotFound { .. }
348        ));
349    }
350
351    #[test]
352    fn both_pinning_rejects_deprecated_impl() {
353        let mut store = MemoryStore::new();
354        store
355            .put(make_stage(
356                "impl_old",
357                Some("sig_xyz"),
358                StageLifecycle::Active,
359            ))
360            .unwrap();
361        store
362            .put(make_stage(
363                "impl_new",
364                Some("sig_xyz"),
365                StageLifecycle::Active,
366            ))
367            .unwrap();
368        store
369            .update_lifecycle(
370                &StageId("impl_old".into()),
371                StageLifecycle::Deprecated {
372                    successor_id: StageId("impl_new".into()),
373                },
374            )
375            .unwrap();
376
377        let mut node = CompositionNode::Stage {
378            id: StageId("impl_old".into()),
379            pinning: Pinning::Both,
380            config: None,
381        };
382        let err = resolve_pinning(&mut node, &store).unwrap_err();
383        assert!(matches!(
384            err,
385            ResolutionError::ImplementationNotActive { .. }
386        ));
387    }
388
389    #[test]
390    fn signature_pinning_rejects_missing_signature() {
391        let store = store_with_impl("impl_abc", "sig_xyz");
392
393        let mut node = CompositionNode::Stage {
394            id: StageId("sig_does_not_exist".into()),
395            pinning: Pinning::Signature,
396            config: None,
397        };
398        let err = resolve_pinning(&mut node, &store).unwrap_err();
399        assert!(matches!(err, ResolutionError::SignatureNotFound { .. }));
400    }
401
402    #[test]
403    fn signature_pinning_falls_back_to_impl_id_for_legacy_flows() {
404        // A prefix-resolver pass may have rewritten the id into an
405        // impl_id already. resolve_pinning accepts that, provided the
406        // stage is Active.
407        let store = store_with_impl("impl_abc", "sig_xyz");
408
409        let mut node = CompositionNode::Stage {
410            id: StageId("impl_abc".into()),
411            pinning: Pinning::Signature,
412            config: None,
413        };
414        let report = resolve_pinning(&mut node, &store).unwrap();
415        // No rewrite needed — the id already pointed at the right stage.
416        assert!(report.rewrites.is_empty());
417    }
418
419    #[test]
420    fn walks_into_nested_sequential() {
421        let store = store_with_impl("impl_abc", "sig_xyz");
422
423        let mut node = CompositionNode::Sequential {
424            stages: vec![
425                CompositionNode::Stage {
426                    id: StageId("sig_xyz".into()),
427                    pinning: Pinning::Signature,
428                    config: None,
429                },
430                CompositionNode::Stage {
431                    id: StageId("sig_xyz".into()),
432                    pinning: Pinning::Signature,
433                    config: None,
434                },
435            ],
436        };
437        let report = resolve_pinning(&mut node, &store).unwrap();
438        assert_eq!(report.rewrites.len(), 2);
439    }
440
441    #[test]
442    fn walks_into_parallel_branches() {
443        let store = store_with_impl("impl_abc", "sig_xyz");
444
445        let mut branches = BTreeMap::new();
446        branches.insert(
447            "a".into(),
448            CompositionNode::Stage {
449                id: StageId("sig_xyz".into()),
450                pinning: Pinning::Signature,
451                config: None,
452            },
453        );
454        branches.insert(
455            "b".into(),
456            CompositionNode::Stage {
457                id: StageId("sig_xyz".into()),
458                pinning: Pinning::Signature,
459                config: None,
460            },
461        );
462        let mut node = CompositionNode::Parallel { branches };
463        let report = resolve_pinning(&mut node, &store).unwrap();
464        assert_eq!(report.rewrites.len(), 2);
465    }
466
467    #[test]
468    fn walks_into_branch_predicate_and_arms() {
469        let store = store_with_impl("impl_abc", "sig_xyz");
470        let sig = || CompositionNode::Stage {
471            id: StageId("sig_xyz".into()),
472            pinning: Pinning::Signature,
473            config: None,
474        };
475        let mut node = CompositionNode::Branch {
476            predicate: Box::new(sig()),
477            if_true: Box::new(sig()),
478            if_false: Box::new(sig()),
479        };
480        let report = resolve_pinning(&mut node, &store).unwrap();
481        assert_eq!(report.rewrites.len(), 3);
482    }
483
484    #[test]
485    fn walks_into_fanout_source_and_targets() {
486        let store = store_with_impl("impl_abc", "sig_xyz");
487        let sig = || CompositionNode::Stage {
488            id: StageId("sig_xyz".into()),
489            pinning: Pinning::Signature,
490            config: None,
491        };
492        let mut node = CompositionNode::Fanout {
493            source: Box::new(sig()),
494            targets: vec![sig(), sig(), sig()],
495        };
496        let report = resolve_pinning(&mut node, &store).unwrap();
497        assert_eq!(report.rewrites.len(), 4);
498    }
499
500    #[test]
501    fn walks_into_merge_sources_and_target() {
502        let store = store_with_impl("impl_abc", "sig_xyz");
503        let sig = || CompositionNode::Stage {
504            id: StageId("sig_xyz".into()),
505            pinning: Pinning::Signature,
506            config: None,
507        };
508        let mut node = CompositionNode::Merge {
509            sources: vec![sig(), sig()],
510            target: Box::new(sig()),
511        };
512        let report = resolve_pinning(&mut node, &store).unwrap();
513        assert_eq!(report.rewrites.len(), 3);
514    }
515
516    #[test]
517    fn walks_into_let_bindings_and_body() {
518        let store = store_with_impl("impl_abc", "sig_xyz");
519        let sig = || CompositionNode::Stage {
520            id: StageId("sig_xyz".into()),
521            pinning: Pinning::Signature,
522            config: None,
523        };
524        let mut bindings = BTreeMap::new();
525        bindings.insert("a".into(), sig());
526        bindings.insert("b".into(), sig());
527        let mut node = CompositionNode::Let {
528            bindings,
529            body: Box::new(sig()),
530        };
531        let report = resolve_pinning(&mut node, &store).unwrap();
532        assert_eq!(report.rewrites.len(), 3);
533    }
534
535    #[test]
536    fn walks_into_retry_inner_stage() {
537        let store = store_with_impl("impl_abc", "sig_xyz");
538        let mut node = CompositionNode::Retry {
539            stage: Box::new(CompositionNode::Stage {
540                id: StageId("sig_xyz".into()),
541                pinning: Pinning::Signature,
542                config: None,
543            }),
544            max_attempts: 3,
545            delay_ms: None,
546        };
547        let report = resolve_pinning(&mut node, &store).unwrap();
548        assert_eq!(report.rewrites.len(), 1);
549    }
550
551    #[test]
552    fn stops_at_first_error_leaves_partial_rewrites() {
553        // First Stage resolves; second does not. Graph is
554        // partially-mutated when the pass returns Err.
555        let store = store_with_impl("impl_abc", "sig_xyz");
556
557        let mut node = CompositionNode::Sequential {
558            stages: vec![
559                CompositionNode::Stage {
560                    id: StageId("sig_xyz".into()),
561                    pinning: Pinning::Signature,
562                    config: None,
563                },
564                CompositionNode::Stage {
565                    id: StageId("sig_missing".into()),
566                    pinning: Pinning::Signature,
567                    config: None,
568                },
569            ],
570        };
571        let err = resolve_pinning(&mut node, &store).unwrap_err();
572        assert!(matches!(err, ResolutionError::SignatureNotFound { .. }));
573        // Verify the first stage was rewritten before the error.
574        match &node {
575            CompositionNode::Sequential { stages } => match &stages[0] {
576                CompositionNode::Stage { id, .. } => assert_eq!(id.0, "impl_abc"),
577                _ => panic!(),
578            },
579            _ => panic!(),
580        }
581    }
582
583    #[test]
584    fn idempotent_on_already_resolved_graph() {
585        let store = store_with_impl("impl_abc", "sig_xyz");
586
587        let mut node = CompositionNode::Stage {
588            id: StageId("sig_xyz".into()),
589            pinning: Pinning::Signature,
590            config: None,
591        };
592        let first = resolve_pinning(&mut node, &store).unwrap();
593        let second = resolve_pinning(&mut node, &store).unwrap();
594        assert_eq!(first.rewrites.len(), 1);
595        // Second pass is a no-op — the id is already an impl_id that
596        // the store has, and the signature-lookup fallback finds the
597        // same stage.
598        assert!(second.rewrites.is_empty());
599    }
600
601    #[test]
602    fn multi_active_signature_emits_warning() {
603        // Two Active stages with the same signature_id — an invariant
604        // violation the store alone can't catch without enforcement
605        // (tracked as follow-up; today the `stage add` CLI prevents it).
606        // resolve_pinning's job is to surface it.
607        let mut store = MemoryStore::new();
608        store
609            .put(make_stage(
610                "impl_a",
611                Some("shared_sig"),
612                StageLifecycle::Active,
613            ))
614            .unwrap();
615        store
616            .put(make_stage(
617                "impl_b",
618                Some("shared_sig"),
619                StageLifecycle::Active,
620            ))
621            .unwrap();
622
623        let mut node = CompositionNode::Stage {
624            id: StageId("shared_sig".into()),
625            pinning: Pinning::Signature,
626            config: None,
627        };
628        let report = resolve_pinning(&mut node, &store).unwrap();
629        assert_eq!(report.warnings.len(), 1);
630        let w = &report.warnings[0];
631        assert_eq!(w.signature_id, "shared_sig");
632        assert_eq!(w.active_implementation_ids.len(), 2);
633        // Deterministic pick: lexicographically smallest impl id.
634        assert_eq!(w.chosen, "impl_a");
635    }
636}