Skip to main content

noether_engine/lagrange/
ast.rs

1use noether_core::stage::StageId;
2use noether_core::types::NType;
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
6/// A composition graph node. The core AST for Noether's composition language.
7#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
8#[serde(tag = "op")]
9pub enum CompositionNode {
10    /// Leaf node: reference to a stage by its content hash.
11    ///
12    /// The optional `config` provides static parameter values merged with
13    /// the pipeline input before the stage executes. This separates data
14    /// flow (from the pipeline) from configuration (from the graph).
15    Stage {
16        id: StageId,
17        #[serde(default, skip_serializing_if = "Option::is_none")]
18        config: Option<BTreeMap<String, serde_json::Value>>,
19    },
20
21    /// Call a remote Noether API endpoint over HTTP.
22    ///
23    /// The declared `input` and `output` types are verified by the type checker
24    /// at build time — the remote server does not need to be running during
25    /// `noether build`. In native builds, execution uses reqwest. In browser
26    /// builds, the JS runtime makes a `fetch()` call.
27    RemoteStage {
28        /// URL of the remote Noether API (e.g. "http://localhost:8080")
29        url: String,
30        /// Declared input type — what this node accepts from the pipeline
31        input: NType,
32        /// Declared output type — what this node returns to the pipeline
33        output: NType,
34    },
35
36    /// Emits a constant JSON value, ignoring its input entirely.
37    /// Used to inject literal strings, numbers, or objects into a pipeline.
38    Const { value: serde_json::Value },
39
40    /// A >> B >> C: output of each stage feeds the next.
41    Sequential { stages: Vec<CompositionNode> },
42
43    /// Execute branches concurrently, merge outputs into a Record keyed by
44    /// branch name. Each branch receives `input[branch_name]` if the input is
45    /// a Record containing that key; otherwise it receives the full input.
46    /// `Const` branches ignore their input entirely — use them for literals.
47    Parallel {
48        branches: BTreeMap<String, CompositionNode>,
49    },
50
51    /// Conditional routing based on a predicate stage.
52    Branch {
53        predicate: Box<CompositionNode>,
54        if_true: Box<CompositionNode>,
55        if_false: Box<CompositionNode>,
56    },
57
58    /// Source output sent to all targets concurrently.
59    Fanout {
60        source: Box<CompositionNode>,
61        targets: Vec<CompositionNode>,
62    },
63
64    /// Multiple sources merge into a single target.
65    Merge {
66        sources: Vec<CompositionNode>,
67        target: Box<CompositionNode>,
68    },
69
70    /// Retry a stage up to max_attempts times on failure.
71    Retry {
72        stage: Box<CompositionNode>,
73        max_attempts: u32,
74        delay_ms: Option<u64>,
75    },
76
77    /// Bind named intermediate computations and reference them in `body`.
78    ///
79    /// Each binding sub-node receives the **outer Let input** (the same value
80    /// passed to the Let node). After all bindings have produced a value, the
81    /// `body` runs against an augmented input record:
82    ///
83    ///   `{ ...outer-input fields, <binding-name>: <binding-output>, ... }`
84    ///
85    /// Bindings with the same name as an outer-input field shadow it. This
86    /// makes it possible to carry original-input fields into stages later in a
87    /// Sequential pipeline — the canonical example is scan → hash → diff,
88    /// where `diff` needs `state_path` from the original input but `hash`
89    /// would otherwise erase it.
90    ///
91    /// All bindings are scheduled concurrently — there are no inter-binding
92    /// references. If you need a binding to depend on another, wrap it in a
93    /// nested `Sequential`.
94    Let {
95        bindings: BTreeMap<String, CompositionNode>,
96        body: Box<CompositionNode>,
97    },
98}
99
100/// A complete composition graph with metadata.
101#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
102pub struct CompositionGraph {
103    pub description: String,
104    pub root: CompositionNode,
105    pub version: String,
106}
107
108impl CompositionGraph {
109    pub fn new(description: impl Into<String>, root: CompositionNode) -> Self {
110        Self {
111            description: description.into(),
112            root,
113            version: "0.1.0".into(),
114        }
115    }
116}
117
118/// Collect all StageIds referenced in a composition node.
119pub fn collect_stage_ids(node: &CompositionNode) -> Vec<&StageId> {
120    let mut ids = Vec::new();
121    collect_ids_recursive(node, &mut ids);
122    ids
123}
124
125fn collect_ids_recursive<'a>(node: &'a CompositionNode, ids: &mut Vec<&'a StageId>) {
126    match node {
127        CompositionNode::Stage { id, .. } => ids.push(id),
128        CompositionNode::RemoteStage { .. } => {} // no local stage ID; URL is resolved at runtime
129        CompositionNode::Const { .. } => {}       // no stage IDs in a constant
130        CompositionNode::Sequential { stages } => {
131            for s in stages {
132                collect_ids_recursive(s, ids);
133            }
134        }
135        CompositionNode::Parallel { branches } => {
136            for b in branches.values() {
137                collect_ids_recursive(b, ids);
138            }
139        }
140        CompositionNode::Branch {
141            predicate,
142            if_true,
143            if_false,
144        } => {
145            collect_ids_recursive(predicate, ids);
146            collect_ids_recursive(if_true, ids);
147            collect_ids_recursive(if_false, ids);
148        }
149        CompositionNode::Fanout { source, targets } => {
150            collect_ids_recursive(source, ids);
151            for t in targets {
152                collect_ids_recursive(t, ids);
153            }
154        }
155        CompositionNode::Merge { sources, target } => {
156            for s in sources {
157                collect_ids_recursive(s, ids);
158            }
159            collect_ids_recursive(target, ids);
160        }
161        CompositionNode::Retry { stage, .. } => {
162            collect_ids_recursive(stage, ids);
163        }
164        CompositionNode::Let { bindings, body } => {
165            for b in bindings.values() {
166                collect_ids_recursive(b, ids);
167            }
168            collect_ids_recursive(body, ids);
169        }
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a config-less `Stage` leaf with the given id.
    fn stage(id: &str) -> CompositionNode {
        CompositionNode::Stage {
            id: StageId(id.into()),
            config: None,
        }
    }

    /// Asserts that `node` survives a JSON round trip, in both compact and
    /// pretty-printed form.
    fn assert_node_round_trips(node: &CompositionNode) {
        let compact = serde_json::to_string(node).unwrap();
        let from_compact: CompositionNode = serde_json::from_str(&compact).unwrap();
        assert_eq!(*node, from_compact);

        let pretty = serde_json::to_string_pretty(node).unwrap();
        let from_pretty: CompositionNode = serde_json::from_str(&pretty).unwrap();
        assert_eq!(*node, from_pretty);
    }

    /// Asserts that `graph` survives a JSON round trip, in both compact and
    /// pretty-printed form.
    fn assert_graph_round_trips(graph: &CompositionGraph) {
        let compact = serde_json::to_string(graph).unwrap();
        let from_compact: CompositionGraph = serde_json::from_str(&compact).unwrap();
        assert_eq!(*graph, from_compact);

        let pretty = serde_json::to_string_pretty(graph).unwrap();
        let from_pretty: CompositionGraph = serde_json::from_str(&pretty).unwrap();
        assert_eq!(*graph, from_pretty);
    }

    /// Asserts that `collect_stage_ids(node)` yields exactly `expected`, in
    /// that order.
    fn assert_stage_ids(node: &CompositionNode, expected: &[&str]) {
        let ids = collect_stage_ids(node);
        assert_eq!(ids.len(), expected.len());
        for (found, want) in ids.iter().zip(expected) {
            assert_eq!(found.0, *want);
        }
    }

    // ── serde: round trips ──────────────────────────────────────────────

    #[test]
    fn serde_stage_round_trip() {
        assert_node_round_trips(&stage("abc123"));
    }

    #[test]
    fn serde_stage_with_config_round_trip() {
        let node = CompositionNode::Stage {
            id: StageId("abc123".into()),
            config: Some(BTreeMap::from([
                ("mode".to_string(), json!("strict")),
                ("verbose".to_string(), json!(true)),
            ])),
        };
        assert_node_round_trips(&node);

        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["config"]["mode"], json!("strict"));
        assert_eq!(v["config"]["verbose"], json!(true));
    }

    #[test]
    fn serde_const_round_trip() {
        let node = CompositionNode::Const {
            value: json!({ "greeting": "hello", "tags": ["a", "b"] }),
        };
        assert_node_round_trips(&node);

        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["op"], json!("Const"));
        assert_eq!(v["value"]["greeting"], json!("hello"));
    }

    #[test]
    fn serde_sequential() {
        let node = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b"), stage("c")],
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_parallel() {
        let mut branches = BTreeMap::new();
        branches.insert("left".into(), stage("a"));
        branches.insert("right".into(), stage("b"));
        let node = CompositionNode::Parallel { branches };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_branch() {
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_fanout() {
        let node = CompositionNode::Fanout {
            source: Box::new(stage("src")),
            targets: vec![stage("t1"), stage("t2")],
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_merge() {
        let node = CompositionNode::Merge {
            sources: vec![stage("s1"), stage("s2")],
            target: Box::new(stage("sink")),
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_retry() {
        let node = CompositionNode::Retry {
            stage: Box::new(stage("fallible")),
            max_attempts: 3,
            delay_ms: Some(500),
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_let() {
        let node = CompositionNode::Let {
            bindings: BTreeMap::from([
                ("hashed".to_string(), stage("hash")),
                ("scanned".to_string(), stage("scan")),
            ]),
            body: Box::new(stage("diff")),
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_full_graph() {
        let graph = CompositionGraph::new(
            "test pipeline",
            CompositionNode::Sequential {
                stages: vec![stage("parse"), stage("transform"), stage("output")],
            },
        );
        assert_graph_round_trips(&graph);
    }

    #[test]
    fn serde_nested_composition() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("input"),
                CompositionNode::Retry {
                    stage: Box::new(CompositionNode::Sequential {
                        stages: vec![stage("a"), stage("b")],
                    }),
                    max_attempts: 2,
                    delay_ms: None,
                },
                stage("output"),
            ],
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn serde_remote_stage_round_trip() {
        let node = CompositionNode::RemoteStage {
            url: "http://localhost:8080".into(),
            input: NType::record([("count", NType::Number)]),
            output: NType::VNode,
        };
        assert_node_round_trips(&node);
    }

    #[test]
    fn remote_stage_in_full_graph_serde() {
        let graph = CompositionGraph::new(
            "full-stack pipeline",
            CompositionNode::Sequential {
                stages: vec![
                    CompositionNode::RemoteStage {
                        url: "http://api:8080".into(),
                        input: NType::record([("query", NType::Text)]),
                        output: NType::List(Box::new(NType::Text)),
                    },
                    stage("render"),
                ],
            },
        );
        assert_graph_round_trips(&graph);
    }

    // ── serde: JSON shape ───────────────────────────────────────────────

    #[test]
    fn json_format_is_tagged() {
        let node = stage("abc123");
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["op"], json!("Stage"));
        assert_eq!(v["id"], json!("abc123"));
        // `config: None` is skipped on serialization rather than emitted as null.
        assert!(v.get("config").is_none());
    }

    #[test]
    fn remote_stage_json_shape() {
        let node = CompositionNode::RemoteStage {
            url: "http://api.example.com".into(),
            input: NType::Text,
            output: NType::Number,
        };
        let v: serde_json::Value = serde_json::to_value(&node).unwrap();
        assert_eq!(v["op"], json!("RemoteStage"));
        assert_eq!(v["url"], json!("http://api.example.com"));
        assert!(v["input"].is_object());
        assert!(v["output"].is_object());
    }

    #[test]
    fn graph_new_sets_default_version() {
        let graph = CompositionGraph::new("desc", stage("only"));
        assert_eq!(graph.description, "desc");
        assert_eq!(graph.version, "0.1.0");
        assert_eq!(graph.root, stage("only"));
    }

    // ── collect_stage_ids ───────────────────────────────────────────────

    #[test]
    fn collect_stage_ids_finds_all() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("a"),
                CompositionNode::Parallel {
                    branches: BTreeMap::from([("x".into(), stage("b")), ("y".into(), stage("c"))]),
                },
                stage("d"),
            ],
        };
        let ids = collect_stage_ids(&node);
        assert_eq!(ids.len(), 4);
        // Parallel branches are visited in key order ("x" before "y").
        assert_stage_ids(&node, &["a", "b", "c", "d"]);
    }

    #[test]
    fn collect_stage_ids_skips_remote_stage() {
        let node = CompositionNode::Sequential {
            stages: vec![
                stage("local-a"),
                CompositionNode::RemoteStage {
                    url: "http://remote".into(),
                    input: NType::Text,
                    output: NType::Text,
                },
                stage("local-b"),
            ],
        };
        let ids = collect_stage_ids(&node);
        // Only local stages contribute IDs
        assert_eq!(ids.len(), 2);
        assert_eq!(ids[0].0, "local-a");
        assert_eq!(ids[1].0, "local-b");
    }

    #[test]
    fn collect_stage_ids_skips_const() {
        let node = CompositionNode::Const {
            value: json!("literal"),
        };
        assert!(collect_stage_ids(&node).is_empty());
    }

    #[test]
    fn collect_stage_ids_keeps_duplicates() {
        let node = CompositionNode::Sequential {
            stages: vec![stage("same"), stage("same")],
        };
        assert_stage_ids(&node, &["same", "same"]);
    }

    #[test]
    fn collect_stage_ids_walks_branch() {
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        assert_stage_ids(&node, &["pred", "yes", "no"]);
    }

    #[test]
    fn collect_stage_ids_walks_fanout_and_merge() {
        let fanout = CompositionNode::Fanout {
            source: Box::new(stage("src")),
            targets: vec![stage("t1"), stage("t2")],
        };
        assert_stage_ids(&fanout, &["src", "t1", "t2"]);

        let merge = CompositionNode::Merge {
            sources: vec![stage("s1"), stage("s2")],
            target: Box::new(stage("sink")),
        };
        assert_stage_ids(&merge, &["s1", "s2", "sink"]);
    }

    #[test]
    fn collect_stage_ids_walks_retry() {
        let node = CompositionNode::Retry {
            stage: Box::new(stage("fallible")),
            max_attempts: 3,
            delay_ms: None,
        };
        assert_stage_ids(&node, &["fallible"]);
    }

    #[test]
    fn collect_stage_ids_walks_let_bindings_then_body() {
        let node = CompositionNode::Let {
            bindings: BTreeMap::from([
                ("zeta".to_string(), stage("second")),
                ("alpha".to_string(), stage("first")),
            ]),
            body: Box::new(stage("body")),
        };
        // Bindings are visited in key order, then the body.
        assert_stage_ids(&node, &["first", "second", "body"]);
    }
}