Skip to main content

noether_engine/lagrange/
resolver.rs

1//! Pinning resolution pass.
2//!
3//! Rewrites a `CompositionNode` tree so every `Stage` node's `id` field
4//! holds a concrete [`noether_core::stage::StageId`]
5//! (implementation-level hash). After this pass runs, downstream
6//! passes — effect inference, `--allow-effects` enforcement, Ed25519
7//! verification, planner cost/parallel grouping, budget collection,
8//! grid-broker splitter — can look up stages via `store.get(id)`
9//! without regard for the original pinning.
10//!
11//! ## Why a separate pass?
12//!
13//! M2 introduced [`crate::lagrange::Pinning`]: a `Stage` node's `id`
14//! is either a `SignatureId` (resolve to the current Active impl) or
15//! an `ImplementationId` (bit-exact lookup). Teaching every downstream
16//! pass about pinning would have been a dozen file changes, each of
17//! them easy to get wrong.
18//!
19//! The pass approach is:
20//!
21//! 1. Call `resolve_pinning(&mut graph, &store)` once, after graph
22//!    construction and after any prefix/name resolution.
23//! 2. Every subsequent pass works on the mutated graph, where `id`
24//!    is guaranteed to be an `ImplementationId` that exists in the
25//!    store.
26//!
27//! This commits to "resolve once per execution". If the store changes
28//! between resolution and execution, the resolved graph keeps
29//! referring to the old implementation — a feature, not a bug: we
30//! want a single execution to see a consistent snapshot.
31//!
32//! ## What the pass does NOT do
33//!
34//! - It does not change the `pinning` field. A node that was declared
35//!   `Pinning::Signature` keeps that label even after its `id` has
36//!   been rewritten to an implementation hash. Consumers that
37//!   re-serialise the graph preserve the user's original intent (the
38//!   wire format's `pinning: "signature"` still means "signature" on
39//!   a future execution, not "both").
40//! - It does not walk `RemoteStage` — that's resolved at call-time
41//!   over HTTP, not via the local store.
42
43use crate::lagrange::ast::{CompositionNode, Pinning};
44use noether_core::stage::{SignatureId, StageId, StageLifecycle};
45use noether_store::StageStore;
46
47/// Error raised when a `Stage` node's reference cannot be resolved
48/// against the store.
49#[derive(Debug, Clone, PartialEq, thiserror::Error)]
50pub enum ResolutionError {
51    #[error(
52        "stage node with pinning=signature has id `{signature_id}` — \
53         no Active stage in the store matches that signature"
54    )]
55    SignatureNotFound { signature_id: String },
56
57    #[error(
58        "stage node with pinning=both has id `{implementation_id}` — \
59         no stage in the store has that implementation ID"
60    )]
61    ImplementationNotFound { implementation_id: String },
62
63    #[error(
64        "stage node with pinning=both has id `{implementation_id}` — \
65         the stage exists but its lifecycle is {lifecycle:?}; only \
66         Active stages may be referenced"
67    )]
68    ImplementationNotActive {
69        implementation_id: String,
70        lifecycle: StageLifecycle,
71    },
72}
73
74/// Walk a composition tree and rewrite every `Stage` node's `id`
75/// field to a concrete, in-store [`StageId`]. See the module doc for
76/// rationale and invariants.
77///
78/// Returns the list of rewrites and diagnostics performed on success,
79/// or the first [`ResolutionError`] on failure. The graph is left
80/// partially-rewritten on error; callers that want atomic behaviour
81/// should clone before calling.
82pub fn resolve_pinning<S>(
83    node: &mut CompositionNode,
84    store: &S,
85) -> Result<ResolutionReport, ResolutionError>
86where
87    S: StageStore + ?Sized,
88{
89    let mut report = ResolutionReport::default();
90    resolve_recursive(node, store, &mut report)?;
91    Ok(report)
92}
93
94/// Output of a successful [`resolve_pinning`] pass.
95#[derive(Debug, Clone, PartialEq, Default)]
96pub struct ResolutionReport {
97    /// One entry per Stage node whose `id` was changed by the pass.
98    pub rewrites: Vec<Rewrite>,
99    /// One entry per signature-pinned node where more than one Active
100    /// implementation matched — a "≤1 Active per signature" invariant
101    /// violation the CLI surfaces to the user. The pass still picks a
102    /// deterministic winner via [`noether_store::StageStore::get_by_signature`];
103    /// the warning exists so the user notices and fixes the store.
104    pub warnings: Vec<MultiActiveWarning>,
105}
106
107/// Record of one rewrite the pass performed. Useful for tracing:
108/// `noether run --trace-resolution` can print the before/after pairs.
109#[derive(Debug, Clone, PartialEq)]
110pub struct Rewrite {
111    pub before: String,
112    pub after: String,
113    pub pinning: Pinning,
114}
115
/// Non-fatal diagnostic: a signature-pinned reference for which the
/// store returned several Active implementations. Collected in
/// [`ResolutionReport::warnings`].
#[derive(Clone, Debug, PartialEq)]
pub struct MultiActiveWarning {
    /// The signature reference as written on the node.
    pub signature_id: String,
    /// IDs of all Active implementations the store returned for that
    /// signature.
    pub active_implementation_ids: Vec<String>,
    /// The implementation ID reported as the one picked for this node.
    pub chosen: String,
}
124
125fn resolve_recursive<S>(
126    node: &mut CompositionNode,
127    store: &S,
128    report: &mut ResolutionReport,
129) -> Result<(), ResolutionError>
130where
131    S: StageStore + ?Sized,
132{
133    match node {
134        CompositionNode::Stage { id, pinning, .. } => {
135            let before = id.0.clone();
136            // Diagnostic check for signature pinning: emit a warning if
137            // more than one Active impl matches.
138            if matches!(*pinning, Pinning::Signature) {
139                let sig = SignatureId(id.0.clone());
140                let matches = store.active_stages_with_signature(&sig);
141                if matches.len() > 1 {
142                    report.warnings.push(MultiActiveWarning {
143                        signature_id: id.0.clone(),
144                        active_implementation_ids: matches.iter().map(|s| s.id.0.clone()).collect(),
145                        chosen: matches[0].id.0.clone(),
146                    });
147                }
148            }
149            let resolved = resolve_single(id, *pinning, store)?;
150            if resolved.0 != before {
151                report.rewrites.push(Rewrite {
152                    before,
153                    after: resolved.0.clone(),
154                    pinning: *pinning,
155                });
156                *id = resolved;
157            }
158            Ok(())
159        }
160        // RemoteStage is resolved at call time over HTTP. Const has no
161        // stage ID.
162        CompositionNode::RemoteStage { .. } | CompositionNode::Const { .. } => Ok(()),
163        CompositionNode::Sequential { stages } => {
164            for s in stages {
165                resolve_recursive(s, store, report)?;
166            }
167            Ok(())
168        }
169        CompositionNode::Parallel { branches } => {
170            for b in branches.values_mut() {
171                resolve_recursive(b, store, report)?;
172            }
173            Ok(())
174        }
175        CompositionNode::Branch {
176            predicate,
177            if_true,
178            if_false,
179        } => {
180            resolve_recursive(predicate, store, report)?;
181            resolve_recursive(if_true, store, report)?;
182            resolve_recursive(if_false, store, report)?;
183            Ok(())
184        }
185        CompositionNode::Fanout { source, targets } => {
186            resolve_recursive(source, store, report)?;
187            for t in targets {
188                resolve_recursive(t, store, report)?;
189            }
190            Ok(())
191        }
192        CompositionNode::Merge { sources, target } => {
193            for s in sources {
194                resolve_recursive(s, store, report)?;
195            }
196            resolve_recursive(target, store, report)?;
197            Ok(())
198        }
199        CompositionNode::Retry { stage, .. } => resolve_recursive(stage, store, report),
200        CompositionNode::Let { bindings, body } => {
201            for b in bindings.values_mut() {
202                resolve_recursive(b, store, report)?;
203            }
204            resolve_recursive(body, store, report)
205        }
206    }
207}
208
209fn resolve_single<S>(id: &StageId, pinning: Pinning, store: &S) -> Result<StageId, ResolutionError>
210where
211    S: StageStore + ?Sized,
212{
213    match pinning {
214        Pinning::Signature => {
215            // First: treat id as a SignatureId and look up the Active
216            // implementation.
217            let sig = SignatureId(id.0.clone());
218            if let Some(stage) = store.get_by_signature(&sig) {
219                return Ok(stage.id.clone());
220            }
221            // Fallback: a name- or prefix-resolver pass may have
222            // already rewritten id into an implementation hash. Accept
223            // that lookup only if it points at an Active stage.
224            if let Ok(Some(stage)) = store.get(id) {
225                if matches!(stage.lifecycle, StageLifecycle::Active) {
226                    return Ok(stage.id.clone());
227                }
228            }
229            Err(ResolutionError::SignatureNotFound {
230                signature_id: id.0.clone(),
231            })
232        }
233        Pinning::Both => match store.get(id) {
234            Ok(Some(stage)) => match &stage.lifecycle {
235                StageLifecycle::Active => Ok(stage.id.clone()),
236                other => Err(ResolutionError::ImplementationNotActive {
237                    implementation_id: id.0.clone(),
238                    lifecycle: other.clone(),
239                }),
240            },
241            _ => Err(ResolutionError::ImplementationNotFound {
242                implementation_id: id.0.clone(),
243            }),
244        },
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use noether_core::effects::EffectSet;
    use noether_core::stage::{CostEstimate, SignatureId, Stage, StageSignature};
    use noether_core::types::NType;
    use noether_store::MemoryStore;
    use std::collections::{BTreeMap, BTreeSet};

    /// Builds a bare-bones pure `Text -> Number` stage with the given
    /// implementation ID, optional signature ID and lifecycle. All other
    /// fields are empty / `None`.
    fn make_stage(impl_id: &str, sig_id: Option<&str>, lifecycle: StageLifecycle) -> Stage {
        let signature = StageSignature {
            input: NType::Text,
            output: NType::Number,
            effects: EffectSet::pure(),
            implementation_hash: format!("impl_{impl_id}"),
        };
        let cost = CostEstimate {
            time_ms_p50: None,
            tokens_est: None,
            memory_mb: None,
        };
        Stage {
            id: StageId(impl_id.into()),
            signature_id: sig_id.map(|raw| SignatureId(raw.into())),
            signature,
            capabilities: BTreeSet::new(),
            cost,
            description: "test".into(),
            examples: Vec::new(),
            lifecycle,
            ed25519_signature: None,
            signer_public_key: None,
            implementation_code: None,
            implementation_language: None,
            ui_style: None,
            tags: Vec::new(),
            aliases: Vec::new(),
            name: None,
            properties: Vec::new(),
        }
    }

    /// A fresh in-memory store holding exactly one Active stage.
    fn store_with_impl(impl_id: &str, sig_id: &str) -> MemoryStore {
        let mut store = MemoryStore::new();
        let stage = make_stage(impl_id, Some(sig_id), StageLifecycle::Active);
        store.put(stage).unwrap();
        store
    }

    /// A `Stage` node with no config.
    fn stage_node(id: &str, pinning: Pinning) -> CompositionNode {
        CompositionNode::Stage {
            id: StageId(id.into()),
            pinning,
            config: None,
        }
    }

    /// A signature-pinned node referring to `sig_xyz`.
    fn sig_node() -> CompositionNode {
        stage_node("sig_xyz", Pinning::Signature)
    }

    /// Resolves `node` against a store containing `impl_abc` / `sig_xyz`
    /// and returns how many rewrites were reported.
    fn rewrite_count(mut node: CompositionNode) -> usize {
        let store = store_with_impl("impl_abc", "sig_xyz");
        resolve_pinning(&mut node, &store).unwrap().rewrites.len()
    }

    #[test]
    fn signature_pinning_rewrites_to_impl_id() {
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = sig_node();

        let report = resolve_pinning(&mut node, &store).unwrap();

        if let CompositionNode::Stage { id, pinning, .. } = &node {
            assert_eq!(id.0, "impl_abc", "id should be rewritten to impl hash");
            // The pinning label must survive so that re-serialising the
            // graph keeps what the user originally asked for.
            assert_eq!(*pinning, Pinning::Signature);
        } else {
            panic!("expected Stage");
        }

        assert_eq!(report.rewrites.len(), 1);
        let rewrite = &report.rewrites[0];
        assert_eq!(rewrite.before, "sig_xyz");
        assert_eq!(rewrite.after, "impl_abc");
    }

    #[test]
    fn both_pinning_accepts_matching_impl_id() {
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = stage_node("impl_abc", Pinning::Both);

        let report = resolve_pinning(&mut node, &store).unwrap();

        // The node already carried a valid impl_id: nothing to rewrite.
        assert!(report.rewrites.is_empty());
    }

    #[test]
    fn both_pinning_rejects_missing_impl() {
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = stage_node("impl_does_not_exist", Pinning::Both);

        let err = resolve_pinning(&mut node, &store).unwrap_err();

        assert!(matches!(
            err,
            ResolutionError::ImplementationNotFound { .. }
        ));
    }

    #[test]
    fn both_pinning_rejects_deprecated_impl() {
        // Storing a second Active stage under the same signature_id
        // auto-deprecates the first one (impl_old) — see the M2.3
        // invariant enforcement in MemoryStore::put.
        let mut store = MemoryStore::new();
        for impl_id in ["impl_old", "impl_new"] {
            store
                .put(make_stage(impl_id, Some("sig_xyz"), StageLifecycle::Active))
                .unwrap();
        }
        let old = store.get(&StageId("impl_old".into())).unwrap().unwrap();
        assert!(matches!(old.lifecycle, StageLifecycle::Deprecated { .. }));

        let mut node = stage_node("impl_old", Pinning::Both);
        let err = resolve_pinning(&mut node, &store).unwrap_err();

        assert!(matches!(
            err,
            ResolutionError::ImplementationNotActive { .. }
        ));
    }

    #[test]
    fn signature_pinning_rejects_missing_signature() {
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = stage_node("sig_does_not_exist", Pinning::Signature);

        let err = resolve_pinning(&mut node, &store).unwrap_err();

        assert!(matches!(err, ResolutionError::SignatureNotFound { .. }));
    }

    #[test]
    fn signature_pinning_falls_back_to_impl_id_for_legacy_flows() {
        // An earlier prefix-resolver pass may already have replaced the
        // id with an impl_id. That is accepted as long as the stage is
        // Active.
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = stage_node("impl_abc", Pinning::Signature);

        let report = resolve_pinning(&mut node, &store).unwrap();

        // The id already named the right stage, so no rewrite happens.
        assert!(report.rewrites.is_empty());
    }

    #[test]
    fn walks_into_nested_sequential() {
        let node = CompositionNode::Sequential {
            stages: vec![sig_node(), sig_node()],
        };
        assert_eq!(rewrite_count(node), 2);
    }

    #[test]
    fn walks_into_parallel_branches() {
        let branches = BTreeMap::from([("a".into(), sig_node()), ("b".into(), sig_node())]);
        let node = CompositionNode::Parallel { branches };
        assert_eq!(rewrite_count(node), 2);
    }

    #[test]
    fn walks_into_branch_predicate_and_arms() {
        let node = CompositionNode::Branch {
            predicate: Box::new(sig_node()),
            if_true: Box::new(sig_node()),
            if_false: Box::new(sig_node()),
        };
        assert_eq!(rewrite_count(node), 3);
    }

    #[test]
    fn walks_into_fanout_source_and_targets() {
        let node = CompositionNode::Fanout {
            source: Box::new(sig_node()),
            targets: vec![sig_node(), sig_node(), sig_node()],
        };
        assert_eq!(rewrite_count(node), 4);
    }

    #[test]
    fn walks_into_merge_sources_and_target() {
        let node = CompositionNode::Merge {
            sources: vec![sig_node(), sig_node()],
            target: Box::new(sig_node()),
        };
        assert_eq!(rewrite_count(node), 3);
    }

    #[test]
    fn walks_into_let_bindings_and_body() {
        let bindings = BTreeMap::from([("a".into(), sig_node()), ("b".into(), sig_node())]);
        let node = CompositionNode::Let {
            bindings,
            body: Box::new(sig_node()),
        };
        assert_eq!(rewrite_count(node), 3);
    }

    #[test]
    fn walks_into_retry_inner_stage() {
        let node = CompositionNode::Retry {
            stage: Box::new(sig_node()),
            max_attempts: 3,
            delay_ms: None,
        };
        assert_eq!(rewrite_count(node), 1);
    }

    #[test]
    fn stops_at_first_error_leaves_partial_rewrites() {
        // The first Stage resolves, the second cannot. When the pass
        // returns Err the graph is therefore partially mutated.
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = CompositionNode::Sequential {
            stages: vec![sig_node(), stage_node("sig_missing", Pinning::Signature)],
        };

        let err = resolve_pinning(&mut node, &store).unwrap_err();
        assert!(matches!(err, ResolutionError::SignatureNotFound { .. }));

        // The first stage must already carry the implementation ID.
        match &node {
            CompositionNode::Sequential { stages } => match &stages[0] {
                CompositionNode::Stage { id, .. } => assert_eq!(id.0, "impl_abc"),
                _ => panic!(),
            },
            _ => panic!(),
        }
    }

    #[test]
    fn idempotent_on_already_resolved_graph() {
        let store = store_with_impl("impl_abc", "sig_xyz");
        let mut node = sig_node();

        let first = resolve_pinning(&mut node, &store).unwrap();
        let second = resolve_pinning(&mut node, &store).unwrap();

        assert_eq!(first.rewrites.len(), 1);
        // On the second run the id is already an impl_id known to the
        // store; the fallback lookup lands on the same stage, so nothing
        // changes.
        assert!(second.rewrites.is_empty());
    }

    #[test]
    fn multi_active_signature_emits_warning() {
        // The store-level "≤1 Active per signature" invariant (M2.3)
        // auto-deprecates duplicate Actives inside `put`, so ordinary
        // `put` sequences can no longer reach the resolver's warning
        // path. To exercise it, one stage goes in through `put` and a
        // second Active duplicate is injected behind `put`'s back
        // (tests-only), emulating a violated invariant such as a
        // corrupted file store.
        let mut store = MemoryStore::new();
        store
            .put(make_stage(
                "impl_a",
                Some("shared_sig"),
                StageLifecycle::Active,
            ))
            .unwrap();
        store.inject_raw_for_testing(make_stage(
            "impl_b",
            Some("shared_sig"),
            StageLifecycle::Active,
        ));

        let mut node = stage_node("shared_sig", Pinning::Signature);
        let report = resolve_pinning(&mut node, &store).unwrap();

        assert_eq!(report.warnings.len(), 1);
        let warning = &report.warnings[0];
        assert_eq!(warning.signature_id, "shared_sig");
        assert_eq!(warning.active_implementation_ids.len(), 2);
        // Deterministic pick: lexicographically smallest impl id.
        assert_eq!(warning.chosen, "impl_a");
    }
}