axon/tool_dispatch_bridge.rs
1//! §Fase 34.d (v1.29.0) — Bridge between [`ToolEntry`] (the registry
2//! shape) and [`Tool`] trait impls (the dispatcher's streaming
3//! surface).
4//!
5//! The dispatcher's `pure_shape::run_step` calls
6//! [`resolve_streaming_tool`] to obtain a [`Tool`] trait object for
7//! a given registry entry. The returned object's `stream()` method
8//! produces a `Stream<ToolChunk>` the dispatcher drains chunk-by-
9//! chunk + emits as `FlowExecutionEvent::StepToken` events on the
10//! wire.
11//!
12//! # 34.d-scoped provider coverage
13//!
14//! - **`stub`** → [`StubStreamingTool`] — emits a deterministic
15//! 3-chunk stream (intermediate × 3 + terminator). The reference
16//! stream-producer for testing the dispatch path under all
17//! policy / cancel / audit invariants.
18//! - **`native`** → [`NativeWrappedTool`] — wraps the existing
19//! `tool_executor::dispatch` (Calculator / DateTimeTool) as a
20//! single-chunk stream. D9 backwards-compat: built-in tools
21//! that don't declare a stream effect still produce a Tool impl,
22//! they just emit 1 chunk via the `from_result` path.
23//! - **`stub_stream`** → [`StubStreamingTool`] — explicit alias
24//! that adopters can declare in source via
25//! `tool MyStream { provider: "stub_stream" effects:
26//! <stream:drop_oldest> ... }`. Same impl as `stub` but the
27//! declaration documents the streaming intent.
28//! - **`http`** → [`crate::http_tool::HttpStreamingTool`] (Fase 34.e).
29//! Async reqwest::Client + `bytes_stream()` drain with framing
30//! classified by Content-Type: `text/event-stream` →
31//! per-SSE-event ToolChunks; `application/x-ndjson` /
32//! `application/jsonl` → per-line ToolChunks; everything else →
33//! single-chunk wrap (D9 backwards-compat). HONEST error
34//! terminator on every failure surface. Falls back to
35//! [`SyncFallbackTool`] when the registry entry has an
36//! invalid/missing URL (parser would already reject, defensive).
37//! - **`mcp`** → [`crate::emcp::McpStreamingTool`] (Fase 34.f). Async
38//! reqwest::Client + JSON-RPC 2.0 over HTTP. Streaming MCP servers
39//! (Content-Type `application/x-ndjson` / `application/jsonl`)
40//! emit per-`notifications/message` ToolChunks; the final `result`
41//! envelope closes the stream. Non-streaming MCP servers
42//! (Content-Type `application/json`) fall back to D9 single-chunk
43//! wrap. Best-effort `notifications/cancelled` notification fired
44//! on cancel. Falls back to [`SyncFallbackTool`] when the registry
45//! entry has an invalid/missing server URL.
46//!
47//! # Cross-stack contract
48//!
49//! Python's `axon.runtime.tools.streaming.Tool` ABC + provider-
50//! dispatch surface in `axon.runtime.tools.dispatcher` mirror this
51//! Rust bridge. Drift gate alignment lives in Fase 34.j fuzz.
52
53use crate::cancel_token::CancellationFlag;
54use crate::tool_executor::{self, ToolResult};
55use crate::tool_registry::ToolEntry;
56use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
57use async_trait::async_trait;
58use futures::stream;
59
60// ════════════════════════════════════════════════════════════════════
61// Factory
62// ════════════════════════════════════════════════════════════════════
63
64/// Resolve a [`Tool`] trait object from a registry [`ToolEntry`].
65///
66/// The dispatcher (`pure_shape::run_step`) calls this when
67/// `entry.is_streaming` is true. The returned trait object's
68/// `stream()` method drives the per-chunk wire emission path.
69///
70/// # 34.f-scoped dispatch
71///
72/// | Provider | Impl | Behavior |
73/// |---|---|---|
74/// | _(empty)_ | [`StubStreamingTool`] | §Fase 36.x.e — an unspecified `provider:` resolves to the deterministic stub stream |
75/// | `stub` | [`StubStreamingTool`] | Deterministic 3-chunk stream |
76/// | `stub_stream` | [`StubStreamingTool`] | Alias for adopter clarity |
77/// | `native` | [`NativeWrappedTool`] | Wraps `tool_executor::dispatch` as 1-chunk |
78/// | `http` | [`crate::http_tool::HttpStreamingTool`] | Async reqwest + framing-aware drain (Fase 34.e) |
79/// | `mcp` | [`crate::emcp::McpStreamingTool`] | JSON-RPC 2.0 + notifications stream (Fase 34.f) |
80/// | _other_ | [`SyncFallbackTool`] | Synchronous fallback (unknown provider) |
81///
82/// §Fase 36.x.e — a `tool` declaring `effects: <stream:…>` but NO
83/// `provider:` is under-specified, not erroneous: it resolves to the
84/// deterministic stub stream (no external dependency) so the flow
85/// runs gracefully — the adopter adds a concrete `provider:` when
86/// ready. An unspecified streaming provider that hard-errored the
87/// flow (via the `SyncFallbackTool` `_other_` arm) was the masked
88/// regression the Fase 36.x.c terminator fix surfaced.
89pub fn resolve_streaming_tool(entry: &ToolEntry) -> Box<dyn Tool> {
90 match entry.provider.trim() {
91 "" | "stub" | "stub_stream" => {
92 Box::new(StubStreamingTool::new(entry.name.clone()))
93 }
94 "native" => {
95 Box::new(NativeWrappedTool::new(entry.name.clone()))
96 }
97 "http" => match crate::http_tool::HttpStreamingTool::from_entry(entry) {
98 Ok(t) => Box::new(t),
99 // Defensive: parser rejects invalid URLs at compile time,
100 // but if a malformed runtime URL reaches the registry
101 // (e.g. programmatic registration in tests / hot-reload
102 // edge), fall back to the honest SyncFallbackTool so the
103 // consumer sees a structured error-terminator chunk
104 // instead of a panic.
105 Err(_) => Box::new(SyncFallbackTool::new(
106 entry.name.clone(),
107 "http".to_string(),
108 )),
109 },
110 "mcp" => match crate::emcp::McpStreamingTool::from_entry(entry) {
111 Ok(t) => Box::new(t),
112 // Defensive: same shape as the http arm — bad MCP server
113 // URL → honest fallback.
114 Err(_) => Box::new(SyncFallbackTool::new(
115 entry.name.clone(),
116 "mcp".to_string(),
117 )),
118 },
119 // Unknown provider → synchronous fallback. Adopters declaring
120 // a custom provider see the honest error-terminator at the
121 // wire layer.
122 other => Box::new(SyncFallbackTool::new(
123 entry.name.clone(),
124 other.to_string(),
125 )),
126 }
127}
128
129// ════════════════════════════════════════════════════════════════════
130// StubStreamingTool — deterministic 3-chunk reference stream
131// ════════════════════════════════════════════════════════════════════
132
133/// Synthetic stream producer for testing the dispatcher's streaming
134/// arm. Emits a deterministic 4-frame sequence per invocation:
135///
136/// ```text
137/// ToolChunk::intermediate("[stub-stream] <name>(")
138/// ToolChunk::intermediate(<args>)
139/// ToolChunk::intermediate(")")
140/// ToolChunk::terminator("", ToolFinishReason::Stop)
141/// ```
142///
143/// Cancel-safe: between every chunk emission, the tool polls
144/// `ctx.cancel`; if fired, the stream short-circuits to a
145/// `ToolFinishReason::Cancelled` terminator chunk. The pre-cancel
146/// chunks already emitted reach the consumer; the post-cancel
147/// chunks are skipped. D5 p95 ≤100ms invariant honored at the
148/// chunk boundary.
149pub struct StubStreamingTool {
150 name: String,
151}
152
153impl StubStreamingTool {
154 /// Construct a new stub streaming tool with the given name.
155 pub fn new(name: String) -> Self {
156 Self { name }
157 }
158}
159
160#[async_trait]
161impl Tool for StubStreamingTool {
162 async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
163 // Synchronous path — kept available so the default
164 // `stream()` impl can fall back if needed. Adopters
165 // calling execute() directly get the materialized form.
166 ToolResult {
167 success: true,
168 output: format!("[stub-stream-tool] {}({args})", self.name),
169 tool_name: self.name.clone(),
170 }
171 }
172
173 async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
174 let name = self.name.clone();
175 let cancel = ctx.cancel.clone();
176
177 // Materialize the chunk sequence eagerly. The runtime check
178 // for cancel happens at the dispatcher's drain boundary
179 // (see pure_shape::drain_stream_tool) — the dispatcher polls
180 // the cancel flag between consuming each chunk + emitting
181 // it on the wire, so stream lazily-yielded chunks honor the
182 // cancel discipline at the consumer side.
183 let chunks: Vec<ToolChunk> = if cancel.is_cancelled() {
184 // Pre-cancel: emit a cancelled-terminator and bail.
185 vec![ToolChunk::terminator("", ToolFinishReason::Cancelled)]
186 } else {
187 vec![
188 ToolChunk::intermediate(format!("[stub-stream] {name}(")),
189 ToolChunk::intermediate(args),
190 ToolChunk::intermediate(")"),
191 ToolChunk::terminator("", ToolFinishReason::Stop),
192 ]
193 };
194 Box::pin(stream::iter(chunks))
195 }
196
197 fn is_streaming(&self) -> bool {
198 true
199 }
200}
201
202// ════════════════════════════════════════════════════════════════════
203// NativeWrappedTool — bridges built-in tools to the streaming surface
204// ════════════════════════════════════════════════════════════════════
205
206/// Wraps a built-in tool (Calculator / DateTimeTool) as a single-
207/// chunk stream. D9 backwards-compat: built-ins keep working
208/// byte-equal; their materialized output becomes the terminator
209/// chunk's `delta`.
210///
211/// This impl is rarely exercised by 34.d test paths (built-ins
212/// don't declare stream effects, so `is_streaming` is false on
213/// their registry entries and the dispatcher takes the
214/// synchronous path). The impl exists for completeness — adopters
215/// who programmatically flag a built-in as streaming (e.g. for
216/// testing) get a working single-chunk stream.
217pub struct NativeWrappedTool {
218 name: String,
219}
220
221impl NativeWrappedTool {
222 pub fn new(name: String) -> Self {
223 Self { name }
224 }
225}
226
227#[async_trait]
228impl Tool for NativeWrappedTool {
229 async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
230 tool_executor::dispatch(&self.name, &args).unwrap_or_else(|| ToolResult {
231 success: false,
232 output: format!("native tool '{}' not registered", self.name),
233 tool_name: self.name.clone(),
234 })
235 }
236
237 async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
238 let result = self.execute(args, ctx).await;
239 Box::pin(stream::iter(vec![ToolChunk::from_result(&result)]))
240 }
241
242 fn is_streaming(&self) -> bool {
243 // Built-ins default to non-streaming. The dispatcher only
244 // reaches this impl when the entry's is_streaming flag was
245 // set programmatically by an adopter.
246 false
247 }
248}
249
250// ════════════════════════════════════════════════════════════════════
251// SyncFallbackTool — for providers without 34.d streaming coverage
252// ════════════════════════════════════════════════════════════════════
253
254/// Fallback impl for providers without a dedicated streaming adapter
255/// in 34.d's scope (http / mcp / unknown). The `stream()` method
256/// emits a single error-terminator chunk indicating that the
257/// provider's streaming surface lands in a later sub-fase
258/// (Fase 34.e for HTTP, Fase 34.f for MCP).
259///
260/// This is a HONEST fallback — it does NOT silently coerce a
261/// streaming declaration into a synchronous call. Adopters who
262/// declare a stream effect on an HTTP/MCP tool today see a clear
263/// `ToolFinishReason::Error { message: "streaming adapter not
264/// yet implemented for provider 'http' — pending Fase 34.e" }`
265/// terminator chunk. After 34.e/f, the bridge's `match` arms
266/// route these providers to their dedicated streaming impls.
267pub struct SyncFallbackTool {
268 name: String,
269 provider: String,
270}
271
272impl SyncFallbackTool {
273 pub fn new(name: String, provider: String) -> Self {
274 Self { name, provider }
275 }
276}
277
278#[async_trait]
279impl Tool for SyncFallbackTool {
280 async fn execute(&self, _args: String, _ctx: ToolContext) -> ToolResult {
281 ToolResult {
282 success: false,
283 output: format!(
284 "synchronous fallback for provider '{}' tool '{}' — \
285 streaming dispatch only resolves stream-effect tools \
286 via dedicated provider adapters (Fase 34.e HTTP / \
287 Fase 34.f MCP).",
288 self.provider, self.name
289 ),
290 tool_name: self.name.clone(),
291 }
292 }
293
294 async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
295 let provider = self.provider.clone();
296 let result = self.execute(args, ctx).await;
297 // Post-Fase 34.e, the `http` arm of the bridge resolves to
298 // [`crate::http_tool::HttpStreamingTool`] for entries with a
299 // valid `runtime:` URL. SyncFallbackTool is only reached for
300 // http when [`crate::http_tool::HttpStreamingTool::from_entry`]
301 // returns Err (empty / non-http scheme). For mcp, 34.f is
302 // pending. Any other unknown provider falls here.
303 let hint = match provider.as_str() {
304 "http" => {
305 "Fase 34.e shipped HTTP streaming — verify the \
306 tool's `runtime:` URL starts with http:// or https://"
307 }
308 "mcp" => {
309 "Fase 34.f shipped MCP streaming — verify the \
310 tool's `runtime:` URL starts with http:// or https://"
311 }
312 _ => "no dedicated streaming adapter — pending later sub-fase",
313 };
314 let error_msg = format!(
315 "streaming dispatch fallback for provider '{provider}' tool '{name}' \
316 ({hint}); synchronous fallback returned: {output}",
317 name = self.name,
318 output = result.output,
319 );
320 Box::pin(stream::iter(vec![ToolChunk::terminator(
321 String::new(),
322 ToolFinishReason::Error { message: error_msg },
323 )]))
324 }
325
326 fn is_streaming(&self) -> bool {
327 // Honestly false — this impl is the placeholder, not a real
328 // stream producer. Adopters who declared a stream effect
329 // see the error-terminator at the wire layer + can fall
330 // back to a different provider until 34.e/f ship.
331 false
332 }
333}
334
335// ════════════════════════════════════════════════════════════════════
336// extract_stream_policy — pull `<stream:<policy>>` from effect_row
337// ════════════════════════════════════════════════════════════════════
338
339/// Extract the declared [`crate::stream_effect::BackpressurePolicy`]
340/// from a tool's `effect_row`. Returns `None` when:
341///
342/// - No `stream:<policy>` entry is present (the tool is not flagged
343/// as streaming).
344/// - The `stream:<policy>` entry's policy slug is not in the closed
345/// catalog (defensive — parser rejects unknown slugs at compile
346/// time, but the runtime stays robust against stale source).
347///
348/// When multiple `stream:<policy>` entries appear (malformed
349/// source), the FIRST one wins. Parser enforces single-policy per
350/// tool declaration at compile time.
351pub fn extract_stream_policy(
352 effect_row: &[String],
353) -> Option<crate::stream_effect::BackpressurePolicy> {
354 for entry in effect_row {
355 if let Some(rest) = entry.strip_prefix("stream:") {
356 // The closed-catalog `BackpressurePolicy::from_slug`
357 // returns None on unknown slugs (defensive).
358 if let Some(policy) =
359 crate::stream_effect::BackpressurePolicy::from_slug(rest)
360 {
361 return Some(policy);
362 }
363 }
364 }
365 None
366}
367
368// ════════════════════════════════════════════════════════════════════
369// Build a per-tool-invocation ToolContext
370// ════════════════════════════════════════════════════════════════════
371
372/// Construct a fresh [`ToolContext`] for a tool invocation given the
373/// dispatcher's cancel flag + trace_id. Centralizes the
374/// construction so the dispatcher's branch doesn't duplicate the
375/// pattern across multiple call sites.
376pub fn build_tool_context(cancel: CancellationFlag, trace_id: u64) -> ToolContext {
377 ToolContext::new(cancel, trace_id)
378}
379
380// ════════════════════════════════════════════════════════════════════
381// Lib unit tests
382// ════════════════════════════════════════════════════════════════════
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387 use crate::stream_effect::BackpressurePolicy;
388 use crate::tool_registry::ToolSource;
389 use futures::StreamExt;
390
391 fn entry(name: &str, provider: &str, effect_row: Vec<String>) -> ToolEntry {
392 let is_streaming = crate::tool_registry::derive_is_streaming(&effect_row);
393 ToolEntry {
394 name: name.to_string(),
395 provider: provider.to_string(),
396 timeout: String::new(),
397 runtime: String::new(),
398 sandbox: None,
399 max_results: None,
400 output_schema: String::new(),
401 effect_row,
402 parameters: Vec::new(),
403 source: ToolSource::Program,
404 is_streaming,
405 }
406 }
407
408 // ─── resolve_streaming_tool dispatch table ─────────────────────
409
410 #[test]
411 fn resolve_stub_provider_returns_stub_streaming_tool() {
412 let e = entry("MyTool", "stub", vec!["stream:drop_oldest".into()]);
413 let tool = resolve_streaming_tool(&e);
414 assert!(tool.is_streaming());
415 }
416
417 #[test]
418 fn resolve_stub_stream_alias_returns_stub_streaming_tool() {
419 let e = entry("MyTool", "stub_stream", vec!["stream:fail".into()]);
420 let tool = resolve_streaming_tool(&e);
421 assert!(tool.is_streaming());
422 }
423
424 #[test]
425 fn resolve_native_provider_returns_native_wrapped_tool() {
426 let e = entry("Calculator", "native", vec!["compute".into()]);
427 let tool = resolve_streaming_tool(&e);
428 assert!(!tool.is_streaming()); // NativeWrappedTool reports false
429 }
430
431 #[test]
432 fn resolve_http_provider_with_valid_url_returns_http_streaming_tool() {
433 // §Fase 34.e — `http` arm now resolves to HttpStreamingTool
434 // (when the runtime URL is valid). The HttpStreamingTool
435 // reports `is_streaming() = true` (first-class streaming
436 // surface).
437 let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
438 e.runtime = "https://example.com/api".to_string();
439 e.timeout = "10s".to_string();
440 let tool = resolve_streaming_tool(&e);
441 assert!(tool.is_streaming());
442 }
443
444 #[test]
445 fn resolve_http_provider_with_invalid_url_falls_back_to_sync_fallback() {
446 // Defensive: parser rejects invalid URLs, but if a malformed
447 // runtime URL reaches the registry, the bridge falls back to
448 // the honest SyncFallbackTool (is_streaming = false) so the
449 // consumer sees a structured error-terminator chunk.
450 let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
451 e.runtime = "ftp://example.com/api".to_string(); // bad scheme
452 let tool = resolve_streaming_tool(&e);
453 assert!(!tool.is_streaming());
454 }
455
456 #[test]
457 fn resolve_http_provider_with_empty_url_falls_back_to_sync_fallback() {
458 let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
459 e.runtime = String::new();
460 let tool = resolve_streaming_tool(&e);
461 assert!(!tool.is_streaming());
462 }
463
464 #[test]
465 fn resolve_mcp_provider_with_valid_url_returns_mcp_streaming_tool() {
466 // §Fase 34.f — `mcp` arm now resolves to McpStreamingTool
467 // (when the runtime URL is valid). McpStreamingTool reports
468 // `is_streaming() = true` (first-class streaming surface).
469 let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
470 e.runtime = "http://localhost:3000/mcp".to_string();
471 e.timeout = "10s".to_string();
472 let tool = resolve_streaming_tool(&e);
473 assert!(tool.is_streaming());
474 }
475
476 #[test]
477 fn resolve_mcp_provider_with_invalid_url_falls_back_to_sync_fallback() {
478 let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
479 e.runtime = "ws://localhost:3000".to_string(); // wrong scheme
480 let tool = resolve_streaming_tool(&e);
481 assert!(!tool.is_streaming());
482 }
483
484 #[test]
485 fn resolve_mcp_provider_with_empty_url_falls_back_to_sync_fallback() {
486 let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
487 e.runtime = String::new();
488 let tool = resolve_streaming_tool(&e);
489 assert!(!tool.is_streaming());
490 }
491
492 #[test]
493 fn resolve_unknown_provider_falls_through() {
494 let e = entry("CustomTool", "custom_xyz", vec![]);
495 let tool = resolve_streaming_tool(&e);
496 assert!(!tool.is_streaming());
497 }
498
499 // ─── StubStreamingTool emits 4-frame stream ────────────────────
500
501 #[tokio::test]
502 async fn stub_streaming_tool_emits_4_frame_sequence() {
503 let tool = StubStreamingTool::new("Search".to_string());
504 let cancel = CancellationFlag::new();
505 let ctx = ToolContext::new(cancel, 0x42);
506 let mut stream = tool.stream("query=axon".to_string(), ctx).await;
507
508 let chunks: Vec<ToolChunk> = {
509 let mut v = Vec::new();
510 while let Some(c) = stream.next().await {
511 v.push(c);
512 }
513 v
514 };
515 assert_eq!(chunks.len(), 4);
516 assert_eq!(chunks[0].delta, "[stub-stream] Search(");
517 assert_eq!(chunks[1].delta, "query=axon");
518 assert_eq!(chunks[2].delta, ")");
519 assert!(chunks[3].is_terminator());
520 assert_eq!(chunks[3].finish_reason, Some(ToolFinishReason::Stop));
521 }
522
523 // ─── StubStreamingTool pre-cancel emits cancelled-terminator ──
524
525 #[tokio::test]
526 async fn stub_streaming_tool_pre_cancel_emits_cancelled_terminator_only() {
527 let tool = StubStreamingTool::new("Search".to_string());
528 let cancel = CancellationFlag::new();
529 cancel.cancel(); // Fire BEFORE invoking stream().
530 let ctx = ToolContext::new(cancel, 0x42);
531 let mut stream = tool.stream("query=axon".to_string(), ctx).await;
532
533 let chunks: Vec<ToolChunk> = {
534 let mut v = Vec::new();
535 while let Some(c) = stream.next().await {
536 v.push(c);
537 }
538 v
539 };
540 assert_eq!(chunks.len(), 1);
541 assert!(chunks[0].is_terminator());
542 assert_eq!(chunks[0].finish_reason, Some(ToolFinishReason::Cancelled));
543 }
544
545 // ─── NativeWrappedTool wraps execute() as single-chunk ────────
546
547 #[tokio::test]
548 async fn native_wrapped_tool_wraps_calculator_as_single_chunk() {
549 let tool = NativeWrappedTool::new("Calculator".to_string());
550 let cancel = CancellationFlag::new();
551 let ctx = ToolContext::new(cancel, 0);
552 let mut stream = tool.stream("2 + 3".to_string(), ctx).await;
553 let first = stream.next().await.expect("at least one chunk");
554 assert_eq!(first.delta, "5");
555 assert_eq!(first.finish_reason, Some(ToolFinishReason::Stop));
556 assert!(first.is_terminator());
557 // Single-chunk stream — no second chunk.
558 assert!(stream.next().await.is_none());
559 }
560
561 // ─── SyncFallbackTool emits error-terminator with hint ────────
562
563 #[tokio::test]
564 async fn sync_fallback_tool_for_http_emits_error_terminator() {
565 // Post-34.e: SyncFallbackTool for `http` is only reached
566 // when [`HttpStreamingTool::from_entry`] fails (invalid
567 // URL). The error hint MUST point at URL validation rather
568 // than the (now-shipped) Fase 34.e itself.
569 let tool = SyncFallbackTool::new("HttpTool".to_string(), "http".to_string());
570 let cancel = CancellationFlag::new();
571 let ctx = ToolContext::new(cancel, 0);
572 let mut stream = tool.stream("arg".to_string(), ctx).await;
573 let chunk = stream.next().await.expect("at least one chunk");
574 assert!(chunk.is_terminator());
575 match chunk.finish_reason {
576 Some(ToolFinishReason::Error { ref message }) => {
577 assert!(message.contains("Fase 34.e"));
578 assert!(message.contains("http"));
579 assert!(
580 message.contains("runtime") || message.contains("URL"),
581 "post-34.e fallback hint must reference URL validation: {message}"
582 );
583 }
584 other => panic!("expected Error finish_reason, got {other:?}"),
585 }
586 }
587
588 #[tokio::test]
589 async fn sync_fallback_tool_for_mcp_emits_error_terminator() {
590 // Post-34.f: SyncFallbackTool for `mcp` is only reached
591 // when [`McpStreamingTool::from_entry`] fails (invalid URL).
592 // The error hint MUST point at URL validation rather than
593 // the (now-shipped) Fase 34.f itself.
594 let tool = SyncFallbackTool::new("McpTool".to_string(), "mcp".to_string());
595 let cancel = CancellationFlag::new();
596 let ctx = ToolContext::new(cancel, 0);
597 let mut stream = tool.stream("arg".to_string(), ctx).await;
598 let chunk = stream.next().await.expect("at least one chunk");
599 match chunk.finish_reason {
600 Some(ToolFinishReason::Error { ref message }) => {
601 assert!(message.contains("Fase 34.f"));
602 assert!(message.contains("mcp"));
603 assert!(
604 message.contains("runtime") || message.contains("URL"),
605 "post-34.f fallback hint must reference URL validation: {message}"
606 );
607 }
608 other => panic!("expected Error finish_reason, got {other:?}"),
609 }
610 }
611
612 // ─── extract_stream_policy ─────────────────────────────────────
613
614 #[test]
615 fn extract_stream_policy_returns_none_for_empty_effect_row() {
616 assert_eq!(extract_stream_policy(&[]), None);
617 }
618
619 #[test]
620 fn extract_stream_policy_returns_none_for_non_stream_effects() {
621 assert_eq!(
622 extract_stream_policy(&[
623 "compute".into(),
624 "network".into(),
625 "io".into(),
626 ]),
627 None
628 );
629 }
630
631 #[test]
632 fn extract_stream_policy_parses_drop_oldest() {
633 assert_eq!(
634 extract_stream_policy(&["stream:drop_oldest".into()]),
635 Some(BackpressurePolicy::DropOldest)
636 );
637 }
638
639 #[test]
640 fn extract_stream_policy_parses_all_four_closed_catalog_policies() {
641 assert_eq!(
642 extract_stream_policy(&["stream:drop_oldest".into()]),
643 Some(BackpressurePolicy::DropOldest)
644 );
645 assert_eq!(
646 extract_stream_policy(&["stream:degrade_quality".into()]),
647 Some(BackpressurePolicy::DegradeQuality)
648 );
649 assert_eq!(
650 extract_stream_policy(&["stream:pause_upstream".into()]),
651 Some(BackpressurePolicy::PauseUpstream)
652 );
653 assert_eq!(
654 extract_stream_policy(&["stream:fail".into()]),
655 Some(BackpressurePolicy::Fail)
656 );
657 }
658
659 #[test]
660 fn extract_stream_policy_ignores_unknown_slug() {
661 // Defensive: parser rejects unknown slugs at compile time,
662 // but the runtime stays robust if stale source somehow
663 // reaches the registry.
664 assert_eq!(
665 extract_stream_policy(&["stream:nonsense_xyz".into()]),
666 None
667 );
668 }
669
670 #[test]
671 fn extract_stream_policy_first_wins_on_multiple() {
672 // Defensive: malformed source might have multiple
673 // stream entries. First-wins is the policy.
674 assert_eq!(
675 extract_stream_policy(&[
676 "stream:drop_oldest".into(),
677 "stream:fail".into(),
678 ]),
679 Some(BackpressurePolicy::DropOldest)
680 );
681 }
682
683 // ─── build_tool_context ────────────────────────────────────────
684
685 #[test]
686 fn build_tool_context_wires_cancel_and_trace_id() {
687 let cancel = CancellationFlag::new();
688 let ctx = build_tool_context(cancel.clone(), 0xCAFE_BABE);
689 assert_eq!(ctx.trace_id, 0xCAFE_BABE);
690 assert!(!ctx.is_cancelled());
691 cancel.cancel();
692 assert!(ctx.is_cancelled());
693 }
694}