1#![warn(clippy::unwrap_used)]
2#![cfg_attr(test, allow(clippy::unwrap_used))]
3
4use super::{ExecutionError, StageExecutor};
5use crate::executor::pure_cache::PureStageCache;
6use crate::lagrange::CompositionNode;
7use crate::trace::{CompositionTrace, StageStatus, StageTrace, TraceStatus};
8use chrono::Utc;
9use noether_core::stage::StageId;
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::time::Instant;
13
/// Outcome of a composition run that completed without a propagated error.
#[derive(Debug)]
pub struct CompositionResult {
    /// Value produced by the root composition node.
    pub output: Value,
    /// Execution trace (one entry per traced stage) collected during the run.
    pub trace: CompositionTrace,
    /// Presumably the cost of the run in cents (per the field name) — TODO
    /// confirm; the runner in this file currently always reports `0`.
    pub spent_cents: u64,
}
24
25pub fn run_composition<E: StageExecutor + Sync>(
29 node: &CompositionNode,
30 input: &Value,
31 executor: &E,
32 composition_id: &str,
33) -> Result<CompositionResult, ExecutionError> {
34 run_composition_with_cache(node, input, executor, composition_id, None)
35}
36
37pub fn run_composition_with_cache<E: StageExecutor + Sync>(
39 node: &CompositionNode,
40 input: &Value,
41 executor: &E,
42 composition_id: &str,
43 cache: Option<&mut PureStageCache>,
44) -> Result<CompositionResult, ExecutionError> {
45 let start = Instant::now();
46 let mut stage_traces = Vec::new();
47 let mut step_counter = 0;
48
49 let mut owned_cache;
50 let cache_ref: &mut Option<&mut PureStageCache>;
51 let mut none_holder: Option<&mut PureStageCache> = None;
52
53 if let Some(c) = cache {
54 owned_cache = Some(c);
55 cache_ref = &mut owned_cache;
56 } else {
57 cache_ref = &mut none_holder;
58 }
59
60 let output = execute_node(
61 node,
62 input,
63 executor,
64 &mut stage_traces,
65 &mut step_counter,
66 cache_ref,
67 )?;
68
69 let duration_ms = start.elapsed().as_millis() as u64;
70 let has_failures = stage_traces
71 .iter()
72 .any(|t| matches!(t.status, StageStatus::Failed { .. }));
73
74 let trace = CompositionTrace {
75 composition_id: composition_id.into(),
76 started_at: Utc::now().to_rfc3339(),
77 duration_ms,
78 status: if has_failures {
79 TraceStatus::Failed
80 } else {
81 TraceStatus::Ok
82 },
83 stages: stage_traces,
84 security_events: Vec::new(),
85 warnings: Vec::new(),
86 };
87
88 Ok(CompositionResult {
89 output,
90 trace,
91 spent_cents: 0,
92 })
93}
94
95fn execute_node<E: StageExecutor + Sync>(
96 node: &CompositionNode,
97 input: &Value,
98 executor: &E,
99 traces: &mut Vec<StageTrace>,
100 step_counter: &mut usize,
101 cache: &mut Option<&mut PureStageCache>,
102) -> Result<Value, ExecutionError> {
103 match node {
104 CompositionNode::Stage {
105 id,
106 pinning: _, config,
108 } => {
109 let merged = if let Some(cfg) = config {
110 let mut obj = match input {
111 Value::Object(map) => map.clone(),
112 other => {
113 let mut m = serde_json::Map::new();
114 let data_key = [
115 "items", "text", "data", "input", "records", "train", "document",
116 "html", "csv", "json_str",
117 ]
118 .iter()
119 .find(|k| !cfg.contains_key(**k))
120 .unwrap_or(&"items");
121 m.insert(data_key.to_string(), other.clone());
122 m
123 }
124 };
125 for (k, v) in cfg {
126 obj.insert(k.clone(), v.clone());
127 }
128 Value::Object(obj)
129 } else {
130 input.clone()
131 };
132 execute_stage(id, &merged, executor, traces, step_counter, cache)
133 }
134 CompositionNode::Const { value } => Ok(value.clone()),
135 CompositionNode::Sequential { stages } => {
136 let mut current = input.clone();
137 for stage in stages {
138 current = execute_node(stage, ¤t, executor, traces, step_counter, cache)?;
139 }
140 Ok(current)
141 }
142 CompositionNode::Parallel { branches } => {
143 let branch_data: Vec<(&str, &CompositionNode, Value)> = branches
149 .iter()
150 .map(|(name, branch)| {
151 let branch_input = if let Value::Object(ref obj) = input {
152 obj.get(name).cloned().unwrap_or_else(|| input.clone())
153 } else {
154 input.clone()
155 };
156 (name.as_str(), branch, branch_input)
157 })
158 .collect();
159
160 let branch_results = std::thread::scope(|s| {
169 let handles: Vec<_> = branch_data
170 .iter()
171 .map(|(name, branch, branch_input)| {
172 s.spawn(move || {
173 let mut branch_traces = Vec::new();
174 let mut branch_counter = 0usize;
175 let result = execute_node(
176 branch,
177 branch_input,
178 executor,
179 &mut branch_traces,
180 &mut branch_counter,
181 &mut None,
182 );
183 (*name, result, branch_traces)
184 })
185 })
186 .collect();
187 handles
188 .into_iter()
189 .zip(branch_data.iter())
190 .map(|(h, (name, _, _))| match h.join() {
191 Ok(tuple) => tuple,
192 Err(_panic) => (
193 *name,
194 Err(ExecutionError::StageFailed {
195 stage_id: StageId(format!("parallel:{name}")),
196 message: format!(
197 "parallel branch {name:?} panicked during execution"
198 ),
199 }),
200 Vec::new(),
201 ),
202 })
203 .collect::<Vec<_>>()
204 });
205
206 let mut output_fields = serde_json::Map::new();
207 for (name, result, branch_traces) in branch_results {
208 let branch_output = result?;
209 output_fields.insert(name.to_string(), branch_output);
210 traces.extend(branch_traces);
211 }
212 Ok(Value::Object(output_fields))
213 }
214 CompositionNode::Branch {
215 predicate,
216 if_true,
217 if_false,
218 } => {
219 let pred_result =
220 execute_node(predicate, input, executor, traces, step_counter, cache)?;
221 let condition = match &pred_result {
222 Value::Bool(b) => *b,
223 _ => false,
224 };
225 if condition {
226 execute_node(if_true, input, executor, traces, step_counter, cache)
227 } else {
228 execute_node(if_false, input, executor, traces, step_counter, cache)
229 }
230 }
231 CompositionNode::Fanout { source, targets } => {
232 let source_output = execute_node(source, input, executor, traces, step_counter, cache)?;
233 let mut results = Vec::new();
234 for target in targets {
235 let result = execute_node(
236 target,
237 &source_output,
238 executor,
239 traces,
240 step_counter,
241 cache,
242 )?;
243 results.push(result);
244 }
245 Ok(Value::Array(results))
246 }
247 CompositionNode::Merge { sources, target } => {
248 let mut merged = serde_json::Map::new();
249 for (i, source) in sources.iter().enumerate() {
250 let source_input = if let Value::Object(ref obj) = input {
251 obj.get(&format!("source_{i}"))
252 .cloned()
253 .unwrap_or(Value::Null)
254 } else {
255 input.clone()
256 };
257 let result =
258 execute_node(source, &source_input, executor, traces, step_counter, cache)?;
259 merged.insert(format!("source_{i}"), result);
260 }
261 execute_node(
262 target,
263 &Value::Object(merged),
264 executor,
265 traces,
266 step_counter,
267 cache,
268 )
269 }
270 CompositionNode::Retry {
271 stage,
272 max_attempts,
273 ..
274 } => {
275 let mut last_err = None;
276 for _ in 0..*max_attempts {
277 match execute_node(stage, input, executor, traces, step_counter, cache) {
278 Ok(output) => return Ok(output),
279 Err(e) => last_err = Some(e),
280 }
281 }
282 Err(last_err.unwrap_or(ExecutionError::RetryExhausted {
283 stage_id: StageId("unknown".into()),
284 attempts: *max_attempts,
285 }))
286 }
287 CompositionNode::RemoteStage { url, .. } => execute_remote_stage(url, input),
288 CompositionNode::Let { bindings, body } => {
289 let bindings_vec: Vec<(&str, &CompositionNode)> =
292 bindings.iter().map(|(n, b)| (n.as_str(), b)).collect();
293
294 let binding_results = std::thread::scope(|s| {
298 let handles: Vec<_> = bindings_vec
299 .iter()
300 .map(|(name, node)| {
301 s.spawn(move || {
302 let mut bt = Vec::new();
303 let mut bc = 0usize;
304 let r =
305 execute_node(node, input, executor, &mut bt, &mut bc, &mut None);
306 (*name, r, bt)
307 })
308 })
309 .collect();
310 handles
311 .into_iter()
312 .zip(bindings_vec.iter())
313 .map(|(h, (name, _))| match h.join() {
314 Ok(tuple) => tuple,
315 Err(_panic) => (
316 *name,
317 Err(ExecutionError::StageFailed {
318 stage_id: StageId(format!("let:{name}")),
319 message: format!("let binding {name:?} panicked during execution"),
320 }),
321 Vec::new(),
322 ),
323 })
324 .collect::<Vec<_>>()
325 });
326
327 let mut merged = match input {
329 Value::Object(map) => map.clone(),
330 _ => serde_json::Map::new(),
331 };
332 for (name, result, branch_traces) in binding_results {
333 let value = result?;
334 merged.insert(name.to_string(), value);
335 traces.extend(branch_traces);
336 }
337
338 let body_input = Value::Object(merged);
339 execute_node(body, &body_input, executor, traces, step_counter, cache)
340 }
341 }
342}
343
344fn execute_stage<E: StageExecutor + Sync>(
345 id: &StageId,
346 input: &Value,
347 executor: &E,
348 traces: &mut Vec<StageTrace>,
349 step_counter: &mut usize,
350 cache: &mut Option<&mut PureStageCache>,
351) -> Result<Value, ExecutionError> {
352 let step_index = *step_counter;
353 *step_counter += 1;
354 let start = Instant::now();
355
356 let input_hash = hash_value(input);
357
358 if let Some(ref mut c) = cache {
360 if let Some(cached_output) = c.get(id, input) {
361 let output = cached_output.clone();
362 let duration_ms = start.elapsed().as_millis() as u64;
363 traces.push(StageTrace {
364 stage_id: id.clone(),
365 step_index,
366 status: StageStatus::Ok,
367 duration_ms,
368 input_hash: Some(input_hash),
369 output_hash: Some(hash_value(&output)),
370 });
371 return Ok(output);
372 }
373 }
374
375 match executor.execute(id, input) {
376 Ok(output) => {
377 let output_hash = hash_value(&output);
378 let duration_ms = start.elapsed().as_millis() as u64;
379 traces.push(StageTrace {
380 stage_id: id.clone(),
381 step_index,
382 status: StageStatus::Ok,
383 duration_ms,
384 input_hash: Some(input_hash),
385 output_hash: Some(output_hash),
386 });
387 if let Some(ref mut c) = cache {
389 c.put(id, input, output.clone());
390 }
391 Ok(output)
392 }
393 Err(e) => {
394 let duration_ms = start.elapsed().as_millis() as u64;
395 traces.push(StageTrace {
396 stage_id: id.clone(),
397 step_index,
398 status: StageStatus::Failed {
399 code: "EXECUTION_ERROR".into(),
400 message: format!("{e}"),
401 },
402 duration_ms,
403 input_hash: Some(input_hash),
404 output_hash: None,
405 });
406 Err(e)
407 }
408 }
409}
410
411fn hash_value(value: &Value) -> String {
412 let bytes = serde_json::to_vec(value).unwrap_or_default();
413 let hash = Sha256::digest(&bytes);
414 hex::encode(hash)
415}
416
417fn execute_remote_stage(url: &str, input: &Value) -> Result<Value, ExecutionError> {
425 #[cfg(feature = "native")]
426 {
427 use reqwest::blocking::Client;
428
429 let client = Client::new();
430 let body = serde_json::json!({ "input": input });
431 let resp =
432 client
433 .post(url)
434 .json(&body)
435 .send()
436 .map_err(|e| ExecutionError::RemoteCallFailed {
437 url: url.to_string(),
438 reason: e.to_string(),
439 })?;
440
441 let resp_json: Value = resp.json().map_err(|e| ExecutionError::RemoteCallFailed {
442 url: url.to_string(),
443 reason: format!("invalid JSON response: {e}"),
444 })?;
445
446 if resp_json.get("ok") == Some(&Value::Bool(false)) {
451 let reason = resp_json
452 .get("error")
453 .and_then(|e| e.as_str())
454 .unwrap_or("remote reported ok=false without error message")
455 .to_string();
456 return Err(ExecutionError::RemoteCallFailed {
457 url: url.to_string(),
458 reason,
459 });
460 }
461 resp_json
462 .get("data")
463 .and_then(|d| d.get("output"))
464 .cloned()
465 .ok_or_else(|| ExecutionError::RemoteCallFailed {
466 url: url.to_string(),
467 reason: "response missing data.output field".to_string(),
468 })
469 }
470 #[cfg(not(feature = "native"))]
471 {
472 let _ = (url, input);
473 Err(ExecutionError::RemoteCallFailed {
474 url: url.to_string(),
475 reason: "remote calls are handled by the JS runtime in WASM builds".to_string(),
476 })
477 }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::mock::MockExecutor;
    use serde_json::json;
    use std::collections::BTreeMap;

    fn stage(id: &str) -> CompositionNode {
        CompositionNode::Stage {
            id: StageId(id.into()),
            pinning: crate::lagrange::Pinning::Signature,
            config: None,
        }
    }

    /// Returns each stage's input unchanged, so a test can observe exactly
    /// what every stage received.
    struct EchoExecutor;

    impl StageExecutor for EchoExecutor {
        fn execute(&self, _stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
            Ok(input.clone())
        }
    }

    /// Fails with `StageFailed` for one stage id and returns `null` otherwise.
    struct FailingExecutor {
        fail_on: String,
    }

    impl StageExecutor for FailingExecutor {
        fn execute(&self, stage_id: &StageId, _input: &Value) -> Result<Value, ExecutionError> {
            if stage_id.0 == self.fail_on {
                return Err(ExecutionError::StageFailed {
                    stage_id: stage_id.clone(),
                    message: "intentional test failure".into(),
                });
            }
            Ok(Value::Null)
        }
    }

    #[test]
    fn run_single_stage() {
        let executor = MockExecutor::new().with_output(&StageId("a".into()), json!(42));
        let result = run_composition(&stage("a"), &json!("input"), &executor, "test_comp").unwrap();
        assert_eq!(result.output, json!(42));
        assert_eq!(result.trace.stages.len(), 1);
        assert!(matches!(result.trace.status, TraceStatus::Ok));
    }

    #[test]
    fn run_sequential() {
        let executor = MockExecutor::new()
            .with_output(&StageId("a".into()), json!("mid"))
            .with_output(&StageId("b".into()), json!("final"));
        let node = CompositionNode::Sequential {
            stages: vec![stage("a"), stage("b")],
        };
        let result = run_composition(&node, &json!("start"), &executor, "test").unwrap();
        assert_eq!(result.output, json!("final"));
        assert_eq!(result.trace.stages.len(), 2);
    }

    #[test]
    fn run_parallel() {
        let executor = MockExecutor::new()
            .with_output(&StageId("s1".into()), json!("r1"))
            .with_output(&StageId("s2".into()), json!("r2"));
        let node = CompositionNode::Parallel {
            branches: BTreeMap::from([("left".into(), stage("s1")), ("right".into(), stage("s2"))]),
        };
        let result = run_composition(&node, &json!({}), &executor, "test").unwrap();
        assert_eq!(result.output, json!({"left": "r1", "right": "r2"}));
    }

    #[test]
    fn parallel_routes_matching_object_fields_to_branches() {
        let node = CompositionNode::Parallel {
            branches: BTreeMap::from([("left".into(), stage("a")), ("right".into(), stage("b"))]),
        };
        let result =
            run_composition(&node, &json!({"left": 1, "right": 2}), &EchoExecutor, "test").unwrap();
        assert_eq!(result.output, json!({"left": 1, "right": 2}));
        assert_eq!(result.trace.stages.len(), 2);
    }

    #[test]
    fn parallel_branch_without_matching_field_receives_whole_input() {
        let node = CompositionNode::Parallel {
            branches: BTreeMap::from([("left".into(), stage("a"))]),
        };
        let result = run_composition(&node, &json!({"other": 3}), &EchoExecutor, "test").unwrap();
        assert_eq!(result.output, json!({"left": {"other": 3}}));
    }

    #[test]
    fn run_branch_true() {
        let executor = MockExecutor::new()
            .with_output(&StageId("pred".into()), json!(true))
            .with_output(&StageId("yes".into()), json!("YES"))
            .with_output(&StageId("no".into()), json!("NO"));
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        let result = run_composition(&node, &json!("input"), &executor, "test").unwrap();
        assert_eq!(result.output, json!("YES"));
    }

    #[test]
    fn run_branch_false() {
        let executor = MockExecutor::new()
            .with_output(&StageId("pred".into()), json!(false))
            .with_output(&StageId("yes".into()), json!("YES"))
            .with_output(&StageId("no".into()), json!("NO"));
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        let result = run_composition(&node, &json!("input"), &executor, "test").unwrap();
        assert_eq!(result.output, json!("NO"));
    }

    #[test]
    fn branch_with_non_bool_predicate_takes_false_path() {
        let executor = MockExecutor::new()
            .with_output(&StageId("pred".into()), json!("true"))
            .with_output(&StageId("yes".into()), json!("YES"))
            .with_output(&StageId("no".into()), json!("NO"));
        let node = CompositionNode::Branch {
            predicate: Box::new(stage("pred")),
            if_true: Box::new(stage("yes")),
            if_false: Box::new(stage("no")),
        };
        let result = run_composition(&node, &json!("input"), &executor, "test").unwrap();
        assert_eq!(result.output, json!("NO"));
    }

    #[test]
    fn run_fanout() {
        let executor = MockExecutor::new()
            .with_output(&StageId("src".into()), json!("data"))
            .with_output(&StageId("t1".into()), json!("r1"))
            .with_output(&StageId("t2".into()), json!("r2"));
        let node = CompositionNode::Fanout {
            source: Box::new(stage("src")),
            targets: vec![stage("t1"), stage("t2")],
        };
        let result = run_composition(&node, &json!("in"), &executor, "test").unwrap();
        assert_eq!(result.output, json!(["r1", "r2"]));
    }

    #[test]
    fn const_node_output_feeds_next_stage() {
        let node = CompositionNode::Sequential {
            stages: vec![CompositionNode::Const { value: json!({"k": 1}) }, stage("a")],
        };
        let result = run_composition(&node, &json!("ignored"), &EchoExecutor, "test").unwrap();
        assert_eq!(result.output, json!({"k": 1}));
        // A Const node runs no stage, so only `a` is traced.
        assert_eq!(result.trace.stages.len(), 1);
    }

    #[test]
    fn stage_error_aborts_sequence() {
        let executor = FailingExecutor {
            fail_on: "bad".into(),
        };
        let node = CompositionNode::Sequential {
            stages: vec![stage("bad"), stage("after")],
        };
        let result = run_composition(&node, &json!(null), &executor, "test");
        assert!(
            matches!(
                result,
                Err(ExecutionError::StageFailed { ref stage_id, .. })
                    if stage_id.0 == "bad"
            ),
            "expected StageFailed for stage `bad`, got: {result:?}"
        );
    }

    struct PanickingExecutor {
        panic_on: String,
    }

    impl StageExecutor for PanickingExecutor {
        fn execute(&self, stage_id: &StageId, _input: &Value) -> Result<Value, ExecutionError> {
            if stage_id.0 == self.panic_on {
                panic!("intentional test panic");
            }
            Ok(Value::Null)
        }
    }

    #[test]
    fn parallel_branch_panic_becomes_execution_error() {
        let executor = PanickingExecutor {
            panic_on: "boom".into(),
        };
        let node = CompositionNode::Parallel {
            branches: BTreeMap::from([
                ("left".into(), stage("boom")),
                ("right".into(), stage("ok")),
            ]),
        };
        let result = run_composition(&node, &json!({}), &executor, "test");
        assert!(
            matches!(
                result,
                Err(ExecutionError::StageFailed { ref message, .. })
                    if message.contains("panicked")
            ),
            "expected StageFailed with panic marker, got: {result:?}"
        );
    }

    #[test]
    fn trace_has_input_output_hashes() {
        let executor = MockExecutor::new().with_output(&StageId("a".into()), json!(42));
        let result = run_composition(&stage("a"), &json!("input"), &executor, "test").unwrap();
        assert!(result.trace.stages[0].input_hash.is_some());
        assert!(result.trace.stages[0].output_hash.is_some());
    }

    #[test]
    fn trace_hashes_match_hash_value_of_payloads() {
        let executor = MockExecutor::new().with_output(&StageId("a".into()), json!(42));
        let result = run_composition(&stage("a"), &json!("input"), &executor, "test").unwrap();
        let trace = &result.trace.stages[0];
        let expected_in = hash_value(&json!("input"));
        let expected_out = hash_value(&json!(42));
        // SHA-256 rendered as lowercase hex is always 64 characters.
        assert_eq!(expected_in.len(), 64);
        assert_eq!(trace.input_hash.as_deref(), Some(expected_in.as_str()));
        assert_eq!(trace.output_hash.as_deref(), Some(expected_out.as_str()));
    }
}