Skip to main content

camel_component_http/
lib.rs

1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40// ---------------------------------------------------------------------------
41// HttpEndpointConfig
42// ---------------------------------------------------------------------------
43
44/// Configuration for an HTTP client (producer) endpoint.
45///
46/// # Memory Limits
47///
48/// HTTP operations enforce conservative memory limits to prevent denial-of-service
49/// attacks from untrusted network sources. These limits are significantly lower than
50/// file component limits (100MB) because HTTP typically handles API responses rather
51/// than large file transfers, and clients may be untrusted.
52///
53/// ## Default Limits
54///
55/// - **HTTP client body**: 10MB (typical API responses)
56/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
57/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
58///
59/// ## Rationale
60///
61/// The 10MB limit for HTTP client responses is appropriate for most API interactions
62/// while providing protection against:
63/// - Malicious servers sending oversized responses
64/// - Runaway processes generating unexpectedly large payloads
65/// - Memory exhaustion attacks
66///
67/// The 2MB server request limit is even more conservative because it handles input
68/// from potentially untrusted clients on the public internet.
69///
70/// ## Overriding Limits
71///
72/// Override the default client body limit using the `maxBodySize` URI parameter:
73///
74/// ```text
75/// http://api.example.com/large-data?maxBodySize=52428800
76/// ```
77///
78/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
79///
80/// ```text
81/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
82/// ```
83///
84/// ## Behavior When Exceeded
85///
86/// When a body exceeds the configured limit:
87/// - An error is returned immediately
88/// - No memory is exhausted - the limit is checked before allocation
89/// - The HTTP connection is terminated cleanly
90///
91/// ## Security Considerations
92///
93/// HTTP endpoints should be treated with more caution than file endpoints because:
94/// - Clients may be unknown and untrusted
95/// - Network traffic can be spoofed or malicious
96/// - DoS attacks often exploit unbounded resource consumption
97///
98/// Only increase limits when you control both ends of the connection or when
99/// business requirements demand larger payloads.
100#[derive(Debug, Clone)]
101pub struct HttpEndpointConfig {
102    pub base_url: String,
103    pub http_method: Option<String>,
104    pub throw_exception_on_failure: bool,
105    pub ok_status_code_range: (u16, u16),
106    pub response_timeout: Option<Duration>,
107    pub query_params: HashMap<String, String>,
108    pub allow_private_ips: bool,
109    pub blocked_hosts: Vec<String>,
110    pub max_body_size: usize,
111    pub read_timeout_ms: u64,
112    pub max_response_bytes: usize,
113    pub auth: HttpAuth,
114    pub token_provider: Option<Arc<dyn TokenProvider>>,
115    pub user_agent: Option<String>,
116    pub cookie_handling: CookieHandling,
117    pub bridge_endpoint: bool,
118    pub connection_close: bool,
119    pub skip_request_headers: Vec<String>,
120    pub skip_response_headers: Vec<String>,
121}
122
123#[derive(Clone, PartialEq)]
124pub enum HttpAuth {
125    None,
126    Basic { username: String, password: String },
127    Bearer { token: String },
128}
129
130impl std::fmt::Debug for HttpAuth {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            HttpAuth::None => f.write_str("None"),
134            HttpAuth::Basic { username, .. } => f
135                .debug_struct("Basic")
136                .field("username", username)
137                .field("password", &"***")
138                .finish(),
139            HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
140        }
141    }
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum CookieHandling {
146    Disabled,
147    InMemory,
148}
149
150/// Camel options that should NOT be forwarded as HTTP query params
151const HTTP_CAMEL_OPTIONS: &[&str] = &[
152    "httpMethod",
153    "throwExceptionOnFailure",
154    "okStatusCodeRange",
155    "followRedirects",
156    "connectTimeout",
157    "responseTimeout",
158    "allowPrivateIps",
159    "blockedHosts",
160    "maxBodySize",
161    "readTimeout",
162    "maxResponseBytes",
163    "authMethod",
164    "authUsername",
165    "authPassword",
166    "authBearerToken",
167    "userAgent",
168    "cookieHandling",
169    "bridgeEndpoint",
170    "connectionClose",
171    "skipRequestHeaders",
172    "skipResponseHeaders",
173];
174
175impl UriConfig for HttpEndpointConfig {
176    /// Returns "http" as the primary scheme (also accepts "https")
177    fn scheme() -> &'static str {
178        "http"
179    }
180
181    fn from_uri(uri: &str) -> Result<Self, CamelError> {
182        let parts = parse_uri(uri)?;
183        Self::from_components(parts)
184    }
185
186    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187        // Validate scheme - accept both http and https
188        if parts.scheme != "http" && parts.scheme != "https" {
189            return Err(CamelError::InvalidUri(format!(
190                "expected scheme 'http' or 'https', got '{}'",
191                parts.scheme
192            )));
193        }
194
195        // Construct base_url from scheme + path
196        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
197        let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199        let http_method = parts.params.get("httpMethod").cloned();
200
201        let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202            Some(v) => parse_bool_param_http(v).map_err(|e| {
203                CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204            })?,
205            None => true,
206        };
207
208        // Parse status code range from "start-end" format (e.g., "200-299")
209        let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210            Some(v) => parse_ok_status_code_range(v)?,
211            None => (200, 299),
212        };
213
214        let response_timeout = match parts.params.get("responseTimeout") {
215            Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216                CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217            })?),
218            None => None,
219        };
220
221        // SSRF protection settings
222        let allow_private_ips = match parts.params.get("allowPrivateIps") {
223            Some(v) => parse_bool_param_http(v).map_err(|e| {
224                CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225            })?,
226            None => false, // Default: block private IPs
227        };
228
229        // Parse comma-separated blocked hosts
230        let blocked_hosts = parts
231            .params
232            .get("blockedHosts")
233            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234            .unwrap_or_default();
235
236        let max_body_size = match parts.params.get("maxBodySize") {
237            Some(v) => v.parse::<usize>().map_err(|e| {
238                CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239            })?,
240            None => 10 * 1024 * 1024, // Default: 10MB
241        };
242
243        let read_timeout_ms = match parts.params.get("readTimeout") {
244            Some(v) => v.parse::<u64>().map_err(|e| {
245                CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246            })?,
247            None => 30_000, // Default: 30s
248        };
249
250        let max_response_bytes = match parts.params.get("maxResponseBytes") {
251            Some(v) => v.parse::<usize>().map_err(|e| {
252                CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253            })?,
254            None => 10 * 1024 * 1024, // Default: 10MB
255        };
256
257        let auth = parse_auth_from_params(&parts.params)?;
258
259        let user_agent = parts.params.get("userAgent").cloned();
260
261        let cookie_handling = match parts.params.get("cookieHandling") {
262            Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263            Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264            Some(v) => {
265                return Err(CamelError::InvalidUri(format!(
266                    "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267                )));
268            }
269            None => CookieHandling::Disabled,
270        };
271
272        let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273            Some(v) => parse_bool_param_http(v).map_err(|e| {
274                CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275            })?,
276            None => false,
277        };
278
279        let connection_close = match parts.params.get("connectionClose") {
280            Some(v) => parse_bool_param_http(v).map_err(|e| {
281                CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282            })?,
283            None => false,
284        };
285
286        let skip_request_headers = parts
287            .params
288            .get("skipRequestHeaders")
289            .map(|v| {
290                v.split(',')
291                    .map(str::trim)
292                    .filter(|s| !s.is_empty())
293                    .map(|s| s.to_ascii_lowercase())
294                    .collect::<Vec<_>>()
295            })
296            .unwrap_or_default();
297
298        let skip_response_headers = parts
299            .params
300            .get("skipResponseHeaders")
301            .map(|v| {
302                v.split(',')
303                    .map(str::trim)
304                    .filter(|s| !s.is_empty())
305                    .map(|s| s.to_ascii_lowercase())
306                    .collect::<Vec<_>>()
307            })
308            .unwrap_or_default();
309
310        // Collect remaining params (not Camel options) as query params
311        let query_params: HashMap<String, String> = parts
312            .params
313            .into_iter()
314            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315            .collect();
316
317        Ok(Self {
318            base_url,
319            http_method,
320            throw_exception_on_failure,
321            ok_status_code_range,
322            response_timeout,
323            query_params,
324            allow_private_ips,
325            blocked_hosts,
326            max_body_size,
327            read_timeout_ms,
328            max_response_bytes,
329            auth,
330            token_provider: None,
331            user_agent,
332            cookie_handling,
333            bridge_endpoint,
334            connection_close,
335            skip_request_headers,
336            skip_response_headers,
337        })
338    }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342    let Some(method) = params.get("authMethod") else {
343        return Ok(HttpAuth::None);
344    };
345
346    if method.eq_ignore_ascii_case("none") {
347        return Ok(HttpAuth::None);
348    }
349
350    if method.eq_ignore_ascii_case("basic") {
351        let username = params.get("authUsername").cloned().ok_or_else(|| {
352            CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353        })?;
354        let password = params.get("authPassword").cloned().ok_or_else(|| {
355            CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356        })?;
357        return Ok(HttpAuth::Basic { username, password });
358    }
359
360    if method.eq_ignore_ascii_case("bearer") {
361        let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362            CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363        })?;
364        return Ok(HttpAuth::Bearer { token });
365    }
366
367    Err(CamelError::InvalidUri(format!(
368        "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369    )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373    match value.to_ascii_lowercase().as_str() {
374        "true" | "1" | "yes" => Ok(true),
375        "false" | "0" | "no" => Ok(false),
376        _ => Err(CamelError::InvalidUri(format!(
377            "invalid boolean value: '{value}'"
378        ))),
379    }
380}
381
382impl HttpEndpointConfig {
383    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384        let parts = parse_uri(uri)?;
385        let mut endpoint = Self::from_components(parts.clone())?;
386        if endpoint.response_timeout.is_none() {
387            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388        }
389        if !parts.params.contains_key("allowPrivateIps") {
390            endpoint.allow_private_ips = config.allow_private_ips;
391        }
392        if !parts.params.contains_key("blockedHosts") {
393            endpoint.blocked_hosts = config.blocked_hosts.clone();
394        }
395        if !parts.params.contains_key("maxBodySize") {
396            endpoint.max_body_size = config.max_body_size;
397        }
398        if !parts.params.contains_key("readTimeout") {
399            endpoint.read_timeout_ms = config.read_timeout_ms;
400        }
401        if !parts.params.contains_key("maxResponseBytes") {
402            endpoint.max_response_bytes = config.max_response_bytes;
403        }
404        if !parts.params.contains_key("okStatusCodeRange")
405            && let Some(range) = &config.ok_status_code_range
406        {
407            endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408        }
409
410        Ok(endpoint)
411    }
412}
413
414// ---------------------------------------------------------------------------
415// HttpServerConfig
416// ---------------------------------------------------------------------------
417
418/// Configuration for an HTTP server (consumer) endpoint.
419#[derive(Debug, Clone)]
420pub struct HttpServerConfig {
421    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
422    pub host: String,
423    /// TCP port to listen on.
424    pub port: u16,
425    /// URL path this consumer handles, e.g. "/orders".
426    pub path: String,
427    /// Maximum request body size in bytes.
428    pub max_request_body: usize,
429    /// Maximum response body size for materializing streams in bytes.
430    pub max_response_body: usize,
431    /// Maximum number of in-flight requests handled concurrently by this server.
432    pub max_inflight_requests: usize,
433}
434
435impl UriConfig for HttpServerConfig {
436    /// Returns "http" as the primary scheme (also accepts "https")
437    fn scheme() -> &'static str {
438        "http"
439    }
440
441    fn from_uri(uri: &str) -> Result<Self, CamelError> {
442        let parts = parse_uri(uri)?;
443        Self::from_components(parts)
444    }
445
446    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447        // Validate scheme - accept both http and https
448        if parts.scheme != "http" && parts.scheme != "https" {
449            return Err(CamelError::InvalidUri(format!(
450                "expected scheme 'http' or 'https', got '{}'",
451                parts.scheme
452            )));
453        }
454
455        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
456        // Strip leading "//"
457        let authority_and_path = parts.path.trim_start_matches('/');
458
459        // Split on the first "/" to separate "host:port" from "/path"
460        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461            (&authority_and_path[..idx], &authority_and_path[idx..])
462        } else {
463            (authority_and_path, "/")
464        };
465
466        let path = if path_suffix.is_empty() {
467            "/"
468        } else {
469            path_suffix
470        }
471        .to_string();
472
473        // Parse host:port from authority
474        let (host, port) = if let Some(colon) = authority.rfind(':') {
475            let port_str = &authority[colon + 1..];
476            match port_str.parse::<u16>() {
477                Ok(p) => (authority[..colon].to_string(), p),
478                Err(_) => {
479                    return Err(CamelError::InvalidUri(format!(
480                        "invalid port '{}' in authority",
481                        port_str
482                    )));
483                }
484            }
485        } else {
486            // Default port based on scheme: 443 for https, 80 for http
487            let default_port = if parts.scheme == "https" { 443 } else { 80 };
488            (authority.to_string(), default_port)
489        };
490
491        let max_request_body = parts
492            .params
493            .get("maxRequestBody")
494            .and_then(|v| v.parse::<usize>().ok())
495            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
496
497        let max_response_body = parts
498            .params
499            .get("maxResponseBody")
500            .and_then(|v| v.parse::<usize>().ok())
501            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
502
503        let max_inflight_requests = parts
504            .params
505            .get("maxInflightRequests")
506            .and_then(|v| v.parse::<usize>().ok())
507            .unwrap_or(1024);
508
509        Ok(Self {
510            host,
511            port,
512            path,
513            max_request_body,
514            max_response_body,
515            max_inflight_requests,
516        })
517    }
518}
519
520impl HttpServerConfig {
521    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522        let parts = parse_uri(uri)?;
523        let mut server = Self::from_components(parts.clone())?;
524        if !parts.params.contains_key("maxRequestBody") {
525            server.max_request_body = config.max_request_body;
526        }
527        if !parts.params.contains_key("maxResponseBody") {
528            // Default max_response_body is 10MB via HttpConfig::default().max_body_size.
529            server.max_response_body = config.max_body_size;
530        }
531        Ok(server)
532    }
533}
534
535// ---------------------------------------------------------------------------
536// RequestEnvelope / HttpReply
537// ---------------------------------------------------------------------------
538
539/// Body of the HTTP response: already-materialized bytes or a lazy stream.
540///
541/// **Internal plumbing** — subject to change without notice.
542pub enum HttpReplyBody {
543    Bytes(bytes::Bytes),
544    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
545}
546
547/// An inbound HTTP request sent from the Axum dispatch handler to an
548/// `HttpConsumer` receive loop.
549///
550/// **Internal plumbing** — subject to change without notice.
551pub struct RequestEnvelope {
552    pub method: String,
553    pub path: String,
554    pub query: String,
555    pub headers: http::HeaderMap,
556    pub body: StreamBody,
557    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
558}
559
560/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
561///
562/// **Internal plumbing** — subject to change without notice.
563pub struct HttpReply {
564    pub status: u16,
565    pub headers: Vec<(String, String)>,
566    pub body: HttpReplyBody,
567}
568
569// ---------------------------------------------------------------------------
570// HttpRouteRegistry / ServerRegistry
571// ---------------------------------------------------------------------------
572
573type ServerKey = (String, u16);
574
575/// Handle to a running Axum server on one interface/port.
576struct ServerHandle {
577    registry: HttpRouteRegistry,
578    max_request_body: usize,
579    max_response_body: usize,
580    max_inflight_requests: usize,
581}
582
583/// Process-global registry mapping (host, port) → running Axum server handle.
584pub struct ServerRegistry {
585    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589    /// Returns the global singleton.
590    pub fn global() -> &'static Self {
591        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592        INSTANCE.get_or_init(|| ServerRegistry {
593            inner: Mutex::new(HashMap::new()),
594        })
595    }
596
597    /// Returns route registry for `port`, spawning new Axum server if
598    /// none is running on that port yet.
599    pub async fn get_or_spawn(
600        &'static self,
601        host: &str,
602        port: u16,
603        max_request_body: usize,
604        max_response_body: usize,
605        max_inflight_requests: usize,
606    ) -> Result<HttpRouteRegistry, CamelError> {
607        let host_owned = host.to_string();
608
609        let cell = {
610            let mut guard = self.inner.lock().map_err(|_| {
611                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
612            })?;
613            let key = (host.to_string(), port);
614            guard
615                .entry(key)
616                .or_insert_with(|| Arc::new(OnceCell::new()))
617                .clone()
618        };
619
620        if let Some(existing) = cell.get()
621            && existing.max_request_body != max_request_body
622        {
623            return Err(CamelError::EndpointCreationFailed(format!(
624                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
625                existing.max_request_body, max_request_body
626            )));
627        }
628
629        if let Some(existing) = cell.get()
630            && existing.max_response_body != max_response_body
631        {
632            return Err(CamelError::EndpointCreationFailed(format!(
633                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
634                existing.max_response_body, max_response_body
635            )));
636        }
637
638        if let Some(existing) = cell.get()
639            && existing.max_inflight_requests != max_inflight_requests
640        {
641            return Err(CamelError::EndpointCreationFailed(format!(
642                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
643                existing.max_inflight_requests, max_inflight_requests
644            )));
645        }
646
647        let handle = cell
648            .get_or_try_init(|| async {
649                let addr = format!("{host_owned}:{port}");
650                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
651                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
652                })?;
653                let registry = HttpRouteRegistry::new();
654                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
655                let task = tokio::spawn(run_axum_server(
656                    listener,
657                    registry.clone(),
658                    max_request_body,
659                    max_response_body,
660                    Arc::clone(&inflight),
661                ));
662                let addr_for_monitor = format!("{host_owned}:{port}");
663                tokio::spawn(monitor_axum_task(task, addr_for_monitor));
664                Ok::<ServerHandle, CamelError>(ServerHandle {
665                    registry,
666                    max_request_body,
667                    max_response_body,
668                    max_inflight_requests,
669                })
670            })
671            .await?;
672
673        Ok(handle.registry.clone())
674    }
675
676    /// Reset the global registry — **test-only**.
677    ///
678    /// Clears all registered server handles so that tests can start from a clean
679    /// state. This is intentionally `#[cfg(test)]` because the registry is a
680    /// process-global singleton in production and resetting it would break
681    /// running servers.
682    #[cfg(test)]
683    pub fn reset() {
684        let instance = Self::global();
685        let mut guard = instance
686            .inner
687            .lock()
688            .expect("ServerRegistry lock poisoned during test reset");
689        guard.clear();
690    }
691}
692
693// ---------------------------------------------------------------------------
694// Axum server
695// ---------------------------------------------------------------------------
696
697use axum::{
698    Router,
699    body::Body as AxumBody,
700    extract::{Request, State},
701    http::{Response, StatusCode},
702    response::IntoResponse,
703};
704
705#[derive(Clone)]
706pub(crate) struct AppState {
707    registry: HttpRouteRegistry,
708    max_request_body: usize,
709    max_response_body: usize,
710    inflight: Arc<tokio::sync::Semaphore>,
711}
712
713async fn run_axum_server(
714    listener: tokio::net::TcpListener,
715    registry: HttpRouteRegistry,
716    max_request_body: usize,
717    max_response_body: usize,
718    inflight: Arc<tokio::sync::Semaphore>,
719) {
720    let state = AppState {
721        registry,
722        max_request_body,
723        max_response_body,
724        inflight,
725    };
726    let app = Router::new().fallback(dispatch_handler).with_state(state);
727
728    axum::serve(listener, app).await.unwrap_or_else(|e| {
729        tracing::error!(error = %e, "Axum server error");
730    });
731}
732
733/// Monitors an Axum server task and emits a structured error event if it
734/// exits unexpectedly.
735///
736/// # Limitations
737/// The HTTP server is shared across all routes on a port. Full per-route
738/// CrashNotification propagation is deferred — this provides observable
739/// structured logging as a first guard.
740async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
741    match handle.await {
742        Ok(()) => {
743            // Clean exit (process shutdown or normal stop)
744        }
745        Err(join_err) => {
746            tracing::error!(
747                addr = %addr,
748                error = %join_err,
749                "Axum server task exited unexpectedly — all routes on this port are now dead"
750            );
751        }
752    }
753}
754
755async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
756    let path = req.uri().path().to_owned();
757
758    // 1. API match — lookup by path, only consume body if matched
759    let api_sender = {
760        let inner = state.registry.inner.read().await;
761        inner.api_routes.get(&path).cloned()
762    }; // lock released BEFORE any IO
763
764    if let Some(sender) = api_sender {
765        let method = req.method().to_string();
766        let query = req.uri().query().unwrap_or("").to_string();
767        let headers = req.headers().clone();
768
769        // Check Content-Length against limit BEFORE opening the stream
770        let content_length: Option<u64> = headers
771            .get(http::header::CONTENT_LENGTH)
772            .and_then(|v| v.to_str().ok())
773            .and_then(|s| s.parse().ok());
774
775        if let Some(len) = content_length
776            && len > state.max_request_body as u64
777        {
778            return Response::builder()
779                .status(StatusCode::PAYLOAD_TOO_LARGE)
780                .body(AxumBody::from("Request body exceeds configured limit"))
781                .expect("infallible"); // allow-unwrap
782        }
783
784        let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
785            Ok(permit) => permit,
786            Err(_) => {
787                return Response::builder()
788                    .status(StatusCode::SERVICE_UNAVAILABLE)
789                    .body(AxumBody::from("Service Unavailable"))
790                    .expect("infallible"); // allow-unwrap
791            }
792        };
793
794        // Build StreamBody from Axum body WITHOUT materializing
795        let content_type = headers
796            .get(http::header::CONTENT_TYPE)
797            .and_then(|v| v.to_str().ok())
798            .map(|s| s.to_string());
799
800        let data_stream: BodyDataStream = req.into_body().into_data_stream();
801        let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
802        let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
803
804        let stream_body = StreamBody {
805            stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
806            metadata: StreamMetadata {
807                size_hint: content_length,
808                content_type,
809                origin: None,
810            },
811        };
812
813        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
814        let envelope = RequestEnvelope {
815            method,
816            path,
817            query,
818            headers,
819            body: stream_body,
820            reply_tx,
821        };
822
823        if sender.send(envelope).await.is_err() {
824            return Response::builder()
825                .status(StatusCode::SERVICE_UNAVAILABLE)
826                .body(AxumBody::from("Consumer unavailable"))
827                .expect("infallible"); // allow-unwrap
828        }
829
830        match reply_rx.await {
831            Ok(reply) => {
832                let reply = match reply.body {
833                    HttpReplyBody::Bytes(b)
834                        if exceeds_max_response_body(b.len(), state.max_response_body) =>
835                    {
836                        HttpReply {
837                            status: 500,
838                            headers: vec![],
839                            body: HttpReplyBody::Bytes(bytes::Bytes::from(
840                                "Response body exceeds configured limit",
841                            )),
842                        }
843                    }
844                    _ => reply,
845                };
846
847                let status =
848                    StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
849                let mut builder = Response::builder().status(status);
850                for (k, v) in &reply.headers {
851                    builder = builder.header(k.as_str(), v.as_str());
852                }
853                match reply.body {
854                    HttpReplyBody::Bytes(b) => {
855                        builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
856                            Response::builder()
857                                .status(StatusCode::INTERNAL_SERVER_ERROR)
858                                .body(AxumBody::from("Invalid response headers from consumer"))
859                                .expect("infallible") // allow-unwrap
860                        })
861                    }
862                    HttpReplyBody::Stream(stream) => builder
863                        .body(AxumBody::from_stream(stream))
864                        .unwrap_or_else(|_| {
865                            Response::builder()
866                                .status(StatusCode::INTERNAL_SERVER_ERROR)
867                                .body(AxumBody::from("Invalid response headers from consumer"))
868                                .expect("infallible") // allow-unwrap
869                        }),
870                }
871            }
872            Err(_) => Response::builder()
873                .status(StatusCode::INTERNAL_SERVER_ERROR)
874                .body(AxumBody::from("Pipeline error"))
875                .expect("infallible"), // allow-unwrap
876        }
877    } else {
878        // No API route matched — try static mounts
879        static_dispatch::dispatch_static(&state, req, &path).await
880    }
881}
882
883fn exceeds_max_response_body(len: usize, max: usize) -> bool {
884    len > max
885}
886
887fn title_case_header(name: &str) -> String {
888    name.split('-')
889        .map(|part| {
890            let mut chars = part.chars();
891            match chars.next() {
892                None => String::new(),
893                Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
894            }
895        })
896        .collect::<Vec<_>>()
897        .join("-")
898}
899
900// ---------------------------------------------------------------------------
901// HttpConsumer
902// ---------------------------------------------------------------------------
903
904pub struct HttpConsumer {
905    config: HttpServerConfig,
906    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
907    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
908    #[allow(dead_code)]
909    runtime: Arc<dyn RuntimeObservability>,
910}
911
912impl HttpConsumer {
913    pub fn new(config: HttpServerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
914        Self { config, runtime }
915    }
916}
917
918#[async_trait::async_trait]
919impl Consumer for HttpConsumer {
920    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
921        use camel_component_api::{Body, Exchange, Message};
922
923        let registry = ServerRegistry::global()
924            .get_or_spawn(
925                &self.config.host,
926                self.config.port,
927                self.config.max_request_body,
928                self.config.max_response_body,
929                self.config.max_inflight_requests,
930            )
931            .await?;
932
933        // Create channel for this path and register it
934        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
935        registry
936            .register_api_route(self.config.path.clone(), env_tx)
937            .await;
938
939        let path = self.config.path.clone();
940        let registry_for_cleanup = registry.clone();
941        let cancel_token = ctx.cancel_token();
942        loop {
943            tokio::select! {
944                _ = ctx.cancelled() => {
945                    break;
946                }
947                envelope = env_rx.recv() => {
948                    let Some(envelope) = envelope else { break; };
949
950                    // Build Exchange from HTTP request
951                    let mut msg = Message::default();
952
953                    // Set standard Camel HTTP headers
954                    msg.set_header("CamelHttpMethod",
955                        serde_json::Value::String(envelope.method.clone()));
956                    msg.set_header("CamelHttpPath",
957                        serde_json::Value::String(envelope.path.clone()));
958                    msg.set_header("CamelHttpQuery",
959                        serde_json::Value::String(envelope.query.clone()));
960
961                    // Forward HTTP headers with Title-Case names (hyper lowercases them)
962                    for (k, v) in &envelope.headers {
963                        if let Ok(val_str) = v.to_str() {
964                            msg.set_header(
965                                title_case_header(k.as_str()),
966                                serde_json::Value::String(val_str.to_string()),
967                            );
968                        }
969                    }
970
971                    // Body: always arrives as Body::Stream (native streaming)
972                    // Routes can call into_bytes() if they need to materialize
973                    msg.body = Body::Stream(envelope.body);
974
975                    #[allow(unused_mut)]
976                    let mut exchange = Exchange::new(msg);
977
978                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
979                    #[cfg(feature = "otel")]
980                    {
981                        let headers: HashMap<String, String> = envelope
982                            .headers
983                            .iter()
984                            .filter_map(|(k, v)| {
985                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
986                            })
987                            .collect();
988                        camel_otel::extract_into_exchange(&mut exchange, &headers);
989                    }
990
991                    let reply_tx = envelope.reply_tx;
992                    let sender = ctx.sender().clone();
993                    let path_clone = path.clone();
994                    let cancel = cancel_token.clone();
995
996                    // Spawn a task to handle this request concurrently
997                    //
998                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
999                    // true concurrent request processing. This change was introduced as part of the
1000                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
1001                    //
1002                    // Rationale:
1003                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
1004                    //    the consumer's main loop until the pipeline processing completes
1005                    // 2. This blocking would prevent multiple HTTP requests from being processed
1006                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
1007                    // 3. The channel would never have multiple exchanges buffered simultaneously,
1008                    //    defeating the purpose of pipeline-side concurrency
1009                    // 4. By spawning a task per request, we allow the consumer loop to continue
1010                    //    accepting new requests while existing ones are processed in the pipeline
1011                    //
1012                    // This approach effectively decouples request acceptance from pipeline processing,
1013                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
1014                    // by the pipeline when ConcurrencyModel::Concurrent is active.
1015                    tokio::spawn(async move {
1016                        // Check for cancellation before sending to pipeline.
1017                        // Returns 503 (Service Unavailable) instead of letting the request
1018                        // enter a shutting-down pipeline. This is a behavioral change from
1019                        // the pre-concurrency implementation where cancellation during
1020                        // processing would result in a 500 (Internal Server Error).
1021                        // 503 is more semantically correct: the server is temporarily
1022                        // unable to handle the request due to shutdown.
1023                        if cancel.is_cancelled() {
1024                            let _ = reply_tx.send(HttpReply {
1025                                status: 503,
1026                                headers: vec![],
1027                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1028                            });
1029                            return;
1030                        }
1031
1032                        // Send through pipeline and await result
1033                        let (tx, rx) = tokio::sync::oneshot::channel();
1034                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
1035                            exchange,
1036                            reply_tx: Some(tx),
1037                        };
1038
1039                        let result = match sender.send(envelope).await {
1040                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1041                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1042                        }
1043                        .and_then(|r| r);
1044
1045                        let reply = match result {
1046                            Ok(out) => {
1047                                let status = out
1048                                    .input
1049                                    .header("CamelHttpResponseCode")
1050                                    .and_then(|v| {
1051                                        let raw = v.as_u64()
1052                                            .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1053                                        let code = raw as u16;
1054                                        (100..1000).contains(&code).then_some(code)
1055                                    })
1056                                    .unwrap_or(200);
1057
1058                                let user_content_type = out
1059                                    .input
1060                                    .header("Content-Type")
1061                                    .and_then(|v| v.as_str().map(|s| s.to_string()));
1062
1063                                let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1064                                    Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1065                                    Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1066                                    Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1067                                    Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1068                                    Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1069                                        v.to_string().into_bytes(),
1070                                    )), Some("application/json".to_string())),
1071                                    Body::Stream(s) => {
1072                                        let ct = s.metadata.content_type.clone();
1073                                        match s.stream.lock().await.take() {
1074                                            Some(stream) => (
1075                                                HttpReplyBody::Stream(stream),
1076                                                ct,
1077                                            ),
1078                                            None => {
1079                                                tracing::error!(
1080                                                    "Body::Stream already consumed before HTTP reply — returning 500"
1081                                                );
1082                                                let error_reply = HttpReply {
1083                                                    status: 500,
1084                                                    headers: vec![],
1085                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1086                                                };
1087                                                if reply_tx.send(error_reply).is_err() {
1088                                                    debug!("reply_tx dropped before error reply could be sent");
1089                                                }
1090                                                return;
1091                                            }
1092                                        }
1093                                    }
1094                                };
1095
1096                                let mut resp_headers: Vec<(String, String)> = out
1097                                    .input
1098                                    .headers
1099                                    .iter()
1100                                    .filter(|(k, _)| !k.starts_with("Camel"))
1101                                    .filter(|(k, _)| {
1102                                        !matches!(
1103                                            k.to_lowercase().as_str(),
1104                                            "content-length"
1105                                            | "content-type"
1106                                            | "transfer-encoding"
1107                                            | "connection"
1108                                            | "cache-control"
1109                                            | "date"
1110                                            | "pragma"
1111                                            | "trailer"
1112                                            | "upgrade"
1113                                            | "via"
1114                                            | "warning"
1115                                            | "host"
1116                                            | "user-agent"
1117                                            | "accept"
1118                                            | "accept-encoding"
1119                                            | "accept-language"
1120                                            | "accept-charset"
1121                                            | "authorization"
1122                                            | "proxy-authorization"
1123                                            | "cookie"
1124                                            | "expect"
1125                                            | "from"
1126                                            | "if-match"
1127                                            | "if-modified-since"
1128                                            | "if-none-match"
1129                                            | "if-range"
1130                                            | "if-unmodified-since"
1131                                            | "max-forwards"
1132                                            | "proxy-connection"
1133                                            | "range"
1134                                            | "referer"
1135                                            | "te"
1136                                        )
1137                                    })
1138                                    .filter_map(|(k, v)| {
1139                                        v.as_str().map(|s| (k.clone(), s.to_string()))
1140                                    })
1141                                    .collect();
1142
1143                                let content_type = user_content_type
1144                                    .or(inferred_content_type);
1145                                if let Some(ct) = content_type {
1146                                    resp_headers.push(("Content-Type".to_string(), ct));
1147                                }
1148
1149                                HttpReply {
1150                                    status,
1151                                    headers: resp_headers,
1152                                    body: reply_body,
1153                                }
1154                            }
1155                            Err(CamelError::Stopped) => {
1156                                tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1157                                HttpReply {
1158                                    status: 204,
1159                                    headers: vec![],
1160                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1161                                }
1162                            }
1163                            Err(CamelError::Unauthenticated(msg)) => {
1164                                tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1165                                HttpReply {
1166                                    status: 401,
1167                                    headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1168                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1169                                }
1170                            }
1171                            Err(CamelError::Unauthorized(msg)) => {
1172                                tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1173                                HttpReply {
1174                                    status: 403,
1175                                    headers: vec![],
1176                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1177                                }
1178                            }
1179                            Err(e) => {
1180                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1181                                HttpReply {
1182                                    status: 500,
1183                                    headers: vec![],
1184                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1185                                }
1186                            }
1187                        };
1188
1189                        // Reply to Axum handler (ignore error if client disconnected)
1190                        let _ = reply_tx.send(reply);
1191                    });
1192                }
1193            }
1194        }
1195
1196        // Deregister this path
1197        registry_for_cleanup.unregister_api_route(&path).await;
1198
1199        Ok(())
1200    }
1201
1202    async fn stop(&mut self) -> Result<(), CamelError> {
1203        Ok(())
1204    }
1205
1206    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1207        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1208    }
1209}
1210
1211// ---------------------------------------------------------------------------
1212// HttpComponent / HttpsComponent
1213// ---------------------------------------------------------------------------
1214
1215pub struct HttpComponent {
1216    config: HttpConfig,
1217}
1218
1219fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1220    let mut builder = reqwest::Client::builder()
1221        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1222        .pool_max_idle_per_host(config.pool_max_idle_per_host)
1223        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1224
1225    if !config.follow_redirects {
1226        builder = builder.redirect(reqwest::redirect::Policy::none());
1227    } else if let Some(max_redirects) = config.max_redirects {
1228        builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1229    }
1230
1231    if let Some(proxy_url) = &config.proxy_url
1232        && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1233    {
1234        builder = builder.proxy(proxy);
1235    }
1236
1237    if matches!(cookie_handling, CookieHandling::InMemory) {
1238        // TODO(HTTP-013): enable reqwest cookie jar once workspace reqwest features include cookie_store.
1239    }
1240
1241    if let Some(tls) = &config.tls
1242        && tls.enabled
1243    {
1244        if tls.insecure || !tls.verify_peer {
1245            builder = builder.danger_accept_invalid_certs(true);
1246        }
1247
1248        if let Some(ca_path) = &tls.ca_cert_path
1249            && let Ok(ca_bytes) = std::fs::read(ca_path)
1250        {
1251            let cert = reqwest::Certificate::from_pem(&ca_bytes)
1252                .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1253            if let Ok(ca_cert) = cert {
1254                builder = builder.add_root_certificate(ca_cert);
1255            }
1256        }
1257
1258        if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1259            && let (Ok(cert_bytes), Ok(key_bytes)) =
1260                (std::fs::read(cert_path), std::fs::read(key_path))
1261        {
1262            let mut identity_pem = cert_bytes;
1263            identity_pem.extend_from_slice(&key_bytes);
1264            if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1265                builder = builder.identity(identity);
1266            }
1267        }
1268    }
1269
1270    builder
1271        .build()
1272        .expect("reqwest::Client::build() with valid config should not fail") // allow-unwrap
1273}
1274
1275impl HttpComponent {
1276    pub fn new() -> Self {
1277        let config = HttpConfig::default();
1278        Self { config }
1279    }
1280
1281    pub fn with_config(config: HttpConfig) -> Self {
1282        Self { config }
1283    }
1284
1285    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1286        match config {
1287            Some(cfg) => Self::with_config(cfg),
1288            None => Self::new(),
1289        }
1290    }
1291}
1292
1293impl Default for HttpComponent {
1294    fn default() -> Self {
1295        Self::new()
1296    }
1297}
1298
1299impl Component for HttpComponent {
1300    fn scheme(&self) -> &str {
1301        "http"
1302    }
1303
1304    fn create_endpoint(
1305        &self,
1306        uri: &str,
1307        ctx: &dyn camel_component_api::ComponentContext,
1308    ) -> Result<Box<dyn Endpoint>, CamelError> {
1309        self.config.validate()?;
1310        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1311        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1312        let client = build_client(&self.config, config.cookie_handling);
1313        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1314            server_config.host.clone(),
1315            server_config.port,
1316        )));
1317        Ok(Box::new(HttpEndpoint {
1318            uri: uri.to_string(),
1319            config,
1320            server_config,
1321            client,
1322        }))
1323    }
1324}
1325
1326pub struct HttpsComponent {
1327    config: HttpConfig,
1328}
1329
1330impl HttpsComponent {
1331    pub fn new() -> Self {
1332        let config = HttpConfig::default();
1333        Self { config }
1334    }
1335
1336    pub fn with_config(config: HttpConfig) -> Self {
1337        Self { config }
1338    }
1339
1340    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1341        match config {
1342            Some(cfg) => Self::with_config(cfg),
1343            None => Self::new(),
1344        }
1345    }
1346}
1347
1348impl Default for HttpsComponent {
1349    fn default() -> Self {
1350        Self::new()
1351    }
1352}
1353
1354impl Component for HttpsComponent {
1355    fn scheme(&self) -> &str {
1356        "https"
1357    }
1358
1359    fn create_endpoint(
1360        &self,
1361        uri: &str,
1362        ctx: &dyn camel_component_api::ComponentContext,
1363    ) -> Result<Box<dyn Endpoint>, CamelError> {
1364        self.config.validate()?;
1365        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1366        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1367        let client = build_client(&self.config, config.cookie_handling);
1368        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1369            server_config.host.clone(),
1370            server_config.port,
1371        )));
1372        Ok(Box::new(HttpEndpoint {
1373            uri: uri.to_string(),
1374            config,
1375            server_config,
1376            client,
1377        }))
1378    }
1379}
1380
1381// ---------------------------------------------------------------------------
1382// HttpEndpoint
1383// ---------------------------------------------------------------------------
1384
1385struct HttpEndpoint {
1386    uri: String,
1387    config: HttpEndpointConfig,
1388    server_config: HttpServerConfig,
1389    client: reqwest::Client,
1390}
1391
1392impl Endpoint for HttpEndpoint {
1393    fn uri(&self) -> &str {
1394        &self.uri
1395    }
1396
1397    fn create_consumer(
1398        &self,
1399        rt: Arc<dyn camel_component_api::RuntimeObservability>,
1400    ) -> Result<Box<dyn Consumer>, CamelError> {
1401        Ok(Box::new(HttpConsumer::new(self.server_config.clone(), rt)))
1402    }
1403
1404    fn create_producer(
1405        &self,
1406        _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1407        _ctx: &ProducerContext,
1408    ) -> Result<BoxProcessor, CamelError> {
1409        let producer = HttpProducer {
1410            config: Arc::new(self.config.clone()),
1411            client: self.client.clone(),
1412        };
1413        if let Some(ref provider) = self.config.token_provider {
1414            let layer = BearerTokenLayer::new(Arc::clone(provider));
1415            Ok(BoxProcessor::new(layer.layer(producer)))
1416        } else {
1417            Ok(BoxProcessor::new(producer))
1418        }
1419    }
1420}
1421
1422// ---------------------------------------------------------------------------
1423// SSRF Protection
1424// ---------------------------------------------------------------------------
1425
1426fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1427    let parsed = url::Url::parse(url)
1428        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1429
1430    // Check blocked hosts
1431    if let Some(host) = parsed.host_str()
1432        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1433    {
1434        return Err(CamelError::ProcessorError(format!(
1435            "Host '{}' is blocked",
1436            host
1437        )));
1438    }
1439
1440    // Check private IPs if not allowed
1441    if !config.allow_private_ips
1442        && let Some(host) = parsed.host()
1443    {
1444        match host {
1445            url::Host::Ipv4(ip) => {
1446                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1447                    return Err(CamelError::ProcessorError(format!(
1448                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1449                        ip
1450                    )));
1451                }
1452            }
1453            url::Host::Ipv6(ip) => {
1454                if ip.is_loopback() {
1455                    return Err(CamelError::ProcessorError(format!(
1456                        "Loopback IP '{}' not allowed",
1457                        ip
1458                    )));
1459                }
1460            }
1461            url::Host::Domain(domain) => {
1462                // Block common internal domains
1463                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1464                if blocked_domains.contains(&domain) {
1465                    return Err(CamelError::ProcessorError(format!(
1466                        "Domain '{}' is not allowed",
1467                        domain
1468                    )));
1469                }
1470            }
1471        }
1472    }
1473
1474    Ok(())
1475}
1476
1477fn is_private_ip(ip: &IpAddr) -> bool {
1478    match ip {
1479        IpAddr::V4(ipv4) => {
1480            ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1481        }
1482        IpAddr::V6(ipv6) => {
1483            let seg0 = ipv6.segments()[0];
1484            ipv6.is_loopback()
1485                // fc00::/7 (ULA)
1486                || (seg0 & 0xfe00) == 0xfc00
1487                // fe80::/10 (link-local)
1488                || (seg0 & 0xffc0) == 0xfe80
1489                // ::ffff:0:0/96 (IPv4-mapped): only block if the mapped IPv4 is private
1490                || ipv6
1491                    .to_ipv4_mapped()
1492                    .map(|v4| {
1493                        v4.is_private()
1494                            || v4.is_loopback()
1495                            || v4.is_link_local()
1496                            || v4.octets()[0] == 0
1497                    })
1498                    .unwrap_or(false)
1499        }
1500    }
1501}
1502
1503async fn validate_resolved_host_for_ssrf(
1504    url: &str,
1505    config: &HttpEndpointConfig,
1506) -> Result<(), CamelError> {
1507    if config.allow_private_ips {
1508        return Ok(());
1509    }
1510
1511    let parsed = url::Url::parse(url)
1512        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1513    let Some(host) = parsed.host_str() else {
1514        return Ok(());
1515    };
1516    let Some(port) = parsed.port_or_known_default() else {
1517        return Ok(());
1518    };
1519
1520    let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1521        CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1522    })?;
1523
1524    for addr in resolved {
1525        let ip = addr.ip();
1526        if is_private_ip(&ip) {
1527            return Err(CamelError::ProcessorError(format!(
1528                "Target resolved to private IP: {}",
1529                ip
1530            )));
1531        }
1532    }
1533
1534    Ok(())
1535}
1536
1537// ---------------------------------------------------------------------------
1538// HttpProducer
1539// ---------------------------------------------------------------------------
1540
1541#[derive(Clone)]
1542struct HttpProducer {
1543    config: Arc<HttpEndpointConfig>,
1544    client: reqwest::Client,
1545}
1546
1547impl HttpProducer {
1548    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1549        if let Some(ref method) = config.http_method {
1550            return method.to_uppercase();
1551        }
1552        if let Some(method) = exchange
1553            .input
1554            .header("CamelHttpMethod")
1555            .and_then(|v| v.as_str())
1556        {
1557            return method.to_uppercase();
1558        }
1559        if !exchange.input.body.is_empty() {
1560            return "POST".to_string();
1561        }
1562        "GET".to_string()
1563    }
1564
1565    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1566        if let Some(uri) = exchange
1567            .input
1568            .header("CamelHttpUri")
1569            .and_then(|v| v.as_str())
1570        {
1571            let mut url = uri.to_string();
1572            if let Some(path) = exchange
1573                .input
1574                .header("CamelHttpPath")
1575                .and_then(|v| v.as_str())
1576            {
1577                if !url.ends_with('/') && !path.starts_with('/') {
1578                    url.push('/');
1579                }
1580                url.push_str(path);
1581            }
1582            if let Some(query) = exchange
1583                .input
1584                .header("CamelHttpQuery")
1585                .and_then(|v| v.as_str())
1586            {
1587                url.push('?');
1588                url.push_str(query);
1589            }
1590            return url;
1591        }
1592
1593        let mut url = config.base_url.clone();
1594
1595        if let Some(path) = exchange
1596            .input
1597            .header("CamelHttpPath")
1598            .and_then(|v| v.as_str())
1599        {
1600            if !url.ends_with('/') && !path.starts_with('/') {
1601                url.push('/');
1602            }
1603            url.push_str(path);
1604        }
1605
1606        if let Some(query) = exchange
1607            .input
1608            .header("CamelHttpQuery")
1609            .and_then(|v| v.as_str())
1610        {
1611            url.push('?');
1612            url.push_str(query);
1613        } else if !config.query_params.is_empty() {
1614            let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); // allow-unwrap
1615            for (k, v) in &config.query_params {
1616                parsed.query_pairs_mut().append_pair(k, v);
1617            }
1618            url = parsed.to_string();
1619        }
1620
1621        url
1622    }
1623
1624    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1625        status >= range.0 && status <= range.1
1626    }
1627}
1628
1629impl Service<Exchange> for HttpProducer {
1630    type Response = Exchange;
1631    type Error = CamelError;
1632    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1633
1634    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1635        Poll::Ready(Ok(()))
1636    }
1637
1638    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1639        let config = self.config.clone();
1640        let client = self.client.clone();
1641
1642        Box::pin(async move {
1643            let method_str = HttpProducer::resolve_method(&exchange, &config);
1644            let url = HttpProducer::resolve_url(&exchange, &config);
1645
1646            // SECURITY: Validate URL for SSRF
1647            validate_url_for_ssrf(&url, &config)?;
1648            validate_resolved_host_for_ssrf(&url, &config).await?;
1649
1650            debug!(
1651                correlation_id = %exchange.correlation_id(),
1652                method = %method_str,
1653                url = %url,
1654                "HTTP request"
1655            );
1656
1657            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1658                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1659            })?;
1660
1661            let mut request = client.request(method, &url);
1662
1663            if let Some(timeout) = config.response_timeout {
1664                request = request.timeout(timeout);
1665            }
1666
1667            if let Some(user_agent) = &config.user_agent
1668                && !config.bridge_endpoint
1669            {
1670                request = request.header("User-Agent", user_agent.clone());
1671            }
1672
1673            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1674            #[cfg(feature = "otel")]
1675            let should_inject_otel = !config.bridge_endpoint;
1676            #[cfg(feature = "otel")]
1677            if should_inject_otel {
1678                let mut otel_headers = HashMap::new();
1679                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1680                for (k, v) in otel_headers {
1681                    if let (Ok(name), Ok(val)) = (
1682                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1683                        reqwest::header::HeaderValue::from_str(&v),
1684                    ) {
1685                        request = request.header(name, val);
1686                    }
1687                }
1688            }
1689
1690            for (key, value) in &exchange.input.headers {
1691                if !key.starts_with("Camel")
1692                    && !config
1693                        .skip_request_headers
1694                        .iter()
1695                        .any(|h| h.eq_ignore_ascii_case(key))
1696                    && let Some(val_str) = value.as_str()
1697                    && let (Ok(name), Ok(val)) = (
1698                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1699                        reqwest::header::HeaderValue::from_str(val_str),
1700                    )
1701                {
1702                    request = request.header(name, val);
1703                }
1704            }
1705
1706            if !config.bridge_endpoint {
1707                match &config.auth {
1708                    HttpAuth::None => {}
1709                    HttpAuth::Basic { username, password } => {
1710                        request = request.basic_auth(username, Some(password));
1711                    }
1712                    HttpAuth::Bearer { token } => {
1713                        request = request.bearer_auth(token);
1714                    }
1715                }
1716
1717                if config.connection_close {
1718                    request = request.header("Connection", "close");
1719                }
1720            }
1721
1722            match exchange.input.body {
1723                Body::Stream(ref s) => {
1724                    let mut stream_lock = s.stream.lock().await;
1725                    if let Some(stream) = stream_lock.take() {
1726                        request = request.body(reqwest::Body::wrap_stream(stream));
1727                    } else {
1728                        return Err(CamelError::AlreadyConsumed);
1729                    }
1730                }
1731                _ => {
1732                    // For other types, materialize with configured limit
1733                    let body = std::mem::take(&mut exchange.input.body);
1734                    let bytes = body.into_bytes(config.max_body_size).await?;
1735                    if !bytes.is_empty() {
1736                        request = request.body(bytes);
1737                    }
1738                }
1739            }
1740
1741            let response = request
1742                .send()
1743                .await
1744                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1745
1746            let status_code = response.status().as_u16();
1747            let status_text = response
1748                .status()
1749                .canonical_reason()
1750                .unwrap_or("Unknown")
1751                .to_string();
1752
1753            for (key, value) in response.headers() {
1754                if config
1755                    .skip_response_headers
1756                    .iter()
1757                    .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1758                {
1759                    continue;
1760                }
1761                if let Ok(val_str) = value.to_str() {
1762                    exchange.input.set_header(
1763                        title_case_header(key.as_str()),
1764                        serde_json::Value::String(val_str.to_string()),
1765                    );
1766                }
1767            }
1768
1769            exchange.input.set_header(
1770                "CamelHttpResponseCode",
1771                serde_json::Value::Number(status_code.into()),
1772            );
1773            exchange.input.set_header(
1774                "CamelHttpResponseText",
1775                serde_json::Value::String(status_text.clone()),
1776            );
1777
1778            // Read response body with timeout and size guard (HTTP-004, HTTP-005)
1779            let read_timeout = Duration::from_millis(config.read_timeout_ms);
1780            let response_body = tokio::time::timeout(read_timeout, async {
1781                // Check Content-Length header before allocating
1782                if let Some(content_len) = response.content_length()
1783                    && content_len > config.max_response_bytes as u64
1784                {
1785                    return Err(CamelError::ProcessorError(format!(
1786                        "Response body too large: {} bytes exceeds limit of {} bytes",
1787                        content_len, config.max_response_bytes
1788                    )));
1789                }
1790                // Use bytes_stream() for lazy streaming with size guard
1791                use futures::TryStreamExt;
1792                let mut stream = response.bytes_stream();
1793                let mut total: usize = 0;
1794                let mut collected = Vec::new();
1795                while let Some(chunk) = stream.try_next().await.map_err(|e| {
1796                    CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1797                })? {
1798                    total += chunk.len();
1799                    if total > config.max_response_bytes {
1800                        return Err(CamelError::ProcessorError(format!(
1801                            "Response body too large: {} bytes exceeds limit of {} bytes",
1802                            total, config.max_response_bytes
1803                        )));
1804                    }
1805                    collected.push(chunk);
1806                }
1807                let mut result = bytes::BytesMut::with_capacity(total);
1808                for chunk in collected {
1809                    result.extend_from_slice(&chunk);
1810                }
1811                Ok::<bytes::Bytes, CamelError>(result.freeze())
1812            })
1813            .await
1814            .map_err(|_| {
1815                CamelError::ProcessorError(format!(
1816                    "Read timeout after {}ms",
1817                    config.read_timeout_ms
1818                ))
1819            })??;
1820
1821            if config.throw_exception_on_failure
1822                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1823            {
1824                return Err(CamelError::HttpOperationFailed {
1825                    method: method_str,
1826                    url,
1827                    status_code,
1828                    status_text,
1829                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1830                });
1831            }
1832
1833            if !response_body.is_empty() {
1834                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1835            }
1836
1837            debug!(
1838                correlation_id = %exchange.correlation_id(),
1839                status = status_code,
1840                url = %url,
1841                "HTTP response"
1842            );
1843            Ok(exchange)
1844        })
1845    }
1846}
1847
1848/// Serializes tests that mutate or depend on the global `ServerRegistry`.
1849///
1850/// `ServerRegistry::global()` is a process-wide singleton that persists
1851/// across tests. `ServerRegistry::reset()` clears ALL entries; if it races
1852/// with another test that has a live server on a fixed port (e.g. 9991),
1853/// the registry entry is removed while the OS socket is still bound, so
1854/// the next `get_or_spawn` call on that port fails with "Address already
1855/// in use". Holding this mutex for the full body of each affected test
1856/// prevents the race without requiring `--test-threads=1`.
1857#[cfg(test)]
1858pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1859
1860#[cfg(test)]
1861mod tests {
1862    use camel_component_api::test_support::PanicRuntimeObservability;
1863    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1864        std::sync::Arc::new(PanicRuntimeObservability)
1865    }
1866    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1867        std::sync::Arc::new(PanicRuntimeObservability)
1868    }
1869
1870    use super::*;
1871    use camel_component_api::{Message, NoOpComponentContext};
1872    use std::sync::Arc;
1873    use std::time::Duration;
1874
1875    fn test_producer_ctx() -> ProducerContext {
1876        ProducerContext::new()
1877    }
1878
1879    #[test]
1880    fn test_http_config_defaults() {
1881        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1882        assert_eq!(config.base_url, "http://localhost:8080/api");
1883        assert!(config.http_method.is_none());
1884        assert!(config.throw_exception_on_failure);
1885        assert_eq!(config.ok_status_code_range, (200, 299));
1886        assert!(config.response_timeout.is_none());
1887        assert!(matches!(config.auth, HttpAuth::None));
1888        assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1889        assert!(!config.bridge_endpoint);
1890        assert!(!config.connection_close);
1891    }
1892
1893    #[test]
1894    fn test_http_config_scheme() {
1895        // UriConfig trait method returns "http" as primary scheme
1896        assert_eq!(HttpEndpointConfig::scheme(), "http");
1897    }
1898
1899    #[test]
1900    fn test_http_config_from_components() {
1901        // Test from_components directly (trait method)
1902        let components = camel_component_api::UriComponents {
1903            scheme: "https".to_string(),
1904            path: "//api.example.com/v1".to_string(),
1905            params: std::collections::HashMap::from([(
1906                "httpMethod".to_string(),
1907                "POST".to_string(),
1908            )]),
1909        };
1910        let config = HttpEndpointConfig::from_components(components).unwrap();
1911        assert_eq!(config.base_url, "https://api.example.com/v1");
1912        assert_eq!(config.http_method, Some("POST".to_string()));
1913    }
1914
1915    #[test]
1916    fn test_http_config_with_options() {
1917        let config = HttpEndpointConfig::from_uri(
1918            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1919        ).unwrap();
1920        assert_eq!(config.base_url, "https://api.example.com/v1");
1921        assert_eq!(config.http_method, Some("PUT".to_string()));
1922        assert!(!config.throw_exception_on_failure);
1923        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1924    }
1925
1926    #[test]
1927    fn test_http_endpoint_config_auth_and_headers_options() {
1928        let config = HttpEndpointConfig::from_uri(
1929            "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1930        )
1931        .unwrap();
1932
1933        assert!(matches!(
1934            config.auth,
1935            HttpAuth::Basic { username, password } if username == "u" && password == "p"
1936        ));
1937        assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1938        assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1939        assert!(config.bridge_endpoint);
1940        assert!(config.connection_close);
1941        assert_eq!(
1942            config.skip_request_headers,
1943            vec!["authorization".to_string(), "x-secret".to_string()]
1944        );
1945        assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1946    }
1947
1948    #[test]
1949    fn test_http_endpoint_config_bearer_auth() {
1950        let config = HttpEndpointConfig::from_uri(
1951            "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1952        )
1953        .unwrap();
1954        assert!(matches!(
1955            config.auth,
1956            HttpAuth::Bearer { token } if token == "t"
1957        ));
1958    }
1959
1960    #[test]
1961    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1962        let config = HttpConfig::default()
1963            .with_response_timeout_ms(999)
1964            .with_allow_private_ips(true)
1965            .with_blocked_hosts(vec!["evil.com".to_string()])
1966            .with_max_body_size(12345);
1967        let endpoint =
1968            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1969        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1970        assert!(endpoint.allow_private_ips);
1971        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1972        assert_eq!(endpoint.max_body_size, 12345);
1973    }
1974
1975    #[test]
1976    fn test_from_uri_with_defaults_uri_overrides_config() {
1977        let config = HttpConfig::default()
1978            .with_response_timeout_ms(999)
1979            .with_allow_private_ips(true)
1980            .with_blocked_hosts(vec!["evil.com".to_string()])
1981            .with_max_body_size(12345);
1982        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1983            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1984            &config,
1985        )
1986        .unwrap();
1987        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1988        assert!(!endpoint.allow_private_ips);
1989        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1990        assert_eq!(endpoint.max_body_size, 99);
1991    }
1992
1993    #[test]
1994    fn test_http_config_ok_status_range() {
1995        let config =
1996            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1997        assert_eq!(config.ok_status_code_range, (200, 204));
1998    }
1999
2000    #[test]
2001    fn test_http_config_wrong_scheme() {
2002        let result = HttpEndpointConfig::from_uri("file:/tmp");
2003        assert!(result.is_err());
2004    }
2005
2006    #[test]
2007    fn test_http_component_scheme() {
2008        let component = HttpComponent::new();
2009        assert_eq!(component.scheme(), "http");
2010    }
2011
2012    #[test]
2013    fn test_https_component_scheme() {
2014        let component = HttpsComponent::new();
2015        assert_eq!(component.scheme(), "https");
2016    }
2017
2018    #[test]
2019    fn test_http_endpoint_creates_consumer() {
2020        let component = HttpComponent::new();
2021        let ctx = NoOpComponentContext;
2022        let endpoint = component
2023            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2024            .unwrap();
2025        assert!(endpoint.create_consumer(rt()).is_ok());
2026    }
2027
2028    #[test]
2029    fn test_https_endpoint_creates_consumer() {
2030        let component = HttpsComponent::new();
2031        let ctx = NoOpComponentContext;
2032        let endpoint = component
2033            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2034            .unwrap();
2035        assert!(endpoint.create_consumer(rt()).is_ok());
2036    }
2037
2038    #[test]
2039    fn test_http_endpoint_creates_producer() {
2040        let ctx = test_producer_ctx();
2041        let component = HttpComponent::new();
2042        let endpoint_ctx = NoOpComponentContext;
2043        let endpoint = component
2044            .create_endpoint("http://localhost/api", &endpoint_ctx)
2045            .unwrap();
2046        assert!(endpoint.create_producer(rt(), &ctx).is_ok());
2047    }
2048
2049    // -----------------------------------------------------------------------
2050    // Producer tests
2051    // -----------------------------------------------------------------------
2052
2053    #[tokio::test]
2054    async fn test_producer_with_token_provider() {
2055        use camel_auth::oauth2::TokenProvider;
2056        use tower::ServiceExt;
2057
2058        let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2059            Arc::new(std::sync::Mutex::new(None));
2060        let captured_clone = Arc::clone(&captured_auth);
2061
2062        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2063        let port = listener.local_addr().unwrap().port();
2064
2065        let _handle = tokio::spawn(async move {
2066            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2067            if let Ok((mut stream, _)) = listener.accept().await {
2068                let mut buf = vec![0u8; 8192];
2069                let n = stream.read(&mut buf).await.unwrap_or(0);
2070                let request = String::from_utf8_lossy(&buf[..n]).to_string();
2071                let auth = request
2072                    .lines()
2073                    .find(|l| l.to_lowercase().starts_with("authorization:"))
2074                    .map(|l| {
2075                        l.split(':')
2076                            .nth(1)
2077                            .map(|s| s.trim().to_string())
2078                            .unwrap_or_default()
2079                    });
2080                *captured_clone.lock().unwrap() = auth;
2081                let body = r#"{"echo":"ok"}"#;
2082                let resp = format!(
2083                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2084                    body.len(),
2085                    body
2086                );
2087                let _ = stream.write_all(resp.as_bytes()).await;
2088            }
2089        });
2090
2091        #[derive(Debug)]
2092        struct StaticProvider;
2093        #[async_trait::async_trait]
2094        impl TokenProvider for StaticProvider {
2095            async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2096                Ok("injected-token".into())
2097            }
2098        }
2099
2100        let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2101        let ctx = test_producer_ctx();
2102        let component = HttpComponent::new();
2103        let endpoint_ctx = NoOpComponentContext;
2104        let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2105        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2106
2107        let exchange = Exchange::new(Message::new("hello"));
2108
2109        let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2110        let mut layered = layer.layer(producer);
2111        let result = layered.ready().await.unwrap().call(exchange).await;
2112        assert!(result.is_ok(), "producer call failed: {:?}", result);
2113
2114        tokio::time::sleep(Duration::from_millis(100)).await;
2115        let auth = captured_auth.lock().unwrap().take();
2116        assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2117    }
2118
2119    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2120        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2121        let addr = listener.local_addr().unwrap();
2122        let url = format!("http://127.0.0.1:{}", addr.port());
2123
2124        let handle = tokio::spawn(async move {
2125            loop {
2126                if let Ok((mut stream, _)) = listener.accept().await {
2127                    tokio::spawn(async move {
2128                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2129                        let mut buf = vec![0u8; 4096];
2130                        let n = stream.read(&mut buf).await.unwrap_or(0);
2131                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2132
2133                        let method = request.split_whitespace().next().unwrap_or("GET");
2134
2135                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2136                        let response = format!(
2137                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2138                            body.len(),
2139                            body
2140                        );
2141                        let _ = stream.write_all(response.as_bytes()).await;
2142                    });
2143                }
2144            }
2145        });
2146
2147        (url, handle)
2148    }
2149
2150    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2151        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2152        let addr = listener.local_addr().unwrap();
2153        let url = format!("http://127.0.0.1:{}", addr.port());
2154
2155        let handle = tokio::spawn(async move {
2156            loop {
2157                if let Ok((mut stream, _)) = listener.accept().await {
2158                    let status = status;
2159                    tokio::spawn(async move {
2160                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2161                        let mut buf = vec![0u8; 4096];
2162                        let _ = stream.read(&mut buf).await;
2163
2164                        let status_text = match status {
2165                            404 => "Not Found",
2166                            500 => "Internal Server Error",
2167                            _ => "Error",
2168                        };
2169                        let body = "error body";
2170                        let response = format!(
2171                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2172                            status,
2173                            status_text,
2174                            body.len(),
2175                            body
2176                        );
2177                        let _ = stream.write_all(response.as_bytes()).await;
2178                    });
2179                }
2180            }
2181        });
2182
2183        (url, handle)
2184    }
2185
2186    #[tokio::test]
2187    async fn test_http_producer_get_request() {
2188        use tower::ServiceExt;
2189
2190        let (url, _handle) = start_test_server().await;
2191        let ctx = test_producer_ctx();
2192
2193        let component = HttpComponent::new();
2194        let endpoint_ctx = NoOpComponentContext;
2195        let endpoint = component
2196            .create_endpoint(
2197                &format!("{url}/api/test?allowPrivateIps=true"),
2198                &endpoint_ctx,
2199            )
2200            .unwrap();
2201        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2202
2203        let exchange = Exchange::new(Message::default());
2204        let result = producer.oneshot(exchange).await.unwrap();
2205
2206        let status = result
2207            .input
2208            .header("CamelHttpResponseCode")
2209            .and_then(|v| v.as_u64())
2210            .unwrap();
2211        assert_eq!(status, 200);
2212
2213        assert!(!result.input.body.is_empty());
2214    }
2215
2216    #[tokio::test]
2217    async fn test_http_producer_post_with_body() {
2218        use tower::ServiceExt;
2219
2220        let (url, _handle) = start_test_server().await;
2221        let ctx = test_producer_ctx();
2222
2223        let component = HttpComponent::new();
2224        let endpoint_ctx = NoOpComponentContext;
2225        let endpoint = component
2226            .create_endpoint(
2227                &format!("{url}/api/data?allowPrivateIps=true"),
2228                &endpoint_ctx,
2229            )
2230            .unwrap();
2231        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2232
2233        let exchange = Exchange::new(Message::new("request body"));
2234        let result = producer.oneshot(exchange).await.unwrap();
2235
2236        let status = result
2237            .input
2238            .header("CamelHttpResponseCode")
2239            .and_then(|v| v.as_u64())
2240            .unwrap();
2241        assert_eq!(status, 200);
2242    }
2243
2244    #[tokio::test]
2245    async fn test_http_producer_method_from_header() {
2246        use tower::ServiceExt;
2247
2248        let (url, _handle) = start_test_server().await;
2249        let ctx = test_producer_ctx();
2250
2251        let component = HttpComponent::new();
2252        let endpoint_ctx = NoOpComponentContext;
2253        let endpoint = component
2254            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2255            .unwrap();
2256        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2257
2258        let mut exchange = Exchange::new(Message::default());
2259        exchange.input.set_header(
2260            "CamelHttpMethod",
2261            serde_json::Value::String("DELETE".to_string()),
2262        );
2263
2264        let result = producer.oneshot(exchange).await.unwrap();
2265        let status = result
2266            .input
2267            .header("CamelHttpResponseCode")
2268            .and_then(|v| v.as_u64())
2269            .unwrap();
2270        assert_eq!(status, 200);
2271    }
2272
2273    #[tokio::test]
2274    async fn test_http_producer_forced_method() {
2275        use tower::ServiceExt;
2276
2277        let (url, _handle) = start_test_server().await;
2278        let ctx = test_producer_ctx();
2279
2280        let component = HttpComponent::new();
2281        let endpoint_ctx = NoOpComponentContext;
2282        let endpoint = component
2283            .create_endpoint(
2284                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2285                &endpoint_ctx,
2286            )
2287            .unwrap();
2288        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2289
2290        let exchange = Exchange::new(Message::default());
2291        let result = producer.oneshot(exchange).await.unwrap();
2292
2293        let status = result
2294            .input
2295            .header("CamelHttpResponseCode")
2296            .and_then(|v| v.as_u64())
2297            .unwrap();
2298        assert_eq!(status, 200);
2299    }
2300
2301    #[tokio::test]
2302    async fn test_http_producer_throw_exception_on_failure() {
2303        use tower::ServiceExt;
2304
2305        let (url, _handle) = start_status_server(404).await;
2306        let ctx = test_producer_ctx();
2307
2308        let component = HttpComponent::new();
2309        let endpoint_ctx = NoOpComponentContext;
2310        let endpoint = component
2311            .create_endpoint(
2312                &format!("{url}/not-found?allowPrivateIps=true"),
2313                &endpoint_ctx,
2314            )
2315            .unwrap();
2316        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2317
2318        let exchange = Exchange::new(Message::default());
2319        let result = producer.oneshot(exchange).await;
2320        assert!(result.is_err());
2321
2322        match result.unwrap_err() {
2323            CamelError::HttpOperationFailed { status_code, .. } => {
2324                assert_eq!(status_code, 404);
2325            }
2326            e => panic!("Expected HttpOperationFailed, got: {e}"),
2327        }
2328    }
2329
2330    #[tokio::test]
2331    async fn test_http_producer_no_throw_on_failure() {
2332        use tower::ServiceExt;
2333
2334        let (url, _handle) = start_status_server(500).await;
2335        let ctx = test_producer_ctx();
2336
2337        let component = HttpComponent::new();
2338        let endpoint_ctx = NoOpComponentContext;
2339        let endpoint = component
2340            .create_endpoint(
2341                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2342                &endpoint_ctx,
2343            )
2344            .unwrap();
2345        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2346
2347        let exchange = Exchange::new(Message::default());
2348        let result = producer.oneshot(exchange).await.unwrap();
2349
2350        let status = result
2351            .input
2352            .header("CamelHttpResponseCode")
2353            .and_then(|v| v.as_u64())
2354            .unwrap();
2355        assert_eq!(status, 500);
2356    }
2357
2358    #[tokio::test]
2359    async fn test_http_producer_uri_override() {
2360        use tower::ServiceExt;
2361
2362        let (url, _handle) = start_test_server().await;
2363        let ctx = test_producer_ctx();
2364
2365        let component = HttpComponent::new();
2366        let endpoint_ctx = NoOpComponentContext;
2367        let endpoint = component
2368            .create_endpoint(
2369                "http://localhost:1/does-not-exist?allowPrivateIps=true",
2370                &endpoint_ctx,
2371            )
2372            .unwrap();
2373        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2374
2375        let mut exchange = Exchange::new(Message::default());
2376        exchange.input.set_header(
2377            "CamelHttpUri",
2378            serde_json::Value::String(format!("{url}/api")),
2379        );
2380
2381        let result = producer.oneshot(exchange).await.unwrap();
2382        let status = result
2383            .input
2384            .header("CamelHttpResponseCode")
2385            .and_then(|v| v.as_u64())
2386            .unwrap();
2387        assert_eq!(status, 200);
2388    }
2389
2390    #[tokio::test]
2391    async fn test_http_producer_response_headers_mapped() {
2392        use tower::ServiceExt;
2393
2394        let (url, _handle) = start_test_server().await;
2395        let ctx = test_producer_ctx();
2396
2397        let component = HttpComponent::new();
2398        let endpoint_ctx = NoOpComponentContext;
2399        let endpoint = component
2400            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2401            .unwrap();
2402        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2403
2404        let exchange = Exchange::new(Message::default());
2405        let result = producer.oneshot(exchange).await.unwrap();
2406
2407        assert!(
2408            result.input.header("Content-Type").is_some(),
2409            "Response should have Content-Type header"
2410        );
2411        assert!(result.input.header("CamelHttpResponseText").is_some());
2412    }
2413
2414    // -----------------------------------------------------------------------
2415    // Bug fix tests: Client configuration per-endpoint
2416    // -----------------------------------------------------------------------
2417
2418    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2419        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2420        let addr = listener.local_addr().unwrap();
2421        let url = format!("http://127.0.0.1:{}", addr.port());
2422
2423        let handle = tokio::spawn(async move {
2424            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2425            loop {
2426                if let Ok((mut stream, _)) = listener.accept().await {
2427                    tokio::spawn(async move {
2428                        let mut buf = vec![0u8; 4096];
2429                        let n = stream.read(&mut buf).await.unwrap_or(0);
2430                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2431
2432                        // Check if this is a request to /final
2433                        if request.contains("GET /final") {
2434                            let body = r#"{"status":"final"}"#;
2435                            let response = format!(
2436                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2437                                body.len(),
2438                                body
2439                            );
2440                            let _ = stream.write_all(response.as_bytes()).await;
2441                        } else {
2442                            // Redirect to /final
2443                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2444                            let _ = stream.write_all(response.as_bytes()).await;
2445                        }
2446                    });
2447                }
2448            }
2449        });
2450
2451        (url, handle)
2452    }
2453
2454    #[tokio::test]
2455    async fn test_follow_redirects_false_does_not_follow() {
2456        use tower::ServiceExt;
2457
2458        let (url, _handle) = start_redirect_server().await;
2459        let ctx = test_producer_ctx();
2460
2461        let component =
2462            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2463        let endpoint_ctx = NoOpComponentContext;
2464        let endpoint = component
2465            .create_endpoint(
2466                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2467                &endpoint_ctx,
2468            )
2469            .unwrap();
2470        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2471
2472        let exchange = Exchange::new(Message::default());
2473        let result = producer.oneshot(exchange).await.unwrap();
2474
2475        // Should get 302, NOT follow redirect to 200
2476        let status = result
2477            .input
2478            .header("CamelHttpResponseCode")
2479            .and_then(|v| v.as_u64())
2480            .unwrap();
2481        assert_eq!(
2482            status, 302,
2483            "Should NOT follow redirect when followRedirects=false"
2484        );
2485    }
2486
2487    #[tokio::test]
2488    async fn test_follow_redirects_true_follows_redirect() {
2489        use tower::ServiceExt;
2490
2491        let (url, _handle) = start_redirect_server().await;
2492        let ctx = test_producer_ctx();
2493
2494        let component =
2495            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2496        let endpoint_ctx = NoOpComponentContext;
2497        let endpoint = component
2498            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2499            .unwrap();
2500        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2501
2502        let exchange = Exchange::new(Message::default());
2503        let result = producer.oneshot(exchange).await.unwrap();
2504
2505        // Should follow redirect and get 200
2506        let status = result
2507            .input
2508            .header("CamelHttpResponseCode")
2509            .and_then(|v| v.as_u64())
2510            .unwrap();
2511        assert_eq!(
2512            status, 200,
2513            "Should follow redirect when followRedirects=true"
2514        );
2515    }
2516
2517    #[tokio::test]
2518    async fn test_query_params_forwarded_to_http_request() {
2519        use tower::ServiceExt;
2520
2521        let (url, _handle) = start_test_server().await;
2522        let ctx = test_producer_ctx();
2523
2524        let component = HttpComponent::new();
2525        let endpoint_ctx = NoOpComponentContext;
2526        // apiKey is NOT a Camel option, should be forwarded as query param
2527        let endpoint = component
2528            .create_endpoint(
2529                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2530                &endpoint_ctx,
2531            )
2532            .unwrap();
2533        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2534
2535        let exchange = Exchange::new(Message::default());
2536        let result = producer.oneshot(exchange).await.unwrap();
2537
2538        // The test server returns the request info in response
2539        // We just verify it succeeds (the query param was sent)
2540        let status = result
2541            .input
2542            .header("CamelHttpResponseCode")
2543            .and_then(|v| v.as_u64())
2544            .unwrap();
2545        assert_eq!(status, 200);
2546    }
2547
2548    #[tokio::test]
2549    async fn test_non_camel_query_params_are_forwarded() {
2550        // This test verifies Bug #3 fix: non-Camel options should be forwarded
2551        // We'll test the config parsing, not the actual HTTP call
2552        let config = HttpEndpointConfig::from_uri(
2553            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2554        )
2555        .unwrap();
2556
2557        // apiKey and token are NOT Camel options, should be forwarded
2558        assert!(
2559            config.query_params.contains_key("apiKey"),
2560            "apiKey should be preserved"
2561        );
2562        assert!(
2563            config.query_params.contains_key("token"),
2564            "token should be preserved"
2565        );
2566        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2567        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2568
2569        // httpMethod IS a Camel option, should NOT be in query_params
2570        assert!(
2571            !config.query_params.contains_key("httpMethod"),
2572            "httpMethod should not be forwarded"
2573        );
2574    }
2575
2576    #[test]
2577    fn test_query_params_are_url_encoded_when_resolving_url() {
2578        let config =
2579            HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2580        let exchange = Exchange::new(Message::default());
2581
2582        let url = HttpProducer::resolve_url(&exchange, &config);
2583
2584        assert!(url.contains("q=hello+world"), "url was: {url}");
2585        assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2586    }
2587
2588    // -----------------------------------------------------------------------
2589    // Timeout tests (HTTP-004)
2590    // -----------------------------------------------------------------------
2591
2592    async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2593        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2594        let addr = listener.local_addr().unwrap();
2595        let url = format!("http://127.0.0.1:{}", addr.port());
2596
2597        let handle = tokio::spawn(async move {
2598            loop {
2599                if let Ok((mut stream, _)) = listener.accept().await {
2600                    let delay = delay_ms;
2601                    tokio::spawn(async move {
2602                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2603                        let mut buf = vec![0u8; 4096];
2604                        let _ = stream.read(&mut buf).await;
2605                        // Send headers immediately (no Content-Length → chunked)
2606                        let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2607                        let _ = stream.write_all(headers.as_bytes()).await;
2608                        // Delay before sending body chunk
2609                        tokio::time::sleep(Duration::from_millis(delay)).await;
2610                        let body = r#"{"status":"slow"}"#;
2611                        let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2612                        let _ = stream.write_all(chunk.as_bytes()).await;
2613                    });
2614                }
2615            }
2616        });
2617
2618        (url, handle)
2619    }
2620
2621    #[tokio::test]
2622    async fn test_http_producer_timeout() {
2623        use tower::ServiceExt;
2624
2625        // Server delays 500ms, client timeout is 100ms → should timeout
2626        let (url, _handle) = start_slow_server(500).await;
2627        let ctx = test_producer_ctx();
2628
2629        let component = HttpComponent::with_config(
2630            HttpConfig::default()
2631                .with_read_timeout_ms(100)
2632                .with_response_timeout_ms(30_000), // generous response timeout
2633        );
2634        let endpoint_ctx = NoOpComponentContext;
2635        let endpoint = component
2636            .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2637            .unwrap();
2638        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2639
2640        let exchange = Exchange::new(Message::default());
2641        let result = producer.oneshot(exchange).await;
2642
2643        assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2644        let err = result.unwrap_err().to_string();
2645        assert!(
2646            err.contains("Read timeout") || err.contains("timeout"),
2647            "Error should mention timeout, got: {}",
2648            err
2649        );
2650    }
2651
2652    #[tokio::test]
2653    async fn test_http_producer_no_timeout_when_fast() {
2654        use tower::ServiceExt;
2655
2656        let (url, _handle) = start_test_server().await;
2657        let ctx = test_producer_ctx();
2658
2659        let component =
2660            HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2661        let endpoint_ctx = NoOpComponentContext;
2662        let endpoint = component
2663            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2664            .unwrap();
2665        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2666
2667        let exchange = Exchange::new(Message::default());
2668        let result = producer.oneshot(exchange).await.unwrap();
2669
2670        let status = result
2671            .input
2672            .header("CamelHttpResponseCode")
2673            .and_then(|v| v.as_u64())
2674            .unwrap();
2675        assert_eq!(status, 200);
2676    }
2677
2678    // -----------------------------------------------------------------------
2679    // SSRF Protection tests
2680    // -----------------------------------------------------------------------
2681
2682    #[tokio::test]
2683    async fn test_http_producer_blocks_metadata_endpoint() {
2684        use tower::ServiceExt;
2685
2686        let ctx = test_producer_ctx();
2687        let component = HttpComponent::new();
2688        let endpoint_ctx = NoOpComponentContext;
2689        let endpoint = component
2690            .create_endpoint(
2691                "http://example.com/api?allowPrivateIps=false",
2692                &endpoint_ctx,
2693            )
2694            .unwrap();
2695        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2696
2697        let mut exchange = Exchange::new(Message::default());
2698        exchange.input.set_header(
2699            "CamelHttpUri",
2700            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2701        );
2702
2703        let result = producer.oneshot(exchange).await;
2704        assert!(result.is_err(), "Should block AWS metadata endpoint");
2705
2706        let err = result.unwrap_err();
2707        assert!(
2708            err.to_string().contains("Private IP"),
2709            "Error should mention private IP blocking, got: {}",
2710            err
2711        );
2712    }
2713
2714    #[test]
2715    fn test_ssrf_config_defaults() {
2716        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2717        assert!(
2718            !config.allow_private_ips,
2719            "Private IPs should be blocked by default"
2720        );
2721        assert!(
2722            config.blocked_hosts.is_empty(),
2723            "Blocked hosts should be empty by default"
2724        );
2725    }
2726
2727    #[test]
2728    fn test_ssrf_config_allow_private_ips() {
2729        let config =
2730            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2731        assert!(
2732            config.allow_private_ips,
2733            "Private IPs should be allowed when explicitly set"
2734        );
2735    }
2736
2737    #[test]
2738    fn test_ssrf_config_blocked_hosts() {
2739        let config = HttpEndpointConfig::from_uri(
2740            "http://example.com/api?blockedHosts=evil.com,malware.net",
2741        )
2742        .unwrap();
2743        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2744    }
2745
2746    #[tokio::test]
2747    async fn test_http_producer_blocks_localhost() {
2748        use tower::ServiceExt;
2749
2750        let ctx = test_producer_ctx();
2751        let component = HttpComponent::new();
2752        let endpoint_ctx = NoOpComponentContext;
2753        let endpoint = component
2754            .create_endpoint("http://example.com/api", &endpoint_ctx)
2755            .unwrap();
2756        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2757
2758        let mut exchange = Exchange::new(Message::default());
2759        exchange.input.set_header(
2760            "CamelHttpUri",
2761            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2762        );
2763
2764        let result = producer.oneshot(exchange).await;
2765        assert!(result.is_err(), "Should block localhost");
2766    }
2767
2768    #[tokio::test]
2769    async fn test_http_producer_blocks_loopback_ip() {
2770        use tower::ServiceExt;
2771
2772        let ctx = test_producer_ctx();
2773        let component = HttpComponent::new();
2774        let endpoint_ctx = NoOpComponentContext;
2775        let endpoint = component
2776            .create_endpoint("http://example.com/api", &endpoint_ctx)
2777            .unwrap();
2778        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2779
2780        let mut exchange = Exchange::new(Message::default());
2781        exchange.input.set_header(
2782            "CamelHttpUri",
2783            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2784        );
2785
2786        let result = producer.oneshot(exchange).await;
2787        assert!(result.is_err(), "Should block loopback IP");
2788    }
2789
2790    #[tokio::test]
2791    async fn test_http_producer_allows_private_ip_when_enabled() {
2792        use tower::ServiceExt;
2793
2794        let ctx = test_producer_ctx();
2795        let component = HttpComponent::new();
2796        let endpoint_ctx = NoOpComponentContext;
2797        // With allowPrivateIps=true, the validation should pass
2798        // (actual connection will fail, but that's expected)
2799        let endpoint = component
2800            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2801            .unwrap();
2802        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2803
2804        let exchange = Exchange::new(Message::default());
2805
2806        // The request will fail because we can't connect, but it should NOT fail
2807        // due to SSRF protection
2808        let result = producer.oneshot(exchange).await;
2809        // We expect connection error, not SSRF error
2810        if let Err(ref e) = result {
2811            let err_str = e.to_string();
2812            assert!(
2813                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2814                "Should not be SSRF error, got: {}",
2815                err_str
2816            );
2817        }
2818    }
2819
2820    // -----------------------------------------------------------------------
2821    // HttpServerConfig tests
2822    // -----------------------------------------------------------------------
2823
2824    #[test]
2825    fn test_http_server_config_parse() {
2826        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2827        assert_eq!(cfg.host, "0.0.0.0");
2828        assert_eq!(cfg.port, 8080);
2829        assert_eq!(cfg.path, "/orders");
2830        assert_eq!(cfg.max_inflight_requests, 1024);
2831    }
2832
2833    #[test]
2834    fn test_http_server_config_scheme() {
2835        // UriConfig trait method returns "http" as primary scheme
2836        assert_eq!(HttpServerConfig::scheme(), "http");
2837    }
2838
2839    #[test]
2840    fn test_http_server_config_from_components() {
2841        // Test from_components directly (trait method)
2842        let components = camel_component_api::UriComponents {
2843            scheme: "https".to_string(),
2844            path: "//0.0.0.0:8443/api".to_string(),
2845            params: std::collections::HashMap::from([
2846                ("maxRequestBody".to_string(), "5242880".to_string()),
2847                ("maxInflightRequests".to_string(), "7".to_string()),
2848            ]),
2849        };
2850        let cfg = HttpServerConfig::from_components(components).unwrap();
2851        assert_eq!(cfg.host, "0.0.0.0");
2852        assert_eq!(cfg.port, 8443);
2853        assert_eq!(cfg.path, "/api");
2854        assert_eq!(cfg.max_request_body, 5242880);
2855        assert_eq!(cfg.max_inflight_requests, 7);
2856    }
2857
2858    #[test]
2859    fn test_http_server_config_default_path() {
2860        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2861        assert_eq!(cfg.path, "/");
2862    }
2863
2864    #[test]
2865    fn test_http_server_config_wrong_scheme() {
2866        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2867    }
2868
2869    #[test]
2870    fn test_http_server_config_invalid_port() {
2871        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2872    }
2873
2874    #[test]
2875    fn test_http_server_config_default_port_by_scheme() {
2876        // HTTP without explicit port should default to 80
2877        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2878        assert_eq!(cfg_http.port, 80);
2879
2880        // HTTPS without explicit port should default to 443
2881        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2882        assert_eq!(cfg_https.port, 443);
2883    }
2884
2885    #[test]
2886    fn test_request_envelope_and_reply_are_send() {
2887        fn assert_send<T: Send>() {}
2888        assert_send::<RequestEnvelope>();
2889        assert_send::<HttpReply>();
2890    }
2891
2892    // -----------------------------------------------------------------------
2893    // ServerRegistry tests
2894    // -----------------------------------------------------------------------
2895
2896    #[test]
2897    fn test_server_registry_global_is_singleton() {
2898        let r1 = ServerRegistry::global();
2899        let r2 = ServerRegistry::global();
2900        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2901    }
2902
2903    #[allow(clippy::await_holding_lock)]
2904    #[tokio::test]
2905    async fn test_concurrent_get_or_spawn_returns_same_registry() {
2906        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2907        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2908        let port = listener.local_addr().unwrap().port();
2909        drop(listener);
2910
2911        let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2912            Arc::new(std::sync::Mutex::new(Vec::new()));
2913
2914        let mut handles = Vec::new();
2915        for _ in 0..4 {
2916            let results = results.clone();
2917            handles.push(tokio::spawn(async move {
2918                let registry = ServerRegistry::global()
2919                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2920                    .await
2921                    .unwrap();
2922                results.lock().unwrap().push(registry);
2923            }));
2924        }
2925
2926        for h in handles {
2927            h.await.unwrap();
2928        }
2929
2930        let registries = results.lock().unwrap();
2931        assert_eq!(registries.len(), 4);
2932        for i in 1..registries.len() {
2933            assert!(
2934                Arc::ptr_eq(&registries[0].inner, &registries[i].inner),
2935                "all concurrent callers should get same route registry"
2936            );
2937        }
2938    }
2939
2940    #[test]
2941    fn test_server_registry_distinguishes_host_and_port() {
2942        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2943        let rt = tokio::runtime::Runtime::new().expect("runtime");
2944        rt.block_on(async {
2945            let registry = ServerRegistry::global();
2946            // Use two distinct host values with same configured port key.
2947            // Port 0 is acceptable here because the registry key uses the configured
2948            // tuple, not the OS-assigned ephemeral port.
2949            let d1 = registry
2950                .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2951                .await;
2952            let d2 = registry
2953                .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2954                .await;
2955            assert!(d1.is_ok());
2956            assert!(d2.is_ok());
2957            assert!(!Arc::ptr_eq(&d1.unwrap().inner, &d2.unwrap().inner));
2958        });
2959    }
2960
2961    #[allow(clippy::await_holding_lock)]
2962    #[tokio::test]
2963    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2964        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2965        let registry = ServerRegistry::global();
2966        // First registration: maxRequestBody = 1 MB
2967        let d1 = registry
2968            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2969            .await;
2970        assert!(d1.is_ok());
2971
2972        // Second registration on same (host,port): maxRequestBody = 2 MB
2973        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2974        let d2 = registry
2975            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2976            .await;
2977        assert!(d2.is_err());
2978        let err = d2.unwrap_err();
2979        assert!(
2980            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2981            "Expected incompatible maxRequestBody error, got: {}",
2982            err
2983        );
2984    }
2985
2986    #[test]
2987    fn test_server_registry_reset_clears_entries() {
2988        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2989        let rt = tokio::runtime::Runtime::new().expect("runtime");
2990        rt.block_on(async {
2991            // Register something on a unique port
2992            let d1 = ServerRegistry::global()
2993                .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2994                .await;
2995            assert!(d1.is_ok());
2996
2997            // Verify entry exists
2998            let guard = ServerRegistry::global().inner.lock().expect("lock");
2999            assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
3000            drop(guard);
3001
3002            // Reset
3003            ServerRegistry::reset();
3004
3005            // Verify cleared
3006            let guard = ServerRegistry::global().inner.lock().expect("lock");
3007            assert!(
3008                guard.is_empty(),
3009                "registry should be empty after reset, has {} entries",
3010                guard.len()
3011            );
3012        });
3013    }
3014
3015    // -----------------------------------------------------------------------
3016    // Axum dispatch handler tests
3017    // -----------------------------------------------------------------------
3018
3019    #[tokio::test]
3020    async fn test_dispatch_handler_returns_404_for_unknown_path() {
3021        let registry = HttpRouteRegistry::new();
3022        // Nothing registered in route registry
3023        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3024        let port = listener.local_addr().unwrap().port();
3025        tokio::spawn(run_axum_server(
3026            listener,
3027            registry,
3028            2 * 1024 * 1024,
3029            10 * 1024 * 1024,
3030            Arc::new(tokio::sync::Semaphore::new(1024)),
3031        ));
3032
3033        // Wait for server to start
3034        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3035
3036        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3037            .await
3038            .unwrap();
3039        assert_eq!(resp.status().as_u16(), 404);
3040    }
3041
3042    // -----------------------------------------------------------------------
3043    // HttpConsumer tests
3044    // -----------------------------------------------------------------------
3045
3046    #[tokio::test]
3047    async fn test_http_consumer_start_registers_path() {
3048        use camel_component_api::ConsumerContext;
3049
3050        // Get an OS-assigned free port
3051        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3052        let port = listener.local_addr().unwrap().port();
3053        drop(listener); // Release port — ServerRegistry will rebind it
3054
3055        let consumer_cfg = HttpServerConfig {
3056            host: "127.0.0.1".to_string(),
3057            port,
3058            path: "/ping".to_string(),
3059            max_request_body: 2 * 1024 * 1024,
3060            max_response_body: 10 * 1024 * 1024,
3061            max_inflight_requests: 1024,
3062        };
3063        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3064
3065        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3066        let token = tokio_util::sync::CancellationToken::new();
3067        let ctx = ConsumerContext::new(tx, token.clone());
3068
3069        tokio::spawn(async move {
3070            consumer.start(ctx).await.unwrap();
3071        });
3072
3073        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3074
3075        let client = reqwest::Client::new();
3076        let resp_future = client
3077            .post(format!("http://127.0.0.1:{port}/ping"))
3078            .body("hello world")
3079            .send();
3080
3081        let (http_result, _) = tokio::join!(resp_future, async {
3082            if let Some(mut envelope) = rx.recv().await {
3083                // Set a custom status code
3084                envelope.exchange.input.set_header(
3085                    "CamelHttpResponseCode",
3086                    serde_json::Value::Number(201.into()),
3087                );
3088                if let Some(reply_tx) = envelope.reply_tx {
3089                    let _ = reply_tx.send(Ok(envelope.exchange));
3090                }
3091            }
3092        });
3093
3094        let resp = http_result.unwrap();
3095        assert_eq!(resp.status().as_u16(), 201);
3096
3097        token.cancel();
3098    }
3099
3100    #[tokio::test]
3101    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3102        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3103
3104        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3105        let port = listener.local_addr().unwrap().port();
3106        drop(listener);
3107
3108        let consumer_cfg = HttpServerConfig {
3109            host: "127.0.0.1".to_string(),
3110            port,
3111            path: "/saturation".to_string(),
3112            max_request_body: 2 * 1024 * 1024,
3113            max_response_body: 10 * 1024 * 1024,
3114            max_inflight_requests: 1,
3115        };
3116        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3117
3118        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3119        let token = tokio_util::sync::CancellationToken::new();
3120        let ctx = ConsumerContext::new(tx, token.clone());
3121        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3122        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3123
3124        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3125        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3126
3127        tokio::spawn(async move {
3128            let mut first_seen_tx = Some(first_seen_tx);
3129            let mut unblock_first_rx = Some(unblock_first_rx);
3130
3131            while let Some(envelope) = rx.recv().await {
3132                if let Some(tx) = first_seen_tx.take() {
3133                    let _ = tx.send(());
3134                    if let Some(rx_unblock) = unblock_first_rx.take() {
3135                        let _ = rx_unblock.await;
3136                    }
3137                }
3138
3139                if let Some(reply_tx) = envelope.reply_tx {
3140                    let _ = reply_tx.send(Ok(envelope.exchange));
3141                }
3142            }
3143        });
3144
3145        let client = reqwest::Client::new();
3146        let first_req = {
3147            let client = client.clone();
3148            async move {
3149                client
3150                    .get(format!("http://127.0.0.1:{port}/saturation"))
3151                    .send()
3152                    .await
3153                    .unwrap()
3154            }
3155        };
3156
3157        let first_handle = tokio::spawn(first_req);
3158        first_seen_rx.await.unwrap();
3159
3160        let second_resp = client
3161            .get(format!("http://127.0.0.1:{port}/saturation"))
3162            .send()
3163            .await
3164            .unwrap();
3165
3166        assert_eq!(second_resp.status().as_u16(), 503);
3167
3168        let _ = unblock_first_tx.send(());
3169        let first_resp = first_handle.await.unwrap();
3170        assert_eq!(first_resp.status().as_u16(), 200);
3171
3172        token.cancel();
3173    }
3174
3175    #[tokio::test]
3176    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3177        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3178
3179        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3180
3181        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3182        let port = listener.local_addr().unwrap().port();
3183        drop(listener);
3184
3185        let consumer_cfg = HttpServerConfig {
3186            host: "127.0.0.1".to_string(),
3187            port,
3188            path: "/limit-bytes".to_string(),
3189            max_request_body: 2 * 1024 * 1024,
3190            max_response_body: 16,
3191            max_inflight_requests: 1024,
3192        };
3193        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3194
3195        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3196        let token = tokio_util::sync::CancellationToken::new();
3197        let ctx = ConsumerContext::new(tx, token.clone());
3198        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3199        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3200
3201        let client = reqwest::Client::new();
3202        let send_fut = client
3203            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3204            .send();
3205
3206        let (http_result, _) = tokio::join!(send_fut, async {
3207            if let Some(mut envelope) = rx.recv().await {
3208                envelope.exchange.input.body =
3209                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3210                if let Some(reply_tx) = envelope.reply_tx {
3211                    let _ = reply_tx.send(Ok(envelope.exchange));
3212                }
3213            }
3214        });
3215
3216        let resp = http_result.unwrap();
3217        assert_eq!(resp.status().as_u16(), 500);
3218        let body = resp.text().await.unwrap();
3219        assert_eq!(body, "Response body exceeds configured limit");
3220        token.cancel();
3221    }
3222
3223    #[tokio::test]
3224    async fn test_http_consumer_enforces_max_response_body_for_json() {
3225        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3226
3227        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3228
3229        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3230        let port = listener.local_addr().unwrap().port();
3231        drop(listener);
3232
3233        let consumer_cfg = HttpServerConfig {
3234            host: "127.0.0.1".to_string(),
3235            port,
3236            path: "/limit-json".to_string(),
3237            max_request_body: 2 * 1024 * 1024,
3238            max_response_body: 16,
3239            max_inflight_requests: 1024,
3240        };
3241        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3242
3243        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3244        let token = tokio_util::sync::CancellationToken::new();
3245        let ctx = ConsumerContext::new(tx, token.clone());
3246        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3247        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3248
3249        let client = reqwest::Client::new();
3250        let send_fut = client
3251            .get(format!("http://127.0.0.1:{port}/limit-json"))
3252            .send();
3253
3254        let (http_result, _) = tokio::join!(send_fut, async {
3255            if let Some(mut envelope) = rx.recv().await {
3256                envelope.exchange.input.body = camel_component_api::Body::Json(
3257                    serde_json::json!({"message":"this response is bigger than sixteen"}),
3258                );
3259                if let Some(reply_tx) = envelope.reply_tx {
3260                    let _ = reply_tx.send(Ok(envelope.exchange));
3261                }
3262            }
3263        });
3264
3265        let resp = http_result.unwrap();
3266        assert_eq!(resp.status().as_u16(), 500);
3267        let body = resp.text().await.unwrap();
3268        assert_eq!(body, "Response body exceeds configured limit");
3269        token.cancel();
3270    }
3271
3272    #[tokio::test]
3273    async fn test_http_consumer_enforces_max_response_body_for_xml() {
3274        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3275
3276        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3277
3278        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3279        let port = listener.local_addr().unwrap().port();
3280        drop(listener);
3281
3282        let consumer_cfg = HttpServerConfig {
3283            host: "127.0.0.1".to_string(),
3284            port,
3285            path: "/limit-xml".to_string(),
3286            max_request_body: 2 * 1024 * 1024,
3287            max_response_body: 16,
3288            max_inflight_requests: 1024,
3289        };
3290        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3291
3292        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3293        let token = tokio_util::sync::CancellationToken::new();
3294        let ctx = ConsumerContext::new(tx, token.clone());
3295        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3296        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3297
3298        let client = reqwest::Client::new();
3299        let send_fut = client
3300            .get(format!("http://127.0.0.1:{port}/limit-xml"))
3301            .send();
3302
3303        let (http_result, _) = tokio::join!(send_fut, async {
3304            if let Some(mut envelope) = rx.recv().await {
3305                envelope.exchange.input.body = camel_component_api::Body::Xml(
3306                    "<root><value>way-too-large</value></root>".into(),
3307                );
3308                if let Some(reply_tx) = envelope.reply_tx {
3309                    let _ = reply_tx.send(Ok(envelope.exchange));
3310                }
3311            }
3312        });
3313
3314        let resp = http_result.unwrap();
3315        assert_eq!(resp.status().as_u16(), 500);
3316        let body = resp.text().await.unwrap();
3317        assert_eq!(body, "Response body exceeds configured limit");
3318        token.cancel();
3319    }
3320
3321    #[tokio::test]
3322    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3323        use camel_component_api::{
3324            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3325        };
3326        use futures::stream;
3327
3328        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3329
3330        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3331        let port = listener.local_addr().unwrap().port();
3332        drop(listener);
3333
3334        let consumer_cfg = HttpServerConfig {
3335            host: "0.0.0.0".to_string(),
3336            port,
3337            path: "/limit-stream".to_string(),
3338            max_request_body: 2 * 1024 * 1024,
3339            max_response_body: 16,
3340            max_inflight_requests: 1024,
3341        };
3342        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3343
3344        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3345        let token = tokio_util::sync::CancellationToken::new();
3346        let ctx = ConsumerContext::new(tx, token.clone());
3347        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3348        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3349
3350        let client = reqwest::Client::new();
3351        let send_fut = client
3352            .get(format!("http://127.0.0.1:{port}/limit-stream"))
3353            .send();
3354
3355        let (http_result, _) = tokio::join!(send_fut, async {
3356            if let Some(mut envelope) = rx.recv().await {
3357                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3358                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3359                let stream = Box::pin(stream::iter(chunks));
3360                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3361                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3362                    metadata: StreamMetadata {
3363                        size_hint: Some(32),
3364                        content_type: Some("application/octet-stream".into()),
3365                        origin: None,
3366                    },
3367                });
3368                if let Some(reply_tx) = envelope.reply_tx {
3369                    let _ = reply_tx.send(Ok(envelope.exchange));
3370                }
3371            }
3372        });
3373
3374        let resp = http_result.unwrap();
3375        assert_eq!(resp.status().as_u16(), 200);
3376        let body = resp.bytes().await.unwrap();
3377        assert_eq!(body.len(), 32);
3378        token.cancel();
3379    }
3380
3381    // -----------------------------------------------------------------------
3382    // Integration tests
3383    // -----------------------------------------------------------------------
3384
3385    #[tokio::test]
3386    async fn test_integration_single_consumer_round_trip() {
3387        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3388
3389        // Get an OS-assigned free port (ephemeral)
3390        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3391        let port = listener.local_addr().unwrap().port();
3392        drop(listener); // Release — ServerRegistry will rebind
3393
3394        let component = HttpComponent::new();
3395        let endpoint_ctx = NoOpComponentContext;
3396        let endpoint = component
3397            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3398            .unwrap();
3399        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3400
3401        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3402        let token = tokio_util::sync::CancellationToken::new();
3403        let ctx = ConsumerContext::new(tx, token.clone());
3404
3405        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3406        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3407
3408        let client = reqwest::Client::new();
3409        let send_fut = client
3410            .post(format!("http://127.0.0.1:{port}/echo"))
3411            .header("Content-Type", "text/plain")
3412            .body("ping")
3413            .send();
3414
3415        let (http_result, _) = tokio::join!(send_fut, async {
3416            if let Some(mut envelope) = rx.recv().await {
3417                assert_eq!(
3418                    envelope.exchange.input.header("CamelHttpMethod"),
3419                    Some(&serde_json::Value::String("POST".into()))
3420                );
3421                assert_eq!(
3422                    envelope.exchange.input.header("CamelHttpPath"),
3423                    Some(&serde_json::Value::String("/echo".into()))
3424                );
3425                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3426                if let Some(reply_tx) = envelope.reply_tx {
3427                    let _ = reply_tx.send(Ok(envelope.exchange));
3428                }
3429            }
3430        });
3431
3432        let resp = http_result.unwrap();
3433        assert_eq!(resp.status().as_u16(), 200);
3434        let body = resp.text().await.unwrap();
3435        assert_eq!(body, "pong");
3436
3437        token.cancel();
3438    }
3439
3440    #[tokio::test]
3441    async fn test_integration_two_consumers_shared_port() {
3442        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3443
3444        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3445
3446        // Get an OS-assigned free port (ephemeral)
3447        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3448        let port = listener.local_addr().unwrap().port();
3449        drop(listener);
3450
3451        let component = HttpComponent::new();
3452        let endpoint_ctx = NoOpComponentContext;
3453
3454        // Consumer A: /hello
3455        let endpoint_a = component
3456            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3457            .unwrap();
3458        let mut consumer_a = endpoint_a.create_consumer(rt()).unwrap();
3459
3460        // Consumer B: /world
3461        let endpoint_b = component
3462            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3463            .unwrap();
3464        let mut consumer_b = endpoint_b.create_consumer(rt()).unwrap();
3465
3466        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3467        let token_a = tokio_util::sync::CancellationToken::new();
3468        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3469
3470        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3471        let token_b = tokio_util::sync::CancellationToken::new();
3472        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3473
3474        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3475        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3476        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3477
3478        let client = reqwest::Client::new();
3479
3480        // Request to /hello
3481        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3482        let (resp_hello, _) = tokio::join!(fut_hello, async {
3483            if let Some(mut envelope) = rx_a.recv().await {
3484                envelope.exchange.input.body =
3485                    camel_component_api::Body::Text("hello-response".to_string());
3486                if let Some(reply_tx) = envelope.reply_tx {
3487                    let _ = reply_tx.send(Ok(envelope.exchange));
3488                }
3489            }
3490        });
3491
3492        // Request to /world
3493        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3494        let (resp_world, _) = tokio::join!(fut_world, async {
3495            if let Some(mut envelope) = rx_b.recv().await {
3496                envelope.exchange.input.body =
3497                    camel_component_api::Body::Text("world-response".to_string());
3498                if let Some(reply_tx) = envelope.reply_tx {
3499                    let _ = reply_tx.send(Ok(envelope.exchange));
3500                }
3501            }
3502        });
3503
3504        let body_a = resp_hello.unwrap().text().await.unwrap();
3505        let body_b = resp_world.unwrap().text().await.unwrap();
3506
3507        assert_eq!(body_a, "hello-response");
3508        assert_eq!(body_b, "world-response");
3509
3510        token_a.cancel();
3511        token_b.cancel();
3512    }
3513
3514    #[tokio::test]
3515    async fn test_integration_unregistered_path_returns_404() {
3516        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3517
3518        // Get an OS-assigned free port (ephemeral)
3519        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3520        let port = listener.local_addr().unwrap().port();
3521        drop(listener);
3522
3523        let component = HttpComponent::new();
3524        let endpoint_ctx = NoOpComponentContext;
3525        let endpoint = component
3526            .create_endpoint(
3527                &format!("http://127.0.0.1:{port}/registered"),
3528                &endpoint_ctx,
3529            )
3530            .unwrap();
3531        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3532
3533        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3534        let token = tokio_util::sync::CancellationToken::new();
3535        let ctx = ConsumerContext::new(tx, token.clone());
3536
3537        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3538
3539        // Wait until the server is actually accepting connections (CI runners can be slow).
3540        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3541        loop {
3542            if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3543                .await
3544                .is_ok()
3545            {
3546                break;
3547            }
3548            if std::time::Instant::now() >= deadline {
3549                panic!("HTTP server did not start within 5s on port {port}");
3550            }
3551            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3552        }
3553
3554        let client = reqwest::Client::new();
3555        let resp = client
3556            .get(format!("http://127.0.0.1:{port}/not-there"))
3557            .send()
3558            .await
3559            .unwrap();
3560        assert_eq!(resp.status().as_u16(), 404);
3561
3562        token.cancel();
3563    }
3564
3565    #[test]
3566    fn test_http_consumer_declares_concurrent() {
3567        use camel_component_api::ConcurrencyModel;
3568
3569        let config = HttpServerConfig {
3570            host: "127.0.0.1".to_string(),
3571            port: 19999,
3572            path: "/test".to_string(),
3573            max_request_body: 2 * 1024 * 1024,
3574            max_response_body: 10 * 1024 * 1024,
3575            max_inflight_requests: 1024,
3576        };
3577        let consumer = HttpConsumer::new(config, test_rt());
3578        assert_eq!(
3579            consumer.concurrency_model(),
3580            ConcurrencyModel::Concurrent { max: None }
3581        );
3582    }
3583
3584    // -----------------------------------------------------------------------
3585    // HttpReplyBody streaming tests
3586    // -----------------------------------------------------------------------
3587
3588    #[tokio::test]
3589    async fn test_http_reply_body_stream_variant_exists() {
3590        use bytes::Bytes;
3591        use camel_component_api::CamelError;
3592        use futures::stream;
3593
3594        let chunks: Vec<Result<Bytes, CamelError>> =
3595            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3596        let stream = Box::pin(stream::iter(chunks));
3597        let reply_body = HttpReplyBody::Stream(stream);
3598        // Si compila y el match funciona, el test pasa
3599        match reply_body {
3600            HttpReplyBody::Stream(_) => {}
3601            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3602        }
3603    }
3604
3605    // -----------------------------------------------------------------------
3606    // OpenTelemetry propagation tests (only compiled with "otel" feature)
3607    // -----------------------------------------------------------------------
3608
3609    #[cfg(feature = "otel")]
3610    mod otel_tests {
3611        use super::*;
3612        use camel_component_api::Message;
3613        use tower::ServiceExt;
3614
3615        #[tokio::test]
3616        async fn test_producer_injects_traceparent_header() {
3617            let (url, _handle) = start_test_server_with_header_capture().await;
3618            let ctx = test_producer_ctx();
3619
3620            let component = HttpComponent::new();
3621            let endpoint_ctx = NoOpComponentContext;
3622            let endpoint = component
3623                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3624                .unwrap();
3625            let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3626
3627            // Create exchange with an OTel context by extracting from a traceparent header
3628            let mut exchange = Exchange::new(Message::default());
3629            let mut headers = std::collections::HashMap::new();
3630            headers.insert(
3631                "traceparent".to_string(),
3632                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3633            );
3634            camel_otel::extract_into_exchange(&mut exchange, &headers);
3635
3636            let result = producer.oneshot(exchange).await.unwrap();
3637
3638            // Verify request succeeded
3639            let status = result
3640                .input
3641                .header("CamelHttpResponseCode")
3642                .and_then(|v| v.as_u64())
3643                .unwrap();
3644            assert_eq!(status, 200);
3645
3646            // The test server echoes back the received traceparent header
3647            let traceparent = result.input.header("X-Received-Traceparent");
3648            assert!(
3649                traceparent.is_some(),
3650                "traceparent header should have been sent"
3651            );
3652
3653            let traceparent_str = traceparent.unwrap().as_str().unwrap();
3654            // Verify format: version-traceid-spanid-flags
3655            let parts: Vec<&str> = traceparent_str.split('-').collect();
3656            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3657            assert_eq!(parts[0], "00", "version should be 00");
3658            assert_eq!(
3659                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3660                "trace-id should match"
3661            );
3662            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3663            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3664        }
3665
3666        #[tokio::test]
3667        async fn test_consumer_extracts_traceparent_header() {
3668            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3669
3670            // Get an OS-assigned free port
3671            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3672            let port = listener.local_addr().unwrap().port();
3673            drop(listener);
3674
3675            let component = HttpComponent::new();
3676            let endpoint_ctx = NoOpComponentContext;
3677            let endpoint = component
3678                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3679                .unwrap();
3680            let mut consumer = endpoint.create_consumer(rt()).unwrap();
3681
3682            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3683            let token = tokio_util::sync::CancellationToken::new();
3684            let ctx = ConsumerContext::new(tx, token.clone());
3685
3686            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3687            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3688
3689            // Send request with traceparent header
3690            let client = reqwest::Client::new();
3691            let send_fut = client
3692                .post(format!("http://127.0.0.1:{port}/trace"))
3693                .header(
3694                    "traceparent",
3695                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3696                )
3697                .body("test")
3698                .send();
3699
3700            let (http_result, _) = tokio::join!(send_fut, async {
3701                if let Some(envelope) = rx.recv().await {
3702                    // Verify the exchange has a valid OTel context by re-injecting it
3703                    // and checking the traceparent matches
3704                    let mut injected_headers = std::collections::HashMap::new();
3705                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3706
3707                    assert!(
3708                        injected_headers.contains_key("traceparent"),
3709                        "Exchange should have traceparent after extraction"
3710                    );
3711
3712                    let traceparent = injected_headers.get("traceparent").unwrap();
3713                    let parts: Vec<&str> = traceparent.split('-').collect();
3714                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3715                    assert_eq!(
3716                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3717                        "Trace ID should match the original traceparent header"
3718                    );
3719
3720                    if let Some(reply_tx) = envelope.reply_tx {
3721                        let _ = reply_tx.send(Ok(envelope.exchange));
3722                    }
3723                }
3724            });
3725
3726            let resp = http_result.unwrap();
3727            assert_eq!(resp.status().as_u16(), 200);
3728
3729            token.cancel();
3730        }
3731
3732        #[tokio::test]
3733        async fn test_consumer_extracts_mixed_case_traceparent_header() {
3734            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3735
3736            // Get an OS-assigned free port
3737            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3738            let port = listener.local_addr().unwrap().port();
3739            drop(listener);
3740
3741            let component = HttpComponent::new();
3742            let endpoint_ctx = NoOpComponentContext;
3743            let endpoint = component
3744                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3745                .unwrap();
3746            let mut consumer = endpoint.create_consumer(rt()).unwrap();
3747
3748            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3749            let token = tokio_util::sync::CancellationToken::new();
3750            let ctx = ConsumerContext::new(tx, token.clone());
3751
3752            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3753            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3754
3755            // Send request with MIXED-CASE TraceParent header (not lowercase)
3756            let client = reqwest::Client::new();
3757            let send_fut = client
3758                .post(format!("http://127.0.0.1:{port}/trace"))
3759                .header(
3760                    "TraceParent",
3761                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3762                )
3763                .body("test")
3764                .send();
3765
3766            let (http_result, _) = tokio::join!(send_fut, async {
3767                if let Some(envelope) = rx.recv().await {
3768                    // Verify the exchange has a valid OTel context by re-injecting it
3769                    // and checking the traceparent matches
3770                    let mut injected_headers = HashMap::new();
3771                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3772
3773                    assert!(
3774                        injected_headers.contains_key("traceparent"),
3775                        "Exchange should have traceparent after extraction from mixed-case header"
3776                    );
3777
3778                    let traceparent = injected_headers.get("traceparent").unwrap();
3779                    let parts: Vec<&str> = traceparent.split('-').collect();
3780                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3781                    assert_eq!(
3782                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3783                        "Trace ID should match the original mixed-case TraceParent header"
3784                    );
3785
3786                    if let Some(reply_tx) = envelope.reply_tx {
3787                        let _ = reply_tx.send(Ok(envelope.exchange));
3788                    }
3789                }
3790            });
3791
3792            let resp = http_result.unwrap();
3793            assert_eq!(resp.status().as_u16(), 200);
3794
3795            token.cancel();
3796        }
3797
3798        #[tokio::test]
3799        async fn test_producer_no_trace_context_no_crash() {
3800            let (url, _handle) = start_test_server().await;
3801            let ctx = test_producer_ctx();
3802
3803            let component = HttpComponent::new();
3804            let endpoint_ctx = NoOpComponentContext;
3805            let endpoint = component
3806                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3807                .unwrap();
3808            let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3809
3810            // Create exchange with default (empty) otel_context - no trace context
3811            let exchange = Exchange::new(Message::default());
3812
3813            // Should succeed without panic
3814            let result = producer.oneshot(exchange).await.unwrap();
3815
3816            // Verify request succeeded
3817            let status = result
3818                .input
3819                .header("CamelHttpResponseCode")
3820                .and_then(|v| v.as_u64())
3821                .unwrap();
3822            assert_eq!(status, 200);
3823        }
3824
3825        /// Test server that captures and echoes back the traceparent header
3826        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3827            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3828            let addr = listener.local_addr().unwrap();
3829            let url = format!("http://127.0.0.1:{}", addr.port());
3830
3831            let handle = tokio::spawn(async move {
3832                loop {
3833                    if let Ok((mut stream, _)) = listener.accept().await {
3834                        tokio::spawn(async move {
3835                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3836                            let mut buf = vec![0u8; 8192];
3837                            let n = stream.read(&mut buf).await.unwrap_or(0);
3838                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3839
3840                            // Extract traceparent header from request
3841                            let traceparent = request
3842                                .lines()
3843                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3844                                .map(|line| {
3845                                    line.split(':')
3846                                        .nth(1)
3847                                        .map(|s| s.trim().to_string())
3848                                        .unwrap_or_default()
3849                                })
3850                                .unwrap_or_default();
3851
3852                            let body =
3853                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3854                            let response = format!(
3855                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3856                                body.len(),
3857                                traceparent,
3858                                body
3859                            );
3860                            let _ = stream.write_all(response.as_bytes()).await;
3861                        });
3862                    }
3863                }
3864            });
3865
3866            (url, handle)
3867        }
3868    }
3869
3870    // -----------------------------------------------------------------------
3871    // Response streaming tests (Eje A - Task 2)
3872    // -----------------------------------------------------------------------
3873
3874    // -----------------------------------------------------------------------
3875    // Request streaming tests (Eje B - Task 3)
3876    // -----------------------------------------------------------------------
3877
3878    #[tokio::test]
3879    async fn test_request_body_arrives_as_stream() {
3880        use camel_component_api::Body;
3881        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3882
3883        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3884        let port = listener.local_addr().unwrap().port();
3885        drop(listener);
3886
3887        let component = HttpComponent::new();
3888        let endpoint_ctx = NoOpComponentContext;
3889        let endpoint = component
3890            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3891            .unwrap();
3892        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3893
3894        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3895        let token = tokio_util::sync::CancellationToken::new();
3896        let ctx = ConsumerContext::new(tx, token.clone());
3897
3898        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3899        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3900
3901        let client = reqwest::Client::new();
3902        let send_fut = client
3903            .post(format!("http://127.0.0.1:{port}/upload"))
3904            .body("hello streaming world")
3905            .send();
3906
3907        let (http_result, _) = tokio::join!(send_fut, async {
3908            if let Some(mut envelope) = rx.recv().await {
3909                // Body must be Body::Stream, not Body::Text or Body::Bytes
3910                assert!(
3911                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3912                    "expected Body::Stream, got discriminant {:?}",
3913                    std::mem::discriminant(&envelope.exchange.input.body)
3914                );
3915                // Materialize to verify content
3916                let bytes = envelope
3917                    .exchange
3918                    .input
3919                    .body
3920                    .into_bytes(1024 * 1024)
3921                    .await
3922                    .unwrap();
3923                assert_eq!(&bytes[..], b"hello streaming world");
3924
3925                envelope.exchange.input.body = camel_component_api::Body::Empty;
3926                if let Some(reply_tx) = envelope.reply_tx {
3927                    let _ = reply_tx.send(Ok(envelope.exchange));
3928                }
3929            }
3930        });
3931
3932        let resp = http_result.unwrap();
3933        assert_eq!(resp.status().as_u16(), 200);
3934
3935        token.cancel();
3936    }
3937
3938    // -----------------------------------------------------------------------
3939    // Response streaming tests (Eje A - Task 2)
3940    // -----------------------------------------------------------------------
3941
3942    #[tokio::test]
3943    async fn test_streaming_response_chunked() {
3944        use bytes::Bytes;
3945        use camel_component_api::Body;
3946        use camel_component_api::CamelError;
3947        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3948        use camel_component_api::{StreamBody, StreamMetadata};
3949        use futures::stream;
3950        use std::sync::Arc;
3951        use tokio::sync::Mutex;
3952
3953        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3954        let port = listener.local_addr().unwrap().port();
3955        drop(listener);
3956
3957        let component = HttpComponent::new();
3958        let endpoint_ctx = NoOpComponentContext;
3959        let endpoint = component
3960            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3961            .unwrap();
3962        let mut consumer = endpoint.create_consumer(rt()).unwrap();
3963
3964        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3965        let token = tokio_util::sync::CancellationToken::new();
3966        let ctx = ConsumerContext::new(tx, token.clone());
3967
3968        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3969        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3970
3971        let client = reqwest::Client::new();
3972        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3973
3974        let (http_result, _) = tokio::join!(send_fut, async {
3975            if let Some(mut envelope) = rx.recv().await {
3976                // Respond with Body::Stream
3977                let chunks: Vec<Result<Bytes, CamelError>> =
3978                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3979                let stream = Box::pin(stream::iter(chunks));
3980                envelope.exchange.input.body = Body::Stream(StreamBody {
3981                    stream: Arc::new(Mutex::new(Some(stream))),
3982                    metadata: StreamMetadata::default(),
3983                });
3984                if let Some(reply_tx) = envelope.reply_tx {
3985                    let _ = reply_tx.send(Ok(envelope.exchange));
3986                }
3987            }
3988        });
3989
3990        let resp = http_result.unwrap();
3991        assert_eq!(resp.status().as_u16(), 200);
3992        let body = resp.text().await.unwrap();
3993        assert_eq!(body, "chunk1chunk2");
3994
3995        token.cancel();
3996    }
3997
3998    // -----------------------------------------------------------------------
3999    // 413 Content-Length limit test (Task 4)
4000    // -----------------------------------------------------------------------
4001
4002    #[tokio::test]
4003    async fn test_413_when_content_length_exceeds_limit() {
4004        use camel_component_api::ConsumerContext;
4005
4006        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4007        let port = listener.local_addr().unwrap().port();
4008        drop(listener);
4009
4010        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
4011        let component = HttpComponent::new();
4012        let endpoint_ctx = NoOpComponentContext;
4013        let endpoint = component
4014            .create_endpoint(
4015                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
4016                &endpoint_ctx,
4017            )
4018            .unwrap();
4019        let mut consumer = endpoint.create_consumer(rt()).unwrap();
4020
4021        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4022        let token = tokio_util::sync::CancellationToken::new();
4023        let ctx = ConsumerContext::new(tx, token.clone());
4024
4025        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4026        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4027
4028        let client = reqwest::Client::new();
4029        let resp = client
4030            .post(format!("http://127.0.0.1:{port}/upload"))
4031            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
4032            .body("x".repeat(1000))
4033            .send()
4034            .await
4035            .unwrap();
4036
4037        assert_eq!(resp.status().as_u16(), 413);
4038
4039        token.cancel();
4040    }
4041
4042    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
4043    /// The spec says: "If there is no Content-Length, the limit does not apply at the
4044    /// consumer level — the route is responsible."
4045    #[tokio::test]
4046    async fn test_chunked_upload_without_content_length_bypasses_limit() {
4047        use bytes::Bytes;
4048        use camel_component_api::Body;
4049        use camel_component_api::ConsumerContext;
4050        use futures::stream;
4051
4052        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4053        let port = listener.local_addr().unwrap().port();
4054        drop(listener);
4055
4056        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
4057        let component = HttpComponent::new();
4058        let endpoint_ctx = NoOpComponentContext;
4059        let endpoint = component
4060            .create_endpoint(
4061                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4062                &endpoint_ctx,
4063            )
4064            .unwrap();
4065        let mut consumer = endpoint.create_consumer(rt()).unwrap();
4066
4067        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4068        let token = tokio_util::sync::CancellationToken::new();
4069        let ctx = ConsumerContext::new(tx, token.clone());
4070
4071        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4072        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4073
4074        let client = reqwest::Client::new();
4075
4076        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
4077        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
4078        // but since there's no Content-Length the 413 check must NOT fire.
4079        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4080            Ok(Bytes::from("y".repeat(50))),
4081            Ok(Bytes::from("y".repeat(50))),
4082        ];
4083        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4084        let send_fut = client
4085            .post(format!("http://127.0.0.1:{port}/upload"))
4086            .body(stream_body)
4087            .send();
4088
4089        let consumer_fut = async {
4090            // Use timeout to avoid deadlock if the handler rejects before enqueueing
4091            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4092                Ok(Some(mut envelope)) => {
4093                    assert!(
4094                        matches!(envelope.exchange.input.body, Body::Stream(_)),
4095                        "expected Body::Stream"
4096                    );
4097                    envelope.exchange.input.body = camel_component_api::Body::Empty;
4098                    if let Some(reply_tx) = envelope.reply_tx {
4099                        let _ = reply_tx.send(Ok(envelope.exchange));
4100                    }
4101                }
4102                Ok(None) => panic!("consumer channel closed unexpectedly"),
4103                Err(_) => {
4104                    // Timeout: the request was rejected before reaching the consumer.
4105                    // The HTTP response will carry the real status code (we check below).
4106                }
4107            }
4108        };
4109
4110        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4111
4112        let resp = http_result.unwrap();
4113        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
4114        assert_ne!(
4115            resp.status().as_u16(),
4116            413,
4117            "chunked upload must not be rejected by maxRequestBody"
4118        );
4119        assert_eq!(resp.status().as_u16(), 200);
4120
4121        token.cancel();
4122    }
4123
4124    #[test]
4125    fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4126        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4127        cfg.blocked_hosts = vec!["blocked.local".to_string()];
4128        cfg.allow_private_ips = false;
4129
4130        let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4131        assert!(blocked.is_err());
4132
4133        let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4134        assert!(private_ip.is_err());
4135
4136        cfg.allow_private_ips = true;
4137        let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4138        assert!(allowed.is_ok());
4139    }
4140
4141    #[test]
4142    fn test_is_private_ip_ranges() {
4143        assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); // allow-unwrap
4144        assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); // allow-unwrap
4145        assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); // allow-unwrap
4146        assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); // allow-unwrap
4147        assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); // allow-unwrap
4148        assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); // allow-unwrap
4149
4150        assert!(is_private_ip(&"::1".parse().unwrap())); // allow-unwrap
4151        assert!(is_private_ip(&"fc00::1".parse().unwrap())); // allow-unwrap
4152        assert!(is_private_ip(&"fd12::1".parse().unwrap())); // allow-unwrap
4153        assert!(is_private_ip(&"fe80::1".parse().unwrap())); // allow-unwrap
4154        // ::ffff:0:0/96 (IPv4-mapped): only private if the mapped IPv4 is private
4155        assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); // allow-unwrap
4156        assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); // allow-unwrap
4157        assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); // allow-unwrap
4158
4159        assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); // allow-unwrap
4160        assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); // allow-unwrap — public IPv4-mapped
4161        assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); // allow-unwrap
4162    }
4163
4164    #[tokio::test]
4165    async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4166        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); // allow-unwrap
4167        cfg.allow_private_ips = false;
4168
4169        let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4170            .await
4171            .expect_err("localhost must resolve to loopback and be blocked");
4172
4173        let msg = err.to_string();
4174        assert!(msg.contains("Target resolved to private IP"));
4175    }
4176
4177    #[test]
4178    fn test_title_case_header() {
4179        assert_eq!(title_case_header("content-type"), "Content-Type");
4180        assert_eq!(title_case_header("authorization"), "Authorization");
4181        assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4182        assert_eq!(title_case_header("host"), "Host");
4183        assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4184        assert_eq!(title_case_header("single"), "Single");
4185        assert_eq!(title_case_header(""), "");
4186    }
4187
4188    #[test]
4189    fn test_resolve_url_combines_path_and_query_sources() {
4190        let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4191        let mut exchange = Exchange::new(Message::default());
4192        exchange.input.set_header(
4193            "CamelHttpPath",
4194            serde_json::Value::String("next".to_string()),
4195        );
4196        let url = HttpProducer::resolve_url(&exchange, &cfg);
4197        assert!(url.starts_with("http://example.com/base/next?"));
4198        assert!(url.contains("foo=bar"));
4199
4200        exchange.input.set_header(
4201            "CamelHttpUri",
4202            serde_json::Value::String("http://other.test/root".to_string()),
4203        );
4204        exchange.input.set_header(
4205            "CamelHttpQuery",
4206            serde_json::Value::String("a=1&b=2".to_string()),
4207        );
4208
4209        let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4210        assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4211    }
4212
4213    #[test]
4214    fn test_http_producer_helpers_status_and_size_boundaries() {
4215        assert!(HttpProducer::is_ok_status(200, (200, 299)));
4216        assert!(HttpProducer::is_ok_status(299, (200, 299)));
4217        assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4218        assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4219
4220        assert!(!exceeds_max_response_body(10, 10));
4221        assert!(exceeds_max_response_body(11, 10));
4222    }
4223
4224    // -----------------------------------------------------------------------
4225    // Content-Type inference tests
4226    // -----------------------------------------------------------------------
4227
4228    async fn setup_consumer_on_free_port(
4229        path: &str,
4230    ) -> (
4231        u16,
4232        tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4233        tokio_util::sync::CancellationToken,
4234    ) {
4235        use camel_component_api::ConsumerContext;
4236
4237        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4238        let port = listener.local_addr().unwrap().port();
4239        drop(listener);
4240
4241        let consumer_cfg = HttpServerConfig {
4242            host: "127.0.0.1".to_string(),
4243            port,
4244            path: path.to_string(),
4245            max_request_body: 2 * 1024 * 1024,
4246            max_response_body: 10 * 1024 * 1024,
4247            max_inflight_requests: 1024,
4248        };
4249        let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
4250
4251        let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4252        let token = tokio_util::sync::CancellationToken::new();
4253        let ctx = ConsumerContext::new(tx, token.clone());
4254
4255        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4256        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4257
4258        (port, rx, token)
4259    }
4260
4261    #[tokio::test]
4262    async fn test_content_type_inferred_for_json_body() {
4263        let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4264
4265        let client = reqwest::Client::new();
4266        let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4267
4268        let (http_result, _) = tokio::join!(send_fut, async {
4269            if let Some(mut envelope) = rx.recv().await {
4270                envelope.exchange.input.body =
4271                    camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4272                if let Some(reply_tx) = envelope.reply_tx {
4273                    let _ = reply_tx.send(Ok(envelope.exchange));
4274                }
4275            }
4276        });
4277
4278        let resp = http_result.unwrap();
4279        assert_eq!(resp.status().as_u16(), 200);
4280        let ct = resp
4281            .headers()
4282            .get("content-type")
4283            .expect("Content-Type header should be present");
4284        assert_eq!(ct, "application/json");
4285        let body = resp.text().await.unwrap();
4286        assert_eq!(body, r#"{"message":"hello"}"#);
4287
4288        token.cancel();
4289    }
4290
4291    #[tokio::test]
4292    async fn test_content_type_inferred_for_text_body() {
4293        let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4294
4295        let client = reqwest::Client::new();
4296        let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4297
4298        let (http_result, _) = tokio::join!(send_fut, async {
4299            if let Some(mut envelope) = rx.recv().await {
4300                envelope.exchange.input.body =
4301                    camel_component_api::Body::Text("plain text response".to_string());
4302                if let Some(reply_tx) = envelope.reply_tx {
4303                    let _ = reply_tx.send(Ok(envelope.exchange));
4304                }
4305            }
4306        });
4307
4308        let resp = http_result.unwrap();
4309        assert_eq!(resp.status().as_u16(), 200);
4310        let ct = resp
4311            .headers()
4312            .get("content-type")
4313            .expect("Content-Type header should be present");
4314        assert_eq!(ct, "text/plain; charset=utf-8");
4315        let body = resp.text().await.unwrap();
4316        assert_eq!(body, "plain text response");
4317
4318        token.cancel();
4319    }
4320
4321    #[tokio::test]
4322    async fn test_content_type_inferred_for_xml_body() {
4323        let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4324
4325        let client = reqwest::Client::new();
4326        let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4327
4328        let (http_result, _) = tokio::join!(send_fut, async {
4329            if let Some(mut envelope) = rx.recv().await {
4330                envelope.exchange.input.body =
4331                    camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4332                if let Some(reply_tx) = envelope.reply_tx {
4333                    let _ = reply_tx.send(Ok(envelope.exchange));
4334                }
4335            }
4336        });
4337
4338        let resp = http_result.unwrap();
4339        assert_eq!(resp.status().as_u16(), 200);
4340        let ct = resp
4341            .headers()
4342            .get("content-type")
4343            .expect("Content-Type header should be present");
4344        assert_eq!(ct, "application/xml");
4345        let body = resp.text().await.unwrap();
4346        assert_eq!(body, "<root><item>value</item></root>");
4347
4348        token.cancel();
4349    }
4350
4351    #[tokio::test]
4352    async fn test_no_content_type_for_empty_body() {
4353        let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4354
4355        let client = reqwest::Client::new();
4356        let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4357
4358        let (http_result, _) = tokio::join!(send_fut, async {
4359            if let Some(mut envelope) = rx.recv().await {
4360                envelope.exchange.input.body = camel_component_api::Body::Empty;
4361                if let Some(reply_tx) = envelope.reply_tx {
4362                    let _ = reply_tx.send(Ok(envelope.exchange));
4363                }
4364            }
4365        });
4366
4367        let resp = http_result.unwrap();
4368        assert_eq!(resp.status().as_u16(), 200);
4369        assert!(
4370            resp.headers().get("content-type").is_none(),
4371            "Empty body should not set Content-Type"
4372        );
4373
4374        token.cancel();
4375    }
4376
4377    #[tokio::test]
4378    async fn test_no_content_type_for_raw_bytes_body() {
4379        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4380
4381        let client = reqwest::Client::new();
4382        let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4383
4384        let (http_result, _) = tokio::join!(send_fut, async {
4385            if let Some(mut envelope) = rx.recv().await {
4386                envelope.exchange.input.body =
4387                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4388                if let Some(reply_tx) = envelope.reply_tx {
4389                    let _ = reply_tx.send(Ok(envelope.exchange));
4390                }
4391            }
4392        });
4393
4394        let resp = http_result.unwrap();
4395        assert_eq!(resp.status().as_u16(), 200);
4396        assert!(
4397            resp.headers().get("content-type").is_none(),
4398            "Raw Bytes body should not set Content-Type"
4399        );
4400
4401        token.cancel();
4402    }
4403
4404    #[tokio::test]
4405    async fn test_content_type_from_stream_metadata() {
4406        use camel_component_api::{StreamBody, StreamMetadata};
4407        use futures::stream;
4408
4409        let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4410
4411        let client = reqwest::Client::new();
4412        let send_fut = client
4413            .get(format!("http://127.0.0.1:{port}/stream-ct"))
4414            .send();
4415
4416        let (http_result, _) = tokio::join!(send_fut, async {
4417            if let Some(mut envelope) = rx.recv().await {
4418                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4419                    vec![Ok(bytes::Bytes::from("audio data"))];
4420                let stream = Box::pin(stream::iter(chunks));
4421                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4422                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4423                    metadata: StreamMetadata {
4424                        size_hint: None,
4425                        content_type: Some("audio/mpeg".to_string()),
4426                        origin: None,
4427                    },
4428                });
4429                if let Some(reply_tx) = envelope.reply_tx {
4430                    let _ = reply_tx.send(Ok(envelope.exchange));
4431                }
4432            }
4433        });
4434
4435        let resp = http_result.unwrap();
4436        assert_eq!(resp.status().as_u16(), 200);
4437        let ct = resp
4438            .headers()
4439            .get("content-type")
4440            .expect("Content-Type header should be present");
4441        assert_eq!(ct, "audio/mpeg");
4442        let body = resp.text().await.unwrap();
4443        assert_eq!(body, "audio data");
4444
4445        token.cancel();
4446    }
4447
4448    #[tokio::test]
4449    async fn test_user_content_type_overrides_inferred() {
4450        let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4451
4452        let client = reqwest::Client::new();
4453        let send_fut = client
4454            .get(format!("http://127.0.0.1:{port}/override-ct"))
4455            .send();
4456
4457        let (http_result, _) = tokio::join!(send_fut, async {
4458            if let Some(mut envelope) = rx.recv().await {
4459                envelope.exchange.input.body =
4460                    camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4461                envelope.exchange.input.set_header(
4462                    "Content-Type",
4463                    serde_json::Value::String("text/html".to_string()),
4464                );
4465                if let Some(reply_tx) = envelope.reply_tx {
4466                    let _ = reply_tx.send(Ok(envelope.exchange));
4467                }
4468            }
4469        });
4470
4471        let resp = http_result.unwrap();
4472        assert_eq!(resp.status().as_u16(), 200);
4473        let ct = resp
4474            .headers()
4475            .get("content-type")
4476            .expect("Content-Type header should be present");
4477        assert_eq!(
4478            ct, "text/html",
4479            "User-set Content-Type should take precedence over inferred type"
4480        );
4481
4482        token.cancel();
4483    }
4484
4485    #[tokio::test]
4486    async fn test_user_content_type_with_bytes_body() {
4487        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4488
4489        let client = reqwest::Client::new();
4490        let send_fut = client
4491            .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4492            .send();
4493
4494        let (http_result, _) = tokio::join!(send_fut, async {
4495            if let Some(mut envelope) = rx.recv().await {
4496                envelope.exchange.input.body =
4497                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4498                envelope.exchange.input.set_header(
4499                    "Content-Type",
4500                    serde_json::Value::String("application/json".to_string()),
4501                );
4502                if let Some(reply_tx) = envelope.reply_tx {
4503                    let _ = reply_tx.send(Ok(envelope.exchange));
4504                }
4505            }
4506        });
4507
4508        let resp = http_result.unwrap();
4509        assert_eq!(resp.status().as_u16(), 200);
4510        let ct = resp
4511            .headers()
4512            .get("content-type")
4513            .expect("Content-Type header should be present for Bytes body with user header");
4514        assert_eq!(
4515            ct, "application/json",
4516            "User Content-Type should be sent for Bytes body"
4517        );
4518
4519        token.cancel();
4520    }
4521
4522    // -----------------------------------------------------------------------
4523    // Server monitor tests (GRL-005)
4524    // -----------------------------------------------------------------------
4525
4526    #[tokio::test]
4527    async fn monitor_task_silent_on_clean_exit() {
4528        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4529        // Clean exit should complete without panicking or logging errors
4530        monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4531    }
4532
4533    #[tokio::test]
4534    async fn monitor_task_handles_panicked_task() {
4535        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4536            panic!("simulated server crash");
4537        });
4538        // Should complete without panicking even though the inner task panicked
4539        monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4540    }
4541
4542    // -----------------------------------------------------------------------
4543    // Credential redaction tests
4544    // -----------------------------------------------------------------------
4545
4546    #[test]
4547    fn http_auth_basic_debug_redacts_password() {
4548        let auth = HttpAuth::Basic {
4549            username: "admin".to_string(),
4550            password: "hunter2".to_string(),
4551        };
4552        let debug = format!("{:?}", auth);
4553        assert!(
4554            !debug.contains("hunter2"),
4555            "password must be redacted: {debug}"
4556        );
4557        assert!(debug.contains("admin"), "username should appear: {debug}");
4558    }
4559
4560    #[test]
4561    fn http_auth_bearer_debug_redacts_token() {
4562        let auth = HttpAuth::Bearer {
4563            token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4564        };
4565        let debug = format!("{:?}", auth);
4566        assert!(
4567            !debug.contains("eyJhbGci"),
4568            "token must be redacted: {debug}"
4569        );
4570    }
4571
4572    #[test]
4573    fn http_auth_none_debug_shows_variant() {
4574        let debug = format!("{:?}", HttpAuth::None);
4575        assert!(
4576            debug.contains("None"),
4577            "None variant should appear: {debug}"
4578        );
4579    }
4580
4581    #[test]
4582    fn http_endpoint_config_debug_redacts_auth_credentials() {
4583        let config = HttpEndpointConfig::from_uri(
4584            "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4585        )
4586        .unwrap();
4587        let debug = format!("{:?}", config);
4588        assert!(
4589            !debug.contains("secret123"),
4590            "password must be redacted in HttpEndpointConfig debug: {debug}"
4591        );
4592    }
4593
4594    // -----------------------------------------------------------------------
4595    // Static file serving tests (Task 5)
4596    // -----------------------------------------------------------------------
4597
4598    use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4599    use tower_http::services::ServeDir;
4600
4601    fn make_test_registry() -> HttpRouteRegistry {
4602        HttpRouteRegistry::new()
4603    }
4604
4605    fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4606        AppState {
4607            registry,
4608            max_request_body: 2 * 1024 * 1024,
4609            max_response_body: 10 * 1024 * 1024,
4610            inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4611        }
4612    }
4613
4614    #[allow(clippy::await_holding_lock)]
4615    #[tokio::test]
4616    async fn test_static_file_serving_serves_file_contents() {
4617        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4618        ServerRegistry::reset();
4619
4620        // Create temp dir with test files
4621        let temp_dir =
4622            std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4623        std::fs::create_dir_all(&temp_dir).unwrap();
4624        std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4625        std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4626
4627        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4628
4629        let registry = make_test_registry();
4630        let serve_dir = ServeDir::new(&canonical_dir)
4631            .precompressed_gzip()
4632            .precompressed_br()
4633            .append_index_html_on_directories(true);
4634
4635        let mount = StaticMount {
4636            mount_path: "/".to_string(),
4637            mode: MountMode::Static,
4638            dir: canonical_dir.clone(),
4639            cache_control: "public, max-age=3600".to_string(),
4640            error_pages: std::collections::HashMap::new(),
4641            serve_dir,
4642        };
4643        registry.register_static_mount(mount).await.unwrap();
4644
4645        let state = make_test_state(registry);
4646
4647        // Test serving hello.txt
4648        let req = Request::builder()
4649            .uri("/hello.txt")
4650            .body(AxumBody::empty())
4651            .unwrap();
4652        let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4653        assert_eq!(resp.status(), StatusCode::OK);
4654        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4655            .await
4656            .unwrap();
4657        assert_eq!(&body[..], b"Hello, static world!");
4658
4659        // Test serving style.css
4660        let req = Request::builder()
4661            .uri("/style.css")
4662            .body(AxumBody::empty())
4663            .unwrap();
4664        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4665        assert_eq!(resp.status(), StatusCode::OK);
4666        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4667            .await
4668            .unwrap();
4669        assert_eq!(&body[..], b"body { color: red; }");
4670
4671        // Test 404 for non-existent file
4672        let req = Request::builder()
4673            .uri("/missing.txt")
4674            .body(AxumBody::empty())
4675            .unwrap();
4676        let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4677        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4678
4679        // Cleanup
4680        std::fs::remove_dir_all(&temp_dir).ok();
4681    }
4682
4683    #[allow(clippy::await_holding_lock)]
4684    #[tokio::test]
4685    async fn test_spa_fallback_serves_index_for_unknown_paths() {
4686        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4687        ServerRegistry::reset();
4688
4689        let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4690        std::fs::create_dir_all(&temp_dir).unwrap();
4691        std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4692        std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4693
4694        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4695
4696        let registry = make_test_registry();
4697        let serve_dir = ServeDir::new(&canonical_dir)
4698            .precompressed_gzip()
4699            .precompressed_br()
4700            .append_index_html_on_directories(true);
4701
4702        let mount = StaticMount {
4703            mount_path: "/".to_string(),
4704            mode: MountMode::Spa,
4705            dir: canonical_dir.clone(),
4706            cache_control: "public, max-age=0".to_string(),
4707            error_pages: std::collections::HashMap::new(),
4708            serve_dir,
4709        };
4710        // Register as SPA mount
4711        registry.register_static_mount(mount).await.unwrap();
4712
4713        let state = make_test_state(registry);
4714
4715        // SPA fallback: GET /dashboard with Accept: text/html → index.html
4716        let req = Request::builder()
4717            .method("GET")
4718            .uri("/dashboard")
4719            .header("Accept", "text/html")
4720            .body(AxumBody::empty())
4721            .unwrap();
4722        let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4723        assert_eq!(resp.status(), StatusCode::OK);
4724        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4725            .await
4726            .unwrap();
4727        assert_eq!(&body[..], b"<h1>SPA App</h1>");
4728
4729        // Static file still works: GET /app.js
4730        let req = Request::builder()
4731            .method("GET")
4732            .uri("/app.js")
4733            .body(AxumBody::empty())
4734            .unwrap();
4735        let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4736        assert_eq!(resp.status(), StatusCode::OK);
4737        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4738            .await
4739            .unwrap();
4740        assert_eq!(&body[..], b"console.log('app')");
4741
4742        // No SPA fallback for JSON accept → 404
4743        let req = Request::builder()
4744            .method("GET")
4745            .uri("/api/data")
4746            .header("Accept", "application/json")
4747            .body(AxumBody::empty())
4748            .unwrap();
4749        let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4750        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4751
4752        // No SPA fallback for file extensions → 404
4753        let req = Request::builder()
4754            .method("GET")
4755            .uri("/style.css")
4756            .header("Accept", "text/html")
4757            .body(AxumBody::empty())
4758            .unwrap();
4759        let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4760        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4761
4762        // Cleanup
4763        std::fs::remove_dir_all(&temp_dir).ok();
4764    }
4765
4766    #[allow(clippy::await_holding_lock)]
4767    #[tokio::test]
4768    async fn test_error_page_mapping_serves_custom_404() {
4769        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4770        ServerRegistry::reset();
4771
4772        let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4773        let errors_dir = temp_dir.join("errors");
4774        std::fs::create_dir_all(&errors_dir).unwrap();
4775        std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4776        std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4777
4778        let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4779        let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4780
4781        let registry = make_test_registry();
4782        let serve_dir = ServeDir::new(&canonical_dir)
4783            .precompressed_gzip()
4784            .precompressed_br()
4785            .append_index_html_on_directories(true);
4786
4787        let mut error_pages = std::collections::HashMap::new();
4788        error_pages.insert(404, canonical_404);
4789
4790        let mount = StaticMount {
4791            mount_path: "/".to_string(),
4792            mode: MountMode::Static,
4793            dir: canonical_dir.clone(),
4794            cache_control: "public, max-age=0".to_string(),
4795            error_pages,
4796            serve_dir,
4797        };
4798        registry.register_static_mount(mount).await.unwrap();
4799
4800        let state = make_test_state(registry);
4801
4802        // Request non-existent file → custom 404 page
4803        let req = Request::builder()
4804            .method("GET")
4805            .uri("/missing.html")
4806            .body(AxumBody::empty())
4807            .unwrap();
4808        let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4809        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4810        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4811            .await
4812            .unwrap();
4813        assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4814
4815        // Existing file still works
4816        let req = Request::builder()
4817            .method("GET")
4818            .uri("/index.html")
4819            .body(AxumBody::empty())
4820            .unwrap();
4821        let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4822        assert_eq!(resp.status(), StatusCode::OK);
4823        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4824            .await
4825            .unwrap();
4826        assert_eq!(&body[..], b"<h1>Home</h1>");
4827
4828        // Cleanup
4829        std::fs::remove_dir_all(&temp_dir).ok();
4830    }
4831}