1use std::sync::{Arc, Mutex};
3
4use cruxx_core::prelude::*;
5use serde_json::Value;
6
7use crate::expr::{ExprContext, ExprError, StepResult};
8use crate::registry::HandlerRegistry;
9use crate::schema::{BudgetDef, PipelineDef, SpeculateMode, StepDef};
10
/// Executes pipeline definitions, resolving step handlers and delegate agents by name.
pub struct Runner {
    /// Shared registry consulted for every handler and agent lookup.
    registry: Arc<HandlerRegistry>,
}
15
16impl Runner {
17 pub fn new(registry: Arc<HandlerRegistry>) -> Self {
18 Self { registry }
19 }
20
21 pub async fn run(&self, pipeline: &PipelineDef, input: Value) -> Crux<Value> {
23 let mut ctx = CruxCtx::new(&pipeline.pipeline);
24
25 if let Some(budget_def) = &pipeline.budget {
26 ctx.set_budget(budget_from_def(budget_def));
27 }
28
29 let result = self.execute_steps(&mut ctx, &pipeline.steps, input).await;
30 ctx.finalize(result)
31 }
32
33 async fn execute_steps(
34 &self,
35 ctx: &mut CruxCtx,
36 steps: &[StepDef],
37 input: Value,
38 ) -> Result<Value, CruxErr> {
39 let mut expr_ctx = ExprContext::new(input.clone());
40 let mut last_output = input;
41
42 for step_def in steps {
43 last_output = self
44 .execute_step(ctx, step_def, &last_output, &mut expr_ctx)
45 .await?;
46 }
47
48 Ok(last_output)
49 }
50
51 async fn execute_step(
52 &self,
53 ctx: &mut CruxCtx,
54 step_def: &StepDef,
55 current_input: &Value,
56 expr_ctx: &mut ExprContext,
57 ) -> Result<Value, CruxErr> {
58 match step_def {
59 StepDef::Step(node) => {
60 let handler_name = node.handler.as_deref().unwrap_or(&node.step);
61 let handler = self
62 .registry
63 .get_handler(handler_name)
64 .ok_or_else(|| {
65 CruxErr::step_failed(
66 &node.step,
67 format!("handler not found: {handler_name}"),
68 )
69 })?
70 .clone();
71 let input = if let Some(step_args) = &node.args {
75 let expanded = expand_args(step_args.clone(), expr_ctx);
76 let mut merged = current_input.clone();
77 if let Value::Object(ref mut map) = merged {
78 map.insert("args".to_string(), expanded);
79 } else {
80 merged = serde_json::json!({ "args": expanded, "input": current_input });
81 }
82 merged
83 } else {
84 current_input.clone()
85 };
86 let raw = handler(input.clone()).await?;
90 let confidence = raw.confidence;
91 let value = raw.value;
92
93 let handler_out = ctx
95 .step(&node.step, || {
96 let v = value.clone();
97 async move { Ok::<Value, CruxErr>(v) }
98 })
99 .await?;
100
101 expr_ctx.steps.insert(
102 node.step.clone(),
103 StepResult {
104 output: handler_out.clone(),
105 confidence,
106 },
107 );
108 Ok(handler_out)
109 }
110
111 StepDef::Delegate(node) => {
112 let step_name = node.name.as_deref().unwrap_or(&node.delegate);
113 let agent_runner = self
114 .registry
115 .get_agent(&node.delegate)
116 .ok_or_else(|| {
117 CruxErr::step_failed(
118 step_name,
119 format!("agent not found: {}", node.delegate),
120 )
121 })?
122 .clone();
123
124 let input = current_input.clone();
125 let result = agent_runner(input).await;
126
127 let output = ctx.step(step_name, || async { result }).await?;
129
130 expr_ctx.steps.insert(
131 step_name.to_string(),
132 StepResult {
133 output: output.clone(),
134 confidence: None,
135 },
136 );
137 Ok(output)
138 }
139
140 StepDef::Pipe(node) => {
141 let registry = self.registry.clone();
142
143 let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
145 .stages
146 .iter()
147 .map(|_| Arc::new(Mutex::new(None)))
148 .collect();
149
150 #[allow(clippy::type_complexity)]
151 let stages: Vec<(
152 &str,
153 Box<dyn FnOnce(Value) -> BoxFut<Value> + Send>,
154 )> = node
155 .stages
156 .iter()
157 .zip(confidence_cells.iter())
158 .map(|(arm, cell)| {
159 let handler = registry.get_handler(arm.handler_name()).cloned();
160 let name_owned = arm.handler_name().to_string();
161 let static_args = arm.args().cloned();
162 let cell = Arc::clone(cell);
163 let stage_fn: Box<dyn FnOnce(Value) -> BoxFut<Value> + Send> =
164 Box::new(move |v: Value| {
165 Box::pin(async move {
166 let h = handler.ok_or_else(|| {
167 CruxErr::step_failed(&name_owned, "handler not found")
168 })?;
169 let input = merge_args(v, static_args);
170 let out = h(input).await?;
171 *cell.lock().unwrap() = out.confidence;
172 Ok(out.value)
173 }) as BoxFut<Value>
174 });
175 (arm.label(), stage_fn)
176 })
177 .collect();
178
179 let input = current_input.clone();
180 let result = ctx.pipe(&node.pipe, input, stages).await?;
181
182 let confidence = confidence_cells.last().and_then(|c| *c.lock().unwrap());
185
186 expr_ctx.steps.insert(
187 node.pipe.clone(),
188 StepResult {
189 output: result.clone(),
190 confidence,
191 },
192 );
193 Ok(result)
194 }
195
196 StepDef::JoinAll(node) => {
197 let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
198 .arms
199 .iter()
200 .map(|_| Arc::new(Mutex::new(None)))
201 .collect();
202
203 let arms: Vec<(&str, BoxFut<Value>)> = node
204 .arms
205 .iter()
206 .zip(confidence_cells.iter())
207 .map(|(arm, cell)| {
208 let handler = self.registry.get_handler(arm.handler_name()).cloned();
209 let input = merge_args(current_input.clone(), arm.args().cloned());
210 let name_owned = arm.handler_name().to_string();
211 let cell = Arc::clone(cell);
212 let fut: BoxFut<Value> = Box::pin(async move {
213 let h = handler.ok_or_else(|| {
214 CruxErr::step_failed(&name_owned, "handler not found")
215 })?;
216 let out = h(input).await?;
217 *cell.lock().unwrap() = out.confidence;
218 Ok(out.value)
219 });
220 (arm.label(), fut)
221 })
222 .collect();
223
224 let results = ctx.join_all(&node.join_all, arms).await?;
225 let output = Value::Array(results);
226
227 let scored: Vec<f32> = confidence_cells
229 .iter()
230 .filter_map(|c| *c.lock().unwrap())
231 .collect();
232 let confidence = if scored.is_empty() {
233 None
234 } else {
235 Some(scored.iter().sum::<f32>() / scored.len() as f32)
236 };
237
238 expr_ctx.steps.insert(
239 node.join_all.clone(),
240 StepResult {
241 output: output.clone(),
242 confidence,
243 },
244 );
245 Ok(output)
246 }
247
248 StepDef::RouteOnConfidence(node) => {
249 let confidence = expr_ctx
250 .eval_f32(&node.value)
251 .map_err(|e| CruxErr::step_failed(&node.route_on_confidence, e.to_string()))?;
252
253 let confidence_cells: Vec<Arc<Mutex<Option<f32>>>> = node
255 .routes
256 .iter()
257 .map(|_| Arc::new(Mutex::new(None)))
258 .collect();
259
260 let routes: Vec<ConfidenceRoute<'_, Value>> = node
261 .routes
262 .iter()
263 .zip(confidence_cells.iter())
264 .map(|(branch, cell)| {
265 let range = parse_range(&branch.range);
266 let handler = self.registry.get_handler(&branch.handler).cloned();
267 let input = merge_args(current_input.clone(), branch.args.clone());
268 let handler_name = branch.handler.clone();
269 let cell = Arc::clone(cell);
270 let fut: BoxFut<Value> = Box::pin(async move {
271 let h = handler.ok_or_else(|| {
272 CruxErr::step_failed(&handler_name, "handler not found")
273 })?;
274 let out = h(input).await?;
275 *cell.lock().unwrap() = out.confidence;
276 Ok(out.value)
277 });
278 (range, branch.label.as_str(), fut)
279 })
280 .collect();
281
282 let result = ctx
283 .route_on_confidence(&node.route_on_confidence, confidence, routes)
284 .await?;
285
286 let handler_confidence = confidence_cells
288 .iter()
289 .find_map(|c| *c.lock().unwrap())
290 .map(Some)
291 .unwrap_or(Some(confidence));
292
293 expr_ctx.steps.insert(
294 node.route_on_confidence.clone(),
295 StepResult {
296 output: result.clone(),
297 confidence: handler_confidence,
298 },
299 );
300 Ok(result)
301 }
302
303 StepDef::Speculate(node) => {
304 let arms: Vec<(&str, BoxFut<Value>)> = node
305 .arms
306 .iter()
307 .map(|arm| {
308 let handler = self.registry.get_handler(arm.handler_name()).cloned();
309 let input = merge_args(current_input.clone(), arm.args().cloned());
310 let name_owned = arm.handler_name().to_string();
311 let fut: BoxFut<Value> = Box::pin(async move {
312 let h = handler.ok_or_else(|| {
313 CruxErr::step_failed(&name_owned, "handler not found")
314 })?;
315 h(input).await.map(|o| o.value)
316 });
317 (arm.label(), fut)
318 })
319 .collect();
320
321 let builder = ctx.speculate(&node.speculate, arms);
322 let result = match node.mode {
323 SpeculateMode::PickBest => {
324 builder
325 .pick_best_by(|v: &Value| {
326 v.get("score").and_then(|s| s.as_f64()).unwrap_or(0.0) as f32
327 })
328 .await?
329 }
330 SpeculateMode::FirstOk => builder.first_ok().await?,
331 };
332
333 expr_ctx.steps.insert(
334 node.speculate.clone(),
335 StepResult {
336 output: result.clone(),
337 confidence: None,
338 },
339 );
340 Ok(result)
341 }
342 }
343 }
344}
345
346fn expand_args(value: Value, ctx: &ExprContext) -> Value {
353 match value {
354 Value::String(s) => match ctx.eval(&s) {
355 Ok(expanded) => expanded,
356 Err(ExprError::Syntax(_) | ExprError::UnknownStep(_) | ExprError::UnknownPath(_)) => {
357 Value::String(s)
358 }
359 Err(_) => Value::String(s),
360 },
361 Value::Array(arr) => Value::Array(arr.into_iter().map(|v| expand_args(v, ctx)).collect()),
362 Value::Object(map) => Value::Object(
363 map.into_iter()
364 .map(|(k, v)| (k, expand_args(v, ctx)))
365 .collect(),
366 ),
367 other => other,
368 }
369}
370
371fn merge_args(mut input: Value, args: Option<Value>) -> Value {
373 if let Some(a) = args {
374 if let Value::Object(ref mut map) = input {
375 map.insert("args".to_string(), a);
376 } else {
377 input = serde_json::json!({ "args": a, "input": input });
378 }
379 }
380 input
381}
382
383fn budget_from_def(def: &BudgetDef) -> Budget {
384 match def {
385 BudgetDef::Tokens { tokens } => Budget::tokens(*tokens),
386 BudgetDef::Calls { calls } => Budget::calls(*calls),
387 BudgetDef::Duration { duration_ms } => {
388 Budget::duration(std::time::Duration::from_millis(*duration_ms))
389 }
390 BudgetDef::CostCents { cost_cents } => Budget::cost_cents(*cost_cents),
391 }
392}
393
394fn parse_range(s: &str) -> ConfidenceRange {
396 let s = s.trim();
397 let inclusive_end = s.ends_with(']');
398 let inner = &s[1..s.len() - 1];
399 let parts: Vec<&str> = inner.split(',').collect();
400 let lo: f32 = parts[0].trim().parse().expect("invalid range lower bound");
401 let hi: f32 = parts[1].trim().parse().expect("invalid range upper bound");
402 if inclusive_end {
403 ConfidenceRange::inclusive(lo, hi)
404 } else {
405 ConfidenceRange::exclusive(lo, hi)
406 }
407}