1use std::time::Duration;
32
33use crate::tool_executor::ToolResult;
34use crate::tool_registry::ToolEntry;
35
/// Parses a human-friendly timeout string into a [`Duration`].
///
/// Surrounding whitespace, and whitespace between the number and its unit,
/// is ignored. Accepted forms:
///
/// * `"<n>ms"` - milliseconds
/// * `"<n>s"` - seconds
/// * `"<n>m"` - minutes
/// * `"<n>"` - bare number, interpreted as seconds
///
/// Returns `None` when the input is empty, the numeric part does not parse
/// as a `u64`, the unit is unknown, or a minute count does not fit in a
/// `u64` once converted to seconds.
fn parse_timeout(s: &str) -> Option<Duration> {
    let s = s.trim();
    if s.is_empty() {
        return None;
    }

    // "ms" has to be tested before "s": every "...ms" string also ends in 's'.
    if let Some(millis) = s.strip_suffix("ms") {
        millis.trim().parse::<u64>().ok().map(Duration::from_millis)
    } else if let Some(secs) = s.strip_suffix('s') {
        secs.trim().parse::<u64>().ok().map(Duration::from_secs)
    } else if let Some(mins) = s.strip_suffix('m') {
        mins.trim()
            .parse::<u64>()
            .ok()
            // A plain `m * 60` overflows for huge inputs (panic in debug
            // builds, silent wrap-around in release); reject them instead.
            .and_then(|m| m.checked_mul(60))
            .map(Duration::from_secs)
    } else {
        s.parse::<u64>().ok().map(Duration::from_secs)
    }
}
60
61pub fn parse_timeout_pub(s: &str) -> Option<Duration> {
63 parse_timeout(s)
64}
65
66const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
67
68pub fn dispatch_http(entry: &ToolEntry, argument: &str) -> ToolResult {
78 let url = entry.runtime.trim();
79
80 if url.is_empty() {
81 return ToolResult {
82 success: false,
83 output: format!(
84 "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
85 entry.name
86 ),
87 tool_name: entry.name.clone(),
88 };
89 }
90
91 if !url.starts_with("http://") && !url.starts_with("https://") {
93 return ToolResult {
94 success: false,
95 output: format!(
96 "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
97 entry.name, url
98 ),
99 tool_name: entry.name.clone(),
100 };
101 }
102
103 let timeout = parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT);
104
105 let body = if argument.trim_start().starts_with('{') || argument.trim_start().starts_with('[') {
107 argument.to_string()
108 } else {
109 serde_json::json!({ "input": argument }).to_string()
110 };
111
112 match execute_request(url, &entry.name, &body, timeout) {
114 Ok(response) => response,
115 Err(e) => ToolResult {
116 success: false,
117 output: format!("HTTP tool '{}': {}", entry.name, e),
118 tool_name: entry.name.clone(),
119 },
120 }
121}
122
123fn execute_request(
125 url: &str,
126 tool_name: &str,
127 body: &str,
128 timeout: Duration,
129) -> Result<ToolResult, String> {
130 let client = reqwest::blocking::Client::builder()
131 .timeout(timeout)
132 .build()
133 .map_err(|e| format!("failed to create HTTP client: {e}"))?;
134
135 let response = client
136 .post(url)
137 .header("Content-Type", "application/json")
138 .header("X-Axon-Tool", tool_name)
139 .body(body.to_string())
140 .send()
141 .map_err(|e| {
142 if e.is_timeout() {
143 format!("request timed out after {}s", timeout.as_secs())
144 } else if e.is_connect() {
145 format!("connection failed to {url}")
146 } else {
147 format!("request failed: {e}")
148 }
149 })?;
150
151 let status = response.status();
152 let response_body = response
153 .text()
154 .map_err(|e| format!("failed to read response body: {e}"))?;
155
156 if status.is_success() {
157 Ok(ToolResult {
158 success: true,
159 output: response_body,
160 tool_name: tool_name.to_string(),
161 })
162 } else {
163 Ok(ToolResult {
164 success: false,
165 output: format!(
166 "HTTP {}: {}",
167 status.as_u16(),
168 if response_body.len() > 200 {
169 format!("{}...", &response_body[..200])
170 } else {
171 response_body
172 }
173 ),
174 tool_name: tool_name.to_string(),
175 })
176 }
177}
178
179use async_trait::async_trait;
185use bytes::Bytes;
186use futures::StreamExt;
187
188use crate::backends::sse_streaming::{LineBuffer, SseEventParser};
189use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
190
/// An HTTP-backed tool that can deliver its response incrementally.
pub struct HttpStreamingTool {
    /// Tool name; sent in the `X-Axon-Tool` header and used in error messages.
    name: String,
    /// Endpoint URL that receives the POST request.
    url: String,
    /// Timeout handed to the HTTP client for each request.
    timeout: Duration,
}
232
233impl HttpStreamingTool {
234 pub fn from_entry(entry: &ToolEntry) -> Result<Self, String> {
238 let url = entry.runtime.trim();
239 if url.is_empty() {
240 return Err(format!(
241 "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
242 entry.name
243 ));
244 }
245 if !url.starts_with("http://") && !url.starts_with("https://") {
246 return Err(format!(
247 "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
248 entry.name, url
249 ));
250 }
251 let timeout = parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT);
252 Ok(Self {
253 name: entry.name.clone(),
254 url: url.to_string(),
255 timeout,
256 })
257 }
258
259 pub fn new(name: String, url: String, timeout: Duration) -> Self {
262 Self { name, url, timeout }
263 }
264}
265
266fn build_request_body(args: &str) -> String {
269 let trimmed = args.trim_start();
270 if trimmed.starts_with('{') || trimmed.starts_with('[') {
271 args.to_string()
272 } else {
273 serde_json::json!({ "input": args }).to_string()
274 }
275}
276
/// How a streaming response body is split into chunks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FramingMode {
    /// Server-Sent Events (`text/event-stream`).
    Sse,
    /// Newline-delimited JSON (`application/x-ndjson`, `application/jsonl`).
    Ndjson,
    /// Any other content type.
    Single,
}

/// Chooses the framing mode from a `Content-Type` header value.
///
/// Matching is case-insensitive (ASCII) and substring-based, so parameters
/// such as `; charset=utf-8` do not interfere. Unrecognised or empty values
/// map to [`FramingMode::Single`].
fn classify_framing(content_type: &str) -> FramingMode {
    // Checked in order; the first marker found in the header wins.
    const KNOWN_FRAMINGS: [(&str, FramingMode); 3] = [
        ("text/event-stream", FramingMode::Sse),
        ("application/x-ndjson", FramingMode::Ndjson),
        ("application/jsonl", FramingMode::Ndjson),
    ];

    let normalized = content_type.to_ascii_lowercase();
    KNOWN_FRAMINGS
        .iter()
        .find(|entry| normalized.contains(entry.0))
        .map_or(FramingMode::Single, |entry| entry.1)
}
304
305#[async_trait]
306impl Tool for HttpStreamingTool {
307 async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
308 let entry = ToolEntry {
318 name: self.name.clone(),
319 provider: "http".to_string(),
320 timeout: format!("{}s", self.timeout.as_secs()),
321 runtime: self.url.clone(),
322 sandbox: None,
323 max_results: None,
324 output_schema: String::new(),
325 effect_row: Vec::new(),
326 parameters: Vec::new(),
329 source: crate::tool_registry::ToolSource::Program,
330 is_streaming: false,
331 };
332 match tokio::task::spawn_blocking(move || dispatch_http(&entry, &args)).await {
333 Ok(result) => result,
334 Err(e) => ToolResult {
335 success: false,
336 output: format!("HTTP tool '{}': blocking task join failed: {e}", self.name),
337 tool_name: self.name.clone(),
338 },
339 }
340 }
341
342 async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
343 let url = self.url.clone();
344 let name = self.name.clone();
345 let timeout = self.timeout;
346 let cancel = ctx.cancel.clone();
347 let body = build_request_body(&args);
348
349 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ToolChunk>();
355
356 tokio::spawn(async move {
357 let send_terminator = |reason: ToolFinishReason| {
362 let _ = tx.send(ToolChunk::terminator("", reason));
363 };
364
365 if cancel.is_cancelled() {
367 send_terminator(ToolFinishReason::Cancelled);
368 return;
369 }
370
371 let client = match reqwest::Client::builder().timeout(timeout).build() {
373 Ok(c) => c,
374 Err(e) => {
375 send_terminator(ToolFinishReason::Error {
376 message: format!(
377 "HTTP tool '{name}': failed to build async client: {e}"
378 ),
379 });
380 return;
381 }
382 };
383
384 let response = match client
386 .post(&url)
387 .header("Content-Type", "application/json")
388 .header("X-Axon-Tool", &name)
389 .body(body)
390 .send()
391 .await
392 {
393 Ok(r) => r,
394 Err(e) => {
395 let message = if e.is_timeout() {
396 format!(
397 "HTTP tool '{name}': request timed out after {}s",
398 timeout.as_secs()
399 )
400 } else if e.is_connect() {
401 format!("HTTP tool '{name}': connection failed to {url}")
402 } else {
403 format!("HTTP tool '{name}': request failed: {e}")
404 };
405 send_terminator(ToolFinishReason::Error { message });
406 return;
407 }
408 };
409
410 let status = response.status();
414 if !status.is_success() {
415 let body_text = response.text().await.unwrap_or_default();
416 let truncated = if body_text.len() > 200 {
417 format!("{}...", &body_text[..200])
418 } else {
419 body_text
420 };
421 send_terminator(ToolFinishReason::Error {
422 message: format!("HTTP {}: {}", status.as_u16(), truncated),
423 });
424 return;
425 }
426
427 let content_type = response
429 .headers()
430 .get(reqwest::header::CONTENT_TYPE)
431 .and_then(|v| v.to_str().ok())
432 .unwrap_or("")
433 .to_string();
434 let framing = classify_framing(&content_type);
435
436 let mut byte_stream = response.bytes_stream();
438 let drain_result = match framing {
439 FramingMode::Sse => {
440 drain_sse(&mut byte_stream, &cancel, &tx).await
441 }
442 FramingMode::Ndjson => {
443 drain_ndjson(&mut byte_stream, &cancel, &tx).await
444 }
445 FramingMode::Single => {
446 drain_single(&mut byte_stream, &cancel, &tx).await
447 }
448 };
449
450 match drain_result {
451 DrainOutcome::Completed => send_terminator(ToolFinishReason::Stop),
452 DrainOutcome::Cancelled => send_terminator(ToolFinishReason::Cancelled),
453 DrainOutcome::Error(message) => {
454 send_terminator(ToolFinishReason::Error { message })
455 }
456 }
457 });
458
459 Box::pin(futures::stream::unfold(rx, |mut rx| async move {
463 rx.recv().await.map(|chunk| (chunk, rx))
464 }))
465 }
466
467 fn is_streaming(&self) -> bool {
468 true
469 }
470}
471
/// Result of draining a response body into the chunk channel.
enum DrainOutcome {
    /// The body was read to its end.
    Completed,
    /// Draining stopped early: the cancel flag was set or a chunk could not
    /// be sent.
    Cancelled,
    /// Reading the body failed; the payload is the error message.
    Error(String),
}
479
480async fn drain_sse<S>(
486 byte_stream: &mut S,
487 cancel: &crate::cancel_token::CancellationFlag,
488 tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
489) -> DrainOutcome
490where
491 S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
492{
493 let mut line_buf = LineBuffer::new();
494 let mut sse_parser = SseEventParser::new();
495 loop {
496 if cancel.is_cancelled() {
497 return DrainOutcome::Cancelled;
498 }
499 match byte_stream.next().await {
500 None => break,
501 Some(Err(e)) => {
502 return DrainOutcome::Error(format!("SSE stream chunk error: {e}"))
503 }
504 Some(Ok(bytes)) => {
505 let lines = line_buf.push(&bytes);
506 for line in lines {
507 if let Some(event) = sse_parser.push_line(&line) {
508 if let Some(data) = event.data {
509 if tx
510 .send(ToolChunk::intermediate(data))
511 .is_err()
512 {
513 return DrainOutcome::Cancelled;
514 }
515 }
516 }
517 }
518 }
519 }
520 }
521 if let Some(line) = line_buf.flush() {
525 if let Some(event) = sse_parser.push_line(&line) {
526 if let Some(data) = event.data {
527 let _ = tx.send(ToolChunk::intermediate(data));
528 }
529 }
530 }
531 DrainOutcome::Completed
532}
533
534async fn drain_ndjson<S>(
538 byte_stream: &mut S,
539 cancel: &crate::cancel_token::CancellationFlag,
540 tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
541) -> DrainOutcome
542where
543 S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
544{
545 let mut line_buf = LineBuffer::new();
546 loop {
547 if cancel.is_cancelled() {
548 return DrainOutcome::Cancelled;
549 }
550 match byte_stream.next().await {
551 None => break,
552 Some(Err(e)) => {
553 return DrainOutcome::Error(format!("NDJSON stream chunk error: {e}"))
554 }
555 Some(Ok(bytes)) => {
556 let lines = line_buf.push(&bytes);
557 for line in lines {
558 if !line.is_empty()
559 && tx.send(ToolChunk::intermediate(line)).is_err()
560 {
561 return DrainOutcome::Cancelled;
562 }
563 }
564 }
565 }
566 }
567 if let Some(line) = line_buf.flush() {
568 if !line.is_empty() {
569 let _ = tx.send(ToolChunk::intermediate(line));
570 }
571 }
572 DrainOutcome::Completed
573}
574
575async fn drain_single<S>(
579 byte_stream: &mut S,
580 cancel: &crate::cancel_token::CancellationFlag,
581 tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
582) -> DrainOutcome
583where
584 S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
585{
586 let mut acc: Vec<u8> = Vec::new();
587 loop {
588 if cancel.is_cancelled() {
589 return DrainOutcome::Cancelled;
590 }
591 match byte_stream.next().await {
592 None => break,
593 Some(Err(e)) => {
594 return DrainOutcome::Error(format!("HTTP body chunk error: {e}"))
595 }
596 Some(Ok(bytes)) => acc.extend_from_slice(&bytes),
597 }
598 }
599 let body_text = String::from_utf8_lossy(&acc).into_owned();
600 if !body_text.is_empty()
601 && tx
602 .send(ToolChunk::intermediate(body_text))
603 .is_err()
604 {
605 return DrainOutcome::Cancelled;
606 }
607 DrainOutcome::Completed
608}
609
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tool_registry::{ToolEntry, ToolSource};

    /// Builds a minimal HTTP `ToolEntry` for the dispatch tests.
    fn make_http_entry(name: &str, url: &str, timeout: &str) -> ToolEntry {
        ToolEntry {
            name: name.to_string(),
            provider: "http".to_string(),
            timeout: timeout.to_string(),
            runtime: url.to_string(),
            sandbox: None,
            max_results: None,
            output_schema: "JSON".to_string(),
            effect_row: vec!["network".to_string()],
            parameters: Vec::new(),
            source: ToolSource::Program,
            is_streaming: false,
        }
    }

    // --- parse_timeout ---

    #[test]
    fn parse_timeout_seconds() {
        assert_eq!(parse_timeout("10s"), Some(Duration::from_secs(10)));
        assert_eq!(parse_timeout("30s"), Some(Duration::from_secs(30)));
    }

    #[test]
    fn parse_timeout_milliseconds() {
        assert_eq!(parse_timeout("500ms"), Some(Duration::from_millis(500)));
        assert_eq!(parse_timeout("100ms"), Some(Duration::from_millis(100)));
    }

    #[test]
    fn parse_timeout_minutes() {
        assert_eq!(parse_timeout("2m"), Some(Duration::from_secs(120)));
    }

    #[test]
    fn parse_timeout_raw_number() {
        assert_eq!(parse_timeout("15"), Some(Duration::from_secs(15)));
    }

    #[test]
    fn parse_timeout_empty() {
        assert_eq!(parse_timeout(""), None);
        assert_eq!(parse_timeout("  "), None);
    }

    #[test]
    fn parse_timeout_invalid() {
        assert_eq!(parse_timeout("abc"), None);
        assert_eq!(parse_timeout("10x"), None);
    }

    // --- dispatch_http ---

    #[test]
    fn dispatch_empty_url_fails() {
        let entry = make_http_entry("DataAPI", "", "10s");
        let result = dispatch_http(&entry, "test query");
        assert!(!result.success);
        assert!(result.output.contains("no endpoint URL"));
    }

    #[test]
    fn dispatch_invalid_url_scheme_fails() {
        let entry = make_http_entry("DataAPI", "ftp://example.com", "10s");
        let result = dispatch_http(&entry, "test query");
        assert!(!result.success);
        assert!(result.output.contains("invalid URL"));
        assert!(result.output.contains("http://"));
    }

    #[test]
    fn dispatch_connection_refused() {
        let entry = make_http_entry("TestTool", "http://127.0.0.1:1/api", "2s");
        let result = dispatch_http(&entry, "test");
        assert!(!result.success);
        assert!(
            result.output.contains("connection failed")
                || result.output.contains("request failed")
                || result.output.contains("timed out"),
            "unexpected error: {}",
            result.output
        );
    }

    // --- build_request_body ---
    // These call the production helper; re-implementing its logic inline
    // would let a regression in the real code go unnoticed.

    #[test]
    fn json_body_passthrough() {
        let arg = r#"{"query": "test"}"#;
        assert_eq!(build_request_body(arg), r#"{"query": "test"}"#);
    }

    #[test]
    fn plain_text_wrapped() {
        let body = build_request_body("search for cats");
        let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
        assert_eq!(parsed["input"], "search for cats");
    }

    #[test]
    fn array_body_passthrough() {
        let arg = r#"[1, 2, 3]"#;
        assert_eq!(build_request_body(arg), "[1, 2, 3]");
    }

    #[test]
    fn json_body_with_leading_whitespace_passthrough() {
        let arg = "  {\"query\": \"test\"}";
        assert_eq!(build_request_body(arg), arg);
    }

    // --- classify_framing ---

    #[test]
    fn classify_framing_by_content_type() {
        assert_eq!(
            classify_framing("text/event-stream; charset=utf-8"),
            FramingMode::Sse
        );
        assert_eq!(classify_framing("Application/X-NDJSON"), FramingMode::Ndjson);
        assert_eq!(classify_framing("application/jsonl"), FramingMode::Ndjson);
        assert_eq!(classify_framing("application/json"), FramingMode::Single);
        assert_eq!(classify_framing(""), FramingMode::Single);
    }
}