Skip to main content

axon/flow_dispatcher/
lambda_tools.rs

1//! §Fase 33.y.j — Lambda + UseTool. The final 2 variants needed
2//! to reach 45/45 IRFlowNode graduation.
3//!
4//! Two variants graduated in 33.y.j:
5//!
6//! - **`LambdaDataApply`** (Fase 15 ΛD apply) — apply a named
7//!   lambda data structure to a target expression. Sync runner
8//!   walks a CPS dispatcher mapping lambda data structures to
9//!   their result expressions. OSS reference impl uses the public
10//!   helper [`apply_lambda_data`] which returns a canonical
11//!   `"lambda:<name>(<target>)"` placeholder; enterprise R&D
12//!   (axon_enterprise lambda runtime) wires the real CPS
13//!   dispatcher.
14//!
//! - **`UseTool`** (Fase 22 mid-step tool dispatch) — invoke a
//!   named tool with an argument. The full
//!   `ChatRequest.tools` cross-cutting plumb-through (D8) lands
//!   in 33.y.k as a cross-cutting fix that extends the
//!   `pure_shape` core. 33.y.j ships the OSS reference impl via
//!   the public helper [`invoke_tool`] which returns a canonical
//!   `"tool:<name>(<argument>)"` placeholder; enterprise R&D
//!   wires the real Fase 22 tool registry + dispatch. Since
//!   §Fase 58.f.2, [`run_use_tool`] first attempts a real dispatch
//!   through `ctx.tool_registry` and keeps the placeholder only as
//!   the fall-back when no locally-dispatchable tool is resolved.
23//!
24//! After 33.y.j: 45/45 IRFlowNode variants graduated. The legacy
25//! `shim` becomes structurally unreachable from `dispatch_node`;
26//! 33.y.l explicitly retires it.
27//!
28//! # D-letter anchors
29//!
30//! - **D1** — both variants have NAMED async handlers; the
31//!   exhaustive match in `dispatch_node` reaches 45/45 graduation.
32//! - **D3** — cancel checked at every `.await` boundary.
33//! - **D7** — every error case routes through `DispatchError`;
34//!   OSS helpers cannot fail (placeholder semantics); enterprise
35//!   overrides surface `BackendError` for real lambda/tool
36//!   runtime errors.
37//! - **D8 (preview)** — UseTool is the 33.y.k cross-cutting
38//!   anchor. 33.y.j ships the wire shape + helper surface; 33.y.k
39//!   plumbs `ChatRequest.tools` through every `pure_shape`-routed
40//!   handler so adopters declaring `apply: <tool>` see real
41//!   tool-call events on the wire.
42//! - **D10** — sync-runner parity: lambda apply + tool invocation
43//!   produce deterministic placeholders for OSS path; enterprise
44//!   integration preserves the SAME wire envelope (only inner
45//!   content differs).
46
47use crate::flow_dispatcher::{DispatchCtx, DispatchError, NodeOutcome};
48use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
49use crate::ir_nodes::{IRLambdaDataApply, IRUseToolStep};
50
51// ────────────────────────────────────────────────────────────────────
52//  Public helpers (enterprise hooks override these)
53// ────────────────────────────────────────────────────────────────────
54
55/// Apply a named ΛD (lambda data structure) to a target. OSS
56/// default: resolves `target` through `ctx.let_bindings` (literal
57/// if missing) + returns canonical `"lambda:<name>(<resolved_target>)"`.
58/// Enterprise overrides hook the Fase 15 CPS dispatcher (real
59/// lambda evaluation against the IR).
60pub fn apply_lambda_data(
61    lambda_name: &str,
62    target: &str,
63    ctx: &DispatchCtx,
64) -> String {
65    let resolved_target = ctx
66        .let_bindings
67        .get(target)
68        .cloned()
69        .unwrap_or_else(|| target.to_string());
70    format!("lambda:{lambda_name}({resolved_target})")
71}
72
73/// Invoke a tool with an argument. OSS default: resolves
74/// `argument` through `ctx.let_bindings` (literal if missing) +
75/// returns canonical `"tool:<name>(<resolved_argument>)"`.
76/// Enterprise overrides hook the Fase 22 tool registry +
77/// per-provider dispatch (Anthropic / OpenAI / etc.). The D8
78/// cross-cutting fix (33.y.k) extends `pure_shape::run_pure_shape`
79/// to plumb `ChatRequest.tools` so `apply: <tool>` on a Step
80/// activates real upstream tool-calling on the wire.
81pub fn invoke_tool(tool_name: &str, argument: &str, ctx: &DispatchCtx) -> String {
82    let resolved_argument = ctx
83        .let_bindings
84        .get(argument)
85        .cloned()
86        .unwrap_or_else(|| argument.to_string());
87    format!("tool:{tool_name}({resolved_argument})")
88}
89
90// ────────────────────────────────────────────────────────────────────
91//  LambdaDataApply (Fase 15 ΛD apply)
92// ────────────────────────────────────────────────────────────────────
93
94/// LambdaDataApply handler. Wire shape:
95/// `step_type: "lambda_data_apply"`. Resolves the lambda via
96/// [`apply_lambda_data`] + binds result under `output_type` (or
97/// `<target>_lambda_applied` canonical fallback) in
98/// `ctx.let_bindings`.
99pub async fn run_lambda_data_apply(
100    node: &IRLambdaDataApply,
101    ctx: &mut DispatchCtx,
102) -> Result<NodeOutcome, DispatchError> {
103    if ctx.cancel.is_cancelled() {
104        return Err(DispatchError::UpstreamCancelled);
105    }
106    let step_index = ctx.step_counter;
107    ctx.step_counter += 1;
108
109    let step_name = if node.lambda_data_name.is_empty() {
110        "LambdaApply".to_string()
111    } else {
112        node.lambda_data_name.clone()
113    };
114    emit_step_start(ctx, &step_name, step_index, "lambda_data_apply")?;
115
116    let result = apply_lambda_data(&node.lambda_data_name, &node.target, ctx);
117
118    let output_key = if !node.output_type.is_empty() {
119        node.output_type.clone()
120    } else if !node.target.is_empty() {
121        format!("{}_lambda_applied", node.target)
122    } else {
123        String::new()
124    };
125    if !output_key.is_empty() {
126        ctx.let_bindings.insert(output_key, result.clone());
127    }
128
129    emit_step_complete(ctx, &step_name, step_index, &result, 0, true)?;
130
131    Ok(NodeOutcome::Completed {
132        output: result,
133        tokens_emitted: 0,
134        step_index,
135    })
136}
137
138// ────────────────────────────────────────────────────────────────────
139//  UseTool (Fase 22 mid-step tool dispatch)
140// ────────────────────────────────────────────────────────────────────
141
142/// UseTool handler. Wire shape: `step_type: "use_tool"`. Binds the
143/// result under the `<tool_name>_result` canonical key in
144/// `ctx.let_bindings`.
145///
146/// # §Fase 58.f.2 — real dispatch on the streaming path
147///
148/// When the request-scoped `ctx.tool_registry` (wired by
149/// `run_streaming_via_dispatcher` since §36.i, so it is populated on
150/// every production SSE flow) resolves the tool to a locally-
151/// dispatchable provider (`native` / `stub` / `http` / `mcp`), the
152/// handler POSTs the STRUCTURED JSON body assembled from
153/// `node.named_args` (keyword form, D2) — or the interpolated single
154/// argument (legacy `on <arg>`, D5) — to the tool's `runtime:`
155/// endpoint (D7) and binds the real response. This retires the
156/// `"tool:<name>(<arg>)"` placeholder on the SSE path, reaching
157/// dispatch parity with the synchronous server path (§58.f).
158///
159/// The placeholder ([`invoke_tool`]) survives ONLY as the D5
160/// fall-back for tools with no registry, an unregistered name, or a
161/// provider that intentionally falls through to the LLM (e.g.
162/// `brave`) — those keep their pre-58 behavior byte-for-byte.
163pub async fn run_use_tool(
164    node: &IRUseToolStep,
165    ctx: &mut DispatchCtx,
166) -> Result<NodeOutcome, DispatchError> {
167    if ctx.cancel.is_cancelled() {
168        return Err(DispatchError::UpstreamCancelled);
169    }
170    let step_index = ctx.step_counter;
171    ctx.step_counter += 1;
172
173    let step_name = if node.tool_name.is_empty() {
174        "UseTool".to_string()
175    } else {
176        node.tool_name.clone()
177    };
178    emit_step_start(ctx, &step_name, step_index, "use_tool")?;
179
180    // §Fase 58.f.2 — attempt a real dispatch; honor cancel observed
181    // while the (potentially blocking, network-bound) call ran.
182    let (result, success) = match dispatch_use_tool_real(node, ctx).await {
183        Some(tool_result) => (tool_result.output, tool_result.success),
184        // D5 — no registry / unregistered / LLM-routed provider →
185        // the canonical placeholder, unchanged from pre-58.
186        None => (invoke_tool(&node.tool_name, &node.argument, ctx), true),
187    };
188    if ctx.cancel.is_cancelled() {
189        return Err(DispatchError::UpstreamCancelled);
190    }
191
192    if !node.tool_name.is_empty() {
193        ctx.let_bindings
194            .insert(format!("{}_result", node.tool_name), result.clone());
195    }
196
197    emit_step_complete(ctx, &step_name, step_index, &result, 0, success)?;
198
199    Ok(NodeOutcome::Completed {
200        output: result,
201        tokens_emitted: 0,
202        step_index,
203    })
204}
205
206/// §Fase 58.f.2 — attempt a REAL tool dispatch on the streaming path.
207///
208/// Returns `Some(ToolResult)` when `ctx.tool_registry` resolves the
209/// tool to a locally-dispatchable provider; `None` when there is no
210/// registry, the tool is unregistered, or its provider falls through
211/// to the LLM — the caller then keeps the canonical placeholder (D5).
212///
213/// The structured `use Tool(k = v, …)` body is assembled with the
214/// SAME `(name, type)` coercion the synchronous server path applies
215/// (`runner::build_structured_tool_body`, §58.e), reading the typed
216/// schema carried on the [`crate::tool_registry::ToolEntry`] (§58.f.2
217/// piece 1). Interpolation of arg values mirrors the sync path's
218/// [`crate::exec_context::ExecContext::interpolate`] via the shared
219/// `interpolate_vars` helper over `ctx.let_bindings`.
220///
221/// `registry.dispatch` uses a blocking `reqwest` client for the
222/// `http` / `mcp` providers; calling it directly inside the tokio
223/// runtime would panic, so the dispatch runs on the blocking pool via
224/// `spawn_blocking` (D6). The request-scoped registry is `Arc`-cloned
225/// into the task — never a shared mutable global (D10).
226async fn dispatch_use_tool_real(
227    node: &IRUseToolStep,
228    ctx: &DispatchCtx,
229) -> Option<crate::tool_executor::ToolResult> {
230    let registry = ctx.tool_registry.clone()?;
231    // Resolve the typed input schema for coercion. The borrow ends
232    // here (cloned) so `registry` can move into `spawn_blocking`.
233    let parameters = registry.get(&node.tool_name)?.parameters.clone();
234
235    // Assemble the request argument: a structured JSON body for the
236    // keyword form (D2), or the interpolated single argument for the
237    // legacy `on <arg>` form (D5).
238    let argument = if node.named_args.is_empty() {
239        crate::exec_context::interpolate_vars(&node.argument, &ctx.let_bindings)
240    } else {
241        let interpolated: Vec<(String, String)> = node
242            .named_args
243            .iter()
244            .map(|a| {
245                (
246                    a.name.clone(),
247                    // §Fase 60 — resolve by value_kind: a `"reference"` (bare
248                    // identifier / `Step.output`) is a binding lookup, not a
249                    // literal name; `"literal"` keeps `${…}` interpolation.
250                    crate::exec_context::resolve_named_arg_value(
251                        &a.value,
252                        &a.value_kind,
253                        &ctx.let_bindings,
254                    ),
255                )
256            })
257            .collect();
258        crate::runner::build_structured_tool_body(&interpolated, &parameters)
259    };
260
261    let tool_name = node.tool_name.clone();
262    let registry_for_task = registry.clone();
263    match tokio::task::spawn_blocking(move || {
264        registry_for_task.dispatch(&tool_name, &argument)
265    })
266    .await
267    {
268        Ok(opt) => opt,
269        // A join failure (panic in the blocking task) surfaces as a
270        // failed ToolResult rather than propagating a panic to the
271        // dispatcher — the consumer sees a clean error, never a hang.
272        Err(join_err) => Some(crate::tool_executor::ToolResult {
273            success: false,
274            output: format!(
275                "tool '{}' dispatch task failed: {join_err}",
276                node.tool_name
277            ),
278            tool_name: node.tool_name.clone(),
279        }),
280    }
281}
282
283// ────────────────────────────────────────────────────────────────────
284//  Wire-event helpers
285// ────────────────────────────────────────────────────────────────────
286
287fn emit_step_start(
288    ctx: &mut DispatchCtx,
289    step_name: &str,
290    step_index: usize,
291    step_type: &str,
292) -> Result<(), DispatchError> {
293    ctx.tx
294        .send(FlowExecutionEvent::StepStart {
295            step_name: step_name.to_string(),
296            step_index,
297            step_type: step_type.to_string(),
298            timestamp_ms: now_ms(),
299        })
300        .map_err(|_| DispatchError::ChannelClosed)
301}
302
303fn emit_step_complete(
304    ctx: &mut DispatchCtx,
305    step_name: &str,
306    step_index: usize,
307    full_output: &str,
308    tokens_output: u64,
309    success: bool,
310) -> Result<(), DispatchError> {
311    ctx.tx
312        .send(FlowExecutionEvent::StepComplete {
313            step_name: step_name.to_string(),
314            step_index,
315            success,
316            full_output: full_output.to_string(),
317            tokens_input: 0,
318            tokens_output,
319            timestamp_ms: now_ms(),
320        })
321        .map_err(|_| DispatchError::ChannelClosed)
322}
323
324// ────────────────────────────────────────────────────────────────────
325//  Unit tests
326// ────────────────────────────────────────────────────────────────────
327
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;
    use crate::ir_nodes::*;
    use tokio::sync::mpsc;

    /// Receiving half of the event channel handed to a test context.
    type EventRx = mpsc::UnboundedReceiver<FlowExecutionEvent>;

    /// A context with no bindings and a live (un-cancelled) flag, plus
    /// the receiver that observes every event the handlers emit.
    fn fresh_ctx() -> (DispatchCtx, EventRx) {
        let (tx, rx) = mpsc::unbounded_channel();
        let ctx = DispatchCtx::new("TestFlow", "stub", "", CancellationFlag::new(), tx);
        (ctx, rx)
    }

    /// `IRLambdaDataApply` fixture with zeroed source position.
    fn lambda_node(name: &str, target: &str, output_type: &str) -> IRLambdaDataApply {
        IRLambdaDataApply {
            node_type: "lambda_data_apply",
            source_line: 0,
            source_column: 0,
            lambda_data_name: name.into(),
            target: target.into(),
            output_type: output_type.into(),
        }
    }

    /// `IRUseToolStep` fixture in the legacy single-argument form
    /// (no named args) with zeroed source position.
    fn tool_node(name: &str, argument: &str) -> IRUseToolStep {
        IRUseToolStep {
            node_type: "use_tool",
            source_line: 0,
            source_column: 0,
            tool_name: name.into(),
            argument: argument.into(),
            named_args: Vec::new(),
        }
    }

    /// Pops the first queued event and checks it is a `StepStart`
    /// carrying the expected `step_type`.
    fn assert_first_event_is_start(rx: &mut EventRx, expected_step_type: &str) {
        match rx.try_recv().unwrap() {
            FlowExecutionEvent::StepStart { step_type, .. } => {
                assert_eq!(step_type, expected_step_type);
            }
            e => panic!("expected StepStart, got {e:?}"),
        }
    }

    // ── apply_lambda_data ────────────────────────────────────────────

    #[test]
    fn apply_lambda_data_literal_target() {
        let (ctx, _rx) = fresh_ctx();
        let rendered = apply_lambda_data("inc", "5", &ctx);
        assert_eq!(rendered, "lambda:inc(5)");
    }

    #[test]
    fn apply_lambda_data_resolves_target_through_bindings() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("x".into(), "42".into());
        let rendered = apply_lambda_data("square", "x", &ctx);
        assert_eq!(rendered, "lambda:square(42)");
    }

    // ── invoke_tool ──────────────────────────────────────────────────

    #[test]
    fn invoke_tool_literal_argument() {
        let (ctx, _rx) = fresh_ctx();
        let rendered = invoke_tool("calculator", "2+2", &ctx);
        assert_eq!(rendered, "tool:calculator(2+2)");
    }

    #[test]
    fn invoke_tool_resolves_argument_through_bindings() {
        let (mut ctx, _rx) = fresh_ctx();
        ctx.let_bindings.insert("query".into(), "weather today".into());
        let rendered = invoke_tool("web_search", "query", &ctx);
        assert_eq!(rendered, "tool:web_search(weather today)");
    }

    // ── LambdaDataApply ──────────────────────────────────────────────

    #[tokio::test]
    async fn run_lambda_data_apply_binds_under_output_type() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("input_data".into(), "raw".into());
        let node = lambda_node("transform", "input_data", "transformed");

        let outcome = run_lambda_data_apply(&node, &mut ctx).await.unwrap();
        match outcome {
            NodeOutcome::Completed { output, tokens_emitted, .. } => {
                assert_eq!(output, "lambda:transform(raw)");
                assert_eq!(tokens_emitted, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }

        assert_eq!(
            ctx.let_bindings.get("transformed").unwrap(),
            "lambda:transform(raw)"
        );
        assert_first_event_is_start(&mut rx, "lambda_data_apply");
    }

    #[tokio::test]
    async fn run_lambda_data_apply_canonical_fallback() {
        let (mut ctx, _rx) = fresh_ctx();
        // Empty `output_type` forces the `<target>_lambda_applied` key.
        let node = lambda_node("norm", "doc", "");

        run_lambda_data_apply(&node, &mut ctx).await.unwrap();

        assert_eq!(
            ctx.let_bindings.get("doc_lambda_applied").unwrap(),
            "lambda:norm(doc)"
        );
    }

    // ── UseTool ──────────────────────────────────────────────────────

    #[tokio::test]
    async fn run_use_tool_binds_under_canonical_result_key() {
        let (mut ctx, mut rx) = fresh_ctx();
        ctx.let_bindings.insert("input".into(), "5+3".into());
        let node = tool_node("calculator", "input");

        let outcome = run_use_tool(&node, &mut ctx).await.unwrap();
        match outcome {
            NodeOutcome::Completed { output, tokens_emitted, .. } => {
                assert_eq!(output, "tool:calculator(5+3)");
                assert_eq!(tokens_emitted, 0);
            }
            other => panic!("expected Completed, got {other:?}"),
        }

        assert_eq!(
            ctx.let_bindings.get("calculator_result").unwrap(),
            "tool:calculator(5+3)"
        );
        assert_first_event_is_start(&mut rx, "use_tool");
    }

    // ── Cancel guards ────────────────────────────────────────────────

    #[tokio::test]
    async fn lambda_and_use_tool_short_circuit_on_cancel() {
        // The flag is tripped BEFORE either handler runs.
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", cancel, tx);

        let lambda = lambda_node("x", "y", "z");
        let lambda_outcome = run_lambda_data_apply(&lambda, &mut ctx).await;
        assert!(matches!(
            lambda_outcome,
            Err(DispatchError::UpstreamCancelled)
        ));

        let ut = tool_node("x", "y");
        let tool_outcome = run_use_tool(&ut, &mut ctx).await;
        assert!(matches!(
            tool_outcome,
            Err(DispatchError::UpstreamCancelled)
        ));
    }
}