use crate::cancel_token::CancellationFlag;
use crate::tool_executor::{self, ToolResult};
use crate::tool_registry::ToolEntry;
use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
use async_trait::async_trait;
use futures::stream;
pub fn resolve_streaming_tool(entry: &ToolEntry) -> Box<dyn Tool> {
match entry.provider.trim() {
"" | "stub" | "stub_stream" => {
Box::new(StubStreamingTool::new(entry.name.clone()))
}
"native" => {
Box::new(NativeWrappedTool::new(entry.name.clone()))
}
"http" => match crate::http_tool::HttpStreamingTool::from_entry(entry) {
Ok(t) => Box::new(t),
Err(_) => Box::new(SyncFallbackTool::new(
entry.name.clone(),
"http".to_string(),
)),
},
"mcp" => match crate::emcp::McpStreamingTool::from_entry(entry) {
Ok(t) => Box::new(t),
Err(_) => Box::new(SyncFallbackTool::new(
entry.name.clone(),
"mcp".to_string(),
)),
},
other => Box::new(SyncFallbackTool::new(
entry.name.clone(),
other.to_string(),
)),
}
}
pub struct StubStreamingTool {
name: String,
}
impl StubStreamingTool {
pub fn new(name: String) -> Self {
Self { name }
}
}
#[async_trait]
impl Tool for StubStreamingTool {
async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
ToolResult {
success: true,
output: format!("[stub-stream-tool] {}({args})", self.name),
tool_name: self.name.clone(),
}
}
async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
let name = self.name.clone();
let cancel = ctx.cancel.clone();
let chunks: Vec<ToolChunk> = if cancel.is_cancelled() {
vec![ToolChunk::terminator("", ToolFinishReason::Cancelled)]
} else {
vec![
ToolChunk::intermediate(format!("[stub-stream] {name}(")),
ToolChunk::intermediate(args),
ToolChunk::intermediate(")"),
ToolChunk::terminator("", ToolFinishReason::Stop),
]
};
Box::pin(stream::iter(chunks))
}
fn is_streaming(&self) -> bool {
true
}
}
pub struct NativeWrappedTool {
name: String,
}
impl NativeWrappedTool {
pub fn new(name: String) -> Self {
Self { name }
}
}
#[async_trait]
impl Tool for NativeWrappedTool {
async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
tool_executor::dispatch(&self.name, &args).unwrap_or_else(|| ToolResult {
success: false,
output: format!("native tool '{}' not registered", self.name),
tool_name: self.name.clone(),
})
}
async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
let result = self.execute(args, ctx).await;
Box::pin(stream::iter(vec![ToolChunk::from_result(&result)]))
}
fn is_streaming(&self) -> bool {
false
}
}
pub struct SyncFallbackTool {
name: String,
provider: String,
}
impl SyncFallbackTool {
pub fn new(name: String, provider: String) -> Self {
Self { name, provider }
}
}
#[async_trait]
impl Tool for SyncFallbackTool {
async fn execute(&self, _args: String, _ctx: ToolContext) -> ToolResult {
ToolResult {
success: false,
output: format!(
"synchronous fallback for provider '{}' tool '{}' — \
streaming dispatch only resolves stream-effect tools \
via dedicated provider adapters (Fase 34.e HTTP / \
Fase 34.f MCP).",
self.provider, self.name
),
tool_name: self.name.clone(),
}
}
async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
let provider = self.provider.clone();
let result = self.execute(args, ctx).await;
let hint = match provider.as_str() {
"http" => {
"Fase 34.e shipped HTTP streaming — verify the \
tool's `runtime:` URL starts with http:// or https://"
}
"mcp" => {
"Fase 34.f shipped MCP streaming — verify the \
tool's `runtime:` URL starts with http:// or https://"
}
_ => "no dedicated streaming adapter — pending later sub-fase",
};
let error_msg = format!(
"streaming dispatch fallback for provider '{provider}' tool '{name}' \
({hint}); synchronous fallback returned: {output}",
name = self.name,
output = result.output,
);
Box::pin(stream::iter(vec![ToolChunk::terminator(
String::new(),
ToolFinishReason::Error { message: error_msg },
)]))
}
fn is_streaming(&self) -> bool {
false
}
}
pub fn extract_stream_policy(
effect_row: &[String],
) -> Option<crate::stream_effect::BackpressurePolicy> {
for entry in effect_row {
if let Some(rest) = entry.strip_prefix("stream:") {
if let Some(policy) =
crate::stream_effect::BackpressurePolicy::from_slug(rest)
{
return Some(policy);
}
}
}
None
}
pub fn build_tool_context(cancel: CancellationFlag, trace_id: u64) -> ToolContext {
ToolContext::new(cancel, trace_id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stream_effect::BackpressurePolicy;
use crate::tool_registry::ToolSource;
use futures::StreamExt;
fn entry(name: &str, provider: &str, effect_row: Vec<String>) -> ToolEntry {
let is_streaming = crate::tool_registry::derive_is_streaming(&effect_row);
ToolEntry {
name: name.to_string(),
provider: provider.to_string(),
timeout: String::new(),
runtime: String::new(),
sandbox: None,
max_results: None,
output_schema: String::new(),
effect_row,
source: ToolSource::Program,
is_streaming,
}
}
#[test]
fn resolve_stub_provider_returns_stub_streaming_tool() {
let e = entry("MyTool", "stub", vec!["stream:drop_oldest".into()]);
let tool = resolve_streaming_tool(&e);
assert!(tool.is_streaming());
}
#[test]
fn resolve_stub_stream_alias_returns_stub_streaming_tool() {
let e = entry("MyTool", "stub_stream", vec!["stream:fail".into()]);
let tool = resolve_streaming_tool(&e);
assert!(tool.is_streaming());
}
#[test]
fn resolve_native_provider_returns_native_wrapped_tool() {
let e = entry("Calculator", "native", vec!["compute".into()]);
let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming()); }
#[test]
fn resolve_http_provider_with_valid_url_returns_http_streaming_tool() {
let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
e.runtime = "https://example.com/api".to_string();
e.timeout = "10s".to_string();
let tool = resolve_streaming_tool(&e);
assert!(tool.is_streaming());
}
#[test]
fn resolve_http_provider_with_invalid_url_falls_back_to_sync_fallback() {
let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
e.runtime = "ftp://example.com/api".to_string(); let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming());
}
#[test]
fn resolve_http_provider_with_empty_url_falls_back_to_sync_fallback() {
let mut e = entry("HttpTool", "http", vec!["stream:drop_oldest".into()]);
e.runtime = String::new();
let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming());
}
#[test]
fn resolve_mcp_provider_with_valid_url_returns_mcp_streaming_tool() {
let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
e.runtime = "http://localhost:3000/mcp".to_string();
e.timeout = "10s".to_string();
let tool = resolve_streaming_tool(&e);
assert!(tool.is_streaming());
}
#[test]
fn resolve_mcp_provider_with_invalid_url_falls_back_to_sync_fallback() {
let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
e.runtime = "ws://localhost:3000".to_string(); let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming());
}
#[test]
fn resolve_mcp_provider_with_empty_url_falls_back_to_sync_fallback() {
let mut e = entry("McpTool", "mcp", vec!["stream:fail".into()]);
e.runtime = String::new();
let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming());
}
#[test]
fn resolve_unknown_provider_falls_through() {
let e = entry("CustomTool", "custom_xyz", vec![]);
let tool = resolve_streaming_tool(&e);
assert!(!tool.is_streaming());
}
#[tokio::test]
async fn stub_streaming_tool_emits_4_frame_sequence() {
let tool = StubStreamingTool::new("Search".to_string());
let cancel = CancellationFlag::new();
let ctx = ToolContext::new(cancel, 0x42);
let mut stream = tool.stream("query=axon".to_string(), ctx).await;
let chunks: Vec<ToolChunk> = {
let mut v = Vec::new();
while let Some(c) = stream.next().await {
v.push(c);
}
v
};
assert_eq!(chunks.len(), 4);
assert_eq!(chunks[0].delta, "[stub-stream] Search(");
assert_eq!(chunks[1].delta, "query=axon");
assert_eq!(chunks[2].delta, ")");
assert!(chunks[3].is_terminator());
assert_eq!(chunks[3].finish_reason, Some(ToolFinishReason::Stop));
}
#[tokio::test]
async fn stub_streaming_tool_pre_cancel_emits_cancelled_terminator_only() {
let tool = StubStreamingTool::new("Search".to_string());
let cancel = CancellationFlag::new();
cancel.cancel(); let ctx = ToolContext::new(cancel, 0x42);
let mut stream = tool.stream("query=axon".to_string(), ctx).await;
let chunks: Vec<ToolChunk> = {
let mut v = Vec::new();
while let Some(c) = stream.next().await {
v.push(c);
}
v
};
assert_eq!(chunks.len(), 1);
assert!(chunks[0].is_terminator());
assert_eq!(chunks[0].finish_reason, Some(ToolFinishReason::Cancelled));
}
#[tokio::test]
async fn native_wrapped_tool_wraps_calculator_as_single_chunk() {
let tool = NativeWrappedTool::new("Calculator".to_string());
let cancel = CancellationFlag::new();
let ctx = ToolContext::new(cancel, 0);
let mut stream = tool.stream("2 + 3".to_string(), ctx).await;
let first = stream.next().await.expect("at least one chunk");
assert_eq!(first.delta, "5");
assert_eq!(first.finish_reason, Some(ToolFinishReason::Stop));
assert!(first.is_terminator());
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn sync_fallback_tool_for_http_emits_error_terminator() {
let tool = SyncFallbackTool::new("HttpTool".to_string(), "http".to_string());
let cancel = CancellationFlag::new();
let ctx = ToolContext::new(cancel, 0);
let mut stream = tool.stream("arg".to_string(), ctx).await;
let chunk = stream.next().await.expect("at least one chunk");
assert!(chunk.is_terminator());
match chunk.finish_reason {
Some(ToolFinishReason::Error { ref message }) => {
assert!(message.contains("Fase 34.e"));
assert!(message.contains("http"));
assert!(
message.contains("runtime") || message.contains("URL"),
"post-34.e fallback hint must reference URL validation: {message}"
);
}
other => panic!("expected Error finish_reason, got {other:?}"),
}
}
#[tokio::test]
async fn sync_fallback_tool_for_mcp_emits_error_terminator() {
let tool = SyncFallbackTool::new("McpTool".to_string(), "mcp".to_string());
let cancel = CancellationFlag::new();
let ctx = ToolContext::new(cancel, 0);
let mut stream = tool.stream("arg".to_string(), ctx).await;
let chunk = stream.next().await.expect("at least one chunk");
match chunk.finish_reason {
Some(ToolFinishReason::Error { ref message }) => {
assert!(message.contains("Fase 34.f"));
assert!(message.contains("mcp"));
assert!(
message.contains("runtime") || message.contains("URL"),
"post-34.f fallback hint must reference URL validation: {message}"
);
}
other => panic!("expected Error finish_reason, got {other:?}"),
}
}
#[test]
fn extract_stream_policy_returns_none_for_empty_effect_row() {
assert_eq!(extract_stream_policy(&[]), None);
}
#[test]
fn extract_stream_policy_returns_none_for_non_stream_effects() {
assert_eq!(
extract_stream_policy(&[
"compute".into(),
"network".into(),
"io".into(),
]),
None
);
}
#[test]
fn extract_stream_policy_parses_drop_oldest() {
assert_eq!(
extract_stream_policy(&["stream:drop_oldest".into()]),
Some(BackpressurePolicy::DropOldest)
);
}
#[test]
fn extract_stream_policy_parses_all_four_closed_catalog_policies() {
assert_eq!(
extract_stream_policy(&["stream:drop_oldest".into()]),
Some(BackpressurePolicy::DropOldest)
);
assert_eq!(
extract_stream_policy(&["stream:degrade_quality".into()]),
Some(BackpressurePolicy::DegradeQuality)
);
assert_eq!(
extract_stream_policy(&["stream:pause_upstream".into()]),
Some(BackpressurePolicy::PauseUpstream)
);
assert_eq!(
extract_stream_policy(&["stream:fail".into()]),
Some(BackpressurePolicy::Fail)
);
}
#[test]
fn extract_stream_policy_ignores_unknown_slug() {
assert_eq!(
extract_stream_policy(&["stream:nonsense_xyz".into()]),
None
);
}
#[test]
fn extract_stream_policy_first_wins_on_multiple() {
assert_eq!(
extract_stream_policy(&[
"stream:drop_oldest".into(),
"stream:fail".into(),
]),
Some(BackpressurePolicy::DropOldest)
);
}
#[test]
fn build_tool_context_wires_cancel_and_trace_id() {
let cancel = CancellationFlag::new();
let ctx = build_tool_context(cancel.clone(), 0xCAFE_BABE);
assert_eq!(ctx.trace_id, 0xCAFE_BABE);
assert!(!ctx.is_cancelled());
cancel.cancel();
assert!(ctx.is_cancelled());
}
}