Skip to main content

pi/connectors/
http.rs

1//! HTTP/network connector with policy-gated access.
2//!
3//! Provides basic fetch (GET/POST) with:
4//! - Host allowlist/denylist
5//! - TLS required by default
6//! - Request timeouts and size limits
7//! - Structured logging for audit trail
8
9use super::{
10    Connector, HostCallErrorCode, HostCallPayload, HostResultPayload, host_result_err,
11    host_result_err_with_details, host_result_ok,
12};
13use crate::error::Result;
14use crate::http::client::Client;
15use asupersync::time::{timeout, wall_now};
16use async_trait::async_trait;
17use futures::Stream;
18use futures::StreamExt;
19use serde::{Deserialize, Serialize};
20use serde_json::{Value, json};
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
/// Validation failure: the hostcall error code to report, paired with a
/// human-readable message describing what was rejected.
type ValidationError = (HostCallErrorCode, String);
28
/// Configuration for the HTTP connector.
///
/// Every field carries a serde default, so a partial (or empty) config object
/// still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConnectorConfig {
    /// Host patterns to allow (glob-style: "*.example.com", "api.github.com").
    /// When this list is non-empty, hosts that match no entry are denied.
    #[serde(default)]
    pub allowlist: Vec<String>,

    /// Host patterns to deny (takes precedence over allowlist: the denylist is
    /// checked first during URL validation).
    #[serde(default)]
    pub denylist: Vec<String>,

    /// When true, an empty allowlist denies all outbound hosts.
    /// When false (the default), an empty allowlist places no restriction on hosts.
    #[serde(default)]
    pub enforce_allowlist: bool,

    /// Require TLS for all requests (default: true).
    /// With this set, `http://` URLs are rejected as denied.
    #[serde(default = "default_require_tls")]
    pub require_tls: bool,

    /// Maximum request body size in bytes (default: 10MB).
    /// Checked while parsing the request, before anything is sent.
    #[serde(default = "default_max_request_bytes")]
    pub max_request_bytes: usize,

    /// Maximum response body size in bytes (default: 50MB).
    /// Enforced while a buffered response is being read; the streaming path
    /// hands the raw byte stream to its caller without applying this limit.
    #[serde(default = "default_max_response_bytes")]
    pub max_response_bytes: usize,

    /// Default timeout in milliseconds (default: 30000).
    /// Used when neither the request nor the hostcall supplies a timeout;
    /// a value of 0 runs the request without a timeout.
    #[serde(default = "default_timeout_ms")]
    pub default_timeout_ms: u64,
}
60
/// Serde default for `HttpConnectorConfig::require_tls`: TLS is required.
const fn default_require_tls() -> bool {
    const REQUIRE_TLS_BY_DEFAULT: bool = true;
    REQUIRE_TLS_BY_DEFAULT
}
64
/// Serde default for `HttpConnectorConfig::max_request_bytes`: 10MB.
const fn default_max_request_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    10 * MIB
}
68
/// Serde default for `HttpConnectorConfig::max_response_bytes`: 50MB.
const fn default_max_response_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    50 * MIB
}
72
/// Serde default for `HttpConnectorConfig::default_timeout_ms`: 30 seconds.
const fn default_timeout_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1_000;
    30 * MILLIS_PER_SECOND
}
76
77impl Default for HttpConnectorConfig {
78    fn default() -> Self {
79        Self {
80            allowlist: Vec::new(),
81            denylist: Vec::new(),
82            enforce_allowlist: false,
83            require_tls: default_require_tls(),
84            max_request_bytes: default_max_request_bytes(),
85            max_response_bytes: default_max_response_bytes(),
86            default_timeout_ms: default_timeout_ms(),
87        }
88    }
89}
90
/// HTTP request parameters from hostcall.
///
/// Deserialized from the hostcall `params` value; only `url` is mandatory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpRequest {
    /// The URL to fetch
    pub url: String,

    /// HTTP method (GET, POST). Compared case-insensitively; defaults to "GET".
    #[serde(default = "default_method")]
    pub method: String,

    /// Request headers
    #[serde(default)]
    pub headers: HashMap<String, String>,

    /// Request body (for POST). Mutually exclusive with `body_bytes`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub body: Option<String>,

    /// Request body as bytes (base64-encoded). Mutually exclusive with `body`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub body_bytes: Option<String>,

    /// Override timeout in milliseconds. A value of 0 is treated as unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
117
/// Serde default for `HttpRequest::method`.
fn default_method() -> String {
    String::from("GET")
}
121
/// HTTP response returned to extension.
///
/// Exactly one of `body` / `body_bytes` is populated by the connector,
/// depending on whether the response bytes are valid UTF-8.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpResponse {
    /// HTTP status code
    pub status: u16,

    /// Response headers. Built by inserting each header pair into a map, so a
    /// name that appears more than once keeps only its last value.
    pub headers: HashMap<String, String>,

    /// Response body as string (if text, i.e. the bytes decoded as UTF-8)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub body: Option<String>,

    /// Response body as bytes (base64-encoded, if binary, i.e. not valid UTF-8)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub body_bytes: Option<String>,

    /// Response body size in bytes (raw size, before any base64 encoding)
    pub size_bytes: usize,

    /// Request duration in milliseconds, measured from just before the request
    /// is built until the body has been fully read
    pub duration_ms: u64,
}
145
/// Streaming HTTP response returned to the host dispatcher.
///
/// This intentionally returns only the response head plus a byte stream. The caller
/// is responsible for chunking/decoding (UTF-8/base64), SSE parsing, idle timeouts,
/// and delivering `StreamChunk` outcomes to the extension runtime.
pub struct StreamingHttpResponse {
    /// HTTP status code from the response head.
    pub status: u16,
    /// Response headers; a repeated header name keeps only its last value.
    pub headers: HashMap<String, String>,
    /// Response body as a stream of byte chunks; each item may be an I/O error.
    pub stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
}
156
/// HTTP connector for extension hostcalls.
///
/// Every outbound request is validated against `config` before it is sent
/// through `client`.
pub struct HttpConnector {
    /// Policy (host lists, TLS requirement) and limits (sizes, timeout).
    config: HttpConnectorConfig,
    /// HTTP client used to issue the GET/POST requests.
    client: Client,
}
162
163impl HttpConnector {
164    /// Create a new HTTP connector with the given configuration.
165    #[must_use]
166    pub fn new(config: HttpConnectorConfig) -> Self {
167        Self {
168            config,
169            client: Client::new(),
170        }
171    }
172
173    /// Create a new HTTP connector with default configuration.
174    #[must_use]
175    pub fn with_defaults() -> Self {
176        Self::new(HttpConnectorConfig::default())
177    }
178
179    /// Validate a URL against the policy.
180    fn validate_url(&self, url: &str) -> std::result::Result<(), ValidationError> {
181        // Parse URL to extract host
182        let parsed = url::Url::parse(url).map_err(|e| {
183            (
184                HostCallErrorCode::InvalidRequest,
185                format!("Invalid URL: {e}"),
186            )
187        })?;
188
189        // HTTP/HTTPS only
190        let scheme = parsed.scheme();
191        if scheme != "http" && scheme != "https" {
192            return Err((
193                HostCallErrorCode::InvalidRequest,
194                format!("Unsupported URL scheme: '{scheme}'"),
195            ));
196        }
197
198        // Check scheme (TLS requirement)
199        if self.config.require_tls && scheme == "http" {
200            return Err((
201                HostCallErrorCode::Denied,
202                format!("TLS required: URL scheme must be 'https', got '{scheme}'"),
203            ));
204        }
205
206        // Extract host
207        let host = parsed.host_str().ok_or_else(|| {
208            (
209                HostCallErrorCode::InvalidRequest,
210                "URL missing host".to_string(),
211            )
212        })?;
213
214        // Check denylist first (takes precedence)
215        if Self::matches_pattern_list(host, &self.config.denylist) {
216            return Err((
217                HostCallErrorCode::Denied,
218                format!("Host '{host}' is in denylist"),
219            ));
220        }
221
222        // Check allowlist. In strict mode, empty allowlist means deny-all.
223        let requires_allowlist = self.config.enforce_allowlist || !self.config.allowlist.is_empty();
224        if requires_allowlist && !Self::matches_pattern_list(host, &self.config.allowlist) {
225            return Err((
226                HostCallErrorCode::Denied,
227                format!(
228                    "Host '{host}' is not in allowlist; declare capability_manifest scope.hosts for http capability"
229                ),
230            ));
231        }
232
233        Ok(())
234    }
235
236    /// Check if a host matches any pattern in the list.
237    fn matches_pattern_list(host: &str, patterns: &[String]) -> bool {
238        let host_lower = host.to_ascii_lowercase();
239        patterns.iter().any(|pattern| {
240            let pattern_lower = pattern.to_ascii_lowercase();
241            pattern_lower.strip_prefix("*.").map_or_else(
242                || host_lower == pattern_lower,
243                |domain| {
244                    // Wildcard subdomain match: "*.example.com" matches "api.example.com"
245                    let suffix = pattern_lower.strip_prefix('*').unwrap_or(""); // ".example.com"
246                    host_lower.ends_with(suffix) || host_lower == domain
247                },
248            )
249        })
250    }
251
252    /// Parse and validate the HTTP request from hostcall params.
253    fn parse_request(&self, params: &Value) -> std::result::Result<HttpRequest, ValidationError> {
254        let mut request: HttpRequest = serde_json::from_value(params.clone()).map_err(|e| {
255            (
256                HostCallErrorCode::InvalidRequest,
257                format!("Invalid HTTP request params: {e}"),
258            )
259        })?;
260
261        if request.body.is_some() && request.body_bytes.is_some() {
262            return Err((
263                HostCallErrorCode::InvalidRequest,
264                "Request must specify either 'body' or 'body_bytes', not both".to_string(),
265            ));
266        }
267
268        // Validate method (connector supports GET/POST only)
269        let method_upper = request.method.to_ascii_uppercase();
270        if !matches!(method_upper.as_str(), "GET" | "POST") {
271            return Err((
272                HostCallErrorCode::InvalidRequest,
273                format!(
274                    "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
275                    request.method
276                ),
277            ));
278        }
279
280        // Treat 0 as unset/absent to match core hostcall timeout semantics.
281        request.timeout_ms = request.timeout_ms.filter(|ms| *ms > 0);
282
283        // Validate body size
284        let body_size = request
285            .body
286            .as_ref()
287            .map(String::len)
288            .or_else(|| {
289                request.body_bytes.as_ref().map(|b| b.len() * 3 / 4) // base64 decode estimate
290            })
291            .unwrap_or(0);
292
293        if body_size > self.config.max_request_bytes {
294            return Err((
295                HostCallErrorCode::InvalidRequest,
296                format!(
297                    "Request body too large: {} bytes (max: {} bytes)",
298                    body_size, self.config.max_request_bytes
299                ),
300            ));
301        }
302
303        if method_upper == "GET" && (request.body.is_some() || request.body_bytes.is_some()) {
304            return Err((
305                HostCallErrorCode::InvalidRequest,
306                "GET requests cannot include a body".to_string(),
307            ));
308        }
309
310        Ok(request)
311    }
312
313    /// Execute the HTTP request.
314    async fn execute_request(&self, request: &HttpRequest) -> Result<HttpResponse> {
315        let start = Instant::now();
316
317        // Build request
318        let method_upper = request.method.to_ascii_uppercase();
319        let mut builder = match method_upper.as_str() {
320            "GET" => self.client.get(&request.url),
321            "POST" => self.client.post(&request.url),
322            _ => {
323                return Err(crate::error::Error::validation(format!(
324                    "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
325                    request.method
326                )));
327            }
328        };
329
330        // Add headers
331        for (key, value) in &request.headers {
332            builder = builder.header(key, value);
333        }
334
335        // Add body if present
336        if let Some(body) = &request.body {
337            builder = builder.body(body.as_bytes().to_vec());
338        } else if let Some(body_bytes) = &request.body_bytes {
339            use base64::Engine;
340            let decoded = base64::engine::general_purpose::STANDARD
341                .decode(body_bytes)
342                .map_err(|e| {
343                    crate::error::Error::validation(format!("Invalid base64 body: {e}"))
344                })?;
345            builder = builder.body(decoded);
346        }
347
348        // Send request
349        let response = builder
350            .send()
351            .await
352            .map_err(|e| crate::error::Error::extension(format!("HTTP request failed: {e}")))?;
353
354        // Read response body with size limit
355        let status = response.status();
356        let response_headers: Vec<(String, String)> = response.headers().to_vec();
357
358        let mut body_bytes_vec = Vec::new();
359        let mut stream = response.bytes_stream();
360
361        while let Some(chunk_result) = stream.next().await {
362            let chunk: Vec<u8> = chunk_result
363                .map_err(|e| crate::error::Error::extension(format!("Read error: {e}")))?;
364            if body_bytes_vec.len() + chunk.len() > self.config.max_response_bytes {
365                return Err(crate::error::Error::extension(format!(
366                    "Response body too large (max: {} bytes)",
367                    self.config.max_response_bytes
368                )));
369            }
370            body_bytes_vec.extend_from_slice(&chunk);
371        }
372
373        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
374        let size_bytes = body_bytes_vec.len();
375
376        // Convert headers to HashMap
377        let mut headers_map = HashMap::new();
378        for (key, value) in response_headers {
379            headers_map.insert(key, value);
380        }
381
382        // Try to decode body as UTF-8, fall back to base64.
383        let (body, body_bytes_b64) = String::from_utf8(body_bytes_vec).map_or_else(
384            |err| {
385                use base64::Engine;
386                let encoded = base64::engine::general_purpose::STANDARD.encode(err.into_bytes());
387                (None, Some(encoded))
388            },
389            |s| (Some(s), None),
390        );
391
392        Ok(HttpResponse {
393            status,
394            headers: headers_map,
395            body,
396            body_bytes: body_bytes_b64,
397            size_bytes,
398            duration_ms,
399        })
400    }
401
402    fn request_details(request: &HttpRequest, timeout_ms: u64) -> Value {
403        json!({
404            "url": request.url,
405            "method": request.method,
406            "timeout_ms": timeout_ms,
407        })
408    }
409
410    fn redact_url_for_log(url: &str) -> String {
411        url::Url::parse(url).map_or_else(
412            |_| url.split(['?', '#']).next().unwrap_or(url).to_string(),
413            |mut parsed| {
414                parsed.set_query(None);
415                parsed.set_fragment(None);
416                let _ = parsed.set_username("");
417                let _ = parsed.set_password(None);
418                parsed.to_string()
419            },
420        )
421    }
422
423    async fn dispatch_request(&self, call_id: &str, request: HttpRequest) -> HostResultPayload {
424        let log_url = Self::redact_url_for_log(&request.url);
425        // Validate URL against policy
426        if let Err((code, message)) = self.validate_url(&request.url) {
427            info!(
428                call_id = %call_id,
429                url = %log_url,
430                error = %message,
431                "HTTP connector: policy denied"
432            );
433            return host_result_err(call_id, code, message, None);
434        }
435
436        // Log request
437        debug!(
438            call_id = %call_id,
439            url = %log_url,
440            method = %request.method,
441            "HTTP connector: executing request"
442        );
443
444        // Execute request with timeout for the full request/response read.
445        let timeout_ms = request.timeout_ms.unwrap_or(self.config.default_timeout_ms);
446        let start = Instant::now();
447        let result = if timeout_ms == 0 {
448            Ok(self.execute_request(&request).await)
449        } else {
450            timeout(
451                wall_now(),
452                Duration::from_millis(timeout_ms),
453                Box::pin(self.execute_request(&request)),
454            )
455            .await
456        };
457
458        match result {
459            Ok(Ok(response)) => {
460                info!(
461                    call_id = %call_id,
462                    url = %log_url,
463                    status = %response.status,
464                    size_bytes = %response.size_bytes,
465                    duration_ms = %response.duration_ms,
466                    "HTTP connector: request completed"
467                );
468
469                let output = serde_json::to_value(&response)
470                    .unwrap_or_else(|_| json!({"error": "serialization_failed"}));
471
472                host_result_ok(call_id, output)
473            }
474            Ok(Err(e)) => {
475                if timeout_ms > 0 && start.elapsed() >= Duration::from_millis(timeout_ms) {
476                    let message = format!("Request timeout after {timeout_ms}ms");
477                    warn!(
478                        call_id = %call_id,
479                        url = %log_url,
480                        error = %message,
481                        "HTTP connector: request timed out"
482                    );
483
484                    return host_result_err_with_details(
485                        call_id,
486                        HostCallErrorCode::Timeout,
487                        &message,
488                        Self::request_details(&request, timeout_ms),
489                        Some(true),
490                    );
491                }
492
493                let message = e.to_string();
494                let code = match e {
495                    crate::error::Error::Validation(_) => HostCallErrorCode::InvalidRequest,
496                    _ => HostCallErrorCode::Io,
497                };
498
499                warn!(
500                    call_id = %call_id,
501                    url = %log_url,
502                    error = %message,
503                    "HTTP connector: request failed"
504                );
505
506                host_result_err_with_details(
507                    call_id,
508                    code,
509                    &message,
510                    Self::request_details(&request, timeout_ms),
511                    Some(false),
512                )
513            }
514            Err(_) => {
515                let message = format!("Request timeout after {timeout_ms}ms");
516                warn!(
517                    call_id = %call_id,
518                    url = %log_url,
519                    error = %message,
520                    "HTTP connector: request timed out"
521                );
522
523                host_result_err_with_details(
524                    call_id,
525                    HostCallErrorCode::Timeout,
526                    &message,
527                    Self::request_details(&request, timeout_ms),
528                    Some(true),
529                )
530            }
531        }
532    }
533
534    /// Dispatch an HTTP request but return a streaming response body instead of buffering it.
535    ///
536    /// Errors are returned as a `HostResultPayload` (taxonomy-correct) so the caller can
537    /// convert into `HostcallOutcome::Error` deterministically.
538    pub async fn dispatch_streaming(
539        &self,
540        call: &HostCallPayload,
541    ) -> std::result::Result<StreamingHttpResponse, HostResultPayload> {
542        let call_id = &call.call_id;
543        let method = call.method.to_ascii_lowercase();
544
545        if method != "http" {
546            warn!(
547                call_id = %call_id,
548                method = %method,
549                "HTTP connector: unsupported method (streaming)"
550            );
551            return Err(host_result_err(
552                call_id,
553                HostCallErrorCode::InvalidRequest,
554                format!("Unsupported HTTP connector method: '{method}'. Use 'http'."),
555                None,
556            ));
557        }
558
559        let mut request = match self.parse_request(&call.params) {
560            Ok(req) => req,
561            Err((code, message)) => {
562                warn!(
563                    call_id = %call_id,
564                    error = %message,
565                    "HTTP connector: invalid request (streaming)"
566                );
567                return Err(host_result_err(call_id, code, message, None));
568            }
569        };
570
571        // Prefer explicit per-request timeout in params, otherwise fall back to host_call.timeout_ms.
572        if request.timeout_ms.is_none() {
573            request.timeout_ms = call.timeout_ms.filter(|ms| *ms > 0);
574        }
575
576        let log_url = Self::redact_url_for_log(&request.url);
577        if let Err((code, message)) = self.validate_url(&request.url) {
578            info!(
579                call_id = %call_id,
580                url = %log_url,
581                error = %message,
582                "HTTP connector: policy denied (streaming)"
583            );
584            return Err(host_result_err(call_id, code, message, None));
585        }
586
587        debug!(
588            call_id = %call_id,
589            url = %log_url,
590            method = %request.method,
591            "HTTP connector: executing request (streaming)"
592        );
593
594        let timeout_ms = request.timeout_ms.unwrap_or(self.config.default_timeout_ms);
595        let (response, duration_ms) = match self
596            .dispatch_request_streaming_head(call_id, &request, timeout_ms, &log_url)
597            .await
598        {
599            Ok(res) => res,
600            Err(payload) => return Err(payload),
601        };
602
603        let status = response.status();
604        let response_headers: Vec<(String, String)> = response.headers().to_vec();
605
606        let mut headers_map = HashMap::new();
607        for (key, value) in response_headers {
608            headers_map.insert(key, value);
609        }
610
611        info!(
612            call_id = %call_id,
613            url = %log_url,
614            status = status,
615            duration_ms = duration_ms,
616            "HTTP connector: streaming response head received"
617        );
618
619        Ok(StreamingHttpResponse {
620            status,
621            headers: headers_map,
622            stream: response.bytes_stream(),
623        })
624    }
625
626    #[allow(clippy::future_not_send)]
627    async fn dispatch_request_streaming_head(
628        &self,
629        call_id: &str,
630        request: &HttpRequest,
631        timeout_ms: u64,
632        log_url: &str,
633    ) -> std::result::Result<(crate::http::client::Response, u64), HostResultPayload> {
634        let start = Instant::now();
635        let builder = match self.build_streaming_request_builder(call_id, request, timeout_ms) {
636            Ok(builder) => builder,
637            Err(payload) => return Err(*payload),
638        };
639        let send_fut = builder.send();
640        let result = if timeout_ms == 0 {
641            Ok(send_fut.await)
642        } else {
643            timeout(
644                wall_now(),
645                Duration::from_millis(timeout_ms),
646                Box::pin(send_fut),
647            )
648            .await
649        };
650
651        match result {
652            Ok(Ok(response)) => {
653                let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
654                Ok((response, duration_ms))
655            }
656            Ok(Err(err)) => {
657                if timeout_ms > 0 && start.elapsed() >= Duration::from_millis(timeout_ms) {
658                    let message = format!("Request timeout after {timeout_ms}ms");
659                    warn!(
660                        call_id = %call_id,
661                        url = %log_url,
662                        error = %message,
663                        "HTTP connector: request timed out (streaming)"
664                    );
665
666                    return Err(host_result_err_with_details(
667                        call_id,
668                        HostCallErrorCode::Timeout,
669                        &message,
670                        Self::request_details(request, timeout_ms),
671                        Some(true),
672                    ));
673                }
674
675                let message = err.to_string();
676                let code = match err {
677                    crate::error::Error::Validation(_) => HostCallErrorCode::InvalidRequest,
678                    _ => HostCallErrorCode::Io,
679                };
680
681                warn!(
682                    call_id = %call_id,
683                    url = %log_url,
684                    error = %message,
685                    "HTTP connector: request failed (streaming)"
686                );
687
688                Err(host_result_err_with_details(
689                    call_id,
690                    code,
691                    &message,
692                    Self::request_details(request, timeout_ms),
693                    Some(false),
694                ))
695            }
696            Err(_) => {
697                let message = format!("Request timeout after {timeout_ms}ms");
698                warn!(
699                    call_id = %call_id,
700                    url = %log_url,
701                    error = %message,
702                    "HTTP connector: request timed out (streaming)"
703                );
704
705                Err(host_result_err_with_details(
706                    call_id,
707                    HostCallErrorCode::Timeout,
708                    &message,
709                    Self::request_details(request, timeout_ms),
710                    Some(true),
711                ))
712            }
713        }
714    }
715
716    fn build_streaming_request_builder<'a>(
717        &'a self,
718        call_id: &str,
719        request: &HttpRequest,
720        timeout_ms: u64,
721    ) -> std::result::Result<crate::http::client::RequestBuilder<'a>, Box<HostResultPayload>> {
722        let method_upper = request.method.to_ascii_uppercase();
723        let mut builder = match method_upper.as_str() {
724            "GET" => self.client.get(&request.url),
725            "POST" => self.client.post(&request.url),
726            _ => {
727                return Err(Box::new(host_result_err_with_details(
728                    call_id,
729                    HostCallErrorCode::InvalidRequest,
730                    format!(
731                        "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
732                        request.method
733                    ),
734                    Self::request_details(request, timeout_ms),
735                    Some(false),
736                )));
737            }
738        };
739
740        for (key, value) in &request.headers {
741            builder = builder.header(key, value);
742        }
743
744        if let Some(body) = &request.body {
745            builder = builder.body(body.as_bytes().to_vec());
746        } else if let Some(body_bytes) = &request.body_bytes {
747            use base64::Engine;
748            let decoded = match base64::engine::general_purpose::STANDARD.decode(body_bytes) {
749                Ok(decoded) => decoded,
750                Err(err) => {
751                    return Err(Box::new(host_result_err_with_details(
752                        call_id,
753                        HostCallErrorCode::InvalidRequest,
754                        format!("Invalid base64 body: {err}"),
755                        Self::request_details(request, timeout_ms),
756                        Some(false),
757                    )));
758                }
759            };
760            builder = builder.body(decoded);
761        }
762
763        Ok(builder)
764    }
765}
766
767#[async_trait]
768impl Connector for HttpConnector {
769    fn capability(&self) -> &'static str {
770        "http"
771    }
772
773    #[allow(clippy::too_many_lines)]
774    async fn dispatch(&self, call: &HostCallPayload) -> Result<HostResultPayload> {
775        let call_id = &call.call_id;
776        let method = call.method.to_ascii_lowercase();
777
778        // Protocol expects connector method name "http".
779        if method != "http" {
780            warn!(
781                call_id = %call_id,
782                method = %method,
783                "HTTP connector: unsupported method"
784            );
785            return Ok(host_result_err(
786                call_id,
787                HostCallErrorCode::InvalidRequest,
788                format!("Unsupported HTTP connector method: '{method}'. Use 'http'."),
789                None,
790            ));
791        }
792
793        // Parse request
794        let mut request = match self.parse_request(&call.params) {
795            Ok(req) => req,
796            Err((code, message)) => {
797                warn!(
798                    call_id = %call_id,
799                    error = %message,
800                    "HTTP connector: invalid request"
801                );
802                return Ok(host_result_err(call_id, code, message, None));
803            }
804        };
805
806        // Prefer explicit per-request timeout in params, otherwise fall back to host_call.timeout_ms.
807        if request.timeout_ms.is_none() {
808            request.timeout_ms = call.timeout_ms.filter(|ms| *ms > 0);
809        }
810
811        Ok(self.dispatch_request(call_id, request).await)
812    }
813}
814
815#[cfg(test)]
816mod tests {
817    use super::*;
818    use std::future::Future;
819    use std::net::TcpListener;
820    use std::sync::mpsc;
821    use std::thread;
822
823    fn run_async<T, Fut>(future: Fut) -> T
824    where
825        Fut: Future<Output = T> + Send + 'static,
826        T: Send + 'static,
827    {
828        let runtime = asupersync::runtime::RuntimeBuilder::current_thread()
829            .build()
830            .expect("build asupersync runtime");
831        let join = runtime.handle().spawn(future);
832        runtime.block_on(join)
833    }
834
835    #[test]
836    fn test_default_config() {
837        let config = HttpConnectorConfig::default();
838        assert!(config.require_tls);
839        assert_eq!(config.max_request_bytes, 10 * 1024 * 1024);
840        assert_eq!(config.max_response_bytes, 50 * 1024 * 1024);
841        assert_eq!(config.default_timeout_ms, 30_000);
842        assert!(config.allowlist.is_empty());
843        assert!(config.denylist.is_empty());
844        assert!(!config.enforce_allowlist);
845    }
846
847    #[test]
848    fn test_url_validation_tls_required() {
849        let connector = HttpConnector::new(HttpConnectorConfig {
850            require_tls: true,
851            ..Default::default()
852        });
853
854        // HTTPS should pass
855        assert!(connector.validate_url("https://example.com").is_ok());
856
857        // HTTP should fail when TLS required
858        let result = connector.validate_url("http://example.com");
859        assert!(result.is_err());
860        let (code, _) = result.unwrap_err();
861        assert_eq!(code, HostCallErrorCode::Denied);
862    }
863
/// With `require_tls: false`, both `https://` and `http://` URLs validate.
#[test]
fn test_url_validation_tls_not_required() {
    let config = HttpConnectorConfig {
        require_tls: false,
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    let tls_outcome = connector.validate_url("https://example.com");
    let plain_outcome = connector.validate_url("http://example.com");

    // Neither scheme is rejected.
    assert!(tls_outcome.is_ok());
    assert!(plain_outcome.is_ok());
}
875
/// With a non-empty allowlist, hosts matching an exact or wildcard entry
/// validate and any other host is rejected with `Denied`.
#[test]
fn test_url_validation_allowlist() {
    let allowlist = vec![
        String::from("api.example.com"),
        String::from("*.github.com"),
    ];
    let connector = HttpConnector::new(HttpConnectorConfig {
        require_tls: false,
        allowlist,
        ..Default::default()
    });
    let is_allowed = |url: &str| connector.validate_url(url).is_ok();

    // Host equal to an exact allowlist entry.
    assert!(is_allowed("http://api.example.com/path"));

    // Hosts covered by the `*.github.com` wildcard entry.
    assert!(is_allowed("http://api.github.com/path"));
    assert!(is_allowed("http://raw.github.com/path"));

    // A host that matches no entry is denied.
    let rejected_code = connector
        .validate_url("http://other.com/path")
        .err()
        .map(|(code, _)| code);
    assert_eq!(rejected_code, Some(HostCallErrorCode::Denied));
}
901
/// With `enforce_allowlist: true` and an empty allowlist, validation fails
/// with `Denied` and the message mentions "allowlist".
#[test]
fn test_url_validation_enforced_allowlist_denies_when_empty() {
    let config = HttpConnectorConfig {
        require_tls: false,
        enforce_allowlist: true,
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    // `unwrap_err` panics (failing the test) if validation unexpectedly succeeds.
    let (code, msg) = connector
        .validate_url("http://example.com/path")
        .unwrap_err();

    assert_eq!(code, HostCallErrorCode::Denied);
    assert!(msg.contains("allowlist"), "{msg}");
}
916
/// With only a denylist configured, unlisted hosts validate while hosts
/// matching an exact or wildcard deny entry are rejected.
#[test]
fn test_url_validation_denylist() {
    let denylist = vec![String::from("evil.com"), String::from("*.malware.net")];
    let connector = HttpConnector::new(HttpConnectorConfig {
        require_tls: false,
        denylist,
        ..Default::default()
    });

    // A host that appears on no list is accepted.
    let unlisted = connector.validate_url("http://example.com/path");
    assert!(unlisted.is_ok());

    // Exact deny entry: rejected, and the error code is `Denied`.
    let exact_code = connector
        .validate_url("http://evil.com/path")
        .err()
        .map(|(code, _)| code);
    assert_eq!(exact_code, Some(HostCallErrorCode::Denied));

    // Wildcard deny entry: only the rejection itself is checked here.
    let wildcard = connector.validate_url("http://api.malware.net/path");
    assert!(wildcard.is_err());
}
938
/// When a host matches both the allowlist and the denylist, the denylist
/// wins and validation fails with `Denied`.
#[test]
fn test_url_validation_denylist_precedence() {
    let config = HttpConnectorConfig {
        require_tls: false,
        allowlist: vec![String::from("*.example.com")],
        denylist: vec![String::from("evil.example.com")],
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    // Matches the allowlist wildcard only: accepted.
    let allowed = connector.validate_url("http://api.example.com/path");
    assert!(allowed.is_ok());

    // Matches the allowlist wildcard and the exact deny entry: rejected.
    let denied_code = connector
        .validate_url("http://evil.example.com/path")
        .err()
        .map(|(code, _)| code);
    assert_eq!(denied_code, Some(HostCallErrorCode::Denied));
}
961
/// Exercises `HttpConnector::matches_pattern_list` with wildcard and exact
/// patterns, including mixed-case host input.
#[test]
fn test_pattern_matching() {
    let wildcard_patterns = vec![String::from("*.example.com")];
    let exact_patterns = vec![String::from("example.com")];

    let matches_wildcard =
        |host: &str| HttpConnector::matches_pattern_list(host, &wildcard_patterns);
    let matches_exact = |host: &str| HttpConnector::matches_pattern_list(host, &exact_patterns);

    // `*.example.com` covers one and several subdomain levels, and the
    // bare domain itself.
    assert!(matches_wildcard("api.example.com"));
    assert!(matches_wildcard("sub.api.example.com"));
    assert!(matches_wildcard("example.com"));

    // An exact pattern matches only the identical host, not a subdomain.
    assert!(matches_exact("example.com"));
    assert!(!matches_exact("api.example.com"));

    // Host comparison ignores ASCII case.
    assert!(matches_wildcard("API.Example.COM"));
}
997
/// A well-formed params object is parsed into a request whose url, method,
/// headers and body mirror the input.
#[test]
fn test_parse_request_valid() {
    let url = "https://api.example.com/data";
    let body = "{\"key\": \"value\"}";
    let params = json!({
        "url": url,
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
        "body": body
    });

    let connector = HttpConnector::with_defaults();
    let parsed = connector.parse_request(&params).unwrap();

    assert_eq!(parsed.url, url);
    assert_eq!(parsed.method, "POST");

    let content_type = parsed.headers.get("Content-Type").unwrap();
    assert_eq!(content_type, "application/json");

    let parsed_body = parsed.body.as_ref().unwrap();
    assert_eq!(parsed_body, body);
}
1018
/// An unrecognised HTTP method is rejected with `InvalidRequest`.
#[test]
fn test_parse_request_invalid_method() {
    let params = json!({
        "url": "https://api.example.com/data",
        "method": "INVALID"
    });

    let connector = HttpConnector::with_defaults();
    let rejected_code = connector
        .parse_request(&params)
        .err()
        .map(|(code, _)| code);

    // `None` here would mean parsing unexpectedly succeeded.
    assert_eq!(rejected_code, Some(HostCallErrorCode::InvalidRequest));
}
1033
/// A body longer than `max_request_bytes` is rejected with `InvalidRequest`.
#[test]
fn test_parse_request_body_too_large() {
    // 200-byte body against a 100-byte limit.
    let limit = 100;
    let oversized_body = "x".repeat(limit * 2);

    let connector = HttpConnector::new(HttpConnectorConfig {
        max_request_bytes: limit,
        ..Default::default()
    });
    let params = json!({
        "url": "https://api.example.com/data",
        "method": "POST",
        "body": oversized_body
    });

    let rejected_code = connector
        .parse_request(&params)
        .err()
        .map(|(code, _)| code);
    assert_eq!(rejected_code, Some(HostCallErrorCode::InvalidRequest));
}
1053
/// Supplying both `body` and `body_bytes` is rejected with `InvalidRequest`
/// and a message containing "not both".
#[test]
fn test_parse_request_rejects_both_body_and_body_bytes() {
    let params = json!({
        "url": "https://api.example.com/data",
        "method": "POST",
        "body": "{\"key\": \"value\"}",
        "body_bytes": "eyJrZXkiOiAidmFsdWUifQ=="
    });

    let connector = HttpConnector::with_defaults();

    // `unwrap_err` panics (failing the test) if the ambiguous request parses.
    let (code, message) = connector.parse_request(&params).unwrap_err();

    assert_eq!(code, HostCallErrorCode::InvalidRequest);
    assert!(
        message.contains("not both"),
        "expected ambiguity error, got: {message}"
    );
}
1074
/// Round-trips a fully populated `HttpConnectorConfig` through JSON and
/// checks that every field survives unchanged.
#[test]
fn test_config_serialization() {
    let original = HttpConnectorConfig {
        allowlist: vec![String::from("*.example.com")],
        denylist: vec![String::from("evil.com")],
        enforce_allowlist: true,
        require_tls: true,
        max_request_bytes: 1024,
        max_response_bytes: 2048,
        default_timeout_ms: 5000,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: HttpConnectorConfig = serde_json::from_str(&encoded).unwrap();

    // Compared field by field.
    assert_eq!(decoded.allowlist, original.allowlist);
    assert_eq!(decoded.denylist, original.denylist);
    assert_eq!(decoded.enforce_allowlist, original.enforce_allowlist);
    assert_eq!(decoded.require_tls, original.require_tls);
    assert_eq!(decoded.max_request_bytes, original.max_request_bytes);
    assert_eq!(decoded.max_response_bytes, original.max_response_bytes);
    assert_eq!(decoded.default_timeout_ms, original.default_timeout_ms);
}
1098
/// Dispatching a call to a host outside the allowlist yields an error result
/// whose code is `Denied`.
#[test]
fn test_dispatch_denied_host_returns_deterministic_error() {
    let config = HttpConnectorConfig {
        require_tls: false,
        allowlist: vec![String::from("allowed.example")],
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    // The target host is not on the allowlist.
    let params = json!({
        "url": "http://denied.example/test",
        "method": "GET",
    });
    let call = HostCallPayload {
        call_id: String::from("call-1"),
        capability: String::from("http"),
        method: String::from("http"),
        params,
        timeout_ms: None,
        cancel_token: None,
        context: None,
    };

    let outcome = run_async(async move { connector.dispatch(&call).await.unwrap() });

    assert!(outcome.is_error);
    let payload = outcome.error.expect("error payload");
    assert_eq!(payload.code, HostCallErrorCode::Denied);
}
1125
/// A request to a server that accepts the connection but never answers
/// fails with `HostCallErrorCode::Timeout` and is flagged retryable.
#[test]
fn test_dispatch_timeout_returns_timeout_error_code() {
    let server = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let server_addr = server.local_addr().expect("listener addr");

    let (ready_tx, ready_rx) = mpsc::channel::<()>();
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let server_thread = thread::spawn(move || {
        let _ = ready_tx.send(());
        // Keep the accepted connection alive, without writing anything,
        // until the test signals shutdown or 500 ms elapse.
        let _held_conn = server.accept().expect("accept");
        let _ = stop_rx.recv_timeout(std::time::Duration::from_millis(500));
    });
    // Wait until the server thread has started.
    let _ = ready_rx.recv();

    let config = HttpConnectorConfig {
        require_tls: false,
        default_timeout_ms: 100,
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    let params = json!({
        "url": format!("http://{server_addr}/"),
        "method": "GET",
        "timeout_ms": 100,
    });
    let call = HostCallPayload {
        call_id: String::from("call-1"),
        capability: String::from("http"),
        method: String::from("http"),
        params,
        timeout_ms: None,
        cancel_token: None,
        context: None,
    };

    let outcome = run_async(async move { connector.dispatch(&call).await.unwrap() });

    assert!(outcome.is_error);
    let payload = outcome.error.expect("error payload");
    assert_eq!(payload.code, HostCallErrorCode::Timeout);
    assert_eq!(payload.retryable, Some(true));

    // Release the server thread and wait for it to finish.
    let _ = stop_tx.send(());
    let _ = server_thread.join();
}
1169
/// When the request params carry no `timeout_ms`, the call-level
/// `timeout_ms` is used instead of the connector's larger default.
#[test]
fn test_dispatch_uses_call_timeout_ms_when_request_timeout_absent() {
    let server = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let server_addr = server.local_addr().expect("listener addr");

    let (ready_tx, ready_rx) = mpsc::channel::<()>();
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let server_thread = thread::spawn(move || {
        let _ = ready_tx.send(());
        // Keep the accepted connection alive, without writing anything,
        // until the test signals shutdown or 500 ms elapse.
        let _held_conn = server.accept().expect("accept");
        let _ = stop_rx.recv_timeout(std::time::Duration::from_millis(500));
    });
    // Wait until the server thread has started.
    let _ = ready_rx.recv();

    // The connector default (5 s) is far longer than the call-level timeout
    // (100 ms); the latter must win because the params omit `timeout_ms`.
    let config = HttpConnectorConfig {
        require_tls: false,
        default_timeout_ms: 5000,
        ..Default::default()
    };
    let connector = HttpConnector::new(config);

    let params = json!({
        "url": format!("http://{server_addr}/"),
        "method": "GET",
    });
    let call = HostCallPayload {
        call_id: String::from("call-1"),
        capability: String::from("http"),
        method: String::from("http"),
        params,
        timeout_ms: Some(100),
        cancel_token: None,
        context: None,
    };

    let outcome = run_async(async move { connector.dispatch(&call).await.unwrap() });

    assert!(outcome.is_error);
    let error = outcome.error.expect("error payload");
    let timed_out = error.code == HostCallErrorCode::Timeout;
    assert!(
        timed_out,
        "expected timeout, got {:?} (details={:?})",
        error.code,
        error.details
    );

    // Release the server thread and wait for it to finish.
    let _ = stop_tx.send(());
    let _ = server_thread.join();
}
1217
/// `"timeout_ms": 0` in the request params must not make the request time
/// out immediately: against a server that answers right away, the dispatch
/// succeeds with status 200 and body "hello".
///
/// The one-shot server drains the request head before replying so that
/// closing the socket cannot race with the client's still-unread request.
#[test]
#[cfg(unix)]
fn test_dispatch_treats_zero_timeout_as_unset() {
    use std::io::{Read, Write};

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let addr = listener.local_addr().expect("listener addr");

    let join = thread::spawn(move || {
        let (mut stream, _peer) = listener.accept().expect("accept");

        // Read the request up to the blank line that ends the header block.
        // Dropping a socket that still has unread inbound data can cause a
        // TCP reset, which may reach the client before it has read the
        // response. This is best effort: on EOF, error or read timeout we
        // still send the response below.
        let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(2)));
        let mut request = Vec::new();
        let mut chunk = [0_u8; 1024];
        while !request.windows(4).any(|window| window == b"\r\n\r\n") {
            match stream.read(&mut chunk) {
                Ok(0) | Err(_) => break,
                Ok(read) => request.extend_from_slice(&chunk[..read]),
            }
        }

        let body = "hello";
        let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
            body.len()
        );
        stream
            .write_all(response.as_bytes())
            .expect("write response");
    });

    let connector = HttpConnector::new(HttpConnectorConfig {
        require_tls: false,
        default_timeout_ms: 5000,
        ..Default::default()
    });

    let call = HostCallPayload {
        call_id: "call-1".to_string(),
        capability: "http".to_string(),
        method: "http".to_string(),
        params: json!({
            "url": format!("http://{addr}/"),
            "method": "GET",
            // Zero is expected to be treated as "no timeout given".
            "timeout_ms": 0,
        }),
        timeout_ms: None,
        cancel_token: None,
        context: None,
    };

    let result = run_async(async move { connector.dispatch(&call).await.unwrap() });
    assert!(!result.is_error);
    assert_eq!(
        result.output.get("status").and_then(Value::as_u64),
        Some(200)
    );
    assert_eq!(
        result.output.get("body").and_then(Value::as_str),
        Some("hello")
    );

    let _ = join.join();
}
1271
/// `dispatch_streaming` returns the response status and headers together
/// with a body stream whose concatenated chunks equal the served body.
///
/// The one-shot server drains the request head before replying so that
/// closing the socket cannot race with the client's still-unread request.
#[test]
#[cfg(unix)]
fn test_dispatch_streaming_returns_status_headers_and_body_stream() {
    use futures::StreamExt as _;
    use std::io::{Read, Write};

    let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let addr = listener.local_addr().expect("listener addr");

    let join = thread::spawn(move || {
        let (mut stream, _peer) = listener.accept().expect("accept");

        // Read the request up to the blank line that ends the header block.
        // Dropping a socket that still has unread inbound data can cause a
        // TCP reset, which may reach the client before it has read the
        // response. This is best effort: on EOF, error or read timeout we
        // still send the response below.
        let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(2)));
        let mut request = Vec::new();
        let mut chunk = [0_u8; 1024];
        while !request.windows(4).any(|window| window == b"\r\n\r\n") {
            match stream.read(&mut chunk) {
                Ok(0) | Err(_) => break,
                Ok(read) => request.extend_from_slice(&chunk[..read]),
            }
        }

        let body = "hello-stream";
        let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
            body.len()
        );
        stream
            .write_all(response.as_bytes())
            .expect("write response");
    });

    let connector = HttpConnector::new(HttpConnectorConfig {
        require_tls: false,
        default_timeout_ms: 5000,
        ..Default::default()
    });

    let call = HostCallPayload {
        call_id: "call-1".to_string(),
        capability: "http".to_string(),
        method: "http".to_string(),
        params: json!({
            "url": format!("http://{addr}/"),
            "method": "GET",
            "timeout_ms": 1000,
        }),
        timeout_ms: None,
        cancel_token: None,
        context: None,
    };

    let (status, headers, body) = run_async(async move {
        let response = connector
            .dispatch_streaming(&call)
            .await
            .expect("dispatch_streaming ok");

        // Collect the whole body stream into one buffer.
        let mut bytes = Vec::new();
        let mut stream = response.stream;
        while let Some(chunk) = stream.next().await {
            let chunk = chunk.expect("stream chunk");
            bytes.extend_from_slice(&chunk);
        }

        (response.status, response.headers, bytes)
    });

    assert_eq!(status, 200);
    // Accept either capitalisation of the header name.
    assert_eq!(
        headers
            .get("Content-Type")
            .or_else(|| headers.get("content-type"))
            .map(String::as_str),
        Some("text/plain")
    );
    assert_eq!(String::from_utf8_lossy(&body), "hello-stream");

    let _ = join.join();
}
1341
/// `redact_url_for_log` keeps scheme, host and path but drops userinfo,
/// query string and fragment.
#[test]
fn http_connector_redact_url_for_log_strips_sensitive_parts() {
    let input = "http://user:pass@denied.example/test?q=hello#frag";
    let redacted = HttpConnector::redact_url_for_log(input);
    let leaks = |needle: &str| redacted.contains(needle);

    // The non-sensitive part of the URL is preserved.
    assert!(leaks("http://denied.example/test"));

    // Query, fragment and credentials must all be gone.
    assert!(!leaks("q=hello"));
    assert!(!leaks("#frag"));
    assert!(!leaks("user"));
    assert!(!leaks("pass"));
}
1352
/// For input that is not a valid URL, `redact_url_for_log` still removes
/// everything from the first `?` onward.
#[test]
fn http_connector_redact_url_for_log_falls_back_for_invalid_urls() {
    let input = "not a url?q=hello#frag";
    assert_eq!(HttpConnector::redact_url_for_log(input), "not a url");
}
1358}