// noether_engine/executor/runner.rs

1#![warn(clippy::unwrap_used)]
2#![cfg_attr(test, allow(clippy::unwrap_used))]
3
4use super::{ExecutionError, StageExecutor};
5use crate::executor::pure_cache::PureStageCache;
6use crate::lagrange::CompositionNode;
7use crate::trace::{CompositionTrace, StageStatus, StageTrace, TraceStatus};
8use chrono::Utc;
9use noether_core::stage::StageId;
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::time::Instant;
13
14/// Result of executing a composition graph.
15#[derive(Debug)]
16pub struct CompositionResult {
17    pub output: Value,
18    pub trace: CompositionTrace,
19    /// Actual cost consumed during this run in cents (sum of declared
20    /// `Effect::Cost` for every stage that executed). Zero when no budget
21    /// tracking was requested.
22    pub spent_cents: u64,
23}
24
25/// Execute a composition graph using the provided executor.
26///
27/// Pass a `PureStageCache` to enable Pure-stage output caching within this run.
28pub fn run_composition<E: StageExecutor + Sync>(
29    node: &CompositionNode,
30    input: &Value,
31    executor: &E,
32    composition_id: &str,
33) -> Result<CompositionResult, ExecutionError> {
34    run_composition_with_cache(node, input, executor, composition_id, None)
35}
36
37/// Like `run_composition` but accepts an explicit `PureStageCache`.
38pub fn run_composition_with_cache<E: StageExecutor + Sync>(
39    node: &CompositionNode,
40    input: &Value,
41    executor: &E,
42    composition_id: &str,
43    cache: Option<&mut PureStageCache>,
44) -> Result<CompositionResult, ExecutionError> {
45    let start = Instant::now();
46    let mut stage_traces = Vec::new();
47    let mut step_counter = 0;
48
49    let mut owned_cache;
50    let cache_ref: &mut Option<&mut PureStageCache>;
51    let mut none_holder: Option<&mut PureStageCache> = None;
52
53    if let Some(c) = cache {
54        owned_cache = Some(c);
55        cache_ref = &mut owned_cache;
56    } else {
57        cache_ref = &mut none_holder;
58    }
59
60    let output = execute_node(
61        node,
62        input,
63        executor,
64        &mut stage_traces,
65        &mut step_counter,
66        cache_ref,
67    )?;
68
69    let duration_ms = start.elapsed().as_millis() as u64;
70    let has_failures = stage_traces
71        .iter()
72        .any(|t| matches!(t.status, StageStatus::Failed { .. }));
73
74    let trace = CompositionTrace {
75        composition_id: composition_id.into(),
76        started_at: Utc::now().to_rfc3339(),
77        duration_ms,
78        status: if has_failures {
79            TraceStatus::Failed
80        } else {
81            TraceStatus::Ok
82        },
83        stages: stage_traces,
84        security_events: Vec::new(),
85        warnings: Vec::new(),
86    };
87
88    Ok(CompositionResult {
89        output,
90        trace,
91        spent_cents: 0,
92    })
93}
94
95fn execute_node<E: StageExecutor + Sync>(
96    node: &CompositionNode,
97    input: &Value,
98    executor: &E,
99    traces: &mut Vec<StageTrace>,
100    step_counter: &mut usize,
101    cache: &mut Option<&mut PureStageCache>,
102) -> Result<Value, ExecutionError> {
103    match node {
104        CompositionNode::Stage {
105            id,
106            pinning: _, // resolved upstream by checker / planner
107            config,
108        } => {
109            let merged = if let Some(cfg) = config {
110                let mut obj = match input {
111                    Value::Object(map) => map.clone(),
112                    other => {
113                        let mut m = serde_json::Map::new();
114                        let data_key = [
115                            "items", "text", "data", "input", "records", "train", "document",
116                            "html", "csv", "json_str",
117                        ]
118                        .iter()
119                        .find(|k| !cfg.contains_key(**k))
120                        .unwrap_or(&"items");
121                        m.insert(data_key.to_string(), other.clone());
122                        m
123                    }
124                };
125                for (k, v) in cfg {
126                    obj.insert(k.clone(), v.clone());
127                }
128                Value::Object(obj)
129            } else {
130                input.clone()
131            };
132            execute_stage(id, &merged, executor, traces, step_counter, cache)
133        }
134        CompositionNode::Const { value } => Ok(value.clone()),
135        CompositionNode::Sequential { stages } => {
136            let mut current = input.clone();
137            for stage in stages {
138                current = execute_node(stage, &current, executor, traces, step_counter, cache)?;
139            }
140            Ok(current)
141        }
142        CompositionNode::Parallel { branches } => {
143            // Resolve each branch's input before spawning (pure field lookup).
144            // If the input is a Record containing the branch name as a key,
145            // that field's value is passed to the branch. Otherwise the full
146            // input is passed — this lets Stage branches receive the pipeline
147            // input naturally while Const branches ignore it entirely.
148            let branch_data: Vec<(&str, &CompositionNode, Value)> = branches
149                .iter()
150                .map(|(name, branch)| {
151                    let branch_input = if let Value::Object(ref obj) = input {
152                        obj.get(name).cloned().unwrap_or_else(|| input.clone())
153                    } else {
154                        input.clone()
155                    };
156                    (name.as_str(), branch, branch_input)
157                })
158                .collect();
159
160            // Execute all branches concurrently. Each branch gets its own
161            // trace list; the Pure cache is NOT shared across parallel branches
162            // to avoid any locking overhead.
163            //
164            // A panicking worker thread surfaces as a typed
165            // `ExecutionError::StageFailed` rather than propagating the
166            // panic — CLI callers rely on `Result` propagation to emit
167            // structured ACLI errors with non-zero exit codes.
168            let branch_results = std::thread::scope(|s| {
169                let handles: Vec<_> = branch_data
170                    .iter()
171                    .map(|(name, branch, branch_input)| {
172                        s.spawn(move || {
173                            let mut branch_traces = Vec::new();
174                            let mut branch_counter = 0usize;
175                            let result = execute_node(
176                                branch,
177                                branch_input,
178                                executor,
179                                &mut branch_traces,
180                                &mut branch_counter,
181                                &mut None,
182                            );
183                            (*name, result, branch_traces)
184                        })
185                    })
186                    .collect();
187                handles
188                    .into_iter()
189                    .zip(branch_data.iter())
190                    .map(|(h, (name, _, _))| match h.join() {
191                        Ok(tuple) => tuple,
192                        Err(_panic) => (
193                            *name,
194                            Err(ExecutionError::StageFailed {
195                                stage_id: StageId(format!("parallel:{name}")),
196                                message: format!(
197                                    "parallel branch {name:?} panicked during execution"
198                                ),
199                            }),
200                            Vec::new(),
201                        ),
202                    })
203                    .collect::<Vec<_>>()
204            });
205
206            let mut output_fields = serde_json::Map::new();
207            for (name, result, branch_traces) in branch_results {
208                let branch_output = result?;
209                output_fields.insert(name.to_string(), branch_output);
210                traces.extend(branch_traces);
211            }
212            Ok(Value::Object(output_fields))
213        }
214        CompositionNode::Branch {
215            predicate,
216            if_true,
217            if_false,
218        } => {
219            let pred_result =
220                execute_node(predicate, input, executor, traces, step_counter, cache)?;
221            let condition = match &pred_result {
222                Value::Bool(b) => *b,
223                _ => false,
224            };
225            if condition {
226                execute_node(if_true, input, executor, traces, step_counter, cache)
227            } else {
228                execute_node(if_false, input, executor, traces, step_counter, cache)
229            }
230        }
231        CompositionNode::Fanout { source, targets } => {
232            let source_output = execute_node(source, input, executor, traces, step_counter, cache)?;
233            let mut results = Vec::new();
234            for target in targets {
235                let result = execute_node(
236                    target,
237                    &source_output,
238                    executor,
239                    traces,
240                    step_counter,
241                    cache,
242                )?;
243                results.push(result);
244            }
245            Ok(Value::Array(results))
246        }
247        CompositionNode::Merge { sources, target } => {
248            let mut merged = serde_json::Map::new();
249            for (i, source) in sources.iter().enumerate() {
250                let source_input = if let Value::Object(ref obj) = input {
251                    obj.get(&format!("source_{i}"))
252                        .cloned()
253                        .unwrap_or(Value::Null)
254                } else {
255                    input.clone()
256                };
257                let result =
258                    execute_node(source, &source_input, executor, traces, step_counter, cache)?;
259                merged.insert(format!("source_{i}"), result);
260            }
261            execute_node(
262                target,
263                &Value::Object(merged),
264                executor,
265                traces,
266                step_counter,
267                cache,
268            )
269        }
270        CompositionNode::Retry {
271            stage,
272            max_attempts,
273            ..
274        } => {
275            let mut last_err = None;
276            for _ in 0..*max_attempts {
277                match execute_node(stage, input, executor, traces, step_counter, cache) {
278                    Ok(output) => return Ok(output),
279                    Err(e) => last_err = Some(e),
280                }
281            }
282            Err(last_err.unwrap_or(ExecutionError::RetryExhausted {
283                stage_id: StageId("unknown".into()),
284                attempts: *max_attempts,
285            }))
286        }
287        CompositionNode::RemoteStage { url, .. } => execute_remote_stage(url, input),
288        CompositionNode::Let { bindings, body } => {
289            // Execute bindings concurrently — each receives the outer input.
290            // Then merge: outer-input record fields + binding name → output.
291            let bindings_vec: Vec<(&str, &CompositionNode)> =
292                bindings.iter().map(|(n, b)| (n.as_str(), b)).collect();
293
294            // A panicking binding thread surfaces as a typed
295            // `ExecutionError::StageFailed` rather than propagating the
296            // panic (same rationale as Parallel above).
297            let binding_results = std::thread::scope(|s| {
298                let handles: Vec<_> = bindings_vec
299                    .iter()
300                    .map(|(name, node)| {
301                        s.spawn(move || {
302                            let mut bt = Vec::new();
303                            let mut bc = 0usize;
304                            let r =
305                                execute_node(node, input, executor, &mut bt, &mut bc, &mut None);
306                            (*name, r, bt)
307                        })
308                    })
309                    .collect();
310                handles
311                    .into_iter()
312                    .zip(bindings_vec.iter())
313                    .map(|(h, (name, _))| match h.join() {
314                        Ok(tuple) => tuple,
315                        Err(_panic) => (
316                            *name,
317                            Err(ExecutionError::StageFailed {
318                                stage_id: StageId(format!("let:{name}")),
319                                message: format!("let binding {name:?} panicked during execution"),
320                            }),
321                            Vec::new(),
322                        ),
323                    })
324                    .collect::<Vec<_>>()
325            });
326
327            // Start the merged record from the outer input (when it is one).
328            let mut merged = match input {
329                Value::Object(map) => map.clone(),
330                _ => serde_json::Map::new(),
331            };
332            for (name, result, branch_traces) in binding_results {
333                let value = result?;
334                merged.insert(name.to_string(), value);
335                traces.extend(branch_traces);
336            }
337
338            let body_input = Value::Object(merged);
339            execute_node(body, &body_input, executor, traces, step_counter, cache)
340        }
341    }
342}
343
344fn execute_stage<E: StageExecutor + Sync>(
345    id: &StageId,
346    input: &Value,
347    executor: &E,
348    traces: &mut Vec<StageTrace>,
349    step_counter: &mut usize,
350    cache: &mut Option<&mut PureStageCache>,
351) -> Result<Value, ExecutionError> {
352    let step_index = *step_counter;
353    *step_counter += 1;
354    let start = Instant::now();
355
356    let input_hash = hash_value(input);
357
358    // Pure cache check: skip execution if we have a cached output for this stage + input.
359    if let Some(ref mut c) = cache {
360        if let Some(cached_output) = c.get(id, input) {
361            let output = cached_output.clone();
362            let duration_ms = start.elapsed().as_millis() as u64;
363            traces.push(StageTrace {
364                stage_id: id.clone(),
365                step_index,
366                status: StageStatus::Ok,
367                duration_ms,
368                input_hash: Some(input_hash),
369                output_hash: Some(hash_value(&output)),
370            });
371            return Ok(output);
372        }
373    }
374
375    match executor.execute(id, input) {
376        Ok(output) => {
377            let output_hash = hash_value(&output);
378            let duration_ms = start.elapsed().as_millis() as u64;
379            traces.push(StageTrace {
380                stage_id: id.clone(),
381                step_index,
382                status: StageStatus::Ok,
383                duration_ms,
384                input_hash: Some(input_hash),
385                output_hash: Some(output_hash),
386            });
387            // Store result in Pure cache for future calls within this run.
388            if let Some(ref mut c) = cache {
389                c.put(id, input, output.clone());
390            }
391            Ok(output)
392        }
393        Err(e) => {
394            let duration_ms = start.elapsed().as_millis() as u64;
395            traces.push(StageTrace {
396                stage_id: id.clone(),
397                step_index,
398                status: StageStatus::Failed {
399                    code: "EXECUTION_ERROR".into(),
400                    message: format!("{e}"),
401                },
402                duration_ms,
403                input_hash: Some(input_hash),
404                output_hash: None,
405            });
406            Err(e)
407        }
408    }
409}
410
411fn hash_value(value: &Value) -> String {
412    let bytes = serde_json::to_vec(value).unwrap_or_default();
413    let hash = Sha256::digest(&bytes);
414    hex::encode(hash)
415}
416
417/// Execute a remote Noether API call via HTTP POST.
418///
419/// Sends `{"input": <value>}` to `url` and extracts the output from the
420/// ACLI response envelope `{"data": {"output": <value>}}`.
421///
422/// In native builds this uses `reqwest::blocking`. In WASM builds this
423/// function returns an error — remote calls are handled by the JS runtime.
424fn execute_remote_stage(url: &str, input: &Value) -> Result<Value, ExecutionError> {
425    #[cfg(feature = "native")]
426    {
427        use reqwest::blocking::Client;
428
429        let client = Client::new();
430        let body = serde_json::json!({ "input": input });
431        let resp =
432            client
433                .post(url)
434                .json(&body)
435                .send()
436                .map_err(|e| ExecutionError::RemoteCallFailed {
437                    url: url.to_string(),
438                    reason: e.to_string(),
439                })?;
440
441        let resp_json: Value = resp.json().map_err(|e| ExecutionError::RemoteCallFailed {
442            url: url.to_string(),
443            reason: format!("invalid JSON response: {e}"),
444        })?;
445
446        // ACLI envelope: {"ok": true, "data": {"output": ...}} on success,
447        // {"ok": false, "error": "..."} on failure. Check `ok` first so a
448        // worker-side error (e.g. stage not found) surfaces verbatim
449        // instead of being masked as "missing data.output".
450        if resp_json.get("ok") == Some(&Value::Bool(false)) {
451            let reason = resp_json
452                .get("error")
453                .and_then(|e| e.as_str())
454                .unwrap_or("remote reported ok=false without error message")
455                .to_string();
456            return Err(ExecutionError::RemoteCallFailed {
457                url: url.to_string(),
458                reason,
459            });
460        }
461        resp_json
462            .get("data")
463            .and_then(|d| d.get("output"))
464            .cloned()
465            .ok_or_else(|| ExecutionError::RemoteCallFailed {
466                url: url.to_string(),
467                reason: "response missing data.output field".to_string(),
468            })
469    }
470    #[cfg(not(feature = "native"))]
471    {
472        let _ = (url, input);
473        Err(ExecutionError::RemoteCallFailed {
474            url: url.to_string(),
475            reason: "remote calls are handled by the JS runtime in WASM builds".to_string(),
476        })
477    }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::mock::MockExecutor;
    use serde_json::json;
    use std::collections::BTreeMap;

    /// Shorthand for building a `StageId` from a string.
    fn sid(id: &str) -> StageId {
        StageId(id.into())
    }

    /// A config-less stage node referring to `id`.
    fn stage(id: &str) -> CompositionNode {
        CompositionNode::Stage {
            id: sid(id),
            pinning: crate::lagrange::Pinning::Signature,
            config: None,
        }
    }

    /// A mock executor that answers each listed stage id with its output.
    fn mock(outputs: &[(&str, Value)]) -> MockExecutor {
        outputs
            .iter()
            .fold(MockExecutor::new(), |executor, (id, output)| {
                executor.with_output(&sid(id), output.clone())
            })
    }

    /// Runs a pred/yes/no `Branch` whose predicate stage returns `verdict`.
    fn run_branch(verdict: bool) -> Value {
        let executor = mock(&[
            ("pred", json!(verdict)),
            ("yes", json!("YES")),
            ("no", json!("NO")),
        ]);
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        run_composition(&node, &json!("input"), &executor, "test")
            .unwrap()
            .output
    }

    #[test]
    fn run_single_stage() {
        let executor = mock(&[("a", json!(42))]);
        let result = run_composition(&stage("a"), &json!("input"), &executor, "test_comp").unwrap();
        assert_eq!(result.output, json!(42));
        assert_eq!(result.trace.stages.len(), 1);
        assert!(matches!(result.trace.status, TraceStatus::Ok));
    }

    #[test]
    fn run_sequential() {
        let executor = mock(&[("a", json!("mid")), ("b", json!("final"))]);
        let node = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b")],
        };
        let result = run_composition(&node, &json!("start"), &executor, "test").unwrap();
        assert_eq!(result.output, json!("final"));
        assert_eq!(result.trace.stages.len(), 2);
    }

    #[test]
    fn run_parallel() {
        let executor = mock(&[("s1", json!("r1")), ("s2", json!("r2"))]);
        let mut branches = BTreeMap::new();
        branches.insert("left".into(), stage("s1"));
        branches.insert("right".into(), stage("s2"));
        let node = CompositionNode::Parallel { branches };
        let result = run_composition(&node, &json!({}), &executor, "test").unwrap();
        assert_eq!(result.output, json!({"left": "r1", "right": "r2"}));
    }

    #[test]
    fn run_branch_true() {
        assert_eq!(run_branch(true), json!("YES"));
    }

    #[test]
    fn run_branch_false() {
        assert_eq!(run_branch(false), json!("NO"));
    }

    #[test]
    fn run_fanout() {
        let executor = mock(&[
            ("src", json!("data")),
            ("t1", json!("r1")),
            ("t2", json!("r2")),
        ]);
        let node = CompositionNode::Fanout {
            source: Box::new(stage("src")),
            targets: vec![stage("t1"), stage("t2")],
        };
        let result = run_composition(&node, &json!("in"), &executor, "test").unwrap();
        assert_eq!(result.output, json!(["r1", "r2"]));
    }

    /// Executor that panics for one configured stage id and returns `null`
    /// for every other. Used to exercise panic-to-error conversion for
    /// branches that run on worker threads.
    struct PanickingExecutor {
        panic_on: String,
    }

    impl StageExecutor for PanickingExecutor {
        fn execute(&self, stage_id: &StageId, _input: &Value) -> Result<Value, ExecutionError> {
            if stage_id.0 == self.panic_on {
                panic!("intentional test panic");
            }
            Ok(Value::Null)
        }
    }

    #[test]
    fn parallel_branch_panic_becomes_execution_error() {
        // A panic inside a parallel branch has to come back as a typed
        // ExecutionError instead of unwinding into the caller.
        let executor = PanickingExecutor {
            panic_on: "boom".into(),
        };
        let mut branches = BTreeMap::new();
        branches.insert("left".into(), stage("boom"));
        branches.insert("right".into(), stage("ok"));
        let node = CompositionNode::Parallel { branches };

        let result = run_composition(&node, &json!({}), &executor, "test");
        let is_panic_error = matches!(
            result,
            Err(ExecutionError::StageFailed { ref message, .. })
                if message.contains("panicked")
        );
        assert!(
            is_panic_error,
            "expected StageFailed with panic marker, got: {result:?}"
        );
    }

    #[test]
    fn trace_has_input_output_hashes() {
        let executor = mock(&[("a", json!(42))]);
        let result = run_composition(&stage("a"), &json!("input"), &executor, "test").unwrap();
        let first = &result.trace.stages[0];
        assert!(first.input_hash.is_some());
        assert!(first.output_hash.is_some());
    }
}
615        let result = run_composition(&stage("a"), &json!("input"), &executor, "test").unwrap();
616        assert!(result.trace.stages[0].input_hash.is_some());
617        assert!(result.trace.stages[0].output_hash.is_some());
618    }
619}