Skip to main content

camel_component_http/
lib.rs

1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40// ---------------------------------------------------------------------------
41// HttpEndpointConfig
42// ---------------------------------------------------------------------------
43
44/// Configuration for an HTTP client (producer) endpoint.
45///
46/// # Memory Limits
47///
48/// HTTP operations enforce conservative memory limits to prevent denial-of-service
49/// attacks from untrusted network sources. These limits are significantly lower than
50/// file component limits (100MB) because HTTP typically handles API responses rather
51/// than large file transfers, and clients may be untrusted.
52///
53/// ## Default Limits
54///
55/// - **HTTP client body**: 10MB (typical API responses)
56/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
57/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
58///
59/// ## Rationale
60///
61/// The 10MB limit for HTTP client responses is appropriate for most API interactions
62/// while providing protection against:
63/// - Malicious servers sending oversized responses
64/// - Runaway processes generating unexpectedly large payloads
65/// - Memory exhaustion attacks
66///
67/// The 2MB server request limit is even more conservative because it handles input
68/// from potentially untrusted clients on the public internet.
69///
70/// ## Overriding Limits
71///
72/// Override the default client body limit using the `maxBodySize` URI parameter:
73///
74/// ```text
75/// http://api.example.com/large-data?maxBodySize=52428800
76/// ```
77///
78/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
79///
80/// ```text
81/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
82/// ```
83///
84/// ## Behavior When Exceeded
85///
86/// When a body exceeds the configured limit:
87/// - An error is returned immediately
88/// - No memory is exhausted - the limit is checked before allocation
89/// - The HTTP connection is terminated cleanly
90///
91/// ## Security Considerations
92///
93/// HTTP endpoints should be treated with more caution than file endpoints because:
94/// - Clients may be unknown and untrusted
95/// - Network traffic can be spoofed or malicious
96/// - DoS attacks often exploit unbounded resource consumption
97///
98/// Only increase limits when you control both ends of the connection or when
99/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL rebuilt as `<scheme>:<path>` from the parsed endpoint URI,
    /// e.g. `http://localhost:8080/api`. URI parameters are not part of it.
    pub base_url: String,
    /// Value of the `httpMethod` URI option, if one was supplied.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` URI option; `true` when the option is absent.
    pub throw_exception_on_failure: bool,
    /// `okStatusCodeRange` URI option parsed from `start-end` into a pair;
    /// `(200, 299)` when the option is absent.
    pub ok_status_code_range: (u16, u16),
    /// `responseTimeout` URI option (milliseconds) as a `Duration`; `None` when
    /// the URI does not set it.
    pub response_timeout: Option<Duration>,
    /// Every URI parameter whose key is not listed in `HTTP_CAMEL_OPTIONS`.
    /// Presumably sent as the outgoing query string — confirm against the producer.
    pub query_params: HashMap<String, String>,
    /// `allowPrivateIps` URI option (SSRF protection); `false` when absent.
    pub allow_private_ips: bool,
    /// Comma-separated `blockedHosts` URI option, each entry trimmed.
    pub blocked_hosts: Vec<String>,
    /// `maxBodySize` URI option in bytes; 10 MiB when absent.
    pub max_body_size: usize,
    /// `readTimeout` URI option in milliseconds; 30 000 when absent.
    pub read_timeout_ms: u64,
    /// `maxResponseBytes` URI option in bytes; 10 MiB when absent.
    // NOTE(review): how this differs from `max_body_size` is not visible here — confirm.
    pub max_response_bytes: usize,
    /// Static credentials selected by the `authMethod` URI option.
    pub auth: HttpAuth,
    /// Never populated by URI parsing (always `None` there); presumably
    /// injected programmatically for OAuth2-style token acquisition — confirm.
    pub token_provider: Option<Arc<dyn TokenProvider>>,
    /// Value of the `userAgent` URI option, if one was supplied.
    pub user_agent: Option<String>,
    /// `cookieHandling` URI option; `CookieHandling::Disabled` when absent.
    pub cookie_handling: CookieHandling,
    /// `bridgeEndpoint` URI option; `false` when absent.
    pub bridge_endpoint: bool,
    /// `connectionClose` URI option; `false` when absent.
    pub connection_close: bool,
    /// `skipRequestHeaders` URI option: comma-separated names, trimmed,
    /// lower-cased, empty entries dropped.
    pub skip_request_headers: Vec<String>,
    /// `skipResponseHeaders` URI option, normalised the same way as
    /// `skip_request_headers`.
    pub skip_response_headers: Vec<String>,
}
122
/// Static credentials attached to outgoing HTTP requests.
///
/// `Debug` is implemented by hand so that passwords and tokens never end up
/// in logs: secret fields are always rendered as `"***"`.
#[derive(Clone, PartialEq)]
pub enum HttpAuth {
    /// No credentials.
    None,
    /// Username / password pair.
    Basic { username: String, password: String },
    /// Pre-issued bearer token.
    Bearer { token: String },
}

impl std::fmt::Debug for HttpAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder shown instead of any secret value.
        const REDACTED: &str = "***";

        match self {
            Self::None => f.write_str("None"),
            Self::Basic {
                username,
                password: _,
            } => {
                let mut out = f.debug_struct("Basic");
                out.field("username", username);
                out.field("password", &REDACTED);
                out.finish()
            }
            Self::Bearer { token: _ } => {
                let mut out = f.debug_struct("Bearer");
                out.field("token", &REDACTED);
                out.finish()
            }
        }
    }
}
143
/// Cookie policy selected by the `cookieHandling` URI option.
///
/// The option value is matched case-insensitively (`disabled` / `inmemory`);
/// any other value is rejected as an invalid URI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CookieHandling {
    /// Used when the option is absent or set to `Disabled`.
    Disabled,
    /// Selected by `cookieHandling=InMemory`.
    // NOTE(review): presumably keeps cookies in a per-client in-memory store — confirm in the producer.
    InMemory,
}
149
/// Camel options that should NOT be forwarded as HTTP query params
///
/// `HttpEndpointConfig::from_components` drops every URI parameter whose key
/// appears in this list when building `query_params`. The comparison is an
/// exact, case-sensitive match on the key.
// NOTE(review): `followRedirects` and `connectTimeout` are filtered here but
// are not parsed by `HttpEndpointConfig::from_components`; presumably they are
// read elsewhere — confirm.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
    "readTimeout",
    "maxResponseBytes",
    "authMethod",
    "authUsername",
    "authPassword",
    "authBearerToken",
    "userAgent",
    "cookieHandling",
    "bridgeEndpoint",
    "connectionClose",
    "skipRequestHeaders",
    "skipResponseHeaders",
];
174
175impl UriConfig for HttpEndpointConfig {
176    /// Returns "http" as the primary scheme (also accepts "https")
177    fn scheme() -> &'static str {
178        "http"
179    }
180
181    fn from_uri(uri: &str) -> Result<Self, CamelError> {
182        let parts = parse_uri(uri)?;
183        Self::from_components(parts)
184    }
185
186    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187        // Validate scheme - accept both http and https
188        if parts.scheme != "http" && parts.scheme != "https" {
189            return Err(CamelError::InvalidUri(format!(
190                "expected scheme 'http' or 'https', got '{}'",
191                parts.scheme
192            )));
193        }
194
195        // Construct base_url from scheme + path
196        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
197        let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199        let http_method = parts.params.get("httpMethod").cloned();
200
201        let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202            Some(v) => parse_bool_param_http(v).map_err(|e| {
203                CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204            })?,
205            None => true,
206        };
207
208        // Parse status code range from "start-end" format (e.g., "200-299")
209        let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210            Some(v) => parse_ok_status_code_range(v)?,
211            None => (200, 299),
212        };
213
214        let response_timeout = match parts.params.get("responseTimeout") {
215            Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216                CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217            })?),
218            None => None,
219        };
220
221        // SSRF protection settings
222        let allow_private_ips = match parts.params.get("allowPrivateIps") {
223            Some(v) => parse_bool_param_http(v).map_err(|e| {
224                CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225            })?,
226            None => false, // Default: block private IPs
227        };
228
229        // Parse comma-separated blocked hosts
230        let blocked_hosts = parts
231            .params
232            .get("blockedHosts")
233            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234            .unwrap_or_default();
235
236        let max_body_size = match parts.params.get("maxBodySize") {
237            Some(v) => v.parse::<usize>().map_err(|e| {
238                CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239            })?,
240            None => 10 * 1024 * 1024, // Default: 10MB
241        };
242
243        let read_timeout_ms = match parts.params.get("readTimeout") {
244            Some(v) => v.parse::<u64>().map_err(|e| {
245                CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246            })?,
247            None => 30_000, // Default: 30s
248        };
249
250        let max_response_bytes = match parts.params.get("maxResponseBytes") {
251            Some(v) => v.parse::<usize>().map_err(|e| {
252                CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253            })?,
254            None => 10 * 1024 * 1024, // Default: 10MB
255        };
256
257        let auth = parse_auth_from_params(&parts.params)?;
258
259        let user_agent = parts.params.get("userAgent").cloned();
260
261        let cookie_handling = match parts.params.get("cookieHandling") {
262            Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263            Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264            Some(v) => {
265                return Err(CamelError::InvalidUri(format!(
266                    "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267                )));
268            }
269            None => CookieHandling::Disabled,
270        };
271
272        let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273            Some(v) => parse_bool_param_http(v).map_err(|e| {
274                CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275            })?,
276            None => false,
277        };
278
279        let connection_close = match parts.params.get("connectionClose") {
280            Some(v) => parse_bool_param_http(v).map_err(|e| {
281                CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282            })?,
283            None => false,
284        };
285
286        let skip_request_headers = parts
287            .params
288            .get("skipRequestHeaders")
289            .map(|v| {
290                v.split(',')
291                    .map(str::trim)
292                    .filter(|s| !s.is_empty())
293                    .map(|s| s.to_ascii_lowercase())
294                    .collect::<Vec<_>>()
295            })
296            .unwrap_or_default();
297
298        let skip_response_headers = parts
299            .params
300            .get("skipResponseHeaders")
301            .map(|v| {
302                v.split(',')
303                    .map(str::trim)
304                    .filter(|s| !s.is_empty())
305                    .map(|s| s.to_ascii_lowercase())
306                    .collect::<Vec<_>>()
307            })
308            .unwrap_or_default();
309
310        // Collect remaining params (not Camel options) as query params
311        let query_params: HashMap<String, String> = parts
312            .params
313            .into_iter()
314            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315            .collect();
316
317        Ok(Self {
318            base_url,
319            http_method,
320            throw_exception_on_failure,
321            ok_status_code_range,
322            response_timeout,
323            query_params,
324            allow_private_ips,
325            blocked_hosts,
326            max_body_size,
327            read_timeout_ms,
328            max_response_bytes,
329            auth,
330            token_provider: None,
331            user_agent,
332            cookie_handling,
333            bridge_endpoint,
334            connection_close,
335            skip_request_headers,
336            skip_response_headers,
337        })
338    }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342    let Some(method) = params.get("authMethod") else {
343        return Ok(HttpAuth::None);
344    };
345
346    if method.eq_ignore_ascii_case("none") {
347        return Ok(HttpAuth::None);
348    }
349
350    if method.eq_ignore_ascii_case("basic") {
351        let username = params.get("authUsername").cloned().ok_or_else(|| {
352            CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353        })?;
354        let password = params.get("authPassword").cloned().ok_or_else(|| {
355            CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356        })?;
357        return Ok(HttpAuth::Basic { username, password });
358    }
359
360    if method.eq_ignore_ascii_case("bearer") {
361        let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362            CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363        })?;
364        return Ok(HttpAuth::Bearer { token });
365    }
366
367    Err(CamelError::InvalidUri(format!(
368        "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369    )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373    match value.to_ascii_lowercase().as_str() {
374        "true" | "1" | "yes" => Ok(true),
375        "false" | "0" | "no" => Ok(false),
376        _ => Err(CamelError::InvalidUri(format!(
377            "invalid boolean value: '{value}'"
378        ))),
379    }
380}
381
382impl HttpEndpointConfig {
383    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384        let parts = parse_uri(uri)?;
385        let mut endpoint = Self::from_components(parts.clone())?;
386        if endpoint.response_timeout.is_none() {
387            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388        }
389        if !parts.params.contains_key("allowPrivateIps") {
390            endpoint.allow_private_ips = config.allow_private_ips;
391        }
392        if !parts.params.contains_key("blockedHosts") {
393            endpoint.blocked_hosts = config.blocked_hosts.clone();
394        }
395        if !parts.params.contains_key("maxBodySize") {
396            endpoint.max_body_size = config.max_body_size;
397        }
398        if !parts.params.contains_key("readTimeout") {
399            endpoint.read_timeout_ms = config.read_timeout_ms;
400        }
401        if !parts.params.contains_key("maxResponseBytes") {
402            endpoint.max_response_bytes = config.max_response_bytes;
403        }
404        if !parts.params.contains_key("okStatusCodeRange")
405            && let Some(range) = &config.ok_status_code_range
406        {
407            endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408        }
409
410        Ok(endpoint)
411    }
412}
413
414// ---------------------------------------------------------------------------
415// HttpServerConfig
416// ---------------------------------------------------------------------------
417
/// Configuration for an HTTP server (consumer) endpoint.
///
/// Parsed from URIs of the form `http://host[:port][/path]?options`. When the
/// authority carries no port, 80 is used for `http` and 443 for `https`.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port to listen on.
    pub port: u16,
    /// URL path this consumer handles, e.g. "/orders". `/` when the URI has
    /// no path component.
    pub path: String,
    /// Maximum request body size in bytes (`maxRequestBody` URI option;
    /// 2 MiB when the option is absent or not a valid number).
    pub max_request_body: usize,
    /// Maximum response body size for materializing streams in bytes
    /// (`maxResponseBody` URI option; 10 MiB when absent or not a valid number).
    pub max_response_body: usize,
    /// Maximum number of in-flight requests handled concurrently by this server
    /// (`maxInflightRequests` URI option; 1024 when absent or not a valid number).
    pub max_inflight_requests: usize,
}
434
435impl UriConfig for HttpServerConfig {
436    /// Returns "http" as the primary scheme (also accepts "https")
437    fn scheme() -> &'static str {
438        "http"
439    }
440
441    fn from_uri(uri: &str) -> Result<Self, CamelError> {
442        let parts = parse_uri(uri)?;
443        Self::from_components(parts)
444    }
445
446    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447        // Validate scheme - accept both http and https
448        if parts.scheme != "http" && parts.scheme != "https" {
449            return Err(CamelError::InvalidUri(format!(
450                "expected scheme 'http' or 'https', got '{}'",
451                parts.scheme
452            )));
453        }
454
455        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
456        // Strip leading "//"
457        let authority_and_path = parts.path.trim_start_matches('/');
458
459        // Split on the first "/" to separate "host:port" from "/path"
460        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461            (&authority_and_path[..idx], &authority_and_path[idx..])
462        } else {
463            (authority_and_path, "/")
464        };
465
466        let path = if path_suffix.is_empty() {
467            "/"
468        } else {
469            path_suffix
470        }
471        .to_string();
472
473        // Parse host:port from authority
474        let (host, port) = if let Some(colon) = authority.rfind(':') {
475            let port_str = &authority[colon + 1..];
476            match port_str.parse::<u16>() {
477                Ok(p) => (authority[..colon].to_string(), p),
478                Err(_) => {
479                    return Err(CamelError::InvalidUri(format!(
480                        "invalid port '{}' in authority",
481                        port_str
482                    )));
483                }
484            }
485        } else {
486            // Default port based on scheme: 443 for https, 80 for http
487            let default_port = if parts.scheme == "https" { 443 } else { 80 };
488            (authority.to_string(), default_port)
489        };
490
491        let max_request_body = parts
492            .params
493            .get("maxRequestBody")
494            .and_then(|v| v.parse::<usize>().ok())
495            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
496
497        let max_response_body = parts
498            .params
499            .get("maxResponseBody")
500            .and_then(|v| v.parse::<usize>().ok())
501            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
502
503        let max_inflight_requests = parts
504            .params
505            .get("maxInflightRequests")
506            .and_then(|v| v.parse::<usize>().ok())
507            .unwrap_or(1024);
508
509        Ok(Self {
510            host,
511            port,
512            path,
513            max_request_body,
514            max_response_body,
515            max_inflight_requests,
516        })
517    }
518}
519
520impl HttpServerConfig {
521    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522        let parts = parse_uri(uri)?;
523        let mut server = Self::from_components(parts.clone())?;
524        if !parts.params.contains_key("maxRequestBody") {
525            server.max_request_body = config.max_request_body;
526        }
527        if !parts.params.contains_key("maxResponseBody") {
528            // Default max_response_body is 10MB via HttpConfig::default().max_body_size.
529            server.max_response_body = config.max_body_size;
530        }
531        Ok(server)
532    }
533}
534
535// ---------------------------------------------------------------------------
536// RequestEnvelope / HttpReply
537// ---------------------------------------------------------------------------
538
/// Body of the HTTP response: already-materialized bytes or a lazy stream.
///
/// **Internal plumbing** — subject to change without notice.
pub enum HttpReplyBody {
    /// Fully buffered payload. The dispatch handler compares its length with
    /// the server's `max_response_body` before sending it.
    Bytes(bytes::Bytes),
    /// Chunked payload; each item is either a chunk of bytes or the error
    /// that ended the stream.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
546
/// An inbound HTTP request sent from the Axum dispatch handler to an
/// `HttpConsumer` receive loop.
///
/// **Internal plumbing** — subject to change without notice.
pub struct RequestEnvelope {
    /// HTTP method rendered as text.
    pub method: String,
    /// Path component of the request URI.
    pub path: String,
    /// Raw query string; empty when the request URI has none.
    pub query: String,
    /// Copy of the request headers.
    pub headers: http::HeaderMap,
    /// Request body as a stream; it is not buffered by the dispatch handler.
    pub body: StreamBody,
    /// One-shot channel on which the consumer sends the [`HttpReply`] that the
    /// dispatch handler is waiting for.
    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
559
/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
///
/// **Internal plumbing** — subject to change without notice.
pub struct HttpReply {
    /// Numeric status code. A value that is not a valid HTTP status is
    /// replaced with 500 by the dispatch handler.
    pub status: u16,
    /// Response headers as `(name, value)` pairs, added to the response in order.
    pub headers: Vec<(String, String)>,
    /// Response payload.
    pub body: HttpReplyBody,
}
568
569// ---------------------------------------------------------------------------
570// HttpRouteRegistry / ServerRegistry
571// ---------------------------------------------------------------------------
572
/// Key identifying one shared server: `(bind host, port)`.
type ServerKey = (String, u16);
574
/// Handle to a running Axum server on one interface/port.
///
/// Besides the route registry it remembers the limits the server was started
/// with, so that later endpoints sharing the same `(host, port)` can be
/// rejected when they ask for different limits.
struct ServerHandle {
    /// Route table shared with the server's dispatch handler.
    registry: HttpRouteRegistry,
    /// Request body limit (bytes) the server was started with.
    max_request_body: usize,
    /// Response body limit (bytes) the server was started with.
    max_response_body: usize,
    /// In-flight request cap the server was started with.
    max_inflight_requests: usize,
}
582
583/// Process-global registry mapping (host, port) → running Axum server handle.
584pub struct ServerRegistry {
585    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589    /// Returns the global singleton.
590    pub fn global() -> &'static Self {
591        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592        INSTANCE.get_or_init(|| ServerRegistry {
593            inner: Mutex::new(HashMap::new()),
594        })
595    }
596
597    /// Returns route registry for `port`, spawning new Axum server if
598    /// none is running on that port yet.
599    #[allow(clippy::too_many_arguments)]
600    pub async fn get_or_spawn(
601        &'static self,
602        host: &str,
603        port: u16,
604        max_request_body: usize,
605        max_response_body: usize,
606        max_inflight_requests: usize,
607        runtime: Arc<dyn RuntimeObservability>,
608        route_id: String,
609    ) -> Result<HttpRouteRegistry, CamelError> {
610        let host_owned = host.to_string();
611
612        let cell = {
613            let mut guard = self.inner.lock().map_err(|_| {
614                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
615            })?;
616            let key = (host.to_string(), port);
617            guard
618                .entry(key)
619                .or_insert_with(|| Arc::new(OnceCell::new()))
620                .clone()
621        };
622
623        if let Some(existing) = cell.get()
624            && existing.max_request_body != max_request_body
625        {
626            return Err(CamelError::EndpointCreationFailed(format!(
627                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
628                existing.max_request_body, max_request_body
629            )));
630        }
631
632        if let Some(existing) = cell.get()
633            && existing.max_response_body != max_response_body
634        {
635            return Err(CamelError::EndpointCreationFailed(format!(
636                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
637                existing.max_response_body, max_response_body
638            )));
639        }
640
641        if let Some(existing) = cell.get()
642            && existing.max_inflight_requests != max_inflight_requests
643        {
644            return Err(CamelError::EndpointCreationFailed(format!(
645                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
646                existing.max_inflight_requests, max_inflight_requests
647            )));
648        }
649
650        let handle = cell
651            .get_or_try_init(|| {
652                let rt = Arc::clone(&runtime);
653                let rid = route_id.clone();
654                async move {
655                    let addr = format!("{host_owned}:{port}");
656                    let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
657                        CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
658                    })?;
659                    let registry = HttpRouteRegistry::new();
660                    let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
661                    let task = tokio::spawn(run_axum_server(
662                        listener,
663                        registry.clone(),
664                        max_request_body,
665                        max_response_body,
666                        Arc::clone(&inflight),
667                        Arc::clone(&rt),
668                        rid.clone(),
669                    ));
670                    let addr_for_monitor = format!("{host_owned}:{port}");
671                    tokio::spawn(monitor_axum_task(
672                        task,
673                        addr_for_monitor,
674                        Arc::clone(&rt),
675                        rid,
676                    ));
677                    Ok::<ServerHandle, CamelError>(ServerHandle {
678                        registry,
679                        max_request_body,
680                        max_response_body,
681                        max_inflight_requests,
682                    })
683                }
684            })
685            .await?;
686
687        Ok(handle.registry.clone())
688    }
689
690    /// Reset the global registry — **test-only**.
691    ///
692    /// Clears all registered server handles so that tests can start from a clean
693    /// state. This is intentionally `#[cfg(test)]` because the registry is a
694    /// process-global singleton in production and resetting it would break
695    /// running servers.
696    #[cfg(test)]
697    pub fn reset() {
698        let instance = Self::global();
699        let mut guard = instance
700            .inner
701            .lock()
702            .expect("ServerRegistry lock poisoned during test reset");
703        guard.clear();
704    }
705}
706
707// ---------------------------------------------------------------------------
708// Axum server
709// ---------------------------------------------------------------------------
710
711use axum::{
712    Router,
713    body::Body as AxumBody,
714    extract::{Request, State},
715    http::{Response, StatusCode},
716    response::IntoResponse,
717};
718
/// Per-server state handed to the Axum dispatch handler.
#[derive(Clone)]
pub(crate) struct AppState {
    /// Route table consulted for every incoming request.
    registry: HttpRouteRegistry,
    /// Upper bound (bytes) checked against the request's `Content-Length`.
    max_request_body: usize,
    /// Upper bound (bytes) checked against buffered reply bodies.
    max_response_body: usize,
    /// Permit pool capping concurrently handled requests; a request that
    /// cannot get a permit immediately is answered with 503.
    inflight: Arc<tokio::sync::Semaphore>,
}
726
727async fn run_axum_server(
728    listener: tokio::net::TcpListener,
729    registry: HttpRouteRegistry,
730    max_request_body: usize,
731    max_response_body: usize,
732    inflight: Arc<tokio::sync::Semaphore>,
733    runtime: Arc<dyn RuntimeObservability>,
734    route_id: String,
735) {
736    let state = AppState {
737        registry,
738        max_request_body,
739        max_response_body,
740        inflight,
741    };
742    let app = Router::new().fallback(dispatch_handler).with_state(state);
743
744    axum::serve(listener, app).await.unwrap_or_else(|e| {
745        runtime
746            .metrics()
747            .increment_errors(&route_id, "e:http:accept");
748        // log-policy: outside-contract
749        tracing::error!(error = %e, "Axum server error");
750    });
751}
752
753/// Monitors an Axum server task and emits a structured error event if it
754/// exits unexpectedly.
755///
756/// # Limitations
757/// The HTTP server is shared across all routes on a port. Full per-route
758/// CrashNotification propagation is deferred — this provides observable
759/// structured logging as a first guard.
760async fn monitor_axum_task(
761    handle: tokio::task::JoinHandle<()>,
762    addr: String,
763    runtime: Arc<dyn RuntimeObservability>,
764    route_id: String,
765) {
766    match handle.await {
767        Ok(()) => {
768            // Clean exit (process shutdown or normal stop)
769        }
770        Err(join_err) => {
771            runtime
772                .metrics()
773                .increment_errors(&route_id, "e:http:server-task-exited");
774            // log-policy: outside-contract
775            tracing::error!(
776                addr = %addr,
777                error = %join_err,
778                "Axum server task exited unexpectedly — all routes on this port are now dead"
779            );
780        }
781    }
782}
783
784async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
785    let path = req.uri().path().to_owned();
786
787    // 1. API match — lookup by path, only consume body if matched
788    let api_sender = {
789        let inner = state.registry.inner.read().await;
790        inner.api_routes.get(&path).cloned()
791    }; // lock released BEFORE any IO
792
793    if let Some(sender) = api_sender {
794        let method = req.method().to_string();
795        let query = req.uri().query().unwrap_or("").to_string();
796        let headers = req.headers().clone();
797
798        // Check Content-Length against limit BEFORE opening the stream
799        let content_length: Option<u64> = headers
800            .get(http::header::CONTENT_LENGTH)
801            .and_then(|v| v.to_str().ok())
802            .and_then(|s| s.parse().ok());
803
804        if let Some(len) = content_length
805            && len > state.max_request_body as u64
806        {
807            return Response::builder()
808                .status(StatusCode::PAYLOAD_TOO_LARGE)
809                .body(AxumBody::from("Request body exceeds configured limit"))
810                .expect("infallible"); // allow-unwrap
811        }
812
813        let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
814            Ok(permit) => permit,
815            Err(_) => {
816                return Response::builder()
817                    .status(StatusCode::SERVICE_UNAVAILABLE)
818                    .body(AxumBody::from("Service Unavailable"))
819                    .expect("infallible"); // allow-unwrap
820            }
821        };
822
823        // Build StreamBody from Axum body WITHOUT materializing
824        let content_type = headers
825            .get(http::header::CONTENT_TYPE)
826            .and_then(|v| v.to_str().ok())
827            .map(|s| s.to_string());
828
829        let data_stream: BodyDataStream = req.into_body().into_data_stream();
830        let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
831        let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
832
833        let stream_body = StreamBody {
834            stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
835            metadata: StreamMetadata {
836                size_hint: content_length,
837                content_type,
838                origin: None,
839            },
840        };
841
842        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
843        let envelope = RequestEnvelope {
844            method,
845            path,
846            query,
847            headers,
848            body: stream_body,
849            reply_tx,
850        };
851
852        if sender.send(envelope).await.is_err() {
853            return Response::builder()
854                .status(StatusCode::SERVICE_UNAVAILABLE)
855                .body(AxumBody::from("Consumer unavailable"))
856                .expect("infallible"); // allow-unwrap
857        }
858
859        match reply_rx.await {
860            Ok(reply) => {
861                let reply = match reply.body {
862                    HttpReplyBody::Bytes(b)
863                        if exceeds_max_response_body(b.len(), state.max_response_body) =>
864                    {
865                        HttpReply {
866                            status: 500,
867                            headers: vec![],
868                            body: HttpReplyBody::Bytes(bytes::Bytes::from(
869                                "Response body exceeds configured limit",
870                            )),
871                        }
872                    }
873                    _ => reply,
874                };
875
876                let status =
877                    StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
878                let mut builder = Response::builder().status(status);
879                for (k, v) in &reply.headers {
880                    builder = builder.header(k.as_str(), v.as_str());
881                }
882                match reply.body {
883                    HttpReplyBody::Bytes(b) => {
884                        builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
885                            Response::builder()
886                                .status(StatusCode::INTERNAL_SERVER_ERROR)
887                                .body(AxumBody::from("Invalid response headers from consumer"))
888                                .expect("infallible") // allow-unwrap
889                        })
890                    }
891                    HttpReplyBody::Stream(stream) => builder
892                        .body(AxumBody::from_stream(stream))
893                        .unwrap_or_else(|_| {
894                            Response::builder()
895                                .status(StatusCode::INTERNAL_SERVER_ERROR)
896                                .body(AxumBody::from("Invalid response headers from consumer"))
897                                .expect("infallible") // allow-unwrap
898                        }),
899                }
900            }
901            Err(_) => Response::builder()
902                .status(StatusCode::INTERNAL_SERVER_ERROR)
903                .body(AxumBody::from("Pipeline error"))
904                .expect("infallible"), // allow-unwrap
905        }
906    } else {
907        // No API route matched — try static mounts
908        static_dispatch::dispatch_static(&state, req, &path).await
909    }
910}
911
/// Returns `true` when a response body of `len` bytes is strictly larger than
/// the configured `max`; a body of exactly `max` bytes is still within limits.
fn exceeds_max_response_body(len: usize, max: usize) -> bool {
    max < len
}
915
/// Converts a header name to Title-Case, segment by segment.
///
/// The name is split on `-`; the first character of every segment is
/// upper-cased and the remainder of the segment is copied unchanged, so
/// `content-type` becomes `Content-Type`. Empty segments are preserved, which
/// keeps leading, trailing and doubled dashes intact.
fn title_case_header(name: &str) -> String {
    let mut titled = String::with_capacity(name.len());
    for (idx, segment) in name.split('-').enumerate() {
        if idx > 0 {
            titled.push('-');
        }
        let mut rest = segment.chars();
        if let Some(lead) = rest.next() {
            // `to_uppercase` may expand to several chars (e.g. 'ß' -> "SS").
            titled.extend(lead.to_uppercase());
            titled.push_str(rest.as_str());
        }
    }
    titled
}
928
929// ---------------------------------------------------------------------------
930// HttpConsumer
931// ---------------------------------------------------------------------------
932
933pub struct HttpConsumer {
934    config: HttpServerConfig,
935    /// Runtime observability handle for ADR-0012 metrics and health calls.
936    runtime: Arc<dyn RuntimeObservability>,
937}
938
939impl HttpConsumer {
940    pub fn new(config: HttpServerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
941        Self { config, runtime }
942    }
943}
944
945#[async_trait::async_trait]
946impl Consumer for HttpConsumer {
947    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
948        use camel_component_api::{Body, Exchange, Message};
949
950        let registry = ServerRegistry::global()
951            .get_or_spawn(
952                &self.config.host,
953                self.config.port,
954                self.config.max_request_body,
955                self.config.max_response_body,
956                self.config.max_inflight_requests,
957                self.runtime.clone(),
958                ctx.route_id().to_string(),
959            )
960            .await?;
961
962        // Create channel for this path and register it
963        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
964        registry
965            .register_api_route(self.config.path.clone(), env_tx)
966            .await;
967
968        let path = self.config.path.clone();
969        let registry_for_cleanup = registry.clone();
970        let cancel_token = ctx.cancel_token();
971        loop {
972            tokio::select! {
973                _ = ctx.cancelled() => {
974                    break;
975                }
976                envelope = env_rx.recv() => {
977                    let Some(envelope) = envelope else { break; };
978
979                    // Build Exchange from HTTP request
980                    let mut msg = Message::default();
981
982                    // Set standard Camel HTTP headers
983                    msg.set_header("CamelHttpMethod",
984                        serde_json::Value::String(envelope.method.clone()));
985                    msg.set_header("CamelHttpPath",
986                        serde_json::Value::String(envelope.path.clone()));
987                    msg.set_header("CamelHttpQuery",
988                        serde_json::Value::String(envelope.query.clone()));
989
990                    // Forward HTTP headers with Title-Case names (hyper lowercases them)
991                    for (k, v) in &envelope.headers {
992                        if let Ok(val_str) = v.to_str() {
993                            msg.set_header(
994                                title_case_header(k.as_str()),
995                                serde_json::Value::String(val_str.to_string()),
996                            );
997                        }
998                    }
999
1000                    // Body: always arrives as Body::Stream (native streaming)
1001                    // Routes can call into_bytes() if they need to materialize
1002                    msg.body = Body::Stream(envelope.body);
1003
1004                    #[allow(unused_mut)]
1005                    let mut exchange = Exchange::new(msg);
1006
1007                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1008                    #[cfg(feature = "otel")]
1009                    {
1010                        let headers: HashMap<String, String> = envelope
1011                            .headers
1012                            .iter()
1013                            .filter_map(|(k, v)| {
1014                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
1015                            })
1016                            .collect();
1017                        camel_otel::extract_into_exchange(&mut exchange, &headers);
1018                    }
1019
1020                    let reply_tx = envelope.reply_tx;
1021                    let sender = ctx.sender().clone();
1022                    let path_clone = path.clone();
1023                    let cancel = cancel_token.clone();
1024
1025                    // Spawn a task to handle this request concurrently
1026                    //
1027                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
1028                    // true concurrent request processing. This change was introduced as part of the
1029                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
1030                    //
1031                    // Rationale:
1032                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
1033                    //    the consumer's main loop until the pipeline processing completes
1034                    // 2. This blocking would prevent multiple HTTP requests from being processed
1035                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
1036                    // 3. The channel would never have multiple exchanges buffered simultaneously,
1037                    //    defeating the purpose of pipeline-side concurrency
1038                    // 4. By spawning a task per request, we allow the consumer loop to continue
1039                    //    accepting new requests while existing ones are processed in the pipeline
1040                    //
1041                    // This approach effectively decouples request acceptance from pipeline processing,
1042                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
1043                    // by the pipeline when ConcurrencyModel::Concurrent is active.
1044                    tokio::spawn(async move {
1045                        // Check for cancellation before sending to pipeline.
1046                        // Returns 503 (Service Unavailable) instead of letting the request
1047                        // enter a shutting-down pipeline. This is a behavioral change from
1048                        // the pre-concurrency implementation where cancellation during
1049                        // processing would result in a 500 (Internal Server Error).
1050                        // 503 is more semantically correct: the server is temporarily
1051                        // unable to handle the request due to shutdown.
1052                        if cancel.is_cancelled() {
1053                            let _ = reply_tx.send(HttpReply {
1054                                status: 503,
1055                                headers: vec![],
1056                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1057                            });
1058                            return;
1059                        }
1060
1061                        // Send through pipeline and await result
1062                        let (tx, rx) = tokio::sync::oneshot::channel();
1063                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
1064                            exchange,
1065                            reply_tx: Some(tx),
1066                        };
1067
1068                        let result = match sender.send(envelope).await {
1069                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1070                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1071                        }
1072                        .and_then(|r| r);
1073
1074                        let reply = match result {
1075                            Ok(out) => {
1076                                let status = out
1077                                    .input
1078                                    .header("CamelHttpResponseCode")
1079                                    .and_then(|v| {
1080                                        let raw = v.as_u64()
1081                                            .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1082                                        let code = raw as u16;
1083                                        (100..1000).contains(&code).then_some(code)
1084                                    })
1085                                    .unwrap_or(200);
1086
1087                                let user_content_type = out
1088                                    .input
1089                                    .header("Content-Type")
1090                                    .and_then(|v| v.as_str().map(|s| s.to_string()));
1091
1092                                let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1093                                    Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1094                                    Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1095                                    Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1096                                    Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1097                                    Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1098                                        v.to_string().into_bytes(),
1099                                    )), Some("application/json".to_string())),
1100                                    Body::Stream(s) => {
1101                                        let ct = s.metadata.content_type.clone();
1102                                        match s.stream.lock().await.take() {
1103                                            Some(stream) => (
1104                                                HttpReplyBody::Stream(stream),
1105                                                ct,
1106                                            ),
1107                                            None => {
1108                                                // log-policy: system-broken
1109                                                tracing::error!(
1110                                                    "Body::Stream already consumed before HTTP reply — returning 500"
1111                                                );
1112                                                let error_reply = HttpReply {
1113                                                    status: 500,
1114                                                    headers: vec![],
1115                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1116                                                };
1117                                                if reply_tx.send(error_reply).is_err() {
1118                                                    debug!("reply_tx dropped before error reply could be sent");
1119                                                }
1120                                                return;
1121                                            }
1122                                        }
1123                                    }
1124                                };
1125
1126                                let mut resp_headers: Vec<(String, String)> = out
1127                                    .input
1128                                    .headers
1129                                    .iter()
1130                                    .filter(|(k, _)| !k.starts_with("Camel"))
1131                                    .filter(|(k, _)| {
1132                                        !matches!(
1133                                            k.to_lowercase().as_str(),
1134                                            "content-length"
1135                                            | "content-type"
1136                                            | "transfer-encoding"
1137                                            | "connection"
1138                                            | "cache-control"
1139                                            | "date"
1140                                            | "pragma"
1141                                            | "trailer"
1142                                            | "upgrade"
1143                                            | "via"
1144                                            | "warning"
1145                                            | "host"
1146                                            | "user-agent"
1147                                            | "accept"
1148                                            | "accept-encoding"
1149                                            | "accept-language"
1150                                            | "accept-charset"
1151                                            | "authorization"
1152                                            | "proxy-authorization"
1153                                            | "cookie"
1154                                            | "expect"
1155                                            | "from"
1156                                            | "if-match"
1157                                            | "if-modified-since"
1158                                            | "if-none-match"
1159                                            | "if-range"
1160                                            | "if-unmodified-since"
1161                                            | "max-forwards"
1162                                            | "proxy-connection"
1163                                            | "range"
1164                                            | "referer"
1165                                            | "te"
1166                                        )
1167                                    })
1168                                    .filter_map(|(k, v)| {
1169                                        v.as_str().map(|s| (k.clone(), s.to_string()))
1170                                    })
1171                                    .collect();
1172
1173                                let content_type = user_content_type
1174                                    .or(inferred_content_type);
1175                                if let Some(ct) = content_type {
1176                                    resp_headers.push(("Content-Type".to_string(), ct));
1177                                }
1178
1179                                HttpReply {
1180                                    status,
1181                                    headers: resp_headers,
1182                                    body: reply_body,
1183                                }
1184                            }
1185                            Err(CamelError::Stopped) => {
1186                                tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1187                                HttpReply {
1188                                    status: 204,
1189                                    headers: vec![],
1190                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1191                                }
1192                            }
1193                            Err(CamelError::Unauthenticated(msg)) => {
1194                                tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1195                                HttpReply {
1196                                    status: 401,
1197                                    headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1198                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1199                                }
1200                            }
1201                            Err(CamelError::Unauthorized(msg)) => {
1202                                tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1203                                HttpReply {
1204                                    status: 403,
1205                                    headers: vec![],
1206                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1207                                }
1208                            }
1209                            Err(e) => {
1210                                // log-policy: handler-owned
1211                                tracing::warn!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1212                                HttpReply {
1213                                    status: 500,
1214                                    headers: vec![],
1215                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1216                                }
1217                            }
1218                        };
1219
1220                        // Reply to Axum handler (ignore error if client disconnected)
1221                        let _ = reply_tx.send(reply);
1222                    });
1223                }
1224            }
1225        }
1226
1227        // Deregister this path
1228        registry_for_cleanup.unregister_api_route(&path).await;
1229
1230        Ok(())
1231    }
1232
1233    async fn stop(&mut self) -> Result<(), CamelError> {
1234        Ok(())
1235    }
1236
1237    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1238        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1239    }
1240}
1241
1242// ---------------------------------------------------------------------------
1243// HttpComponent / HttpsComponent
1244// ---------------------------------------------------------------------------
1245
/// Component registered for the `http` scheme.
///
/// Carries the component-level [`HttpConfig`] that is validated and used as
/// the defaults for every endpoint created through `create_endpoint`.
pub struct HttpComponent {
    /// Component-wide settings applied as defaults when endpoints are created.
    config: HttpConfig,
}
1249
1250fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1251    let mut builder = reqwest::Client::builder()
1252        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1253        .pool_max_idle_per_host(config.pool_max_idle_per_host)
1254        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1255
1256    if !config.follow_redirects {
1257        builder = builder.redirect(reqwest::redirect::Policy::none());
1258    } else if let Some(max_redirects) = config.max_redirects {
1259        builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1260    }
1261
1262    if let Some(proxy_url) = &config.proxy_url
1263        && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1264    {
1265        builder = builder.proxy(proxy);
1266    }
1267
1268    if matches!(cookie_handling, CookieHandling::InMemory) {
1269        // TODO(HTTP-013): enable reqwest cookie jar once workspace reqwest features include cookie_store.
1270    }
1271
1272    if let Some(tls) = &config.tls
1273        && tls.enabled
1274    {
1275        if tls.insecure || !tls.verify_peer {
1276            builder = builder.danger_accept_invalid_certs(true);
1277        }
1278
1279        if let Some(ca_path) = &tls.ca_cert_path
1280            && let Ok(ca_bytes) = std::fs::read(ca_path)
1281        {
1282            let cert = reqwest::Certificate::from_pem(&ca_bytes)
1283                .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1284            if let Ok(ca_cert) = cert {
1285                builder = builder.add_root_certificate(ca_cert);
1286            }
1287        }
1288
1289        if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1290            && let (Ok(cert_bytes), Ok(key_bytes)) =
1291                (std::fs::read(cert_path), std::fs::read(key_path))
1292        {
1293            let mut identity_pem = cert_bytes;
1294            identity_pem.extend_from_slice(&key_bytes);
1295            if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1296                builder = builder.identity(identity);
1297            }
1298        }
1299    }
1300
1301    builder
1302        .build()
1303        .expect("reqwest::Client::build() with valid config should not fail") // allow-unwrap
1304}
1305
1306impl HttpComponent {
1307    pub fn new() -> Self {
1308        let config = HttpConfig::default();
1309        Self { config }
1310    }
1311
1312    pub fn with_config(config: HttpConfig) -> Self {
1313        Self { config }
1314    }
1315
1316    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1317        match config {
1318            Some(cfg) => Self::with_config(cfg),
1319            None => Self::new(),
1320        }
1321    }
1322}
1323
1324impl Default for HttpComponent {
1325    fn default() -> Self {
1326        Self::new()
1327    }
1328}
1329
1330impl Component for HttpComponent {
1331    fn scheme(&self) -> &str {
1332        "http"
1333    }
1334
1335    fn create_endpoint(
1336        &self,
1337        uri: &str,
1338        ctx: &dyn camel_component_api::ComponentContext,
1339    ) -> Result<Box<dyn Endpoint>, CamelError> {
1340        self.config.validate()?;
1341        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1342        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1343        let client = build_client(&self.config, config.cookie_handling);
1344        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1345            server_config.host.clone(),
1346            server_config.port,
1347        )));
1348        Ok(Box::new(HttpEndpoint {
1349            uri: uri.to_string(),
1350            config,
1351            server_config,
1352            client,
1353        }))
1354    }
1355}
1356
1357pub struct HttpsComponent {
1358    config: HttpConfig,
1359}
1360
1361impl HttpsComponent {
1362    pub fn new() -> Self {
1363        let config = HttpConfig::default();
1364        Self { config }
1365    }
1366
1367    pub fn with_config(config: HttpConfig) -> Self {
1368        Self { config }
1369    }
1370
1371    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1372        match config {
1373            Some(cfg) => Self::with_config(cfg),
1374            None => Self::new(),
1375        }
1376    }
1377}
1378
1379impl Default for HttpsComponent {
1380    fn default() -> Self {
1381        Self::new()
1382    }
1383}
1384
1385impl Component for HttpsComponent {
1386    fn scheme(&self) -> &str {
1387        "https"
1388    }
1389
1390    fn create_endpoint(
1391        &self,
1392        uri: &str,
1393        ctx: &dyn camel_component_api::ComponentContext,
1394    ) -> Result<Box<dyn Endpoint>, CamelError> {
1395        self.config.validate()?;
1396        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1397        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1398        let client = build_client(&self.config, config.cookie_handling);
1399        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1400            server_config.host.clone(),
1401            server_config.port,
1402        )));
1403        Ok(Box::new(HttpEndpoint {
1404            uri: uri.to_string(),
1405            config,
1406            server_config,
1407            client,
1408        }))
1409    }
1410}
1411
1412// ---------------------------------------------------------------------------
1413// HttpEndpoint
1414// ---------------------------------------------------------------------------
1415
1416struct HttpEndpoint {
1417    uri: String,
1418    config: HttpEndpointConfig,
1419    server_config: HttpServerConfig,
1420    client: reqwest::Client,
1421}
1422
1423impl Endpoint for HttpEndpoint {
1424    fn uri(&self) -> &str {
1425        &self.uri
1426    }
1427
1428    fn create_consumer(
1429        &self,
1430        rt: Arc<dyn camel_component_api::RuntimeObservability>,
1431    ) -> Result<Box<dyn Consumer>, CamelError> {
1432        Ok(Box::new(HttpConsumer::new(self.server_config.clone(), rt)))
1433    }
1434
1435    fn create_producer(
1436        &self,
1437        _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1438        _ctx: &ProducerContext,
1439    ) -> Result<BoxProcessor, CamelError> {
1440        let producer = HttpProducer {
1441            config: Arc::new(self.config.clone()),
1442            client: self.client.clone(),
1443        };
1444        if let Some(ref provider) = self.config.token_provider {
1445            let layer = BearerTokenLayer::new(Arc::clone(provider));
1446            Ok(BoxProcessor::new(layer.layer(producer)))
1447        } else {
1448            Ok(BoxProcessor::new(producer))
1449        }
1450    }
1451}
1452
1453// ---------------------------------------------------------------------------
1454// SSRF Protection
1455// ---------------------------------------------------------------------------
1456
1457fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1458    let parsed = url::Url::parse(url)
1459        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1460
1461    // Check blocked hosts
1462    if let Some(host) = parsed.host_str()
1463        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1464    {
1465        return Err(CamelError::ProcessorError(format!(
1466            "Host '{}' is blocked",
1467            host
1468        )));
1469    }
1470
1471    // Check private IPs if not allowed
1472    if !config.allow_private_ips
1473        && let Some(host) = parsed.host()
1474    {
1475        match host {
1476            url::Host::Ipv4(ip) => {
1477                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1478                    return Err(CamelError::ProcessorError(format!(
1479                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1480                        ip
1481                    )));
1482                }
1483            }
1484            url::Host::Ipv6(ip) => {
1485                if ip.is_loopback() {
1486                    return Err(CamelError::ProcessorError(format!(
1487                        "Loopback IP '{}' not allowed",
1488                        ip
1489                    )));
1490                }
1491            }
1492            url::Host::Domain(domain) => {
1493                // Block common internal domains
1494                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1495                if blocked_domains.contains(&domain) {
1496                    return Err(CamelError::ProcessorError(format!(
1497                        "Domain '{}' is not allowed",
1498                        domain
1499                    )));
1500                }
1501            }
1502        }
1503    }
1504
1505    Ok(())
1506}
1507
/// Returns `true` when `ip` must be treated as an internal (non-public) target
/// for SSRF purposes.
///
/// IPv4: private ranges, loopback, link-local and `0.0.0.0/8`.
///
/// IPv6: loopback (`::1`), the unspecified address (`::`, the IPv6 counterpart
/// of `0.0.0.0`, which is blocked for IPv4 as well), unique-local `fc00::/7`,
/// link-local `fe80::/10`, and IPv4-mapped addresses (`::ffff:a.b.c.d`) whose
/// embedded IPv4 address is itself internal.
fn is_private_ip(ip: &IpAddr) -> bool {
    /// IPv4 classification shared by plain and IPv4-mapped addresses.
    fn is_internal_v4(v4: std::net::Ipv4Addr) -> bool {
        v4.is_private() || v4.is_loopback() || v4.is_link_local() || v4.octets()[0] == 0
    }

    match ip {
        IpAddr::V4(v4) => is_internal_v4(*v4),
        IpAddr::V6(v6) => {
            let seg0 = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                // fc00::/7 (ULA)
                || (seg0 & 0xfe00) == 0xfc00
                // fe80::/10 (link-local)
                || (seg0 & 0xffc0) == 0xfe80
                // ::ffff:0:0/96 (IPv4-mapped): only block if the mapped IPv4 is internal
                || v6.to_ipv4_mapped().is_some_and(is_internal_v4)
        }
    }
}
1533
1534async fn validate_resolved_host_for_ssrf(
1535    url: &str,
1536    config: &HttpEndpointConfig,
1537) -> Result<(), CamelError> {
1538    if config.allow_private_ips {
1539        return Ok(());
1540    }
1541
1542    let parsed = url::Url::parse(url)
1543        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1544    let Some(host) = parsed.host_str() else {
1545        return Ok(());
1546    };
1547    let Some(port) = parsed.port_or_known_default() else {
1548        return Ok(());
1549    };
1550
1551    let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1552        CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1553    })?;
1554
1555    for addr in resolved {
1556        let ip = addr.ip();
1557        if is_private_ip(&ip) {
1558            return Err(CamelError::ProcessorError(format!(
1559                "Target resolved to private IP: {}",
1560                ip
1561            )));
1562        }
1563    }
1564
1565    Ok(())
1566}
1567
1568// ---------------------------------------------------------------------------
1569// HttpProducer
1570// ---------------------------------------------------------------------------
1571
/// Producer side of an HTTP endpoint: turns an `Exchange` into an outbound
/// HTTP request via its `Service<Exchange>` implementation.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint configuration, shared by reference count; cloned into every
    /// request future.
    config: Arc<HttpEndpointConfig>,
    /// `reqwest` client used to send requests; cloned into every request future.
    client: reqwest::Client,
}
1577
1578impl HttpProducer {
1579    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1580        if let Some(ref method) = config.http_method {
1581            return method.to_uppercase();
1582        }
1583        if let Some(method) = exchange
1584            .input
1585            .header("CamelHttpMethod")
1586            .and_then(|v| v.as_str())
1587        {
1588            return method.to_uppercase();
1589        }
1590        if !exchange.input.body.is_empty() {
1591            return "POST".to_string();
1592        }
1593        "GET".to_string()
1594    }
1595
1596    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1597        if let Some(uri) = exchange
1598            .input
1599            .header("CamelHttpUri")
1600            .and_then(|v| v.as_str())
1601        {
1602            let mut url = uri.to_string();
1603            if let Some(path) = exchange
1604                .input
1605                .header("CamelHttpPath")
1606                .and_then(|v| v.as_str())
1607            {
1608                if !url.ends_with('/') && !path.starts_with('/') {
1609                    url.push('/');
1610                }
1611                url.push_str(path);
1612            }
1613            if let Some(query) = exchange
1614                .input
1615                .header("CamelHttpQuery")
1616                .and_then(|v| v.as_str())
1617            {
1618                url.push('?');
1619                url.push_str(query);
1620            }
1621            return url;
1622        }
1623
1624        let mut url = config.base_url.clone();
1625
1626        if let Some(path) = exchange
1627            .input
1628            .header("CamelHttpPath")
1629            .and_then(|v| v.as_str())
1630        {
1631            if !url.ends_with('/') && !path.starts_with('/') {
1632                url.push('/');
1633            }
1634            url.push_str(path);
1635        }
1636
1637        if let Some(query) = exchange
1638            .input
1639            .header("CamelHttpQuery")
1640            .and_then(|v| v.as_str())
1641        {
1642            url.push('?');
1643            url.push_str(query);
1644        } else if !config.query_params.is_empty() {
1645            let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); // allow-unwrap
1646            for (k, v) in &config.query_params {
1647                parsed.query_pairs_mut().append_pair(k, v);
1648            }
1649            url = parsed.to_string();
1650        }
1651
1652        url
1653    }
1654
1655    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1656        status >= range.0 && status <= range.1
1657    }
1658}
1659
1660impl Service<Exchange> for HttpProducer {
1661    type Response = Exchange;
1662    type Error = CamelError;
1663    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1664
1665    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1666        Poll::Ready(Ok(()))
1667    }
1668
1669    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1670        let config = self.config.clone();
1671        let client = self.client.clone();
1672
1673        Box::pin(async move {
1674            let method_str = HttpProducer::resolve_method(&exchange, &config);
1675            let url = HttpProducer::resolve_url(&exchange, &config);
1676
1677            // SECURITY: Validate URL for SSRF
1678            validate_url_for_ssrf(&url, &config)?;
1679            validate_resolved_host_for_ssrf(&url, &config).await?;
1680
1681            debug!(
1682                correlation_id = %exchange.correlation_id(),
1683                method = %method_str,
1684                url = %url,
1685                "HTTP request"
1686            );
1687
1688            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1689                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1690            })?;
1691
1692            let mut request = client.request(method, &url);
1693
1694            if let Some(timeout) = config.response_timeout {
1695                request = request.timeout(timeout);
1696            }
1697
1698            if let Some(user_agent) = &config.user_agent
1699                && !config.bridge_endpoint
1700            {
1701                request = request.header("User-Agent", user_agent.clone());
1702            }
1703
1704            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1705            #[cfg(feature = "otel")]
1706            let should_inject_otel = !config.bridge_endpoint;
1707            #[cfg(feature = "otel")]
1708            if should_inject_otel {
1709                let mut otel_headers = HashMap::new();
1710                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1711                for (k, v) in otel_headers {
1712                    if let (Ok(name), Ok(val)) = (
1713                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1714                        reqwest::header::HeaderValue::from_str(&v),
1715                    ) {
1716                        request = request.header(name, val);
1717                    }
1718                }
1719            }
1720
1721            for (key, value) in &exchange.input.headers {
1722                if !key.starts_with("Camel")
1723                    && !config
1724                        .skip_request_headers
1725                        .iter()
1726                        .any(|h| h.eq_ignore_ascii_case(key))
1727                    && let Some(val_str) = value.as_str()
1728                    && let (Ok(name), Ok(val)) = (
1729                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1730                        reqwest::header::HeaderValue::from_str(val_str),
1731                    )
1732                {
1733                    request = request.header(name, val);
1734                }
1735            }
1736
1737            if !config.bridge_endpoint {
1738                match &config.auth {
1739                    HttpAuth::None => {}
1740                    HttpAuth::Basic { username, password } => {
1741                        request = request.basic_auth(username, Some(password));
1742                    }
1743                    HttpAuth::Bearer { token } => {
1744                        request = request.bearer_auth(token);
1745                    }
1746                }
1747
1748                if config.connection_close {
1749                    request = request.header("Connection", "close");
1750                }
1751            }
1752
1753            match exchange.input.body {
1754                Body::Stream(ref s) => {
1755                    let mut stream_lock = s.stream.lock().await;
1756                    if let Some(stream) = stream_lock.take() {
1757                        request = request.body(reqwest::Body::wrap_stream(stream));
1758                    } else {
1759                        return Err(CamelError::AlreadyConsumed);
1760                    }
1761                }
1762                _ => {
1763                    // For other types, materialize with configured limit
1764                    let body = std::mem::take(&mut exchange.input.body);
1765                    let bytes = body.into_bytes(config.max_body_size).await?;
1766                    if !bytes.is_empty() {
1767                        request = request.body(bytes);
1768                    }
1769                }
1770            }
1771
1772            let response = request
1773                .send()
1774                .await
1775                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1776
1777            let status_code = response.status().as_u16();
1778            let status_text = response
1779                .status()
1780                .canonical_reason()
1781                .unwrap_or("Unknown")
1782                .to_string();
1783
1784            for (key, value) in response.headers() {
1785                if config
1786                    .skip_response_headers
1787                    .iter()
1788                    .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1789                {
1790                    continue;
1791                }
1792                if let Ok(val_str) = value.to_str() {
1793                    exchange.input.set_header(
1794                        title_case_header(key.as_str()),
1795                        serde_json::Value::String(val_str.to_string()),
1796                    );
1797                }
1798            }
1799
1800            exchange.input.set_header(
1801                "CamelHttpResponseCode",
1802                serde_json::Value::Number(status_code.into()),
1803            );
1804            exchange.input.set_header(
1805                "CamelHttpResponseText",
1806                serde_json::Value::String(status_text.clone()),
1807            );
1808
1809            // Read response body with timeout and size guard (HTTP-004, HTTP-005)
1810            let read_timeout = Duration::from_millis(config.read_timeout_ms);
1811            let response_body = tokio::time::timeout(read_timeout, async {
1812                // Check Content-Length header before allocating
1813                if let Some(content_len) = response.content_length()
1814                    && content_len > config.max_response_bytes as u64
1815                {
1816                    return Err(CamelError::ProcessorError(format!(
1817                        "Response body too large: {} bytes exceeds limit of {} bytes",
1818                        content_len, config.max_response_bytes
1819                    )));
1820                }
1821                // Use bytes_stream() for lazy streaming with size guard
1822                use futures::TryStreamExt;
1823                let mut stream = response.bytes_stream();
1824                let mut total: usize = 0;
1825                let mut collected = Vec::new();
1826                while let Some(chunk) = stream.try_next().await.map_err(|e| {
1827                    CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1828                })? {
1829                    total += chunk.len();
1830                    if total > config.max_response_bytes {
1831                        return Err(CamelError::ProcessorError(format!(
1832                            "Response body too large: {} bytes exceeds limit of {} bytes",
1833                            total, config.max_response_bytes
1834                        )));
1835                    }
1836                    collected.push(chunk);
1837                }
1838                let mut result = bytes::BytesMut::with_capacity(total);
1839                for chunk in collected {
1840                    result.extend_from_slice(&chunk);
1841                }
1842                Ok::<bytes::Bytes, CamelError>(result.freeze())
1843            })
1844            .await
1845            .map_err(|_| {
1846                CamelError::ProcessorError(format!(
1847                    "Read timeout after {}ms",
1848                    config.read_timeout_ms
1849                ))
1850            })??;
1851
1852            if config.throw_exception_on_failure
1853                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1854            {
1855                return Err(CamelError::HttpOperationFailed {
1856                    method: method_str,
1857                    url,
1858                    status_code,
1859                    status_text,
1860                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1861                });
1862            }
1863
1864            if !response_body.is_empty() {
1865                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1866            }
1867
1868            debug!(
1869                correlation_id = %exchange.correlation_id(),
1870                status = status_code,
1871                url = %url,
1872                "HTTP response"
1873            );
1874            Ok(exchange)
1875        })
1876    }
1877}
1878
/// Serializes tests that mutate or depend on the global `ServerRegistry`.
///
/// `ServerRegistry::global()` is a process-wide singleton that persists
/// across tests. `ServerRegistry::reset()` clears ALL entries; if it races
/// with another test that has a live server on a fixed port (e.g. 9991),
/// the registry entry is removed while the OS socket is still bound, so
/// the next `get_or_spawn` call on that port fails with "Address already
/// in use". Holding this mutex for the full body of each affected test
/// prevents the race without requiring `--test-threads=1`.
///
/// Compiled only for test builds and visible crate-wide.
#[cfg(test)]
pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1890
1891#[cfg(test)]
1892mod tests {
1893    use camel_component_api::test_support::{NoopRuntimeObservability, PanicRuntimeObservability};
1894    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1895        std::sync::Arc::new(PanicRuntimeObservability)
1896    }
1897    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1898        std::sync::Arc::new(PanicRuntimeObservability)
1899    }
1900    fn noop_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1901        std::sync::Arc::new(NoopRuntimeObservability)
1902    }
1903
1904    use super::*;
1905    use camel_component_api::{Message, NoOpComponentContext};
1906    use std::sync::Arc;
1907    use std::time::Duration;
1908
1909    fn test_producer_ctx() -> ProducerContext {
1910        ProducerContext::new()
1911    }
1912
1913    #[test]
1914    fn test_http_config_defaults() {
1915        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1916        assert_eq!(config.base_url, "http://localhost:8080/api");
1917        assert!(config.http_method.is_none());
1918        assert!(config.throw_exception_on_failure);
1919        assert_eq!(config.ok_status_code_range, (200, 299));
1920        assert!(config.response_timeout.is_none());
1921        assert!(matches!(config.auth, HttpAuth::None));
1922        assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1923        assert!(!config.bridge_endpoint);
1924        assert!(!config.connection_close);
1925    }
1926
1927    #[test]
1928    fn test_http_config_scheme() {
1929        // UriConfig trait method returns "http" as primary scheme
1930        assert_eq!(HttpEndpointConfig::scheme(), "http");
1931    }
1932
1933    #[test]
1934    fn test_http_config_from_components() {
1935        // Test from_components directly (trait method)
1936        let components = camel_component_api::UriComponents {
1937            scheme: "https".to_string(),
1938            path: "//api.example.com/v1".to_string(),
1939            params: std::collections::HashMap::from([(
1940                "httpMethod".to_string(),
1941                "POST".to_string(),
1942            )]),
1943        };
1944        let config = HttpEndpointConfig::from_components(components).unwrap();
1945        assert_eq!(config.base_url, "https://api.example.com/v1");
1946        assert_eq!(config.http_method, Some("POST".to_string()));
1947    }
1948
1949    #[test]
1950    fn test_http_config_with_options() {
1951        let config = HttpEndpointConfig::from_uri(
1952            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1953        ).unwrap();
1954        assert_eq!(config.base_url, "https://api.example.com/v1");
1955        assert_eq!(config.http_method, Some("PUT".to_string()));
1956        assert!(!config.throw_exception_on_failure);
1957        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1958    }
1959
1960    #[test]
1961    fn test_http_endpoint_config_auth_and_headers_options() {
1962        let config = HttpEndpointConfig::from_uri(
1963            "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1964        )
1965        .unwrap();
1966
1967        assert!(matches!(
1968            config.auth,
1969            HttpAuth::Basic { username, password } if username == "u" && password == "p"
1970        ));
1971        assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1972        assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1973        assert!(config.bridge_endpoint);
1974        assert!(config.connection_close);
1975        assert_eq!(
1976            config.skip_request_headers,
1977            vec!["authorization".to_string(), "x-secret".to_string()]
1978        );
1979        assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1980    }
1981
1982    #[test]
1983    fn test_http_endpoint_config_bearer_auth() {
1984        let config = HttpEndpointConfig::from_uri(
1985            "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1986        )
1987        .unwrap();
1988        assert!(matches!(
1989            config.auth,
1990            HttpAuth::Bearer { token } if token == "t"
1991        ));
1992    }
1993
1994    #[test]
1995    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1996        let config = HttpConfig::default()
1997            .with_response_timeout_ms(999)
1998            .with_allow_private_ips(true)
1999            .with_blocked_hosts(vec!["evil.com".to_string()])
2000            .with_max_body_size(12345);
2001        let endpoint =
2002            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
2003        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
2004        assert!(endpoint.allow_private_ips);
2005        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
2006        assert_eq!(endpoint.max_body_size, 12345);
2007    }
2008
2009    #[test]
2010    fn test_from_uri_with_defaults_uri_overrides_config() {
2011        let config = HttpConfig::default()
2012            .with_response_timeout_ms(999)
2013            .with_allow_private_ips(true)
2014            .with_blocked_hosts(vec!["evil.com".to_string()])
2015            .with_max_body_size(12345);
2016        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
2017            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
2018            &config,
2019        )
2020        .unwrap();
2021        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
2022        assert!(!endpoint.allow_private_ips);
2023        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
2024        assert_eq!(endpoint.max_body_size, 99);
2025    }
2026
2027    #[test]
2028    fn test_http_config_ok_status_range() {
2029        let config =
2030            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
2031        assert_eq!(config.ok_status_code_range, (200, 204));
2032    }
2033
2034    #[test]
2035    fn test_http_config_wrong_scheme() {
2036        let result = HttpEndpointConfig::from_uri("file:/tmp");
2037        assert!(result.is_err());
2038    }
2039
2040    #[test]
2041    fn test_http_component_scheme() {
2042        let component = HttpComponent::new();
2043        assert_eq!(component.scheme(), "http");
2044    }
2045
2046    #[test]
2047    fn test_https_component_scheme() {
2048        let component = HttpsComponent::new();
2049        assert_eq!(component.scheme(), "https");
2050    }
2051
2052    #[test]
2053    fn test_http_endpoint_creates_consumer() {
2054        let component = HttpComponent::new();
2055        let ctx = NoOpComponentContext;
2056        let endpoint = component
2057            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2058            .unwrap();
2059        assert!(endpoint.create_consumer(rt()).is_ok());
2060    }
2061
2062    #[test]
2063    fn test_https_endpoint_creates_consumer() {
2064        let component = HttpsComponent::new();
2065        let ctx = NoOpComponentContext;
2066        let endpoint = component
2067            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2068            .unwrap();
2069        assert!(endpoint.create_consumer(rt()).is_ok());
2070    }
2071
2072    #[test]
2073    fn test_http_endpoint_creates_producer() {
2074        let ctx = test_producer_ctx();
2075        let component = HttpComponent::new();
2076        let endpoint_ctx = NoOpComponentContext;
2077        let endpoint = component
2078            .create_endpoint("http://localhost/api", &endpoint_ctx)
2079            .unwrap();
2080        assert!(endpoint.create_producer(rt(), &ctx).is_ok());
2081    }
2082
2083    // -----------------------------------------------------------------------
2084    // Producer tests
2085    // -----------------------------------------------------------------------
2086
2087    #[tokio::test]
2088    async fn test_producer_with_token_provider() {
2089        use camel_auth::oauth2::TokenProvider;
2090        use tower::ServiceExt;
2091
2092        let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2093            Arc::new(std::sync::Mutex::new(None));
2094        let captured_clone = Arc::clone(&captured_auth);
2095
2096        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2097        let port = listener.local_addr().unwrap().port();
2098
2099        let _handle = tokio::spawn(async move {
2100            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2101            if let Ok((mut stream, _)) = listener.accept().await {
2102                let mut buf = vec![0u8; 8192];
2103                let n = stream.read(&mut buf).await.unwrap_or(0);
2104                let request = String::from_utf8_lossy(&buf[..n]).to_string();
2105                let auth = request
2106                    .lines()
2107                    .find(|l| l.to_lowercase().starts_with("authorization:"))
2108                    .map(|l| {
2109                        l.split(':')
2110                            .nth(1)
2111                            .map(|s| s.trim().to_string())
2112                            .unwrap_or_default()
2113                    });
2114                *captured_clone.lock().unwrap() = auth;
2115                let body = r#"{"echo":"ok"}"#;
2116                let resp = format!(
2117                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2118                    body.len(),
2119                    body
2120                );
2121                let _ = stream.write_all(resp.as_bytes()).await;
2122            }
2123        });
2124
2125        #[derive(Debug)]
2126        struct StaticProvider;
2127        #[async_trait::async_trait]
2128        impl TokenProvider for StaticProvider {
2129            async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2130                Ok("injected-token".into())
2131            }
2132        }
2133
2134        let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2135        let ctx = test_producer_ctx();
2136        let component = HttpComponent::new();
2137        let endpoint_ctx = NoOpComponentContext;
2138        let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2139        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2140
2141        let exchange = Exchange::new(Message::new("hello"));
2142
2143        let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2144        let mut layered = layer.layer(producer);
2145        let result = layered.ready().await.unwrap().call(exchange).await;
2146        assert!(result.is_ok(), "producer call failed: {:?}", result);
2147
2148        tokio::time::sleep(Duration::from_millis(100)).await;
2149        let auth = captured_auth.lock().unwrap().take();
2150        assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2151    }
2152
2153    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2154        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2155        let addr = listener.local_addr().unwrap();
2156        let url = format!("http://127.0.0.1:{}", addr.port());
2157
2158        let handle = tokio::spawn(async move {
2159            loop {
2160                if let Ok((mut stream, _)) = listener.accept().await {
2161                    tokio::spawn(async move {
2162                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2163                        let mut buf = vec![0u8; 4096];
2164                        let n = stream.read(&mut buf).await.unwrap_or(0);
2165                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2166
2167                        let method = request.split_whitespace().next().unwrap_or("GET");
2168
2169                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2170                        let response = format!(
2171                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2172                            body.len(),
2173                            body
2174                        );
2175                        let _ = stream.write_all(response.as_bytes()).await;
2176                    });
2177                }
2178            }
2179        });
2180
2181        (url, handle)
2182    }
2183
2184    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2185        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2186        let addr = listener.local_addr().unwrap();
2187        let url = format!("http://127.0.0.1:{}", addr.port());
2188
2189        let handle = tokio::spawn(async move {
2190            loop {
2191                if let Ok((mut stream, _)) = listener.accept().await {
2192                    let status = status;
2193                    tokio::spawn(async move {
2194                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2195                        let mut buf = vec![0u8; 4096];
2196                        let _ = stream.read(&mut buf).await;
2197
2198                        let status_text = match status {
2199                            404 => "Not Found",
2200                            500 => "Internal Server Error",
2201                            _ => "Error",
2202                        };
2203                        let body = "error body";
2204                        let response = format!(
2205                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2206                            status,
2207                            status_text,
2208                            body.len(),
2209                            body
2210                        );
2211                        let _ = stream.write_all(response.as_bytes()).await;
2212                    });
2213                }
2214            }
2215        });
2216
2217        (url, handle)
2218    }
2219
2220    #[tokio::test]
2221    async fn test_http_producer_get_request() {
2222        use tower::ServiceExt;
2223
2224        let (url, _handle) = start_test_server().await;
2225        let ctx = test_producer_ctx();
2226
2227        let component = HttpComponent::new();
2228        let endpoint_ctx = NoOpComponentContext;
2229        let endpoint = component
2230            .create_endpoint(
2231                &format!("{url}/api/test?allowPrivateIps=true"),
2232                &endpoint_ctx,
2233            )
2234            .unwrap();
2235        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2236
2237        let exchange = Exchange::new(Message::default());
2238        let result = producer.oneshot(exchange).await.unwrap();
2239
2240        let status = result
2241            .input
2242            .header("CamelHttpResponseCode")
2243            .and_then(|v| v.as_u64())
2244            .unwrap();
2245        assert_eq!(status, 200);
2246
2247        assert!(!result.input.body.is_empty());
2248    }
2249
2250    #[tokio::test]
2251    async fn test_http_producer_post_with_body() {
2252        use tower::ServiceExt;
2253
2254        let (url, _handle) = start_test_server().await;
2255        let ctx = test_producer_ctx();
2256
2257        let component = HttpComponent::new();
2258        let endpoint_ctx = NoOpComponentContext;
2259        let endpoint = component
2260            .create_endpoint(
2261                &format!("{url}/api/data?allowPrivateIps=true"),
2262                &endpoint_ctx,
2263            )
2264            .unwrap();
2265        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2266
2267        let exchange = Exchange::new(Message::new("request body"));
2268        let result = producer.oneshot(exchange).await.unwrap();
2269
2270        let status = result
2271            .input
2272            .header("CamelHttpResponseCode")
2273            .and_then(|v| v.as_u64())
2274            .unwrap();
2275        assert_eq!(status, 200);
2276    }
2277
2278    #[tokio::test]
2279    async fn test_http_producer_method_from_header() {
2280        use tower::ServiceExt;
2281
2282        let (url, _handle) = start_test_server().await;
2283        let ctx = test_producer_ctx();
2284
2285        let component = HttpComponent::new();
2286        let endpoint_ctx = NoOpComponentContext;
2287        let endpoint = component
2288            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2289            .unwrap();
2290        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2291
2292        let mut exchange = Exchange::new(Message::default());
2293        exchange.input.set_header(
2294            "CamelHttpMethod",
2295            serde_json::Value::String("DELETE".to_string()),
2296        );
2297
2298        let result = producer.oneshot(exchange).await.unwrap();
2299        let status = result
2300            .input
2301            .header("CamelHttpResponseCode")
2302            .and_then(|v| v.as_u64())
2303            .unwrap();
2304        assert_eq!(status, 200);
2305    }
2306
2307    #[tokio::test]
2308    async fn test_http_producer_forced_method() {
2309        use tower::ServiceExt;
2310
2311        let (url, _handle) = start_test_server().await;
2312        let ctx = test_producer_ctx();
2313
2314        let component = HttpComponent::new();
2315        let endpoint_ctx = NoOpComponentContext;
2316        let endpoint = component
2317            .create_endpoint(
2318                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2319                &endpoint_ctx,
2320            )
2321            .unwrap();
2322        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2323
2324        let exchange = Exchange::new(Message::default());
2325        let result = producer.oneshot(exchange).await.unwrap();
2326
2327        let status = result
2328            .input
2329            .header("CamelHttpResponseCode")
2330            .and_then(|v| v.as_u64())
2331            .unwrap();
2332        assert_eq!(status, 200);
2333    }
2334
2335    #[tokio::test]
2336    async fn test_http_producer_throw_exception_on_failure() {
2337        use tower::ServiceExt;
2338
2339        let (url, _handle) = start_status_server(404).await;
2340        let ctx = test_producer_ctx();
2341
2342        let component = HttpComponent::new();
2343        let endpoint_ctx = NoOpComponentContext;
2344        let endpoint = component
2345            .create_endpoint(
2346                &format!("{url}/not-found?allowPrivateIps=true"),
2347                &endpoint_ctx,
2348            )
2349            .unwrap();
2350        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2351
2352        let exchange = Exchange::new(Message::default());
2353        let result = producer.oneshot(exchange).await;
2354        assert!(result.is_err());
2355
2356        match result.unwrap_err() {
2357            CamelError::HttpOperationFailed { status_code, .. } => {
2358                assert_eq!(status_code, 404);
2359            }
2360            e => panic!("Expected HttpOperationFailed, got: {e}"),
2361        }
2362    }
2363
2364    #[tokio::test]
2365    async fn test_http_producer_no_throw_on_failure() {
2366        use tower::ServiceExt;
2367
2368        let (url, _handle) = start_status_server(500).await;
2369        let ctx = test_producer_ctx();
2370
2371        let component = HttpComponent::new();
2372        let endpoint_ctx = NoOpComponentContext;
2373        let endpoint = component
2374            .create_endpoint(
2375                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2376                &endpoint_ctx,
2377            )
2378            .unwrap();
2379        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2380
2381        let exchange = Exchange::new(Message::default());
2382        let result = producer.oneshot(exchange).await.unwrap();
2383
2384        let status = result
2385            .input
2386            .header("CamelHttpResponseCode")
2387            .and_then(|v| v.as_u64())
2388            .unwrap();
2389        assert_eq!(status, 500);
2390    }
2391
2392    #[tokio::test]
2393    async fn test_http_producer_uri_override() {
2394        use tower::ServiceExt;
2395
2396        let (url, _handle) = start_test_server().await;
2397        let ctx = test_producer_ctx();
2398
2399        let component = HttpComponent::new();
2400        let endpoint_ctx = NoOpComponentContext;
2401        let endpoint = component
2402            .create_endpoint(
2403                "http://localhost:1/does-not-exist?allowPrivateIps=true",
2404                &endpoint_ctx,
2405            )
2406            .unwrap();
2407        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2408
2409        let mut exchange = Exchange::new(Message::default());
2410        exchange.input.set_header(
2411            "CamelHttpUri",
2412            serde_json::Value::String(format!("{url}/api")),
2413        );
2414
2415        let result = producer.oneshot(exchange).await.unwrap();
2416        let status = result
2417            .input
2418            .header("CamelHttpResponseCode")
2419            .and_then(|v| v.as_u64())
2420            .unwrap();
2421        assert_eq!(status, 200);
2422    }
2423
2424    #[tokio::test]
2425    async fn test_http_producer_response_headers_mapped() {
2426        use tower::ServiceExt;
2427
2428        let (url, _handle) = start_test_server().await;
2429        let ctx = test_producer_ctx();
2430
2431        let component = HttpComponent::new();
2432        let endpoint_ctx = NoOpComponentContext;
2433        let endpoint = component
2434            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2435            .unwrap();
2436        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2437
2438        let exchange = Exchange::new(Message::default());
2439        let result = producer.oneshot(exchange).await.unwrap();
2440
2441        assert!(
2442            result.input.header("Content-Type").is_some(),
2443            "Response should have Content-Type header"
2444        );
2445        assert!(result.input.header("CamelHttpResponseText").is_some());
2446    }
2447
2448    // -----------------------------------------------------------------------
2449    // Bug fix tests: Client configuration per-endpoint
2450    // -----------------------------------------------------------------------
2451
2452    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2453        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2454        let addr = listener.local_addr().unwrap();
2455        let url = format!("http://127.0.0.1:{}", addr.port());
2456
2457        let handle = tokio::spawn(async move {
2458            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2459            loop {
2460                if let Ok((mut stream, _)) = listener.accept().await {
2461                    tokio::spawn(async move {
2462                        let mut buf = vec![0u8; 4096];
2463                        let n = stream.read(&mut buf).await.unwrap_or(0);
2464                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2465
2466                        // Check if this is a request to /final
2467                        if request.contains("GET /final") {
2468                            let body = r#"{"status":"final"}"#;
2469                            let response = format!(
2470                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2471                                body.len(),
2472                                body
2473                            );
2474                            let _ = stream.write_all(response.as_bytes()).await;
2475                        } else {
2476                            // Redirect to /final
2477                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2478                            let _ = stream.write_all(response.as_bytes()).await;
2479                        }
2480                    });
2481                }
2482            }
2483        });
2484
2485        (url, handle)
2486    }
2487
2488    #[tokio::test]
2489    async fn test_follow_redirects_false_does_not_follow() {
2490        use tower::ServiceExt;
2491
2492        let (url, _handle) = start_redirect_server().await;
2493        let ctx = test_producer_ctx();
2494
2495        let component =
2496            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2497        let endpoint_ctx = NoOpComponentContext;
2498        let endpoint = component
2499            .create_endpoint(
2500                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2501                &endpoint_ctx,
2502            )
2503            .unwrap();
2504        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2505
2506        let exchange = Exchange::new(Message::default());
2507        let result = producer.oneshot(exchange).await.unwrap();
2508
2509        // Should get 302, NOT follow redirect to 200
2510        let status = result
2511            .input
2512            .header("CamelHttpResponseCode")
2513            .and_then(|v| v.as_u64())
2514            .unwrap();
2515        assert_eq!(
2516            status, 302,
2517            "Should NOT follow redirect when followRedirects=false"
2518        );
2519    }
2520
2521    #[tokio::test]
2522    async fn test_follow_redirects_true_follows_redirect() {
2523        use tower::ServiceExt;
2524
2525        let (url, _handle) = start_redirect_server().await;
2526        let ctx = test_producer_ctx();
2527
2528        let component =
2529            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2530        let endpoint_ctx = NoOpComponentContext;
2531        let endpoint = component
2532            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2533            .unwrap();
2534        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2535
2536        let exchange = Exchange::new(Message::default());
2537        let result = producer.oneshot(exchange).await.unwrap();
2538
2539        // Should follow redirect and get 200
2540        let status = result
2541            .input
2542            .header("CamelHttpResponseCode")
2543            .and_then(|v| v.as_u64())
2544            .unwrap();
2545        assert_eq!(
2546            status, 200,
2547            "Should follow redirect when followRedirects=true"
2548        );
2549    }
2550
2551    #[tokio::test]
2552    async fn test_query_params_forwarded_to_http_request() {
2553        use tower::ServiceExt;
2554
2555        let (url, _handle) = start_test_server().await;
2556        let ctx = test_producer_ctx();
2557
2558        let component = HttpComponent::new();
2559        let endpoint_ctx = NoOpComponentContext;
2560        // apiKey is NOT a Camel option, should be forwarded as query param
2561        let endpoint = component
2562            .create_endpoint(
2563                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2564                &endpoint_ctx,
2565            )
2566            .unwrap();
2567        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2568
2569        let exchange = Exchange::new(Message::default());
2570        let result = producer.oneshot(exchange).await.unwrap();
2571
2572        // The test server returns the request info in response
2573        // We just verify it succeeds (the query param was sent)
2574        let status = result
2575            .input
2576            .header("CamelHttpResponseCode")
2577            .and_then(|v| v.as_u64())
2578            .unwrap();
2579        assert_eq!(status, 200);
2580    }
2581
2582    #[tokio::test]
2583    async fn test_non_camel_query_params_are_forwarded() {
2584        // This test verifies Bug #3 fix: non-Camel options should be forwarded
2585        // We'll test the config parsing, not the actual HTTP call
2586        let config = HttpEndpointConfig::from_uri(
2587            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2588        )
2589        .unwrap();
2590
2591        // apiKey and token are NOT Camel options, should be forwarded
2592        assert!(
2593            config.query_params.contains_key("apiKey"),
2594            "apiKey should be preserved"
2595        );
2596        assert!(
2597            config.query_params.contains_key("token"),
2598            "token should be preserved"
2599        );
2600        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2601        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2602
2603        // httpMethod IS a Camel option, should NOT be in query_params
2604        assert!(
2605            !config.query_params.contains_key("httpMethod"),
2606            "httpMethod should not be forwarded"
2607        );
2608    }
2609
2610    #[test]
2611    fn test_query_params_are_url_encoded_when_resolving_url() {
2612        let config =
2613            HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2614        let exchange = Exchange::new(Message::default());
2615
2616        let url = HttpProducer::resolve_url(&exchange, &config);
2617
2618        assert!(url.contains("q=hello+world"), "url was: {url}");
2619        assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2620    }
2621
2622    // -----------------------------------------------------------------------
2623    // Timeout tests (HTTP-004)
2624    // -----------------------------------------------------------------------
2625
2626    async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2627        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2628        let addr = listener.local_addr().unwrap();
2629        let url = format!("http://127.0.0.1:{}", addr.port());
2630
2631        let handle = tokio::spawn(async move {
2632            loop {
2633                if let Ok((mut stream, _)) = listener.accept().await {
2634                    let delay = delay_ms;
2635                    tokio::spawn(async move {
2636                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2637                        let mut buf = vec![0u8; 4096];
2638                        let _ = stream.read(&mut buf).await;
2639                        // Send headers immediately (no Content-Length → chunked)
2640                        let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2641                        let _ = stream.write_all(headers.as_bytes()).await;
2642                        // Delay before sending body chunk
2643                        tokio::time::sleep(Duration::from_millis(delay)).await;
2644                        let body = r#"{"status":"slow"}"#;
2645                        let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2646                        let _ = stream.write_all(chunk.as_bytes()).await;
2647                    });
2648                }
2649            }
2650        });
2651
2652        (url, handle)
2653    }
2654
2655    #[tokio::test]
2656    async fn test_http_producer_timeout() {
2657        use tower::ServiceExt;
2658
2659        // Server delays 500ms, client timeout is 100ms → should timeout
2660        let (url, _handle) = start_slow_server(500).await;
2661        let ctx = test_producer_ctx();
2662
2663        let component = HttpComponent::with_config(
2664            HttpConfig::default()
2665                .with_read_timeout_ms(100)
2666                .with_response_timeout_ms(30_000), // generous response timeout
2667        );
2668        let endpoint_ctx = NoOpComponentContext;
2669        let endpoint = component
2670            .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2671            .unwrap();
2672        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2673
2674        let exchange = Exchange::new(Message::default());
2675        let result = producer.oneshot(exchange).await;
2676
2677        assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2678        let err = result.unwrap_err().to_string();
2679        assert!(
2680            err.contains("Read timeout") || err.contains("timeout"),
2681            "Error should mention timeout, got: {}",
2682            err
2683        );
2684    }
2685
2686    #[tokio::test]
2687    async fn test_http_producer_no_timeout_when_fast() {
2688        use tower::ServiceExt;
2689
2690        let (url, _handle) = start_test_server().await;
2691        let ctx = test_producer_ctx();
2692
2693        let component =
2694            HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2695        let endpoint_ctx = NoOpComponentContext;
2696        let endpoint = component
2697            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2698            .unwrap();
2699        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2700
2701        let exchange = Exchange::new(Message::default());
2702        let result = producer.oneshot(exchange).await.unwrap();
2703
2704        let status = result
2705            .input
2706            .header("CamelHttpResponseCode")
2707            .and_then(|v| v.as_u64())
2708            .unwrap();
2709        assert_eq!(status, 200);
2710    }
2711
2712    // -----------------------------------------------------------------------
2713    // SSRF Protection tests
2714    // -----------------------------------------------------------------------
2715
2716    #[tokio::test]
2717    async fn test_http_producer_blocks_metadata_endpoint() {
2718        use tower::ServiceExt;
2719
2720        let ctx = test_producer_ctx();
2721        let component = HttpComponent::new();
2722        let endpoint_ctx = NoOpComponentContext;
2723        let endpoint = component
2724            .create_endpoint(
2725                "http://example.com/api?allowPrivateIps=false",
2726                &endpoint_ctx,
2727            )
2728            .unwrap();
2729        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2730
2731        let mut exchange = Exchange::new(Message::default());
2732        exchange.input.set_header(
2733            "CamelHttpUri",
2734            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2735        );
2736
2737        let result = producer.oneshot(exchange).await;
2738        assert!(result.is_err(), "Should block AWS metadata endpoint");
2739
2740        let err = result.unwrap_err();
2741        assert!(
2742            err.to_string().contains("Private IP"),
2743            "Error should mention private IP blocking, got: {}",
2744            err
2745        );
2746    }
2747
2748    #[test]
2749    fn test_ssrf_config_defaults() {
2750        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2751        assert!(
2752            !config.allow_private_ips,
2753            "Private IPs should be blocked by default"
2754        );
2755        assert!(
2756            config.blocked_hosts.is_empty(),
2757            "Blocked hosts should be empty by default"
2758        );
2759    }
2760
2761    #[test]
2762    fn test_ssrf_config_allow_private_ips() {
2763        let config =
2764            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2765        assert!(
2766            config.allow_private_ips,
2767            "Private IPs should be allowed when explicitly set"
2768        );
2769    }
2770
2771    #[test]
2772    fn test_ssrf_config_blocked_hosts() {
2773        let config = HttpEndpointConfig::from_uri(
2774            "http://example.com/api?blockedHosts=evil.com,malware.net",
2775        )
2776        .unwrap();
2777        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2778    }
2779
2780    #[tokio::test]
2781    async fn test_http_producer_blocks_localhost() {
2782        use tower::ServiceExt;
2783
2784        let ctx = test_producer_ctx();
2785        let component = HttpComponent::new();
2786        let endpoint_ctx = NoOpComponentContext;
2787        let endpoint = component
2788            .create_endpoint("http://example.com/api", &endpoint_ctx)
2789            .unwrap();
2790        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2791
2792        let mut exchange = Exchange::new(Message::default());
2793        exchange.input.set_header(
2794            "CamelHttpUri",
2795            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2796        );
2797
2798        let result = producer.oneshot(exchange).await;
2799        assert!(result.is_err(), "Should block localhost");
2800    }
2801
2802    #[tokio::test]
2803    async fn test_http_producer_blocks_loopback_ip() {
2804        use tower::ServiceExt;
2805
2806        let ctx = test_producer_ctx();
2807        let component = HttpComponent::new();
2808        let endpoint_ctx = NoOpComponentContext;
2809        let endpoint = component
2810            .create_endpoint("http://example.com/api", &endpoint_ctx)
2811            .unwrap();
2812        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2813
2814        let mut exchange = Exchange::new(Message::default());
2815        exchange.input.set_header(
2816            "CamelHttpUri",
2817            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2818        );
2819
2820        let result = producer.oneshot(exchange).await;
2821        assert!(result.is_err(), "Should block loopback IP");
2822    }
2823
2824    #[tokio::test]
2825    async fn test_http_producer_allows_private_ip_when_enabled() {
2826        use tower::ServiceExt;
2827
2828        let ctx = test_producer_ctx();
2829        let component = HttpComponent::new();
2830        let endpoint_ctx = NoOpComponentContext;
2831        // With allowPrivateIps=true, the validation should pass
2832        // (actual connection will fail, but that's expected)
2833        let endpoint = component
2834            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2835            .unwrap();
2836        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2837
2838        let exchange = Exchange::new(Message::default());
2839
2840        // The request will fail because we can't connect, but it should NOT fail
2841        // due to SSRF protection
2842        let result = producer.oneshot(exchange).await;
2843        // We expect connection error, not SSRF error
2844        if let Err(ref e) = result {
2845            let err_str = e.to_string();
2846            assert!(
2847                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2848                "Should not be SSRF error, got: {}",
2849                err_str
2850            );
2851        }
2852    }
2853
2854    // -----------------------------------------------------------------------
2855    // HttpServerConfig tests
2856    // -----------------------------------------------------------------------
2857
2858    #[test]
2859    fn test_http_server_config_parse() {
2860        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2861        assert_eq!(cfg.host, "0.0.0.0");
2862        assert_eq!(cfg.port, 8080);
2863        assert_eq!(cfg.path, "/orders");
2864        assert_eq!(cfg.max_inflight_requests, 1024);
2865    }
2866
2867    #[test]
2868    fn test_http_server_config_scheme() {
2869        // UriConfig trait method returns "http" as primary scheme
2870        assert_eq!(HttpServerConfig::scheme(), "http");
2871    }
2872
2873    #[test]
2874    fn test_http_server_config_from_components() {
2875        // Test from_components directly (trait method)
2876        let components = camel_component_api::UriComponents {
2877            scheme: "https".to_string(),
2878            path: "//0.0.0.0:8443/api".to_string(),
2879            params: std::collections::HashMap::from([
2880                ("maxRequestBody".to_string(), "5242880".to_string()),
2881                ("maxInflightRequests".to_string(), "7".to_string()),
2882            ]),
2883        };
2884        let cfg = HttpServerConfig::from_components(components).unwrap();
2885        assert_eq!(cfg.host, "0.0.0.0");
2886        assert_eq!(cfg.port, 8443);
2887        assert_eq!(cfg.path, "/api");
2888        assert_eq!(cfg.max_request_body, 5242880);
2889        assert_eq!(cfg.max_inflight_requests, 7);
2890    }
2891
2892    #[test]
2893    fn test_http_server_config_default_path() {
2894        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2895        assert_eq!(cfg.path, "/");
2896    }
2897
2898    #[test]
2899    fn test_http_server_config_wrong_scheme() {
2900        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2901    }
2902
2903    #[test]
2904    fn test_http_server_config_invalid_port() {
2905        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2906    }
2907
2908    #[test]
2909    fn test_http_server_config_default_port_by_scheme() {
2910        // HTTP without explicit port should default to 80
2911        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2912        assert_eq!(cfg_http.port, 80);
2913
2914        // HTTPS without explicit port should default to 443
2915        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2916        assert_eq!(cfg_https.port, 443);
2917    }
2918
2919    #[test]
2920    fn test_request_envelope_and_reply_are_send() {
2921        fn assert_send<T: Send>() {}
2922        assert_send::<RequestEnvelope>();
2923        assert_send::<HttpReply>();
2924    }
2925
2926    // -----------------------------------------------------------------------
2927    // ServerRegistry tests
2928    // -----------------------------------------------------------------------
2929
2930    #[test]
2931    fn test_server_registry_global_is_singleton() {
2932        let r1 = ServerRegistry::global();
2933        let r2 = ServerRegistry::global();
2934        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2935    }
2936
2937    #[allow(clippy::await_holding_lock)]
2938    #[tokio::test]
2939    async fn test_concurrent_get_or_spawn_returns_same_registry() {
2940        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2941        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2942        let port = listener.local_addr().unwrap().port();
2943        drop(listener);
2944
2945        let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2946            Arc::new(std::sync::Mutex::new(Vec::new()));
2947
2948        let mut handles = Vec::new();
2949        for _ in 0..4 {
2950            let results = results.clone();
2951            handles.push(tokio::spawn(async move {
2952                let registry = ServerRegistry::global()
2953                    .get_or_spawn(
2954                        "127.0.0.1",
2955                        port,
2956                        2 * 1024 * 1024,
2957                        10 * 1024 * 1024,
2958                        1024,
2959                        test_rt(),
2960                        "test-route".into(),
2961                    )
2962                    .await
2963                    .unwrap();
2964                results.lock().unwrap().push(registry);
2965            }));
2966        }
2967
2968        for h in handles {
2969            h.await.unwrap();
2970        }
2971
2972        let registries = results.lock().unwrap();
2973        assert_eq!(registries.len(), 4);
2974        for i in 1..registries.len() {
2975            assert!(
2976                Arc::ptr_eq(&registries[0].inner, &registries[i].inner),
2977                "all concurrent callers should get same route registry"
2978            );
2979        }
2980    }
2981
2982    #[test]
2983    fn test_server_registry_distinguishes_host_and_port() {
2984        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2985        let rt = tokio::runtime::Runtime::new().expect("runtime");
2986        rt.block_on(async {
2987            let registry = ServerRegistry::global();
2988            // Use two distinct host values with same configured port key.
2989            // Port 0 is acceptable here because the registry key uses the configured
2990            // tuple, not the OS-assigned ephemeral port.
2991            let d1 = registry
2992                .get_or_spawn(
2993                    "127.0.0.1",
2994                    0,
2995                    1024 * 1024,
2996                    10 * 1024 * 1024,
2997                    1024,
2998                    test_rt(),
2999                    "test-route-1".into(),
3000                )
3001                .await;
3002            let d2 = registry
3003                .get_or_spawn(
3004                    "0.0.0.0",
3005                    0,
3006                    1024 * 1024,
3007                    10 * 1024 * 1024,
3008                    1024,
3009                    test_rt(),
3010                    "test-route-2".into(),
3011                )
3012                .await;
3013            assert!(d1.is_ok());
3014            assert!(d2.is_ok());
3015            assert!(!Arc::ptr_eq(&d1.unwrap().inner, &d2.unwrap().inner));
3016        });
3017    }
3018
3019    #[allow(clippy::await_holding_lock)]
3020    #[tokio::test]
3021    async fn test_shared_server_max_request_body_policy_is_deterministic() {
3022        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3023        let registry = ServerRegistry::global();
3024        // First registration: maxRequestBody = 1 MB
3025        let d1 = registry
3026            .get_or_spawn(
3027                "127.0.0.1",
3028                9991,
3029                1024 * 1024,
3030                10 * 1024 * 1024,
3031                1024,
3032                test_rt(),
3033                "test-route".into(),
3034            )
3035            .await;
3036        assert!(d1.is_ok());
3037
3038        // Second registration on same (host,port): maxRequestBody = 2 MB
3039        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
3040        let d2 = registry
3041            .get_or_spawn(
3042                "127.0.0.1",
3043                9991,
3044                2 * 1024 * 1024,
3045                10 * 1024 * 1024,
3046                1024,
3047                test_rt(),
3048                "test-route-2".into(),
3049            )
3050            .await;
3051        assert!(d2.is_err());
3052        let err = d2.unwrap_err();
3053        assert!(
3054            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
3055            "Expected incompatible maxRequestBody error, got: {}",
3056            err
3057        );
3058    }
3059
3060    #[test]
3061    fn test_server_registry_reset_clears_entries() {
3062        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3063        let rt = tokio::runtime::Runtime::new().expect("runtime");
3064        rt.block_on(async {
3065            // Register something on a unique port
3066            let d1 = ServerRegistry::global()
3067                .get_or_spawn(
3068                    "127.0.0.1",
3069                    9992,
3070                    1024 * 1024,
3071                    10 * 1024 * 1024,
3072                    1024,
3073                    test_rt(),
3074                    "test-route".into(),
3075                )
3076                .await;
3077            assert!(d1.is_ok());
3078
3079            // Verify entry exists
3080            let guard = ServerRegistry::global().inner.lock().expect("lock");
3081            assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
3082            drop(guard);
3083
3084            // Reset
3085            ServerRegistry::reset();
3086
3087            // Verify cleared
3088            let guard = ServerRegistry::global().inner.lock().expect("lock");
3089            assert!(
3090                guard.is_empty(),
3091                "registry should be empty after reset, has {} entries",
3092                guard.len()
3093            );
3094        });
3095    }
3096
3097    // -----------------------------------------------------------------------
3098    // Axum dispatch handler tests
3099    // -----------------------------------------------------------------------
3100
3101    #[tokio::test]
3102    async fn test_dispatch_handler_returns_404_for_unknown_path() {
3103        let registry = HttpRouteRegistry::new();
3104        // Nothing registered in route registry
3105        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3106        let port = listener.local_addr().unwrap().port();
3107        tokio::spawn(run_axum_server(
3108            listener,
3109            registry,
3110            2 * 1024 * 1024,
3111            10 * 1024 * 1024,
3112            Arc::new(tokio::sync::Semaphore::new(1024)),
3113            test_rt(),
3114            "test-route".into(),
3115        ));
3116
3117        // Wait for server to start
3118        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3119
3120        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3121            .await
3122            .unwrap();
3123        assert_eq!(resp.status().as_u16(), 404);
3124    }
3125
3126    // -----------------------------------------------------------------------
3127    // HttpConsumer tests
3128    // -----------------------------------------------------------------------
3129
3130    #[tokio::test]
3131    async fn test_http_consumer_start_registers_path() {
3132        use camel_component_api::ConsumerContext;
3133
3134        // Get an OS-assigned free port
3135        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3136        let port = listener.local_addr().unwrap().port();
3137        drop(listener); // Release port — ServerRegistry will rebind it
3138
3139        let consumer_cfg = HttpServerConfig {
3140            host: "127.0.0.1".to_string(),
3141            port,
3142            path: "/ping".to_string(),
3143            max_request_body: 2 * 1024 * 1024,
3144            max_response_body: 10 * 1024 * 1024,
3145            max_inflight_requests: 1024,
3146        };
3147        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3148
3149        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3150        let token = tokio_util::sync::CancellationToken::new();
3151        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3152
3153        tokio::spawn(async move {
3154            consumer.start(ctx).await.unwrap();
3155        });
3156
3157        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3158
3159        let client = reqwest::Client::new();
3160        let resp_future = client
3161            .post(format!("http://127.0.0.1:{port}/ping"))
3162            .body("hello world")
3163            .send();
3164
3165        let (http_result, _) = tokio::join!(resp_future, async {
3166            if let Some(mut envelope) = rx.recv().await {
3167                // Set a custom status code
3168                envelope.exchange.input.set_header(
3169                    "CamelHttpResponseCode",
3170                    serde_json::Value::Number(201.into()),
3171                );
3172                if let Some(reply_tx) = envelope.reply_tx {
3173                    let _ = reply_tx.send(Ok(envelope.exchange));
3174                }
3175            }
3176        });
3177
3178        let resp = http_result.unwrap();
3179        assert_eq!(resp.status().as_u16(), 201);
3180
3181        token.cancel();
3182    }
3183
3184    #[tokio::test]
3185    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3186        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3187
3188        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3189        let port = listener.local_addr().unwrap().port();
3190        drop(listener);
3191
3192        let consumer_cfg = HttpServerConfig {
3193            host: "127.0.0.1".to_string(),
3194            port,
3195            path: "/saturation".to_string(),
3196            max_request_body: 2 * 1024 * 1024,
3197            max_response_body: 10 * 1024 * 1024,
3198            max_inflight_requests: 1,
3199        };
3200        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3201
3202        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3203        let token = tokio_util::sync::CancellationToken::new();
3204        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3205        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3206        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3207
3208        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3209        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3210
3211        tokio::spawn(async move {
3212            let mut first_seen_tx = Some(first_seen_tx);
3213            let mut unblock_first_rx = Some(unblock_first_rx);
3214
3215            while let Some(envelope) = rx.recv().await {
3216                if let Some(tx) = first_seen_tx.take() {
3217                    let _ = tx.send(());
3218                    if let Some(rx_unblock) = unblock_first_rx.take() {
3219                        let _ = rx_unblock.await;
3220                    }
3221                }
3222
3223                if let Some(reply_tx) = envelope.reply_tx {
3224                    let _ = reply_tx.send(Ok(envelope.exchange));
3225                }
3226            }
3227        });
3228
3229        let client = reqwest::Client::new();
3230        let first_req = {
3231            let client = client.clone();
3232            async move {
3233                client
3234                    .get(format!("http://127.0.0.1:{port}/saturation"))
3235                    .send()
3236                    .await
3237                    .unwrap()
3238            }
3239        };
3240
3241        let first_handle = tokio::spawn(first_req);
3242        first_seen_rx.await.unwrap();
3243
3244        let second_resp = client
3245            .get(format!("http://127.0.0.1:{port}/saturation"))
3246            .send()
3247            .await
3248            .unwrap();
3249
3250        assert_eq!(second_resp.status().as_u16(), 503);
3251
3252        let _ = unblock_first_tx.send(());
3253        let first_resp = first_handle.await.unwrap();
3254        assert_eq!(first_resp.status().as_u16(), 200);
3255
3256        token.cancel();
3257    }
3258
3259    #[tokio::test]
3260    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3261        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3262
3263        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3264
3265        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3266        let port = listener.local_addr().unwrap().port();
3267        drop(listener);
3268
3269        let consumer_cfg = HttpServerConfig {
3270            host: "127.0.0.1".to_string(),
3271            port,
3272            path: "/limit-bytes".to_string(),
3273            max_request_body: 2 * 1024 * 1024,
3274            max_response_body: 16,
3275            max_inflight_requests: 1024,
3276        };
3277        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3278
3279        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3280        let token = tokio_util::sync::CancellationToken::new();
3281        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3282        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3283        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3284
3285        let client = reqwest::Client::new();
3286        let send_fut = client
3287            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3288            .send();
3289
3290        let (http_result, _) = tokio::join!(send_fut, async {
3291            if let Some(mut envelope) = rx.recv().await {
3292                envelope.exchange.input.body =
3293                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3294                if let Some(reply_tx) = envelope.reply_tx {
3295                    let _ = reply_tx.send(Ok(envelope.exchange));
3296                }
3297            }
3298        });
3299
3300        let resp = http_result.unwrap();
3301        assert_eq!(resp.status().as_u16(), 500);
3302        let body = resp.text().await.unwrap();
3303        assert_eq!(body, "Response body exceeds configured limit");
3304        token.cancel();
3305    }
3306
3307    #[tokio::test]
3308    async fn test_http_consumer_enforces_max_response_body_for_json() {
3309        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3310
3311        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3312
3313        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3314        let port = listener.local_addr().unwrap().port();
3315        drop(listener);
3316
3317        let consumer_cfg = HttpServerConfig {
3318            host: "127.0.0.1".to_string(),
3319            port,
3320            path: "/limit-json".to_string(),
3321            max_request_body: 2 * 1024 * 1024,
3322            max_response_body: 16,
3323            max_inflight_requests: 1024,
3324        };
3325        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3326
3327        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3328        let token = tokio_util::sync::CancellationToken::new();
3329        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3330        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3331        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3332
3333        let client = reqwest::Client::new();
3334        let send_fut = client
3335            .get(format!("http://127.0.0.1:{port}/limit-json"))
3336            .send();
3337
3338        let (http_result, _) = tokio::join!(send_fut, async {
3339            if let Some(mut envelope) = rx.recv().await {
3340                envelope.exchange.input.body = camel_component_api::Body::Json(
3341                    serde_json::json!({"message":"this response is bigger than sixteen"}),
3342                );
3343                if let Some(reply_tx) = envelope.reply_tx {
3344                    let _ = reply_tx.send(Ok(envelope.exchange));
3345                }
3346            }
3347        });
3348
3349        let resp = http_result.unwrap();
3350        assert_eq!(resp.status().as_u16(), 500);
3351        let body = resp.text().await.unwrap();
3352        assert_eq!(body, "Response body exceeds configured limit");
3353        token.cancel();
3354    }
3355
3356    #[tokio::test]
3357    async fn test_http_consumer_enforces_max_response_body_for_xml() {
3358        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3359
3360        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3361
3362        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3363        let port = listener.local_addr().unwrap().port();
3364        drop(listener);
3365
3366        let consumer_cfg = HttpServerConfig {
3367            host: "127.0.0.1".to_string(),
3368            port,
3369            path: "/limit-xml".to_string(),
3370            max_request_body: 2 * 1024 * 1024,
3371            max_response_body: 16,
3372            max_inflight_requests: 1024,
3373        };
3374        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3375
3376        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3377        let token = tokio_util::sync::CancellationToken::new();
3378        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3379        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3380        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3381
3382        let client = reqwest::Client::new();
3383        let send_fut = client
3384            .get(format!("http://127.0.0.1:{port}/limit-xml"))
3385            .send();
3386
3387        let (http_result, _) = tokio::join!(send_fut, async {
3388            if let Some(mut envelope) = rx.recv().await {
3389                envelope.exchange.input.body = camel_component_api::Body::Xml(
3390                    "<root><value>way-too-large</value></root>".into(),
3391                );
3392                if let Some(reply_tx) = envelope.reply_tx {
3393                    let _ = reply_tx.send(Ok(envelope.exchange));
3394                }
3395            }
3396        });
3397
3398        let resp = http_result.unwrap();
3399        assert_eq!(resp.status().as_u16(), 500);
3400        let body = resp.text().await.unwrap();
3401        assert_eq!(body, "Response body exceeds configured limit");
3402        token.cancel();
3403    }
3404
3405    #[tokio::test]
3406    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3407        use camel_component_api::{
3408            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3409        };
3410        use futures::stream;
3411
3412        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3413
3414        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3415        let port = listener.local_addr().unwrap().port();
3416        drop(listener);
3417
3418        let consumer_cfg = HttpServerConfig {
3419            host: "0.0.0.0".to_string(),
3420            port,
3421            path: "/limit-stream".to_string(),
3422            max_request_body: 2 * 1024 * 1024,
3423            max_response_body: 16,
3424            max_inflight_requests: 1024,
3425        };
3426        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3427
3428        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3429        let token = tokio_util::sync::CancellationToken::new();
3430        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3431        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3432        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3433
3434        let client = reqwest::Client::new();
3435        let send_fut = client
3436            .get(format!("http://127.0.0.1:{port}/limit-stream"))
3437            .send();
3438
3439        let (http_result, _) = tokio::join!(send_fut, async {
3440            if let Some(mut envelope) = rx.recv().await {
3441                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3442                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3443                let stream = Box::pin(stream::iter(chunks));
3444                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3445                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3446                    metadata: StreamMetadata {
3447                        size_hint: Some(32),
3448                        content_type: Some("application/octet-stream".into()),
3449                        origin: None,
3450                    },
3451                });
3452                if let Some(reply_tx) = envelope.reply_tx {
3453                    let _ = reply_tx.send(Ok(envelope.exchange));
3454                }
3455            }
3456        });
3457
3458        let resp = http_result.unwrap();
3459        assert_eq!(resp.status().as_u16(), 200);
3460        let body = resp.bytes().await.unwrap();
3461        assert_eq!(body.len(), 32);
3462        token.cancel();
3463    }
3464
3465    // -----------------------------------------------------------------------
3466    // Integration tests
3467    // -----------------------------------------------------------------------
3468
3469    #[tokio::test]
3470    async fn test_integration_single_consumer_round_trip() {
3471        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3472
3473        // Get an OS-assigned free port (ephemeral)
3474        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3475        let port = listener.local_addr().unwrap().port();
3476        drop(listener); // Release — ServerRegistry will rebind
3477
3478        let component = HttpComponent::new();
3479        let endpoint_ctx = NoOpComponentContext;
3480        let endpoint = component
3481            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3482            .unwrap();
3483        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3484
3485        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3486        let token = tokio_util::sync::CancellationToken::new();
3487        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3488
3489        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3490        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3491
3492        let client = reqwest::Client::new();
3493        let send_fut = client
3494            .post(format!("http://127.0.0.1:{port}/echo"))
3495            .header("Content-Type", "text/plain")
3496            .body("ping")
3497            .send();
3498
3499        let (http_result, _) = tokio::join!(send_fut, async {
3500            if let Some(mut envelope) = rx.recv().await {
3501                assert_eq!(
3502                    envelope.exchange.input.header("CamelHttpMethod"),
3503                    Some(&serde_json::Value::String("POST".into()))
3504                );
3505                assert_eq!(
3506                    envelope.exchange.input.header("CamelHttpPath"),
3507                    Some(&serde_json::Value::String("/echo".into()))
3508                );
3509                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3510                if let Some(reply_tx) = envelope.reply_tx {
3511                    let _ = reply_tx.send(Ok(envelope.exchange));
3512                }
3513            }
3514        });
3515
3516        let resp = http_result.unwrap();
3517        assert_eq!(resp.status().as_u16(), 200);
3518        let body = resp.text().await.unwrap();
3519        assert_eq!(body, "pong");
3520
3521        token.cancel();
3522    }
3523
3524    #[tokio::test]
3525    async fn test_integration_two_consumers_shared_port() {
3526        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3527
3528        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3529
3530        // Get an OS-assigned free port (ephemeral)
3531        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3532        let port = listener.local_addr().unwrap().port();
3533        drop(listener);
3534
3535        let component = HttpComponent::new();
3536        let endpoint_ctx = NoOpComponentContext;
3537
3538        // Consumer A: /hello
3539        let endpoint_a = component
3540            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3541            .unwrap();
3542        let mut consumer_a = endpoint_a.create_consumer(rt()).unwrap();
3543
3544        // Consumer B: /world
3545        let endpoint_b = component
3546            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3547            .unwrap();
3548        let mut consumer_b = endpoint_b.create_consumer(rt()).unwrap();
3549
3550        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3551        let token_a = tokio_util::sync::CancellationToken::new();
3552        let ctx_a = ConsumerContext::new(tx_a, token_a.clone(), "http-test-route-a".to_string());
3553
3554        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3555        let token_b = tokio_util::sync::CancellationToken::new();
3556        let ctx_b = ConsumerContext::new(tx_b, token_b.clone(), "http-test-route-b".to_string());
3557
3558        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3559        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3560        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3561
3562        let client = reqwest::Client::new();
3563
3564        // Request to /hello
3565        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3566        let (resp_hello, _) = tokio::join!(fut_hello, async {
3567            if let Some(mut envelope) = rx_a.recv().await {
3568                envelope.exchange.input.body =
3569                    camel_component_api::Body::Text("hello-response".to_string());
3570                if let Some(reply_tx) = envelope.reply_tx {
3571                    let _ = reply_tx.send(Ok(envelope.exchange));
3572                }
3573            }
3574        });
3575
3576        // Request to /world
3577        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3578        let (resp_world, _) = tokio::join!(fut_world, async {
3579            if let Some(mut envelope) = rx_b.recv().await {
3580                envelope.exchange.input.body =
3581                    camel_component_api::Body::Text("world-response".to_string());
3582                if let Some(reply_tx) = envelope.reply_tx {
3583                    let _ = reply_tx.send(Ok(envelope.exchange));
3584                }
3585            }
3586        });
3587
3588        let body_a = resp_hello.unwrap().text().await.unwrap();
3589        let body_b = resp_world.unwrap().text().await.unwrap();
3590
3591        assert_eq!(body_a, "hello-response");
3592        assert_eq!(body_b, "world-response");
3593
3594        token_a.cancel();
3595        token_b.cancel();
3596    }
3597
3598    #[tokio::test]
3599    async fn test_integration_unregistered_path_returns_404() {
3600        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3601
3602        // Get an OS-assigned free port (ephemeral)
3603        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3604        let port = listener.local_addr().unwrap().port();
3605        drop(listener);
3606
3607        let component = HttpComponent::new();
3608        let endpoint_ctx = NoOpComponentContext;
3609        let endpoint = component
3610            .create_endpoint(
3611                &format!("http://127.0.0.1:{port}/registered"),
3612                &endpoint_ctx,
3613            )
3614            .unwrap();
3615        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3616
3617        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3618        let token = tokio_util::sync::CancellationToken::new();
3619        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3620
3621        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3622
3623        // Wait until the server is actually accepting connections (CI runners can be slow).
3624        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3625        loop {
3626            if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3627                .await
3628                .is_ok()
3629            {
3630                break;
3631            }
3632            if std::time::Instant::now() >= deadline {
3633                panic!("HTTP server did not start within 5s on port {port}");
3634            }
3635            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3636        }
3637
3638        let client = reqwest::Client::new();
3639        let resp = client
3640            .get(format!("http://127.0.0.1:{port}/not-there"))
3641            .send()
3642            .await
3643            .unwrap();
3644        assert_eq!(resp.status().as_u16(), 404);
3645
3646        token.cancel();
3647    }
3648
3649    #[test]
3650    fn test_http_consumer_declares_concurrent() {
3651        use camel_component_api::ConcurrencyModel;
3652
3653        let config = HttpServerConfig {
3654            host: "127.0.0.1".to_string(),
3655            port: 19999,
3656            path: "/test".to_string(),
3657            max_request_body: 2 * 1024 * 1024,
3658            max_response_body: 10 * 1024 * 1024,
3659            max_inflight_requests: 1024,
3660        };
3661        let consumer = HttpConsumer::new(config, test_rt());
3662        assert_eq!(
3663            consumer.concurrency_model(),
3664            ConcurrencyModel::Concurrent { max: None }
3665        );
3666    }
3667
3668    // -----------------------------------------------------------------------
3669    // HttpReplyBody streaming tests
3670    // -----------------------------------------------------------------------
3671
3672    #[tokio::test]
3673    async fn test_http_reply_body_stream_variant_exists() {
3674        use bytes::Bytes;
3675        use camel_component_api::CamelError;
3676        use futures::stream;
3677
3678        let chunks: Vec<Result<Bytes, CamelError>> =
3679            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3680        let stream = Box::pin(stream::iter(chunks));
3681        let reply_body = HttpReplyBody::Stream(stream);
3682        // Si compila y el match funciona, el test pasa
3683        match reply_body {
3684            HttpReplyBody::Stream(_) => {}
3685            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3686        }
3687    }
3688
3689    // -----------------------------------------------------------------------
3690    // OpenTelemetry propagation tests (only compiled with "otel" feature)
3691    // -----------------------------------------------------------------------
3692
3693    #[cfg(feature = "otel")]
3694    mod otel_tests {
3695        use super::*;
3696        use camel_component_api::Message;
3697        use tower::ServiceExt;
3698
3699        #[tokio::test]
3700        async fn test_producer_injects_traceparent_header() {
3701            let (url, _handle) = start_test_server_with_header_capture().await;
3702            let ctx = test_producer_ctx();
3703
3704            let component = HttpComponent::new();
3705            let endpoint_ctx = NoOpComponentContext;
3706            let endpoint = component
3707                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3708                .unwrap();
3709            let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3710
3711            // Create exchange with an OTel context by extracting from a traceparent header
3712            let mut exchange = Exchange::new(Message::default());
3713            let mut headers = std::collections::HashMap::new();
3714            headers.insert(
3715                "traceparent".to_string(),
3716                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3717            );
3718            camel_otel::extract_into_exchange(&mut exchange, &headers);
3719
3720            let result = producer.oneshot(exchange).await.unwrap();
3721
3722            // Verify request succeeded
3723            let status = result
3724                .input
3725                .header("CamelHttpResponseCode")
3726                .and_then(|v| v.as_u64())
3727                .unwrap();
3728            assert_eq!(status, 200);
3729
3730            // The test server echoes back the received traceparent header
3731            let traceparent = result.input.header("X-Received-Traceparent");
3732            assert!(
3733                traceparent.is_some(),
3734                "traceparent header should have been sent"
3735            );
3736
3737            let traceparent_str = traceparent.unwrap().as_str().unwrap();
3738            // Verify format: version-traceid-spanid-flags
3739            let parts: Vec<&str> = traceparent_str.split('-').collect();
3740            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3741            assert_eq!(parts[0], "00", "version should be 00");
3742            assert_eq!(
3743                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3744                "trace-id should match"
3745            );
3746            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3747            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3748        }
3749
3750        #[tokio::test]
3751        async fn test_consumer_extracts_traceparent_header() {
3752            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3753
3754            // Get an OS-assigned free port
3755            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3756            let port = listener.local_addr().unwrap().port();
3757            drop(listener);
3758
3759            let component = HttpComponent::new();
3760            let endpoint_ctx = NoOpComponentContext;
3761            let endpoint = component
3762                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3763                .unwrap();
3764            let mut consumer = endpoint.create_consumer(rt()).unwrap();
3765
3766            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3767            let token = tokio_util::sync::CancellationToken::new();
3768            let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3769
3770            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3771            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3772
3773            // Send request with traceparent header
3774            let client = reqwest::Client::new();
3775            let send_fut = client
3776                .post(format!("http://127.0.0.1:{port}/trace"))
3777                .header(
3778                    "traceparent",
3779                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3780                )
3781                .body("test")
3782                .send();
3783
3784            let (http_result, _) = tokio::join!(send_fut, async {
3785                if let Some(envelope) = rx.recv().await {
3786                    // Verify the exchange has a valid OTel context by re-injecting it
3787                    // and checking the traceparent matches
3788                    let mut injected_headers = std::collections::HashMap::new();
3789                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3790
3791                    assert!(
3792                        injected_headers.contains_key("traceparent"),
3793                        "Exchange should have traceparent after extraction"
3794                    );
3795
3796                    let traceparent = injected_headers.get("traceparent").unwrap();
3797                    let parts: Vec<&str> = traceparent.split('-').collect();
3798                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3799                    assert_eq!(
3800                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3801                        "Trace ID should match the original traceparent header"
3802                    );
3803
3804                    if let Some(reply_tx) = envelope.reply_tx {
3805                        let _ = reply_tx.send(Ok(envelope.exchange));
3806                    }
3807                }
3808            });
3809
3810            let resp = http_result.unwrap();
3811            assert_eq!(resp.status().as_u16(), 200);
3812
3813            token.cancel();
3814        }
3815
3816        #[tokio::test]
3817        async fn test_consumer_extracts_mixed_case_traceparent_header() {
3818            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3819
3820            // Get an OS-assigned free port
3821            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3822            let port = listener.local_addr().unwrap().port();
3823            drop(listener);
3824
3825            let component = HttpComponent::new();
3826            let endpoint_ctx = NoOpComponentContext;
3827            let endpoint = component
3828                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3829                .unwrap();
3830            let mut consumer = endpoint.create_consumer(rt()).unwrap();
3831
3832            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3833            let token = tokio_util::sync::CancellationToken::new();
3834            let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3835
3836            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3837            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3838
3839            // Send request with MIXED-CASE TraceParent header (not lowercase)
3840            let client = reqwest::Client::new();
3841            let send_fut = client
3842                .post(format!("http://127.0.0.1:{port}/trace"))
3843                .header(
3844                    "TraceParent",
3845                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3846                )
3847                .body("test")
3848                .send();
3849
3850            let (http_result, _) = tokio::join!(send_fut, async {
3851                if let Some(envelope) = rx.recv().await {
3852                    // Verify the exchange has a valid OTel context by re-injecting it
3853                    // and checking the traceparent matches
3854                    let mut injected_headers = HashMap::new();
3855                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3856
3857                    assert!(
3858                        injected_headers.contains_key("traceparent"),
3859                        "Exchange should have traceparent after extraction from mixed-case header"
3860                    );
3861
3862                    let traceparent = injected_headers.get("traceparent").unwrap();
3863                    let parts: Vec<&str> = traceparent.split('-').collect();
3864                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3865                    assert_eq!(
3866                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3867                        "Trace ID should match the original mixed-case TraceParent header"
3868                    );
3869
3870                    if let Some(reply_tx) = envelope.reply_tx {
3871                        let _ = reply_tx.send(Ok(envelope.exchange));
3872                    }
3873                }
3874            });
3875
3876            let resp = http_result.unwrap();
3877            assert_eq!(resp.status().as_u16(), 200);
3878
3879            token.cancel();
3880        }
3881
3882        #[tokio::test]
3883        async fn test_producer_no_trace_context_no_crash() {
3884            let (url, _handle) = start_test_server().await;
3885            let ctx = test_producer_ctx();
3886
3887            let component = HttpComponent::new();
3888            let endpoint_ctx = NoOpComponentContext;
3889            let endpoint = component
3890                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3891                .unwrap();
3892            let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3893
3894            // Create exchange with default (empty) otel_context - no trace context
3895            let exchange = Exchange::new(Message::default());
3896
3897            // Should succeed without panic
3898            let result = producer.oneshot(exchange).await.unwrap();
3899
3900            // Verify request succeeded
3901            let status = result
3902                .input
3903                .header("CamelHttpResponseCode")
3904                .and_then(|v| v.as_u64())
3905                .unwrap();
3906            assert_eq!(status, 200);
3907        }
3908
3909        /// Test server that captures and echoes back the traceparent header
3910        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3911            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3912            let addr = listener.local_addr().unwrap();
3913            let url = format!("http://127.0.0.1:{}", addr.port());
3914
3915            let handle = tokio::spawn(async move {
3916                loop {
3917                    if let Ok((mut stream, _)) = listener.accept().await {
3918                        tokio::spawn(async move {
3919                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3920                            let mut buf = vec![0u8; 8192];
3921                            let n = stream.read(&mut buf).await.unwrap_or(0);
3922                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3923
3924                            // Extract traceparent header from request
3925                            let traceparent = request
3926                                .lines()
3927                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3928                                .map(|line| {
3929                                    line.split(':')
3930                                        .nth(1)
3931                                        .map(|s| s.trim().to_string())
3932                                        .unwrap_or_default()
3933                                })
3934                                .unwrap_or_default();
3935
3936                            let body =
3937                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3938                            let response = format!(
3939                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3940                                body.len(),
3941                                traceparent,
3942                                body
3943                            );
3944                            let _ = stream.write_all(response.as_bytes()).await;
3945                        });
3946                    }
3947                }
3948            });
3949
3950            (url, handle)
3951        }
3952    }
3953
3954    // -----------------------------------------------------------------------
3955    // Response streaming tests (Eje A - Task 2)
3956    // -----------------------------------------------------------------------
3957
3958    // -----------------------------------------------------------------------
3959    // Request streaming tests (Eje B - Task 3)
3960    // -----------------------------------------------------------------------
3961
3962    #[tokio::test]
3963    async fn test_request_body_arrives_as_stream() {
3964        use camel_component_api::Body;
3965        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3966
3967        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3968        let port = listener.local_addr().unwrap().port();
3969        drop(listener);
3970
3971        let component = HttpComponent::new();
3972        let endpoint_ctx = NoOpComponentContext;
3973        let endpoint = component
3974            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3975            .unwrap();
3976        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3977
3978        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3979        let token = tokio_util::sync::CancellationToken::new();
3980        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3981
3982        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3983        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3984
3985        let client = reqwest::Client::new();
3986        let send_fut = client
3987            .post(format!("http://127.0.0.1:{port}/upload"))
3988            .body("hello streaming world")
3989            .send();
3990
3991        let (http_result, _) = tokio::join!(send_fut, async {
3992            if let Some(mut envelope) = rx.recv().await {
3993                // Body must be Body::Stream, not Body::Text or Body::Bytes
3994                assert!(
3995                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3996                    "expected Body::Stream, got discriminant {:?}",
3997                    std::mem::discriminant(&envelope.exchange.input.body)
3998                );
3999                // Materialize to verify content
4000                let bytes = envelope
4001                    .exchange
4002                    .input
4003                    .body
4004                    .into_bytes(1024 * 1024)
4005                    .await
4006                    .unwrap();
4007                assert_eq!(&bytes[..], b"hello streaming world");
4008
4009                envelope.exchange.input.body = camel_component_api::Body::Empty;
4010                if let Some(reply_tx) = envelope.reply_tx {
4011                    let _ = reply_tx.send(Ok(envelope.exchange));
4012                }
4013            }
4014        });
4015
4016        let resp = http_result.unwrap();
4017        assert_eq!(resp.status().as_u16(), 200);
4018
4019        token.cancel();
4020    }
4021
4022    // -----------------------------------------------------------------------
4023    // Response streaming tests (Eje A - Task 2)
4024    // -----------------------------------------------------------------------
4025
4026    #[tokio::test]
4027    async fn test_streaming_response_chunked() {
4028        use bytes::Bytes;
4029        use camel_component_api::Body;
4030        use camel_component_api::CamelError;
4031        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
4032        use camel_component_api::{StreamBody, StreamMetadata};
4033        use futures::stream;
4034        use std::sync::Arc;
4035        use tokio::sync::Mutex;
4036
4037        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4038        let port = listener.local_addr().unwrap().port();
4039        drop(listener);
4040
4041        let component = HttpComponent::new();
4042        let endpoint_ctx = NoOpComponentContext;
4043        let endpoint = component
4044            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
4045            .unwrap();
4046        let mut consumer = endpoint.create_consumer(rt()).unwrap();
4047
4048        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
4049        let token = tokio_util::sync::CancellationToken::new();
4050        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4051
4052        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4053        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4054
4055        let client = reqwest::Client::new();
4056        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
4057
4058        let (http_result, _) = tokio::join!(send_fut, async {
4059            if let Some(mut envelope) = rx.recv().await {
4060                // Respond with Body::Stream
4061                let chunks: Vec<Result<Bytes, CamelError>> =
4062                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
4063                let stream = Box::pin(stream::iter(chunks));
4064                envelope.exchange.input.body = Body::Stream(StreamBody {
4065                    stream: Arc::new(Mutex::new(Some(stream))),
4066                    metadata: StreamMetadata::default(),
4067                });
4068                if let Some(reply_tx) = envelope.reply_tx {
4069                    let _ = reply_tx.send(Ok(envelope.exchange));
4070                }
4071            }
4072        });
4073
4074        let resp = http_result.unwrap();
4075        assert_eq!(resp.status().as_u16(), 200);
4076        let body = resp.text().await.unwrap();
4077        assert_eq!(body, "chunk1chunk2");
4078
4079        token.cancel();
4080    }
4081
4082    // -----------------------------------------------------------------------
4083    // 413 Content-Length limit test (Task 4)
4084    // -----------------------------------------------------------------------
4085
4086    #[tokio::test]
4087    async fn test_413_when_content_length_exceeds_limit() {
4088        use camel_component_api::ConsumerContext;
4089
4090        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4091        let port = listener.local_addr().unwrap().port();
4092        drop(listener);
4093
4094        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
4095        let component = HttpComponent::new();
4096        let endpoint_ctx = NoOpComponentContext;
4097        let endpoint = component
4098            .create_endpoint(
4099                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
4100                &endpoint_ctx,
4101            )
4102            .unwrap();
4103        let mut consumer = endpoint.create_consumer(rt()).unwrap();
4104
4105        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4106        let token = tokio_util::sync::CancellationToken::new();
4107        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4108
4109        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4110        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4111
4112        let client = reqwest::Client::new();
4113        let resp = client
4114            .post(format!("http://127.0.0.1:{port}/upload"))
4115            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
4116            .body("x".repeat(1000))
4117            .send()
4118            .await
4119            .unwrap();
4120
4121        assert_eq!(resp.status().as_u16(), 413);
4122
4123        token.cancel();
4124    }
4125
4126    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
4127    /// The spec says: "If there is no Content-Length, the limit does not apply at the
4128    /// consumer level — the route is responsible."
4129    #[tokio::test]
4130    async fn test_chunked_upload_without_content_length_bypasses_limit() {
4131        use bytes::Bytes;
4132        use camel_component_api::Body;
4133        use camel_component_api::ConsumerContext;
4134        use futures::stream;
4135
4136        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4137        let port = listener.local_addr().unwrap().port();
4138        drop(listener);
4139
4140        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
4141        let component = HttpComponent::new();
4142        let endpoint_ctx = NoOpComponentContext;
4143        let endpoint = component
4144            .create_endpoint(
4145                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4146                &endpoint_ctx,
4147            )
4148            .unwrap();
4149        let mut consumer = endpoint.create_consumer(rt()).unwrap();
4150
4151        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4152        let token = tokio_util::sync::CancellationToken::new();
4153        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4154
4155        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4156        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4157
4158        let client = reqwest::Client::new();
4159
4160        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
4161        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
4162        // but since there's no Content-Length the 413 check must NOT fire.
4163        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4164            Ok(Bytes::from("y".repeat(50))),
4165            Ok(Bytes::from("y".repeat(50))),
4166        ];
4167        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4168        let send_fut = client
4169            .post(format!("http://127.0.0.1:{port}/upload"))
4170            .body(stream_body)
4171            .send();
4172
4173        let consumer_fut = async {
4174            // Use timeout to avoid deadlock if the handler rejects before enqueueing
4175            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4176                Ok(Some(mut envelope)) => {
4177                    assert!(
4178                        matches!(envelope.exchange.input.body, Body::Stream(_)),
4179                        "expected Body::Stream"
4180                    );
4181                    envelope.exchange.input.body = camel_component_api::Body::Empty;
4182                    if let Some(reply_tx) = envelope.reply_tx {
4183                        let _ = reply_tx.send(Ok(envelope.exchange));
4184                    }
4185                }
4186                Ok(None) => panic!("consumer channel closed unexpectedly"),
4187                Err(_) => {
4188                    // Timeout: the request was rejected before reaching the consumer.
4189                    // The HTTP response will carry the real status code (we check below).
4190                }
4191            }
4192        };
4193
4194        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4195
4196        let resp = http_result.unwrap();
4197        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
4198        assert_ne!(
4199            resp.status().as_u16(),
4200            413,
4201            "chunked upload must not be rejected by maxRequestBody"
4202        );
4203        assert_eq!(resp.status().as_u16(), 200);
4204
4205        token.cancel();
4206    }
4207
4208    #[test]
4209    fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4210        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4211        cfg.blocked_hosts = vec!["blocked.local".to_string()];
4212        cfg.allow_private_ips = false;
4213
4214        let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4215        assert!(blocked.is_err());
4216
4217        let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4218        assert!(private_ip.is_err());
4219
4220        cfg.allow_private_ips = true;
4221        let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4222        assert!(allowed.is_ok());
4223    }
4224
4225    #[test]
4226    fn test_is_private_ip_ranges() {
4227        assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); // allow-unwrap
4228        assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); // allow-unwrap
4229        assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); // allow-unwrap
4230        assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); // allow-unwrap
4231        assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); // allow-unwrap
4232        assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); // allow-unwrap
4233
4234        assert!(is_private_ip(&"::1".parse().unwrap())); // allow-unwrap
4235        assert!(is_private_ip(&"fc00::1".parse().unwrap())); // allow-unwrap
4236        assert!(is_private_ip(&"fd12::1".parse().unwrap())); // allow-unwrap
4237        assert!(is_private_ip(&"fe80::1".parse().unwrap())); // allow-unwrap
4238        // ::ffff:0:0/96 (IPv4-mapped): only private if the mapped IPv4 is private
4239        assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); // allow-unwrap
4240        assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); // allow-unwrap
4241        assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); // allow-unwrap
4242
4243        assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); // allow-unwrap
4244        assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); // allow-unwrap — public IPv4-mapped
4245        assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); // allow-unwrap
4246    }
4247
4248    #[tokio::test]
4249    async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4250        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); // allow-unwrap
4251        cfg.allow_private_ips = false;
4252
4253        let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4254            .await
4255            .expect_err("localhost must resolve to loopback and be blocked");
4256
4257        let msg = err.to_string();
4258        assert!(msg.contains("Target resolved to private IP"));
4259    }
4260
4261    #[test]
4262    fn test_title_case_header() {
4263        assert_eq!(title_case_header("content-type"), "Content-Type");
4264        assert_eq!(title_case_header("authorization"), "Authorization");
4265        assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4266        assert_eq!(title_case_header("host"), "Host");
4267        assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4268        assert_eq!(title_case_header("single"), "Single");
4269        assert_eq!(title_case_header(""), "");
4270    }
4271
4272    #[test]
4273    fn test_resolve_url_combines_path_and_query_sources() {
4274        let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4275        let mut exchange = Exchange::new(Message::default());
4276        exchange.input.set_header(
4277            "CamelHttpPath",
4278            serde_json::Value::String("next".to_string()),
4279        );
4280        let url = HttpProducer::resolve_url(&exchange, &cfg);
4281        assert!(url.starts_with("http://example.com/base/next?"));
4282        assert!(url.contains("foo=bar"));
4283
4284        exchange.input.set_header(
4285            "CamelHttpUri",
4286            serde_json::Value::String("http://other.test/root".to_string()),
4287        );
4288        exchange.input.set_header(
4289            "CamelHttpQuery",
4290            serde_json::Value::String("a=1&b=2".to_string()),
4291        );
4292
4293        let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4294        assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4295    }
4296
4297    #[test]
4298    fn test_http_producer_helpers_status_and_size_boundaries() {
4299        assert!(HttpProducer::is_ok_status(200, (200, 299)));
4300        assert!(HttpProducer::is_ok_status(299, (200, 299)));
4301        assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4302        assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4303
4304        assert!(!exceeds_max_response_body(10, 10));
4305        assert!(exceeds_max_response_body(11, 10));
4306    }
4307
4308    // -----------------------------------------------------------------------
4309    // Content-Type inference tests
4310    // -----------------------------------------------------------------------
4311
4312    async fn setup_consumer_on_free_port(
4313        path: &str,
4314    ) -> (
4315        u16,
4316        tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4317        tokio_util::sync::CancellationToken,
4318    ) {
4319        use camel_component_api::ConsumerContext;
4320
4321        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4322        let port = listener.local_addr().unwrap().port();
4323        drop(listener);
4324
4325        let consumer_cfg = HttpServerConfig {
4326            host: "127.0.0.1".to_string(),
4327            port,
4328            path: path.to_string(),
4329            max_request_body: 2 * 1024 * 1024,
4330            max_response_body: 10 * 1024 * 1024,
4331            max_inflight_requests: 1024,
4332        };
4333        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
4334
4335        let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4336        let token = tokio_util::sync::CancellationToken::new();
4337        let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4338
4339        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4340        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4341
4342        (port, rx, token)
4343    }
4344
4345    #[tokio::test]
4346    async fn test_content_type_inferred_for_json_body() {
4347        let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4348
4349        let client = reqwest::Client::new();
4350        let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4351
4352        let (http_result, _) = tokio::join!(send_fut, async {
4353            if let Some(mut envelope) = rx.recv().await {
4354                envelope.exchange.input.body =
4355                    camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4356                if let Some(reply_tx) = envelope.reply_tx {
4357                    let _ = reply_tx.send(Ok(envelope.exchange));
4358                }
4359            }
4360        });
4361
4362        let resp = http_result.unwrap();
4363        assert_eq!(resp.status().as_u16(), 200);
4364        let ct = resp
4365            .headers()
4366            .get("content-type")
4367            .expect("Content-Type header should be present");
4368        assert_eq!(ct, "application/json");
4369        let body = resp.text().await.unwrap();
4370        assert_eq!(body, r#"{"message":"hello"}"#);
4371
4372        token.cancel();
4373    }
4374
4375    #[tokio::test]
4376    async fn test_content_type_inferred_for_text_body() {
4377        let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4378
4379        let client = reqwest::Client::new();
4380        let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4381
4382        let (http_result, _) = tokio::join!(send_fut, async {
4383            if let Some(mut envelope) = rx.recv().await {
4384                envelope.exchange.input.body =
4385                    camel_component_api::Body::Text("plain text response".to_string());
4386                if let Some(reply_tx) = envelope.reply_tx {
4387                    let _ = reply_tx.send(Ok(envelope.exchange));
4388                }
4389            }
4390        });
4391
4392        let resp = http_result.unwrap();
4393        assert_eq!(resp.status().as_u16(), 200);
4394        let ct = resp
4395            .headers()
4396            .get("content-type")
4397            .expect("Content-Type header should be present");
4398        assert_eq!(ct, "text/plain; charset=utf-8");
4399        let body = resp.text().await.unwrap();
4400        assert_eq!(body, "plain text response");
4401
4402        token.cancel();
4403    }
4404
4405    #[tokio::test]
4406    async fn test_content_type_inferred_for_xml_body() {
4407        let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4408
4409        let client = reqwest::Client::new();
4410        let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4411
4412        let (http_result, _) = tokio::join!(send_fut, async {
4413            if let Some(mut envelope) = rx.recv().await {
4414                envelope.exchange.input.body =
4415                    camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4416                if let Some(reply_tx) = envelope.reply_tx {
4417                    let _ = reply_tx.send(Ok(envelope.exchange));
4418                }
4419            }
4420        });
4421
4422        let resp = http_result.unwrap();
4423        assert_eq!(resp.status().as_u16(), 200);
4424        let ct = resp
4425            .headers()
4426            .get("content-type")
4427            .expect("Content-Type header should be present");
4428        assert_eq!(ct, "application/xml");
4429        let body = resp.text().await.unwrap();
4430        assert_eq!(body, "<root><item>value</item></root>");
4431
4432        token.cancel();
4433    }
4434
4435    #[tokio::test]
4436    async fn test_no_content_type_for_empty_body() {
4437        let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4438
4439        let client = reqwest::Client::new();
4440        let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4441
4442        let (http_result, _) = tokio::join!(send_fut, async {
4443            if let Some(mut envelope) = rx.recv().await {
4444                envelope.exchange.input.body = camel_component_api::Body::Empty;
4445                if let Some(reply_tx) = envelope.reply_tx {
4446                    let _ = reply_tx.send(Ok(envelope.exchange));
4447                }
4448            }
4449        });
4450
4451        let resp = http_result.unwrap();
4452        assert_eq!(resp.status().as_u16(), 200);
4453        assert!(
4454            resp.headers().get("content-type").is_none(),
4455            "Empty body should not set Content-Type"
4456        );
4457
4458        token.cancel();
4459    }
4460
4461    #[tokio::test]
4462    async fn test_no_content_type_for_raw_bytes_body() {
4463        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4464
4465        let client = reqwest::Client::new();
4466        let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4467
4468        let (http_result, _) = tokio::join!(send_fut, async {
4469            if let Some(mut envelope) = rx.recv().await {
4470                envelope.exchange.input.body =
4471                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4472                if let Some(reply_tx) = envelope.reply_tx {
4473                    let _ = reply_tx.send(Ok(envelope.exchange));
4474                }
4475            }
4476        });
4477
4478        let resp = http_result.unwrap();
4479        assert_eq!(resp.status().as_u16(), 200);
4480        assert!(
4481            resp.headers().get("content-type").is_none(),
4482            "Raw Bytes body should not set Content-Type"
4483        );
4484
4485        token.cancel();
4486    }
4487
4488    #[tokio::test]
4489    async fn test_content_type_from_stream_metadata() {
4490        use camel_component_api::{StreamBody, StreamMetadata};
4491        use futures::stream;
4492
4493        let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4494
4495        let client = reqwest::Client::new();
4496        let send_fut = client
4497            .get(format!("http://127.0.0.1:{port}/stream-ct"))
4498            .send();
4499
4500        let (http_result, _) = tokio::join!(send_fut, async {
4501            if let Some(mut envelope) = rx.recv().await {
4502                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4503                    vec![Ok(bytes::Bytes::from("audio data"))];
4504                let stream = Box::pin(stream::iter(chunks));
4505                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4506                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4507                    metadata: StreamMetadata {
4508                        size_hint: None,
4509                        content_type: Some("audio/mpeg".to_string()),
4510                        origin: None,
4511                    },
4512                });
4513                if let Some(reply_tx) = envelope.reply_tx {
4514                    let _ = reply_tx.send(Ok(envelope.exchange));
4515                }
4516            }
4517        });
4518
4519        let resp = http_result.unwrap();
4520        assert_eq!(resp.status().as_u16(), 200);
4521        let ct = resp
4522            .headers()
4523            .get("content-type")
4524            .expect("Content-Type header should be present");
4525        assert_eq!(ct, "audio/mpeg");
4526        let body = resp.text().await.unwrap();
4527        assert_eq!(body, "audio data");
4528
4529        token.cancel();
4530    }
4531
4532    #[tokio::test]
4533    async fn test_user_content_type_overrides_inferred() {
4534        let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4535
4536        let client = reqwest::Client::new();
4537        let send_fut = client
4538            .get(format!("http://127.0.0.1:{port}/override-ct"))
4539            .send();
4540
4541        let (http_result, _) = tokio::join!(send_fut, async {
4542            if let Some(mut envelope) = rx.recv().await {
4543                envelope.exchange.input.body =
4544                    camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4545                envelope.exchange.input.set_header(
4546                    "Content-Type",
4547                    serde_json::Value::String("text/html".to_string()),
4548                );
4549                if let Some(reply_tx) = envelope.reply_tx {
4550                    let _ = reply_tx.send(Ok(envelope.exchange));
4551                }
4552            }
4553        });
4554
4555        let resp = http_result.unwrap();
4556        assert_eq!(resp.status().as_u16(), 200);
4557        let ct = resp
4558            .headers()
4559            .get("content-type")
4560            .expect("Content-Type header should be present");
4561        assert_eq!(
4562            ct, "text/html",
4563            "User-set Content-Type should take precedence over inferred type"
4564        );
4565
4566        token.cancel();
4567    }
4568
4569    #[tokio::test]
4570    async fn test_user_content_type_with_bytes_body() {
4571        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4572
4573        let client = reqwest::Client::new();
4574        let send_fut = client
4575            .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4576            .send();
4577
4578        let (http_result, _) = tokio::join!(send_fut, async {
4579            if let Some(mut envelope) = rx.recv().await {
4580                envelope.exchange.input.body =
4581                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4582                envelope.exchange.input.set_header(
4583                    "Content-Type",
4584                    serde_json::Value::String("application/json".to_string()),
4585                );
4586                if let Some(reply_tx) = envelope.reply_tx {
4587                    let _ = reply_tx.send(Ok(envelope.exchange));
4588                }
4589            }
4590        });
4591
4592        let resp = http_result.unwrap();
4593        assert_eq!(resp.status().as_u16(), 200);
4594        let ct = resp
4595            .headers()
4596            .get("content-type")
4597            .expect("Content-Type header should be present for Bytes body with user header");
4598        assert_eq!(
4599            ct, "application/json",
4600            "User Content-Type should be sent for Bytes body"
4601        );
4602
4603        token.cancel();
4604    }
4605
4606    // -----------------------------------------------------------------------
4607    // Server monitor tests (GRL-005)
4608    // -----------------------------------------------------------------------
4609
4610    #[tokio::test]
4611    async fn monitor_task_silent_on_clean_exit() {
4612        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4613        // Clean exit should complete without panicking or logging errors
4614        monitor_axum_task(
4615            handle,
4616            "127.0.0.1:0".to_string(),
4617            noop_rt(),
4618            "test-monitor".into(),
4619        )
4620        .await;
4621    }
4622
4623    #[tokio::test]
4624    async fn monitor_task_handles_panicked_task() {
4625        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4626            panic!("simulated server crash");
4627        });
4628        // Should complete without panicking even though the inner task panicked
4629        monitor_axum_task(
4630            handle,
4631            "127.0.0.1:9999".to_string(),
4632            noop_rt(),
4633            "test-monitor".into(),
4634        )
4635        .await;
4636    }
4637
4638    // -----------------------------------------------------------------------
4639    // Credential redaction tests
4640    // -----------------------------------------------------------------------
4641
4642    #[test]
4643    fn http_auth_basic_debug_redacts_password() {
4644        let auth = HttpAuth::Basic {
4645            username: "admin".to_string(),
4646            password: "hunter2".to_string(),
4647        };
4648        let debug = format!("{:?}", auth);
4649        assert!(
4650            !debug.contains("hunter2"),
4651            "password must be redacted: {debug}"
4652        );
4653        assert!(debug.contains("admin"), "username should appear: {debug}");
4654    }
4655
4656    #[test]
4657    fn http_auth_bearer_debug_redacts_token() {
4658        let auth = HttpAuth::Bearer {
4659            token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4660        };
4661        let debug = format!("{:?}", auth);
4662        assert!(
4663            !debug.contains("eyJhbGci"),
4664            "token must be redacted: {debug}"
4665        );
4666    }
4667
4668    #[test]
4669    fn http_auth_none_debug_shows_variant() {
4670        let debug = format!("{:?}", HttpAuth::None);
4671        assert!(
4672            debug.contains("None"),
4673            "None variant should appear: {debug}"
4674        );
4675    }
4676
4677    #[test]
4678    fn http_endpoint_config_debug_redacts_auth_credentials() {
4679        let config = HttpEndpointConfig::from_uri(
4680            "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4681        )
4682        .unwrap();
4683        let debug = format!("{:?}", config);
4684        assert!(
4685            !debug.contains("secret123"),
4686            "password must be redacted in HttpEndpointConfig debug: {debug}"
4687        );
4688    }
4689
4690    // -----------------------------------------------------------------------
4691    // Static file serving tests (Task 5)
4692    // -----------------------------------------------------------------------
4693
4694    use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4695    use tower_http::services::ServeDir;
4696
4697    fn make_test_registry() -> HttpRouteRegistry {
4698        HttpRouteRegistry::new()
4699    }
4700
4701    fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4702        AppState {
4703            registry,
4704            max_request_body: 2 * 1024 * 1024,
4705            max_response_body: 10 * 1024 * 1024,
4706            inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4707        }
4708    }
4709
4710    #[allow(clippy::await_holding_lock)]
4711    #[tokio::test]
4712    async fn test_static_file_serving_serves_file_contents() {
4713        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4714        ServerRegistry::reset();
4715
4716        // Create temp dir with test files
4717        let temp_dir =
4718            std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4719        std::fs::create_dir_all(&temp_dir).unwrap();
4720        std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4721        std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4722
4723        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4724
4725        let registry = make_test_registry();
4726        let serve_dir = ServeDir::new(&canonical_dir)
4727            .precompressed_gzip()
4728            .precompressed_br()
4729            .append_index_html_on_directories(true);
4730
4731        let mount = StaticMount {
4732            mount_path: "/".to_string(),
4733            mode: MountMode::Static,
4734            dir: canonical_dir.clone(),
4735            cache_control: "public, max-age=3600".to_string(),
4736            error_pages: std::collections::HashMap::new(),
4737            serve_dir,
4738        };
4739        registry.register_static_mount(mount).await.unwrap();
4740
4741        let state = make_test_state(registry);
4742
4743        // Test serving hello.txt
4744        let req = Request::builder()
4745            .uri("/hello.txt")
4746            .body(AxumBody::empty())
4747            .unwrap();
4748        let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4749        assert_eq!(resp.status(), StatusCode::OK);
4750        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4751            .await
4752            .unwrap();
4753        assert_eq!(&body[..], b"Hello, static world!");
4754
4755        // Test serving style.css
4756        let req = Request::builder()
4757            .uri("/style.css")
4758            .body(AxumBody::empty())
4759            .unwrap();
4760        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4761        assert_eq!(resp.status(), StatusCode::OK);
4762        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4763            .await
4764            .unwrap();
4765        assert_eq!(&body[..], b"body { color: red; }");
4766
4767        // Test 404 for non-existent file
4768        let req = Request::builder()
4769            .uri("/missing.txt")
4770            .body(AxumBody::empty())
4771            .unwrap();
4772        let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4773        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4774
4775        // Cleanup
4776        std::fs::remove_dir_all(&temp_dir).ok();
4777    }
4778
4779    #[allow(clippy::await_holding_lock)]
4780    #[tokio::test]
4781    async fn test_spa_fallback_serves_index_for_unknown_paths() {
4782        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4783        ServerRegistry::reset();
4784
4785        let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4786        std::fs::create_dir_all(&temp_dir).unwrap();
4787        std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4788        std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4789
4790        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4791
4792        let registry = make_test_registry();
4793        let serve_dir = ServeDir::new(&canonical_dir)
4794            .precompressed_gzip()
4795            .precompressed_br()
4796            .append_index_html_on_directories(true);
4797
4798        let mount = StaticMount {
4799            mount_path: "/".to_string(),
4800            mode: MountMode::Spa,
4801            dir: canonical_dir.clone(),
4802            cache_control: "public, max-age=0".to_string(),
4803            error_pages: std::collections::HashMap::new(),
4804            serve_dir,
4805        };
4806        // Register as SPA mount
4807        registry.register_static_mount(mount).await.unwrap();
4808
4809        let state = make_test_state(registry);
4810
4811        // SPA fallback: GET /dashboard with Accept: text/html → index.html
4812        let req = Request::builder()
4813            .method("GET")
4814            .uri("/dashboard")
4815            .header("Accept", "text/html")
4816            .body(AxumBody::empty())
4817            .unwrap();
4818        let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4819        assert_eq!(resp.status(), StatusCode::OK);
4820        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4821            .await
4822            .unwrap();
4823        assert_eq!(&body[..], b"<h1>SPA App</h1>");
4824
4825        // Static file still works: GET /app.js
4826        let req = Request::builder()
4827            .method("GET")
4828            .uri("/app.js")
4829            .body(AxumBody::empty())
4830            .unwrap();
4831        let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4832        assert_eq!(resp.status(), StatusCode::OK);
4833        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4834            .await
4835            .unwrap();
4836        assert_eq!(&body[..], b"console.log('app')");
4837
4838        // No SPA fallback for JSON accept → 404
4839        let req = Request::builder()
4840            .method("GET")
4841            .uri("/api/data")
4842            .header("Accept", "application/json")
4843            .body(AxumBody::empty())
4844            .unwrap();
4845        let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4846        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4847
4848        // No SPA fallback for file extensions → 404
4849        let req = Request::builder()
4850            .method("GET")
4851            .uri("/style.css")
4852            .header("Accept", "text/html")
4853            .body(AxumBody::empty())
4854            .unwrap();
4855        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4856        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4857
4858        // Cleanup
4859        std::fs::remove_dir_all(&temp_dir).ok();
4860    }
4861
4862    #[allow(clippy::await_holding_lock)]
4863    #[tokio::test]
4864    async fn test_error_page_mapping_serves_custom_404() {
4865        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4866        ServerRegistry::reset();
4867
4868        let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4869        let errors_dir = temp_dir.join("errors");
4870        std::fs::create_dir_all(&errors_dir).unwrap();
4871        std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4872        std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4873
4874        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4875        let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4876
4877        let registry = make_test_registry();
4878        let serve_dir = ServeDir::new(&canonical_dir)
4879            .precompressed_gzip()
4880            .precompressed_br()
4881            .append_index_html_on_directories(true);
4882
4883        let mut error_pages = std::collections::HashMap::new();
4884        error_pages.insert(404, canonical_404);
4885
4886        let mount = StaticMount {
4887            mount_path: "/".to_string(),
4888            mode: MountMode::Static,
4889            dir: canonical_dir.clone(),
4890            cache_control: "public, max-age=0".to_string(),
4891            error_pages,
4892            serve_dir,
4893        };
4894        registry.register_static_mount(mount).await.unwrap();
4895
4896        let state = make_test_state(registry);
4897
4898        // Request non-existent file → custom 404 page
4899        let req = Request::builder()
4900            .method("GET")
4901            .uri("/missing.html")
4902            .body(AxumBody::empty())
4903            .unwrap();
4904        let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4905        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4906        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4907            .await
4908            .unwrap();
4909        assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4910
4911        // Existing file still works
4912        let req = Request::builder()
4913            .method("GET")
4914            .uri("/index.html")
4915            .body(AxumBody::empty())
4916            .unwrap();
4917        let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4918        assert_eq!(resp.status(), StatusCode::OK);
4919        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4920            .await
4921            .unwrap();
4922        assert_eq!(&body[..], b"<h1>Home</h1>");
4923
4924        // Cleanup
4925        std::fs::remove_dir_all(&temp_dir).ok();
4926    }
4927}