1use super::{
10 Connector, HostCallErrorCode, HostCallPayload, HostResultPayload, host_result_err,
11 host_result_err_with_details, host_result_ok,
12};
13use crate::error::Result;
14use crate::http::client::Client;
15use asupersync::time::{timeout, wall_now};
16use async_trait::async_trait;
17use futures::Stream;
18use futures::StreamExt;
19use serde::{Deserialize, Serialize};
20use serde_json::{Value, json};
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
26type ValidationError = (HostCallErrorCode, String);
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct HttpConnectorConfig {
32 #[serde(default)]
34 pub allowlist: Vec<String>,
35
36 #[serde(default)]
38 pub denylist: Vec<String>,
39
40 #[serde(default)]
42 pub enforce_allowlist: bool,
43
44 #[serde(default = "default_require_tls")]
46 pub require_tls: bool,
47
48 #[serde(default = "default_max_request_bytes")]
50 pub max_request_bytes: usize,
51
52 #[serde(default = "default_max_response_bytes")]
54 pub max_response_bytes: usize,
55
56 #[serde(default = "default_timeout_ms")]
58 pub default_timeout_ms: u64,
59}
60
61const fn default_require_tls() -> bool {
62 true
63}
64
65const fn default_max_request_bytes() -> usize {
66 10 * 1024 * 1024 }
68
69const fn default_max_response_bytes() -> usize {
70 50 * 1024 * 1024 }
72
73const fn default_timeout_ms() -> u64 {
74 30_000 }
76
77impl Default for HttpConnectorConfig {
78 fn default() -> Self {
79 Self {
80 allowlist: Vec::new(),
81 denylist: Vec::new(),
82 enforce_allowlist: false,
83 require_tls: default_require_tls(),
84 max_request_bytes: default_max_request_bytes(),
85 max_response_bytes: default_max_response_bytes(),
86 default_timeout_ms: default_timeout_ms(),
87 }
88 }
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct HttpRequest {
94 pub url: String,
96
97 #[serde(default = "default_method")]
99 pub method: String,
100
101 #[serde(default)]
103 pub headers: HashMap<String, String>,
104
105 #[serde(default, skip_serializing_if = "Option::is_none")]
107 pub body: Option<String>,
108
109 #[serde(default, skip_serializing_if = "Option::is_none")]
111 pub body_bytes: Option<String>,
112
113 #[serde(default, skip_serializing_if = "Option::is_none")]
115 pub timeout_ms: Option<u64>,
116}
117
118fn default_method() -> String {
119 "GET".to_string()
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpResponse {
125 pub status: u16,
127
128 pub headers: HashMap<String, String>,
130
131 #[serde(default, skip_serializing_if = "Option::is_none")]
133 pub body: Option<String>,
134
135 #[serde(default, skip_serializing_if = "Option::is_none")]
137 pub body_bytes: Option<String>,
138
139 pub size_bytes: usize,
141
142 pub duration_ms: u64,
144}
145
146pub struct StreamingHttpResponse {
152 pub status: u16,
153 pub headers: HashMap<String, String>,
154 pub stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
155}
156
157pub struct HttpConnector {
159 config: HttpConnectorConfig,
160 client: Client,
161}
162
163impl HttpConnector {
164 #[must_use]
166 pub fn new(config: HttpConnectorConfig) -> Self {
167 Self {
168 config,
169 client: Client::new(),
170 }
171 }
172
173 #[must_use]
175 pub fn with_defaults() -> Self {
176 Self::new(HttpConnectorConfig::default())
177 }
178
179 fn validate_url(&self, url: &str) -> std::result::Result<(), ValidationError> {
181 let parsed = url::Url::parse(url).map_err(|e| {
183 (
184 HostCallErrorCode::InvalidRequest,
185 format!("Invalid URL: {e}"),
186 )
187 })?;
188
189 let scheme = parsed.scheme();
191 if scheme != "http" && scheme != "https" {
192 return Err((
193 HostCallErrorCode::InvalidRequest,
194 format!("Unsupported URL scheme: '{scheme}'"),
195 ));
196 }
197
198 if self.config.require_tls && scheme == "http" {
200 return Err((
201 HostCallErrorCode::Denied,
202 format!("TLS required: URL scheme must be 'https', got '{scheme}'"),
203 ));
204 }
205
206 let host = parsed.host_str().ok_or_else(|| {
208 (
209 HostCallErrorCode::InvalidRequest,
210 "URL missing host".to_string(),
211 )
212 })?;
213
214 if Self::matches_pattern_list(host, &self.config.denylist) {
216 return Err((
217 HostCallErrorCode::Denied,
218 format!("Host '{host}' is in denylist"),
219 ));
220 }
221
222 let requires_allowlist = self.config.enforce_allowlist || !self.config.allowlist.is_empty();
224 if requires_allowlist && !Self::matches_pattern_list(host, &self.config.allowlist) {
225 return Err((
226 HostCallErrorCode::Denied,
227 format!(
228 "Host '{host}' is not in allowlist; declare capability_manifest scope.hosts for http capability"
229 ),
230 ));
231 }
232
233 Ok(())
234 }
235
236 fn matches_pattern_list(host: &str, patterns: &[String]) -> bool {
238 let host_lower = host.to_ascii_lowercase();
239 patterns.iter().any(|pattern| {
240 let pattern_lower = pattern.to_ascii_lowercase();
241 pattern_lower.strip_prefix("*.").map_or_else(
242 || host_lower == pattern_lower,
243 |domain| {
244 let suffix = pattern_lower.strip_prefix('*').unwrap_or(""); host_lower.ends_with(suffix) || host_lower == domain
247 },
248 )
249 })
250 }
251
252 fn parse_request(&self, params: &Value) -> std::result::Result<HttpRequest, ValidationError> {
254 let mut request: HttpRequest = serde_json::from_value(params.clone()).map_err(|e| {
255 (
256 HostCallErrorCode::InvalidRequest,
257 format!("Invalid HTTP request params: {e}"),
258 )
259 })?;
260
261 if request.body.is_some() && request.body_bytes.is_some() {
262 return Err((
263 HostCallErrorCode::InvalidRequest,
264 "Request must specify either 'body' or 'body_bytes', not both".to_string(),
265 ));
266 }
267
268 let method_upper = request.method.to_ascii_uppercase();
270 if !matches!(method_upper.as_str(), "GET" | "POST") {
271 return Err((
272 HostCallErrorCode::InvalidRequest,
273 format!(
274 "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
275 request.method
276 ),
277 ));
278 }
279
280 request.timeout_ms = request.timeout_ms.filter(|ms| *ms > 0);
282
283 let body_size = request
285 .body
286 .as_ref()
287 .map(String::len)
288 .or_else(|| {
289 request.body_bytes.as_ref().map(|b| b.len() * 3 / 4) })
291 .unwrap_or(0);
292
293 if body_size > self.config.max_request_bytes {
294 return Err((
295 HostCallErrorCode::InvalidRequest,
296 format!(
297 "Request body too large: {} bytes (max: {} bytes)",
298 body_size, self.config.max_request_bytes
299 ),
300 ));
301 }
302
303 if method_upper == "GET" && (request.body.is_some() || request.body_bytes.is_some()) {
304 return Err((
305 HostCallErrorCode::InvalidRequest,
306 "GET requests cannot include a body".to_string(),
307 ));
308 }
309
310 Ok(request)
311 }
312
313 async fn execute_request(&self, request: &HttpRequest) -> Result<HttpResponse> {
315 let start = Instant::now();
316
317 let method_upper = request.method.to_ascii_uppercase();
319 let mut builder = match method_upper.as_str() {
320 "GET" => self.client.get(&request.url),
321 "POST" => self.client.post(&request.url),
322 _ => {
323 return Err(crate::error::Error::validation(format!(
324 "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
325 request.method
326 )));
327 }
328 };
329
330 for (key, value) in &request.headers {
332 builder = builder.header(key, value);
333 }
334
335 if let Some(body) = &request.body {
337 builder = builder.body(body.as_bytes().to_vec());
338 } else if let Some(body_bytes) = &request.body_bytes {
339 use base64::Engine;
340 let decoded = base64::engine::general_purpose::STANDARD
341 .decode(body_bytes)
342 .map_err(|e| {
343 crate::error::Error::validation(format!("Invalid base64 body: {e}"))
344 })?;
345 builder = builder.body(decoded);
346 }
347
348 let response = builder
350 .send()
351 .await
352 .map_err(|e| crate::error::Error::extension(format!("HTTP request failed: {e}")))?;
353
354 let status = response.status();
356 let response_headers: Vec<(String, String)> = response.headers().to_vec();
357
358 let mut body_bytes_vec = Vec::new();
359 let mut stream = response.bytes_stream();
360
361 while let Some(chunk_result) = stream.next().await {
362 let chunk: Vec<u8> = chunk_result
363 .map_err(|e| crate::error::Error::extension(format!("Read error: {e}")))?;
364 if body_bytes_vec.len() + chunk.len() > self.config.max_response_bytes {
365 return Err(crate::error::Error::extension(format!(
366 "Response body too large (max: {} bytes)",
367 self.config.max_response_bytes
368 )));
369 }
370 body_bytes_vec.extend_from_slice(&chunk);
371 }
372
373 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
374 let size_bytes = body_bytes_vec.len();
375
376 let mut headers_map = HashMap::new();
378 for (key, value) in response_headers {
379 headers_map.insert(key, value);
380 }
381
382 let (body, body_bytes_b64) = String::from_utf8(body_bytes_vec).map_or_else(
384 |err| {
385 use base64::Engine;
386 let encoded = base64::engine::general_purpose::STANDARD.encode(err.into_bytes());
387 (None, Some(encoded))
388 },
389 |s| (Some(s), None),
390 );
391
392 Ok(HttpResponse {
393 status,
394 headers: headers_map,
395 body,
396 body_bytes: body_bytes_b64,
397 size_bytes,
398 duration_ms,
399 })
400 }
401
402 fn request_details(request: &HttpRequest, timeout_ms: u64) -> Value {
403 json!({
404 "url": request.url,
405 "method": request.method,
406 "timeout_ms": timeout_ms,
407 })
408 }
409
410 fn redact_url_for_log(url: &str) -> String {
411 url::Url::parse(url).map_or_else(
412 |_| url.split(['?', '#']).next().unwrap_or(url).to_string(),
413 |mut parsed| {
414 parsed.set_query(None);
415 parsed.set_fragment(None);
416 let _ = parsed.set_username("");
417 let _ = parsed.set_password(None);
418 parsed.to_string()
419 },
420 )
421 }
422
423 async fn dispatch_request(&self, call_id: &str, request: HttpRequest) -> HostResultPayload {
424 let log_url = Self::redact_url_for_log(&request.url);
425 if let Err((code, message)) = self.validate_url(&request.url) {
427 info!(
428 call_id = %call_id,
429 url = %log_url,
430 error = %message,
431 "HTTP connector: policy denied"
432 );
433 return host_result_err(call_id, code, message, None);
434 }
435
436 debug!(
438 call_id = %call_id,
439 url = %log_url,
440 method = %request.method,
441 "HTTP connector: executing request"
442 );
443
444 let timeout_ms = request.timeout_ms.unwrap_or(self.config.default_timeout_ms);
446 let start = Instant::now();
447 let result = if timeout_ms == 0 {
448 Ok(self.execute_request(&request).await)
449 } else {
450 timeout(
451 wall_now(),
452 Duration::from_millis(timeout_ms),
453 Box::pin(self.execute_request(&request)),
454 )
455 .await
456 };
457
458 match result {
459 Ok(Ok(response)) => {
460 info!(
461 call_id = %call_id,
462 url = %log_url,
463 status = %response.status,
464 size_bytes = %response.size_bytes,
465 duration_ms = %response.duration_ms,
466 "HTTP connector: request completed"
467 );
468
469 let output = serde_json::to_value(&response)
470 .unwrap_or_else(|_| json!({"error": "serialization_failed"}));
471
472 host_result_ok(call_id, output)
473 }
474 Ok(Err(e)) => {
475 if timeout_ms > 0 && start.elapsed() >= Duration::from_millis(timeout_ms) {
476 let message = format!("Request timeout after {timeout_ms}ms");
477 warn!(
478 call_id = %call_id,
479 url = %log_url,
480 error = %message,
481 "HTTP connector: request timed out"
482 );
483
484 return host_result_err_with_details(
485 call_id,
486 HostCallErrorCode::Timeout,
487 &message,
488 Self::request_details(&request, timeout_ms),
489 Some(true),
490 );
491 }
492
493 let message = e.to_string();
494 let code = match e {
495 crate::error::Error::Validation(_) => HostCallErrorCode::InvalidRequest,
496 _ => HostCallErrorCode::Io,
497 };
498
499 warn!(
500 call_id = %call_id,
501 url = %log_url,
502 error = %message,
503 "HTTP connector: request failed"
504 );
505
506 host_result_err_with_details(
507 call_id,
508 code,
509 &message,
510 Self::request_details(&request, timeout_ms),
511 Some(false),
512 )
513 }
514 Err(_) => {
515 let message = format!("Request timeout after {timeout_ms}ms");
516 warn!(
517 call_id = %call_id,
518 url = %log_url,
519 error = %message,
520 "HTTP connector: request timed out"
521 );
522
523 host_result_err_with_details(
524 call_id,
525 HostCallErrorCode::Timeout,
526 &message,
527 Self::request_details(&request, timeout_ms),
528 Some(true),
529 )
530 }
531 }
532 }
533
534 pub async fn dispatch_streaming(
539 &self,
540 call: &HostCallPayload,
541 ) -> std::result::Result<StreamingHttpResponse, HostResultPayload> {
542 let call_id = &call.call_id;
543 let method = call.method.to_ascii_lowercase();
544
545 if method != "http" {
546 warn!(
547 call_id = %call_id,
548 method = %method,
549 "HTTP connector: unsupported method (streaming)"
550 );
551 return Err(host_result_err(
552 call_id,
553 HostCallErrorCode::InvalidRequest,
554 format!("Unsupported HTTP connector method: '{method}'. Use 'http'."),
555 None,
556 ));
557 }
558
559 let mut request = match self.parse_request(&call.params) {
560 Ok(req) => req,
561 Err((code, message)) => {
562 warn!(
563 call_id = %call_id,
564 error = %message,
565 "HTTP connector: invalid request (streaming)"
566 );
567 return Err(host_result_err(call_id, code, message, None));
568 }
569 };
570
571 if request.timeout_ms.is_none() {
573 request.timeout_ms = call.timeout_ms.filter(|ms| *ms > 0);
574 }
575
576 let log_url = Self::redact_url_for_log(&request.url);
577 if let Err((code, message)) = self.validate_url(&request.url) {
578 info!(
579 call_id = %call_id,
580 url = %log_url,
581 error = %message,
582 "HTTP connector: policy denied (streaming)"
583 );
584 return Err(host_result_err(call_id, code, message, None));
585 }
586
587 debug!(
588 call_id = %call_id,
589 url = %log_url,
590 method = %request.method,
591 "HTTP connector: executing request (streaming)"
592 );
593
594 let timeout_ms = request.timeout_ms.unwrap_or(self.config.default_timeout_ms);
595 let (response, duration_ms) = match self
596 .dispatch_request_streaming_head(call_id, &request, timeout_ms, &log_url)
597 .await
598 {
599 Ok(res) => res,
600 Err(payload) => return Err(payload),
601 };
602
603 let status = response.status();
604 let response_headers: Vec<(String, String)> = response.headers().to_vec();
605
606 let mut headers_map = HashMap::new();
607 for (key, value) in response_headers {
608 headers_map.insert(key, value);
609 }
610
611 info!(
612 call_id = %call_id,
613 url = %log_url,
614 status = status,
615 duration_ms = duration_ms,
616 "HTTP connector: streaming response head received"
617 );
618
619 Ok(StreamingHttpResponse {
620 status,
621 headers: headers_map,
622 stream: response.bytes_stream(),
623 })
624 }
625
626 #[allow(clippy::future_not_send)]
627 async fn dispatch_request_streaming_head(
628 &self,
629 call_id: &str,
630 request: &HttpRequest,
631 timeout_ms: u64,
632 log_url: &str,
633 ) -> std::result::Result<(crate::http::client::Response, u64), HostResultPayload> {
634 let start = Instant::now();
635 let builder = match self.build_streaming_request_builder(call_id, request, timeout_ms) {
636 Ok(builder) => builder,
637 Err(payload) => return Err(*payload),
638 };
639 let send_fut = builder.send();
640 let result = if timeout_ms == 0 {
641 Ok(send_fut.await)
642 } else {
643 timeout(
644 wall_now(),
645 Duration::from_millis(timeout_ms),
646 Box::pin(send_fut),
647 )
648 .await
649 };
650
651 match result {
652 Ok(Ok(response)) => {
653 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
654 Ok((response, duration_ms))
655 }
656 Ok(Err(err)) => {
657 if timeout_ms > 0 && start.elapsed() >= Duration::from_millis(timeout_ms) {
658 let message = format!("Request timeout after {timeout_ms}ms");
659 warn!(
660 call_id = %call_id,
661 url = %log_url,
662 error = %message,
663 "HTTP connector: request timed out (streaming)"
664 );
665
666 return Err(host_result_err_with_details(
667 call_id,
668 HostCallErrorCode::Timeout,
669 &message,
670 Self::request_details(request, timeout_ms),
671 Some(true),
672 ));
673 }
674
675 let message = err.to_string();
676 let code = match err {
677 crate::error::Error::Validation(_) => HostCallErrorCode::InvalidRequest,
678 _ => HostCallErrorCode::Io,
679 };
680
681 warn!(
682 call_id = %call_id,
683 url = %log_url,
684 error = %message,
685 "HTTP connector: request failed (streaming)"
686 );
687
688 Err(host_result_err_with_details(
689 call_id,
690 code,
691 &message,
692 Self::request_details(request, timeout_ms),
693 Some(false),
694 ))
695 }
696 Err(_) => {
697 let message = format!("Request timeout after {timeout_ms}ms");
698 warn!(
699 call_id = %call_id,
700 url = %log_url,
701 error = %message,
702 "HTTP connector: request timed out (streaming)"
703 );
704
705 Err(host_result_err_with_details(
706 call_id,
707 HostCallErrorCode::Timeout,
708 &message,
709 Self::request_details(request, timeout_ms),
710 Some(true),
711 ))
712 }
713 }
714 }
715
716 fn build_streaming_request_builder<'a>(
717 &'a self,
718 call_id: &str,
719 request: &HttpRequest,
720 timeout_ms: u64,
721 ) -> std::result::Result<crate::http::client::RequestBuilder<'a>, Box<HostResultPayload>> {
722 let method_upper = request.method.to_ascii_uppercase();
723 let mut builder = match method_upper.as_str() {
724 "GET" => self.client.get(&request.url),
725 "POST" => self.client.post(&request.url),
726 _ => {
727 return Err(Box::new(host_result_err_with_details(
728 call_id,
729 HostCallErrorCode::InvalidRequest,
730 format!(
731 "Invalid HTTP method: '{}'. Supported methods: GET, POST.",
732 request.method
733 ),
734 Self::request_details(request, timeout_ms),
735 Some(false),
736 )));
737 }
738 };
739
740 for (key, value) in &request.headers {
741 builder = builder.header(key, value);
742 }
743
744 if let Some(body) = &request.body {
745 builder = builder.body(body.as_bytes().to_vec());
746 } else if let Some(body_bytes) = &request.body_bytes {
747 use base64::Engine;
748 let decoded = match base64::engine::general_purpose::STANDARD.decode(body_bytes) {
749 Ok(decoded) => decoded,
750 Err(err) => {
751 return Err(Box::new(host_result_err_with_details(
752 call_id,
753 HostCallErrorCode::InvalidRequest,
754 format!("Invalid base64 body: {err}"),
755 Self::request_details(request, timeout_ms),
756 Some(false),
757 )));
758 }
759 };
760 builder = builder.body(decoded);
761 }
762
763 Ok(builder)
764 }
765}
766
767#[async_trait]
768impl Connector for HttpConnector {
769 fn capability(&self) -> &'static str {
770 "http"
771 }
772
773 #[allow(clippy::too_many_lines)]
774 async fn dispatch(&self, call: &HostCallPayload) -> Result<HostResultPayload> {
775 let call_id = &call.call_id;
776 let method = call.method.to_ascii_lowercase();
777
778 if method != "http" {
780 warn!(
781 call_id = %call_id,
782 method = %method,
783 "HTTP connector: unsupported method"
784 );
785 return Ok(host_result_err(
786 call_id,
787 HostCallErrorCode::InvalidRequest,
788 format!("Unsupported HTTP connector method: '{method}'. Use 'http'."),
789 None,
790 ));
791 }
792
793 let mut request = match self.parse_request(&call.params) {
795 Ok(req) => req,
796 Err((code, message)) => {
797 warn!(
798 call_id = %call_id,
799 error = %message,
800 "HTTP connector: invalid request"
801 );
802 return Ok(host_result_err(call_id, code, message, None));
803 }
804 };
805
806 if request.timeout_ms.is_none() {
808 request.timeout_ms = call.timeout_ms.filter(|ms| *ms > 0);
809 }
810
811 Ok(self.dispatch_request(call_id, request).await)
812 }
813}
814
815#[cfg(test)]
816mod tests {
817 use super::*;
818 use std::future::Future;
819 use std::net::TcpListener;
820 use std::sync::mpsc;
821 use std::thread;
822
823 fn run_async<T, Fut>(future: Fut) -> T
824 where
825 Fut: Future<Output = T> + Send + 'static,
826 T: Send + 'static,
827 {
828 let runtime = asupersync::runtime::RuntimeBuilder::current_thread()
829 .build()
830 .expect("build asupersync runtime");
831 let join = runtime.handle().spawn(future);
832 runtime.block_on(join)
833 }
834
835 #[test]
836 fn test_default_config() {
837 let config = HttpConnectorConfig::default();
838 assert!(config.require_tls);
839 assert_eq!(config.max_request_bytes, 10 * 1024 * 1024);
840 assert_eq!(config.max_response_bytes, 50 * 1024 * 1024);
841 assert_eq!(config.default_timeout_ms, 30_000);
842 assert!(config.allowlist.is_empty());
843 assert!(config.denylist.is_empty());
844 assert!(!config.enforce_allowlist);
845 }
846
847 #[test]
848 fn test_url_validation_tls_required() {
849 let connector = HttpConnector::new(HttpConnectorConfig {
850 require_tls: true,
851 ..Default::default()
852 });
853
854 assert!(connector.validate_url("https://example.com").is_ok());
856
857 let result = connector.validate_url("http://example.com");
859 assert!(result.is_err());
860 let (code, _) = result.unwrap_err();
861 assert_eq!(code, HostCallErrorCode::Denied);
862 }
863
864 #[test]
865 fn test_url_validation_tls_not_required() {
866 let connector = HttpConnector::new(HttpConnectorConfig {
867 require_tls: false,
868 ..Default::default()
869 });
870
871 assert!(connector.validate_url("https://example.com").is_ok());
873 assert!(connector.validate_url("http://example.com").is_ok());
874 }
875
876 #[test]
877 fn test_url_validation_allowlist() {
878 let connector = HttpConnector::new(HttpConnectorConfig {
879 require_tls: false,
880 allowlist: vec!["api.example.com".to_string(), "*.github.com".to_string()],
881 ..Default::default()
882 });
883
884 assert!(
886 connector
887 .validate_url("http://api.example.com/path")
888 .is_ok()
889 );
890
891 assert!(connector.validate_url("http://api.github.com/path").is_ok());
893 assert!(connector.validate_url("http://raw.github.com/path").is_ok());
894
895 let result = connector.validate_url("http://other.com/path");
897 assert!(result.is_err());
898 let (code, _) = result.unwrap_err();
899 assert_eq!(code, HostCallErrorCode::Denied);
900 }
901
902 #[test]
903 fn test_url_validation_enforced_allowlist_denies_when_empty() {
904 let connector = HttpConnector::new(HttpConnectorConfig {
905 require_tls: false,
906 enforce_allowlist: true,
907 ..Default::default()
908 });
909
910 let result = connector.validate_url("http://example.com/path");
911 assert!(result.is_err());
912 let (code, msg) = result.unwrap_err();
913 assert_eq!(code, HostCallErrorCode::Denied);
914 assert!(msg.contains("allowlist"), "{msg}");
915 }
916
917 #[test]
918 fn test_url_validation_denylist() {
919 let connector = HttpConnector::new(HttpConnectorConfig {
920 require_tls: false,
921 denylist: vec!["evil.com".to_string(), "*.malware.net".to_string()],
922 ..Default::default()
923 });
924
925 assert!(connector.validate_url("http://example.com/path").is_ok());
927
928 let result = connector.validate_url("http://evil.com/path");
930 assert!(result.is_err());
931 let (code, _) = result.unwrap_err();
932 assert_eq!(code, HostCallErrorCode::Denied);
933
934 let result = connector.validate_url("http://api.malware.net/path");
936 assert!(result.is_err());
937 }
938
939 #[test]
940 fn test_url_validation_denylist_precedence() {
941 let connector = HttpConnector::new(HttpConnectorConfig {
942 require_tls: false,
943 allowlist: vec!["*.example.com".to_string()],
944 denylist: vec!["evil.example.com".to_string()],
945 ..Default::default()
946 });
947
948 assert!(
950 connector
951 .validate_url("http://api.example.com/path")
952 .is_ok()
953 );
954
955 let result = connector.validate_url("http://evil.example.com/path");
957 assert!(result.is_err());
958 let (code, _) = result.unwrap_err();
959 assert_eq!(code, HostCallErrorCode::Denied);
960 }
961
962 #[test]
963 fn test_pattern_matching() {
964 let wildcard_patterns = vec!["*.example.com".to_string()];
965
966 assert!(HttpConnector::matches_pattern_list(
968 "api.example.com",
969 &wildcard_patterns
970 ));
971 assert!(HttpConnector::matches_pattern_list(
972 "sub.api.example.com",
973 &wildcard_patterns
974 ));
975 assert!(HttpConnector::matches_pattern_list(
976 "example.com",
977 &wildcard_patterns
978 ));
979
980 let exact_patterns = vec!["example.com".to_string()];
982 assert!(HttpConnector::matches_pattern_list(
983 "example.com",
984 &exact_patterns
985 ));
986 assert!(!HttpConnector::matches_pattern_list(
987 "api.example.com",
988 &exact_patterns
989 ));
990
991 assert!(HttpConnector::matches_pattern_list(
993 "API.Example.COM",
994 &wildcard_patterns
995 ));
996 }
997
998 #[test]
999 fn test_parse_request_valid() {
1000 let connector = HttpConnector::with_defaults();
1001
1002 let params = json!({
1003 "url": "https://api.example.com/data",
1004 "method": "POST",
1005 "headers": {"Content-Type": "application/json"},
1006 "body": "{\"key\": \"value\"}"
1007 });
1008
1009 let request = connector.parse_request(¶ms).unwrap();
1010 assert_eq!(request.url, "https://api.example.com/data");
1011 assert_eq!(request.method, "POST");
1012 assert_eq!(
1013 request.headers.get("Content-Type").unwrap(),
1014 "application/json"
1015 );
1016 assert_eq!(request.body.as_ref().unwrap(), "{\"key\": \"value\"}");
1017 }
1018
1019 #[test]
1020 fn test_parse_request_invalid_method() {
1021 let connector = HttpConnector::with_defaults();
1022
1023 let params = json!({
1024 "url": "https://api.example.com/data",
1025 "method": "INVALID"
1026 });
1027
1028 let result = connector.parse_request(¶ms);
1029 assert!(result.is_err());
1030 let (code, _) = result.unwrap_err();
1031 assert_eq!(code, HostCallErrorCode::InvalidRequest);
1032 }
1033
1034 #[test]
1035 fn test_parse_request_body_too_large() {
1036 let connector = HttpConnector::new(HttpConnectorConfig {
1037 max_request_bytes: 100,
1038 ..Default::default()
1039 });
1040
1041 let large_body = "x".repeat(200);
1042 let params = json!({
1043 "url": "https://api.example.com/data",
1044 "method": "POST",
1045 "body": large_body
1046 });
1047
1048 let result = connector.parse_request(¶ms);
1049 assert!(result.is_err());
1050 let (code, _) = result.unwrap_err();
1051 assert_eq!(code, HostCallErrorCode::InvalidRequest);
1052 }
1053
1054 #[test]
1055 fn test_parse_request_rejects_both_body_and_body_bytes() {
1056 let connector = HttpConnector::with_defaults();
1057
1058 let params = json!({
1059 "url": "https://api.example.com/data",
1060 "method": "POST",
1061 "body": "{\"key\": \"value\"}",
1062 "body_bytes": "eyJrZXkiOiAidmFsdWUifQ=="
1063 });
1064
1065 let result = connector.parse_request(¶ms);
1066 assert!(result.is_err());
1067 let (code, message) = result.unwrap_err();
1068 assert_eq!(code, HostCallErrorCode::InvalidRequest);
1069 assert!(
1070 message.contains("not both"),
1071 "expected ambiguity error, got: {message}"
1072 );
1073 }
1074
1075 #[test]
1076 fn test_config_serialization() {
1077 let config = HttpConnectorConfig {
1078 allowlist: vec!["*.example.com".to_string()],
1079 denylist: vec!["evil.com".to_string()],
1080 enforce_allowlist: true,
1081 require_tls: true,
1082 max_request_bytes: 1024,
1083 max_response_bytes: 2048,
1084 default_timeout_ms: 5000,
1085 };
1086
1087 let json = serde_json::to_string(&config).unwrap();
1088 let parsed: HttpConnectorConfig = serde_json::from_str(&json).unwrap();
1089
1090 assert_eq!(parsed.allowlist, config.allowlist);
1091 assert_eq!(parsed.denylist, config.denylist);
1092 assert_eq!(parsed.enforce_allowlist, config.enforce_allowlist);
1093 assert_eq!(parsed.require_tls, config.require_tls);
1094 assert_eq!(parsed.max_request_bytes, config.max_request_bytes);
1095 assert_eq!(parsed.max_response_bytes, config.max_response_bytes);
1096 assert_eq!(parsed.default_timeout_ms, config.default_timeout_ms);
1097 }
1098
1099 #[test]
1100 fn test_dispatch_denied_host_returns_deterministic_error() {
1101 let connector = HttpConnector::new(HttpConnectorConfig {
1102 require_tls: false,
1103 allowlist: vec!["allowed.example".to_string()],
1104 ..Default::default()
1105 });
1106
1107 let call = HostCallPayload {
1108 call_id: "call-1".to_string(),
1109 capability: "http".to_string(),
1110 method: "http".to_string(),
1111 params: json!({
1112 "url": "http://denied.example/test",
1113 "method": "GET",
1114 }),
1115 timeout_ms: None,
1116 cancel_token: None,
1117 context: None,
1118 };
1119
1120 let result = run_async(async move { connector.dispatch(&call).await.unwrap() });
1121 assert!(result.is_error);
1122 let error = result.error.expect("error payload");
1123 assert_eq!(error.code, HostCallErrorCode::Denied);
1124 }
1125
1126 #[test]
1127 fn test_dispatch_timeout_returns_timeout_error_code() {
1128 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
1129 let addr = listener.local_addr().expect("listener addr");
1130
1131 let (ready_tx, ready_rx) = mpsc::channel();
1132 let (shutdown_tx, shutdown_rx) = mpsc::channel();
1133 let join = thread::spawn(move || {
1134 let _ = ready_tx.send(());
1135 let (_stream, _peer) = listener.accept().expect("accept");
1136 let _ = shutdown_rx.recv_timeout(std::time::Duration::from_millis(500));
1137 });
1138 let _ = ready_rx.recv();
1139
1140 let connector = HttpConnector::new(HttpConnectorConfig {
1141 require_tls: false,
1142 default_timeout_ms: 100,
1143 ..Default::default()
1144 });
1145
1146 let call = HostCallPayload {
1147 call_id: "call-1".to_string(),
1148 capability: "http".to_string(),
1149 method: "http".to_string(),
1150 params: json!({
1151 "url": format!("http://{addr}/"),
1152 "method": "GET",
1153 "timeout_ms": 100,
1154 }),
1155 timeout_ms: None,
1156 cancel_token: None,
1157 context: None,
1158 };
1159
1160 let result = run_async(async move { connector.dispatch(&call).await.unwrap() });
1161 assert!(result.is_error);
1162 let error = result.error.expect("error payload");
1163 assert_eq!(error.code, HostCallErrorCode::Timeout);
1164 assert_eq!(error.retryable, Some(true));
1165
1166 let _ = shutdown_tx.send(());
1167 let _ = join.join();
1168 }
1169
1170 #[test]
1171 fn test_dispatch_uses_call_timeout_ms_when_request_timeout_absent() {
1172 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
1173 let addr = listener.local_addr().expect("listener addr");
1174
1175 let (ready_tx, ready_rx) = mpsc::channel();
1176 let (shutdown_tx, shutdown_rx) = mpsc::channel();
1177 let join = thread::spawn(move || {
1178 let _ = ready_tx.send(());
1179 let (_stream, _peer) = listener.accept().expect("accept");
1180 let _ = shutdown_rx.recv_timeout(std::time::Duration::from_millis(500));
1181 });
1182 let _ = ready_rx.recv();
1183
1184 let connector = HttpConnector::new(HttpConnectorConfig {
1186 require_tls: false,
1187 default_timeout_ms: 5000,
1188 ..Default::default()
1189 });
1190
1191 let call = HostCallPayload {
1192 call_id: "call-1".to_string(),
1193 capability: "http".to_string(),
1194 method: "http".to_string(),
1195 params: json!({
1196 "url": format!("http://{addr}/"),
1197 "method": "GET",
1198 }),
1199 timeout_ms: Some(100),
1200 cancel_token: None,
1201 context: None,
1202 };
1203
1204 let result = run_async(async move { connector.dispatch(&call).await.unwrap() });
1205 assert!(result.is_error);
1206 let error = result.error.expect("error payload");
1207 assert!(
1208 error.code == HostCallErrorCode::Timeout,
1209 "expected timeout, got {:?} (details={:?})",
1210 error.code,
1211 error.details
1212 );
1213
1214 let _ = shutdown_tx.send(());
1215 let _ = join.join();
1216 }
1217
1218 #[test]
1219 #[cfg(unix)]
1220 fn test_dispatch_treats_zero_timeout_as_unset() {
1221 use std::io::Write;
1222
1223 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
1224 let addr = listener.local_addr().expect("listener addr");
1225
1226 let join = thread::spawn(move || {
1227 let (mut stream, _peer) = listener.accept().expect("accept");
1228 let body = "hello";
1229 let response = format!(
1230 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
1231 body.len()
1232 );
1233 stream
1234 .write_all(response.as_bytes())
1235 .expect("write response");
1236 });
1237
1238 let connector = HttpConnector::new(HttpConnectorConfig {
1239 require_tls: false,
1240 default_timeout_ms: 5000,
1241 ..Default::default()
1242 });
1243
1244 let call = HostCallPayload {
1245 call_id: "call-1".to_string(),
1246 capability: "http".to_string(),
1247 method: "http".to_string(),
1248 params: json!({
1249 "url": format!("http://{addr}/"),
1250 "method": "GET",
1251 "timeout_ms": 0,
1252 }),
1253 timeout_ms: None,
1254 cancel_token: None,
1255 context: None,
1256 };
1257
1258 let result = run_async(async move { connector.dispatch(&call).await.unwrap() });
1259 assert!(!result.is_error);
1260 assert_eq!(
1261 result.output.get("status").and_then(Value::as_u64),
1262 Some(200)
1263 );
1264 assert_eq!(
1265 result.output.get("body").and_then(Value::as_str),
1266 Some("hello")
1267 );
1268
1269 let _ = join.join();
1270 }
1271
1272 #[test]
1273 #[cfg(unix)]
1274 fn test_dispatch_streaming_returns_status_headers_and_body_stream() {
1275 use futures::StreamExt as _;
1276 use std::io::Write;
1277
1278 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
1279 let addr = listener.local_addr().expect("listener addr");
1280
1281 let join = thread::spawn(move || {
1282 let (mut stream, _peer) = listener.accept().expect("accept");
1283 let body = "hello-stream";
1284 let response = format!(
1285 "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
1286 body.len()
1287 );
1288 stream
1289 .write_all(response.as_bytes())
1290 .expect("write response");
1291 });
1292
1293 let connector = HttpConnector::new(HttpConnectorConfig {
1294 require_tls: false,
1295 default_timeout_ms: 5000,
1296 ..Default::default()
1297 });
1298
1299 let call = HostCallPayload {
1300 call_id: "call-1".to_string(),
1301 capability: "http".to_string(),
1302 method: "http".to_string(),
1303 params: json!({
1304 "url": format!("http://{addr}/"),
1305 "method": "GET",
1306 "timeout_ms": 1000,
1307 }),
1308 timeout_ms: None,
1309 cancel_token: None,
1310 context: None,
1311 };
1312
1313 let (status, headers, body) = run_async(async move {
1314 let response = connector
1315 .dispatch_streaming(&call)
1316 .await
1317 .expect("dispatch_streaming ok");
1318
1319 let mut bytes = Vec::new();
1320 let mut stream = response.stream;
1321 while let Some(chunk) = stream.next().await {
1322 let chunk = chunk.expect("stream chunk");
1323 bytes.extend_from_slice(&chunk);
1324 }
1325
1326 (response.status, response.headers, bytes)
1327 });
1328
1329 assert_eq!(status, 200);
1330 assert_eq!(
1331 headers
1332 .get("Content-Type")
1333 .or_else(|| headers.get("content-type"))
1334 .map(String::as_str),
1335 Some("text/plain")
1336 );
1337 assert_eq!(String::from_utf8_lossy(&body), "hello-stream");
1338
1339 let _ = join.join();
1340 }
1341
1342 #[test]
1343 fn http_connector_redact_url_for_log_strips_sensitive_parts() {
1344 let redacted =
1345 HttpConnector::redact_url_for_log("http://user:pass@denied.example/test?q=hello#frag");
1346 assert!(redacted.contains("http://denied.example/test"));
1347 assert!(!redacted.contains("q=hello"));
1348 assert!(!redacted.contains("#frag"));
1349 assert!(!redacted.contains("user"));
1350 assert!(!redacted.contains("pass"));
1351 }
1352
1353 #[test]
1354 fn http_connector_redact_url_for_log_falls_back_for_invalid_urls() {
1355 let redacted = HttpConnector::redact_url_for_log("not a url?q=hello#frag");
1356 assert_eq!(redacted, "not a url");
1357 }
1358}