Skip to main content

axon/
tool_dispatch_bridge.rs

1//! §Fase 34.d (v1.29.0) — Bridge between [`ToolEntry`] (the registry
2//! shape) and [`Tool`] trait impls (the dispatcher's streaming
3//! surface).
4//!
5//! The dispatcher's `pure_shape::run_step` calls
6//! [`resolve_streaming_tool`] to obtain a [`Tool`] trait object for
7//! a given registry entry. The returned object's `stream()` method
8//! produces a `Stream<ToolChunk>` the dispatcher drains chunk-by-
9//! chunk + emits as `FlowExecutionEvent::StepToken` events on the
10//! wire.
11//!
12//! # 34.d-scoped provider coverage
13//!
14//! - **`stub`** → [`StubStreamingTool`] — emits a deterministic
15//!   3-chunk stream (intermediate × 3 + terminator). The reference
16//!   stream-producer for testing the dispatch path under all
17//!   policy / cancel / audit invariants.
18//! - **`native`** → [`NativeWrappedTool`] — wraps the existing
19//!   `tool_executor::dispatch` (Calculator / DateTimeTool) as a
20//!   single-chunk stream. D9 backwards-compat: built-in tools
21//!   that don't declare a stream effect still produce a Tool impl,
22//!   they just emit 1 chunk via the `from_result` path.
23//! - **`stub_stream`** → [`StubStreamingTool`] — explicit alias
24//!   that adopters can declare in source via
25//!   `tool MyStream { provider: "stub_stream" effects:
26//!   <stream:drop_oldest> ... }`. Same impl as `stub` but the
27//!   declaration documents the streaming intent.
28//! - **`http`** → [`crate::http_tool::HttpStreamingTool`] (Fase 34.e).
29//!   Async reqwest::Client + `bytes_stream()` drain with framing
30//!   classified by Content-Type: `text/event-stream` →
31//!   per-SSE-event ToolChunks; `application/x-ndjson` /
32//!   `application/jsonl` → per-line ToolChunks; everything else →
33//!   single-chunk wrap (D9 backwards-compat). HONEST error
34//!   terminator on every failure surface. Falls back to
35//!   [`SyncFallbackTool`] when the registry entry has an
36//!   invalid/missing URL (parser would already reject, defensive).
37//! - **`mcp`** → [`crate::emcp::McpStreamingTool`] (Fase 34.f). Async
38//!   reqwest::Client + JSON-RPC 2.0 over HTTP. Streaming MCP servers
39//!   (Content-Type `application/x-ndjson` / `application/jsonl`)
40//!   emit per-`notifications/message` ToolChunks; the final `result`
41//!   envelope closes the stream. Non-streaming MCP servers
42//!   (Content-Type `application/json`) fall back to D9 single-chunk
43//!   wrap. Best-effort `notifications/cancelled` notification fired
44//!   on cancel. Falls back to [`SyncFallbackTool`] when the registry
45//!   entry has an invalid/missing server URL.
46//!
47//! # Cross-stack contract
48//!
49//! Python's `axon.runtime.tools.streaming.Tool` ABC + provider-
50//! dispatch surface in `axon.runtime.tools.dispatcher` mirror this
51//! Rust bridge. Drift gate alignment lives in Fase 34.j fuzz.
52
53use crate::cancel_token::CancellationFlag;
54use crate::tool_executor::{self, ToolResult};
55use crate::tool_registry::ToolEntry;
56use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
57use async_trait::async_trait;
58use futures::stream;
59
60// ════════════════════════════════════════════════════════════════════
61//  Factory
62// ════════════════════════════════════════════════════════════════════
63
64/// Resolve a [`Tool`] trait object from a registry [`ToolEntry`].
65///
66/// The dispatcher (`pure_shape::run_step`) calls this when
67/// `entry.is_streaming` is true. The returned trait object's
68/// `stream()` method drives the per-chunk wire emission path.
69///
70/// # 34.f-scoped dispatch
71///
72/// | Provider | Impl | Behavior |
73/// |---|---|---|
74/// | _(empty)_ | [`StubStreamingTool`] | §Fase 36.x.e — an unspecified `provider:` resolves to the deterministic stub stream |
75/// | `stub` | [`StubStreamingTool`] | Deterministic 3-chunk stream |
76/// | `stub_stream` | [`StubStreamingTool`] | Alias for adopter clarity |
77/// | `native` | [`NativeWrappedTool`] | Wraps `tool_executor::dispatch` as 1-chunk |
78/// | `http` | [`crate::http_tool::HttpStreamingTool`] | Async reqwest + framing-aware drain (Fase 34.e) |
79/// | `mcp` | [`crate::emcp::McpStreamingTool`] | JSON-RPC 2.0 + notifications stream (Fase 34.f) |
80/// | _other_ | [`SyncFallbackTool`] | Synchronous fallback (unknown provider) |
81///
82/// §Fase 36.x.e — a `tool` declaring `effects: <stream:…>` but NO
83/// `provider:` is under-specified, not erroneous: it resolves to the
84/// deterministic stub stream (no external dependency) so the flow
85/// runs gracefully — the adopter adds a concrete `provider:` when
86/// ready. An unspecified streaming provider that hard-errored the
87/// flow (via the `SyncFallbackTool` `_other_` arm) was the masked
88/// regression the Fase 36.x.c terminator fix surfaced.
89pub fn resolve_streaming_tool(entry: &ToolEntry) -> Box<dyn Tool> {
90    match entry.provider.trim() {
91        "" | "stub" | "stub_stream" => {
92            Box::new(StubStreamingTool::new(entry.name.clone()))
93        }
94        "native" => {
95            Box::new(NativeWrappedTool::new(entry.name.clone()))
96        }
97        "http" => match crate::http_tool::HttpStreamingTool::from_entry(entry) {
98            Ok(t) => Box::new(t),
99            // Defensive: parser rejects invalid URLs at compile time,
100            // but if a malformed runtime URL reaches the registry
101            // (e.g. programmatic registration in tests / hot-reload
102            // edge), fall back to the honest SyncFallbackTool so the
103            // consumer sees a structured error-terminator chunk
104            // instead of a panic.
105            Err(_) => Box::new(SyncFallbackTool::new(
106                entry.name.clone(),
107                "http".to_string(),
108            )),
109        },
110        "mcp" => match crate::emcp::McpStreamingTool::from_entry(entry) {
111            Ok(t) => Box::new(t),
112            // Defensive: same shape as the http arm — bad MCP server
113            // URL → honest fallback.
114            Err(_) => Box::new(SyncFallbackTool::new(
115                entry.name.clone(),
116                "mcp".to_string(),
117            )),
118        },
119        // Unknown provider → synchronous fallback. Adopters declaring
120        // a custom provider see the honest error-terminator at the
121        // wire layer.
122        other => Box::new(SyncFallbackTool::new(
123            entry.name.clone(),
124            other.to_string(),
125        )),
126    }
127}
128
129// ════════════════════════════════════════════════════════════════════
130//  StubStreamingTool — deterministic 3-chunk reference stream
131// ════════════════════════════════════════════════════════════════════
132
133/// Synthetic stream producer for testing the dispatcher's streaming
134/// arm. Emits a deterministic 4-frame sequence per invocation:
135///
136/// ```text
137///   ToolChunk::intermediate("[stub-stream] <name>(")
138///   ToolChunk::intermediate(<args>)
139///   ToolChunk::intermediate(")")
140///   ToolChunk::terminator("", ToolFinishReason::Stop)
141/// ```
142///
143/// Cancel-safe: between every chunk emission, the tool polls
144/// `ctx.cancel`; if fired, the stream short-circuits to a
145/// `ToolFinishReason::Cancelled` terminator chunk. The pre-cancel
146/// chunks already emitted reach the consumer; the post-cancel
147/// chunks are skipped. D5 p95 ≤100ms invariant honored at the
148/// chunk boundary.
149pub struct StubStreamingTool {
150    name: String,
151}
152
153impl StubStreamingTool {
154    /// Construct a new stub streaming tool with the given name.
155    pub fn new(name: String) -> Self {
156        Self { name }
157    }
158}
159
160#[async_trait]
161impl Tool for StubStreamingTool {
162    async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
163        // Synchronous path — kept available so the default
164        // `stream()` impl can fall back if needed. Adopters
165        // calling execute() directly get the materialized form.
166        ToolResult {
167            success: true,
168            output: format!("[stub-stream-tool] {}({args})", self.name),
169            tool_name: self.name.clone(),
170        }
171    }
172
173    async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
174        let name = self.name.clone();
175        let cancel = ctx.cancel.clone();
176
177        // Materialize the chunk sequence eagerly. The runtime check
178        // for cancel happens at the dispatcher's drain boundary
179        // (see pure_shape::drain_stream_tool) — the dispatcher polls
180        // the cancel flag between consuming each chunk + emitting
181        // it on the wire, so stream lazily-yielded chunks honor the
182        // cancel discipline at the consumer side.
183        let chunks: Vec<ToolChunk> = if cancel.is_cancelled() {
184            // Pre-cancel: emit a cancelled-terminator and bail.
185            vec![ToolChunk::terminator("", ToolFinishReason::Cancelled)]
186        } else {
187            vec![
188                ToolChunk::intermediate(format!("[stub-stream] {name}(")),
189                ToolChunk::intermediate(args),
190                ToolChunk::intermediate(")"),
191                ToolChunk::terminator("", ToolFinishReason::Stop),
192            ]
193        };
194        Box::pin(stream::iter(chunks))
195    }
196
197    fn is_streaming(&self) -> bool {
198        true
199    }
200}
201
202// ════════════════════════════════════════════════════════════════════
203//  NativeWrappedTool — bridges built-in tools to the streaming surface
204// ════════════════════════════════════════════════════════════════════
205
206/// Wraps a built-in tool (Calculator / DateTimeTool) as a single-
207/// chunk stream. D9 backwards-compat: built-ins keep working
208/// byte-equal; their materialized output becomes the terminator
209/// chunk's `delta`.
210///
211/// This impl is rarely exercised by 34.d test paths (built-ins
212/// don't declare stream effects, so `is_streaming` is false on
213/// their registry entries and the dispatcher takes the
214/// synchronous path). The impl exists for completeness — adopters
215/// who programmatically flag a built-in as streaming (e.g. for
216/// testing) get a working single-chunk stream.
217pub struct NativeWrappedTool {
218    name: String,
219}
220
221impl NativeWrappedTool {
222    pub fn new(name: String) -> Self {
223        Self { name }
224    }
225}
226
227#[async_trait]
228impl Tool for NativeWrappedTool {
229    async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
230        tool_executor::dispatch(&self.name, &args).unwrap_or_else(|| ToolResult {
231            success: false,
232            output: format!("native tool '{}' not registered", self.name),
233            tool_name: self.name.clone(),
234        })
235    }
236
237    async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
238        let result = self.execute(args, ctx).await;
239        Box::pin(stream::iter(vec![ToolChunk::from_result(&result)]))
240    }
241
242    fn is_streaming(&self) -> bool {
243        // Built-ins default to non-streaming. The dispatcher only
244        // reaches this impl when the entry's is_streaming flag was
245        // set programmatically by an adopter.
246        false
247    }
248}
249
250// ════════════════════════════════════════════════════════════════════
251//  SyncFallbackTool — for providers without 34.d streaming coverage
252// ════════════════════════════════════════════════════════════════════
253
254/// Fallback impl for providers without a dedicated streaming adapter
255/// in 34.d's scope (http / mcp / unknown). The `stream()` method
256/// emits a single error-terminator chunk indicating that the
257/// provider's streaming surface lands in a later sub-fase
258/// (Fase 34.e for HTTP, Fase 34.f for MCP).
259///
260/// This is a HONEST fallback — it does NOT silently coerce a
261/// streaming declaration into a synchronous call. Adopters who
262/// declare a stream effect on an HTTP/MCP tool today see a clear
263/// `ToolFinishReason::Error { message: "streaming adapter not
264/// yet implemented for provider 'http' — pending Fase 34.e" }`
265/// terminator chunk. After 34.e/f, the bridge's `match` arms
266/// route these providers to their dedicated streaming impls.
267pub struct SyncFallbackTool {
268    name: String,
269    provider: String,
270}
271
272impl SyncFallbackTool {
273    pub fn new(name: String, provider: String) -> Self {
274        Self { name, provider }
275    }
276}
277
278#[async_trait]
279impl Tool for SyncFallbackTool {
280    async fn execute(&self, _args: String, _ctx: ToolContext) -> ToolResult {
281        ToolResult {
282            success: false,
283            output: format!(
284                "synchronous fallback for provider '{}' tool '{}' — \
285                 streaming dispatch only resolves stream-effect tools \
286                 via dedicated provider adapters (Fase 34.e HTTP / \
287                 Fase 34.f MCP).",
288                self.provider, self.name
289            ),
290            tool_name: self.name.clone(),
291        }
292    }
293
294    async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
295        let provider = self.provider.clone();
296        let result = self.execute(args, ctx).await;
297        // Post-Fase 34.e, the `http` arm of the bridge resolves to
298        // [`crate::http_tool::HttpStreamingTool`] for entries with a
299        // valid `runtime:` URL. SyncFallbackTool is only reached for
300        // http when [`crate::http_tool::HttpStreamingTool::from_entry`]
301        // returns Err (empty / non-http scheme). For mcp, 34.f is
302        // pending. Any other unknown provider falls here.
303        let hint = match provider.as_str() {
304            "http" => {
305                "Fase 34.e shipped HTTP streaming — verify the \
306                 tool's `runtime:` URL starts with http:// or https://"
307            }
308            "mcp" => {
309                "Fase 34.f shipped MCP streaming — verify the \
310                 tool's `runtime:` URL starts with http:// or https://"
311            }
312            _ => "no dedicated streaming adapter — pending later sub-fase",
313        };
314        let error_msg = format!(
315            "streaming dispatch fallback for provider '{provider}' tool '{name}' \
316             ({hint}); synchronous fallback returned: {output}",
317            name = self.name,
318            output = result.output,
319        );
320        Box::pin(stream::iter(vec![ToolChunk::terminator(
321            String::new(),
322            ToolFinishReason::Error { message: error_msg },
323        )]))
324    }
325
326    fn is_streaming(&self) -> bool {
327        // Honestly false — this impl is the placeholder, not a real
328        // stream producer. Adopters who declared a stream effect
329        // see the error-terminator at the wire layer + can fall
330        // back to a different provider until 34.e/f ship.
331        false
332    }
333}
334
335// ════════════════════════════════════════════════════════════════════
336//  extract_stream_policy — pull `<stream:<policy>>` from effect_row
337// ════════════════════════════════════════════════════════════════════
338
339/// Extract the declared [`crate::stream_effect::BackpressurePolicy`]
340/// from a tool's `effect_row`. Returns `None` when:
341///
342/// - No `stream:<policy>` entry is present (the tool is not flagged
343///   as streaming).
344/// - The `stream:<policy>` entry's policy slug is not in the closed
345///   catalog (defensive — parser rejects unknown slugs at compile
346///   time, but the runtime stays robust against stale source).
347///
348/// When multiple `stream:<policy>` entries appear (malformed
349/// source), the FIRST one wins. Parser enforces single-policy per
350/// tool declaration at compile time.
351pub fn extract_stream_policy(
352    effect_row: &[String],
353) -> Option<crate::stream_effect::BackpressurePolicy> {
354    for entry in effect_row {
355        if let Some(rest) = entry.strip_prefix("stream:") {
356            // The closed-catalog `BackpressurePolicy::from_slug`
357            // returns None on unknown slugs (defensive).
358            if let Some(policy) =
359                crate::stream_effect::BackpressurePolicy::from_slug(rest)
360            {
361                return Some(policy);
362            }
363        }
364    }
365    None
366}
367
368// ════════════════════════════════════════════════════════════════════
369//  Build a per-tool-invocation ToolContext
370// ════════════════════════════════════════════════════════════════════
371
372/// Construct a fresh [`ToolContext`] for a tool invocation given the
373/// dispatcher's cancel flag + trace_id. Centralizes the
374/// construction so the dispatcher's branch doesn't duplicate the
375/// pattern across multiple call sites.
376pub fn build_tool_context(cancel: CancellationFlag, trace_id: u64) -> ToolContext {
377    ToolContext::new(cancel, trace_id)
378}
379
380// ════════════════════════════════════════════════════════════════════
381//  Lib unit tests
382// ════════════════════════════════════════════════════════════════════
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387    use crate::stream_effect::BackpressurePolicy;
388    use crate::tool_registry::ToolSource;
389    use futures::StreamExt;
390
391    fn entry(name: &str, provider: &str, effect_row: Vec<String>) -> ToolEntry {
392        let is_streaming = crate::tool_registry::derive_is_streaming(&effect_row);
393        ToolEntry {
394            name: name.to_string(),
395            provider: provider.to_string(),
396            timeout: String::new(),
397            runtime: String::new(),
398            sandbox: None,
399            max_results: None,
400            output_schema: String::new(),
401            effect_row,
402            parameters: Vec::new(),
403            source: ToolSource::Program,
404            is_streaming,
405        }
406    }
407
408    // ─── resolve_streaming_tool dispatch table ─────────────────────
409
410    #[test]
411    fn resolve_stub_provider_returns_stub_streaming_tool() {
412        let e = entry("MyTool", "stub", vec!["stream:drop_oldest".into()]);
413        let tool = resolve_streaming_tool(&e);
414        assert!(tool.is_streaming());
415    }
416
417    #[test]
418    fn resolve_stub_stream_alias_returns_stub_streaming_tool() {
419        let e = entry("MyTool", "stub_stream", vec!["stream:fail".into()]);
420        let tool = resolve_streaming_tool(&e);
421        assert!(tool.is_streaming());
422    }
423
424    #[test]
425    fn resolve_native_provider_returns_native_wrapped_tool() {
426        let e = entry("Calculator", "native", vec!["compute".into()]);
427        let tool = resolve_streaming_tool(&e);
428        assert!(!tool.is_streaming()); // NativeWrappedTool reports false
429    }
430
431    #[test]
432    fn resolve_http_provider_with_valid_url_returns_http_streaming_tool() {
433        // §Fase 34.e — `http` arm now resolves to HttpStreamingTool
434        // (when the runtime URL is valid). The HttpStreamingTool
435        // reports `is_streaming() = true` (first-class streaming
436        // surface).
437        let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
438        e.runtime = "https://example.com/api".to_string();
439        e.timeout = "10s".to_string();
440        let tool = resolve_streaming_tool(&e);
441        assert!(tool.is_streaming());
442    }
443
444    #[test]
445    fn resolve_http_provider_with_invalid_url_falls_back_to_sync_fallback() {
446        // Defensive: parser rejects invalid URLs, but if a malformed
447        // runtime URL reaches the registry, the bridge falls back to
448        // the honest SyncFallbackTool (is_streaming = false) so the
449        // consumer sees a structured error-terminator chunk.
450        let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
451        e.runtime = "ftp://example.com/api".to_string(); // bad scheme
452        let tool = resolve_streaming_tool(&e);
453        assert!(!tool.is_streaming());
454    }
455
456    #[test]
457    fn resolve_http_provider_with_empty_url_falls_back_to_sync_fallback() {
458        let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
459        e.runtime = String::new();
460        let tool = resolve_streaming_tool(&e);
461        assert!(!tool.is_streaming());
462    }
463
464    #[test]
465    fn resolve_mcp_provider_with_valid_url_returns_mcp_streaming_tool() {
466        // §Fase 34.f — `mcp` arm now resolves to McpStreamingTool
467        // (when the runtime URL is valid). McpStreamingTool reports
468        // `is_streaming() = true` (first-class streaming surface).
469        let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
470        e.runtime = "http://localhost:3000/mcp".to_string();
471        e.timeout = "10s".to_string();
472        let tool = resolve_streaming_tool(&e);
473        assert!(tool.is_streaming());
474    }
475
476    #[test]
477    fn resolve_mcp_provider_with_invalid_url_falls_back_to_sync_fallback() {
478        let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
479        e.runtime = "ws://localhost:3000".to_string(); // wrong scheme
480        let tool = resolve_streaming_tool(&e);
481        assert!(!tool.is_streaming());
482    }
483
484    #[test]
485    fn resolve_mcp_provider_with_empty_url_falls_back_to_sync_fallback() {
486        let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
487        e.runtime = String::new();
488        let tool = resolve_streaming_tool(&e);
489        assert!(!tool.is_streaming());
490    }
491
492    #[test]
493    fn resolve_unknown_provider_falls_through() {
494        let e = entry("CustomTool", "custom_xyz", vec![]);
495        let tool = resolve_streaming_tool(&e);
496        assert!(!tool.is_streaming());
497    }
498
499    // ─── StubStreamingTool emits 4-frame stream ────────────────────
500
501    #[tokio::test]
502    async fn stub_streaming_tool_emits_4_frame_sequence() {
503        let tool = StubStreamingTool::new("Search".to_string());
504        let cancel = CancellationFlag::new();
505        let ctx = ToolContext::new(cancel, 0x42);
506        let mut stream = tool.stream("query=axon".to_string(), ctx).await;
507
508        let chunks: Vec<ToolChunk> = {
509            let mut v = Vec::new();
510            while let Some(c) = stream.next().await {
511                v.push(c);
512            }
513            v
514        };
515        assert_eq!(chunks.len(), 4);
516        assert_eq!(chunks[0].delta, "[stub-stream] Search(");
517        assert_eq!(chunks[1].delta, "query=axon");
518        assert_eq!(chunks[2].delta, ")");
519        assert!(chunks[3].is_terminator());
520        assert_eq!(chunks[3].finish_reason, Some(ToolFinishReason::Stop));
521    }
522
523    // ─── StubStreamingTool pre-cancel emits cancelled-terminator ──
524
525    #[tokio::test]
526    async fn stub_streaming_tool_pre_cancel_emits_cancelled_terminator_only() {
527        let tool = StubStreamingTool::new("Search".to_string());
528        let cancel = CancellationFlag::new();
529        cancel.cancel(); // Fire BEFORE invoking stream().
530        let ctx = ToolContext::new(cancel, 0x42);
531        let mut stream = tool.stream("query=axon".to_string(), ctx).await;
532
533        let chunks: Vec<ToolChunk> = {
534            let mut v = Vec::new();
535            while let Some(c) = stream.next().await {
536                v.push(c);
537            }
538            v
539        };
540        assert_eq!(chunks.len(), 1);
541        assert!(chunks[0].is_terminator());
542        assert_eq!(chunks[0].finish_reason, Some(ToolFinishReason::Cancelled));
543    }
544
545    // ─── NativeWrappedTool wraps execute() as single-chunk ────────
546
547    #[tokio::test]
548    async fn native_wrapped_tool_wraps_calculator_as_single_chunk() {
549        let tool = NativeWrappedTool::new("Calculator".to_string());
550        let cancel = CancellationFlag::new();
551        let ctx = ToolContext::new(cancel, 0);
552        let mut stream = tool.stream("2 + 3".to_string(), ctx).await;
553        let first = stream.next().await.expect("at least one chunk");
554        assert_eq!(first.delta, "5");
555        assert_eq!(first.finish_reason, Some(ToolFinishReason::Stop));
556        assert!(first.is_terminator());
557        // Single-chunk stream — no second chunk.
558        assert!(stream.next().await.is_none());
559    }
560
561    // ─── SyncFallbackTool emits error-terminator with hint ────────
562
563    #[tokio::test]
564    async fn sync_fallback_tool_for_http_emits_error_terminator() {
565        // Post-34.e: SyncFallbackTool for `http` is only reached
566        // when [`HttpStreamingTool::from_entry`] fails (invalid
567        // URL). The error hint MUST point at URL validation rather
568        // than the (now-shipped) Fase 34.e itself.
569        let tool = SyncFallbackTool::new("HttpTool".to_string(), "http".to_string());
570        let cancel = CancellationFlag::new();
571        let ctx = ToolContext::new(cancel, 0);
572        let mut stream = tool.stream("arg".to_string(), ctx).await;
573        let chunk = stream.next().await.expect("at least one chunk");
574        assert!(chunk.is_terminator());
575        match chunk.finish_reason {
576            Some(ToolFinishReason::Error { ref message }) => {
577                assert!(message.contains("Fase 34.e"));
578                assert!(message.contains("http"));
579                assert!(
580                    message.contains("runtime") || message.contains("URL"),
581                    "post-34.e fallback hint must reference URL validation: {message}"
582                );
583            }
584            other => panic!("expected Error finish_reason, got {other:?}"),
585        }
586    }
587
588    #[tokio::test]
589    async fn sync_fallback_tool_for_mcp_emits_error_terminator() {
590        // Post-34.f: SyncFallbackTool for `mcp` is only reached
591        // when [`McpStreamingTool::from_entry`] fails (invalid URL).
592        // The error hint MUST point at URL validation rather than
593        // the (now-shipped) Fase 34.f itself.
594        let tool = SyncFallbackTool::new("McpTool".to_string(), "mcp".to_string());
595        let cancel = CancellationFlag::new();
596        let ctx = ToolContext::new(cancel, 0);
597        let mut stream = tool.stream("arg".to_string(), ctx).await;
598        let chunk = stream.next().await.expect("at least one chunk");
599        match chunk.finish_reason {
600            Some(ToolFinishReason::Error { ref message }) => {
601                assert!(message.contains("Fase 34.f"));
602                assert!(message.contains("mcp"));
603                assert!(
604                    message.contains("runtime") || message.contains("URL"),
605                    "post-34.f fallback hint must reference URL validation: {message}"
606                );
607            }
608            other => panic!("expected Error finish_reason, got {other:?}"),
609        }
610    }
611
612    // ─── extract_stream_policy ─────────────────────────────────────
613
614    #[test]
615    fn extract_stream_policy_returns_none_for_empty_effect_row() {
616        assert_eq!(extract_stream_policy(&[]), None);
617    }
618
619    #[test]
620    fn extract_stream_policy_returns_none_for_non_stream_effects() {
621        assert_eq!(
622            extract_stream_policy(&[
623                "compute".into(),
624                "network".into(),
625                "io".into(),
626            ]),
627            None
628        );
629    }
630
631    #[test]
632    fn extract_stream_policy_parses_drop_oldest() {
633        assert_eq!(
634            extract_stream_policy(&["stream:drop_oldest".into()]),
635            Some(BackpressurePolicy::DropOldest)
636        );
637    }
638
639    #[test]
640    fn extract_stream_policy_parses_all_four_closed_catalog_policies() {
641        assert_eq!(
642            extract_stream_policy(&["stream:drop_oldest".into()]),
643            Some(BackpressurePolicy::DropOldest)
644        );
645        assert_eq!(
646            extract_stream_policy(&["stream:degrade_quality".into()]),
647            Some(BackpressurePolicy::DegradeQuality)
648        );
649        assert_eq!(
650            extract_stream_policy(&["stream:pause_upstream".into()]),
651            Some(BackpressurePolicy::PauseUpstream)
652        );
653        assert_eq!(
654            extract_stream_policy(&["stream:fail".into()]),
655            Some(BackpressurePolicy::Fail)
656        );
657    }
658
659    #[test]
660    fn extract_stream_policy_ignores_unknown_slug() {
661        // Defensive: parser rejects unknown slugs at compile time,
662        // but the runtime stays robust if stale source somehow
663        // reaches the registry.
664        assert_eq!(
665            extract_stream_policy(&["stream:nonsense_xyz".into()]),
666            None
667        );
668    }
669
670    #[test]
671    fn extract_stream_policy_first_wins_on_multiple() {
672        // Defensive: malformed source might have multiple
673        // stream entries. First-wins is the policy.
674        assert_eq!(
675            extract_stream_policy(&[
676                "stream:drop_oldest".into(),
677                "stream:fail".into(),
678            ]),
679            Some(BackpressurePolicy::DropOldest)
680        );
681    }
682
683    // ─── build_tool_context ────────────────────────────────────────
684
685    #[test]
686    fn build_tool_context_wires_cancel_and_trace_id() {
687        let cancel = CancellationFlag::new();
688        let ctx = build_tool_context(cancel.clone(), 0xCAFE_BABE);
689        assert_eq!(ctx.trace_id, 0xCAFE_BABE);
690        assert!(!ctx.is_cancelled());
691        cancel.cancel();
692        assert!(ctx.is_cancelled());
693    }
694}