Skip to main content

camel_component_http/
lib.rs

1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40// ---------------------------------------------------------------------------
41// HttpEndpointConfig
42// ---------------------------------------------------------------------------
43
44/// Configuration for an HTTP client (producer) endpoint.
45///
46/// # Memory Limits
47///
48/// HTTP operations enforce conservative memory limits to prevent denial-of-service
49/// attacks from untrusted network sources. These limits are significantly lower than
50/// file component limits (100MB) because HTTP typically handles API responses rather
51/// than large file transfers, and clients may be untrusted.
52///
53/// ## Default Limits
54///
55/// - **HTTP client body**: 10MB (typical API responses)
56/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
57/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
58///
59/// ## Rationale
60///
61/// The 10MB limit for HTTP client responses is appropriate for most API interactions
62/// while providing protection against:
63/// - Malicious servers sending oversized responses
64/// - Runaway processes generating unexpectedly large payloads
65/// - Memory exhaustion attacks
66///
67/// The 2MB server request limit is even more conservative because it handles input
68/// from potentially untrusted clients on the public internet.
69///
70/// ## Overriding Limits
71///
72/// Override the default client body limit using the `maxBodySize` URI parameter:
73///
74/// ```text
75/// http://api.example.com/large-data?maxBodySize=52428800
76/// ```
77///
78/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
79///
80/// ```text
81/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
82/// ```
83///
84/// ## Behavior When Exceeded
85///
86/// When a body exceeds the configured limit:
87/// - An error is returned immediately
88/// - No memory is exhausted - the limit is checked before allocation
89/// - The HTTP connection is terminated cleanly
90///
91/// ## Security Considerations
92///
93/// HTTP endpoints should be treated with more caution than file endpoints because:
94/// - Clients may be unknown and untrusted
95/// - Network traffic can be spoofed or malicious
96/// - DoS attacks often exploit unbounded resource consumption
97///
98/// Only increase limits when you control both ends of the connection or when
99/// business requirements demand larger payloads.
100#[derive(Debug, Clone)]
101pub struct HttpEndpointConfig {
102    pub base_url: String,
103    pub http_method: Option<String>,
104    pub throw_exception_on_failure: bool,
105    pub ok_status_code_range: (u16, u16),
106    pub response_timeout: Option<Duration>,
107    pub query_params: HashMap<String, String>,
108    pub allow_private_ips: bool,
109    pub blocked_hosts: Vec<String>,
110    pub max_body_size: usize,
111    pub read_timeout_ms: u64,
112    pub max_response_bytes: usize,
113    pub auth: HttpAuth,
114    pub token_provider: Option<Arc<dyn TokenProvider>>,
115    pub user_agent: Option<String>,
116    pub cookie_handling: CookieHandling,
117    pub bridge_endpoint: bool,
118    pub connection_close: bool,
119    pub skip_request_headers: Vec<String>,
120    pub skip_response_headers: Vec<String>,
121}
122
123#[derive(Clone, PartialEq)]
124pub enum HttpAuth {
125    None,
126    Basic { username: String, password: String },
127    Bearer { token: String },
128}
129
130impl std::fmt::Debug for HttpAuth {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            HttpAuth::None => f.write_str("None"),
134            HttpAuth::Basic { username, .. } => f
135                .debug_struct("Basic")
136                .field("username", username)
137                .field("password", &"***")
138                .finish(),
139            HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
140        }
141    }
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum CookieHandling {
146    Disabled,
147    InMemory,
148}
149
150/// Camel options that should NOT be forwarded as HTTP query params
151const HTTP_CAMEL_OPTIONS: &[&str] = &[
152    "httpMethod",
153    "throwExceptionOnFailure",
154    "okStatusCodeRange",
155    "followRedirects",
156    "connectTimeout",
157    "responseTimeout",
158    "allowPrivateIps",
159    "blockedHosts",
160    "maxBodySize",
161    "readTimeout",
162    "maxResponseBytes",
163    "authMethod",
164    "authUsername",
165    "authPassword",
166    "authBearerToken",
167    "userAgent",
168    "cookieHandling",
169    "bridgeEndpoint",
170    "connectionClose",
171    "skipRequestHeaders",
172    "skipResponseHeaders",
173];
174
175impl UriConfig for HttpEndpointConfig {
176    /// Returns "http" as the primary scheme (also accepts "https")
177    fn scheme() -> &'static str {
178        "http"
179    }
180
181    fn from_uri(uri: &str) -> Result<Self, CamelError> {
182        let parts = parse_uri(uri)?;
183        Self::from_components(parts)
184    }
185
186    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187        // Validate scheme - accept both http and https
188        if parts.scheme != "http" && parts.scheme != "https" {
189            return Err(CamelError::InvalidUri(format!(
190                "expected scheme 'http' or 'https', got '{}'",
191                parts.scheme
192            )));
193        }
194
195        // Construct base_url from scheme + path
196        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
197        let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199        let http_method = parts.params.get("httpMethod").cloned();
200
201        let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202            Some(v) => parse_bool_param_http(v).map_err(|e| {
203                CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204            })?,
205            None => true,
206        };
207
208        // Parse status code range from "start-end" format (e.g., "200-299")
209        let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210            Some(v) => parse_ok_status_code_range(v)?,
211            None => (200, 299),
212        };
213
214        let response_timeout = match parts.params.get("responseTimeout") {
215            Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216                CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217            })?),
218            None => None,
219        };
220
221        // SSRF protection settings
222        let allow_private_ips = match parts.params.get("allowPrivateIps") {
223            Some(v) => parse_bool_param_http(v).map_err(|e| {
224                CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225            })?,
226            None => false, // Default: block private IPs
227        };
228
229        // Parse comma-separated blocked hosts
230        let blocked_hosts = parts
231            .params
232            .get("blockedHosts")
233            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234            .unwrap_or_default();
235
236        let max_body_size = match parts.params.get("maxBodySize") {
237            Some(v) => v.parse::<usize>().map_err(|e| {
238                CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239            })?,
240            None => 10 * 1024 * 1024, // Default: 10MB
241        };
242
243        let read_timeout_ms = match parts.params.get("readTimeout") {
244            Some(v) => v.parse::<u64>().map_err(|e| {
245                CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246            })?,
247            None => 30_000, // Default: 30s
248        };
249
250        let max_response_bytes = match parts.params.get("maxResponseBytes") {
251            Some(v) => v.parse::<usize>().map_err(|e| {
252                CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253            })?,
254            None => 10 * 1024 * 1024, // Default: 10MB
255        };
256
257        let auth = parse_auth_from_params(&parts.params)?;
258
259        let user_agent = parts.params.get("userAgent").cloned();
260
261        let cookie_handling = match parts.params.get("cookieHandling") {
262            Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263            Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264            Some(v) => {
265                return Err(CamelError::InvalidUri(format!(
266                    "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267                )));
268            }
269            None => CookieHandling::Disabled,
270        };
271
272        let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273            Some(v) => parse_bool_param_http(v).map_err(|e| {
274                CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275            })?,
276            None => false,
277        };
278
279        let connection_close = match parts.params.get("connectionClose") {
280            Some(v) => parse_bool_param_http(v).map_err(|e| {
281                CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282            })?,
283            None => false,
284        };
285
286        let skip_request_headers = parts
287            .params
288            .get("skipRequestHeaders")
289            .map(|v| {
290                v.split(',')
291                    .map(str::trim)
292                    .filter(|s| !s.is_empty())
293                    .map(|s| s.to_ascii_lowercase())
294                    .collect::<Vec<_>>()
295            })
296            .unwrap_or_default();
297
298        let skip_response_headers = parts
299            .params
300            .get("skipResponseHeaders")
301            .map(|v| {
302                v.split(',')
303                    .map(str::trim)
304                    .filter(|s| !s.is_empty())
305                    .map(|s| s.to_ascii_lowercase())
306                    .collect::<Vec<_>>()
307            })
308            .unwrap_or_default();
309
310        // Collect remaining params (not Camel options) as query params
311        let query_params: HashMap<String, String> = parts
312            .params
313            .into_iter()
314            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315            .collect();
316
317        Ok(Self {
318            base_url,
319            http_method,
320            throw_exception_on_failure,
321            ok_status_code_range,
322            response_timeout,
323            query_params,
324            allow_private_ips,
325            blocked_hosts,
326            max_body_size,
327            read_timeout_ms,
328            max_response_bytes,
329            auth,
330            token_provider: None,
331            user_agent,
332            cookie_handling,
333            bridge_endpoint,
334            connection_close,
335            skip_request_headers,
336            skip_response_headers,
337        })
338    }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342    let Some(method) = params.get("authMethod") else {
343        return Ok(HttpAuth::None);
344    };
345
346    if method.eq_ignore_ascii_case("none") {
347        return Ok(HttpAuth::None);
348    }
349
350    if method.eq_ignore_ascii_case("basic") {
351        let username = params.get("authUsername").cloned().ok_or_else(|| {
352            CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353        })?;
354        let password = params.get("authPassword").cloned().ok_or_else(|| {
355            CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356        })?;
357        return Ok(HttpAuth::Basic { username, password });
358    }
359
360    if method.eq_ignore_ascii_case("bearer") {
361        let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362            CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363        })?;
364        return Ok(HttpAuth::Bearer { token });
365    }
366
367    Err(CamelError::InvalidUri(format!(
368        "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369    )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373    match value.to_ascii_lowercase().as_str() {
374        "true" | "1" | "yes" => Ok(true),
375        "false" | "0" | "no" => Ok(false),
376        _ => Err(CamelError::InvalidUri(format!(
377            "invalid boolean value: '{value}'"
378        ))),
379    }
380}
381
382impl HttpEndpointConfig {
383    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384        let parts = parse_uri(uri)?;
385        let mut endpoint = Self::from_components(parts.clone())?;
386        if endpoint.response_timeout.is_none() {
387            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388        }
389        if !parts.params.contains_key("allowPrivateIps") {
390            endpoint.allow_private_ips = config.allow_private_ips;
391        }
392        if !parts.params.contains_key("blockedHosts") {
393            endpoint.blocked_hosts = config.blocked_hosts.clone();
394        }
395        if !parts.params.contains_key("maxBodySize") {
396            endpoint.max_body_size = config.max_body_size;
397        }
398        if !parts.params.contains_key("readTimeout") {
399            endpoint.read_timeout_ms = config.read_timeout_ms;
400        }
401        if !parts.params.contains_key("maxResponseBytes") {
402            endpoint.max_response_bytes = config.max_response_bytes;
403        }
404        if !parts.params.contains_key("okStatusCodeRange")
405            && let Some(range) = &config.ok_status_code_range
406        {
407            endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408        }
409
410        Ok(endpoint)
411    }
412}
413
414// ---------------------------------------------------------------------------
415// HttpServerConfig
416// ---------------------------------------------------------------------------
417
418/// Configuration for an HTTP server (consumer) endpoint.
419#[derive(Debug, Clone)]
420pub struct HttpServerConfig {
421    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
422    pub host: String,
423    /// TCP port to listen on.
424    pub port: u16,
425    /// URL path this consumer handles, e.g. "/orders".
426    pub path: String,
427    /// Maximum request body size in bytes.
428    pub max_request_body: usize,
429    /// Maximum response body size for materializing streams in bytes.
430    pub max_response_body: usize,
431    /// Maximum number of in-flight requests handled concurrently by this server.
432    pub max_inflight_requests: usize,
433}
434
435impl UriConfig for HttpServerConfig {
436    /// Returns "http" as the primary scheme (also accepts "https")
437    fn scheme() -> &'static str {
438        "http"
439    }
440
441    fn from_uri(uri: &str) -> Result<Self, CamelError> {
442        let parts = parse_uri(uri)?;
443        Self::from_components(parts)
444    }
445
446    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447        // Validate scheme - accept both http and https
448        if parts.scheme != "http" && parts.scheme != "https" {
449            return Err(CamelError::InvalidUri(format!(
450                "expected scheme 'http' or 'https', got '{}'",
451                parts.scheme
452            )));
453        }
454
455        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
456        // Strip leading "//"
457        let authority_and_path = parts.path.trim_start_matches('/');
458
459        // Split on the first "/" to separate "host:port" from "/path"
460        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461            (&authority_and_path[..idx], &authority_and_path[idx..])
462        } else {
463            (authority_and_path, "/")
464        };
465
466        let path = if path_suffix.is_empty() {
467            "/"
468        } else {
469            path_suffix
470        }
471        .to_string();
472
473        // Parse host:port from authority
474        let (host, port) = if let Some(colon) = authority.rfind(':') {
475            let port_str = &authority[colon + 1..];
476            match port_str.parse::<u16>() {
477                Ok(p) => (authority[..colon].to_string(), p),
478                Err(_) => {
479                    return Err(CamelError::InvalidUri(format!(
480                        "invalid port '{}' in authority",
481                        port_str
482                    )));
483                }
484            }
485        } else {
486            // Default port based on scheme: 443 for https, 80 for http
487            let default_port = if parts.scheme == "https" { 443 } else { 80 };
488            (authority.to_string(), default_port)
489        };
490
491        let max_request_body = parts
492            .params
493            .get("maxRequestBody")
494            .and_then(|v| v.parse::<usize>().ok())
495            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
496
497        let max_response_body = parts
498            .params
499            .get("maxResponseBody")
500            .and_then(|v| v.parse::<usize>().ok())
501            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
502
503        let max_inflight_requests = parts
504            .params
505            .get("maxInflightRequests")
506            .and_then(|v| v.parse::<usize>().ok())
507            .unwrap_or(1024);
508
509        Ok(Self {
510            host,
511            port,
512            path,
513            max_request_body,
514            max_response_body,
515            max_inflight_requests,
516        })
517    }
518}
519
520impl HttpServerConfig {
521    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522        let parts = parse_uri(uri)?;
523        let mut server = Self::from_components(parts.clone())?;
524        if !parts.params.contains_key("maxRequestBody") {
525            server.max_request_body = config.max_request_body;
526        }
527        if !parts.params.contains_key("maxResponseBody") {
528            // Default max_response_body is 10MB via HttpConfig::default().max_body_size.
529            server.max_response_body = config.max_body_size;
530        }
531        Ok(server)
532    }
533}
534
535// ---------------------------------------------------------------------------
536// RequestEnvelope / HttpReply
537// ---------------------------------------------------------------------------
538
539/// Body of the HTTP response: already-materialized bytes or a lazy stream.
540///
541/// **Internal plumbing** — subject to change without notice.
542pub enum HttpReplyBody {
543    Bytes(bytes::Bytes),
544    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
545}
546
547/// An inbound HTTP request sent from the Axum dispatch handler to an
548/// `HttpConsumer` receive loop.
549///
550/// **Internal plumbing** — subject to change without notice.
551pub struct RequestEnvelope {
552    pub method: String,
553    pub path: String,
554    pub query: String,
555    pub headers: http::HeaderMap,
556    pub body: StreamBody,
557    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
558}
559
560/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
561///
562/// **Internal plumbing** — subject to change without notice.
563pub struct HttpReply {
564    pub status: u16,
565    pub headers: Vec<(String, String)>,
566    pub body: HttpReplyBody,
567}
568
569// ---------------------------------------------------------------------------
570// HttpRouteRegistry / ServerRegistry
571// ---------------------------------------------------------------------------
572
573type ServerKey = (String, u16);
574
575/// Handle to a running Axum server on one interface/port.
576struct ServerHandle {
577    registry: HttpRouteRegistry,
578    max_request_body: usize,
579    max_response_body: usize,
580    max_inflight_requests: usize,
581}
582
583/// Process-global registry mapping (host, port) → running Axum server handle.
584pub struct ServerRegistry {
585    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589    /// Returns the global singleton.
590    pub fn global() -> &'static Self {
591        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592        INSTANCE.get_or_init(|| ServerRegistry {
593            inner: Mutex::new(HashMap::new()),
594        })
595    }
596
597    /// Returns route registry for `port`, spawning new Axum server if
598    /// none is running on that port yet.
599    pub async fn get_or_spawn(
600        &'static self,
601        host: &str,
602        port: u16,
603        max_request_body: usize,
604        max_response_body: usize,
605        max_inflight_requests: usize,
606    ) -> Result<HttpRouteRegistry, CamelError> {
607        let host_owned = host.to_string();
608
609        let cell = {
610            let mut guard = self.inner.lock().map_err(|_| {
611                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
612            })?;
613            let key = (host.to_string(), port);
614            guard
615                .entry(key)
616                .or_insert_with(|| Arc::new(OnceCell::new()))
617                .clone()
618        };
619
620        if let Some(existing) = cell.get()
621            && existing.max_request_body != max_request_body
622        {
623            return Err(CamelError::EndpointCreationFailed(format!(
624                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
625                existing.max_request_body, max_request_body
626            )));
627        }
628
629        if let Some(existing) = cell.get()
630            && existing.max_response_body != max_response_body
631        {
632            return Err(CamelError::EndpointCreationFailed(format!(
633                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
634                existing.max_response_body, max_response_body
635            )));
636        }
637
638        if let Some(existing) = cell.get()
639            && existing.max_inflight_requests != max_inflight_requests
640        {
641            return Err(CamelError::EndpointCreationFailed(format!(
642                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
643                existing.max_inflight_requests, max_inflight_requests
644            )));
645        }
646
647        let handle = cell
648            .get_or_try_init(|| async {
649                let addr = format!("{host_owned}:{port}");
650                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
651                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
652                })?;
653                let registry = HttpRouteRegistry::new();
654                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
655                let task = tokio::spawn(run_axum_server(
656                    listener,
657                    registry.clone(),
658                    max_request_body,
659                    max_response_body,
660                    Arc::clone(&inflight),
661                ));
662                let addr_for_monitor = format!("{host_owned}:{port}");
663                tokio::spawn(monitor_axum_task(task, addr_for_monitor));
664                Ok::<ServerHandle, CamelError>(ServerHandle {
665                    registry,
666                    max_request_body,
667                    max_response_body,
668                    max_inflight_requests,
669                })
670            })
671            .await?;
672
673        Ok(handle.registry.clone())
674    }
675
676    /// Reset the global registry — **test-only**.
677    ///
678    /// Clears all registered server handles so that tests can start from a clean
679    /// state. This is intentionally `#[cfg(test)]` because the registry is a
680    /// process-global singleton in production and resetting it would break
681    /// running servers.
682    #[cfg(test)]
683    pub fn reset() {
684        let instance = Self::global();
685        let mut guard = instance
686            .inner
687            .lock()
688            .expect("ServerRegistry lock poisoned during test reset");
689        guard.clear();
690    }
691}
692
693// ---------------------------------------------------------------------------
694// Axum server
695// ---------------------------------------------------------------------------
696
697use axum::{
698    Router,
699    body::Body as AxumBody,
700    extract::{Request, State},
701    http::{Response, StatusCode},
702    response::IntoResponse,
703};
704
705#[derive(Clone)]
706pub(crate) struct AppState {
707    registry: HttpRouteRegistry,
708    max_request_body: usize,
709    max_response_body: usize,
710    inflight: Arc<tokio::sync::Semaphore>,
711}
712
713async fn run_axum_server(
714    listener: tokio::net::TcpListener,
715    registry: HttpRouteRegistry,
716    max_request_body: usize,
717    max_response_body: usize,
718    inflight: Arc<tokio::sync::Semaphore>,
719) {
720    let state = AppState {
721        registry,
722        max_request_body,
723        max_response_body,
724        inflight,
725    };
726    let app = Router::new().fallback(dispatch_handler).with_state(state);
727
728    axum::serve(listener, app).await.unwrap_or_else(|e| {
729        tracing::error!(error = %e, "Axum server error");
730    });
731}
732
733/// Monitors an Axum server task and emits a structured error event if it
734/// exits unexpectedly.
735///
736/// # Limitations
737/// The HTTP server is shared across all routes on a port. Full per-route
738/// CrashNotification propagation is deferred — this provides observable
739/// structured logging as a first guard.
740async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
741    match handle.await {
742        Ok(()) => {
743            // Clean exit (process shutdown or normal stop)
744        }
745        Err(join_err) => {
746            tracing::error!(
747                addr = %addr,
748                error = %join_err,
749                "Axum server task exited unexpectedly — all routes on this port are now dead"
750            );
751        }
752    }
753}
754
755async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
756    let path = req.uri().path().to_owned();
757
758    // 1. API match — lookup by path, only consume body if matched
759    let api_sender = {
760        let inner = state.registry.inner.read().await;
761        inner.api_routes.get(&path).cloned()
762    }; // lock released BEFORE any IO
763
764    if let Some(sender) = api_sender {
765        let method = req.method().to_string();
766        let query = req.uri().query().unwrap_or("").to_string();
767        let headers = req.headers().clone();
768
769        // Check Content-Length against limit BEFORE opening the stream
770        let content_length: Option<u64> = headers
771            .get(http::header::CONTENT_LENGTH)
772            .and_then(|v| v.to_str().ok())
773            .and_then(|s| s.parse().ok());
774
775        if let Some(len) = content_length
776            && len > state.max_request_body as u64
777        {
778            return Response::builder()
779                .status(StatusCode::PAYLOAD_TOO_LARGE)
780                .body(AxumBody::from("Request body exceeds configured limit"))
781                .expect("infallible"); // allow-unwrap
782        }
783
784        let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
785            Ok(permit) => permit,
786            Err(_) => {
787                return Response::builder()
788                    .status(StatusCode::SERVICE_UNAVAILABLE)
789                    .body(AxumBody::from("Service Unavailable"))
790                    .expect("infallible"); // allow-unwrap
791            }
792        };
793
794        // Build StreamBody from Axum body WITHOUT materializing
795        let content_type = headers
796            .get(http::header::CONTENT_TYPE)
797            .and_then(|v| v.to_str().ok())
798            .map(|s| s.to_string());
799
800        let data_stream: BodyDataStream = req.into_body().into_data_stream();
801        let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
802        let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
803
804        let stream_body = StreamBody {
805            stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
806            metadata: StreamMetadata {
807                size_hint: content_length,
808                content_type,
809                origin: None,
810            },
811        };
812
813        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
814        let envelope = RequestEnvelope {
815            method,
816            path,
817            query,
818            headers,
819            body: stream_body,
820            reply_tx,
821        };
822
823        if sender.send(envelope).await.is_err() {
824            return Response::builder()
825                .status(StatusCode::SERVICE_UNAVAILABLE)
826                .body(AxumBody::from("Consumer unavailable"))
827                .expect("infallible"); // allow-unwrap
828        }
829
830        match reply_rx.await {
831            Ok(reply) => {
832                let reply = match reply.body {
833                    HttpReplyBody::Bytes(b)
834                        if exceeds_max_response_body(b.len(), state.max_response_body) =>
835                    {
836                        HttpReply {
837                            status: 500,
838                            headers: vec![],
839                            body: HttpReplyBody::Bytes(bytes::Bytes::from(
840                                "Response body exceeds configured limit",
841                            )),
842                        }
843                    }
844                    _ => reply,
845                };
846
847                let status =
848                    StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
849                let mut builder = Response::builder().status(status);
850                for (k, v) in &reply.headers {
851                    builder = builder.header(k.as_str(), v.as_str());
852                }
853                match reply.body {
854                    HttpReplyBody::Bytes(b) => {
855                        builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
856                            Response::builder()
857                                .status(StatusCode::INTERNAL_SERVER_ERROR)
858                                .body(AxumBody::from("Invalid response headers from consumer"))
859                                .expect("infallible") // allow-unwrap
860                        })
861                    }
862                    HttpReplyBody::Stream(stream) => builder
863                        .body(AxumBody::from_stream(stream))
864                        .unwrap_or_else(|_| {
865                            Response::builder()
866                                .status(StatusCode::INTERNAL_SERVER_ERROR)
867                                .body(AxumBody::from("Invalid response headers from consumer"))
868                                .expect("infallible") // allow-unwrap
869                        }),
870                }
871            }
872            Err(_) => Response::builder()
873                .status(StatusCode::INTERNAL_SERVER_ERROR)
874                .body(AxumBody::from("Pipeline error"))
875                .expect("infallible"), // allow-unwrap
876        }
877    } else {
878        // No API route matched — try static mounts
879        static_dispatch::dispatch_static(&state, req, &path).await
880    }
881}
882
883fn exceeds_max_response_body(len: usize, max: usize) -> bool {
884    len > max
885}
886
887fn title_case_header(name: &str) -> String {
888    name.split('-')
889        .map(|part| {
890            let mut chars = part.chars();
891            match chars.next() {
892                None => String::new(),
893                Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
894            }
895        })
896        .collect::<Vec<_>>()
897        .join("-")
898}
899
900// ---------------------------------------------------------------------------
901// HttpConsumer
902// ---------------------------------------------------------------------------
903
904pub struct HttpConsumer {
905    config: HttpServerConfig,
906}
907
908impl HttpConsumer {
909    pub fn new(config: HttpServerConfig) -> Self {
910        Self { config }
911    }
912}
913
914#[async_trait::async_trait]
915impl Consumer for HttpConsumer {
916    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
917        use camel_component_api::{Body, Exchange, Message};
918
919        let registry = ServerRegistry::global()
920            .get_or_spawn(
921                &self.config.host,
922                self.config.port,
923                self.config.max_request_body,
924                self.config.max_response_body,
925                self.config.max_inflight_requests,
926            )
927            .await?;
928
929        // Create channel for this path and register it
930        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
931        registry
932            .register_api_route(self.config.path.clone(), env_tx)
933            .await;
934
935        let path = self.config.path.clone();
936        let registry_for_cleanup = registry.clone();
937        let cancel_token = ctx.cancel_token();
938        loop {
939            tokio::select! {
940                _ = ctx.cancelled() => {
941                    break;
942                }
943                envelope = env_rx.recv() => {
944                    let Some(envelope) = envelope else { break; };
945
946                    // Build Exchange from HTTP request
947                    let mut msg = Message::default();
948
949                    // Set standard Camel HTTP headers
950                    msg.set_header("CamelHttpMethod",
951                        serde_json::Value::String(envelope.method.clone()));
952                    msg.set_header("CamelHttpPath",
953                        serde_json::Value::String(envelope.path.clone()));
954                    msg.set_header("CamelHttpQuery",
955                        serde_json::Value::String(envelope.query.clone()));
956
957                    // Forward HTTP headers with Title-Case names (hyper lowercases them)
958                    for (k, v) in &envelope.headers {
959                        if let Ok(val_str) = v.to_str() {
960                            msg.set_header(
961                                title_case_header(k.as_str()),
962                                serde_json::Value::String(val_str.to_string()),
963                            );
964                        }
965                    }
966
967                    // Body: always arrives as Body::Stream (native streaming)
968                    // Routes can call into_bytes() if they need to materialize
969                    msg.body = Body::Stream(envelope.body);
970
971                    #[allow(unused_mut)]
972                    let mut exchange = Exchange::new(msg);
973
974                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
975                    #[cfg(feature = "otel")]
976                    {
977                        let headers: HashMap<String, String> = envelope
978                            .headers
979                            .iter()
980                            .filter_map(|(k, v)| {
981                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
982                            })
983                            .collect();
984                        camel_otel::extract_into_exchange(&mut exchange, &headers);
985                    }
986
987                    let reply_tx = envelope.reply_tx;
988                    let sender = ctx.sender().clone();
989                    let path_clone = path.clone();
990                    let cancel = cancel_token.clone();
991
992                    // Spawn a task to handle this request concurrently
993                    //
994                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
995                    // true concurrent request processing. This change was introduced as part of the
996                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
997                    //
998                    // Rationale:
999                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
1000                    //    the consumer's main loop until the pipeline processing completes
1001                    // 2. This blocking would prevent multiple HTTP requests from being processed
1002                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
1003                    // 3. The channel would never have multiple exchanges buffered simultaneously,
1004                    //    defeating the purpose of pipeline-side concurrency
1005                    // 4. By spawning a task per request, we allow the consumer loop to continue
1006                    //    accepting new requests while existing ones are processed in the pipeline
1007                    //
1008                    // This approach effectively decouples request acceptance from pipeline processing,
1009                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
1010                    // by the pipeline when ConcurrencyModel::Concurrent is active.
1011                    tokio::spawn(async move {
1012                        // Check for cancellation before sending to pipeline.
1013                        // Returns 503 (Service Unavailable) instead of letting the request
1014                        // enter a shutting-down pipeline. This is a behavioral change from
1015                        // the pre-concurrency implementation where cancellation during
1016                        // processing would result in a 500 (Internal Server Error).
1017                        // 503 is more semantically correct: the server is temporarily
1018                        // unable to handle the request due to shutdown.
1019                        if cancel.is_cancelled() {
1020                            let _ = reply_tx.send(HttpReply {
1021                                status: 503,
1022                                headers: vec![],
1023                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1024                            });
1025                            return;
1026                        }
1027
1028                        // Send through pipeline and await result
1029                        let (tx, rx) = tokio::sync::oneshot::channel();
1030                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
1031                            exchange,
1032                            reply_tx: Some(tx),
1033                        };
1034
1035                        let result = match sender.send(envelope).await {
1036                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1037                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1038                        }
1039                        .and_then(|r| r);
1040
1041                        let reply = match result {
1042                            Ok(out) => {
1043                                let status = out
1044                                    .input
1045                                    .header("CamelHttpResponseCode")
1046                                    .and_then(|v| {
1047                                        let raw = v.as_u64()
1048                                            .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1049                                        let code = raw as u16;
1050                                        (100..1000).contains(&code).then_some(code)
1051                                    })
1052                                    .unwrap_or(200);
1053
1054                                let user_content_type = out
1055                                    .input
1056                                    .header("Content-Type")
1057                                    .and_then(|v| v.as_str().map(|s| s.to_string()));
1058
1059                                let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1060                                    Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1061                                    Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1062                                    Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1063                                    Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1064                                    Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1065                                        v.to_string().into_bytes(),
1066                                    )), Some("application/json".to_string())),
1067                                    Body::Stream(s) => {
1068                                        let ct = s.metadata.content_type.clone();
1069                                        match s.stream.lock().await.take() {
1070                                            Some(stream) => (
1071                                                HttpReplyBody::Stream(stream),
1072                                                ct,
1073                                            ),
1074                                            None => {
1075                                                tracing::error!(
1076                                                    "Body::Stream already consumed before HTTP reply — returning 500"
1077                                                );
1078                                                let error_reply = HttpReply {
1079                                                    status: 500,
1080                                                    headers: vec![],
1081                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1082                                                };
1083                                                if reply_tx.send(error_reply).is_err() {
1084                                                    debug!("reply_tx dropped before error reply could be sent");
1085                                                }
1086                                                return;
1087                                            }
1088                                        }
1089                                    }
1090                                };
1091
1092                                let mut resp_headers: Vec<(String, String)> = out
1093                                    .input
1094                                    .headers
1095                                    .iter()
1096                                    .filter(|(k, _)| !k.starts_with("Camel"))
1097                                    .filter(|(k, _)| {
1098                                        !matches!(
1099                                            k.to_lowercase().as_str(),
1100                                            "content-length"
1101                                            | "content-type"
1102                                            | "transfer-encoding"
1103                                            | "connection"
1104                                            | "cache-control"
1105                                            | "date"
1106                                            | "pragma"
1107                                            | "trailer"
1108                                            | "upgrade"
1109                                            | "via"
1110                                            | "warning"
1111                                            | "host"
1112                                            | "user-agent"
1113                                            | "accept"
1114                                            | "accept-encoding"
1115                                            | "accept-language"
1116                                            | "accept-charset"
1117                                            | "authorization"
1118                                            | "proxy-authorization"
1119                                            | "cookie"
1120                                            | "expect"
1121                                            | "from"
1122                                            | "if-match"
1123                                            | "if-modified-since"
1124                                            | "if-none-match"
1125                                            | "if-range"
1126                                            | "if-unmodified-since"
1127                                            | "max-forwards"
1128                                            | "proxy-connection"
1129                                            | "range"
1130                                            | "referer"
1131                                            | "te"
1132                                        )
1133                                    })
1134                                    .filter_map(|(k, v)| {
1135                                        v.as_str().map(|s| (k.clone(), s.to_string()))
1136                                    })
1137                                    .collect();
1138
1139                                let content_type = user_content_type
1140                                    .or(inferred_content_type);
1141                                if let Some(ct) = content_type {
1142                                    resp_headers.push(("Content-Type".to_string(), ct));
1143                                }
1144
1145                                HttpReply {
1146                                    status,
1147                                    headers: resp_headers,
1148                                    body: reply_body,
1149                                }
1150                            }
1151                            Err(CamelError::Stopped) => {
1152                                tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1153                                HttpReply {
1154                                    status: 204,
1155                                    headers: vec![],
1156                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1157                                }
1158                            }
1159                            Err(CamelError::Unauthenticated(msg)) => {
1160                                tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1161                                HttpReply {
1162                                    status: 401,
1163                                    headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1164                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1165                                }
1166                            }
1167                            Err(CamelError::Unauthorized(msg)) => {
1168                                tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1169                                HttpReply {
1170                                    status: 403,
1171                                    headers: vec![],
1172                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1173                                }
1174                            }
1175                            Err(e) => {
1176                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1177                                HttpReply {
1178                                    status: 500,
1179                                    headers: vec![],
1180                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1181                                }
1182                            }
1183                        };
1184
1185                        // Reply to Axum handler (ignore error if client disconnected)
1186                        let _ = reply_tx.send(reply);
1187                    });
1188                }
1189            }
1190        }
1191
1192        // Deregister this path
1193        registry_for_cleanup.unregister_api_route(&path).await;
1194
1195        Ok(())
1196    }
1197
1198    async fn stop(&mut self) -> Result<(), CamelError> {
1199        Ok(())
1200    }
1201
1202    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1203        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1204    }
1205}
1206
1207// ---------------------------------------------------------------------------
1208// HttpComponent / HttpsComponent
1209// ---------------------------------------------------------------------------
1210
1211pub struct HttpComponent {
1212    config: HttpConfig,
1213}
1214
1215fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1216    let mut builder = reqwest::Client::builder()
1217        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1218        .pool_max_idle_per_host(config.pool_max_idle_per_host)
1219        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1220
1221    if !config.follow_redirects {
1222        builder = builder.redirect(reqwest::redirect::Policy::none());
1223    } else if let Some(max_redirects) = config.max_redirects {
1224        builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1225    }
1226
1227    if let Some(proxy_url) = &config.proxy_url
1228        && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1229    {
1230        builder = builder.proxy(proxy);
1231    }
1232
1233    if matches!(cookie_handling, CookieHandling::InMemory) {
1234        // TODO(HTTP-013): enable reqwest cookie jar once workspace reqwest features include cookie_store.
1235    }
1236
1237    if let Some(tls) = &config.tls
1238        && tls.enabled
1239    {
1240        if tls.insecure || !tls.verify_peer {
1241            builder = builder.danger_accept_invalid_certs(true);
1242        }
1243
1244        if let Some(ca_path) = &tls.ca_cert_path
1245            && let Ok(ca_bytes) = std::fs::read(ca_path)
1246        {
1247            let cert = reqwest::Certificate::from_pem(&ca_bytes)
1248                .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1249            if let Ok(ca_cert) = cert {
1250                builder = builder.add_root_certificate(ca_cert);
1251            }
1252        }
1253
1254        if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1255            && let (Ok(cert_bytes), Ok(key_bytes)) =
1256                (std::fs::read(cert_path), std::fs::read(key_path))
1257        {
1258            let mut identity_pem = cert_bytes;
1259            identity_pem.extend_from_slice(&key_bytes);
1260            if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1261                builder = builder.identity(identity);
1262            }
1263        }
1264    }
1265
1266    builder
1267        .build()
1268        .expect("reqwest::Client::build() with valid config should not fail") // allow-unwrap
1269}
1270
1271impl HttpComponent {
1272    pub fn new() -> Self {
1273        let config = HttpConfig::default();
1274        Self { config }
1275    }
1276
1277    pub fn with_config(config: HttpConfig) -> Self {
1278        Self { config }
1279    }
1280
1281    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1282        match config {
1283            Some(cfg) => Self::with_config(cfg),
1284            None => Self::new(),
1285        }
1286    }
1287}
1288
1289impl Default for HttpComponent {
1290    fn default() -> Self {
1291        Self::new()
1292    }
1293}
1294
1295impl Component for HttpComponent {
1296    fn scheme(&self) -> &str {
1297        "http"
1298    }
1299
1300    fn create_endpoint(
1301        &self,
1302        uri: &str,
1303        ctx: &dyn camel_component_api::ComponentContext,
1304    ) -> Result<Box<dyn Endpoint>, CamelError> {
1305        self.config.validate()?;
1306        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1307        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1308        let client = build_client(&self.config, config.cookie_handling);
1309        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1310            server_config.host.clone(),
1311            server_config.port,
1312        )));
1313        Ok(Box::new(HttpEndpoint {
1314            uri: uri.to_string(),
1315            config,
1316            server_config,
1317            client,
1318        }))
1319    }
1320}
1321
1322pub struct HttpsComponent {
1323    config: HttpConfig,
1324}
1325
1326impl HttpsComponent {
1327    pub fn new() -> Self {
1328        let config = HttpConfig::default();
1329        Self { config }
1330    }
1331
1332    pub fn with_config(config: HttpConfig) -> Self {
1333        Self { config }
1334    }
1335
1336    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1337        match config {
1338            Some(cfg) => Self::with_config(cfg),
1339            None => Self::new(),
1340        }
1341    }
1342}
1343
1344impl Default for HttpsComponent {
1345    fn default() -> Self {
1346        Self::new()
1347    }
1348}
1349
1350impl Component for HttpsComponent {
1351    fn scheme(&self) -> &str {
1352        "https"
1353    }
1354
1355    fn create_endpoint(
1356        &self,
1357        uri: &str,
1358        ctx: &dyn camel_component_api::ComponentContext,
1359    ) -> Result<Box<dyn Endpoint>, CamelError> {
1360        self.config.validate()?;
1361        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1362        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1363        let client = build_client(&self.config, config.cookie_handling);
1364        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1365            server_config.host.clone(),
1366            server_config.port,
1367        )));
1368        Ok(Box::new(HttpEndpoint {
1369            uri: uri.to_string(),
1370            config,
1371            server_config,
1372            client,
1373        }))
1374    }
1375}
1376
1377// ---------------------------------------------------------------------------
1378// HttpEndpoint
1379// ---------------------------------------------------------------------------
1380
1381struct HttpEndpoint {
1382    uri: String,
1383    config: HttpEndpointConfig,
1384    server_config: HttpServerConfig,
1385    client: reqwest::Client,
1386}
1387
1388impl Endpoint for HttpEndpoint {
1389    fn uri(&self) -> &str {
1390        &self.uri
1391    }
1392
1393    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1394        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1395    }
1396
1397    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1398        let producer = HttpProducer {
1399            config: Arc::new(self.config.clone()),
1400            client: self.client.clone(),
1401        };
1402        if let Some(ref provider) = self.config.token_provider {
1403            let layer = BearerTokenLayer::new(Arc::clone(provider));
1404            Ok(BoxProcessor::new(layer.layer(producer)))
1405        } else {
1406            Ok(BoxProcessor::new(producer))
1407        }
1408    }
1409}
1410
1411// ---------------------------------------------------------------------------
1412// SSRF Protection
1413// ---------------------------------------------------------------------------
1414
1415fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1416    let parsed = url::Url::parse(url)
1417        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1418
1419    // Check blocked hosts
1420    if let Some(host) = parsed.host_str()
1421        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1422    {
1423        return Err(CamelError::ProcessorError(format!(
1424            "Host '{}' is blocked",
1425            host
1426        )));
1427    }
1428
1429    // Check private IPs if not allowed
1430    if !config.allow_private_ips
1431        && let Some(host) = parsed.host()
1432    {
1433        match host {
1434            url::Host::Ipv4(ip) => {
1435                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1436                    return Err(CamelError::ProcessorError(format!(
1437                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1438                        ip
1439                    )));
1440                }
1441            }
1442            url::Host::Ipv6(ip) => {
1443                if ip.is_loopback() {
1444                    return Err(CamelError::ProcessorError(format!(
1445                        "Loopback IP '{}' not allowed",
1446                        ip
1447                    )));
1448                }
1449            }
1450            url::Host::Domain(domain) => {
1451                // Block common internal domains
1452                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1453                if blocked_domains.contains(&domain) {
1454                    return Err(CamelError::ProcessorError(format!(
1455                        "Domain '{}' is not allowed",
1456                        domain
1457                    )));
1458                }
1459            }
1460        }
1461    }
1462
1463    Ok(())
1464}
1465
1466fn is_private_ip(ip: &IpAddr) -> bool {
1467    match ip {
1468        IpAddr::V4(ipv4) => {
1469            ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1470        }
1471        IpAddr::V6(ipv6) => {
1472            let seg0 = ipv6.segments()[0];
1473            ipv6.is_loopback()
1474                // fc00::/7 (ULA)
1475                || (seg0 & 0xfe00) == 0xfc00
1476                // fe80::/10 (link-local)
1477                || (seg0 & 0xffc0) == 0xfe80
1478                // ::ffff:0:0/96 (IPv4-mapped): only block if the mapped IPv4 is private
1479                || ipv6
1480                    .to_ipv4_mapped()
1481                    .map(|v4| {
1482                        v4.is_private()
1483                            || v4.is_loopback()
1484                            || v4.is_link_local()
1485                            || v4.octets()[0] == 0
1486                    })
1487                    .unwrap_or(false)
1488        }
1489    }
1490}
1491
1492async fn validate_resolved_host_for_ssrf(
1493    url: &str,
1494    config: &HttpEndpointConfig,
1495) -> Result<(), CamelError> {
1496    if config.allow_private_ips {
1497        return Ok(());
1498    }
1499
1500    let parsed = url::Url::parse(url)
1501        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1502    let Some(host) = parsed.host_str() else {
1503        return Ok(());
1504    };
1505    let Some(port) = parsed.port_or_known_default() else {
1506        return Ok(());
1507    };
1508
1509    let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1510        CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1511    })?;
1512
1513    for addr in resolved {
1514        let ip = addr.ip();
1515        if is_private_ip(&ip) {
1516            return Err(CamelError::ProcessorError(format!(
1517                "Target resolved to private IP: {}",
1518                ip
1519            )));
1520        }
1521    }
1522
1523    Ok(())
1524}
1525
1526// ---------------------------------------------------------------------------
1527// HttpProducer
1528// ---------------------------------------------------------------------------
1529
1530#[derive(Clone)]
1531struct HttpProducer {
1532    config: Arc<HttpEndpointConfig>,
1533    client: reqwest::Client,
1534}
1535
1536impl HttpProducer {
1537    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1538        if let Some(ref method) = config.http_method {
1539            return method.to_uppercase();
1540        }
1541        if let Some(method) = exchange
1542            .input
1543            .header("CamelHttpMethod")
1544            .and_then(|v| v.as_str())
1545        {
1546            return method.to_uppercase();
1547        }
1548        if !exchange.input.body.is_empty() {
1549            return "POST".to_string();
1550        }
1551        "GET".to_string()
1552    }
1553
1554    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1555        if let Some(uri) = exchange
1556            .input
1557            .header("CamelHttpUri")
1558            .and_then(|v| v.as_str())
1559        {
1560            let mut url = uri.to_string();
1561            if let Some(path) = exchange
1562                .input
1563                .header("CamelHttpPath")
1564                .and_then(|v| v.as_str())
1565            {
1566                if !url.ends_with('/') && !path.starts_with('/') {
1567                    url.push('/');
1568                }
1569                url.push_str(path);
1570            }
1571            if let Some(query) = exchange
1572                .input
1573                .header("CamelHttpQuery")
1574                .and_then(|v| v.as_str())
1575            {
1576                url.push('?');
1577                url.push_str(query);
1578            }
1579            return url;
1580        }
1581
1582        let mut url = config.base_url.clone();
1583
1584        if let Some(path) = exchange
1585            .input
1586            .header("CamelHttpPath")
1587            .and_then(|v| v.as_str())
1588        {
1589            if !url.ends_with('/') && !path.starts_with('/') {
1590                url.push('/');
1591            }
1592            url.push_str(path);
1593        }
1594
1595        if let Some(query) = exchange
1596            .input
1597            .header("CamelHttpQuery")
1598            .and_then(|v| v.as_str())
1599        {
1600            url.push('?');
1601            url.push_str(query);
1602        } else if !config.query_params.is_empty() {
1603            let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); // allow-unwrap
1604            for (k, v) in &config.query_params {
1605                parsed.query_pairs_mut().append_pair(k, v);
1606            }
1607            url = parsed.to_string();
1608        }
1609
1610        url
1611    }
1612
1613    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1614        status >= range.0 && status <= range.1
1615    }
1616}
1617
1618impl Service<Exchange> for HttpProducer {
1619    type Response = Exchange;
1620    type Error = CamelError;
1621    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1622
1623    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1624        Poll::Ready(Ok(()))
1625    }
1626
1627    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1628        let config = self.config.clone();
1629        let client = self.client.clone();
1630
1631        Box::pin(async move {
1632            let method_str = HttpProducer::resolve_method(&exchange, &config);
1633            let url = HttpProducer::resolve_url(&exchange, &config);
1634
1635            // SECURITY: Validate URL for SSRF
1636            validate_url_for_ssrf(&url, &config)?;
1637            validate_resolved_host_for_ssrf(&url, &config).await?;
1638
1639            debug!(
1640                correlation_id = %exchange.correlation_id(),
1641                method = %method_str,
1642                url = %url,
1643                "HTTP request"
1644            );
1645
1646            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1647                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1648            })?;
1649
1650            let mut request = client.request(method, &url);
1651
1652            if let Some(timeout) = config.response_timeout {
1653                request = request.timeout(timeout);
1654            }
1655
1656            if let Some(user_agent) = &config.user_agent
1657                && !config.bridge_endpoint
1658            {
1659                request = request.header("User-Agent", user_agent.clone());
1660            }
1661
1662            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1663            #[cfg(feature = "otel")]
1664            let should_inject_otel = !config.bridge_endpoint;
1665            #[cfg(feature = "otel")]
1666            if should_inject_otel {
1667                let mut otel_headers = HashMap::new();
1668                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1669                for (k, v) in otel_headers {
1670                    if let (Ok(name), Ok(val)) = (
1671                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1672                        reqwest::header::HeaderValue::from_str(&v),
1673                    ) {
1674                        request = request.header(name, val);
1675                    }
1676                }
1677            }
1678
1679            for (key, value) in &exchange.input.headers {
1680                if !key.starts_with("Camel")
1681                    && !config
1682                        .skip_request_headers
1683                        .iter()
1684                        .any(|h| h.eq_ignore_ascii_case(key))
1685                    && let Some(val_str) = value.as_str()
1686                    && let (Ok(name), Ok(val)) = (
1687                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1688                        reqwest::header::HeaderValue::from_str(val_str),
1689                    )
1690                {
1691                    request = request.header(name, val);
1692                }
1693            }
1694
1695            if !config.bridge_endpoint {
1696                match &config.auth {
1697                    HttpAuth::None => {}
1698                    HttpAuth::Basic { username, password } => {
1699                        request = request.basic_auth(username, Some(password));
1700                    }
1701                    HttpAuth::Bearer { token } => {
1702                        request = request.bearer_auth(token);
1703                    }
1704                }
1705
1706                if config.connection_close {
1707                    request = request.header("Connection", "close");
1708                }
1709            }
1710
1711            match exchange.input.body {
1712                Body::Stream(ref s) => {
1713                    let mut stream_lock = s.stream.lock().await;
1714                    if let Some(stream) = stream_lock.take() {
1715                        request = request.body(reqwest::Body::wrap_stream(stream));
1716                    } else {
1717                        return Err(CamelError::AlreadyConsumed);
1718                    }
1719                }
1720                _ => {
1721                    // For other types, materialize with configured limit
1722                    let body = std::mem::take(&mut exchange.input.body);
1723                    let bytes = body.into_bytes(config.max_body_size).await?;
1724                    if !bytes.is_empty() {
1725                        request = request.body(bytes);
1726                    }
1727                }
1728            }
1729
1730            let response = request
1731                .send()
1732                .await
1733                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1734
1735            let status_code = response.status().as_u16();
1736            let status_text = response
1737                .status()
1738                .canonical_reason()
1739                .unwrap_or("Unknown")
1740                .to_string();
1741
1742            for (key, value) in response.headers() {
1743                if config
1744                    .skip_response_headers
1745                    .iter()
1746                    .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1747                {
1748                    continue;
1749                }
1750                if let Ok(val_str) = value.to_str() {
1751                    exchange.input.set_header(
1752                        title_case_header(key.as_str()),
1753                        serde_json::Value::String(val_str.to_string()),
1754                    );
1755                }
1756            }
1757
1758            exchange.input.set_header(
1759                "CamelHttpResponseCode",
1760                serde_json::Value::Number(status_code.into()),
1761            );
1762            exchange.input.set_header(
1763                "CamelHttpResponseText",
1764                serde_json::Value::String(status_text.clone()),
1765            );
1766
1767            // Read response body with timeout and size guard (HTTP-004, HTTP-005)
1768            let read_timeout = Duration::from_millis(config.read_timeout_ms);
1769            let response_body = tokio::time::timeout(read_timeout, async {
1770                // Check Content-Length header before allocating
1771                if let Some(content_len) = response.content_length()
1772                    && content_len > config.max_response_bytes as u64
1773                {
1774                    return Err(CamelError::ProcessorError(format!(
1775                        "Response body too large: {} bytes exceeds limit of {} bytes",
1776                        content_len, config.max_response_bytes
1777                    )));
1778                }
1779                // Use bytes_stream() for lazy streaming with size guard
1780                use futures::TryStreamExt;
1781                let mut stream = response.bytes_stream();
1782                let mut total: usize = 0;
1783                let mut collected = Vec::new();
1784                while let Some(chunk) = stream.try_next().await.map_err(|e| {
1785                    CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1786                })? {
1787                    total += chunk.len();
1788                    if total > config.max_response_bytes {
1789                        return Err(CamelError::ProcessorError(format!(
1790                            "Response body too large: {} bytes exceeds limit of {} bytes",
1791                            total, config.max_response_bytes
1792                        )));
1793                    }
1794                    collected.push(chunk);
1795                }
1796                let mut result = bytes::BytesMut::with_capacity(total);
1797                for chunk in collected {
1798                    result.extend_from_slice(&chunk);
1799                }
1800                Ok::<bytes::Bytes, CamelError>(result.freeze())
1801            })
1802            .await
1803            .map_err(|_| {
1804                CamelError::ProcessorError(format!(
1805                    "Read timeout after {}ms",
1806                    config.read_timeout_ms
1807                ))
1808            })??;
1809
1810            if config.throw_exception_on_failure
1811                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1812            {
1813                return Err(CamelError::HttpOperationFailed {
1814                    method: method_str,
1815                    url,
1816                    status_code,
1817                    status_text,
1818                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1819                });
1820            }
1821
1822            if !response_body.is_empty() {
1823                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1824            }
1825
1826            debug!(
1827                correlation_id = %exchange.correlation_id(),
1828                status = status_code,
1829                url = %url,
1830                "HTTP response"
1831            );
1832            Ok(exchange)
1833        })
1834    }
1835}
1836
1837/// Serializes tests that mutate or depend on the global `ServerRegistry`.
1838///
1839/// `ServerRegistry::global()` is a process-wide singleton that persists
1840/// across tests. `ServerRegistry::reset()` clears ALL entries; if it races
1841/// with another test that has a live server on a fixed port (e.g. 9991),
1842/// the registry entry is removed while the OS socket is still bound, so
1843/// the next `get_or_spawn` call on that port fails with "Address already
1844/// in use". Holding this mutex for the full body of each affected test
1845/// prevents the race without requiring `--test-threads=1`.
1846#[cfg(test)]
1847pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1848
1849#[cfg(test)]
1850mod tests {
1851    use super::*;
1852    use camel_component_api::{Message, NoOpComponentContext};
1853    use std::sync::Arc;
1854    use std::time::Duration;
1855
1856    fn test_producer_ctx() -> ProducerContext {
1857        ProducerContext::new()
1858    }
1859
1860    #[test]
1861    fn test_http_config_defaults() {
1862        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1863        assert_eq!(config.base_url, "http://localhost:8080/api");
1864        assert!(config.http_method.is_none());
1865        assert!(config.throw_exception_on_failure);
1866        assert_eq!(config.ok_status_code_range, (200, 299));
1867        assert!(config.response_timeout.is_none());
1868        assert!(matches!(config.auth, HttpAuth::None));
1869        assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1870        assert!(!config.bridge_endpoint);
1871        assert!(!config.connection_close);
1872    }
1873
1874    #[test]
1875    fn test_http_config_scheme() {
1876        // UriConfig trait method returns "http" as primary scheme
1877        assert_eq!(HttpEndpointConfig::scheme(), "http");
1878    }
1879
1880    #[test]
1881    fn test_http_config_from_components() {
1882        // Test from_components directly (trait method)
1883        let components = camel_component_api::UriComponents {
1884            scheme: "https".to_string(),
1885            path: "//api.example.com/v1".to_string(),
1886            params: std::collections::HashMap::from([(
1887                "httpMethod".to_string(),
1888                "POST".to_string(),
1889            )]),
1890        };
1891        let config = HttpEndpointConfig::from_components(components).unwrap();
1892        assert_eq!(config.base_url, "https://api.example.com/v1");
1893        assert_eq!(config.http_method, Some("POST".to_string()));
1894    }
1895
1896    #[test]
1897    fn test_http_config_with_options() {
1898        let config = HttpEndpointConfig::from_uri(
1899            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1900        ).unwrap();
1901        assert_eq!(config.base_url, "https://api.example.com/v1");
1902        assert_eq!(config.http_method, Some("PUT".to_string()));
1903        assert!(!config.throw_exception_on_failure);
1904        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1905    }
1906
1907    #[test]
1908    fn test_http_endpoint_config_auth_and_headers_options() {
1909        let config = HttpEndpointConfig::from_uri(
1910            "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1911        )
1912        .unwrap();
1913
1914        assert!(matches!(
1915            config.auth,
1916            HttpAuth::Basic { username, password } if username == "u" && password == "p"
1917        ));
1918        assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1919        assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1920        assert!(config.bridge_endpoint);
1921        assert!(config.connection_close);
1922        assert_eq!(
1923            config.skip_request_headers,
1924            vec!["authorization".to_string(), "x-secret".to_string()]
1925        );
1926        assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1927    }
1928
1929    #[test]
1930    fn test_http_endpoint_config_bearer_auth() {
1931        let config = HttpEndpointConfig::from_uri(
1932            "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1933        )
1934        .unwrap();
1935        assert!(matches!(
1936            config.auth,
1937            HttpAuth::Bearer { token } if token == "t"
1938        ));
1939    }
1940
1941    #[test]
1942    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1943        let config = HttpConfig::default()
1944            .with_response_timeout_ms(999)
1945            .with_allow_private_ips(true)
1946            .with_blocked_hosts(vec!["evil.com".to_string()])
1947            .with_max_body_size(12345);
1948        let endpoint =
1949            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1950        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1951        assert!(endpoint.allow_private_ips);
1952        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1953        assert_eq!(endpoint.max_body_size, 12345);
1954    }
1955
1956    #[test]
1957    fn test_from_uri_with_defaults_uri_overrides_config() {
1958        let config = HttpConfig::default()
1959            .with_response_timeout_ms(999)
1960            .with_allow_private_ips(true)
1961            .with_blocked_hosts(vec!["evil.com".to_string()])
1962            .with_max_body_size(12345);
1963        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1964            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1965            &config,
1966        )
1967        .unwrap();
1968        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1969        assert!(!endpoint.allow_private_ips);
1970        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1971        assert_eq!(endpoint.max_body_size, 99);
1972    }
1973
1974    #[test]
1975    fn test_http_config_ok_status_range() {
1976        let config =
1977            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1978        assert_eq!(config.ok_status_code_range, (200, 204));
1979    }
1980
1981    #[test]
1982    fn test_http_config_wrong_scheme() {
1983        let result = HttpEndpointConfig::from_uri("file:/tmp");
1984        assert!(result.is_err());
1985    }
1986
1987    #[test]
1988    fn test_http_component_scheme() {
1989        let component = HttpComponent::new();
1990        assert_eq!(component.scheme(), "http");
1991    }
1992
1993    #[test]
1994    fn test_https_component_scheme() {
1995        let component = HttpsComponent::new();
1996        assert_eq!(component.scheme(), "https");
1997    }
1998
1999    #[test]
2000    fn test_http_endpoint_creates_consumer() {
2001        let component = HttpComponent::new();
2002        let ctx = NoOpComponentContext;
2003        let endpoint = component
2004            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2005            .unwrap();
2006        assert!(endpoint.create_consumer().is_ok());
2007    }
2008
2009    #[test]
2010    fn test_https_endpoint_creates_consumer() {
2011        let component = HttpsComponent::new();
2012        let ctx = NoOpComponentContext;
2013        let endpoint = component
2014            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2015            .unwrap();
2016        assert!(endpoint.create_consumer().is_ok());
2017    }
2018
2019    #[test]
2020    fn test_http_endpoint_creates_producer() {
2021        let ctx = test_producer_ctx();
2022        let component = HttpComponent::new();
2023        let endpoint_ctx = NoOpComponentContext;
2024        let endpoint = component
2025            .create_endpoint("http://localhost/api", &endpoint_ctx)
2026            .unwrap();
2027        assert!(endpoint.create_producer(&ctx).is_ok());
2028    }
2029
2030    // -----------------------------------------------------------------------
2031    // Producer tests
2032    // -----------------------------------------------------------------------
2033
2034    #[tokio::test]
2035    async fn test_producer_with_token_provider() {
2036        use camel_auth::oauth2::TokenProvider;
2037        use tower::ServiceExt;
2038
2039        let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2040            Arc::new(std::sync::Mutex::new(None));
2041        let captured_clone = Arc::clone(&captured_auth);
2042
2043        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2044        let port = listener.local_addr().unwrap().port();
2045
2046        let _handle = tokio::spawn(async move {
2047            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2048            if let Ok((mut stream, _)) = listener.accept().await {
2049                let mut buf = vec![0u8; 8192];
2050                let n = stream.read(&mut buf).await.unwrap_or(0);
2051                let request = String::from_utf8_lossy(&buf[..n]).to_string();
2052                let auth = request
2053                    .lines()
2054                    .find(|l| l.to_lowercase().starts_with("authorization:"))
2055                    .map(|l| {
2056                        l.split(':')
2057                            .nth(1)
2058                            .map(|s| s.trim().to_string())
2059                            .unwrap_or_default()
2060                    });
2061                *captured_clone.lock().unwrap() = auth;
2062                let body = r#"{"echo":"ok"}"#;
2063                let resp = format!(
2064                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2065                    body.len(),
2066                    body
2067                );
2068                let _ = stream.write_all(resp.as_bytes()).await;
2069            }
2070        });
2071
2072        #[derive(Debug)]
2073        struct StaticProvider;
2074        #[async_trait::async_trait]
2075        impl TokenProvider for StaticProvider {
2076            async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2077                Ok("injected-token".into())
2078            }
2079        }
2080
2081        let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2082        let ctx = test_producer_ctx();
2083        let component = HttpComponent::new();
2084        let endpoint_ctx = NoOpComponentContext;
2085        let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2086        let producer = endpoint.create_producer(&ctx).unwrap();
2087
2088        let exchange = Exchange::new(Message::new("hello"));
2089
2090        let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2091        let mut layered = layer.layer(producer);
2092        let result = layered.ready().await.unwrap().call(exchange).await;
2093        assert!(result.is_ok(), "producer call failed: {:?}", result);
2094
2095        tokio::time::sleep(Duration::from_millis(100)).await;
2096        let auth = captured_auth.lock().unwrap().take();
2097        assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2098    }
2099
2100    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2101        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2102        let addr = listener.local_addr().unwrap();
2103        let url = format!("http://127.0.0.1:{}", addr.port());
2104
2105        let handle = tokio::spawn(async move {
2106            loop {
2107                if let Ok((mut stream, _)) = listener.accept().await {
2108                    tokio::spawn(async move {
2109                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2110                        let mut buf = vec![0u8; 4096];
2111                        let n = stream.read(&mut buf).await.unwrap_or(0);
2112                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2113
2114                        let method = request.split_whitespace().next().unwrap_or("GET");
2115
2116                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2117                        let response = format!(
2118                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2119                            body.len(),
2120                            body
2121                        );
2122                        let _ = stream.write_all(response.as_bytes()).await;
2123                    });
2124                }
2125            }
2126        });
2127
2128        (url, handle)
2129    }
2130
2131    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2132        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2133        let addr = listener.local_addr().unwrap();
2134        let url = format!("http://127.0.0.1:{}", addr.port());
2135
2136        let handle = tokio::spawn(async move {
2137            loop {
2138                if let Ok((mut stream, _)) = listener.accept().await {
2139                    let status = status;
2140                    tokio::spawn(async move {
2141                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2142                        let mut buf = vec![0u8; 4096];
2143                        let _ = stream.read(&mut buf).await;
2144
2145                        let status_text = match status {
2146                            404 => "Not Found",
2147                            500 => "Internal Server Error",
2148                            _ => "Error",
2149                        };
2150                        let body = "error body";
2151                        let response = format!(
2152                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2153                            status,
2154                            status_text,
2155                            body.len(),
2156                            body
2157                        );
2158                        let _ = stream.write_all(response.as_bytes()).await;
2159                    });
2160                }
2161            }
2162        });
2163
2164        (url, handle)
2165    }
2166
2167    #[tokio::test]
2168    async fn test_http_producer_get_request() {
2169        use tower::ServiceExt;
2170
2171        let (url, _handle) = start_test_server().await;
2172        let ctx = test_producer_ctx();
2173
2174        let component = HttpComponent::new();
2175        let endpoint_ctx = NoOpComponentContext;
2176        let endpoint = component
2177            .create_endpoint(
2178                &format!("{url}/api/test?allowPrivateIps=true"),
2179                &endpoint_ctx,
2180            )
2181            .unwrap();
2182        let producer = endpoint.create_producer(&ctx).unwrap();
2183
2184        let exchange = Exchange::new(Message::default());
2185        let result = producer.oneshot(exchange).await.unwrap();
2186
2187        let status = result
2188            .input
2189            .header("CamelHttpResponseCode")
2190            .and_then(|v| v.as_u64())
2191            .unwrap();
2192        assert_eq!(status, 200);
2193
2194        assert!(!result.input.body.is_empty());
2195    }
2196
2197    #[tokio::test]
2198    async fn test_http_producer_post_with_body() {
2199        use tower::ServiceExt;
2200
2201        let (url, _handle) = start_test_server().await;
2202        let ctx = test_producer_ctx();
2203
2204        let component = HttpComponent::new();
2205        let endpoint_ctx = NoOpComponentContext;
2206        let endpoint = component
2207            .create_endpoint(
2208                &format!("{url}/api/data?allowPrivateIps=true"),
2209                &endpoint_ctx,
2210            )
2211            .unwrap();
2212        let producer = endpoint.create_producer(&ctx).unwrap();
2213
2214        let exchange = Exchange::new(Message::new("request body"));
2215        let result = producer.oneshot(exchange).await.unwrap();
2216
2217        let status = result
2218            .input
2219            .header("CamelHttpResponseCode")
2220            .and_then(|v| v.as_u64())
2221            .unwrap();
2222        assert_eq!(status, 200);
2223    }
2224
2225    #[tokio::test]
2226    async fn test_http_producer_method_from_header() {
2227        use tower::ServiceExt;
2228
2229        let (url, _handle) = start_test_server().await;
2230        let ctx = test_producer_ctx();
2231
2232        let component = HttpComponent::new();
2233        let endpoint_ctx = NoOpComponentContext;
2234        let endpoint = component
2235            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2236            .unwrap();
2237        let producer = endpoint.create_producer(&ctx).unwrap();
2238
2239        let mut exchange = Exchange::new(Message::default());
2240        exchange.input.set_header(
2241            "CamelHttpMethod",
2242            serde_json::Value::String("DELETE".to_string()),
2243        );
2244
2245        let result = producer.oneshot(exchange).await.unwrap();
2246        let status = result
2247            .input
2248            .header("CamelHttpResponseCode")
2249            .and_then(|v| v.as_u64())
2250            .unwrap();
2251        assert_eq!(status, 200);
2252    }
2253
2254    #[tokio::test]
2255    async fn test_http_producer_forced_method() {
2256        use tower::ServiceExt;
2257
2258        let (url, _handle) = start_test_server().await;
2259        let ctx = test_producer_ctx();
2260
2261        let component = HttpComponent::new();
2262        let endpoint_ctx = NoOpComponentContext;
2263        let endpoint = component
2264            .create_endpoint(
2265                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2266                &endpoint_ctx,
2267            )
2268            .unwrap();
2269        let producer = endpoint.create_producer(&ctx).unwrap();
2270
2271        let exchange = Exchange::new(Message::default());
2272        let result = producer.oneshot(exchange).await.unwrap();
2273
2274        let status = result
2275            .input
2276            .header("CamelHttpResponseCode")
2277            .and_then(|v| v.as_u64())
2278            .unwrap();
2279        assert_eq!(status, 200);
2280    }
2281
2282    #[tokio::test]
2283    async fn test_http_producer_throw_exception_on_failure() {
2284        use tower::ServiceExt;
2285
2286        let (url, _handle) = start_status_server(404).await;
2287        let ctx = test_producer_ctx();
2288
2289        let component = HttpComponent::new();
2290        let endpoint_ctx = NoOpComponentContext;
2291        let endpoint = component
2292            .create_endpoint(
2293                &format!("{url}/not-found?allowPrivateIps=true"),
2294                &endpoint_ctx,
2295            )
2296            .unwrap();
2297        let producer = endpoint.create_producer(&ctx).unwrap();
2298
2299        let exchange = Exchange::new(Message::default());
2300        let result = producer.oneshot(exchange).await;
2301        assert!(result.is_err());
2302
2303        match result.unwrap_err() {
2304            CamelError::HttpOperationFailed { status_code, .. } => {
2305                assert_eq!(status_code, 404);
2306            }
2307            e => panic!("Expected HttpOperationFailed, got: {e}"),
2308        }
2309    }
2310
2311    #[tokio::test]
2312    async fn test_http_producer_no_throw_on_failure() {
2313        use tower::ServiceExt;
2314
2315        let (url, _handle) = start_status_server(500).await;
2316        let ctx = test_producer_ctx();
2317
2318        let component = HttpComponent::new();
2319        let endpoint_ctx = NoOpComponentContext;
2320        let endpoint = component
2321            .create_endpoint(
2322                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2323                &endpoint_ctx,
2324            )
2325            .unwrap();
2326        let producer = endpoint.create_producer(&ctx).unwrap();
2327
2328        let exchange = Exchange::new(Message::default());
2329        let result = producer.oneshot(exchange).await.unwrap();
2330
2331        let status = result
2332            .input
2333            .header("CamelHttpResponseCode")
2334            .and_then(|v| v.as_u64())
2335            .unwrap();
2336        assert_eq!(status, 500);
2337    }
2338
2339    #[tokio::test]
2340    async fn test_http_producer_uri_override() {
2341        use tower::ServiceExt;
2342
2343        let (url, _handle) = start_test_server().await;
2344        let ctx = test_producer_ctx();
2345
2346        let component = HttpComponent::new();
2347        let endpoint_ctx = NoOpComponentContext;
2348        let endpoint = component
2349            .create_endpoint(
2350                "http://localhost:1/does-not-exist?allowPrivateIps=true",
2351                &endpoint_ctx,
2352            )
2353            .unwrap();
2354        let producer = endpoint.create_producer(&ctx).unwrap();
2355
2356        let mut exchange = Exchange::new(Message::default());
2357        exchange.input.set_header(
2358            "CamelHttpUri",
2359            serde_json::Value::String(format!("{url}/api")),
2360        );
2361
2362        let result = producer.oneshot(exchange).await.unwrap();
2363        let status = result
2364            .input
2365            .header("CamelHttpResponseCode")
2366            .and_then(|v| v.as_u64())
2367            .unwrap();
2368        assert_eq!(status, 200);
2369    }
2370
2371    #[tokio::test]
2372    async fn test_http_producer_response_headers_mapped() {
2373        use tower::ServiceExt;
2374
2375        let (url, _handle) = start_test_server().await;
2376        let ctx = test_producer_ctx();
2377
2378        let component = HttpComponent::new();
2379        let endpoint_ctx = NoOpComponentContext;
2380        let endpoint = component
2381            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2382            .unwrap();
2383        let producer = endpoint.create_producer(&ctx).unwrap();
2384
2385        let exchange = Exchange::new(Message::default());
2386        let result = producer.oneshot(exchange).await.unwrap();
2387
2388        assert!(
2389            result.input.header("Content-Type").is_some(),
2390            "Response should have Content-Type header"
2391        );
2392        assert!(result.input.header("CamelHttpResponseText").is_some());
2393    }
2394
2395    // -----------------------------------------------------------------------
2396    // Bug fix tests: Client configuration per-endpoint
2397    // -----------------------------------------------------------------------
2398
2399    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2400        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2401        let addr = listener.local_addr().unwrap();
2402        let url = format!("http://127.0.0.1:{}", addr.port());
2403
2404        let handle = tokio::spawn(async move {
2405            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2406            loop {
2407                if let Ok((mut stream, _)) = listener.accept().await {
2408                    tokio::spawn(async move {
2409                        let mut buf = vec![0u8; 4096];
2410                        let n = stream.read(&mut buf).await.unwrap_or(0);
2411                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2412
2413                        // Check if this is a request to /final
2414                        if request.contains("GET /final") {
2415                            let body = r#"{"status":"final"}"#;
2416                            let response = format!(
2417                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2418                                body.len(),
2419                                body
2420                            );
2421                            let _ = stream.write_all(response.as_bytes()).await;
2422                        } else {
2423                            // Redirect to /final
2424                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2425                            let _ = stream.write_all(response.as_bytes()).await;
2426                        }
2427                    });
2428                }
2429            }
2430        });
2431
2432        (url, handle)
2433    }
2434
2435    #[tokio::test]
2436    async fn test_follow_redirects_false_does_not_follow() {
2437        use tower::ServiceExt;
2438
2439        let (url, _handle) = start_redirect_server().await;
2440        let ctx = test_producer_ctx();
2441
2442        let component =
2443            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2444        let endpoint_ctx = NoOpComponentContext;
2445        let endpoint = component
2446            .create_endpoint(
2447                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2448                &endpoint_ctx,
2449            )
2450            .unwrap();
2451        let producer = endpoint.create_producer(&ctx).unwrap();
2452
2453        let exchange = Exchange::new(Message::default());
2454        let result = producer.oneshot(exchange).await.unwrap();
2455
2456        // Should get 302, NOT follow redirect to 200
2457        let status = result
2458            .input
2459            .header("CamelHttpResponseCode")
2460            .and_then(|v| v.as_u64())
2461            .unwrap();
2462        assert_eq!(
2463            status, 302,
2464            "Should NOT follow redirect when followRedirects=false"
2465        );
2466    }
2467
2468    #[tokio::test]
2469    async fn test_follow_redirects_true_follows_redirect() {
2470        use tower::ServiceExt;
2471
2472        let (url, _handle) = start_redirect_server().await;
2473        let ctx = test_producer_ctx();
2474
2475        let component =
2476            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2477        let endpoint_ctx = NoOpComponentContext;
2478        let endpoint = component
2479            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2480            .unwrap();
2481        let producer = endpoint.create_producer(&ctx).unwrap();
2482
2483        let exchange = Exchange::new(Message::default());
2484        let result = producer.oneshot(exchange).await.unwrap();
2485
2486        // Should follow redirect and get 200
2487        let status = result
2488            .input
2489            .header("CamelHttpResponseCode")
2490            .and_then(|v| v.as_u64())
2491            .unwrap();
2492        assert_eq!(
2493            status, 200,
2494            "Should follow redirect when followRedirects=true"
2495        );
2496    }
2497
2498    #[tokio::test]
2499    async fn test_query_params_forwarded_to_http_request() {
2500        use tower::ServiceExt;
2501
2502        let (url, _handle) = start_test_server().await;
2503        let ctx = test_producer_ctx();
2504
2505        let component = HttpComponent::new();
2506        let endpoint_ctx = NoOpComponentContext;
2507        // apiKey is NOT a Camel option, should be forwarded as query param
2508        let endpoint = component
2509            .create_endpoint(
2510                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2511                &endpoint_ctx,
2512            )
2513            .unwrap();
2514        let producer = endpoint.create_producer(&ctx).unwrap();
2515
2516        let exchange = Exchange::new(Message::default());
2517        let result = producer.oneshot(exchange).await.unwrap();
2518
2519        // The test server returns the request info in response
2520        // We just verify it succeeds (the query param was sent)
2521        let status = result
2522            .input
2523            .header("CamelHttpResponseCode")
2524            .and_then(|v| v.as_u64())
2525            .unwrap();
2526        assert_eq!(status, 200);
2527    }
2528
2529    #[tokio::test]
2530    async fn test_non_camel_query_params_are_forwarded() {
2531        // This test verifies Bug #3 fix: non-Camel options should be forwarded
2532        // We'll test the config parsing, not the actual HTTP call
2533        let config = HttpEndpointConfig::from_uri(
2534            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2535        )
2536        .unwrap();
2537
2538        // apiKey and token are NOT Camel options, should be forwarded
2539        assert!(
2540            config.query_params.contains_key("apiKey"),
2541            "apiKey should be preserved"
2542        );
2543        assert!(
2544            config.query_params.contains_key("token"),
2545            "token should be preserved"
2546        );
2547        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2548        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2549
2550        // httpMethod IS a Camel option, should NOT be in query_params
2551        assert!(
2552            !config.query_params.contains_key("httpMethod"),
2553            "httpMethod should not be forwarded"
2554        );
2555    }
2556
2557    #[test]
2558    fn test_query_params_are_url_encoded_when_resolving_url() {
2559        let config =
2560            HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2561        let exchange = Exchange::new(Message::default());
2562
2563        let url = HttpProducer::resolve_url(&exchange, &config);
2564
2565        assert!(url.contains("q=hello+world"), "url was: {url}");
2566        assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2567    }
2568
2569    // -----------------------------------------------------------------------
2570    // Timeout tests (HTTP-004)
2571    // -----------------------------------------------------------------------
2572
2573    async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2574        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2575        let addr = listener.local_addr().unwrap();
2576        let url = format!("http://127.0.0.1:{}", addr.port());
2577
2578        let handle = tokio::spawn(async move {
2579            loop {
2580                if let Ok((mut stream, _)) = listener.accept().await {
2581                    let delay = delay_ms;
2582                    tokio::spawn(async move {
2583                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2584                        let mut buf = vec![0u8; 4096];
2585                        let _ = stream.read(&mut buf).await;
2586                        // Send headers immediately (no Content-Length → chunked)
2587                        let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2588                        let _ = stream.write_all(headers.as_bytes()).await;
2589                        // Delay before sending body chunk
2590                        tokio::time::sleep(Duration::from_millis(delay)).await;
2591                        let body = r#"{"status":"slow"}"#;
2592                        let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2593                        let _ = stream.write_all(chunk.as_bytes()).await;
2594                    });
2595                }
2596            }
2597        });
2598
2599        (url, handle)
2600    }
2601
2602    #[tokio::test]
2603    async fn test_http_producer_timeout() {
2604        use tower::ServiceExt;
2605
2606        // Server delays 500ms, client timeout is 100ms → should timeout
2607        let (url, _handle) = start_slow_server(500).await;
2608        let ctx = test_producer_ctx();
2609
2610        let component = HttpComponent::with_config(
2611            HttpConfig::default()
2612                .with_read_timeout_ms(100)
2613                .with_response_timeout_ms(30_000), // generous response timeout
2614        );
2615        let endpoint_ctx = NoOpComponentContext;
2616        let endpoint = component
2617            .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2618            .unwrap();
2619        let producer = endpoint.create_producer(&ctx).unwrap();
2620
2621        let exchange = Exchange::new(Message::default());
2622        let result = producer.oneshot(exchange).await;
2623
2624        assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2625        let err = result.unwrap_err().to_string();
2626        assert!(
2627            err.contains("Read timeout") || err.contains("timeout"),
2628            "Error should mention timeout, got: {}",
2629            err
2630        );
2631    }
2632
2633    #[tokio::test]
2634    async fn test_http_producer_no_timeout_when_fast() {
2635        use tower::ServiceExt;
2636
2637        let (url, _handle) = start_test_server().await;
2638        let ctx = test_producer_ctx();
2639
2640        let component =
2641            HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2642        let endpoint_ctx = NoOpComponentContext;
2643        let endpoint = component
2644            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2645            .unwrap();
2646        let producer = endpoint.create_producer(&ctx).unwrap();
2647
2648        let exchange = Exchange::new(Message::default());
2649        let result = producer.oneshot(exchange).await.unwrap();
2650
2651        let status = result
2652            .input
2653            .header("CamelHttpResponseCode")
2654            .and_then(|v| v.as_u64())
2655            .unwrap();
2656        assert_eq!(status, 200);
2657    }
2658
2659    // -----------------------------------------------------------------------
2660    // SSRF Protection tests
2661    // -----------------------------------------------------------------------
2662
2663    #[tokio::test]
2664    async fn test_http_producer_blocks_metadata_endpoint() {
2665        use tower::ServiceExt;
2666
2667        let ctx = test_producer_ctx();
2668        let component = HttpComponent::new();
2669        let endpoint_ctx = NoOpComponentContext;
2670        let endpoint = component
2671            .create_endpoint(
2672                "http://example.com/api?allowPrivateIps=false",
2673                &endpoint_ctx,
2674            )
2675            .unwrap();
2676        let producer = endpoint.create_producer(&ctx).unwrap();
2677
2678        let mut exchange = Exchange::new(Message::default());
2679        exchange.input.set_header(
2680            "CamelHttpUri",
2681            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2682        );
2683
2684        let result = producer.oneshot(exchange).await;
2685        assert!(result.is_err(), "Should block AWS metadata endpoint");
2686
2687        let err = result.unwrap_err();
2688        assert!(
2689            err.to_string().contains("Private IP"),
2690            "Error should mention private IP blocking, got: {}",
2691            err
2692        );
2693    }
2694
2695    #[test]
2696    fn test_ssrf_config_defaults() {
2697        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2698        assert!(
2699            !config.allow_private_ips,
2700            "Private IPs should be blocked by default"
2701        );
2702        assert!(
2703            config.blocked_hosts.is_empty(),
2704            "Blocked hosts should be empty by default"
2705        );
2706    }
2707
2708    #[test]
2709    fn test_ssrf_config_allow_private_ips() {
2710        let config =
2711            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2712        assert!(
2713            config.allow_private_ips,
2714            "Private IPs should be allowed when explicitly set"
2715        );
2716    }
2717
2718    #[test]
2719    fn test_ssrf_config_blocked_hosts() {
2720        let config = HttpEndpointConfig::from_uri(
2721            "http://example.com/api?blockedHosts=evil.com,malware.net",
2722        )
2723        .unwrap();
2724        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2725    }
2726
2727    #[tokio::test]
2728    async fn test_http_producer_blocks_localhost() {
2729        use tower::ServiceExt;
2730
2731        let ctx = test_producer_ctx();
2732        let component = HttpComponent::new();
2733        let endpoint_ctx = NoOpComponentContext;
2734        let endpoint = component
2735            .create_endpoint("http://example.com/api", &endpoint_ctx)
2736            .unwrap();
2737        let producer = endpoint.create_producer(&ctx).unwrap();
2738
2739        let mut exchange = Exchange::new(Message::default());
2740        exchange.input.set_header(
2741            "CamelHttpUri",
2742            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2743        );
2744
2745        let result = producer.oneshot(exchange).await;
2746        assert!(result.is_err(), "Should block localhost");
2747    }
2748
2749    #[tokio::test]
2750    async fn test_http_producer_blocks_loopback_ip() {
2751        use tower::ServiceExt;
2752
2753        let ctx = test_producer_ctx();
2754        let component = HttpComponent::new();
2755        let endpoint_ctx = NoOpComponentContext;
2756        let endpoint = component
2757            .create_endpoint("http://example.com/api", &endpoint_ctx)
2758            .unwrap();
2759        let producer = endpoint.create_producer(&ctx).unwrap();
2760
2761        let mut exchange = Exchange::new(Message::default());
2762        exchange.input.set_header(
2763            "CamelHttpUri",
2764            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2765        );
2766
2767        let result = producer.oneshot(exchange).await;
2768        assert!(result.is_err(), "Should block loopback IP");
2769    }
2770
2771    #[tokio::test]
2772    async fn test_http_producer_allows_private_ip_when_enabled() {
2773        use tower::ServiceExt;
2774
2775        let ctx = test_producer_ctx();
2776        let component = HttpComponent::new();
2777        let endpoint_ctx = NoOpComponentContext;
2778        // With allowPrivateIps=true, the validation should pass
2779        // (actual connection will fail, but that's expected)
2780        let endpoint = component
2781            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2782            .unwrap();
2783        let producer = endpoint.create_producer(&ctx).unwrap();
2784
2785        let exchange = Exchange::new(Message::default());
2786
2787        // The request will fail because we can't connect, but it should NOT fail
2788        // due to SSRF protection
2789        let result = producer.oneshot(exchange).await;
2790        // We expect connection error, not SSRF error
2791        if let Err(ref e) = result {
2792            let err_str = e.to_string();
2793            assert!(
2794                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2795                "Should not be SSRF error, got: {}",
2796                err_str
2797            );
2798        }
2799    }
2800
2801    // -----------------------------------------------------------------------
2802    // HttpServerConfig tests
2803    // -----------------------------------------------------------------------
2804
2805    #[test]
2806    fn test_http_server_config_parse() {
2807        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2808        assert_eq!(cfg.host, "0.0.0.0");
2809        assert_eq!(cfg.port, 8080);
2810        assert_eq!(cfg.path, "/orders");
2811        assert_eq!(cfg.max_inflight_requests, 1024);
2812    }
2813
2814    #[test]
2815    fn test_http_server_config_scheme() {
2816        // UriConfig trait method returns "http" as primary scheme
2817        assert_eq!(HttpServerConfig::scheme(), "http");
2818    }
2819
2820    #[test]
2821    fn test_http_server_config_from_components() {
2822        // Test from_components directly (trait method)
2823        let components = camel_component_api::UriComponents {
2824            scheme: "https".to_string(),
2825            path: "//0.0.0.0:8443/api".to_string(),
2826            params: std::collections::HashMap::from([
2827                ("maxRequestBody".to_string(), "5242880".to_string()),
2828                ("maxInflightRequests".to_string(), "7".to_string()),
2829            ]),
2830        };
2831        let cfg = HttpServerConfig::from_components(components).unwrap();
2832        assert_eq!(cfg.host, "0.0.0.0");
2833        assert_eq!(cfg.port, 8443);
2834        assert_eq!(cfg.path, "/api");
2835        assert_eq!(cfg.max_request_body, 5242880);
2836        assert_eq!(cfg.max_inflight_requests, 7);
2837    }
2838
2839    #[test]
2840    fn test_http_server_config_default_path() {
2841        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2842        assert_eq!(cfg.path, "/");
2843    }
2844
2845    #[test]
2846    fn test_http_server_config_wrong_scheme() {
2847        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2848    }
2849
2850    #[test]
2851    fn test_http_server_config_invalid_port() {
2852        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2853    }
2854
2855    #[test]
2856    fn test_http_server_config_default_port_by_scheme() {
2857        // HTTP without explicit port should default to 80
2858        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2859        assert_eq!(cfg_http.port, 80);
2860
2861        // HTTPS without explicit port should default to 443
2862        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2863        assert_eq!(cfg_https.port, 443);
2864    }
2865
2866    #[test]
2867    fn test_request_envelope_and_reply_are_send() {
2868        fn assert_send<T: Send>() {}
2869        assert_send::<RequestEnvelope>();
2870        assert_send::<HttpReply>();
2871    }
2872
2873    // -----------------------------------------------------------------------
2874    // ServerRegistry tests
2875    // -----------------------------------------------------------------------
2876
2877    #[test]
2878    fn test_server_registry_global_is_singleton() {
2879        let r1 = ServerRegistry::global();
2880        let r2 = ServerRegistry::global();
2881        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2882    }
2883
2884    #[allow(clippy::await_holding_lock)]
2885    #[tokio::test]
2886    async fn test_concurrent_get_or_spawn_returns_same_registry() {
2887        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2888        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2889        let port = listener.local_addr().unwrap().port();
2890        drop(listener);
2891
2892        let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2893            Arc::new(std::sync::Mutex::new(Vec::new()));
2894
2895        let mut handles = Vec::new();
2896        for _ in 0..4 {
2897            let results = results.clone();
2898            handles.push(tokio::spawn(async move {
2899                let registry = ServerRegistry::global()
2900                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2901                    .await
2902                    .unwrap();
2903                results.lock().unwrap().push(registry);
2904            }));
2905        }
2906
2907        for h in handles {
2908            h.await.unwrap();
2909        }
2910
2911        let registries = results.lock().unwrap();
2912        assert_eq!(registries.len(), 4);
2913        for i in 1..registries.len() {
2914            assert!(
2915                Arc::ptr_eq(&registries[0].inner, &registries[i].inner),
2916                "all concurrent callers should get same route registry"
2917            );
2918        }
2919    }
2920
2921    #[test]
2922    fn test_server_registry_distinguishes_host_and_port() {
2923        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2924        let rt = tokio::runtime::Runtime::new().expect("runtime");
2925        rt.block_on(async {
2926            let registry = ServerRegistry::global();
2927            // Use two distinct host values with same configured port key.
2928            // Port 0 is acceptable here because the registry key uses the configured
2929            // tuple, not the OS-assigned ephemeral port.
2930            let d1 = registry
2931                .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2932                .await;
2933            let d2 = registry
2934                .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2935                .await;
2936            assert!(d1.is_ok());
2937            assert!(d2.is_ok());
2938            assert!(!Arc::ptr_eq(&d1.unwrap().inner, &d2.unwrap().inner));
2939        });
2940    }
2941
2942    #[allow(clippy::await_holding_lock)]
2943    #[tokio::test]
2944    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2945        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2946        let registry = ServerRegistry::global();
2947        // First registration: maxRequestBody = 1 MB
2948        let d1 = registry
2949            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2950            .await;
2951        assert!(d1.is_ok());
2952
2953        // Second registration on same (host,port): maxRequestBody = 2 MB
2954        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2955        let d2 = registry
2956            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2957            .await;
2958        assert!(d2.is_err());
2959        let err = d2.unwrap_err();
2960        assert!(
2961            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2962            "Expected incompatible maxRequestBody error, got: {}",
2963            err
2964        );
2965    }
2966
2967    #[test]
2968    fn test_server_registry_reset_clears_entries() {
2969        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2970        let rt = tokio::runtime::Runtime::new().expect("runtime");
2971        rt.block_on(async {
2972            // Register something on a unique port
2973            let d1 = ServerRegistry::global()
2974                .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2975                .await;
2976            assert!(d1.is_ok());
2977
2978            // Verify entry exists
2979            let guard = ServerRegistry::global().inner.lock().expect("lock");
2980            assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2981            drop(guard);
2982
2983            // Reset
2984            ServerRegistry::reset();
2985
2986            // Verify cleared
2987            let guard = ServerRegistry::global().inner.lock().expect("lock");
2988            assert!(
2989                guard.is_empty(),
2990                "registry should be empty after reset, has {} entries",
2991                guard.len()
2992            );
2993        });
2994    }
2995
2996    // -----------------------------------------------------------------------
2997    // Axum dispatch handler tests
2998    // -----------------------------------------------------------------------
2999
3000    #[tokio::test]
3001    async fn test_dispatch_handler_returns_404_for_unknown_path() {
3002        let registry = HttpRouteRegistry::new();
3003        // Nothing registered in route registry
3004        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3005        let port = listener.local_addr().unwrap().port();
3006        tokio::spawn(run_axum_server(
3007            listener,
3008            registry,
3009            2 * 1024 * 1024,
3010            10 * 1024 * 1024,
3011            Arc::new(tokio::sync::Semaphore::new(1024)),
3012        ));
3013
3014        // Wait for server to start
3015        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3016
3017        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3018            .await
3019            .unwrap();
3020        assert_eq!(resp.status().as_u16(), 404);
3021    }
3022
3023    // -----------------------------------------------------------------------
3024    // HttpConsumer tests
3025    // -----------------------------------------------------------------------
3026
3027    #[tokio::test]
3028    async fn test_http_consumer_start_registers_path() {
3029        use camel_component_api::ConsumerContext;
3030
3031        // Get an OS-assigned free port
3032        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3033        let port = listener.local_addr().unwrap().port();
3034        drop(listener); // Release port — ServerRegistry will rebind it
3035
3036        let consumer_cfg = HttpServerConfig {
3037            host: "127.0.0.1".to_string(),
3038            port,
3039            path: "/ping".to_string(),
3040            max_request_body: 2 * 1024 * 1024,
3041            max_response_body: 10 * 1024 * 1024,
3042            max_inflight_requests: 1024,
3043        };
3044        let mut consumer = HttpConsumer::new(consumer_cfg);
3045
3046        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3047        let token = tokio_util::sync::CancellationToken::new();
3048        let ctx = ConsumerContext::new(tx, token.clone());
3049
3050        tokio::spawn(async move {
3051            consumer.start(ctx).await.unwrap();
3052        });
3053
3054        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3055
3056        let client = reqwest::Client::new();
3057        let resp_future = client
3058            .post(format!("http://127.0.0.1:{port}/ping"))
3059            .body("hello world")
3060            .send();
3061
3062        let (http_result, _) = tokio::join!(resp_future, async {
3063            if let Some(mut envelope) = rx.recv().await {
3064                // Set a custom status code
3065                envelope.exchange.input.set_header(
3066                    "CamelHttpResponseCode",
3067                    serde_json::Value::Number(201.into()),
3068                );
3069                if let Some(reply_tx) = envelope.reply_tx {
3070                    let _ = reply_tx.send(Ok(envelope.exchange));
3071                }
3072            }
3073        });
3074
3075        let resp = http_result.unwrap();
3076        assert_eq!(resp.status().as_u16(), 201);
3077
3078        token.cancel();
3079    }
3080
3081    #[tokio::test]
3082    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3083        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3084
3085        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3086        let port = listener.local_addr().unwrap().port();
3087        drop(listener);
3088
3089        let consumer_cfg = HttpServerConfig {
3090            host: "127.0.0.1".to_string(),
3091            port,
3092            path: "/saturation".to_string(),
3093            max_request_body: 2 * 1024 * 1024,
3094            max_response_body: 10 * 1024 * 1024,
3095            max_inflight_requests: 1,
3096        };
3097        let mut consumer = HttpConsumer::new(consumer_cfg);
3098
3099        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3100        let token = tokio_util::sync::CancellationToken::new();
3101        let ctx = ConsumerContext::new(tx, token.clone());
3102        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3103        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3104
3105        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3106        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3107
3108        tokio::spawn(async move {
3109            let mut first_seen_tx = Some(first_seen_tx);
3110            let mut unblock_first_rx = Some(unblock_first_rx);
3111
3112            while let Some(envelope) = rx.recv().await {
3113                if let Some(tx) = first_seen_tx.take() {
3114                    let _ = tx.send(());
3115                    if let Some(rx_unblock) = unblock_first_rx.take() {
3116                        let _ = rx_unblock.await;
3117                    }
3118                }
3119
3120                if let Some(reply_tx) = envelope.reply_tx {
3121                    let _ = reply_tx.send(Ok(envelope.exchange));
3122                }
3123            }
3124        });
3125
3126        let client = reqwest::Client::new();
3127        let first_req = {
3128            let client = client.clone();
3129            async move {
3130                client
3131                    .get(format!("http://127.0.0.1:{port}/saturation"))
3132                    .send()
3133                    .await
3134                    .unwrap()
3135            }
3136        };
3137
3138        let first_handle = tokio::spawn(first_req);
3139        first_seen_rx.await.unwrap();
3140
3141        let second_resp = client
3142            .get(format!("http://127.0.0.1:{port}/saturation"))
3143            .send()
3144            .await
3145            .unwrap();
3146
3147        assert_eq!(second_resp.status().as_u16(), 503);
3148
3149        let _ = unblock_first_tx.send(());
3150        let first_resp = first_handle.await.unwrap();
3151        assert_eq!(first_resp.status().as_u16(), 200);
3152
3153        token.cancel();
3154    }
3155
3156    #[tokio::test]
3157    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3158        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3159
3160        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3161
3162        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3163        let port = listener.local_addr().unwrap().port();
3164        drop(listener);
3165
3166        let consumer_cfg = HttpServerConfig {
3167            host: "127.0.0.1".to_string(),
3168            port,
3169            path: "/limit-bytes".to_string(),
3170            max_request_body: 2 * 1024 * 1024,
3171            max_response_body: 16,
3172            max_inflight_requests: 1024,
3173        };
3174        let mut consumer = HttpConsumer::new(consumer_cfg);
3175
3176        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3177        let token = tokio_util::sync::CancellationToken::new();
3178        let ctx = ConsumerContext::new(tx, token.clone());
3179        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3180        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3181
3182        let client = reqwest::Client::new();
3183        let send_fut = client
3184            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3185            .send();
3186
3187        let (http_result, _) = tokio::join!(send_fut, async {
3188            if let Some(mut envelope) = rx.recv().await {
3189                envelope.exchange.input.body =
3190                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3191                if let Some(reply_tx) = envelope.reply_tx {
3192                    let _ = reply_tx.send(Ok(envelope.exchange));
3193                }
3194            }
3195        });
3196
3197        let resp = http_result.unwrap();
3198        assert_eq!(resp.status().as_u16(), 500);
3199        let body = resp.text().await.unwrap();
3200        assert_eq!(body, "Response body exceeds configured limit");
3201        token.cancel();
3202    }
3203
3204    #[tokio::test]
3205    async fn test_http_consumer_enforces_max_response_body_for_json() {
3206        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3207
3208        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3209
3210        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3211        let port = listener.local_addr().unwrap().port();
3212        drop(listener);
3213
3214        let consumer_cfg = HttpServerConfig {
3215            host: "127.0.0.1".to_string(),
3216            port,
3217            path: "/limit-json".to_string(),
3218            max_request_body: 2 * 1024 * 1024,
3219            max_response_body: 16,
3220            max_inflight_requests: 1024,
3221        };
3222        let mut consumer = HttpConsumer::new(consumer_cfg);
3223
3224        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3225        let token = tokio_util::sync::CancellationToken::new();
3226        let ctx = ConsumerContext::new(tx, token.clone());
3227        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3228        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3229
3230        let client = reqwest::Client::new();
3231        let send_fut = client
3232            .get(format!("http://127.0.0.1:{port}/limit-json"))
3233            .send();
3234
3235        let (http_result, _) = tokio::join!(send_fut, async {
3236            if let Some(mut envelope) = rx.recv().await {
3237                envelope.exchange.input.body = camel_component_api::Body::Json(
3238                    serde_json::json!({"message":"this response is bigger than sixteen"}),
3239                );
3240                if let Some(reply_tx) = envelope.reply_tx {
3241                    let _ = reply_tx.send(Ok(envelope.exchange));
3242                }
3243            }
3244        });
3245
3246        let resp = http_result.unwrap();
3247        assert_eq!(resp.status().as_u16(), 500);
3248        let body = resp.text().await.unwrap();
3249        assert_eq!(body, "Response body exceeds configured limit");
3250        token.cancel();
3251    }
3252
3253    #[tokio::test]
3254    async fn test_http_consumer_enforces_max_response_body_for_xml() {
3255        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3256
3257        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3258
3259        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3260        let port = listener.local_addr().unwrap().port();
3261        drop(listener);
3262
3263        let consumer_cfg = HttpServerConfig {
3264            host: "127.0.0.1".to_string(),
3265            port,
3266            path: "/limit-xml".to_string(),
3267            max_request_body: 2 * 1024 * 1024,
3268            max_response_body: 16,
3269            max_inflight_requests: 1024,
3270        };
3271        let mut consumer = HttpConsumer::new(consumer_cfg);
3272
3273        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3274        let token = tokio_util::sync::CancellationToken::new();
3275        let ctx = ConsumerContext::new(tx, token.clone());
3276        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3277        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3278
3279        let client = reqwest::Client::new();
3280        let send_fut = client
3281            .get(format!("http://127.0.0.1:{port}/limit-xml"))
3282            .send();
3283
3284        let (http_result, _) = tokio::join!(send_fut, async {
3285            if let Some(mut envelope) = rx.recv().await {
3286                envelope.exchange.input.body = camel_component_api::Body::Xml(
3287                    "<root><value>way-too-large</value></root>".into(),
3288                );
3289                if let Some(reply_tx) = envelope.reply_tx {
3290                    let _ = reply_tx.send(Ok(envelope.exchange));
3291                }
3292            }
3293        });
3294
3295        let resp = http_result.unwrap();
3296        assert_eq!(resp.status().as_u16(), 500);
3297        let body = resp.text().await.unwrap();
3298        assert_eq!(body, "Response body exceeds configured limit");
3299        token.cancel();
3300    }
3301
3302    #[tokio::test]
3303    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3304        use camel_component_api::{
3305            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3306        };
3307        use futures::stream;
3308
3309        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3310
3311        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3312        let port = listener.local_addr().unwrap().port();
3313        drop(listener);
3314
3315        let consumer_cfg = HttpServerConfig {
3316            host: "0.0.0.0".to_string(),
3317            port,
3318            path: "/limit-stream".to_string(),
3319            max_request_body: 2 * 1024 * 1024,
3320            max_response_body: 16,
3321            max_inflight_requests: 1024,
3322        };
3323        let mut consumer = HttpConsumer::new(consumer_cfg);
3324
3325        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3326        let token = tokio_util::sync::CancellationToken::new();
3327        let ctx = ConsumerContext::new(tx, token.clone());
3328        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3329        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3330
3331        let client = reqwest::Client::new();
3332        let send_fut = client
3333            .get(format!("http://127.0.0.1:{port}/limit-stream"))
3334            .send();
3335
3336        let (http_result, _) = tokio::join!(send_fut, async {
3337            if let Some(mut envelope) = rx.recv().await {
3338                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3339                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3340                let stream = Box::pin(stream::iter(chunks));
3341                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3342                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3343                    metadata: StreamMetadata {
3344                        size_hint: Some(32),
3345                        content_type: Some("application/octet-stream".into()),
3346                        origin: None,
3347                    },
3348                });
3349                if let Some(reply_tx) = envelope.reply_tx {
3350                    let _ = reply_tx.send(Ok(envelope.exchange));
3351                }
3352            }
3353        });
3354
3355        let resp = http_result.unwrap();
3356        assert_eq!(resp.status().as_u16(), 200);
3357        let body = resp.bytes().await.unwrap();
3358        assert_eq!(body.len(), 32);
3359        token.cancel();
3360    }
3361
3362    // -----------------------------------------------------------------------
3363    // Integration tests
3364    // -----------------------------------------------------------------------
3365
3366    #[tokio::test]
3367    async fn test_integration_single_consumer_round_trip() {
3368        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3369
3370        // Get an OS-assigned free port (ephemeral)
3371        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3372        let port = listener.local_addr().unwrap().port();
3373        drop(listener); // Release — ServerRegistry will rebind
3374
3375        let component = HttpComponent::new();
3376        let endpoint_ctx = NoOpComponentContext;
3377        let endpoint = component
3378            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3379            .unwrap();
3380        let mut consumer = endpoint.create_consumer().unwrap();
3381
3382        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3383        let token = tokio_util::sync::CancellationToken::new();
3384        let ctx = ConsumerContext::new(tx, token.clone());
3385
3386        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3387        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3388
3389        let client = reqwest::Client::new();
3390        let send_fut = client
3391            .post(format!("http://127.0.0.1:{port}/echo"))
3392            .header("Content-Type", "text/plain")
3393            .body("ping")
3394            .send();
3395
3396        let (http_result, _) = tokio::join!(send_fut, async {
3397            if let Some(mut envelope) = rx.recv().await {
3398                assert_eq!(
3399                    envelope.exchange.input.header("CamelHttpMethod"),
3400                    Some(&serde_json::Value::String("POST".into()))
3401                );
3402                assert_eq!(
3403                    envelope.exchange.input.header("CamelHttpPath"),
3404                    Some(&serde_json::Value::String("/echo".into()))
3405                );
3406                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3407                if let Some(reply_tx) = envelope.reply_tx {
3408                    let _ = reply_tx.send(Ok(envelope.exchange));
3409                }
3410            }
3411        });
3412
3413        let resp = http_result.unwrap();
3414        assert_eq!(resp.status().as_u16(), 200);
3415        let body = resp.text().await.unwrap();
3416        assert_eq!(body, "pong");
3417
3418        token.cancel();
3419    }
3420
3421    #[tokio::test]
3422    async fn test_integration_two_consumers_shared_port() {
3423        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3424
3425        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3426
3427        // Get an OS-assigned free port (ephemeral)
3428        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3429        let port = listener.local_addr().unwrap().port();
3430        drop(listener);
3431
3432        let component = HttpComponent::new();
3433        let endpoint_ctx = NoOpComponentContext;
3434
3435        // Consumer A: /hello
3436        let endpoint_a = component
3437            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3438            .unwrap();
3439        let mut consumer_a = endpoint_a.create_consumer().unwrap();
3440
3441        // Consumer B: /world
3442        let endpoint_b = component
3443            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3444            .unwrap();
3445        let mut consumer_b = endpoint_b.create_consumer().unwrap();
3446
3447        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3448        let token_a = tokio_util::sync::CancellationToken::new();
3449        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3450
3451        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3452        let token_b = tokio_util::sync::CancellationToken::new();
3453        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3454
3455        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3456        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3457        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3458
3459        let client = reqwest::Client::new();
3460
3461        // Request to /hello
3462        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3463        let (resp_hello, _) = tokio::join!(fut_hello, async {
3464            if let Some(mut envelope) = rx_a.recv().await {
3465                envelope.exchange.input.body =
3466                    camel_component_api::Body::Text("hello-response".to_string());
3467                if let Some(reply_tx) = envelope.reply_tx {
3468                    let _ = reply_tx.send(Ok(envelope.exchange));
3469                }
3470            }
3471        });
3472
3473        // Request to /world
3474        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3475        let (resp_world, _) = tokio::join!(fut_world, async {
3476            if let Some(mut envelope) = rx_b.recv().await {
3477                envelope.exchange.input.body =
3478                    camel_component_api::Body::Text("world-response".to_string());
3479                if let Some(reply_tx) = envelope.reply_tx {
3480                    let _ = reply_tx.send(Ok(envelope.exchange));
3481                }
3482            }
3483        });
3484
3485        let body_a = resp_hello.unwrap().text().await.unwrap();
3486        let body_b = resp_world.unwrap().text().await.unwrap();
3487
3488        assert_eq!(body_a, "hello-response");
3489        assert_eq!(body_b, "world-response");
3490
3491        token_a.cancel();
3492        token_b.cancel();
3493    }
3494
3495    #[tokio::test]
3496    async fn test_integration_unregistered_path_returns_404() {
3497        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3498
3499        // Get an OS-assigned free port (ephemeral)
3500        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3501        let port = listener.local_addr().unwrap().port();
3502        drop(listener);
3503
3504        let component = HttpComponent::new();
3505        let endpoint_ctx = NoOpComponentContext;
3506        let endpoint = component
3507            .create_endpoint(
3508                &format!("http://127.0.0.1:{port}/registered"),
3509                &endpoint_ctx,
3510            )
3511            .unwrap();
3512        let mut consumer = endpoint.create_consumer().unwrap();
3513
3514        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3515        let token = tokio_util::sync::CancellationToken::new();
3516        let ctx = ConsumerContext::new(tx, token.clone());
3517
3518        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3519
3520        // Wait until the server is actually accepting connections (CI runners can be slow).
3521        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3522        loop {
3523            if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3524                .await
3525                .is_ok()
3526            {
3527                break;
3528            }
3529            if std::time::Instant::now() >= deadline {
3530                panic!("HTTP server did not start within 5s on port {port}");
3531            }
3532            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3533        }
3534
3535        let client = reqwest::Client::new();
3536        let resp = client
3537            .get(format!("http://127.0.0.1:{port}/not-there"))
3538            .send()
3539            .await
3540            .unwrap();
3541        assert_eq!(resp.status().as_u16(), 404);
3542
3543        token.cancel();
3544    }
3545
3546    #[test]
3547    fn test_http_consumer_declares_concurrent() {
3548        use camel_component_api::ConcurrencyModel;
3549
3550        let config = HttpServerConfig {
3551            host: "127.0.0.1".to_string(),
3552            port: 19999,
3553            path: "/test".to_string(),
3554            max_request_body: 2 * 1024 * 1024,
3555            max_response_body: 10 * 1024 * 1024,
3556            max_inflight_requests: 1024,
3557        };
3558        let consumer = HttpConsumer::new(config);
3559        assert_eq!(
3560            consumer.concurrency_model(),
3561            ConcurrencyModel::Concurrent { max: None }
3562        );
3563    }
3564
3565    // -----------------------------------------------------------------------
3566    // HttpReplyBody streaming tests
3567    // -----------------------------------------------------------------------
3568
3569    #[tokio::test]
3570    async fn test_http_reply_body_stream_variant_exists() {
3571        use bytes::Bytes;
3572        use camel_component_api::CamelError;
3573        use futures::stream;
3574
3575        let chunks: Vec<Result<Bytes, CamelError>> =
3576            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3577        let stream = Box::pin(stream::iter(chunks));
3578        let reply_body = HttpReplyBody::Stream(stream);
3579        // Si compila y el match funciona, el test pasa
3580        match reply_body {
3581            HttpReplyBody::Stream(_) => {}
3582            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3583        }
3584    }
3585
3586    // -----------------------------------------------------------------------
3587    // OpenTelemetry propagation tests (only compiled with "otel" feature)
3588    // -----------------------------------------------------------------------
3589
3590    #[cfg(feature = "otel")]
3591    mod otel_tests {
3592        use super::*;
3593        use camel_component_api::Message;
3594        use tower::ServiceExt;
3595
3596        #[tokio::test]
3597        async fn test_producer_injects_traceparent_header() {
3598            let (url, _handle) = start_test_server_with_header_capture().await;
3599            let ctx = test_producer_ctx();
3600
3601            let component = HttpComponent::new();
3602            let endpoint_ctx = NoOpComponentContext;
3603            let endpoint = component
3604                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3605                .unwrap();
3606            let producer = endpoint.create_producer(&ctx).unwrap();
3607
3608            // Create exchange with an OTel context by extracting from a traceparent header
3609            let mut exchange = Exchange::new(Message::default());
3610            let mut headers = std::collections::HashMap::new();
3611            headers.insert(
3612                "traceparent".to_string(),
3613                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3614            );
3615            camel_otel::extract_into_exchange(&mut exchange, &headers);
3616
3617            let result = producer.oneshot(exchange).await.unwrap();
3618
3619            // Verify request succeeded
3620            let status = result
3621                .input
3622                .header("CamelHttpResponseCode")
3623                .and_then(|v| v.as_u64())
3624                .unwrap();
3625            assert_eq!(status, 200);
3626
3627            // The test server echoes back the received traceparent header
3628            let traceparent = result.input.header("X-Received-Traceparent");
3629            assert!(
3630                traceparent.is_some(),
3631                "traceparent header should have been sent"
3632            );
3633
3634            let traceparent_str = traceparent.unwrap().as_str().unwrap();
3635            // Verify format: version-traceid-spanid-flags
3636            let parts: Vec<&str> = traceparent_str.split('-').collect();
3637            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3638            assert_eq!(parts[0], "00", "version should be 00");
3639            assert_eq!(
3640                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3641                "trace-id should match"
3642            );
3643            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3644            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3645        }
3646
3647        #[tokio::test]
3648        async fn test_consumer_extracts_traceparent_header() {
3649            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3650
3651            // Get an OS-assigned free port
3652            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3653            let port = listener.local_addr().unwrap().port();
3654            drop(listener);
3655
3656            let component = HttpComponent::new();
3657            let endpoint_ctx = NoOpComponentContext;
3658            let endpoint = component
3659                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3660                .unwrap();
3661            let mut consumer = endpoint.create_consumer().unwrap();
3662
3663            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3664            let token = tokio_util::sync::CancellationToken::new();
3665            let ctx = ConsumerContext::new(tx, token.clone());
3666
3667            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3668            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3669
3670            // Send request with traceparent header
3671            let client = reqwest::Client::new();
3672            let send_fut = client
3673                .post(format!("http://127.0.0.1:{port}/trace"))
3674                .header(
3675                    "traceparent",
3676                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3677                )
3678                .body("test")
3679                .send();
3680
3681            let (http_result, _) = tokio::join!(send_fut, async {
3682                if let Some(envelope) = rx.recv().await {
3683                    // Verify the exchange has a valid OTel context by re-injecting it
3684                    // and checking the traceparent matches
3685                    let mut injected_headers = std::collections::HashMap::new();
3686                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3687
3688                    assert!(
3689                        injected_headers.contains_key("traceparent"),
3690                        "Exchange should have traceparent after extraction"
3691                    );
3692
3693                    let traceparent = injected_headers.get("traceparent").unwrap();
3694                    let parts: Vec<&str> = traceparent.split('-').collect();
3695                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3696                    assert_eq!(
3697                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3698                        "Trace ID should match the original traceparent header"
3699                    );
3700
3701                    if let Some(reply_tx) = envelope.reply_tx {
3702                        let _ = reply_tx.send(Ok(envelope.exchange));
3703                    }
3704                }
3705            });
3706
3707            let resp = http_result.unwrap();
3708            assert_eq!(resp.status().as_u16(), 200);
3709
3710            token.cancel();
3711        }
3712
3713        #[tokio::test]
3714        async fn test_consumer_extracts_mixed_case_traceparent_header() {
3715            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3716
3717            // Get an OS-assigned free port
3718            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3719            let port = listener.local_addr().unwrap().port();
3720            drop(listener);
3721
3722            let component = HttpComponent::new();
3723            let endpoint_ctx = NoOpComponentContext;
3724            let endpoint = component
3725                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3726                .unwrap();
3727            let mut consumer = endpoint.create_consumer().unwrap();
3728
3729            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3730            let token = tokio_util::sync::CancellationToken::new();
3731            let ctx = ConsumerContext::new(tx, token.clone());
3732
3733            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3734            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3735
3736            // Send request with MIXED-CASE TraceParent header (not lowercase)
3737            let client = reqwest::Client::new();
3738            let send_fut = client
3739                .post(format!("http://127.0.0.1:{port}/trace"))
3740                .header(
3741                    "TraceParent",
3742                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3743                )
3744                .body("test")
3745                .send();
3746
3747            let (http_result, _) = tokio::join!(send_fut, async {
3748                if let Some(envelope) = rx.recv().await {
3749                    // Verify the exchange has a valid OTel context by re-injecting it
3750                    // and checking the traceparent matches
3751                    let mut injected_headers = HashMap::new();
3752                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3753
3754                    assert!(
3755                        injected_headers.contains_key("traceparent"),
3756                        "Exchange should have traceparent after extraction from mixed-case header"
3757                    );
3758
3759                    let traceparent = injected_headers.get("traceparent").unwrap();
3760                    let parts: Vec<&str> = traceparent.split('-').collect();
3761                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3762                    assert_eq!(
3763                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3764                        "Trace ID should match the original mixed-case TraceParent header"
3765                    );
3766
3767                    if let Some(reply_tx) = envelope.reply_tx {
3768                        let _ = reply_tx.send(Ok(envelope.exchange));
3769                    }
3770                }
3771            });
3772
3773            let resp = http_result.unwrap();
3774            assert_eq!(resp.status().as_u16(), 200);
3775
3776            token.cancel();
3777        }
3778
3779        #[tokio::test]
3780        async fn test_producer_no_trace_context_no_crash() {
3781            let (url, _handle) = start_test_server().await;
3782            let ctx = test_producer_ctx();
3783
3784            let component = HttpComponent::new();
3785            let endpoint_ctx = NoOpComponentContext;
3786            let endpoint = component
3787                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3788                .unwrap();
3789            let producer = endpoint.create_producer(&ctx).unwrap();
3790
3791            // Create exchange with default (empty) otel_context - no trace context
3792            let exchange = Exchange::new(Message::default());
3793
3794            // Should succeed without panic
3795            let result = producer.oneshot(exchange).await.unwrap();
3796
3797            // Verify request succeeded
3798            let status = result
3799                .input
3800                .header("CamelHttpResponseCode")
3801                .and_then(|v| v.as_u64())
3802                .unwrap();
3803            assert_eq!(status, 200);
3804        }
3805
3806        /// Test server that captures and echoes back the traceparent header
3807        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3808            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3809            let addr = listener.local_addr().unwrap();
3810            let url = format!("http://127.0.0.1:{}", addr.port());
3811
3812            let handle = tokio::spawn(async move {
3813                loop {
3814                    if let Ok((mut stream, _)) = listener.accept().await {
3815                        tokio::spawn(async move {
3816                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3817                            let mut buf = vec![0u8; 8192];
3818                            let n = stream.read(&mut buf).await.unwrap_or(0);
3819                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3820
3821                            // Extract traceparent header from request
3822                            let traceparent = request
3823                                .lines()
3824                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3825                                .map(|line| {
3826                                    line.split(':')
3827                                        .nth(1)
3828                                        .map(|s| s.trim().to_string())
3829                                        .unwrap_or_default()
3830                                })
3831                                .unwrap_or_default();
3832
3833                            let body =
3834                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3835                            let response = format!(
3836                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3837                                body.len(),
3838                                traceparent,
3839                                body
3840                            );
3841                            let _ = stream.write_all(response.as_bytes()).await;
3842                        });
3843                    }
3844                }
3845            });
3846
3847            (url, handle)
3848        }
3849    }
3850
3851    // -----------------------------------------------------------------------
3852    // Response streaming tests (Eje A - Task 2)
3853    // -----------------------------------------------------------------------
3854
3855    // -----------------------------------------------------------------------
3856    // Request streaming tests (Eje B - Task 3)
3857    // -----------------------------------------------------------------------
3858
3859    #[tokio::test]
3860    async fn test_request_body_arrives_as_stream() {
3861        use camel_component_api::Body;
3862        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3863
3864        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3865        let port = listener.local_addr().unwrap().port();
3866        drop(listener);
3867
3868        let component = HttpComponent::new();
3869        let endpoint_ctx = NoOpComponentContext;
3870        let endpoint = component
3871            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3872            .unwrap();
3873        let mut consumer = endpoint.create_consumer().unwrap();
3874
3875        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3876        let token = tokio_util::sync::CancellationToken::new();
3877        let ctx = ConsumerContext::new(tx, token.clone());
3878
3879        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3880        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3881
3882        let client = reqwest::Client::new();
3883        let send_fut = client
3884            .post(format!("http://127.0.0.1:{port}/upload"))
3885            .body("hello streaming world")
3886            .send();
3887
3888        let (http_result, _) = tokio::join!(send_fut, async {
3889            if let Some(mut envelope) = rx.recv().await {
3890                // Body must be Body::Stream, not Body::Text or Body::Bytes
3891                assert!(
3892                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3893                    "expected Body::Stream, got discriminant {:?}",
3894                    std::mem::discriminant(&envelope.exchange.input.body)
3895                );
3896                // Materialize to verify content
3897                let bytes = envelope
3898                    .exchange
3899                    .input
3900                    .body
3901                    .into_bytes(1024 * 1024)
3902                    .await
3903                    .unwrap();
3904                assert_eq!(&bytes[..], b"hello streaming world");
3905
3906                envelope.exchange.input.body = camel_component_api::Body::Empty;
3907                if let Some(reply_tx) = envelope.reply_tx {
3908                    let _ = reply_tx.send(Ok(envelope.exchange));
3909                }
3910            }
3911        });
3912
3913        let resp = http_result.unwrap();
3914        assert_eq!(resp.status().as_u16(), 200);
3915
3916        token.cancel();
3917    }
3918
3919    // -----------------------------------------------------------------------
3920    // Response streaming tests (Eje A - Task 2)
3921    // -----------------------------------------------------------------------
3922
3923    #[tokio::test]
3924    async fn test_streaming_response_chunked() {
3925        use bytes::Bytes;
3926        use camel_component_api::Body;
3927        use camel_component_api::CamelError;
3928        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3929        use camel_component_api::{StreamBody, StreamMetadata};
3930        use futures::stream;
3931        use std::sync::Arc;
3932        use tokio::sync::Mutex;
3933
3934        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3935        let port = listener.local_addr().unwrap().port();
3936        drop(listener);
3937
3938        let component = HttpComponent::new();
3939        let endpoint_ctx = NoOpComponentContext;
3940        let endpoint = component
3941            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3942            .unwrap();
3943        let mut consumer = endpoint.create_consumer().unwrap();
3944
3945        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3946        let token = tokio_util::sync::CancellationToken::new();
3947        let ctx = ConsumerContext::new(tx, token.clone());
3948
3949        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3950        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3951
3952        let client = reqwest::Client::new();
3953        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3954
3955        let (http_result, _) = tokio::join!(send_fut, async {
3956            if let Some(mut envelope) = rx.recv().await {
3957                // Respond with Body::Stream
3958                let chunks: Vec<Result<Bytes, CamelError>> =
3959                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3960                let stream = Box::pin(stream::iter(chunks));
3961                envelope.exchange.input.body = Body::Stream(StreamBody {
3962                    stream: Arc::new(Mutex::new(Some(stream))),
3963                    metadata: StreamMetadata::default(),
3964                });
3965                if let Some(reply_tx) = envelope.reply_tx {
3966                    let _ = reply_tx.send(Ok(envelope.exchange));
3967                }
3968            }
3969        });
3970
3971        let resp = http_result.unwrap();
3972        assert_eq!(resp.status().as_u16(), 200);
3973        let body = resp.text().await.unwrap();
3974        assert_eq!(body, "chunk1chunk2");
3975
3976        token.cancel();
3977    }
3978
3979    // -----------------------------------------------------------------------
3980    // 413 Content-Length limit test (Task 4)
3981    // -----------------------------------------------------------------------
3982
3983    #[tokio::test]
3984    async fn test_413_when_content_length_exceeds_limit() {
3985        use camel_component_api::ConsumerContext;
3986
3987        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3988        let port = listener.local_addr().unwrap().port();
3989        drop(listener);
3990
3991        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
3992        let component = HttpComponent::new();
3993        let endpoint_ctx = NoOpComponentContext;
3994        let endpoint = component
3995            .create_endpoint(
3996                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3997                &endpoint_ctx,
3998            )
3999            .unwrap();
4000        let mut consumer = endpoint.create_consumer().unwrap();
4001
4002        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4003        let token = tokio_util::sync::CancellationToken::new();
4004        let ctx = ConsumerContext::new(tx, token.clone());
4005
4006        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4007        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4008
4009        let client = reqwest::Client::new();
4010        let resp = client
4011            .post(format!("http://127.0.0.1:{port}/upload"))
4012            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
4013            .body("x".repeat(1000))
4014            .send()
4015            .await
4016            .unwrap();
4017
4018        assert_eq!(resp.status().as_u16(), 413);
4019
4020        token.cancel();
4021    }
4022
4023    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
4024    /// The spec says: "If there is no Content-Length, the limit does not apply at the
4025    /// consumer level — the route is responsible."
4026    #[tokio::test]
4027    async fn test_chunked_upload_without_content_length_bypasses_limit() {
4028        use bytes::Bytes;
4029        use camel_component_api::Body;
4030        use camel_component_api::ConsumerContext;
4031        use futures::stream;
4032
4033        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4034        let port = listener.local_addr().unwrap().port();
4035        drop(listener);
4036
4037        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
4038        let component = HttpComponent::new();
4039        let endpoint_ctx = NoOpComponentContext;
4040        let endpoint = component
4041            .create_endpoint(
4042                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4043                &endpoint_ctx,
4044            )
4045            .unwrap();
4046        let mut consumer = endpoint.create_consumer().unwrap();
4047
4048        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4049        let token = tokio_util::sync::CancellationToken::new();
4050        let ctx = ConsumerContext::new(tx, token.clone());
4051
4052        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4053        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4054
4055        let client = reqwest::Client::new();
4056
4057        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
4058        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
4059        // but since there's no Content-Length the 413 check must NOT fire.
4060        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4061            Ok(Bytes::from("y".repeat(50))),
4062            Ok(Bytes::from("y".repeat(50))),
4063        ];
4064        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4065        let send_fut = client
4066            .post(format!("http://127.0.0.1:{port}/upload"))
4067            .body(stream_body)
4068            .send();
4069
4070        let consumer_fut = async {
4071            // Use timeout to avoid deadlock if the handler rejects before enqueueing
4072            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4073                Ok(Some(mut envelope)) => {
4074                    assert!(
4075                        matches!(envelope.exchange.input.body, Body::Stream(_)),
4076                        "expected Body::Stream"
4077                    );
4078                    envelope.exchange.input.body = camel_component_api::Body::Empty;
4079                    if let Some(reply_tx) = envelope.reply_tx {
4080                        let _ = reply_tx.send(Ok(envelope.exchange));
4081                    }
4082                }
4083                Ok(None) => panic!("consumer channel closed unexpectedly"),
4084                Err(_) => {
4085                    // Timeout: the request was rejected before reaching the consumer.
4086                    // The HTTP response will carry the real status code (we check below).
4087                }
4088            }
4089        };
4090
4091        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4092
4093        let resp = http_result.unwrap();
4094        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
4095        assert_ne!(
4096            resp.status().as_u16(),
4097            413,
4098            "chunked upload must not be rejected by maxRequestBody"
4099        );
4100        assert_eq!(resp.status().as_u16(), 200);
4101
4102        token.cancel();
4103    }
4104
4105    #[test]
4106    fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4107        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4108        cfg.blocked_hosts = vec!["blocked.local".to_string()];
4109        cfg.allow_private_ips = false;
4110
4111        let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4112        assert!(blocked.is_err());
4113
4114        let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4115        assert!(private_ip.is_err());
4116
4117        cfg.allow_private_ips = true;
4118        let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4119        assert!(allowed.is_ok());
4120    }
4121
4122    #[test]
4123    fn test_is_private_ip_ranges() {
4124        assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); // allow-unwrap
4125        assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); // allow-unwrap
4126        assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); // allow-unwrap
4127        assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); // allow-unwrap
4128        assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); // allow-unwrap
4129        assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); // allow-unwrap
4130
4131        assert!(is_private_ip(&"::1".parse().unwrap())); // allow-unwrap
4132        assert!(is_private_ip(&"fc00::1".parse().unwrap())); // allow-unwrap
4133        assert!(is_private_ip(&"fd12::1".parse().unwrap())); // allow-unwrap
4134        assert!(is_private_ip(&"fe80::1".parse().unwrap())); // allow-unwrap
4135        // ::ffff:0:0/96 (IPv4-mapped): only private if the mapped IPv4 is private
4136        assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); // allow-unwrap
4137        assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); // allow-unwrap
4138        assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); // allow-unwrap
4139
4140        assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); // allow-unwrap
4141        assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); // allow-unwrap — public IPv4-mapped
4142        assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); // allow-unwrap
4143    }
4144
4145    #[tokio::test]
4146    async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4147        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); // allow-unwrap
4148        cfg.allow_private_ips = false;
4149
4150        let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4151            .await
4152            .expect_err("localhost must resolve to loopback and be blocked");
4153
4154        let msg = err.to_string();
4155        assert!(msg.contains("Target resolved to private IP"));
4156    }
4157
4158    #[test]
4159    fn test_title_case_header() {
4160        assert_eq!(title_case_header("content-type"), "Content-Type");
4161        assert_eq!(title_case_header("authorization"), "Authorization");
4162        assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4163        assert_eq!(title_case_header("host"), "Host");
4164        assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4165        assert_eq!(title_case_header("single"), "Single");
4166        assert_eq!(title_case_header(""), "");
4167    }
4168
4169    #[test]
4170    fn test_resolve_url_combines_path_and_query_sources() {
4171        let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4172        let mut exchange = Exchange::new(Message::default());
4173        exchange.input.set_header(
4174            "CamelHttpPath",
4175            serde_json::Value::String("next".to_string()),
4176        );
4177        let url = HttpProducer::resolve_url(&exchange, &cfg);
4178        assert!(url.starts_with("http://example.com/base/next?"));
4179        assert!(url.contains("foo=bar"));
4180
4181        exchange.input.set_header(
4182            "CamelHttpUri",
4183            serde_json::Value::String("http://other.test/root".to_string()),
4184        );
4185        exchange.input.set_header(
4186            "CamelHttpQuery",
4187            serde_json::Value::String("a=1&b=2".to_string()),
4188        );
4189
4190        let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4191        assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4192    }
4193
4194    #[test]
4195    fn test_http_producer_helpers_status_and_size_boundaries() {
4196        assert!(HttpProducer::is_ok_status(200, (200, 299)));
4197        assert!(HttpProducer::is_ok_status(299, (200, 299)));
4198        assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4199        assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4200
4201        assert!(!exceeds_max_response_body(10, 10));
4202        assert!(exceeds_max_response_body(11, 10));
4203    }
4204
4205    // -----------------------------------------------------------------------
4206    // Content-Type inference tests
4207    // -----------------------------------------------------------------------
4208
4209    async fn setup_consumer_on_free_port(
4210        path: &str,
4211    ) -> (
4212        u16,
4213        tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4214        tokio_util::sync::CancellationToken,
4215    ) {
4216        use camel_component_api::ConsumerContext;
4217
4218        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4219        let port = listener.local_addr().unwrap().port();
4220        drop(listener);
4221
4222        let consumer_cfg = HttpServerConfig {
4223            host: "127.0.0.1".to_string(),
4224            port,
4225            path: path.to_string(),
4226            max_request_body: 2 * 1024 * 1024,
4227            max_response_body: 10 * 1024 * 1024,
4228            max_inflight_requests: 1024,
4229        };
4230        let mut consumer = HttpConsumer::new(consumer_cfg);
4231
4232        let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4233        let token = tokio_util::sync::CancellationToken::new();
4234        let ctx = ConsumerContext::new(tx, token.clone());
4235
4236        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4237        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4238
4239        (port, rx, token)
4240    }
4241
4242    #[tokio::test]
4243    async fn test_content_type_inferred_for_json_body() {
4244        let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4245
4246        let client = reqwest::Client::new();
4247        let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4248
4249        let (http_result, _) = tokio::join!(send_fut, async {
4250            if let Some(mut envelope) = rx.recv().await {
4251                envelope.exchange.input.body =
4252                    camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4253                if let Some(reply_tx) = envelope.reply_tx {
4254                    let _ = reply_tx.send(Ok(envelope.exchange));
4255                }
4256            }
4257        });
4258
4259        let resp = http_result.unwrap();
4260        assert_eq!(resp.status().as_u16(), 200);
4261        let ct = resp
4262            .headers()
4263            .get("content-type")
4264            .expect("Content-Type header should be present");
4265        assert_eq!(ct, "application/json");
4266        let body = resp.text().await.unwrap();
4267        assert_eq!(body, r#"{"message":"hello"}"#);
4268
4269        token.cancel();
4270    }
4271
4272    #[tokio::test]
4273    async fn test_content_type_inferred_for_text_body() {
4274        let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4275
4276        let client = reqwest::Client::new();
4277        let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4278
4279        let (http_result, _) = tokio::join!(send_fut, async {
4280            if let Some(mut envelope) = rx.recv().await {
4281                envelope.exchange.input.body =
4282                    camel_component_api::Body::Text("plain text response".to_string());
4283                if let Some(reply_tx) = envelope.reply_tx {
4284                    let _ = reply_tx.send(Ok(envelope.exchange));
4285                }
4286            }
4287        });
4288
4289        let resp = http_result.unwrap();
4290        assert_eq!(resp.status().as_u16(), 200);
4291        let ct = resp
4292            .headers()
4293            .get("content-type")
4294            .expect("Content-Type header should be present");
4295        assert_eq!(ct, "text/plain; charset=utf-8");
4296        let body = resp.text().await.unwrap();
4297        assert_eq!(body, "plain text response");
4298
4299        token.cancel();
4300    }
4301
4302    #[tokio::test]
4303    async fn test_content_type_inferred_for_xml_body() {
4304        let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4305
4306        let client = reqwest::Client::new();
4307        let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4308
4309        let (http_result, _) = tokio::join!(send_fut, async {
4310            if let Some(mut envelope) = rx.recv().await {
4311                envelope.exchange.input.body =
4312                    camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4313                if let Some(reply_tx) = envelope.reply_tx {
4314                    let _ = reply_tx.send(Ok(envelope.exchange));
4315                }
4316            }
4317        });
4318
4319        let resp = http_result.unwrap();
4320        assert_eq!(resp.status().as_u16(), 200);
4321        let ct = resp
4322            .headers()
4323            .get("content-type")
4324            .expect("Content-Type header should be present");
4325        assert_eq!(ct, "application/xml");
4326        let body = resp.text().await.unwrap();
4327        assert_eq!(body, "<root><item>value</item></root>");
4328
4329        token.cancel();
4330    }
4331
4332    #[tokio::test]
4333    async fn test_no_content_type_for_empty_body() {
4334        let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4335
4336        let client = reqwest::Client::new();
4337        let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4338
4339        let (http_result, _) = tokio::join!(send_fut, async {
4340            if let Some(mut envelope) = rx.recv().await {
4341                envelope.exchange.input.body = camel_component_api::Body::Empty;
4342                if let Some(reply_tx) = envelope.reply_tx {
4343                    let _ = reply_tx.send(Ok(envelope.exchange));
4344                }
4345            }
4346        });
4347
4348        let resp = http_result.unwrap();
4349        assert_eq!(resp.status().as_u16(), 200);
4350        assert!(
4351            resp.headers().get("content-type").is_none(),
4352            "Empty body should not set Content-Type"
4353        );
4354
4355        token.cancel();
4356    }
4357
4358    #[tokio::test]
4359    async fn test_no_content_type_for_raw_bytes_body() {
4360        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4361
4362        let client = reqwest::Client::new();
4363        let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4364
4365        let (http_result, _) = tokio::join!(send_fut, async {
4366            if let Some(mut envelope) = rx.recv().await {
4367                envelope.exchange.input.body =
4368                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4369                if let Some(reply_tx) = envelope.reply_tx {
4370                    let _ = reply_tx.send(Ok(envelope.exchange));
4371                }
4372            }
4373        });
4374
4375        let resp = http_result.unwrap();
4376        assert_eq!(resp.status().as_u16(), 200);
4377        assert!(
4378            resp.headers().get("content-type").is_none(),
4379            "Raw Bytes body should not set Content-Type"
4380        );
4381
4382        token.cancel();
4383    }
4384
4385    #[tokio::test]
4386    async fn test_content_type_from_stream_metadata() {
4387        use camel_component_api::{StreamBody, StreamMetadata};
4388        use futures::stream;
4389
4390        let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4391
4392        let client = reqwest::Client::new();
4393        let send_fut = client
4394            .get(format!("http://127.0.0.1:{port}/stream-ct"))
4395            .send();
4396
4397        let (http_result, _) = tokio::join!(send_fut, async {
4398            if let Some(mut envelope) = rx.recv().await {
4399                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4400                    vec![Ok(bytes::Bytes::from("audio data"))];
4401                let stream = Box::pin(stream::iter(chunks));
4402                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4403                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4404                    metadata: StreamMetadata {
4405                        size_hint: None,
4406                        content_type: Some("audio/mpeg".to_string()),
4407                        origin: None,
4408                    },
4409                });
4410                if let Some(reply_tx) = envelope.reply_tx {
4411                    let _ = reply_tx.send(Ok(envelope.exchange));
4412                }
4413            }
4414        });
4415
4416        let resp = http_result.unwrap();
4417        assert_eq!(resp.status().as_u16(), 200);
4418        let ct = resp
4419            .headers()
4420            .get("content-type")
4421            .expect("Content-Type header should be present");
4422        assert_eq!(ct, "audio/mpeg");
4423        let body = resp.text().await.unwrap();
4424        assert_eq!(body, "audio data");
4425
4426        token.cancel();
4427    }
4428
4429    #[tokio::test]
4430    async fn test_user_content_type_overrides_inferred() {
4431        let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4432
4433        let client = reqwest::Client::new();
4434        let send_fut = client
4435            .get(format!("http://127.0.0.1:{port}/override-ct"))
4436            .send();
4437
4438        let (http_result, _) = tokio::join!(send_fut, async {
4439            if let Some(mut envelope) = rx.recv().await {
4440                envelope.exchange.input.body =
4441                    camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4442                envelope.exchange.input.set_header(
4443                    "Content-Type",
4444                    serde_json::Value::String("text/html".to_string()),
4445                );
4446                if let Some(reply_tx) = envelope.reply_tx {
4447                    let _ = reply_tx.send(Ok(envelope.exchange));
4448                }
4449            }
4450        });
4451
4452        let resp = http_result.unwrap();
4453        assert_eq!(resp.status().as_u16(), 200);
4454        let ct = resp
4455            .headers()
4456            .get("content-type")
4457            .expect("Content-Type header should be present");
4458        assert_eq!(
4459            ct, "text/html",
4460            "User-set Content-Type should take precedence over inferred type"
4461        );
4462
4463        token.cancel();
4464    }
4465
4466    #[tokio::test]
4467    async fn test_user_content_type_with_bytes_body() {
4468        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4469
4470        let client = reqwest::Client::new();
4471        let send_fut = client
4472            .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4473            .send();
4474
4475        let (http_result, _) = tokio::join!(send_fut, async {
4476            if let Some(mut envelope) = rx.recv().await {
4477                envelope.exchange.input.body =
4478                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4479                envelope.exchange.input.set_header(
4480                    "Content-Type",
4481                    serde_json::Value::String("application/json".to_string()),
4482                );
4483                if let Some(reply_tx) = envelope.reply_tx {
4484                    let _ = reply_tx.send(Ok(envelope.exchange));
4485                }
4486            }
4487        });
4488
4489        let resp = http_result.unwrap();
4490        assert_eq!(resp.status().as_u16(), 200);
4491        let ct = resp
4492            .headers()
4493            .get("content-type")
4494            .expect("Content-Type header should be present for Bytes body with user header");
4495        assert_eq!(
4496            ct, "application/json",
4497            "User Content-Type should be sent for Bytes body"
4498        );
4499
4500        token.cancel();
4501    }
4502
4503    // -----------------------------------------------------------------------
4504    // Server monitor tests (GRL-005)
4505    // -----------------------------------------------------------------------
4506
4507    #[tokio::test]
4508    async fn monitor_task_silent_on_clean_exit() {
4509        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4510        // Clean exit should complete without panicking or logging errors
4511        monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4512    }
4513
4514    #[tokio::test]
4515    async fn monitor_task_handles_panicked_task() {
4516        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4517            panic!("simulated server crash");
4518        });
4519        // Should complete without panicking even though the inner task panicked
4520        monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4521    }
4522
4523    // -----------------------------------------------------------------------
4524    // Credential redaction tests
4525    // -----------------------------------------------------------------------
4526
4527    #[test]
4528    fn http_auth_basic_debug_redacts_password() {
4529        let auth = HttpAuth::Basic {
4530            username: "admin".to_string(),
4531            password: "hunter2".to_string(),
4532        };
4533        let debug = format!("{:?}", auth);
4534        assert!(
4535            !debug.contains("hunter2"),
4536            "password must be redacted: {debug}"
4537        );
4538        assert!(debug.contains("admin"), "username should appear: {debug}");
4539    }
4540
4541    #[test]
4542    fn http_auth_bearer_debug_redacts_token() {
4543        let auth = HttpAuth::Bearer {
4544            token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4545        };
4546        let debug = format!("{:?}", auth);
4547        assert!(
4548            !debug.contains("eyJhbGci"),
4549            "token must be redacted: {debug}"
4550        );
4551    }
4552
4553    #[test]
4554    fn http_auth_none_debug_shows_variant() {
4555        let debug = format!("{:?}", HttpAuth::None);
4556        assert!(
4557            debug.contains("None"),
4558            "None variant should appear: {debug}"
4559        );
4560    }
4561
4562    #[test]
4563    fn http_endpoint_config_debug_redacts_auth_credentials() {
4564        let config = HttpEndpointConfig::from_uri(
4565            "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4566        )
4567        .unwrap();
4568        let debug = format!("{:?}", config);
4569        assert!(
4570            !debug.contains("secret123"),
4571            "password must be redacted in HttpEndpointConfig debug: {debug}"
4572        );
4573    }
4574
4575    // -----------------------------------------------------------------------
4576    // Static file serving tests (Task 5)
4577    // -----------------------------------------------------------------------
4578
4579    use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4580    use tower_http::services::ServeDir;
4581
4582    fn make_test_registry() -> HttpRouteRegistry {
4583        HttpRouteRegistry::new()
4584    }
4585
4586    fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4587        AppState {
4588            registry,
4589            max_request_body: 2 * 1024 * 1024,
4590            max_response_body: 10 * 1024 * 1024,
4591            inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4592        }
4593    }
4594
4595    #[allow(clippy::await_holding_lock)]
4596    #[tokio::test]
4597    async fn test_static_file_serving_serves_file_contents() {
4598        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4599        ServerRegistry::reset();
4600
4601        // Create temp dir with test files
4602        let temp_dir =
4603            std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4604        std::fs::create_dir_all(&temp_dir).unwrap();
4605        std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4606        std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4607
4608        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4609
4610        let registry = make_test_registry();
4611        let serve_dir = ServeDir::new(&canonical_dir)
4612            .precompressed_gzip()
4613            .precompressed_br()
4614            .append_index_html_on_directories(true);
4615
4616        let mount = StaticMount {
4617            mount_path: "/".to_string(),
4618            mode: MountMode::Static,
4619            dir: canonical_dir.clone(),
4620            cache_control: "public, max-age=3600".to_string(),
4621            error_pages: std::collections::HashMap::new(),
4622            serve_dir,
4623        };
4624        registry.register_static_mount(mount).await.unwrap();
4625
4626        let state = make_test_state(registry);
4627
4628        // Test serving hello.txt
4629        let req = Request::builder()
4630            .uri("/hello.txt")
4631            .body(AxumBody::empty())
4632            .unwrap();
4633        let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4634        assert_eq!(resp.status(), StatusCode::OK);
4635        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4636            .await
4637            .unwrap();
4638        assert_eq!(&body[..], b"Hello, static world!");
4639
4640        // Test serving style.css
4641        let req = Request::builder()
4642            .uri("/style.css")
4643            .body(AxumBody::empty())
4644            .unwrap();
4645        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4646        assert_eq!(resp.status(), StatusCode::OK);
4647        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4648            .await
4649            .unwrap();
4650        assert_eq!(&body[..], b"body { color: red; }");
4651
4652        // Test 404 for non-existent file
4653        let req = Request::builder()
4654            .uri("/missing.txt")
4655            .body(AxumBody::empty())
4656            .unwrap();
4657        let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4658        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4659
4660        // Cleanup
4661        std::fs::remove_dir_all(&temp_dir).ok();
4662    }
4663
4664    #[allow(clippy::await_holding_lock)]
4665    #[tokio::test]
4666    async fn test_spa_fallback_serves_index_for_unknown_paths() {
4667        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4668        ServerRegistry::reset();
4669
4670        let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4671        std::fs::create_dir_all(&temp_dir).unwrap();
4672        std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4673        std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4674
4675        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4676
4677        let registry = make_test_registry();
4678        let serve_dir = ServeDir::new(&canonical_dir)
4679            .precompressed_gzip()
4680            .precompressed_br()
4681            .append_index_html_on_directories(true);
4682
4683        let mount = StaticMount {
4684            mount_path: "/".to_string(),
4685            mode: MountMode::Spa,
4686            dir: canonical_dir.clone(),
4687            cache_control: "public, max-age=0".to_string(),
4688            error_pages: std::collections::HashMap::new(),
4689            serve_dir,
4690        };
4691        // Register as SPA mount
4692        registry.register_static_mount(mount).await.unwrap();
4693
4694        let state = make_test_state(registry);
4695
4696        // SPA fallback: GET /dashboard with Accept: text/html → index.html
4697        let req = Request::builder()
4698            .method("GET")
4699            .uri("/dashboard")
4700            .header("Accept", "text/html")
4701            .body(AxumBody::empty())
4702            .unwrap();
4703        let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4704        assert_eq!(resp.status(), StatusCode::OK);
4705        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4706            .await
4707            .unwrap();
4708        assert_eq!(&body[..], b"<h1>SPA App</h1>");
4709
4710        // Static file still works: GET /app.js
4711        let req = Request::builder()
4712            .method("GET")
4713            .uri("/app.js")
4714            .body(AxumBody::empty())
4715            .unwrap();
4716        let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4717        assert_eq!(resp.status(), StatusCode::OK);
4718        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4719            .await
4720            .unwrap();
4721        assert_eq!(&body[..], b"console.log('app')");
4722
4723        // No SPA fallback for JSON accept → 404
4724        let req = Request::builder()
4725            .method("GET")
4726            .uri("/api/data")
4727            .header("Accept", "application/json")
4728            .body(AxumBody::empty())
4729            .unwrap();
4730        let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4731        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4732
4733        // No SPA fallback for file extensions → 404
4734        let req = Request::builder()
4735            .method("GET")
4736            .uri("/style.css")
4737            .header("Accept", "text/html")
4738            .body(AxumBody::empty())
4739            .unwrap();
4740        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4741        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4742
4743        // Cleanup
4744        std::fs::remove_dir_all(&temp_dir).ok();
4745    }
4746
4747    #[allow(clippy::await_holding_lock)]
4748    #[tokio::test]
4749    async fn test_error_page_mapping_serves_custom_404() {
4750        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4751        ServerRegistry::reset();
4752
4753        let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4754        let errors_dir = temp_dir.join("errors");
4755        std::fs::create_dir_all(&errors_dir).unwrap();
4756        std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4757        std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4758
4759        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4760        let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4761
4762        let registry = make_test_registry();
4763        let serve_dir = ServeDir::new(&canonical_dir)
4764            .precompressed_gzip()
4765            .precompressed_br()
4766            .append_index_html_on_directories(true);
4767
4768        let mut error_pages = std::collections::HashMap::new();
4769        error_pages.insert(404, canonical_404);
4770
4771        let mount = StaticMount {
4772            mount_path: "/".to_string(),
4773            mode: MountMode::Static,
4774            dir: canonical_dir.clone(),
4775            cache_control: "public, max-age=0".to_string(),
4776            error_pages,
4777            serve_dir,
4778        };
4779        registry.register_static_mount(mount).await.unwrap();
4780
4781        let state = make_test_state(registry);
4782
4783        // Request non-existent file → custom 404 page
4784        let req = Request::builder()
4785            .method("GET")
4786            .uri("/missing.html")
4787            .body(AxumBody::empty())
4788            .unwrap();
4789        let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4790        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4791        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4792            .await
4793            .unwrap();
4794        assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4795
4796        // Existing file still works
4797        let req = Request::builder()
4798            .method("GET")
4799            .uri("/index.html")
4800            .body(AxumBody::empty())
4801            .unwrap();
4802        let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4803        assert_eq!(resp.status(), StatusCode::OK);
4804        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4805            .await
4806            .unwrap();
4807        assert_eq!(&body[..], b"<h1>Home</h1>");
4808
4809        // Cleanup
4810        std::fs::remove_dir_all(&temp_dir).ok();
4811    }
4812}