Skip to main content

cruxx_script/
runner.rs

1/// Pipeline runner — interprets a parsed YAML pipeline against CruxCtx + HandlerRegistry.
2use std::sync::{Arc, Mutex};
3
4use cruxx_core::prelude::*;
5use serde_json::Value;
6
7use crate::expr::{ExprContext, ExprError, StepResult};
8use crate::registry::HandlerRegistry;
9use crate::schema::{BudgetDef, PipelineDef, SpeculateMode, StepDef};
10
11/// Executes parsed pipelines against a handler registry.
12pub struct Runner {
13    registry: Arc<HandlerRegistry>,
14}
15
16impl Runner {
17    pub fn new(registry: Arc<HandlerRegistry>) -> Self {
18        Self { registry }
19    }
20
21    /// Run a pipeline definition with the given input, producing a full Crux trace.
22    pub async fn run(&self, pipeline: &PipelineDef, input: Value) -> Crux<Value> {
23        let mut ctx = CruxCtx::new(&pipeline.pipeline);
24
25        if let Some(budget_def) = &pipeline.budget {
26            ctx.set_budget(budget_from_def(budget_def));
27        }
28
29        let result = self.execute_steps(&mut ctx, &pipeline.steps, input).await;
30        ctx.finalize(result)
31    }
32
33    async fn execute_steps(
34        &self,
35        ctx: &mut CruxCtx,
36        steps: &[StepDef],
37        input: Value,
38    ) -> Result<Value, CruxErr> {
39        let mut expr_ctx = ExprContext::new(input.clone());
40        let mut last_output = input;
41
42        for step_def in steps {
43            last_output = self
44                .execute_step(ctx, step_def, &last_output, &mut expr_ctx)
45                .await?;
46        }
47
48        Ok(last_output)
49    }
50
51    async fn execute_step(
52        &self,
53        ctx: &mut CruxCtx,
54        step_def: &StepDef,
55        current_input: &Value,
56        expr_ctx: &mut ExprContext,
57    ) -> Result<Value, CruxErr> {
58        match step_def {
59            StepDef::Step(node) => {
60                let handler_name = node.handler.as_deref().unwrap_or(&node.step);
61                let handler = self
62                    .registry
63                    .get_handler(handler_name)
64                    .ok_or_else(|| {
65                        CruxErr::step_failed(
66                            &node.step,
67                            format!("handler not found: {handler_name}"),
68                        )
69                    })?
70                    .clone();
71                // Merge static step args into the current input under the "args" key.
72                // Template strings (`{{ input.field }}`, `{{ steps.X.output.field }}`) in
73                // args string values are expanded against the current ExprContext before merge.
74                let input = if let Some(step_args) = &node.args {
75                    let expanded = expand_args(step_args.clone(), expr_ctx);
76                    let mut merged = current_input.clone();
77                    if let Value::Object(ref mut map) = merged {
78                        map.insert("args".to_string(), expanded);
79                    } else {
80                        merged = serde_json::json!({ "args": expanded, "input": current_input });
81                    }
82                    merged
83                } else {
84                    current_input.clone()
85                };
86                // Invoke the handler once, capturing both value and confidence.
87                // Preserve `None` so that `route_on_confidence` can distinguish "no score"
88                // (handler_value steps) from an explicit 1.0 — see ExprError::NoConfidence.
89                let raw = handler(input.clone()).await?;
90                let confidence = raw.confidence;
91                let value = raw.value;
92
93                // Record the step in the trace (closure returns the pre-computed value).
94                let handler_out = ctx
95                    .step(&node.step, || {
96                        let v = value.clone();
97                        async move { Ok::<Value, CruxErr>(v) }
98                    })
99                    .await?;
100
101                expr_ctx.steps.insert(
102                    node.step.clone(),
103                    StepResult {
104                        output: handler_out.clone(),
105                        confidence,
106                    },
107                );
108                Ok(handler_out)
109            }
110
111            StepDef::Delegate(node) => {
112                let step_name = node.name.as_deref().unwrap_or(&node.delegate);
113                let agent_runner = self
114                    .registry
115                    .get_agent(&node.delegate)
116                    .ok_or_else(|| {
117                        CruxErr::step_failed(
118                            step_name,
119                            format!("agent not found: {}", node.delegate),
120                        )
121                    })?
122                    .clone();
123
124                let input = current_input.clone();
125                let result = agent_runner(input).await;
126
127                // Record the delegation step in parent
128                let output = ctx.step(step_name, || async { result }).await?;
129
130                expr_ctx.steps.insert(
131                    step_name.to_string(),
132                    StepResult {
133                        output: output.clone(),
134                        confidence: None,
135                    },
136                );
137                Ok(output)
138            }
139
140            StepDef::Pipe(node) => {
141                let registry = self.registry.clone();
142
143                // One confidence cell per stage; the last stage's confidence wins.
144                let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
145                    .stages
146                    .iter()
147                    .map(|_| Arc::new(Mutex::new(None)))
148                    .collect();
149
150                #[allow(clippy::type_complexity)]
151                let stages: Vec<(
152                    &str,
153                    Box<dyn FnOnce(Value) -> BoxFut<Value> + Send>,
154                )> = node
155                    .stages
156                    .iter()
157                    .zip(confidence_cells.iter())
158                    .map(|(arm, cell)| {
159                        let handler = registry.get_handler(arm.handler_name()).cloned();
160                        let name_owned = arm.handler_name().to_string();
161                        let static_args = arm.args().cloned();
162                        let cell = Arc::clone(cell);
163                        let stage_fn: Box<dyn FnOnce(Value) -> BoxFut<Value> + Send> =
164                            Box::new(move |v: Value| {
165                                Box::pin(async move {
166                                    let h = handler.ok_or_else(|| {
167                                        CruxErr::step_failed(&name_owned, "handler not found")
168                                    })?;
169                                    let input = merge_args(v, static_args);
170                                    let out = h(input).await?;
171                                    *cell.lock().unwrap() = out.confidence;
172                                    Ok(out.value)
173                                }) as BoxFut<Value>
174                            });
175                        (arm.label(), stage_fn)
176                    })
177                    .collect();
178
179                let input = current_input.clone();
180                let result = ctx.pipe(&node.pipe, input, stages).await?;
181
182                // Use the last stage's confidence (pipeline is sequential).
183                // Empty stages vec → last() returns None → confidence is None (correct for degenerate case).
184                let confidence = confidence_cells.last().and_then(|c| *c.lock().unwrap());
185
186                expr_ctx.steps.insert(
187                    node.pipe.clone(),
188                    StepResult {
189                        output: result.clone(),
190                        confidence,
191                    },
192                );
193                Ok(result)
194            }
195
196            StepDef::JoinAll(node) => {
197                let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
198                    .arms
199                    .iter()
200                    .map(|_| Arc::new(Mutex::new(None)))
201                    .collect();
202
203                let arms: Vec<(&str, BoxFut<Value>)> = node
204                    .arms
205                    .iter()
206                    .zip(confidence_cells.iter())
207                    .map(|(arm, cell)| {
208                        let handler = self.registry.get_handler(arm.handler_name()).cloned();
209                        let input = merge_args(current_input.clone(), arm.args().cloned());
210                        let name_owned = arm.handler_name().to_string();
211                        let cell = Arc::clone(cell);
212                        let fut: BoxFut<Value> = Box::pin(async move {
213                            let h = handler.ok_or_else(|| {
214                                CruxErr::step_failed(&name_owned, "handler not found")
215                            })?;
216                            let out = h(input).await?;
217                            *cell.lock().unwrap() = out.confidence;
218                            Ok(out.value)
219                        });
220                        (arm.label(), fut)
221                    })
222                    .collect();
223
224                let results = ctx.join_all(&node.join_all, arms).await?;
225                let output = Value::Array(results);
226
227                // Average confidence across arms that provided a score; None if none did.
228                let scored: Vec<f32> = confidence_cells
229                    .iter()
230                    .filter_map(|c| *c.lock().unwrap())
231                    .collect();
232                let confidence = if scored.is_empty() {
233                    None
234                } else {
235                    Some(scored.iter().sum::<f32>() / scored.len() as f32)
236                };
237
238                expr_ctx.steps.insert(
239                    node.join_all.clone(),
240                    StepResult {
241                        output: output.clone(),
242                        confidence,
243                    },
244                );
245                Ok(output)
246            }
247
248            StepDef::RouteOnConfidence(node) => {
249                let confidence = expr_ctx
250                    .eval_f32(&node.value)
251                    .map_err(|e| CruxErr::step_failed(&node.route_on_confidence, e.to_string()))?;
252
253                // One cell per route; only the matching branch's handler will write to it.
254                let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
255                    .routes
256                    .iter()
257                    .map(|_| Arc::new(Mutex::new(None)))
258                    .collect();
259
260                let routes: Vec<ConfidenceRoute<'_, Value>> = node
261                    .routes
262                    .iter()
263                    .zip(confidence_cells.iter())
264                    .map(|(branch, cell)| {
265                        let range = parse_range(&branch.range);
266                        let handler = self.registry.get_handler(&branch.handler).cloned();
267                        let input = merge_args(current_input.clone(), branch.args.clone());
268                        let handler_name = branch.handler.clone();
269                        let cell = Arc::clone(cell);
270                        let fut: BoxFut<Value> = Box::pin(async move {
271                            let h = handler.ok_or_else(|| {
272                                CruxErr::step_failed(&handler_name, "handler not found")
273                            })?;
274                            let out = h(input).await?;
275                            *cell.lock().unwrap() = out.confidence;
276                            Ok(out.value)
277                        });
278                        (range, branch.label.as_str(), fut)
279                    })
280                    .collect();
281
282                let result = ctx
283                    .route_on_confidence(&node.route_on_confidence, confidence, routes)
284                    .await?;
285
286                // Use the matched branch's handler confidence; fall back to the routing score.
287                let handler_confidence = confidence_cells
288                    .iter()
289                    .find_map(|c| *c.lock().unwrap())
290                    .map(Some)
291                    .unwrap_or(Some(confidence));
292
293                expr_ctx.steps.insert(
294                    node.route_on_confidence.clone(),
295                    StepResult {
296                        output: result.clone(),
297                        confidence: handler_confidence,
298                    },
299                );
300                Ok(result)
301            }
302
303            StepDef::Speculate(node) => {
304                let arms: Vec<(&str, BoxFut<Value>)> = node
305                    .arms
306                    .iter()
307                    .map(|arm| {
308                        let handler = self.registry.get_handler(arm.handler_name()).cloned();
309                        let input = merge_args(current_input.clone(), arm.args().cloned());
310                        let name_owned = arm.handler_name().to_string();
311                        let fut: BoxFut<Value> = Box::pin(async move {
312                            let h = handler.ok_or_else(|| {
313                                CruxErr::step_failed(&name_owned, "handler not found")
314                            })?;
315                            h(input).await.map(|o| o.value)
316                        });
317                        (arm.label(), fut)
318                    })
319                    .collect();
320
321                let builder = ctx.speculate(&node.speculate, arms);
322                let result = match node.mode {
323                    SpeculateMode::PickBest => {
324                        builder
325                            .pick_best_by(|v: &Value| {
326                                v.get("score").and_then(|s| s.as_f64()).unwrap_or(0.0) as f32
327                            })
328                            .await?
329                    }
330                    SpeculateMode::FirstOk => builder.first_ok().await?,
331                };
332
333                expr_ctx.steps.insert(
334                    node.speculate.clone(),
335                    StepResult {
336                        output: result.clone(),
337                        confidence: None,
338                    },
339                );
340                Ok(result)
341            }
342        }
343    }
344}
345
346/// Recursively expand `{{ expr }}` templates in all string leaves of a JSON value.
347///
348/// Non-string leaves (numbers, booleans, null, arrays, objects) are traversed but not
349/// substituted. Strings that are not `{{ ... }}` templates are returned unchanged.
350/// Expansion errors (unknown step, unknown path) are silently ignored — the original
351/// string is preserved. This keeps static pipelines working without any ExprContext setup.
352fn expand_args(value: Value, ctx: &ExprContext) -> Value {
353    match value {
354        Value::String(s) => match ctx.eval(&s) {
355            Ok(expanded) => expanded,
356            Err(ExprError::Syntax(_) | ExprError::UnknownStep(_) | ExprError::UnknownPath(_)) => {
357                Value::String(s)
358            }
359            Err(_) => Value::String(s),
360        },
361        Value::Array(arr) => Value::Array(arr.into_iter().map(|v| expand_args(v, ctx)).collect()),
362        Value::Object(map) => Value::Object(
363            map.into_iter()
364                .map(|(k, v)| (k, expand_args(v, ctx)))
365                .collect(),
366        ),
367        other => other,
368    }
369}
370
371/// Merge static step args into handler input under the "args" key.
372fn merge_args(mut input: Value, args: Option<Value>) -> Value {
373    if let Some(a) = args {
374        if let Value::Object(ref mut map) = input {
375            map.insert("args".to_string(), a);
376        } else {
377            input = serde_json::json!({ "args": a, "input": input });
378        }
379    }
380    input
381}
382
383fn budget_from_def(def: &BudgetDef) -> Budget {
384    match def {
385        BudgetDef::Tokens { tokens } => Budget::tokens(*tokens),
386        BudgetDef::Calls { calls } => Budget::calls(*calls),
387        BudgetDef::Duration { duration_ms } => {
388            Budget::duration(std::time::Duration::from_millis(*duration_ms))
389        }
390        BudgetDef::CostCents { cost_cents } => Budget::cost_cents(*cost_cents),
391    }
392}
393
394/// Parse a range string like `[0.0, 0.5)` or `[0.8, 1.0]`.
395fn parse_range(s: &str) -> ConfidenceRange {
396    let s = s.trim();
397    let inclusive_end = s.ends_with(']');
398    let inner = &s[1..s.len() - 1];
399    let parts: Vec<&str> = inner.split(',').collect();
400    let lo: f32 = parts[0].trim().parse().expect("invalid range lower bound");
401    let hi: f32 = parts[1].trim().parse().expect("invalid range upper bound");
402    if inclusive_end {
403        ConfidenceRange::inclusive(lo, hi)
404    } else {
405        ConfidenceRange::exclusive(lo, hi)
406    }
407}