Skip to main content

axon/
http_tool.rs

1//! HTTP tool provider — executes tool calls as REST requests via reqwest.
2//!
3//! Tools declared with `provider: http` in .axon files dispatch their
4//! argument as the request body to the URL specified in `runtime`.
5//!
6//! Request format:
7//!   POST {runtime_url}
8//!   Content-Type: application/json
9//!   X-Axon-Tool: {tool_name}
10//!
11//!   Body: the tool argument (string, sent as JSON-wrapped if not already JSON)
12//!
13//! Response handling:
14//!   - 2xx: response body becomes tool output (success)
15//!   - 4xx/5xx: error message with status code (failure)
16//!   - Connection error: descriptive error (failure)
17//!
18//! Timeout: parsed from ToolEntry.timeout field (e.g., "10s", "500ms").
19//! Default timeout: 30 seconds.
20//!
21//! §Fase 34.e (v1.29.0) — Streaming surface via [`HttpStreamingTool`].
22//! The async-trait Tool impl drives the upstream HTTP request via
23//! `reqwest::Client` (async) + drains the response body chunk-by-chunk.
24//! Content-Type drives framing:
25//!   - `text/event-stream` → per-W3C-SSE-event ToolChunks
26//!   - `application/x-ndjson` / `application/jsonl` → per-line ToolChunks
27//!   - Other (raw bytes, JSON, etc.) → single-chunk wrap (D9 backwards-
28//!     compat for non-streaming HTTP endpoints)
29//! Per-chunk cancel poll honors the D5 ≤100ms budget.
30
31use std::time::Duration;
32
33use crate::tool_executor::ToolResult;
34use crate::tool_registry::ToolEntry;
35
36// ── Timeout parsing ───────────────────────────────────────────────────────
37
38/// Parse a timeout string like "10s", "500ms", "2m" into Duration.
39/// Returns None for empty or unparseable values.
40fn parse_timeout(s: &str) -> Option<Duration> {
41    let s = s.trim();
42    if s.is_empty() {
43        return None;
44    }
45
46    if let Some(secs) = s.strip_suffix("ms") {
47        secs.trim().parse::<u64>().ok().map(Duration::from_millis)
48    } else if let Some(secs) = s.strip_suffix('s') {
49        secs.trim().parse::<u64>().ok().map(Duration::from_secs)
50    } else if let Some(mins) = s.strip_suffix('m') {
51        mins.trim()
52            .parse::<u64>()
53            .ok()
54            .map(|m| Duration::from_secs(m * 60))
55    } else {
56        // Try as raw seconds
57        s.parse::<u64>().ok().map(Duration::from_secs)
58    }
59}
60
61/// Public accessor for timeout parsing (used by emcp module).
62pub fn parse_timeout_pub(s: &str) -> Option<Duration> {
63    parse_timeout(s)
64}
65
66const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
67
68// ── HTTP dispatch ─────────────────────────────────────────────────────────
69
70/// Execute an HTTP tool call.
71///
72/// - `entry`: the tool's registry entry (must have provider == "http")
73/// - `argument`: the argument string from the use_tool step
74///
75/// Returns a ToolResult with the HTTP response body on success,
76/// or an error description on failure.
77pub fn dispatch_http(entry: &ToolEntry, argument: &str) -> ToolResult {
78    let url = entry.runtime.trim();
79
80    if url.is_empty() {
81        return ToolResult {
82            success: false,
83            output: format!(
84                "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
85                entry.name
86            ),
87            tool_name: entry.name.clone(),
88        };
89    }
90
91    // Validate URL scheme
92    if !url.starts_with("http://") && !url.starts_with("https://") {
93        return ToolResult {
94            success: false,
95            output: format!(
96                "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
97                entry.name, url
98            ),
99            tool_name: entry.name.clone(),
100        };
101    }
102
103    let timeout = parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT);
104
105    // Build the request body — wrap as JSON string if not already JSON
106    let body = if argument.trim_start().starts_with('{') || argument.trim_start().starts_with('[') {
107        argument.to_string()
108    } else {
109        serde_json::json!({ "input": argument }).to_string()
110    };
111
112    // Execute the HTTP request
113    match execute_request(url, &entry.name, &body, timeout) {
114        Ok(response) => response,
115        Err(e) => ToolResult {
116            success: false,
117            output: format!("HTTP tool '{}': {}", entry.name, e),
118            tool_name: entry.name.clone(),
119        },
120    }
121}
122
123/// Perform the actual HTTP POST request.
124fn execute_request(
125    url: &str,
126    tool_name: &str,
127    body: &str,
128    timeout: Duration,
129) -> Result<ToolResult, String> {
130    let client = reqwest::blocking::Client::builder()
131        .timeout(timeout)
132        .build()
133        .map_err(|e| format!("failed to create HTTP client: {e}"))?;
134
135    let response = client
136        .post(url)
137        .header("Content-Type", "application/json")
138        .header("X-Axon-Tool", tool_name)
139        .body(body.to_string())
140        .send()
141        .map_err(|e| {
142            if e.is_timeout() {
143                format!("request timed out after {}s", timeout.as_secs())
144            } else if e.is_connect() {
145                format!("connection failed to {url}")
146            } else {
147                format!("request failed: {e}")
148            }
149        })?;
150
151    let status = response.status();
152    let response_body = response
153        .text()
154        .map_err(|e| format!("failed to read response body: {e}"))?;
155
156    if status.is_success() {
157        Ok(ToolResult {
158            success: true,
159            output: response_body,
160            tool_name: tool_name.to_string(),
161        })
162    } else {
163        Ok(ToolResult {
164            success: false,
165            output: format!(
166                "HTTP {}: {}",
167                status.as_u16(),
168                if response_body.len() > 200 {
169                    format!("{}...", &response_body[..200])
170                } else {
171                    response_body
172                }
173            ),
174            tool_name: tool_name.to_string(),
175        })
176    }
177}
178
179// ════════════════════════════════════════════════════════════════════════════
180//  §Fase 34.e — HttpStreamingTool: async-trait Tool impl with per-chunk
181//  streaming wire emission via reqwest::Response::bytes_stream().
182// ════════════════════════════════════════════════════════════════════════════
183
184use async_trait::async_trait;
185use bytes::Bytes;
186use futures::StreamExt;
187
188use crate::backends::sse_streaming::{LineBuffer, SseEventParser};
189use crate::tool_trait::{Tool, ToolChunk, ToolContext, ToolFinishReason, ToolStream};
190
191/// HTTP tool with first-class streaming surface (Fase 34.e).
192///
193/// `stream()` drives a `reqwest::Client::post(url)` async request +
194/// drains `response.bytes_stream()` chunk-by-chunk. Content-Type
195/// header decides framing:
196///
197/// - **`text/event-stream`** → SSE per W3C spec. Each `data:` field
198///   from a complete SSE event emits as a `ToolChunk::intermediate`.
199///   `event:` / `id:` / `retry:` fields are dropped (the adopter sees
200///   only the data payload — the framing was for HTTP transport).
201/// - **`application/x-ndjson`** / **`application/jsonl`** → one
202///   `ToolChunk::intermediate` per LF-delimited line. Empty lines
203///   are skipped.
204/// - **Other** (`application/json`, `text/plain`, raw bytes) →
205///   single `ToolChunk::intermediate` with the full body accumulated,
206///   then a terminator. This is the D9 backwards-compat path for
207///   non-streaming HTTP endpoints — the same response shape
208///   [`dispatch_http`] returns synchronously, projected onto the
209///   single-chunk streaming surface.
210///
211/// # Cancel discipline (D5)
212///
213/// `ctx.cancel` is polled between every `bytes_stream().next().await`
214/// boundary. When fired, the stream drops the `reqwest::Response`
215/// (closing the connection) + emits a single
216/// `ToolFinishReason::Cancelled` terminator chunk. Wall-clock budget
217/// is bounded by the network roundtrip latency to the next chunk
218/// arrival (typically ≤100ms for SSE streams with regular keepalive).
219///
220/// # Error discipline
221///
222/// Every failure surface (URL invalid / client build / connect /
223/// timeout / non-2xx status / mid-stream byte error / I/O error)
224/// is captured as a `ToolFinishReason::Error { message }` terminator
225/// chunk — the consumer never sees a panic or a silently-truncated
226/// stream.
227pub struct HttpStreamingTool {
228    name: String,
229    url: String,
230    timeout: Duration,
231}
232
233impl HttpStreamingTool {
234    /// Construct from a registry [`ToolEntry`]. Validates the URL +
235    /// extracts the timeout. Returns `Err` with adopter-facing
236    /// diagnostic when the URL is missing or has an invalid scheme.
237    pub fn from_entry(entry: &ToolEntry) -> Result<Self, String> {
238        let url = entry.runtime.trim();
239        if url.is_empty() {
240            return Err(format!(
241                "HTTP tool '{}': no endpoint URL. Set runtime: \"https://...\" in tool definition.",
242                entry.name
243            ));
244        }
245        if !url.starts_with("http://") && !url.starts_with("https://") {
246            return Err(format!(
247                "HTTP tool '{}': invalid URL '{}'. Must start with http:// or https://.",
248                entry.name, url
249            ));
250        }
251        let timeout = parse_timeout(&entry.timeout).unwrap_or(DEFAULT_TIMEOUT);
252        Ok(Self {
253            name: entry.name.clone(),
254            url: url.to_string(),
255            timeout,
256        })
257    }
258
259    /// Public new() ctor for tests + adopters who construct directly
260    /// without a registry entry.
261    pub fn new(name: String, url: String, timeout: Duration) -> Self {
262        Self { name, url, timeout }
263    }
264}
265
266/// Build the request body — wrap as JSON `{ "input": args }` if not
267/// already JSON. Same logic as [`dispatch_http`].
268fn build_request_body(args: &str) -> String {
269    let trimmed = args.trim_start();
270    if trimmed.starts_with('{') || trimmed.starts_with('[') {
271        args.to_string()
272    } else {
273        serde_json::json!({ "input": args }).to_string()
274    }
275}
276
277/// Classify an HTTP Content-Type header into the framing mode the
278/// streaming tool will use.
279#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280enum FramingMode {
281    /// W3C Server-Sent Events. Drain via [`LineBuffer`] +
282    /// [`SseEventParser`]; emit each event's `data:` field as a
283    /// `ToolChunk`.
284    Sse,
285    /// Newline-delimited JSON. Drain via [`LineBuffer`]; emit each
286    /// non-empty line as a `ToolChunk`.
287    Ndjson,
288    /// Anything else. Accumulate full body + emit as 1 chunk +
289    /// terminator. D9-style backwards-compat for non-streaming
290    /// HTTP endpoints.
291    Single,
292}
293
294fn classify_framing(content_type: &str) -> FramingMode {
295    let lc = content_type.to_ascii_lowercase();
296    if lc.contains("text/event-stream") {
297        FramingMode::Sse
298    } else if lc.contains("application/x-ndjson") || lc.contains("application/jsonl") {
299        FramingMode::Ndjson
300    } else {
301        FramingMode::Single
302    }
303}
304
305#[async_trait]
306impl Tool for HttpStreamingTool {
307    async fn execute(&self, args: String, _ctx: ToolContext) -> ToolResult {
308        // Synchronous path — adopters calling execute() directly get
309        // the legacy [`dispatch_http`] behavior verbatim. The
310        // streaming path drives `stream()` exclusively.
311        //
312        // [`dispatch_http`] uses `reqwest::blocking::Client` (it
313        // existed pre-async-trait). Calling blocking-reqwest from
314        // inside a tokio runtime panics; we wrap the call with
315        // `spawn_blocking` so the synchronous client runs on tokio's
316        // blocking pool. Output is byte-equal to dispatch_http (D9).
317        let entry = ToolEntry {
318            name: self.name.clone(),
319            provider: "http".to_string(),
320            timeout: format!("{}s", self.timeout.as_secs()),
321            runtime: self.url.clone(),
322            sandbox: None,
323            max_results: None,
324            output_schema: String::new(),
325            effect_row: Vec::new(),
326            // §Fase 58.f.2 — reconstructed entry for the legacy sync
327            // delegate; no typed input schema needed on this path.
328            parameters: Vec::new(),
329            source: crate::tool_registry::ToolSource::Program,
330            is_streaming: false,
331        };
332        match tokio::task::spawn_blocking(move || dispatch_http(&entry, &args)).await {
333            Ok(result) => result,
334            Err(e) => ToolResult {
335                success: false,
336                output: format!("HTTP tool '{}': blocking task join failed: {e}", self.name),
337                tool_name: self.name.clone(),
338            },
339        }
340    }
341
342    async fn stream(&self, args: String, ctx: ToolContext) -> ToolStream {
343        let url = self.url.clone();
344        let name = self.name.clone();
345        let timeout = self.timeout;
346        let cancel = ctx.cancel.clone();
347        let body = build_request_body(&args);
348
349        // mpsc + spawn pattern: the background task drives the HTTP
350        // request + drains chunks into the channel; the returned
351        // stream wraps the receiver. This gives us real per-chunk
352        // streaming (chunks reach the dispatcher AS they arrive from
353        // upstream) without requiring async-stream macro.
354        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<ToolChunk>();
355
356        tokio::spawn(async move {
357            // Helper: send the terminator + drop tx so the consumer's
358            // stream ends cleanly. Returning `Err` from a sub-step
359            // sends an Error-terminator; reaching the natural end of
360            // the body sends a Stop-terminator.
361            let send_terminator = |reason: ToolFinishReason| {
362                let _ = tx.send(ToolChunk::terminator("", reason));
363            };
364
365            // Pre-flight cancel check.
366            if cancel.is_cancelled() {
367                send_terminator(ToolFinishReason::Cancelled);
368                return;
369            }
370
371            // 1. Build async client.
372            let client = match reqwest::Client::builder().timeout(timeout).build() {
373                Ok(c) => c,
374                Err(e) => {
375                    send_terminator(ToolFinishReason::Error {
376                        message: format!(
377                            "HTTP tool '{name}': failed to build async client: {e}"
378                        ),
379                    });
380                    return;
381                }
382            };
383
384            // 2. Issue request.
385            let response = match client
386                .post(&url)
387                .header("Content-Type", "application/json")
388                .header("X-Axon-Tool", &name)
389                .body(body)
390                .send()
391                .await
392            {
393                Ok(r) => r,
394                Err(e) => {
395                    let message = if e.is_timeout() {
396                        format!(
397                            "HTTP tool '{name}': request timed out after {}s",
398                            timeout.as_secs()
399                        )
400                    } else if e.is_connect() {
401                        format!("HTTP tool '{name}': connection failed to {url}")
402                    } else {
403                        format!("HTTP tool '{name}': request failed: {e}")
404                    };
405                    send_terminator(ToolFinishReason::Error { message });
406                    return;
407                }
408            };
409
410            // 3. Non-2xx → error terminator with status code +
411            //    truncated body. Mirrors dispatch_http's diagnostic
412            //    shape.
413            let status = response.status();
414            if !status.is_success() {
415                let body_text = response.text().await.unwrap_or_default();
416                let truncated = if body_text.len() > 200 {
417                    format!("{}...", &body_text[..200])
418                } else {
419                    body_text
420                };
421                send_terminator(ToolFinishReason::Error {
422                    message: format!("HTTP {}: {}", status.as_u16(), truncated),
423                });
424                return;
425            }
426
427            // 4. Read Content-Type header → classify framing.
428            let content_type = response
429                .headers()
430                .get(reqwest::header::CONTENT_TYPE)
431                .and_then(|v| v.to_str().ok())
432                .unwrap_or("")
433                .to_string();
434            let framing = classify_framing(&content_type);
435
436            // 5. Drain the body byte-stream per framing mode.
437            let mut byte_stream = response.bytes_stream();
438            let drain_result = match framing {
439                FramingMode::Sse => {
440                    drain_sse(&mut byte_stream, &cancel, &tx).await
441                }
442                FramingMode::Ndjson => {
443                    drain_ndjson(&mut byte_stream, &cancel, &tx).await
444                }
445                FramingMode::Single => {
446                    drain_single(&mut byte_stream, &cancel, &tx).await
447                }
448            };
449
450            match drain_result {
451                DrainOutcome::Completed => send_terminator(ToolFinishReason::Stop),
452                DrainOutcome::Cancelled => send_terminator(ToolFinishReason::Cancelled),
453                DrainOutcome::Error(message) => {
454                    send_terminator(ToolFinishReason::Error { message })
455                }
456            }
457        });
458
459        // Wrap the receiver as a Stream. Each `recv().await` yields
460        // a ToolChunk + holds the channel open until the producer
461        // task drops `tx`.
462        Box::pin(futures::stream::unfold(rx, |mut rx| async move {
463            rx.recv().await.map(|chunk| (chunk, rx))
464        }))
465    }
466
467    fn is_streaming(&self) -> bool {
468        true
469    }
470}
471
472/// Per-framing-mode drain outcome. Drives the terminator decision
473/// in the spawned task without leaking implementation details.
474enum DrainOutcome {
475    Completed,
476    Cancelled,
477    Error(String),
478}
479
480/// Drain SSE framing. Reuses the battle-tested
481/// [`crate::backends::sse_streaming::LineBuffer`] +
482/// [`crate::backends::sse_streaming::SseEventParser`] from Fase 33.d
483/// so every adopter-emitted SSE shape (CRLF normalization, CR strip,
484/// multi-line data field, comment lines) is honored verbatim.
485async fn drain_sse<S>(
486    byte_stream: &mut S,
487    cancel: &crate::cancel_token::CancellationFlag,
488    tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
489) -> DrainOutcome
490where
491    S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
492{
493    let mut line_buf = LineBuffer::new();
494    let mut sse_parser = SseEventParser::new();
495    loop {
496        if cancel.is_cancelled() {
497            return DrainOutcome::Cancelled;
498        }
499        match byte_stream.next().await {
500            None => break,
501            Some(Err(e)) => {
502                return DrainOutcome::Error(format!("SSE stream chunk error: {e}"))
503            }
504            Some(Ok(bytes)) => {
505                let lines = line_buf.push(&bytes);
506                for line in lines {
507                    if let Some(event) = sse_parser.push_line(&line) {
508                        if let Some(data) = event.data {
509                            if tx
510                                .send(ToolChunk::intermediate(data))
511                                .is_err()
512                            {
513                                return DrainOutcome::Cancelled;
514                            }
515                        }
516                    }
517                }
518            }
519        }
520    }
521    // Flush trailing line (events without a final blank-line
522    // terminator) — push it into the parser; if it completes an
523    // event, emit it.
524    if let Some(line) = line_buf.flush() {
525        if let Some(event) = sse_parser.push_line(&line) {
526            if let Some(data) = event.data {
527                let _ = tx.send(ToolChunk::intermediate(data));
528            }
529        }
530    }
531    DrainOutcome::Completed
532}
533
534/// Drain NDJSON framing. Each LF-delimited line emits as a
535/// `ToolChunk::intermediate`. Empty lines are skipped per
536/// `application/x-ndjson` spec.
537async fn drain_ndjson<S>(
538    byte_stream: &mut S,
539    cancel: &crate::cancel_token::CancellationFlag,
540    tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
541) -> DrainOutcome
542where
543    S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
544{
545    let mut line_buf = LineBuffer::new();
546    loop {
547        if cancel.is_cancelled() {
548            return DrainOutcome::Cancelled;
549        }
550        match byte_stream.next().await {
551            None => break,
552            Some(Err(e)) => {
553                return DrainOutcome::Error(format!("NDJSON stream chunk error: {e}"))
554            }
555            Some(Ok(bytes)) => {
556                let lines = line_buf.push(&bytes);
557                for line in lines {
558                    if !line.is_empty()
559                        && tx.send(ToolChunk::intermediate(line)).is_err()
560                    {
561                        return DrainOutcome::Cancelled;
562                    }
563                }
564            }
565        }
566    }
567    if let Some(line) = line_buf.flush() {
568        if !line.is_empty() {
569            let _ = tx.send(ToolChunk::intermediate(line));
570        }
571    }
572    DrainOutcome::Completed
573}
574
575/// Drain single-chunk framing. Accumulate the full body + emit as
576/// 1 `ToolChunk::intermediate` (terminator follows from the caller).
577/// D9 backwards-compat for non-streaming HTTP endpoints.
578async fn drain_single<S>(
579    byte_stream: &mut S,
580    cancel: &crate::cancel_token::CancellationFlag,
581    tx: &tokio::sync::mpsc::UnboundedSender<ToolChunk>,
582) -> DrainOutcome
583where
584    S: futures::Stream<Item = reqwest::Result<Bytes>> + Unpin + Send,
585{
586    let mut acc: Vec<u8> = Vec::new();
587    loop {
588        if cancel.is_cancelled() {
589            return DrainOutcome::Cancelled;
590        }
591        match byte_stream.next().await {
592            None => break,
593            Some(Err(e)) => {
594                return DrainOutcome::Error(format!("HTTP body chunk error: {e}"))
595            }
596            Some(Ok(bytes)) => acc.extend_from_slice(&bytes),
597        }
598    }
599    let body_text = String::from_utf8_lossy(&acc).into_owned();
600    if !body_text.is_empty()
601        && tx
602            .send(ToolChunk::intermediate(body_text))
603            .is_err()
604    {
605        return DrainOutcome::Cancelled;
606    }
607    DrainOutcome::Completed
608}
609
610// ── Tests ─────────────────────────────────────────────────────────────────
611
612#[cfg(test)]
613mod tests {
614    use super::*;
615    use crate::tool_registry::{ToolEntry, ToolSource};
616
617    fn make_http_entry(name: &str, url: &str, timeout: &str) -> ToolEntry {
618        ToolEntry {
619            name: name.to_string(),
620            provider: "http".to_string(),
621            timeout: timeout.to_string(),
622            runtime: url.to_string(),
623            sandbox: None,
624            max_results: None,
625            output_schema: "JSON".to_string(),
626            effect_row: vec!["network".to_string()],
627            parameters: Vec::new(),
628            source: ToolSource::Program,
629            // §Fase 34.c — HTTP tools default to non-streaming; effect_row
630            // carries `network` but no `stream:` prefix. HTTP streaming
631            // (SSE-aware adapter consuming upstream SSE) lands in Fase 34.e.
632            is_streaming: false,
633        }
634    }
635
636    // ── Timeout parsing ───────────────────────────────────────────
637
638    #[test]
639    fn parse_timeout_seconds() {
640        assert_eq!(parse_timeout("10s"), Some(Duration::from_secs(10)));
641        assert_eq!(parse_timeout("30s"), Some(Duration::from_secs(30)));
642    }
643
644    #[test]
645    fn parse_timeout_milliseconds() {
646        assert_eq!(parse_timeout("500ms"), Some(Duration::from_millis(500)));
647        assert_eq!(parse_timeout("100ms"), Some(Duration::from_millis(100)));
648    }
649
650    #[test]
651    fn parse_timeout_minutes() {
652        assert_eq!(parse_timeout("2m"), Some(Duration::from_secs(120)));
653    }
654
655    #[test]
656    fn parse_timeout_raw_number() {
657        assert_eq!(parse_timeout("15"), Some(Duration::from_secs(15)));
658    }
659
660    #[test]
661    fn parse_timeout_empty() {
662        assert_eq!(parse_timeout(""), None);
663        assert_eq!(parse_timeout("  "), None);
664    }
665
666    #[test]
667    fn parse_timeout_invalid() {
668        assert_eq!(parse_timeout("abc"), None);
669        assert_eq!(parse_timeout("10x"), None);
670    }
671
672    // ── URL validation ────────────────────────────────────────────
673
674    #[test]
675    fn dispatch_empty_url_fails() {
676        let entry = make_http_entry("DataAPI", "", "10s");
677        let result = dispatch_http(&entry, "test query");
678        assert!(!result.success);
679        assert!(result.output.contains("no endpoint URL"));
680    }
681
682    #[test]
683    fn dispatch_invalid_url_scheme_fails() {
684        let entry = make_http_entry("DataAPI", "ftp://example.com", "10s");
685        let result = dispatch_http(&entry, "test query");
686        assert!(!result.success);
687        assert!(result.output.contains("invalid URL"));
688        assert!(result.output.contains("http://"));
689    }
690
691    // ── Connection errors (no server) ─────────────────────────────
692
693    #[test]
694    fn dispatch_connection_refused() {
695        // Port 1 is almost certainly not listening
696        let entry = make_http_entry("TestTool", "http://127.0.0.1:1/api", "2s");
697        let result = dispatch_http(&entry, "test");
698        assert!(!result.success);
699        assert!(
700            result.output.contains("connection failed")
701                || result.output.contains("request failed")
702                || result.output.contains("timed out"),
703            "unexpected error: {}",
704            result.output
705        );
706    }
707
708    // ── Body wrapping ─────────────────────────────────────────────
709
710    #[test]
711    fn json_body_passthrough() {
712        // If argument is already JSON, it should be sent as-is
713        let arg = r#"{"query": "test"}"#;
714        let body = if arg.trim_start().starts_with('{') {
715            arg.to_string()
716        } else {
717            serde_json::json!({ "input": arg }).to_string()
718        };
719        assert_eq!(body, r#"{"query": "test"}"#);
720    }
721
722    #[test]
723    fn plain_text_wrapped() {
724        // If argument is plain text, it should be wrapped
725        let arg = "search for cats";
726        let body = if arg.trim_start().starts_with('{') || arg.trim_start().starts_with('[') {
727            arg.to_string()
728        } else {
729            serde_json::json!({ "input": arg }).to_string()
730        };
731        let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
732        assert_eq!(parsed["input"], "search for cats");
733    }
734
735    #[test]
736    fn array_body_passthrough() {
737        let arg = r#"[1, 2, 3]"#;
738        let body = if arg.trim_start().starts_with('{') || arg.trim_start().starts_with('[') {
739            arg.to_string()
740        } else {
741            serde_json::json!({ "input": arg }).to_string()
742        };
743        assert_eq!(body, "[1, 2, 3]");
744    }
745}