// noether_engine/lagrange/ast.rs

1use noether_core::stage::StageId;
2use noether_core::types::NType;
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
6/// How a `Stage` reference resolves to a concrete stage in the store.
7///
8/// Per M2 (v0.6.0), every graph node that names a stage declares its
9/// pinning. Default is [`Pinning::Signature`], which picks up
10/// implementation bugfixes automatically. [`Pinning::Both`] is the
11/// bit-reproducible option — the resolver refuses to substitute a
12/// different implementation even if the stored one has been deprecated.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
14#[serde(rename_all = "lowercase")]
15pub enum Pinning {
16    /// Interpret the node's `id` as a [`noether_core::stage::SignatureId`]
17    /// and resolve to whichever stage is currently Active with that
18    /// signature. Default — matches the v0.6.0 recommendation in
19    /// `STABILITY.md`.
20    #[default]
21    Signature,
22    /// Interpret the node's `id` as an implementation-inclusive
23    /// [`StageId`] and require an exact match. The resolver refuses to
24    /// fall back to any other implementation of the same signature.
25    Both,
26}
27
28impl Pinning {
29    /// Helper for `#[serde(skip_serializing_if = ...)]` — omit the field
30    /// from JSON when the value is the default (`Signature`).
31    pub fn is_signature(&self) -> bool {
32        matches!(self, Pinning::Signature)
33    }
34}
35
36/// A composition graph node. The core AST for Noether's composition language.
37#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38#[serde(tag = "op")]
39pub enum CompositionNode {
40    /// Leaf node: reference to a stage in the store.
41    ///
42    /// The `id` field is interpreted according to `pinning`:
43    /// - [`Pinning::Signature`] (default): `id` is a signature-level
44    ///   hash (`SignatureId`) and the resolver returns the currently
45    ///   Active implementation with that signature.
46    /// - [`Pinning::Both`]: `id` is an implementation-inclusive hash
47    ///   (`ImplementationId` / `StageId`) and the resolver requires an
48    ///   exact match. No fallback.
49    ///
50    /// The optional `config` provides static parameter values merged
51    /// with the pipeline input before the stage executes.
52    ///
53    /// Use [`CompositionNode::stage`] to construct a node with default
54    /// pinning; use the struct literal only when you need a non-default
55    /// pinning or a config.
56    ///
57    /// # Known gap in M2 (v0.6.0)
58    ///
59    /// [`resolve_stage_ref`] is only wired into the type checker and
60    /// executor runner. Other passes (effect inference, Ed25519
61    /// verify, planner cost/parallel grouping, budget, grid-broker
62    /// splitter) still look up by [`StageId`] directly, which means
63    /// a `Pinning::Signature` node may type-check but fail at those
64    /// downstream passes. The resolver-normalisation pass lands as a
65    /// follow-up: it rewrites graph nodes to implementation IDs before
66    /// any downstream pass runs, so the rest of the engine keeps
67    /// operating on concrete implementation hashes.
68    Stage {
69        id: StageId,
70        #[serde(default, skip_serializing_if = "Pinning::is_signature")]
71        pinning: Pinning,
72        #[serde(default, skip_serializing_if = "Option::is_none")]
73        config: Option<BTreeMap<String, serde_json::Value>>,
74    },
75
76    /// Call a remote Noether API endpoint over HTTP.
77    ///
78    /// The declared `input` and `output` types are verified by the type checker
79    /// at build time — the remote server does not need to be running during
80    /// `noether build`. In native builds, execution uses reqwest. In browser
81    /// builds, the JS runtime makes a `fetch()` call.
82    RemoteStage {
83        /// URL of the remote Noether API (e.g. "http://localhost:8080")
84        url: String,
85        /// Declared input type — what this node accepts from the pipeline
86        input: NType,
87        /// Declared output type — what this node returns to the pipeline
88        output: NType,
89    },
90
91    /// Emits a constant JSON value, ignoring its input entirely.
92    /// Used to inject literal strings, numbers, or objects into a pipeline.
93    Const { value: serde_json::Value },
94
95    /// A >> B >> C: output of each stage feeds the next.
96    Sequential { stages: Vec<CompositionNode> },
97
98    /// Execute branches concurrently, merge outputs into a Record keyed by
99    /// branch name. Each branch receives `input[branch_name]` if the input is
100    /// a Record containing that key; otherwise it receives the full input.
101    /// `Const` branches ignore their input entirely — use them for literals.
102    Parallel {
103        branches: BTreeMap<String, CompositionNode>,
104    },
105
106    /// Conditional routing based on a predicate stage.
107    Branch {
108        predicate: Box<CompositionNode>,
109        if_true: Box<CompositionNode>,
110        if_false: Box<CompositionNode>,
111    },
112
113    /// Source output sent to all targets concurrently.
114    Fanout {
115        source: Box<CompositionNode>,
116        targets: Vec<CompositionNode>,
117    },
118
119    /// Multiple sources merge into a single target.
120    Merge {
121        sources: Vec<CompositionNode>,
122        target: Box<CompositionNode>,
123    },
124
125    /// Retry a stage up to max_attempts times on failure.
126    Retry {
127        stage: Box<CompositionNode>,
128        max_attempts: u32,
129        delay_ms: Option<u64>,
130    },
131
132    /// Bind named intermediate computations and reference them in `body`.
133    ///
134    /// Each binding sub-node receives the **outer Let input** (the same value
135    /// passed to the Let node). After all bindings have produced a value, the
136    /// `body` runs against an augmented input record:
137    ///
138    ///   `{ ...outer-input fields, <binding-name>: <binding-output>, ... }`
139    ///
140    /// Bindings with the same name as an outer-input field shadow it. This
141    /// makes it possible to carry original-input fields into stages later in a
142    /// Sequential pipeline — the canonical example is scan → hash → diff,
143    /// where `diff` needs `state_path` from the original input but `hash`
144    /// would otherwise erase it.
145    ///
146    /// All bindings are scheduled concurrently — there are no inter-binding
147    /// references. If you need a binding to depend on another, wrap it in a
148    /// nested `Sequential`.
149    Let {
150        bindings: BTreeMap<String, CompositionNode>,
151        body: Box<CompositionNode>,
152    },
153}
154
155impl CompositionNode {
156    /// Build a `Stage` node with default pinning (`Signature`) and no
157    /// config. Use this in place of the struct literal when you don't
158    /// need to set pinning or config explicitly.
159    pub fn stage(id: impl Into<String>) -> Self {
160        Self::Stage {
161            id: StageId(id.into()),
162            pinning: Pinning::Signature,
163            config: None,
164        }
165    }
166
167    /// Build a `Stage` node with an explicit `Both` pinning — the
168    /// resolver will require the exact implementation named by `id`.
169    pub fn stage_pinned(id: impl Into<String>) -> Self {
170        Self::Stage {
171            id: StageId(id.into()),
172            pinning: Pinning::Both,
173            config: None,
174        }
175    }
176}
177
178/// Resolve a `CompositionNode::Stage` reference to a concrete stage
179/// in the store, respecting the node's pinning.
180///
181/// - [`Pinning::Signature`]: `id` is interpreted as a
182///   [`noether_core::stage::SignatureId`]; the resolver returns the
183///   store's Active implementation for that signature.
184///   - If no Active match is found, the resolver **falls back** to
185///     `store.get(id)` *and* requires the looked-up stage to be
186///     Active. This fallback catches the case where a name- or
187///     prefix-resolver pass has already rewritten `id` into an
188///     implementation hash. It deliberately does NOT run Deprecated
189///     or Tombstone implementations — per STABILITY.md, deprecated
190///     stages should emit a warning when invoked, which a silent
191///     resolver cannot do. The right place for "run deprecated with
192///     a warning" is the CLI layer, not this helper.
193/// - [`Pinning::Both`]: `id` is an implementation-inclusive
194///   [`StageId`]; the resolver requires an exact `store.get` match.
195///   No fallback to signature-level resolution.
196///
197/// # Known gap (tracked as M2 follow-up)
198///
199/// In M2 only the type checker and runtime executor use this helper.
200/// The effect-inference walk, `--allow-effects` enforcement, Ed25519
201/// verification pass, cost summary, planner parallel-grouping, budget
202/// collection, and grid-broker splitter all still call `store.get(id)`
203/// directly — which means a `Pinning::Signature` node can't resolve
204/// through those paths yet. The follow-up "resolver normalisation
205/// pass" rewrites graph nodes to their resolved implementation IDs
206/// before anything else runs, so the rest of the engine keeps
207/// operating on concrete implementation hashes.
208pub fn resolve_stage_ref<'a, S>(
209    id: &StageId,
210    pinning: Pinning,
211    store: &'a S,
212) -> Option<&'a noether_core::stage::Stage>
213where
214    S: noether_store::StageStore + ?Sized,
215{
216    use noether_core::stage::{SignatureId, StageLifecycle};
217    match pinning {
218        Pinning::Signature => {
219            let sig = SignatureId(id.0.clone());
220            if let Some(stage) = store.get_by_signature(&sig) {
221                return Some(stage);
222            }
223            // Fallback for mixed legacy flows. Must be Active; a
224            // silent resolver must not execute a deprecated or
225            // tombstoned stage on behalf of the user.
226            match store.get(id).ok().flatten() {
227                Some(s) if matches!(s.lifecycle, StageLifecycle::Active) => Some(s),
228                _ => None,
229            }
230        }
231        Pinning::Both => store.get(id).ok().flatten(),
232    }
233}
234
235/// A complete composition graph with metadata.
236#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
237pub struct CompositionGraph {
238    pub description: String,
239    pub root: CompositionNode,
240    pub version: String,
241}
242
243impl CompositionGraph {
244    pub fn new(description: impl Into<String>, root: CompositionNode) -> Self {
245        Self {
246            description: description.into(),
247            root,
248            version: "0.1.0".into(),
249        }
250    }
251}
252
253/// Collect all StageIds referenced in a composition node.
254pub fn collect_stage_ids(node: &CompositionNode) -> Vec<&StageId> {
255    let mut ids = Vec::new();
256    collect_ids_recursive(node, &mut ids);
257    ids
258}
259
260fn collect_ids_recursive<'a>(node: &'a CompositionNode, ids: &mut Vec<&'a StageId>) {
261    match node {
262        CompositionNode::Stage { id, .. } => ids.push(id),
263        CompositionNode::RemoteStage { .. } => {} // no local stage ID; URL is resolved at runtime
264        CompositionNode::Const { .. } => {}       // no stage IDs in a constant
265        CompositionNode::Sequential { stages } => {
266            for s in stages {
267                collect_ids_recursive(s, ids);
268            }
269        }
270        CompositionNode::Parallel { branches } => {
271            for b in branches.values() {
272                collect_ids_recursive(b, ids);
273            }
274        }
275        CompositionNode::Branch {
276            predicate,
277            if_true,
278            if_false,
279        } => {
280            collect_ids_recursive(predicate, ids);
281            collect_ids_recursive(if_true, ids);
282            collect_ids_recursive(if_false, ids);
283        }
284        CompositionNode::Fanout { source, targets } => {
285            collect_ids_recursive(source, ids);
286            for t in targets {
287                collect_ids_recursive(t, ids);
288            }
289        }
290        CompositionNode::Merge { sources, target } => {
291            for s in sources {
292                collect_ids_recursive(s, ids);
293            }
294            collect_ids_recursive(target, ids);
295        }
296        CompositionNode::Retry { stage, .. } => {
297            collect_ids_recursive(stage, ids);
298        }
299        CompositionNode::Let { bindings, body } => {
300            for b in bindings.values() {
301                collect_ids_recursive(b, ids);
302            }
303            collect_ids_recursive(body, ids);
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Shorthand for a `Stage` leaf with default pinning and no config.
    fn stage(id: &str) -> CompositionNode {
        CompositionNode::stage(id)
    }

    /// Serialises `node` to compact JSON, parses it back, and checks that
    /// the result equals the original.
    fn assert_node_round_trips(node: &CompositionNode) {
        let encoded = serde_json::to_string(node).unwrap();
        let decoded: CompositionNode = serde_json::from_str(&encoded).unwrap();
        assert_eq!(node, &decoded);
    }

    #[test]
    fn serde_stage_round_trip() {
        assert_node_round_trips(&stage("abc123"));
    }

    #[test]
    fn serde_sequential() {
        let pipeline = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b"), stage("c")],
        };
        // This case deliberately goes through the pretty printer.
        let encoded = serde_json::to_string_pretty(&pipeline).unwrap();
        let decoded: CompositionNode = serde_json::from_str(&encoded).unwrap();
        assert_eq!(pipeline, decoded);
    }

    #[test]
    fn serde_parallel() {
        let mut branches = BTreeMap::new();
        branches.insert("left".into(), stage("a"));
        branches.insert("right".into(), stage("b"));
        assert_node_round_trips(&CompositionNode::Parallel { branches });
    }

    #[test]
    fn serde_branch() {
        assert_node_round_trips(&CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        });
    }

    #[test]
    fn serde_retry() {
        assert_node_round_trips(&CompositionNode::Retry {
            stage: Box::new(stage("fallible")),
            max_attempts: 3,
            delay_ms: Some(500),
        });
    }

    #[test]
    fn serde_full_graph() {
        let root = CompositionNode::Sequential {
            stages: vec![stage("parse"), stage("transform"), stage("output")],
        };
        let graph = CompositionGraph::new("test pipeline", root);
        let encoded = serde_json::to_string_pretty(&graph).unwrap();
        let decoded: CompositionGraph = serde_json::from_str(&encoded).unwrap();
        assert_eq!(graph, decoded);
    }

    #[test]
    fn serde_nested_composition() {
        let retried = CompositionNode::Retry {
            stage: Box::new(CompositionNode::Sequential {
                stages: vec![stage("a"), stage("b")],
            }),
            max_attempts: 2,
            delay_ms: None,
        };
        assert_node_round_trips(&CompositionNode::Sequential {
            stages: vec![stage("input"), retried, stage("output")],
        });
    }

    #[test]
    fn collect_stage_ids_finds_all() {
        let fork = CompositionNode::Parallel {
            branches: BTreeMap::from([("x".into(), stage("b")), ("y".into(), stage("c"))]),
        };
        let pipeline = CompositionNode::Sequential {
            stages: vec![stage("a"), fork, stage("d")],
        };
        assert_eq!(collect_stage_ids(&pipeline).len(), 4);
    }

    #[test]
    fn json_format_is_tagged() {
        let value: serde_json::Value = serde_json::to_value(&stage("abc123")).unwrap();
        assert_eq!(value["op"], json!("Stage"));
        assert_eq!(value["id"], json!("abc123"));
    }

    #[test]
    fn default_pinning_omitted_from_json() {
        // With the default Signature pinning, no `"pinning"` key should be
        // written. That keeps the wire format small and readable by
        // consumers that only expect `id`.
        let v: serde_json::Value = serde_json::to_value(&stage("abc123")).unwrap();
        assert!(
            v.get("pinning").is_none(),
            "default Signature pinning should be omitted from JSON, got: {v}"
        );
    }

    #[test]
    fn both_pinning_serialises_explicitly() {
        let pinned = CompositionNode::stage_pinned("impl_abc");
        let value: serde_json::Value = serde_json::to_value(&pinned).unwrap();
        assert_eq!(value["pinning"], json!("both"));
    }

    #[test]
    fn legacy_graph_without_pinning_deserialises() {
        // v0.5.x graphs carried only `{"op": "Stage", "id": "..."}`. When
        // such JSON is loaded, the newer `pinning` field must default to
        // Signature.
        let legacy = json!({
            "op": "Stage",
            "id": "legacy_hash",
        });
        let decoded: CompositionNode = serde_json::from_value(legacy).unwrap();
        if let CompositionNode::Stage { id, pinning, .. } = decoded {
            assert_eq!(id.0, "legacy_hash");
            assert_eq!(pinning, Pinning::Signature);
        } else {
            panic!("expected Stage variant");
        }
    }

    #[test]
    fn explicit_both_pinning_deserialises() {
        let pinned = json!({
            "op": "Stage",
            "id": "impl_xyz",
            "pinning": "both",
        });
        let decoded: CompositionNode = serde_json::from_value(pinned).unwrap();
        if let CompositionNode::Stage { pinning, .. } = decoded {
            assert_eq!(pinning, Pinning::Both);
        } else {
            panic!("expected Stage variant");
        }
    }

    #[test]
    fn serde_remote_stage_round_trip() {
        assert_node_round_trips(&CompositionNode::RemoteStage {
            url: "http://localhost:8080".into(),
            input: NType::record([("count", NType::Number)]),
            output: NType::VNode,
        });
    }

    #[test]
    fn remote_stage_json_shape() {
        let remote = CompositionNode::RemoteStage {
            url: "http://api.example.com".into(),
            input: NType::Text,
            output: NType::Number,
        };
        let value: serde_json::Value = serde_json::to_value(&remote).unwrap();
        assert_eq!(value["op"], json!("RemoteStage"));
        assert_eq!(value["url"], json!("http://api.example.com"));
        assert!(value["input"].is_object());
        assert!(value["output"].is_object());
    }

    #[test]
    fn collect_stage_ids_skips_remote_stage() {
        let remote = CompositionNode::RemoteStage {
            url: "http://remote".into(),
            input: NType::Text,
            output: NType::Text,
        };
        let pipeline = CompositionNode::Sequential {
            stages: vec![stage("local-a"), remote, stage("local-b")],
        };
        let ids = collect_stage_ids(&pipeline);
        // Only the local stages contribute IDs, in pipeline order.
        assert_eq!(ids.len(), 2);
        assert_eq!(ids[0].0, "local-a");
        assert_eq!(ids[1].0, "local-b");
    }

    #[test]
    fn remote_stage_in_full_graph_serde() {
        let remote = CompositionNode::RemoteStage {
            url: "http://api:8080".into(),
            input: NType::record([("query", NType::Text)]),
            output: NType::List(Box::new(NType::Text)),
        };
        let graph = CompositionGraph::new(
            "full-stack pipeline",
            CompositionNode::Sequential {
                stages: vec![remote, stage("render")],
            },
        );
        let encoded = serde_json::to_string_pretty(&graph).unwrap();
        let decoded: CompositionGraph = serde_json::from_str(&encoded).unwrap();
        assert_eq!(graph, decoded);
    }
}