axon/flow_dispatcher/lambda_tools.rs
1//! §Fase 33.y.j — Lambda + UseTool. The final 2 variants needed
2//! to reach 45/45 IRFlowNode graduation.
3//!
4//! Two variants graduated in 33.y.j:
5//!
6//! - **`LambdaDataApply`** (Fase 15 ΛD apply) — apply a named
7//! lambda data structure to a target expression. Sync runner
8//! walks a CPS dispatcher mapping lambda data structures to
9//! their result expressions. OSS reference impl uses the public
10//! helper [`apply_lambda_data`] which returns a canonical
11//! `"lambda:<name>(<target>)"` placeholder; enterprise R&D
12//! (axon_enterprise lambda runtime) wires the real CPS
13//! dispatcher.
14//!
15//! - **`UseTool`** (Fase 22 mid-step tool dispatch) — invoke a
16//! named tool with an argument. The full
17//! `ChatRequest.tools` cross-cutting plumb-through (D8) lands
18//! in 33.y.k as a cross-cutting fix that extends the
19//! `pure_shape` core. 33.y.j ships the OSS reference impl via
20//! the public helper [`invoke_tool`] which returns a canonical
21//! `"tool:<name>(<argument>)"` placeholder; enterprise R&D
22//! wires the real Fase 22 tool registry + dispatch.
23//!
24//! After 33.y.j: 45/45 IRFlowNode variants graduated. The legacy
25//! `shim` becomes structurally unreachable from `dispatch_node`;
26//! 33.y.l explicitly retires it.
27//!
28//! # D-letter anchors
29//!
30//! - **D1** — both variants have NAMED async handlers; the
31//! exhaustive match in `dispatch_node` reaches 45/45 graduation.
32//! - **D3** — cancel checked at every `.await` boundary.
33//! - **D7** — every error case routes through `DispatchError`;
34//! OSS helpers cannot fail (placeholder semantics); enterprise
35//! overrides surface `BackendError` for real lambda/tool
36//! runtime errors.
37//! - **D8 (preview)** — UseTool is the 33.y.k cross-cutting
38//! anchor. 33.y.j ships the wire shape + helper surface; 33.y.k
39//! plumbs `ChatRequest.tools` through every `pure_shape`-routed
40//! handler so adopters declaring `apply: <tool>` see real
41//! tool-call events on the wire.
42//! - **D10** — sync-runner parity: lambda apply + tool invocation
43//! produce deterministic placeholders for OSS path; enterprise
44//! integration preserves the SAME wire envelope (only inner
45//! content differs).
46
47use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
48use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
49use crate::ir_nodes::{IRLambdaDataApply, IRUseToolStep};
50
51// ────────────────────────────────────────────────────────────────────
52// Public helpers (enterprise hooks override these)
53// ────────────────────────────────────────────────────────────────────
54
55/// Apply a named ΛD (lambda data structure) to a target. OSS
56/// default: resolves `target` through `ctx.let_bindings` (literal
57/// if missing) + returns canonical `"lambda:<name>(<resolved_target>)"`.
58/// Enterprise overrides hook the Fase 15 CPS dispatcher (real
59/// lambda evaluation against the IR).
60pub fn apply_lambda_data(
61 lambda_name: &str,
62 target: &str,
63 ctx: &DispatchCtx,
64) -> String {
65 let resolved_target = ctx
66 .let_bindings
67 .get(target)
68 .cloned()
69 .unwrap_or_else(|| target.to_string());
70 format!("lambda:{lambda_name}({resolved_target})")
71}
72
73/// Invoke a tool with an argument. OSS default: resolves
74/// `argument` through `ctx.let_bindings` (literal if missing) +
75/// returns canonical `"tool:<name>(<resolved_argument>)"`.
76/// Enterprise overrides hook the Fase 22 tool registry +
77/// per-provider dispatch (Anthropic / OpenAI / etc.). The D8
78/// cross-cutting fix (33.y.k) extends `pure_shape::run_pure_shape`
79/// to plumb `ChatRequest.tools` so `apply: <tool>` on a Step
80/// activates real upstream tool-calling on the wire.
81pub fn invoke_tool(tool_name: &str, argument: &str, ctx: &DispatchCtx) -> String {
82 let resolved_argument = ctx
83 .let_bindings
84 .get(argument)
85 .cloned()
86 .unwrap_or_else(|| argument.to_string());
87 format!("tool:{tool_name}({resolved_argument})")
88}
89
90// ────────────────────────────────────────────────────────────────────
91// LambdaDataApply (Fase 15 ΛD apply)
92// ────────────────────────────────────────────────────────────────────
93
94/// LambdaDataApply handler. Wire shape:
95/// `step_type: "lambda_data_apply"`. Resolves the lambda via
96/// [`apply_lambda_data`] + binds result under `output_type` (or
97/// `<target>_lambda_applied` canonical fallback) in
98/// `ctx.let_bindings`.
99pub async fn run_lambda_data_apply(
100 node: &IRLambdaDataApply,
101 ctx: &mut DispatchCtx,
102) -> Result<NodeOutcome, DispatchError> {
103 if ctx.cancel.is_cancelled() {
104 return Err(DispatchError::UpstreamCancelled);
105 }
106 let step_index = ctx.step_counter;
107 ctx.step_counter += 1;
108
109 let step_name = if node.lambda_data_name.is_empty() {
110 "LambdaApply".to_string()
111 } else {
112 node.lambda_data_name.clone()
113 };
114 emit_step_start(ctx, &step_name, step_index, "lambda_data_apply")?;
115
116 let result = apply_lambda_data(&node.lambda_data_name, &node.target, ctx);
117
118 let output_key = if !node.output_type.is_empty() {
119 node.output_type.clone()
120 } else if !node.target.is_empty() {
121 format!("{}_lambda_applied", node.target)
122 } else {
123 String::new()
124 };
125 if !output_key.is_empty() {
126 ctx.let_bindings.insert(output_key, result.clone());
127 }
128
129 emit_step_complete(ctx, &step_name, step_index, &result, 0, true)?;
130
131 Ok(NodeOutcome::Completed {
132 output: result,
133 tokens_emitted: 0,
134 step_index,
135 })
136}
137
138// ────────────────────────────────────────────────────────────────────
139// UseTool (Fase 22 mid-step tool dispatch)
140// ────────────────────────────────────────────────────────────────────
141
142/// UseTool handler. Wire shape: `step_type: "use_tool"`. Binds the
143/// result under the `<tool_name>_result` canonical key in
144/// `ctx.let_bindings`.
145///
146/// # §Fase 58.f.2 — real dispatch on the streaming path
147///
148/// When the request-scoped `ctx.tool_registry` (wired by
149/// `run_streaming_via_dispatcher` since §36.i, so it is populated on
150/// every production SSE flow) resolves the tool to a locally-
151/// dispatchable provider (`native` / `stub` / `http` / `mcp`), the
152/// handler POSTs the STRUCTURED JSON body assembled from
153/// `node.named_args` (keyword form, D2) — or the interpolated single
154/// argument (legacy `on <arg>`, D5) — to the tool's `runtime:`
155/// endpoint (D7) and binds the real response. This retires the
156/// `"tool:<name>(<arg>)"` placeholder on the SSE path, reaching
157/// dispatch parity with the synchronous server path (§58.f).
158///
159/// The placeholder ([`invoke_tool`]) survives ONLY as the D5
160/// fall-back for tools with no registry, an unregistered name, or a
161/// provider that intentionally falls through to the LLM (e.g.
162/// `brave`) — those keep their pre-58 behavior byte-for-byte.
163pub async fn run_use_tool(
164 node: &IRUseToolStep,
165 ctx: &mut DispatchCtx,
166) -> Result<NodeOutcome, DispatchError> {
167 if ctx.cancel.is_cancelled() {
168 return Err(DispatchError::UpstreamCancelled);
169 }
170 let step_index = ctx.step_counter;
171 ctx.step_counter += 1;
172
173 let step_name = if node.tool_name.is_empty() {
174 "UseTool".to_string()
175 } else {
176 node.tool_name.clone()
177 };
178 emit_step_start(ctx, &step_name, step_index, "use_tool")?;
179
180 // §Fase 58.f.2 — attempt a real dispatch; honor cancel observed
181 // while the (potentially blocking, network-bound) call ran.
182 let (result, success) = match dispatch_use_tool_real(node, ctx).await {
183 Some(tool_result) => (tool_result.output, tool_result.success),
184 // D5 — no registry / unregistered / LLM-routed provider →
185 // the canonical placeholder, unchanged from pre-58.
186 None => (invoke_tool(&node.tool_name, &node.argument, ctx), true),
187 };
188 if ctx.cancel.is_cancelled() {
189 return Err(DispatchError::UpstreamCancelled);
190 }
191
192 if !node.tool_name.is_empty() {
193 ctx.let_bindings
194 .insert(format!("{}_result", node.tool_name), result.clone());
195 }
196
197 emit_step_complete(ctx, &step_name, step_index, &result, 0, success)?;
198
199 Ok(NodeOutcome::Completed {
200 output: result,
201 tokens_emitted: 0,
202 step_index,
203 })
204}
205
206/// §Fase 58.f.2 — attempt a REAL tool dispatch on the streaming path.
207///
208/// Returns `Some(ToolResult)` when `ctx.tool_registry` resolves the
209/// tool to a locally-dispatchable provider; `None` when there is no
210/// registry, the tool is unregistered, or its provider falls through
211/// to the LLM — the caller then keeps the canonical placeholder (D5).
212///
213/// The structured `use Tool(k = v, …)` body is assembled with the
214/// SAME `(name, type)` coercion the synchronous server path applies
215/// (`runner::build_structured_tool_body`, §58.e), reading the typed
216/// schema carried on the [`crate::tool_registry::ToolEntry`] (§58.f.2
217/// piece 1). Interpolation of arg values mirrors the sync path's
218/// [`crate::exec_context::ExecContext::interpolate`] via the shared
219/// `interpolate_vars` helper over `ctx.let_bindings`.
220///
221/// `registry.dispatch` uses a blocking `reqwest` client for the
222/// `http` / `mcp` providers; calling it directly inside the tokio
223/// runtime would panic, so the dispatch runs on the blocking pool via
224/// `spawn_blocking` (D6). The request-scoped registry is `Arc`-cloned
225/// into the task — never a shared mutable global (D10).
226async fn dispatch_use_tool_real(
227 node: &IRUseToolStep,
228 ctx: &DispatchCtx,
229) -> Option<crate::tool_executor::ToolResult> {
230 let registry = ctx.tool_registry.clone()?;
231 // Resolve the typed input schema for coercion. The borrow ends
232 // here (cloned) so `registry` can move into `spawn_blocking`.
233 let parameters = registry.get(&node.tool_name)?.parameters.clone();
234
235 // Assemble the request argument: a structured JSON body for the
236 // keyword form (D2), or the interpolated single argument for the
237 // legacy `on <arg>` form (D5).
238 let argument = if node.named_args.is_empty() {
239 crate::exec_context::interpolate_vars(&node.argument, &ctx.let_bindings)
240 } else {
241 let interpolated: Vec<(String, String)> = node
242 .named_args
243 .iter()
244 .map(|a| {
245 (
246 a.name.clone(),
247 // §Fase 60 — resolve by value_kind: a `"reference"` (bare
248 // identifier / `Step.output`) is a binding lookup, not a
249 // literal name; `"literal"` keeps `${…}` interpolation.
250 crate::exec_context::resolve_named_arg_value(
251 &a.value,
252 &a.value_kind,
253 &ctx.let_bindings,
254 ),
255 )
256 })
257 .collect();
258 crate::runner::build_structured_tool_body(&interpolated, ¶meters)
259 };
260
261 let tool_name = node.tool_name.clone();
262 let registry_for_task = registry.clone();
263 match tokio::task::spawn_blocking(move || {
264 registry_for_task.dispatch(&tool_name, &argument)
265 })
266 .await
267 {
268 Ok(opt) => opt,
269 // A join failure (panic in the blocking task) surfaces as a
270 // failed ToolResult rather than propagating a panic to the
271 // dispatcher — the consumer sees a clean error, never a hang.
272 Err(join_err) => Some(crate::tool_executor::ToolResult {
273 success: false,
274 output: format!(
275 "tool '{}' dispatch task failed: {join_err}",
276 node.tool_name
277 ),
278 tool_name: node.tool_name.clone(),
279 }),
280 }
281}
282
283// ────────────────────────────────────────────────────────────────────
284// Wire-event helpers
285// ────────────────────────────────────────────────────────────────────
286
287fn emit_step_start(
288 ctx: &mut DispatchCtx,
289 step_name: &str,
290 step_index: usize,
291 step_type: &str,
292) -> Result<(), DispatchError> {
293 ctx.tx
294 .send(FlowExecutionEvent::StepStart {
295 step_name: step_name.to_string(),
296 step_index,
297 step_type: step_type.to_string(),
298 timestamp_ms: now_ms(),
299 })
300 .map_err(|_| DispatchError::ChannelClosed)
301}
302
303fn emit_step_complete(
304 ctx: &mut DispatchCtx,
305 step_name: &str,
306 step_index: usize,
307 full_output: &str,
308 tokens_output: u64,
309 success: bool,
310) -> Result<(), DispatchError> {
311 ctx.tx
312 .send(FlowExecutionEvent::StepComplete {
313 step_name: step_name.to_string(),
314 step_index,
315 success,
316 full_output: full_output.to_string(),
317 tokens_input: 0,
318 tokens_output,
319 timestamp_ms: now_ms(),
320 })
321 .map_err(|_| DispatchError::ChannelClosed)
322}
323
324// ────────────────────────────────────────────────────────────────────
325// Unit tests
326// ────────────────────────────────────────────────────────────────────
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;
    use crate::ir_nodes::*;
    use tokio::sync::mpsc;

    type EventRx = mpsc::UnboundedReceiver<FlowExecutionEvent>;

    /// Context with a fresh (un-cancelled) flag plus the receiving end
    /// of its event channel.
    fn fresh_ctx() -> (DispatchCtx, EventRx) {
        let (tx, rx) = mpsc::unbounded_channel();
        let ctx = DispatchCtx::new("TestFlow", "stub", "", CancellationFlag::new(), tx);
        (ctx, rx)
    }

    /// `IRLambdaDataApply` fixture at source position 0:0.
    fn lambda_node(name: &str, target: &str, output_type: &str) -> IRLambdaDataApply {
        IRLambdaDataApply {
            node_type: "lambda_data_apply",
            source_line: 0,
            source_column: 0,
            lambda_data_name: name.into(),
            target: target.into(),
            output_type: output_type.into(),
        }
    }

    /// `IRUseToolStep` fixture (legacy single-argument form, no
    /// named args) at source position 0:0.
    fn use_tool_node(tool_name: &str, argument: &str) -> IRUseToolStep {
        IRUseToolStep {
            node_type: "use_tool",
            source_line: 0,
            source_column: 0,
            tool_name: tool_name.into(),
            argument: argument.into(),
            named_args: Vec::new(),
        }
    }

    /// Pops the first queued event and checks it is a `StepStart`
    /// carrying the expected `step_type`.
    fn assert_first_event_is_step_start(rx: &mut EventRx, expected_step_type: &str) {
        let first = rx.try_recv().unwrap();
        match first {
            FlowExecutionEvent::StepStart { step_type, .. } => {
                assert_eq!(step_type, expected_step_type);
            }
            e => panic!("expected StepStart, got {e:?}"),
        }
    }

    // ── apply_lambda_data ────────────────────────────────────────────

    #[test]
    fn apply_lambda_data_literal_target() {
        let (ctx, _rx) = fresh_ctx();
        let placeholder = apply_lambda_data("inc", "5", &ctx);
        assert_eq!(placeholder, "lambda:inc(5)");
    }

    #[test]
    fn apply_lambda_data_resolves_target_through_bindings() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("x".into(), "42".into());
        let placeholder = apply_lambda_data("square", "x", &ctx);
        assert_eq!(placeholder, "lambda:square(42)");
    }

    // ── invoke_tool ──────────────────────────────────────────────────

    #[test]
    fn invoke_tool_literal_argument() {
        let (ctx, _rx) = fresh_ctx();
        let placeholder = invoke_tool("calculator", "2+2", &ctx);
        assert_eq!(placeholder, "tool:calculator(2+2)");
    }

    #[test]
    fn invoke_tool_resolves_argument_through_bindings() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("query".into(), "weather today".into());
        let placeholder = invoke_tool("web_search", "query", &ctx);
        assert_eq!(placeholder, "tool:web_search(weather today)");
    }

    // ── LambdaDataApply ──────────────────────────────────────────────

    #[tokio::test]
    async fn run_lambda_data_apply_binds_under_output_type() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("input_data".into(), "raw".into());
        let node = lambda_node("transform", "input_data", "transformed");

        let outcome = run_lambda_data_apply(&node, &mut ctx).await.unwrap();
        match outcome {
            NodeOutcome::Completed { output, tokens_emitted, .. } => {
                assert_eq!(output, "lambda:transform(raw)");
                assert_eq!(tokens_emitted, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }

        assert_eq!(
            ctx.let_bindings.get("transformed").unwrap(),
            "lambda:transform(raw)"
        );
        assert_first_event_is_step_start(&mut rx, "lambda_data_apply");
    }

    #[tokio::test]
    async fn run_lambda_data_apply_canonical_fallback() {
        let (mut ctx, _rx) = fresh_ctx();
        // Empty output_type → the `<target>_lambda_applied` key is used.
        let node = lambda_node("norm", "doc", "");

        run_lambda_data_apply(&node, &mut ctx).await.unwrap();

        assert_eq!(
            ctx.let_bindings.get("doc_lambda_applied").unwrap(),
            "lambda:norm(doc)"
        );
    }

    // ── UseTool ──────────────────────────────────────────────────────

    #[tokio::test]
    async fn run_use_tool_binds_under_canonical_result_key() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("input".into(), "5+3".into());
        let node = use_tool_node("calculator", "input");

        let outcome = run_use_tool(&node, &mut ctx).await.unwrap();
        match outcome {
            NodeOutcome::Completed { output, tokens_emitted, .. } => {
                assert_eq!(output, "tool:calculator(5+3)");
                assert_eq!(tokens_emitted, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }

        assert_eq!(
            ctx.let_bindings.get("calculator_result").unwrap(),
            "tool:calculator(5+3)"
        );
        assert_first_event_is_step_start(&mut rx, "use_tool");
    }

    // ── Cancel guards ────────────────────────────────────────────────

    #[tokio::test]
    async fn lambda_and_use_tool_short_circuit_on_cancel() {
        // The flag is tripped before either handler runs.
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);

        let lambda = lambda_node("x", "y", "z");
        let lambda_outcome = run_lambda_data_apply(&lambda, &mut ctx).await;
        assert!(matches!(
            lambda_outcome,
            Err(DispatchError::UpstreamCancelled)
        ));

        let ut = use_tool_node("x", "y");
        let use_tool_outcome = run_use_tool(&ut, &mut ctx).await;
        assert!(matches!(
            use_tool_outcome,
            Err(DispatchError::UpstreamCancelled)
        ));
    }
}
515}