Skip to main content

cruxx_script/
runner.rs

//! Pipeline runner — interprets a parsed YAML pipeline against `CruxCtx` + `HandlerRegistry`.
2use std::sync::Arc;
3
4use cruxx_core::prelude::*;
5use serde_json::Value;
6
7use crate::expr::{ExprContext, StepResult};
8use crate::registry::HandlerRegistry;
9use crate::schema::{BudgetDef, PipelineDef, SpeculateMode, StepDef};
10
11/// Executes parsed pipelines against a handler registry.
12pub struct Runner {
13    registry: Arc<HandlerRegistry>,
14}
15
16impl Runner {
17    pub fn new(registry: Arc<HandlerRegistry>) -> Self {
18        Self { registry }
19    }
20
21    /// Run a pipeline definition with the given input, producing a full Crux trace.
22    pub async fn run(&self, pipeline: &PipelineDef, input: Value) -> Crux<Value> {
23        let mut ctx = CruxCtx::new(&pipeline.pipeline);
24
25        if let Some(budget_def) = &pipeline.budget {
26            ctx.set_budget(budget_from_def(budget_def));
27        }
28
29        let result = self.execute_steps(&mut ctx, &pipeline.steps, input).await;
30        ctx.finalize(result)
31    }
32
33    async fn execute_steps(
34        &self,
35        ctx: &mut CruxCtx,
36        steps: &[StepDef],
37        input: Value,
38    ) -> Result<Value, CruxErr> {
39        let mut expr_ctx = ExprContext::new(input.clone());
40        let mut last_output = input;
41
42        for step_def in steps {
43            last_output = self
44                .execute_step(ctx, step_def, &last_output, &mut expr_ctx)
45                .await?;
46        }
47
48        Ok(last_output)
49    }
50
51    async fn execute_step(
52        &self,
53        ctx: &mut CruxCtx,
54        step_def: &StepDef,
55        current_input: &Value,
56        expr_ctx: &mut ExprContext,
57    ) -> Result<Value, CruxErr> {
58        match step_def {
59            StepDef::Step(node) => {
60                let handler_name = node.handler.as_deref().unwrap_or(&node.step);
61                let handler = self
62                    .registry
63                    .get_handler(handler_name)
64                    .ok_or_else(|| {
65                        CruxErr::step_failed(
66                            &node.step,
67                            format!("handler not found: {handler_name}"),
68                        )
69                    })?
70                    .clone();
71                // Merge static step args into the current input under the "args" key.
72                let input = if let Some(step_args) = &node.args {
73                    let mut merged = current_input.clone();
74                    if let Value::Object(ref mut map) = merged {
75                        map.insert("args".to_string(), step_args.clone());
76                    } else {
77                        merged = serde_json::json!({ "args": step_args, "input": current_input });
78                    }
79                    merged
80                } else {
81                    current_input.clone()
82                };
83                let result = ctx
84                    .step(&node.step, || {
85                        let h = handler.clone();
86                        let i = input.clone();
87                        async move { h(i).await }
88                    })
89                    .await?;
90                expr_ctx.steps.insert(
91                    node.step.clone(),
92                    StepResult {
93                        output: result.clone(),
94                        confidence: 1.0,
95                    },
96                );
97                Ok(result)
98            }
99
100            StepDef::Delegate(node) => {
101                let step_name = node.name.as_deref().unwrap_or(&node.delegate);
102                let agent_runner = self
103                    .registry
104                    .get_agent(&node.delegate)
105                    .ok_or_else(|| {
106                        CruxErr::step_failed(
107                            step_name,
108                            format!("agent not found: {}", node.delegate),
109                        )
110                    })?
111                    .clone();
112
113                let input = current_input.clone();
114                let result = agent_runner(input).await;
115
116                // Record the delegation step in parent
117                let output = ctx.step(step_name, || async { result }).await?;
118
119                expr_ctx.steps.insert(
120                    step_name.to_string(),
121                    StepResult {
122                        output: output.clone(),
123                        confidence: 1.0,
124                    },
125                );
126                Ok(output)
127            }
128
129            StepDef::Pipe(node) => {
130                let registry = self.registry.clone();
131                #[allow(clippy::type_complexity)]
132                let stages: Vec<(
133                    &str,
134                    Box<dyn FnOnce(Value) -> BoxFut<Value> + Send>,
135                )> = node
136                    .stages
137                    .iter()
138                    .map(|arm| {
139                        let handler = registry.get_handler(arm.handler_name()).cloned();
140                        let name_owned = arm.handler_name().to_string();
141                        let static_args = arm.args().cloned();
142                        let stage_fn: Box<dyn FnOnce(Value) -> BoxFut<Value> + Send> =
143                            Box::new(move |v: Value| {
144                                Box::pin(async move {
145                                    let h = handler.ok_or_else(|| {
146                                        CruxErr::step_failed(&name_owned, "handler not found")
147                                    })?;
148                                    let input = merge_args(v, static_args);
149                                    h(input).await
150                                }) as BoxFut<Value>
151                            });
152                        (arm.label(), stage_fn)
153                    })
154                    .collect();
155
156                let input = current_input.clone();
157                let result = ctx.pipe(&node.pipe, input, stages).await?;
158
159                expr_ctx.steps.insert(
160                    node.pipe.clone(),
161                    StepResult {
162                        output: result.clone(),
163                        confidence: 1.0,
164                    },
165                );
166                Ok(result)
167            }
168
169            StepDef::JoinAll(node) => {
170                let arms: Vec<(&str, BoxFut<Value>)> = node
171                    .arms
172                    .iter()
173                    .map(|arm| {
174                        let handler = self.registry.get_handler(arm.handler_name()).cloned();
175                        let input = merge_args(current_input.clone(), arm.args().cloned());
176                        let name_owned = arm.handler_name().to_string();
177                        let fut: BoxFut<Value> = Box::pin(async move {
178                            let h = handler.ok_or_else(|| {
179                                CruxErr::step_failed(&name_owned, "handler not found")
180                            })?;
181                            h(input).await
182                        });
183                        (arm.label(), fut)
184                    })
185                    .collect();
186
187                let results = ctx.join_all(&node.join_all, arms).await?;
188                let output = Value::Array(results);
189
190                expr_ctx.steps.insert(
191                    node.join_all.clone(),
192                    StepResult {
193                        output: output.clone(),
194                        confidence: 1.0,
195                    },
196                );
197                Ok(output)
198            }
199
200            StepDef::RouteOnConfidence(node) => {
201                let confidence = expr_ctx
202                    .eval_f32(&node.value)
203                    .map_err(|e| CruxErr::step_failed(&node.route_on_confidence, e.to_string()))?;
204
205                let routes: Vec<ConfidenceRoute<'_, Value>> = node
206                    .routes
207                    .iter()
208                    .map(|branch| {
209                        let range = parse_range(&branch.range);
210                        let handler = self.registry.get_handler(&branch.handler).cloned();
211                        let input = merge_args(current_input.clone(), branch.args.clone());
212                        let handler_name = branch.handler.clone();
213                        let fut: BoxFut<Value> = Box::pin(async move {
214                            let h = handler.ok_or_else(|| {
215                                CruxErr::step_failed(&handler_name, "handler not found")
216                            })?;
217                            h(input).await
218                        });
219                        (range, branch.label.as_str(), fut)
220                    })
221                    .collect();
222
223                let result = ctx
224                    .route_on_confidence(&node.route_on_confidence, confidence, routes)
225                    .await?;
226
227                expr_ctx.steps.insert(
228                    node.route_on_confidence.clone(),
229                    StepResult {
230                        output: result.clone(),
231                        confidence,
232                    },
233                );
234                Ok(result)
235            }
236
237            StepDef::Speculate(node) => {
238                let arms: Vec<(&str, BoxFut<Value>)> = node
239                    .arms
240                    .iter()
241                    .map(|arm| {
242                        let handler = self.registry.get_handler(arm.handler_name()).cloned();
243                        let input = merge_args(current_input.clone(), arm.args().cloned());
244                        let name_owned = arm.handler_name().to_string();
245                        let fut: BoxFut<Value> = Box::pin(async move {
246                            let h = handler.ok_or_else(|| {
247                                CruxErr::step_failed(&name_owned, "handler not found")
248                            })?;
249                            h(input).await
250                        });
251                        (arm.label(), fut)
252                    })
253                    .collect();
254
255                let builder = ctx.speculate(&node.speculate, arms);
256                let result = match node.mode {
257                    SpeculateMode::PickBest => {
258                        builder
259                            .pick_best_by(|v: &Value| {
260                                v.get("score").and_then(|s| s.as_f64()).unwrap_or(0.0) as f32
261                            })
262                            .await?
263                    }
264                    SpeculateMode::FirstOk => builder.first_ok().await?,
265                };
266
267                expr_ctx.steps.insert(
268                    node.speculate.clone(),
269                    StepResult {
270                        output: result.clone(),
271                        confidence: 1.0,
272                    },
273                );
274                Ok(result)
275            }
276        }
277    }
278}
279
280/// Merge static step args into handler input under the "args" key.
281fn merge_args(mut input: Value, args: Option<Value>) -> Value {
282    if let Some(a) = args {
283        if let Value::Object(ref mut map) = input {
284            map.insert("args".to_string(), a);
285        } else {
286            input = serde_json::json!({ "args": a, "input": input });
287        }
288    }
289    input
290}
291
292fn budget_from_def(def: &BudgetDef) -> Budget {
293    match def {
294        BudgetDef::Tokens { tokens } => Budget::tokens(*tokens),
295        BudgetDef::Calls { calls } => Budget::calls(*calls),
296        BudgetDef::Duration { duration_ms } => {
297            Budget::duration(std::time::Duration::from_millis(*duration_ms))
298        }
299        BudgetDef::CostCents { cost_cents } => Budget::cost_cents(*cost_cents),
300    }
301}
302
303/// Parse a range string like `[0.0, 0.5)` or `[0.8, 1.0]`.
304fn parse_range(s: &str) -> ConfidenceRange {
305    let s = s.trim();
306    let inclusive_end = s.ends_with(']');
307    let inner = &s[1..s.len() - 1];
308    let parts: Vec<&str> = inner.split(',').collect();
309    let lo: f32 = parts[0].trim().parse().expect("invalid range lower bound");
310    let hi: f32 = parts[1].trim().parse().expect("invalid range upper bound");
311    if inclusive_end {
312        ConfidenceRange::inclusive(lo, hi)
313    } else {
314        ConfidenceRange::exclusive(lo, hi)
315    }
316}