use std::time::Duration;
use crate::tool_executor::ToolResult;
use crate::tool_registry::ToolEntry;
/// Parses a human-friendly timeout string into a [`Duration`].
///
/// Accepted forms (surrounding whitespace is ignored, as is whitespace
/// between the number and its suffix):
///
/// * `"500ms"` — milliseconds
/// * `"10s"`   — seconds
/// * `"2m"`    — minutes
/// * `"15"`    — bare number, interpreted as seconds
///
/// Returns `None` when the input is empty, the numeric part is not a valid
/// `u64`, the suffix is unknown, or a minute count is too large to be
/// expressed in whole seconds.
fn parse_timeout(s: &str) -> Option<Duration> {
    let s = s.trim();
    if s.is_empty() {
        return None;
    }

    // Shared numeric parsing for every unit.
    let whole = |digits: &str| digits.trim().parse::<u64>().ok();

    // "ms" must be tested before "s": every "…ms" string also ends in 's'.
    if let Some(millis) = s.strip_suffix("ms") {
        whole(millis).map(Duration::from_millis)
    } else if let Some(secs) = s.strip_suffix('s') {
        whole(secs).map(Duration::from_secs)
    } else if let Some(mins) = s.strip_suffix('m') {
        // checked_mul: a huge minute count must not overflow (a panic in
        // debug builds, silent wrap-around in release builds).
        whole(mins)
            .and_then(|m| m.checked_mul(60))
            .map(Duration::from_secs)
    } else {
        whole(s).map(Duration::from_secs)
    }
}
/// Public entry point to the module-private `parse_timeout`.
///
/// Forwards `s` unchanged and returns whatever `parse_timeout` returns.
pub fn parse_timeout_pub(s: &str) -> Option<Duration> {
parse_timeout(s)
}
/// Timeout used when a tool entry's `timeout` string cannot be parsed
/// (i.e. `parse_timeout` returns `None`).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Runs an HTTP tool synchronously and reports the outcome as a `ToolResult`.
///
/// The endpoint is taken from `entry.runtime` and must be a non-empty
/// `http://` or `https://` URL. `argument` is sent verbatim when it looks
/// like JSON (first non-whitespace character is `{` or `[`); otherwise it is
/// wrapped as `{"input": <argument>}`. The timeout comes from
/// `entry.timeout`, falling back to `DEFAULT_TIMEOUT` when unparsable.
///
/// Every failure is reported through `ToolResult { success: false, .. }`
/// rather than by returning an error.
pub fn dispatch_http(entry: &ToolEntry, argument: &str) -> ToolResult {
    // All failure paths share one shape; only the message differs.
    let failure = |output: String| ToolResult {
        success: false,
        output,
        tool_name: entry.name.clone(),
    };

    let endpoint = entry.runtime.trim();
    if endpoint.is_empty() {
        return failure(format!(
            "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
            entry.name
        ));
    }

    let has_http_scheme = endpoint.starts_with("http://") || endpoint.starts_with("https://");
    if !has_http_scheme {
        return failure(format!(
            "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
            entry.name, endpoint
        ));
    }

    let timeout = parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT);

    // JSON-looking arguments pass through untouched; plain text is wrapped.
    let leading = argument.trim_start();
    let payload = if leading.starts_with('{') || leading.starts_with('[') {
        argument.to_string()
    } else {
        serde_json::json!({ "input": argument }).to_string()
    };

    execute_request(endpoint, &entry.name, &payload, timeout)
        .unwrap_or_else(|e| failure(format!("HTTP tool '{}': {}", entry.name, e)))
}
/// Sends `body` as a JSON POST to `url` using a blocking client and converts
/// the response into a `ToolResult`.
///
/// * `url` — endpoint to POST to.
/// * `tool_name` — sent in the `X-Axon-Tool` header and echoed in the result.
/// * `body` — request payload, sent with `Content-Type: application/json`.
/// * `timeout` — timeout configured on the HTTP client.
///
/// Returns `Ok` with `success: true` and the raw body for 2xx responses, and
/// `Ok` with `success: false` and `"HTTP <status>: <body excerpt>"` for any
/// other status (the excerpt is capped at 200 bytes plus `...`).
///
/// # Errors
///
/// Returns a human-readable message when the client cannot be built, the
/// request fails (timeout, connection failure, or other transport error), or
/// the response body cannot be read.
fn execute_request(
    url: &str,
    tool_name: &str,
    body: &str,
    timeout: Duration,
) -> Result<ToolResult, String> {
    /// Maximum number of response-body bytes echoed back on a non-2xx status.
    const MAX_ERROR_BODY_BYTES: usize = 200;

    let client = reqwest::blocking::Client::builder()
        .timeout(timeout)
        .build()
        .map_err(|e| format!("failed to create HTTP client: {e}"))?;

    let response = client
        .post(url)
        .header("Content-Type", "application/json")
        .header("X-Axon-Tool", tool_name)
        .body(body.to_string())
        .send()
        .map_err(|e| {
            if e.is_timeout() {
                // Duration's Debug output keeps whole seconds as "30s" but
                // also reports sub-second timeouts faithfully ("500ms")
                // instead of the misleading "0s" that `as_secs()` produced.
                format!("request timed out after {timeout:?}")
            } else if e.is_connect() {
                format!("connection failed to {url}")
            } else {
                format!("request failed: {e}")
            }
        })?;

    let status = response.status();
    let response_body = response
        .text()
        .map_err(|e| format!("failed to read response body: {e}"))?;

    if status.is_success() {
        return Ok(ToolResult {
            success: true,
            output: response_body,
            tool_name: tool_name.to_string(),
        });
    }

    let excerpt = if response_body.len() > MAX_ERROR_BODY_BYTES {
        // Slicing a `str` at a byte offset inside a multi-byte character
        // panics, so back up to the nearest char boundary first. Offset 0 is
        // always a boundary, which guarantees termination.
        let mut cut = MAX_ERROR_BODY_BYTES;
        while !response_body.is_char_boundary(cut) {
            cut -= 1;
        }
        format!("{}...", &response_body[..cut])
    } else {
        response_body
    };

    Ok(ToolResult {
        success: false,
        output: format!("HTTP {}: {}", status.as_u16(), excerpt),
        tool_name: tool_name.to_string(),
    })
}
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use crate::backends::sse_streaming::{LineBuffer, SseEventParser};
use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
/// An HTTP-backed tool that POSTs its arguments to a fixed endpoint.
pub struct HttpStreamingTool {
/// Tool name; sent in the `X-Axon-Tool` request header and used in error messages.
name: String,
/// Endpoint URL that requests are POSTed to.
url: String,
/// Timeout handed to the HTTP client when issuing a request.
timeout: Duration,
}
impl HttpStreamingTool {
    /// Builds a tool from a registry entry.
    ///
    /// The endpoint is read from `entry.runtime` (trimmed) and the timeout
    /// from `entry.timeout`, falling back to `DEFAULT_TIMEOUT` when it cannot
    /// be parsed.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the endpoint is empty or does not
    /// start with `http://` or `https://`.
    pub fn from_entry(entry: &ToolEntry) -> Result<Self, String> {
        let endpoint = entry.runtime.trim();

        if endpoint.is_empty() {
            return Err(format!(
                "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
                entry.name
            ));
        }

        let scheme_ok = ["http://", "https://"]
            .iter()
            .any(|scheme| endpoint.starts_with(*scheme));
        if !scheme_ok {
            return Err(format!(
                "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
                entry.name, endpoint
            ));
        }

        Ok(Self::new(
            entry.name.clone(),
            endpoint.to_string(),
            parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT),
        ))
    }

    /// Creates a tool directly from its parts; performs no validation of `url`.
    pub fn new(name: String, url: String, timeout: Duration) -> Self {
        Self { name, url, timeout }
    }
}
/// Shapes tool arguments into the request payload.
///
/// Arguments whose first non-whitespace character is `{` or `[` are assumed
/// to already be JSON and are returned unchanged (leading whitespace
/// included). Anything else is wrapped as `{"input": <args>}`.
fn build_request_body(args: &str) -> String {
    let looks_like_json = matches!(
        args.trim_start().chars().next(),
        Some('{') | Some('[')
    );

    if looks_like_json {
        return args.to_string();
    }
    serde_json::json!({ "input": args }).to_string()
}
/// How a response body is split into chunks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FramingMode {
    /// `text/event-stream` responses.
    Sse,
    /// `application/x-ndjson` or `application/jsonl` responses.
    Ndjson,
    /// Any other content type.
    Single,
}

/// Picks the framing mode for a response from its `Content-Type` value.
///
/// Matching is case-insensitive (ASCII) and substring-based, so parameters
/// such as `; charset=utf-8` do not interfere. Unrecognised or empty values
/// yield [`FramingMode::Single`].
fn classify_framing(content_type: &str) -> FramingMode {
    // Checked in order; the first marker found in the header wins.
    const MARKERS: [(&str, FramingMode); 3] = [
        ("text/event-stream", FramingMode::Sse),
        ("application/x-ndjson", FramingMode::Ndjson),
        ("application/jsonl", FramingMode::Ndjson),
    ];

    let normalized = content_type.to_ascii_lowercase();
    MARKERS
        .iter()
        .find(|(marker, _)| normalized.contains(*marker))
        .map_or(FramingMode::Single, |(_, mode)| *mode)
}
#[async_trait]
impl Tool for HttpStreamingTool {
    /// Runs the tool to completion and returns a single `ToolResult`.
    ///
    /// Delegates to the blocking `dispatch_http` on a blocking-task thread so
    /// the async executor is not stalled. A failure to join that task is
    /// reported as an unsuccessful `ToolResult`.
    async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
        // `dispatch_http` re-parses the timeout from a string. Encode it in
        // milliseconds so sub-second timeouts survive the round trip;
        // formatting with `as_secs()` turned e.g. 500ms into "0s", i.e. a
        // zero timeout. Clamped to u64 because the parser reads a u64.
        let timeout_ms = self.timeout.as_millis().min(u128::from(u64::MAX));
        let entry = ToolEntry {
            name: self.name.clone(),
            provider: "http".to_string(),
            timeout: format!("{timeout_ms}ms"),
            runtime: self.url.clone(),
            sandbox: None,
            max_results: None,
            output_schema: String::new(),
            effect_row: Vec::new(),
            source: crate::tool_registry::ToolSource::Program,
            is_streaming: false,
        };
        match tokio::task::spawn_blocking(move || dispatch_http(&entry, &args)).await {
            Ok(result) => result,
            Err(e) => ToolResult {
                success: false,
                output: format!("HTTP tool '{}': blocking task join failed: {e}", self.name),
                tool_name: self.name.clone(),
            },
        }
    }

    /// Runs the tool and yields its output incrementally.
    ///
    /// A background task POSTs the request and forwards the response through
    /// a channel. The response `Content-Type` selects the framing: SSE
    /// events, NDJSON lines, or the whole body as a single chunk. The task
    /// sends a terminator chunk on every exit path, carrying `Stop`,
    /// `Cancelled`, or `Error { message }`.
    async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
        let url = self.url.clone();
        let name = self.name.clone();
        let timeout = self.timeout;
        let cancel = ctx.cancel.clone();
        let body = build_request_body(&args);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ToolChunk>();

        tokio::spawn(async move {
            // A failed send means the receiver is gone; nothing left to do.
            let send_terminator = |reason: ToolFinishReason| {
                let _ = tx.send(ToolChunk::terminator("", reason));
            };

            if cancel.is_cancelled() {
                send_terminator(ToolFinishReason::Cancelled);
                return;
            }

            let client = match reqwest::Client::builder().timeout(timeout).build() {
                Ok(c) => c,
                Err(e) => {
                    send_terminator(ToolFinishReason::Error {
                        message: format!(
                            "HTTP tool '{name}': failed to build async client: {e}"
                        ),
                    });
                    return;
                }
            };

            let response = match client
                .post(&url)
                .header("Content-Type", "application/json")
                .header("X-Axon-Tool", &name)
                .body(body)
                .send()
                .await
            {
                Ok(r) => r,
                Err(e) => {
                    let message = if e.is_timeout() {
                        // Debug formatting prints "30s" for whole seconds and
                        // "500ms" for sub-second values (previously "0s").
                        format!("HTTP tool '{name}': request timed out after {timeout:?}")
                    } else if e.is_connect() {
                        format!("HTTP tool '{name}': connection failed to {url}")
                    } else {
                        format!("HTTP tool '{name}': request failed: {e}")
                    };
                    send_terminator(ToolFinishReason::Error { message });
                    return;
                }
            };

            let status = response.status();
            if !status.is_success() {
                let body_text = response.text().await.unwrap_or_default();
                let truncated = if body_text.len() > 200 {
                    // Byte 200 may fall inside a multi-byte UTF-8 character;
                    // slicing there would panic and kill this task before a
                    // terminator is sent. Back up to a char boundary
                    // (offset 0 always is one).
                    let mut cut = 200;
                    while !body_text.is_char_boundary(cut) {
                        cut -= 1;
                    }
                    format!("{}...", &body_text[..cut])
                } else {
                    body_text
                };
                send_terminator(ToolFinishReason::Error {
                    message: format!("HTTP {}: {}", status.as_u16(), truncated),
                });
                return;
            }

            // A missing or non-UTF-8 Content-Type is treated as empty, which
            // classifies as single-chunk framing.
            let content_type = response
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_string();
            let framing = classify_framing(&content_type);
            let mut byte_stream = response.bytes_stream();

            let drain_result = match framing {
                FramingMode::Sse => drain_sse(&mut byte_stream, &cancel, &tx).await,
                FramingMode::Ndjson => drain_ndjson(&mut byte_stream, &cancel, &tx).await,
                FramingMode::Single => drain_single(&mut byte_stream, &cancel, &tx).await,
            };

            match drain_result {
                DrainOutcome::Completed => send_terminator(ToolFinishReason::Stop),
                DrainOutcome::Cancelled => send_terminator(ToolFinishReason::Cancelled),
                DrainOutcome::Error(message) => {
                    send_terminator(ToolFinishReason::Error { message })
                }
            }
        });

        // Adapt the channel receiver into a stream that ends once the sender
        // side is dropped and the channel is drained.
        Box::pin(futures::stream::unfold(rx, |mut rx| async move {
            rx.recv().await.map(|chunk| (chunk, rx))
        }))
    }

    /// This tool always supports streaming.
    fn is_streaming(&self) -> bool {
        true
    }
}
/// Result of draining a response body into the chunk channel.
enum DrainOutcome {
/// The body was read to its end.
Completed,
/// The cancellation flag was set, or a chunk could not be sent on the channel.
Cancelled,
/// Reading the body failed; carries a human-readable message.
Error(String),
}
async fn drain_sse<S>(
byte_stream: &mut S,
cancel: &crate::cancel_token::CancellationFlag,
tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
) -> DrainOutcome
where
S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
{
let mut line_buf = LineBuffer::new();
let mut sse_parser = SseEventParser::new();
loop {
if cancel.is_cancelled() {
return DrainOutcome::Cancelled;
}
match byte_stream.next().await {
None => break,
Some(Err(e)) => {
return DrainOutcome::Error(format!("SSE stream chunk error: {e}"))
}
Some(Ok(bytes)) => {
let lines = line_buf.push(&bytes);
for line in lines {
if let Some(event) = sse_parser.push_line(&line) {
if let Some(data) = event.data {
if tx
.send(ToolChunk::intermediate(data))
.is_err()
{
return DrainOutcome::Cancelled;
}
}
}
}
}
}
}
if let Some(line) = line_buf.flush() {
if let Some(event) = sse_parser.push_line(&line) {
if let Some(data) = event.data {
let _ = tx.send(ToolChunk::intermediate(data));
}
}
}
DrainOutcome::Completed
}
async fn drain_ndjson<S>(
byte_stream: &mut S,
cancel: &crate::cancel_token::CancellationFlag,
tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
) -> DrainOutcome
where
S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
{
let mut line_buf = LineBuffer::new();
loop {
if cancel.is_cancelled() {
return DrainOutcome::Cancelled;
}
match byte_stream.next().await {
None => break,
Some(Err(e)) => {
return DrainOutcome::Error(format!("NDJSON stream chunk error: {e}"))
}
Some(Ok(bytes)) => {
let lines = line_buf.push(&bytes);
for line in lines {
if !line.is_empty()
&& tx.send(ToolChunk::intermediate(line)).is_err()
{
return DrainOutcome::Cancelled;
}
}
}
}
}
if let Some(line) = line_buf.flush() {
if !line.is_empty() {
let _ = tx.send(ToolChunk::intermediate(line));
}
}
DrainOutcome::Completed
}
async fn drain_single<S>(
byte_stream: &mut S,
cancel: &crate::cancel_token::CancellationFlag,
tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
) -> DrainOutcome
where
S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
{
let mut acc: Vec<u8> = Vec::new();
loop {
if cancel.is_cancelled() {
return DrainOutcome::Cancelled;
}
match byte_stream.next().await {
None => break,
Some(Err(e)) => {
return DrainOutcome::Error(format!("HTTP body chunk error: {e}"))
}
Some(Ok(bytes)) => acc.extend_from_slice(&bytes),
}
}
let body_text = String::from_utf8_lossy(&acc).into_owned();
if !body_text.is_empty()
&& tx
.send(ToolChunk::intermediate(body_text))
.is_err()
{
return DrainOutcome::Cancelled;
}
DrainOutcome::Completed
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tool_registry::{ToolEntry, ToolSource};

    /// Builds a minimal HTTP `ToolEntry` for the given name, URL and timeout.
    fn make_http_entry(name: &str, url: &str, timeout: &str) -> ToolEntry {
        ToolEntry {
            name: name.to_string(),
            provider: "http".to_string(),
            timeout: timeout.to_string(),
            runtime: url.to_string(),
            sandbox: None,
            max_results: None,
            output_schema: "JSON".to_string(),
            effect_row: vec!["network".to_string()],
            source: ToolSource::Program,
            is_streaming: false,
        }
    }

    #[test]
    fn parse_timeout_seconds() {
        assert_eq!(parse_timeout("10s"), Some(Duration::from_secs(10)));
        assert_eq!(parse_timeout("30s"), Some(Duration::from_secs(30)));
    }

    #[test]
    fn parse_timeout_milliseconds() {
        assert_eq!(parse_timeout("500ms"), Some(Duration::from_millis(500)));
        assert_eq!(parse_timeout("100ms"), Some(Duration::from_millis(100)));
    }

    #[test]
    fn parse_timeout_minutes() {
        assert_eq!(parse_timeout("2m"), Some(Duration::from_secs(120)));
    }

    #[test]
    fn parse_timeout_raw_number() {
        assert_eq!(parse_timeout("15"), Some(Duration::from_secs(15)));
    }

    #[test]
    fn parse_timeout_empty() {
        assert_eq!(parse_timeout(""), None);
        assert_eq!(parse_timeout(" "), None);
    }

    #[test]
    fn parse_timeout_invalid() {
        assert_eq!(parse_timeout("abc"), None);
        assert_eq!(parse_timeout("10x"), None);
    }

    #[test]
    fn dispatch_empty_url_fails() {
        let entry = make_http_entry("DataAPI", "", "10s");
        let result = dispatch_http(&entry, "test query");
        assert!(!result.success);
        assert!(result.output.contains("no endpoint URL"));
    }

    #[test]
    fn dispatch_invalid_url_scheme_fails() {
        let entry = make_http_entry("DataAPI", "ftp://example.com", "10s");
        let result = dispatch_http(&entry, "test query");
        assert!(!result.success);
        assert!(result.output.contains("invalid URL"));
        assert!(result.output.contains("http://"));
    }

    #[test]
    fn dispatch_connection_refused() {
        // Port 1 on loopback is expected to refuse; the exact failure class
        // depends on the platform, so any transport error is accepted.
        let entry = make_http_entry("TestTool", "http://127.0.0.1:1/api", "2s");
        let result = dispatch_http(&entry, "test");
        assert!(!result.success);
        assert!(
            result.output.contains("connection failed")
                || result.output.contains("request failed")
                || result.output.contains("timed out"),
            "unexpected error: {}",
            result.output
        );
    }

    // The body-shaping tests below call the real helper instead of a copy of
    // its logic, so a regression in `build_request_body` is actually caught.

    #[test]
    fn json_body_passthrough() {
        let arg = r#"{"query": "test"}"#;
        assert_eq!(build_request_body(arg), r#"{"query": "test"}"#);
    }

    #[test]
    fn plain_text_wrapped() {
        let body = build_request_body("search for cats");
        let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
        assert_eq!(parsed["input"], "search for cats");
    }

    #[test]
    fn array_body_passthrough() {
        assert_eq!(build_request_body(r#"[1, 2, 3]"#), "[1, 2, 3]");
    }

    #[test]
    fn json_body_with_leading_whitespace_is_untouched() {
        let arg = "  {\"a\": 1}";
        assert_eq!(build_request_body(arg), arg);
    }

    #[test]
    fn classify_framing_by_content_type() {
        assert_eq!(
            classify_framing("text/event-stream; charset=utf-8"),
            FramingMode::Sse
        );
        assert_eq!(classify_framing("Application/X-NDJSON"), FramingMode::Ndjson);
        assert_eq!(classify_framing("application/jsonl"), FramingMode::Ndjson);
        assert_eq!(classify_framing("application/json"), FramingMode::Single);
        assert_eq!(classify_framing(""), FramingMode::Single);
    }
}