1use std::sync::Arc;
3
4use cruxx_core::prelude::*;
5use serde_json::Value;
6
7use crate::expr::{ExprContext, StepResult};
8use crate::registry::HandlerRegistry;
9use crate::schema::{BudgetDef, PipelineDef, SpeculateMode, StepDef};
10
11pub struct Runner {
13 registry: Arc<HandlerRegistry>,
14}
15
16impl Runner {
17 pub fn new(registry: Arc<HandlerRegistry>) -> Self {
18 Self { registry }
19 }
20
21 pub async fn run(&self, pipeline: &PipelineDef, input: Value) -> Crux<Value> {
23 let mut ctx = CruxCtx::new(&pipeline.pipeline);
24
25 if let Some(budget_def) = &pipeline.budget {
26 ctx.set_budget(budget_from_def(budget_def));
27 }
28
29 let result = self.execute_steps(&mut ctx, &pipeline.steps, input).await;
30 ctx.finalize(result)
31 }
32
33 async fn execute_steps(
34 &self,
35 ctx: &mut CruxCtx,
36 steps: &[StepDef],
37 input: Value,
38 ) -> Result<Value, CruxErr> {
39 let mut expr_ctx = ExprContext::new(input.clone());
40 let mut last_output = input;
41
42 for step_def in steps {
43 last_output = self
44 .execute_step(ctx, step_def, &last_output, &mut expr_ctx)
45 .await?;
46 }
47
48 Ok(last_output)
49 }
50
51 async fn execute_step(
52 &self,
53 ctx: &mut CruxCtx,
54 step_def: &StepDef,
55 current_input: &Value,
56 expr_ctx: &mut ExprContext,
57 ) -> Result<Value, CruxErr> {
58 match step_def {
59 StepDef::Step(node) => {
60 let handler_name = node.handler.as_deref().unwrap_or(&node.step);
61 let handler = self
62 .registry
63 .get_handler(handler_name)
64 .ok_or_else(|| {
65 CruxErr::step_failed(
66 &node.step,
67 format!("handler not found: {handler_name}"),
68 )
69 })?
70 .clone();
71 let input = if let Some(step_args) = &node.args {
73 let mut merged = current_input.clone();
74 if let Value::Object(ref mut map) = merged {
75 map.insert("args".to_string(), step_args.clone());
76 } else {
77 merged = serde_json::json!({ "args": step_args, "input": current_input });
78 }
79 merged
80 } else {
81 current_input.clone()
82 };
83 let result = ctx
84 .step(&node.step, || {
85 let h = handler.clone();
86 let i = input.clone();
87 async move { h(i).await }
88 })
89 .await?;
90 expr_ctx.steps.insert(
91 node.step.clone(),
92 StepResult {
93 output: result.clone(),
94 confidence: 1.0,
95 },
96 );
97 Ok(result)
98 }
99
100 StepDef::Delegate(node) => {
101 let step_name = node.name.as_deref().unwrap_or(&node.delegate);
102 let agent_runner = self
103 .registry
104 .get_agent(&node.delegate)
105 .ok_or_else(|| {
106 CruxErr::step_failed(
107 step_name,
108 format!("agent not found: {}", node.delegate),
109 )
110 })?
111 .clone();
112
113 let input = current_input.clone();
114 let result = agent_runner(input).await;
115
116 let output = ctx.step(step_name, || async { result }).await?;
118
119 expr_ctx.steps.insert(
120 step_name.to_string(),
121 StepResult {
122 output: output.clone(),
123 confidence: 1.0,
124 },
125 );
126 Ok(output)
127 }
128
129 StepDef::Pipe(node) => {
130 let registry = self.registry.clone();
131 #[allow(clippy::type_complexity)]
132 let stages: Vec<(
133 &str,
134 Box<dyn FnOnce(Value) -> BoxFut<Value> + Send>,
135 )> = node
136 .stages
137 .iter()
138 .map(|arm| {
139 let handler = registry.get_handler(arm.handler_name()).cloned();
140 let name_owned = arm.handler_name().to_string();
141 let static_args = arm.args().cloned();
142 let stage_fn: Box<dyn FnOnce(Value) -> BoxFut<Value> + Send> =
143 Box::new(move |v: Value| {
144 Box::pin(async move {
145 let h = handler.ok_or_else(|| {
146 CruxErr::step_failed(&name_owned, "handler not found")
147 })?;
148 let input = merge_args(v, static_args);
149 h(input).await
150 }) as BoxFut<Value>
151 });
152 (arm.label(), stage_fn)
153 })
154 .collect();
155
156 let input = current_input.clone();
157 let result = ctx.pipe(&node.pipe, input, stages).await?;
158
159 expr_ctx.steps.insert(
160 node.pipe.clone(),
161 StepResult {
162 output: result.clone(),
163 confidence: 1.0,
164 },
165 );
166 Ok(result)
167 }
168
169 StepDef::JoinAll(node) => {
170 let arms: Vec<(&str, BoxFut<Value>)> = node
171 .arms
172 .iter()
173 .map(|arm| {
174 let handler = self.registry.get_handler(arm.handler_name()).cloned();
175 let input = merge_args(current_input.clone(), arm.args().cloned());
176 let name_owned = arm.handler_name().to_string();
177 let fut: BoxFut<Value> = Box::pin(async move {
178 let h = handler.ok_or_else(|| {
179 CruxErr::step_failed(&name_owned, "handler not found")
180 })?;
181 h(input).await
182 });
183 (arm.label(), fut)
184 })
185 .collect();
186
187 let results = ctx.join_all(&node.join_all, arms).await?;
188 let output = Value::Array(results);
189
190 expr_ctx.steps.insert(
191 node.join_all.clone(),
192 StepResult {
193 output: output.clone(),
194 confidence: 1.0,
195 },
196 );
197 Ok(output)
198 }
199
200 StepDef::RouteOnConfidence(node) => {
201 let confidence = expr_ctx
202 .eval_f32(&node.value)
203 .map_err(|e| CruxErr::step_failed(&node.route_on_confidence, e.to_string()))?;
204
205 let routes: Vec<ConfidenceRoute<'_, Value>> = node
206 .routes
207 .iter()
208 .map(|branch| {
209 let range = parse_range(&branch.range);
210 let handler = self.registry.get_handler(&branch.handler).cloned();
211 let input = merge_args(current_input.clone(), branch.args.clone());
212 let handler_name = branch.handler.clone();
213 let fut: BoxFut<Value> = Box::pin(async move {
214 let h = handler.ok_or_else(|| {
215 CruxErr::step_failed(&handler_name, "handler not found")
216 })?;
217 h(input).await
218 });
219 (range, branch.label.as_str(), fut)
220 })
221 .collect();
222
223 let result = ctx
224 .route_on_confidence(&node.route_on_confidence, confidence, routes)
225 .await?;
226
227 expr_ctx.steps.insert(
228 node.route_on_confidence.clone(),
229 StepResult {
230 output: result.clone(),
231 confidence,
232 },
233 );
234 Ok(result)
235 }
236
237 StepDef::Speculate(node) => {
238 let arms: Vec<(&str, BoxFut<Value>)> = node
239 .arms
240 .iter()
241 .map(|arm| {
242 let handler = self.registry.get_handler(arm.handler_name()).cloned();
243 let input = merge_args(current_input.clone(), arm.args().cloned());
244 let name_owned = arm.handler_name().to_string();
245 let fut: BoxFut<Value> = Box::pin(async move {
246 let h = handler.ok_or_else(|| {
247 CruxErr::step_failed(&name_owned, "handler not found")
248 })?;
249 h(input).await
250 });
251 (arm.label(), fut)
252 })
253 .collect();
254
255 let builder = ctx.speculate(&node.speculate, arms);
256 let result = match node.mode {
257 SpeculateMode::PickBest => {
258 builder
259 .pick_best_by(|v: &Value| {
260 v.get("score").and_then(|s| s.as_f64()).unwrap_or(0.0) as f32
261 })
262 .await?
263 }
264 SpeculateMode::FirstOk => builder.first_ok().await?,
265 };
266
267 expr_ctx.steps.insert(
268 node.speculate.clone(),
269 StepResult {
270 output: result.clone(),
271 confidence: 1.0,
272 },
273 );
274 Ok(result)
275 }
276 }
277 }
278}
279
280fn merge_args(mut input: Value, args: Option<Value>) -> Value {
282 if let Some(a) = args {
283 if let Value::Object(ref mut map) = input {
284 map.insert("args".to_string(), a);
285 } else {
286 input = serde_json::json!({ "args": a, "input": input });
287 }
288 }
289 input
290}
291
292fn budget_from_def(def: &BudgetDef) -> Budget {
293 match def {
294 BudgetDef::Tokens { tokens } => Budget::tokens(*tokens),
295 BudgetDef::Calls { calls } => Budget::calls(*calls),
296 BudgetDef::Duration { duration_ms } => {
297 Budget::duration(std::time::Duration::from_millis(*duration_ms))
298 }
299 BudgetDef::CostCents { cost_cents } => Budget::cost_cents(*cost_cents),
300 }
301}
302
303fn parse_range(s: &str) -> ConfidenceRange {
305 let s = s.trim();
306 let inclusive_end = s.ends_with(']');
307 let inner = &s[1..s.len() - 1];
308 let parts: Vec<&str> = inner.split(',').collect();
309 let lo: f32 = parts[0].trim().parse().expect("invalid range lower bound");
310 let hi: f32 = parts[1].trim().parse().expect("invalid range upper bound");
311 if inclusive_end {
312 ConfidenceRange::inclusive(lo, hi)
313 } else {
314 ConfidenceRange::exclusive(lo, hi)
315 }
316}