Skip to main content

camel_component_http/
lib.rs

1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5use crate::config::parse_ok_status_code_range;
6pub use bundle::HttpBundle;
7pub use config::HttpConfig;
8pub use health::HttpHealthCheck;
9
10use std::collections::HashMap;
11use std::future::Future;
12use std::net::IpAddr;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex, OnceLock};
15use std::task::{Context, Poll};
16use std::time::Duration;
17
18use tokio::sync::{OnceCell, RwLock};
19use tower::Layer;
20use tower::Service;
21use tracing::debug;
22
23use axum::body::BodyDataStream;
24use camel_auth::bearer_token_layer::BearerTokenLayer;
25use camel_auth::oauth2::TokenProvider;
26use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
27use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
28use camel_component_api::{UriComponents, UriConfig, parse_uri};
29use futures::TryStreamExt;
30use futures::stream::BoxStream;
31
32// ---------------------------------------------------------------------------
33// HttpEndpointConfig
34// ---------------------------------------------------------------------------
35
36/// Configuration for an HTTP client (producer) endpoint.
37///
38/// # Memory Limits
39///
40/// HTTP operations enforce conservative memory limits to prevent denial-of-service
41/// attacks from untrusted network sources. These limits are significantly lower than
42/// file component limits (100MB) because HTTP typically handles API responses rather
43/// than large file transfers, and clients may be untrusted.
44///
45/// ## Default Limits
46///
47/// - **HTTP client body**: 10MB (typical API responses)
48/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
49/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
50///
51/// ## Rationale
52///
53/// The 10MB limit for HTTP client responses is appropriate for most API interactions
54/// while providing protection against:
55/// - Malicious servers sending oversized responses
56/// - Runaway processes generating unexpectedly large payloads
57/// - Memory exhaustion attacks
58///
59/// The 2MB server request limit is even more conservative because it handles input
60/// from potentially untrusted clients on the public internet.
61///
62/// ## Overriding Limits
63///
64/// Override the default client body limit using the `maxBodySize` URI parameter:
65///
66/// ```text
67/// http://api.example.com/large-data?maxBodySize=52428800
68/// ```
69///
70/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
71///
72/// ```text
73/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
74/// ```
75///
76/// ## Behavior When Exceeded
77///
78/// When a body exceeds the configured limit:
79/// - An error is returned immediately
80/// - No memory is exhausted - the limit is checked before allocation
81/// - The HTTP connection is terminated cleanly
82///
83/// ## Security Considerations
84///
85/// HTTP endpoints should be treated with more caution than file endpoints because:
86/// - Clients may be unknown and untrusted
87/// - Network traffic can be spoofed or malicious
88/// - DoS attacks often exploit unbounded resource consumption
89///
90/// Only increase limits when you control both ends of the connection or when
91/// business requirements demand larger payloads.
92#[derive(Debug, Clone)]
93pub struct HttpEndpointConfig {
94    pub base_url: String,
95    pub http_method: Option<String>,
96    pub throw_exception_on_failure: bool,
97    pub ok_status_code_range: (u16, u16),
98    pub response_timeout: Option<Duration>,
99    pub query_params: HashMap<String, String>,
100    pub allow_private_ips: bool,
101    pub blocked_hosts: Vec<String>,
102    pub max_body_size: usize,
103    pub read_timeout_ms: u64,
104    pub max_response_bytes: usize,
105    pub auth: HttpAuth,
106    pub token_provider: Option<Arc<dyn TokenProvider>>,
107    pub user_agent: Option<String>,
108    pub cookie_handling: CookieHandling,
109    pub bridge_endpoint: bool,
110    pub connection_close: bool,
111    pub skip_request_headers: Vec<String>,
112    pub skip_response_headers: Vec<String>,
113}
114
115#[derive(Clone, PartialEq)]
116pub enum HttpAuth {
117    None,
118    Basic { username: String, password: String },
119    Bearer { token: String },
120}
121
122impl std::fmt::Debug for HttpAuth {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        match self {
125            HttpAuth::None => f.write_str("None"),
126            HttpAuth::Basic { username, .. } => f
127                .debug_struct("Basic")
128                .field("username", username)
129                .field("password", &"***")
130                .finish(),
131            HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
132        }
133    }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq)]
137pub enum CookieHandling {
138    Disabled,
139    InMemory,
140}
141
142/// Camel options that should NOT be forwarded as HTTP query params
143const HTTP_CAMEL_OPTIONS: &[&str] = &[
144    "httpMethod",
145    "throwExceptionOnFailure",
146    "okStatusCodeRange",
147    "followRedirects",
148    "connectTimeout",
149    "responseTimeout",
150    "allowPrivateIps",
151    "blockedHosts",
152    "maxBodySize",
153    "readTimeout",
154    "maxResponseBytes",
155    "authMethod",
156    "authUsername",
157    "authPassword",
158    "authBearerToken",
159    "userAgent",
160    "cookieHandling",
161    "bridgeEndpoint",
162    "connectionClose",
163    "skipRequestHeaders",
164    "skipResponseHeaders",
165];
166
167impl UriConfig for HttpEndpointConfig {
168    /// Returns "http" as the primary scheme (also accepts "https")
169    fn scheme() -> &'static str {
170        "http"
171    }
172
173    fn from_uri(uri: &str) -> Result<Self, CamelError> {
174        let parts = parse_uri(uri)?;
175        Self::from_components(parts)
176    }
177
178    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
179        // Validate scheme - accept both http and https
180        if parts.scheme != "http" && parts.scheme != "https" {
181            return Err(CamelError::InvalidUri(format!(
182                "expected scheme 'http' or 'https', got '{}'",
183                parts.scheme
184            )));
185        }
186
187        // Construct base_url from scheme + path
188        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
189        let base_url = format!("{}:{}", parts.scheme, parts.path);
190
191        let http_method = parts.params.get("httpMethod").cloned();
192
193        let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
194            Some(v) => parse_bool_param_http(v).map_err(|e| {
195                CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
196            })?,
197            None => true,
198        };
199
200        // Parse status code range from "start-end" format (e.g., "200-299")
201        let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
202            Some(v) => parse_ok_status_code_range(v)?,
203            None => (200, 299),
204        };
205
206        let response_timeout = match parts.params.get("responseTimeout") {
207            Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
208                CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
209            })?),
210            None => None,
211        };
212
213        // SSRF protection settings
214        let allow_private_ips = match parts.params.get("allowPrivateIps") {
215            Some(v) => parse_bool_param_http(v).map_err(|e| {
216                CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
217            })?,
218            None => false, // Default: block private IPs
219        };
220
221        // Parse comma-separated blocked hosts
222        let blocked_hosts = parts
223            .params
224            .get("blockedHosts")
225            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
226            .unwrap_or_default();
227
228        let max_body_size = match parts.params.get("maxBodySize") {
229            Some(v) => v.parse::<usize>().map_err(|e| {
230                CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
231            })?,
232            None => 10 * 1024 * 1024, // Default: 10MB
233        };
234
235        let read_timeout_ms = match parts.params.get("readTimeout") {
236            Some(v) => v.parse::<u64>().map_err(|e| {
237                CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
238            })?,
239            None => 30_000, // Default: 30s
240        };
241
242        let max_response_bytes = match parts.params.get("maxResponseBytes") {
243            Some(v) => v.parse::<usize>().map_err(|e| {
244                CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
245            })?,
246            None => 10 * 1024 * 1024, // Default: 10MB
247        };
248
249        let auth = parse_auth_from_params(&parts.params)?;
250
251        let user_agent = parts.params.get("userAgent").cloned();
252
253        let cookie_handling = match parts.params.get("cookieHandling") {
254            Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
255            Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
256            Some(v) => {
257                return Err(CamelError::InvalidUri(format!(
258                    "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
259                )));
260            }
261            None => CookieHandling::Disabled,
262        };
263
264        let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
265            Some(v) => parse_bool_param_http(v).map_err(|e| {
266                CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
267            })?,
268            None => false,
269        };
270
271        let connection_close = match parts.params.get("connectionClose") {
272            Some(v) => parse_bool_param_http(v).map_err(|e| {
273                CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
274            })?,
275            None => false,
276        };
277
278        let skip_request_headers = parts
279            .params
280            .get("skipRequestHeaders")
281            .map(|v| {
282                v.split(',')
283                    .map(str::trim)
284                    .filter(|s| !s.is_empty())
285                    .map(|s| s.to_ascii_lowercase())
286                    .collect::<Vec<_>>()
287            })
288            .unwrap_or_default();
289
290        let skip_response_headers = parts
291            .params
292            .get("skipResponseHeaders")
293            .map(|v| {
294                v.split(',')
295                    .map(str::trim)
296                    .filter(|s| !s.is_empty())
297                    .map(|s| s.to_ascii_lowercase())
298                    .collect::<Vec<_>>()
299            })
300            .unwrap_or_default();
301
302        // Collect remaining params (not Camel options) as query params
303        let query_params: HashMap<String, String> = parts
304            .params
305            .into_iter()
306            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
307            .collect();
308
309        Ok(Self {
310            base_url,
311            http_method,
312            throw_exception_on_failure,
313            ok_status_code_range,
314            response_timeout,
315            query_params,
316            allow_private_ips,
317            blocked_hosts,
318            max_body_size,
319            read_timeout_ms,
320            max_response_bytes,
321            auth,
322            token_provider: None,
323            user_agent,
324            cookie_handling,
325            bridge_endpoint,
326            connection_close,
327            skip_request_headers,
328            skip_response_headers,
329        })
330    }
331}
332
333fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
334    let Some(method) = params.get("authMethod") else {
335        return Ok(HttpAuth::None);
336    };
337
338    if method.eq_ignore_ascii_case("none") {
339        return Ok(HttpAuth::None);
340    }
341
342    if method.eq_ignore_ascii_case("basic") {
343        let username = params.get("authUsername").cloned().ok_or_else(|| {
344            CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
345        })?;
346        let password = params.get("authPassword").cloned().ok_or_else(|| {
347            CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
348        })?;
349        return Ok(HttpAuth::Basic { username, password });
350    }
351
352    if method.eq_ignore_ascii_case("bearer") {
353        let token = params.get("authBearerToken").cloned().ok_or_else(|| {
354            CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
355        })?;
356        return Ok(HttpAuth::Bearer { token });
357    }
358
359    Err(CamelError::InvalidUri(format!(
360        "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
361    )))
362}
363
364fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
365    match value.to_ascii_lowercase().as_str() {
366        "true" | "1" | "yes" => Ok(true),
367        "false" | "0" | "no" => Ok(false),
368        _ => Err(CamelError::InvalidUri(format!(
369            "invalid boolean value: '{value}'"
370        ))),
371    }
372}
373
374impl HttpEndpointConfig {
375    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
376        let parts = parse_uri(uri)?;
377        let mut endpoint = Self::from_components(parts.clone())?;
378        if endpoint.response_timeout.is_none() {
379            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
380        }
381        if !parts.params.contains_key("allowPrivateIps") {
382            endpoint.allow_private_ips = config.allow_private_ips;
383        }
384        if !parts.params.contains_key("blockedHosts") {
385            endpoint.blocked_hosts = config.blocked_hosts.clone();
386        }
387        if !parts.params.contains_key("maxBodySize") {
388            endpoint.max_body_size = config.max_body_size;
389        }
390        if !parts.params.contains_key("readTimeout") {
391            endpoint.read_timeout_ms = config.read_timeout_ms;
392        }
393        if !parts.params.contains_key("maxResponseBytes") {
394            endpoint.max_response_bytes = config.max_response_bytes;
395        }
396        if !parts.params.contains_key("okStatusCodeRange")
397            && let Some(range) = &config.ok_status_code_range
398        {
399            endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
400        }
401
402        Ok(endpoint)
403    }
404}
405
406// ---------------------------------------------------------------------------
407// HttpServerConfig
408// ---------------------------------------------------------------------------
409
410/// Configuration for an HTTP server (consumer) endpoint.
411#[derive(Debug, Clone)]
412pub struct HttpServerConfig {
413    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
414    pub host: String,
415    /// TCP port to listen on.
416    pub port: u16,
417    /// URL path this consumer handles, e.g. "/orders".
418    pub path: String,
419    /// Maximum request body size in bytes.
420    pub max_request_body: usize,
421    /// Maximum response body size for materializing streams in bytes.
422    pub max_response_body: usize,
423    /// Maximum number of in-flight requests handled concurrently by this server.
424    pub max_inflight_requests: usize,
425}
426
427impl UriConfig for HttpServerConfig {
428    /// Returns "http" as the primary scheme (also accepts "https")
429    fn scheme() -> &'static str {
430        "http"
431    }
432
433    fn from_uri(uri: &str) -> Result<Self, CamelError> {
434        let parts = parse_uri(uri)?;
435        Self::from_components(parts)
436    }
437
438    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
439        // Validate scheme - accept both http and https
440        if parts.scheme != "http" && parts.scheme != "https" {
441            return Err(CamelError::InvalidUri(format!(
442                "expected scheme 'http' or 'https', got '{}'",
443                parts.scheme
444            )));
445        }
446
447        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
448        // Strip leading "//"
449        let authority_and_path = parts.path.trim_start_matches('/');
450
451        // Split on the first "/" to separate "host:port" from "/path"
452        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
453            (&authority_and_path[..idx], &authority_and_path[idx..])
454        } else {
455            (authority_and_path, "/")
456        };
457
458        let path = if path_suffix.is_empty() {
459            "/"
460        } else {
461            path_suffix
462        }
463        .to_string();
464
465        // Parse host:port from authority
466        let (host, port) = if let Some(colon) = authority.rfind(':') {
467            let port_str = &authority[colon + 1..];
468            match port_str.parse::<u16>() {
469                Ok(p) => (authority[..colon].to_string(), p),
470                Err(_) => {
471                    return Err(CamelError::InvalidUri(format!(
472                        "invalid port '{}' in authority",
473                        port_str
474                    )));
475                }
476            }
477        } else {
478            // Default port based on scheme: 443 for https, 80 for http
479            let default_port = if parts.scheme == "https" { 443 } else { 80 };
480            (authority.to_string(), default_port)
481        };
482
483        let max_request_body = parts
484            .params
485            .get("maxRequestBody")
486            .and_then(|v| v.parse::<usize>().ok())
487            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
488
489        let max_response_body = parts
490            .params
491            .get("maxResponseBody")
492            .and_then(|v| v.parse::<usize>().ok())
493            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
494
495        let max_inflight_requests = parts
496            .params
497            .get("maxInflightRequests")
498            .and_then(|v| v.parse::<usize>().ok())
499            .unwrap_or(1024);
500
501        Ok(Self {
502            host,
503            port,
504            path,
505            max_request_body,
506            max_response_body,
507            max_inflight_requests,
508        })
509    }
510}
511
512impl HttpServerConfig {
513    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
514        let parts = parse_uri(uri)?;
515        let mut server = Self::from_components(parts.clone())?;
516        if !parts.params.contains_key("maxRequestBody") {
517            server.max_request_body = config.max_request_body;
518        }
519        if !parts.params.contains_key("maxResponseBody") {
520            // Default max_response_body is 10MB via HttpConfig::default().max_body_size.
521            server.max_response_body = config.max_body_size;
522        }
523        Ok(server)
524    }
525}
526
527// ---------------------------------------------------------------------------
528// RequestEnvelope / HttpReply
529// ---------------------------------------------------------------------------
530
531/// Body de la respuesta HTTP: bytes ya materializados o stream lazy.
532pub(crate) enum HttpReplyBody {
533    Bytes(bytes::Bytes),
534    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
535}
536
537/// An inbound HTTP request sent from the Axum dispatch handler to an
538/// `HttpConsumer` receive loop.
539pub(crate) struct RequestEnvelope {
540    pub(crate) method: String,
541    pub(crate) path: String,
542    pub(crate) query: String,
543    pub(crate) headers: http::HeaderMap,
544    pub(crate) body: StreamBody,
545    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
546}
547
548/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
549pub(crate) struct HttpReply {
550    pub(crate) status: u16,
551    pub(crate) headers: Vec<(String, String)>,
552    pub(crate) body: HttpReplyBody,
553}
554
555// ---------------------------------------------------------------------------
556// DispatchTable / ServerRegistry
557// ---------------------------------------------------------------------------
558
559/// Maps URL path → channel sender for the consumer that owns that path.
560pub(crate) type DispatchTable =
561    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
562
563type ServerKey = (String, u16);
564
565/// Handle to a running Axum server on one interface/port.
566struct ServerHandle {
567    dispatch: DispatchTable,
568    max_request_body: usize,
569    max_response_body: usize,
570    max_inflight_requests: usize,
571}
572
573/// Process-global registry mapping (host, port) → running Axum server handle.
574pub struct ServerRegistry {
575    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
576}
577
578impl ServerRegistry {
579    /// Returns the global singleton.
580    pub fn global() -> &'static Self {
581        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
582        INSTANCE.get_or_init(|| ServerRegistry {
583            inner: Mutex::new(HashMap::new()),
584        })
585    }
586
587    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
588    /// none is running on that port yet.
589    pub(crate) async fn get_or_spawn(
590        &'static self,
591        host: &str,
592        port: u16,
593        max_request_body: usize,
594        max_response_body: usize,
595        max_inflight_requests: usize,
596    ) -> Result<DispatchTable, CamelError> {
597        let host_owned = host.to_string();
598
599        let cell = {
600            let mut guard = self.inner.lock().map_err(|_| {
601                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
602            })?;
603            let key = (host.to_string(), port);
604            guard
605                .entry(key)
606                .or_insert_with(|| Arc::new(OnceCell::new()))
607                .clone()
608        };
609
610        if let Some(existing) = cell.get()
611            && existing.max_request_body != max_request_body
612        {
613            return Err(CamelError::EndpointCreationFailed(format!(
614                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
615                existing.max_request_body, max_request_body
616            )));
617        }
618
619        if let Some(existing) = cell.get()
620            && existing.max_response_body != max_response_body
621        {
622            return Err(CamelError::EndpointCreationFailed(format!(
623                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
624                existing.max_response_body, max_response_body
625            )));
626        }
627
628        if let Some(existing) = cell.get()
629            && existing.max_inflight_requests != max_inflight_requests
630        {
631            return Err(CamelError::EndpointCreationFailed(format!(
632                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
633                existing.max_inflight_requests, max_inflight_requests
634            )));
635        }
636
637        let handle = cell
638            .get_or_try_init(|| async {
639                let addr = format!("{host_owned}:{port}");
640                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
641                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
642                })?;
643                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
644                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
645                let task = tokio::spawn(run_axum_server(
646                    listener,
647                    Arc::clone(&dispatch),
648                    max_request_body,
649                    max_response_body,
650                    Arc::clone(&inflight),
651                ));
652                let addr_for_monitor = format!("{host_owned}:{port}");
653                tokio::spawn(monitor_axum_task(task, addr_for_monitor));
654                Ok::<ServerHandle, CamelError>(ServerHandle {
655                    dispatch,
656                    max_request_body,
657                    max_response_body,
658                    max_inflight_requests,
659                })
660            })
661            .await?;
662
663        Ok(Arc::clone(&handle.dispatch))
664    }
665
666    /// Reset the global registry — **test-only**.
667    ///
668    /// Clears all registered server handles so that tests can start from a clean
669    /// state. This is intentionally `#[cfg(test)]` because the registry is a
670    /// process-global singleton in production and resetting it would break
671    /// running servers.
672    #[cfg(test)]
673    pub fn reset() {
674        let instance = Self::global();
675        let mut guard = instance
676            .inner
677            .lock()
678            .expect("ServerRegistry lock poisoned during test reset");
679        guard.clear();
680    }
681}
682
683// ---------------------------------------------------------------------------
684// Axum server
685// ---------------------------------------------------------------------------
686
687use axum::{
688    Router,
689    body::Body as AxumBody,
690    extract::{Request, State},
691    http::{Response, StatusCode},
692    response::IntoResponse,
693};
694
695#[derive(Clone)]
696struct AppState {
697    dispatch: DispatchTable,
698    max_request_body: usize,
699    max_response_body: usize,
700    inflight: Arc<tokio::sync::Semaphore>,
701}
702
703async fn run_axum_server(
704    listener: tokio::net::TcpListener,
705    dispatch: DispatchTable,
706    max_request_body: usize,
707    max_response_body: usize,
708    inflight: Arc<tokio::sync::Semaphore>,
709) {
710    let state = AppState {
711        dispatch,
712        max_request_body,
713        max_response_body,
714        inflight,
715    };
716    let app = Router::new().fallback(dispatch_handler).with_state(state);
717
718    axum::serve(listener, app).await.unwrap_or_else(|e| {
719        tracing::error!(error = %e, "Axum server error");
720    });
721}
722
723/// Monitors an Axum server task and emits a structured error event if it
724/// exits unexpectedly.
725///
726/// # Limitations
727/// The HTTP server is shared across all routes on a port. Full per-route
728/// CrashNotification propagation is deferred — this provides observable
729/// structured logging as a first guard.
730async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
731    match handle.await {
732        Ok(()) => {
733            // Clean exit (process shutdown or normal stop)
734        }
735        Err(join_err) => {
736            tracing::error!(
737                addr = %addr,
738                error = %join_err,
739                "Axum server task exited unexpectedly — all routes on this port are now dead"
740            );
741        }
742    }
743}
744
745async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
746    let method = req.method().to_string();
747    let path = req.uri().path().to_string();
748    let query = req.uri().query().unwrap_or("").to_string();
749    let headers = req.headers().clone();
750
751    // Check Content-Length against limit BEFORE opening the stream
752    let content_length: Option<u64> = headers
753        .get(http::header::CONTENT_LENGTH)
754        .and_then(|v| v.to_str().ok())
755        .and_then(|s| s.parse().ok());
756
757    if let Some(len) = content_length
758        && len > state.max_request_body as u64
759    {
760        return Response::builder()
761            .status(StatusCode::PAYLOAD_TOO_LARGE)
762            .body(AxumBody::from("Request body exceeds configured limit"))
763            .expect("infallible"); // allow-unwrap
764    }
765
766    let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
767        Ok(permit) => permit,
768        Err(_) => {
769            return Response::builder()
770                .status(StatusCode::SERVICE_UNAVAILABLE)
771                .body(AxumBody::from("Service Unavailable"))
772                .expect("infallible"); // allow-unwrap
773        }
774    };
775
776    // Build StreamBody from Axum body WITHOUT materializing
777    let content_type = headers
778        .get(http::header::CONTENT_TYPE)
779        .and_then(|v| v.to_str().ok())
780        .map(|s| s.to_string());
781
782    let data_stream: BodyDataStream = req.into_body().into_data_stream();
783    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
784    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
785
786    let stream_body = StreamBody {
787        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
788        metadata: StreamMetadata {
789            size_hint: content_length,
790            content_type,
791            origin: None,
792        },
793    };
794
795    // Look up handler for this path
796    let sender = {
797        let table = state.dispatch.read().await;
798        table.get(&path).cloned()
799    };
800    let Some(sender) = sender else {
801        return Response::builder()
802            .status(StatusCode::NOT_FOUND)
803            .body(AxumBody::from("No consumer registered for this path"))
804            .expect("infallible"); // allow-unwrap
805    };
806
807    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
808    let envelope = RequestEnvelope {
809        method,
810        path,
811        query,
812        headers,
813        body: stream_body,
814        reply_tx,
815    };
816
817    if sender.send(envelope).await.is_err() {
818        return Response::builder()
819            .status(StatusCode::SERVICE_UNAVAILABLE)
820            .body(AxumBody::from("Consumer unavailable"))
821            .expect("infallible"); // allow-unwrap
822    }
823
824    match reply_rx.await {
825        Ok(reply) => {
826            let reply = match reply.body {
827                HttpReplyBody::Bytes(b)
828                    if exceeds_max_response_body(b.len(), state.max_response_body) =>
829                {
830                    HttpReply {
831                        status: 500,
832                        headers: vec![],
833                        body: HttpReplyBody::Bytes(bytes::Bytes::from(
834                            "Response body exceeds configured limit",
835                        )),
836                    }
837                }
838                _ => reply,
839            };
840
841            let status =
842                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
843            let mut builder = Response::builder().status(status);
844            for (k, v) in &reply.headers {
845                builder = builder.header(k.as_str(), v.as_str());
846            }
847            match reply.body {
848                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
849                    Response::builder()
850                        .status(StatusCode::INTERNAL_SERVER_ERROR)
851                        .body(AxumBody::from("Invalid response headers from consumer"))
852                        .expect("infallible") // allow-unwrap
853                }),
854                HttpReplyBody::Stream(stream) => builder
855                    .body(AxumBody::from_stream(stream))
856                    .unwrap_or_else(|_| {
857                        Response::builder()
858                            .status(StatusCode::INTERNAL_SERVER_ERROR)
859                            .body(AxumBody::from("Invalid response headers from consumer"))
860                            .expect("infallible") // allow-unwrap
861                    }),
862            }
863        }
864        Err(_) => Response::builder()
865            .status(StatusCode::INTERNAL_SERVER_ERROR)
866            .body(AxumBody::from("Pipeline error"))
867            .expect("Response::builder() with a known-valid status code and body is infallible"), // allow-unwrap
868    }
869}
870
871fn exceeds_max_response_body(len: usize, max: usize) -> bool {
872    len > max
873}
874
875fn title_case_header(name: &str) -> String {
876    name.split('-')
877        .map(|part| {
878            let mut chars = part.chars();
879            match chars.next() {
880                None => String::new(),
881                Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
882            }
883        })
884        .collect::<Vec<_>>()
885        .join("-")
886}
887
888// ---------------------------------------------------------------------------
889// HttpConsumer
890// ---------------------------------------------------------------------------
891
892pub struct HttpConsumer {
893    config: HttpServerConfig,
894}
895
896impl HttpConsumer {
897    pub fn new(config: HttpServerConfig) -> Self {
898        Self { config }
899    }
900}
901
902#[async_trait::async_trait]
903impl Consumer for HttpConsumer {
904    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
905        use camel_component_api::{Body, Exchange, Message};
906
907        let dispatch = ServerRegistry::global()
908            .get_or_spawn(
909                &self.config.host,
910                self.config.port,
911                self.config.max_request_body,
912                self.config.max_response_body,
913                self.config.max_inflight_requests,
914            )
915            .await?;
916
917        // Create a channel for this path and register it
918        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
919        {
920            let mut table = dispatch.write().await;
921            table.insert(self.config.path.clone(), env_tx);
922        }
923
924        let path = self.config.path.clone();
925        let cancel_token = ctx.cancel_token();
926        loop {
927            tokio::select! {
928                _ = ctx.cancelled() => {
929                    break;
930                }
931                envelope = env_rx.recv() => {
932                    let Some(envelope) = envelope else { break; };
933
934                    // Build Exchange from HTTP request
935                    let mut msg = Message::default();
936
937                    // Set standard Camel HTTP headers
938                    msg.set_header("CamelHttpMethod",
939                        serde_json::Value::String(envelope.method.clone()));
940                    msg.set_header("CamelHttpPath",
941                        serde_json::Value::String(envelope.path.clone()));
942                    msg.set_header("CamelHttpQuery",
943                        serde_json::Value::String(envelope.query.clone()));
944
945                    // Forward HTTP headers with Title-Case names (hyper lowercases them)
946                    for (k, v) in &envelope.headers {
947                        if let Ok(val_str) = v.to_str() {
948                            msg.set_header(
949                                title_case_header(k.as_str()),
950                                serde_json::Value::String(val_str.to_string()),
951                            );
952                        }
953                    }
954
955                    // Body: always arrives as Body::Stream (native streaming)
956                    // Routes can call into_bytes() if they need to materialize
957                    msg.body = Body::Stream(envelope.body);
958
959                    #[allow(unused_mut)]
960                    let mut exchange = Exchange::new(msg);
961
962                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
963                    #[cfg(feature = "otel")]
964                    {
965                        let headers: HashMap<String, String> = envelope
966                            .headers
967                            .iter()
968                            .filter_map(|(k, v)| {
969                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
970                            })
971                            .collect();
972                        camel_otel::extract_into_exchange(&mut exchange, &headers);
973                    }
974
975                    let reply_tx = envelope.reply_tx;
976                    let sender = ctx.sender().clone();
977                    let path_clone = path.clone();
978                    let cancel = cancel_token.clone();
979
980                    // Spawn a task to handle this request concurrently
981                    //
982                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
983                    // true concurrent request processing. This change was introduced as part of the
984                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
985                    //
986                    // Rationale:
987                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
988                    //    the consumer's main loop until the pipeline processing completes
989                    // 2. This blocking would prevent multiple HTTP requests from being processed
990                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
991                    // 3. The channel would never have multiple exchanges buffered simultaneously,
992                    //    defeating the purpose of pipeline-side concurrency
993                    // 4. By spawning a task per request, we allow the consumer loop to continue
994                    //    accepting new requests while existing ones are processed in the pipeline
995                    //
996                    // This approach effectively decouples request acceptance from pipeline processing,
997                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
998                    // by the pipeline when ConcurrencyModel::Concurrent is active.
999                    tokio::spawn(async move {
1000                        // Check for cancellation before sending to pipeline.
1001                        // Returns 503 (Service Unavailable) instead of letting the request
1002                        // enter a shutting-down pipeline. This is a behavioral change from
1003                        // the pre-concurrency implementation where cancellation during
1004                        // processing would result in a 500 (Internal Server Error).
1005                        // 503 is more semantically correct: the server is temporarily
1006                        // unable to handle the request due to shutdown.
1007                        if cancel.is_cancelled() {
1008                            let _ = reply_tx.send(HttpReply {
1009                                status: 503,
1010                                headers: vec![],
1011                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1012                            });
1013                            return;
1014                        }
1015
1016                        // Send through pipeline and await result
1017                        let (tx, rx) = tokio::sync::oneshot::channel();
1018                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
1019                            exchange,
1020                            reply_tx: Some(tx),
1021                        };
1022
1023                        let result = match sender.send(envelope).await {
1024                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1025                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1026                        }
1027                        .and_then(|r| r);
1028
1029                        let reply = match result {
1030                            Ok(out) => {
1031                                let status = out
1032                                    .input
1033                                    .header("CamelHttpResponseCode")
1034                                    .and_then(|v| {
1035                                        let raw = v.as_u64()
1036                                            .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1037                                        let code = raw as u16;
1038                                        (100..1000).contains(&code).then_some(code)
1039                                    })
1040                                    .unwrap_or(200);
1041
1042                                let user_content_type = out
1043                                    .input
1044                                    .header("Content-Type")
1045                                    .and_then(|v| v.as_str().map(|s| s.to_string()));
1046
1047                                let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1048                                    Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1049                                    Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1050                                    Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1051                                    Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1052                                    Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1053                                        v.to_string().into_bytes(),
1054                                    )), Some("application/json".to_string())),
1055                                    Body::Stream(s) => {
1056                                        let ct = s.metadata.content_type.clone();
1057                                        match s.stream.lock().await.take() {
1058                                            Some(stream) => (
1059                                                HttpReplyBody::Stream(stream),
1060                                                ct,
1061                                            ),
1062                                            None => {
1063                                                tracing::error!(
1064                                                    "Body::Stream already consumed before HTTP reply — returning 500"
1065                                                );
1066                                                let error_reply = HttpReply {
1067                                                    status: 500,
1068                                                    headers: vec![],
1069                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1070                                                };
1071                                                if reply_tx.send(error_reply).is_err() {
1072                                                    debug!("reply_tx dropped before error reply could be sent");
1073                                                }
1074                                                return;
1075                                            }
1076                                        }
1077                                    }
1078                                };
1079
1080                                let mut resp_headers: Vec<(String, String)> = out
1081                                    .input
1082                                    .headers
1083                                    .iter()
1084                                    .filter(|(k, _)| !k.starts_with("Camel"))
1085                                    .filter(|(k, _)| {
1086                                        !matches!(
1087                                            k.to_lowercase().as_str(),
1088                                            "content-length"
1089                                            | "content-type"
1090                                            | "transfer-encoding"
1091                                            | "connection"
1092                                            | "cache-control"
1093                                            | "date"
1094                                            | "pragma"
1095                                            | "trailer"
1096                                            | "upgrade"
1097                                            | "via"
1098                                            | "warning"
1099                                            | "host"
1100                                            | "user-agent"
1101                                            | "accept"
1102                                            | "accept-encoding"
1103                                            | "accept-language"
1104                                            | "accept-charset"
1105                                            | "authorization"
1106                                            | "proxy-authorization"
1107                                            | "cookie"
1108                                            | "expect"
1109                                            | "from"
1110                                            | "if-match"
1111                                            | "if-modified-since"
1112                                            | "if-none-match"
1113                                            | "if-range"
1114                                            | "if-unmodified-since"
1115                                            | "max-forwards"
1116                                            | "proxy-connection"
1117                                            | "range"
1118                                            | "referer"
1119                                            | "te"
1120                                        )
1121                                    })
1122                                    .filter_map(|(k, v)| {
1123                                        v.as_str().map(|s| (k.clone(), s.to_string()))
1124                                    })
1125                                    .collect();
1126
1127                                let content_type = user_content_type
1128                                    .or(inferred_content_type);
1129                                if let Some(ct) = content_type {
1130                                    resp_headers.push(("Content-Type".to_string(), ct));
1131                                }
1132
1133                                HttpReply {
1134                                    status,
1135                                    headers: resp_headers,
1136                                    body: reply_body,
1137                                }
1138                            }
1139                            Err(CamelError::Stopped) => {
1140                                tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1141                                HttpReply {
1142                                    status: 204,
1143                                    headers: vec![],
1144                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1145                                }
1146                            }
1147                            Err(CamelError::Unauthenticated(msg)) => {
1148                                tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1149                                HttpReply {
1150                                    status: 401,
1151                                    headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1152                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1153                                }
1154                            }
1155                            Err(CamelError::Unauthorized(msg)) => {
1156                                tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1157                                HttpReply {
1158                                    status: 403,
1159                                    headers: vec![],
1160                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1161                                }
1162                            }
1163                            Err(e) => {
1164                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1165                                HttpReply {
1166                                    status: 500,
1167                                    headers: vec![],
1168                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1169                                }
1170                            }
1171                        };
1172
1173                        // Reply to Axum handler (ignore error if client disconnected)
1174                        let _ = reply_tx.send(reply);
1175                    });
1176                }
1177            }
1178        }
1179
1180        // Deregister this path
1181        {
1182            let mut table = dispatch.write().await;
1183            table.remove(&path);
1184        }
1185
1186        Ok(())
1187    }
1188
1189    async fn stop(&mut self) -> Result<(), CamelError> {
1190        Ok(())
1191    }
1192
1193    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1194        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1195    }
1196}
1197
1198// ---------------------------------------------------------------------------
1199// HttpComponent / HttpsComponent
1200// ---------------------------------------------------------------------------
1201
1202pub struct HttpComponent {
1203    config: HttpConfig,
1204}
1205
1206fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1207    let mut builder = reqwest::Client::builder()
1208        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1209        .pool_max_idle_per_host(config.pool_max_idle_per_host)
1210        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1211
1212    if !config.follow_redirects {
1213        builder = builder.redirect(reqwest::redirect::Policy::none());
1214    } else if let Some(max_redirects) = config.max_redirects {
1215        builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1216    }
1217
1218    if let Some(proxy_url) = &config.proxy_url
1219        && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1220    {
1221        builder = builder.proxy(proxy);
1222    }
1223
1224    if matches!(cookie_handling, CookieHandling::InMemory) {
1225        // TODO(HTTP-013): enable reqwest cookie jar once workspace reqwest features include cookie_store.
1226    }
1227
1228    if let Some(tls) = &config.tls
1229        && tls.enabled
1230    {
1231        if tls.insecure || !tls.verify_peer {
1232            builder = builder.danger_accept_invalid_certs(true);
1233        }
1234
1235        if let Some(ca_path) = &tls.ca_cert_path
1236            && let Ok(ca_bytes) = std::fs::read(ca_path)
1237        {
1238            let cert = reqwest::Certificate::from_pem(&ca_bytes)
1239                .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1240            if let Ok(ca_cert) = cert {
1241                builder = builder.add_root_certificate(ca_cert);
1242            }
1243        }
1244
1245        if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1246            && let (Ok(cert_bytes), Ok(key_bytes)) =
1247                (std::fs::read(cert_path), std::fs::read(key_path))
1248        {
1249            let mut identity_pem = cert_bytes;
1250            identity_pem.extend_from_slice(&key_bytes);
1251            if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1252                builder = builder.identity(identity);
1253            }
1254        }
1255    }
1256
1257    builder
1258        .build()
1259        .expect("reqwest::Client::build() with valid config should not fail") // allow-unwrap
1260}
1261
1262impl HttpComponent {
1263    pub fn new() -> Self {
1264        let config = HttpConfig::default();
1265        Self { config }
1266    }
1267
1268    pub fn with_config(config: HttpConfig) -> Self {
1269        Self { config }
1270    }
1271
1272    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1273        match config {
1274            Some(cfg) => Self::with_config(cfg),
1275            None => Self::new(),
1276        }
1277    }
1278}
1279
1280impl Default for HttpComponent {
1281    fn default() -> Self {
1282        Self::new()
1283    }
1284}
1285
1286impl Component for HttpComponent {
1287    fn scheme(&self) -> &str {
1288        "http"
1289    }
1290
1291    fn create_endpoint(
1292        &self,
1293        uri: &str,
1294        ctx: &dyn camel_component_api::ComponentContext,
1295    ) -> Result<Box<dyn Endpoint>, CamelError> {
1296        self.config.validate()?;
1297        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1298        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1299        let client = build_client(&self.config, config.cookie_handling);
1300        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1301            server_config.host.clone(),
1302            server_config.port,
1303        )));
1304        Ok(Box::new(HttpEndpoint {
1305            uri: uri.to_string(),
1306            config,
1307            server_config,
1308            client,
1309        }))
1310    }
1311}
1312
1313pub struct HttpsComponent {
1314    config: HttpConfig,
1315}
1316
1317impl HttpsComponent {
1318    pub fn new() -> Self {
1319        let config = HttpConfig::default();
1320        Self { config }
1321    }
1322
1323    pub fn with_config(config: HttpConfig) -> Self {
1324        Self { config }
1325    }
1326
1327    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1328        match config {
1329            Some(cfg) => Self::with_config(cfg),
1330            None => Self::new(),
1331        }
1332    }
1333}
1334
1335impl Default for HttpsComponent {
1336    fn default() -> Self {
1337        Self::new()
1338    }
1339}
1340
1341impl Component for HttpsComponent {
1342    fn scheme(&self) -> &str {
1343        "https"
1344    }
1345
1346    fn create_endpoint(
1347        &self,
1348        uri: &str,
1349        ctx: &dyn camel_component_api::ComponentContext,
1350    ) -> Result<Box<dyn Endpoint>, CamelError> {
1351        self.config.validate()?;
1352        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1353        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1354        let client = build_client(&self.config, config.cookie_handling);
1355        ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1356            server_config.host.clone(),
1357            server_config.port,
1358        )));
1359        Ok(Box::new(HttpEndpoint {
1360            uri: uri.to_string(),
1361            config,
1362            server_config,
1363            client,
1364        }))
1365    }
1366}
1367
1368// ---------------------------------------------------------------------------
1369// HttpEndpoint
1370// ---------------------------------------------------------------------------
1371
1372struct HttpEndpoint {
1373    uri: String,
1374    config: HttpEndpointConfig,
1375    server_config: HttpServerConfig,
1376    client: reqwest::Client,
1377}
1378
1379impl Endpoint for HttpEndpoint {
1380    fn uri(&self) -> &str {
1381        &self.uri
1382    }
1383
1384    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1385        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1386    }
1387
1388    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1389        let producer = HttpProducer {
1390            config: Arc::new(self.config.clone()),
1391            client: self.client.clone(),
1392        };
1393        if let Some(ref provider) = self.config.token_provider {
1394            let layer = BearerTokenLayer::new(Arc::clone(provider));
1395            Ok(BoxProcessor::new(layer.layer(producer)))
1396        } else {
1397            Ok(BoxProcessor::new(producer))
1398        }
1399    }
1400}
1401
1402// ---------------------------------------------------------------------------
1403// SSRF Protection
1404// ---------------------------------------------------------------------------
1405
1406fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1407    let parsed = url::Url::parse(url)
1408        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1409
1410    // Check blocked hosts
1411    if let Some(host) = parsed.host_str()
1412        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1413    {
1414        return Err(CamelError::ProcessorError(format!(
1415            "Host '{}' is blocked",
1416            host
1417        )));
1418    }
1419
1420    // Check private IPs if not allowed
1421    if !config.allow_private_ips
1422        && let Some(host) = parsed.host()
1423    {
1424        match host {
1425            url::Host::Ipv4(ip) => {
1426                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1427                    return Err(CamelError::ProcessorError(format!(
1428                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1429                        ip
1430                    )));
1431                }
1432            }
1433            url::Host::Ipv6(ip) => {
1434                if ip.is_loopback() {
1435                    return Err(CamelError::ProcessorError(format!(
1436                        "Loopback IP '{}' not allowed",
1437                        ip
1438                    )));
1439                }
1440            }
1441            url::Host::Domain(domain) => {
1442                // Block common internal domains
1443                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1444                if blocked_domains.contains(&domain) {
1445                    return Err(CamelError::ProcessorError(format!(
1446                        "Domain '{}' is not allowed",
1447                        domain
1448                    )));
1449                }
1450            }
1451        }
1452    }
1453
1454    Ok(())
1455}
1456
1457fn is_private_ip(ip: &IpAddr) -> bool {
1458    match ip {
1459        IpAddr::V4(ipv4) => {
1460            ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1461        }
1462        IpAddr::V6(ipv6) => {
1463            let seg0 = ipv6.segments()[0];
1464            ipv6.is_loopback()
1465                // fc00::/7 (ULA)
1466                || (seg0 & 0xfe00) == 0xfc00
1467                // fe80::/10 (link-local)
1468                || (seg0 & 0xffc0) == 0xfe80
1469                // ::ffff:0:0/96 (IPv4-mapped): only block if the mapped IPv4 is private
1470                || ipv6
1471                    .to_ipv4_mapped()
1472                    .map(|v4| {
1473                        v4.is_private()
1474                            || v4.is_loopback()
1475                            || v4.is_link_local()
1476                            || v4.octets()[0] == 0
1477                    })
1478                    .unwrap_or(false)
1479        }
1480    }
1481}
1482
1483async fn validate_resolved_host_for_ssrf(
1484    url: &str,
1485    config: &HttpEndpointConfig,
1486) -> Result<(), CamelError> {
1487    if config.allow_private_ips {
1488        return Ok(());
1489    }
1490
1491    let parsed = url::Url::parse(url)
1492        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1493    let Some(host) = parsed.host_str() else {
1494        return Ok(());
1495    };
1496    let Some(port) = parsed.port_or_known_default() else {
1497        return Ok(());
1498    };
1499
1500    let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1501        CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1502    })?;
1503
1504    for addr in resolved {
1505        let ip = addr.ip();
1506        if is_private_ip(&ip) {
1507            return Err(CamelError::ProcessorError(format!(
1508                "Target resolved to private IP: {}",
1509                ip
1510            )));
1511        }
1512    }
1513
1514    Ok(())
1515}
1516
1517// ---------------------------------------------------------------------------
1518// HttpProducer
1519// ---------------------------------------------------------------------------
1520
1521#[derive(Clone)]
1522struct HttpProducer {
1523    config: Arc<HttpEndpointConfig>,
1524    client: reqwest::Client,
1525}
1526
1527impl HttpProducer {
1528    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1529        if let Some(ref method) = config.http_method {
1530            return method.to_uppercase();
1531        }
1532        if let Some(method) = exchange
1533            .input
1534            .header("CamelHttpMethod")
1535            .and_then(|v| v.as_str())
1536        {
1537            return method.to_uppercase();
1538        }
1539        if !exchange.input.body.is_empty() {
1540            return "POST".to_string();
1541        }
1542        "GET".to_string()
1543    }
1544
1545    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1546        if let Some(uri) = exchange
1547            .input
1548            .header("CamelHttpUri")
1549            .and_then(|v| v.as_str())
1550        {
1551            let mut url = uri.to_string();
1552            if let Some(path) = exchange
1553                .input
1554                .header("CamelHttpPath")
1555                .and_then(|v| v.as_str())
1556            {
1557                if !url.ends_with('/') && !path.starts_with('/') {
1558                    url.push('/');
1559                }
1560                url.push_str(path);
1561            }
1562            if let Some(query) = exchange
1563                .input
1564                .header("CamelHttpQuery")
1565                .and_then(|v| v.as_str())
1566            {
1567                url.push('?');
1568                url.push_str(query);
1569            }
1570            return url;
1571        }
1572
1573        let mut url = config.base_url.clone();
1574
1575        if let Some(path) = exchange
1576            .input
1577            .header("CamelHttpPath")
1578            .and_then(|v| v.as_str())
1579        {
1580            if !url.ends_with('/') && !path.starts_with('/') {
1581                url.push('/');
1582            }
1583            url.push_str(path);
1584        }
1585
1586        if let Some(query) = exchange
1587            .input
1588            .header("CamelHttpQuery")
1589            .and_then(|v| v.as_str())
1590        {
1591            url.push('?');
1592            url.push_str(query);
1593        } else if !config.query_params.is_empty() {
1594            let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); // allow-unwrap
1595            for (k, v) in &config.query_params {
1596                parsed.query_pairs_mut().append_pair(k, v);
1597            }
1598            url = parsed.to_string();
1599        }
1600
1601        url
1602    }
1603
1604    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1605        status >= range.0 && status <= range.1
1606    }
1607}
1608
1609impl Service<Exchange> for HttpProducer {
1610    type Response = Exchange;
1611    type Error = CamelError;
1612    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1613
1614    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1615        Poll::Ready(Ok(()))
1616    }
1617
1618    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1619        let config = self.config.clone();
1620        let client = self.client.clone();
1621
1622        Box::pin(async move {
1623            let method_str = HttpProducer::resolve_method(&exchange, &config);
1624            let url = HttpProducer::resolve_url(&exchange, &config);
1625
1626            // SECURITY: Validate URL for SSRF
1627            validate_url_for_ssrf(&url, &config)?;
1628            validate_resolved_host_for_ssrf(&url, &config).await?;
1629
1630            debug!(
1631                correlation_id = %exchange.correlation_id(),
1632                method = %method_str,
1633                url = %url,
1634                "HTTP request"
1635            );
1636
1637            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1638                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1639            })?;
1640
1641            let mut request = client.request(method, &url);
1642
1643            if let Some(timeout) = config.response_timeout {
1644                request = request.timeout(timeout);
1645            }
1646
1647            if let Some(user_agent) = &config.user_agent
1648                && !config.bridge_endpoint
1649            {
1650                request = request.header("User-Agent", user_agent.clone());
1651            }
1652
1653            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1654            #[cfg(feature = "otel")]
1655            let should_inject_otel = !config.bridge_endpoint;
1656            #[cfg(feature = "otel")]
1657            if should_inject_otel {
1658                let mut otel_headers = HashMap::new();
1659                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1660                for (k, v) in otel_headers {
1661                    if let (Ok(name), Ok(val)) = (
1662                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1663                        reqwest::header::HeaderValue::from_str(&v),
1664                    ) {
1665                        request = request.header(name, val);
1666                    }
1667                }
1668            }
1669
1670            for (key, value) in &exchange.input.headers {
1671                if !key.starts_with("Camel")
1672                    && !config
1673                        .skip_request_headers
1674                        .iter()
1675                        .any(|h| h.eq_ignore_ascii_case(key))
1676                    && let Some(val_str) = value.as_str()
1677                    && let (Ok(name), Ok(val)) = (
1678                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1679                        reqwest::header::HeaderValue::from_str(val_str),
1680                    )
1681                {
1682                    request = request.header(name, val);
1683                }
1684            }
1685
1686            if !config.bridge_endpoint {
1687                match &config.auth {
1688                    HttpAuth::None => {}
1689                    HttpAuth::Basic { username, password } => {
1690                        request = request.basic_auth(username, Some(password));
1691                    }
1692                    HttpAuth::Bearer { token } => {
1693                        request = request.bearer_auth(token);
1694                    }
1695                }
1696
1697                if config.connection_close {
1698                    request = request.header("Connection", "close");
1699                }
1700            }
1701
1702            match exchange.input.body {
1703                Body::Stream(ref s) => {
1704                    let mut stream_lock = s.stream.lock().await;
1705                    if let Some(stream) = stream_lock.take() {
1706                        request = request.body(reqwest::Body::wrap_stream(stream));
1707                    } else {
1708                        return Err(CamelError::AlreadyConsumed);
1709                    }
1710                }
1711                _ => {
1712                    // For other types, materialize with configured limit
1713                    let body = std::mem::take(&mut exchange.input.body);
1714                    let bytes = body.into_bytes(config.max_body_size).await?;
1715                    if !bytes.is_empty() {
1716                        request = request.body(bytes);
1717                    }
1718                }
1719            }
1720
1721            let response = request
1722                .send()
1723                .await
1724                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1725
1726            let status_code = response.status().as_u16();
1727            let status_text = response
1728                .status()
1729                .canonical_reason()
1730                .unwrap_or("Unknown")
1731                .to_string();
1732
1733            for (key, value) in response.headers() {
1734                if config
1735                    .skip_response_headers
1736                    .iter()
1737                    .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1738                {
1739                    continue;
1740                }
1741                if let Ok(val_str) = value.to_str() {
1742                    exchange.input.set_header(
1743                        title_case_header(key.as_str()),
1744                        serde_json::Value::String(val_str.to_string()),
1745                    );
1746                }
1747            }
1748
1749            exchange.input.set_header(
1750                "CamelHttpResponseCode",
1751                serde_json::Value::Number(status_code.into()),
1752            );
1753            exchange.input.set_header(
1754                "CamelHttpResponseText",
1755                serde_json::Value::String(status_text.clone()),
1756            );
1757
1758            // Read response body with timeout and size guard (HTTP-004, HTTP-005)
1759            let read_timeout = Duration::from_millis(config.read_timeout_ms);
1760            let response_body = tokio::time::timeout(read_timeout, async {
1761                // Check Content-Length header before allocating
1762                if let Some(content_len) = response.content_length()
1763                    && content_len > config.max_response_bytes as u64
1764                {
1765                    return Err(CamelError::ProcessorError(format!(
1766                        "Response body too large: {} bytes exceeds limit of {} bytes",
1767                        content_len, config.max_response_bytes
1768                    )));
1769                }
1770                // Use bytes_stream() for lazy streaming with size guard
1771                use futures::TryStreamExt;
1772                let mut stream = response.bytes_stream();
1773                let mut total: usize = 0;
1774                let mut collected = Vec::new();
1775                while let Some(chunk) = stream.try_next().await.map_err(|e| {
1776                    CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1777                })? {
1778                    total += chunk.len();
1779                    if total > config.max_response_bytes {
1780                        return Err(CamelError::ProcessorError(format!(
1781                            "Response body too large: {} bytes exceeds limit of {} bytes",
1782                            total, config.max_response_bytes
1783                        )));
1784                    }
1785                    collected.push(chunk);
1786                }
1787                let mut result = bytes::BytesMut::with_capacity(total);
1788                for chunk in collected {
1789                    result.extend_from_slice(&chunk);
1790                }
1791                Ok::<bytes::Bytes, CamelError>(result.freeze())
1792            })
1793            .await
1794            .map_err(|_| {
1795                CamelError::ProcessorError(format!(
1796                    "Read timeout after {}ms",
1797                    config.read_timeout_ms
1798                ))
1799            })??;
1800
1801            if config.throw_exception_on_failure
1802                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1803            {
1804                return Err(CamelError::HttpOperationFailed {
1805                    method: method_str,
1806                    url,
1807                    status_code,
1808                    status_text,
1809                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1810                });
1811            }
1812
1813            if !response_body.is_empty() {
1814                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1815            }
1816
1817            debug!(
1818                correlation_id = %exchange.correlation_id(),
1819                status = status_code,
1820                url = %url,
1821                "HTTP response"
1822            );
1823            Ok(exchange)
1824        })
1825    }
1826}
1827
1828#[cfg(test)]
1829mod tests {
1830    use super::*;
1831    use camel_component_api::{Message, NoOpComponentContext};
1832    use std::sync::Arc;
1833    use std::time::Duration;
1834
1835    fn test_producer_ctx() -> ProducerContext {
1836        ProducerContext::new()
1837    }
1838
1839    #[test]
1840    fn test_http_config_defaults() {
1841        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1842        assert_eq!(config.base_url, "http://localhost:8080/api");
1843        assert!(config.http_method.is_none());
1844        assert!(config.throw_exception_on_failure);
1845        assert_eq!(config.ok_status_code_range, (200, 299));
1846        assert!(config.response_timeout.is_none());
1847        assert!(matches!(config.auth, HttpAuth::None));
1848        assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1849        assert!(!config.bridge_endpoint);
1850        assert!(!config.connection_close);
1851    }
1852
1853    #[test]
1854    fn test_http_config_scheme() {
1855        // UriConfig trait method returns "http" as primary scheme
1856        assert_eq!(HttpEndpointConfig::scheme(), "http");
1857    }
1858
1859    #[test]
1860    fn test_http_config_from_components() {
1861        // Test from_components directly (trait method)
1862        let components = camel_component_api::UriComponents {
1863            scheme: "https".to_string(),
1864            path: "//api.example.com/v1".to_string(),
1865            params: std::collections::HashMap::from([(
1866                "httpMethod".to_string(),
1867                "POST".to_string(),
1868            )]),
1869        };
1870        let config = HttpEndpointConfig::from_components(components).unwrap();
1871        assert_eq!(config.base_url, "https://api.example.com/v1");
1872        assert_eq!(config.http_method, Some("POST".to_string()));
1873    }
1874
1875    #[test]
1876    fn test_http_config_with_options() {
1877        let config = HttpEndpointConfig::from_uri(
1878            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1879        ).unwrap();
1880        assert_eq!(config.base_url, "https://api.example.com/v1");
1881        assert_eq!(config.http_method, Some("PUT".to_string()));
1882        assert!(!config.throw_exception_on_failure);
1883        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1884    }
1885
1886    #[test]
1887    fn test_http_endpoint_config_auth_and_headers_options() {
1888        let config = HttpEndpointConfig::from_uri(
1889            "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1890        )
1891        .unwrap();
1892
1893        assert!(matches!(
1894            config.auth,
1895            HttpAuth::Basic { username, password } if username == "u" && password == "p"
1896        ));
1897        assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1898        assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1899        assert!(config.bridge_endpoint);
1900        assert!(config.connection_close);
1901        assert_eq!(
1902            config.skip_request_headers,
1903            vec!["authorization".to_string(), "x-secret".to_string()]
1904        );
1905        assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1906    }
1907
1908    #[test]
1909    fn test_http_endpoint_config_bearer_auth() {
1910        let config = HttpEndpointConfig::from_uri(
1911            "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1912        )
1913        .unwrap();
1914        assert!(matches!(
1915            config.auth,
1916            HttpAuth::Bearer { token } if token == "t"
1917        ));
1918    }
1919
1920    #[test]
1921    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1922        let config = HttpConfig::default()
1923            .with_response_timeout_ms(999)
1924            .with_allow_private_ips(true)
1925            .with_blocked_hosts(vec!["evil.com".to_string()])
1926            .with_max_body_size(12345);
1927        let endpoint =
1928            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1929        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1930        assert!(endpoint.allow_private_ips);
1931        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1932        assert_eq!(endpoint.max_body_size, 12345);
1933    }
1934
1935    #[test]
1936    fn test_from_uri_with_defaults_uri_overrides_config() {
1937        let config = HttpConfig::default()
1938            .with_response_timeout_ms(999)
1939            .with_allow_private_ips(true)
1940            .with_blocked_hosts(vec!["evil.com".to_string()])
1941            .with_max_body_size(12345);
1942        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1943            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1944            &config,
1945        )
1946        .unwrap();
1947        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1948        assert!(!endpoint.allow_private_ips);
1949        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1950        assert_eq!(endpoint.max_body_size, 99);
1951    }
1952
1953    #[test]
1954    fn test_http_config_ok_status_range() {
1955        let config =
1956            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1957        assert_eq!(config.ok_status_code_range, (200, 204));
1958    }
1959
1960    #[test]
1961    fn test_http_config_wrong_scheme() {
1962        let result = HttpEndpointConfig::from_uri("file:/tmp");
1963        assert!(result.is_err());
1964    }
1965
1966    #[test]
1967    fn test_http_component_scheme() {
1968        let component = HttpComponent::new();
1969        assert_eq!(component.scheme(), "http");
1970    }
1971
1972    #[test]
1973    fn test_https_component_scheme() {
1974        let component = HttpsComponent::new();
1975        assert_eq!(component.scheme(), "https");
1976    }
1977
1978    #[test]
1979    fn test_http_endpoint_creates_consumer() {
1980        let component = HttpComponent::new();
1981        let ctx = NoOpComponentContext;
1982        let endpoint = component
1983            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1984            .unwrap();
1985        assert!(endpoint.create_consumer().is_ok());
1986    }
1987
1988    #[test]
1989    fn test_https_endpoint_creates_consumer() {
1990        let component = HttpsComponent::new();
1991        let ctx = NoOpComponentContext;
1992        let endpoint = component
1993            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1994            .unwrap();
1995        assert!(endpoint.create_consumer().is_ok());
1996    }
1997
1998    #[test]
1999    fn test_http_endpoint_creates_producer() {
2000        let ctx = test_producer_ctx();
2001        let component = HttpComponent::new();
2002        let endpoint_ctx = NoOpComponentContext;
2003        let endpoint = component
2004            .create_endpoint("http://localhost/api", &endpoint_ctx)
2005            .unwrap();
2006        assert!(endpoint.create_producer(&ctx).is_ok());
2007    }
2008
2009    // -----------------------------------------------------------------------
2010    // Producer tests
2011    // -----------------------------------------------------------------------
2012
2013    #[tokio::test]
2014    async fn test_producer_with_token_provider() {
2015        use camel_auth::oauth2::TokenProvider;
2016        use tower::ServiceExt;
2017
2018        let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2019            Arc::new(std::sync::Mutex::new(None));
2020        let captured_clone = Arc::clone(&captured_auth);
2021
2022        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2023        let port = listener.local_addr().unwrap().port();
2024
2025        let _handle = tokio::spawn(async move {
2026            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2027            if let Ok((mut stream, _)) = listener.accept().await {
2028                let mut buf = vec![0u8; 8192];
2029                let n = stream.read(&mut buf).await.unwrap_or(0);
2030                let request = String::from_utf8_lossy(&buf[..n]).to_string();
2031                let auth = request
2032                    .lines()
2033                    .find(|l| l.to_lowercase().starts_with("authorization:"))
2034                    .map(|l| {
2035                        l.split(':')
2036                            .nth(1)
2037                            .map(|s| s.trim().to_string())
2038                            .unwrap_or_default()
2039                    });
2040                *captured_clone.lock().unwrap() = auth;
2041                let body = r#"{"echo":"ok"}"#;
2042                let resp = format!(
2043                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2044                    body.len(),
2045                    body
2046                );
2047                let _ = stream.write_all(resp.as_bytes()).await;
2048            }
2049        });
2050
2051        #[derive(Debug)]
2052        struct StaticProvider;
2053        #[async_trait::async_trait]
2054        impl TokenProvider for StaticProvider {
2055            async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2056                Ok("injected-token".into())
2057            }
2058        }
2059
2060        let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2061        let ctx = test_producer_ctx();
2062        let component = HttpComponent::new();
2063        let endpoint_ctx = NoOpComponentContext;
2064        let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2065        let producer = endpoint.create_producer(&ctx).unwrap();
2066
2067        let mut exchange = Exchange::new(Message::new("hello"));
2068
2069        let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2070        let mut layered = layer.layer(producer);
2071        let result = layered.ready().await.unwrap().call(exchange).await;
2072        assert!(result.is_ok(), "producer call failed: {:?}", result);
2073
2074        tokio::time::sleep(Duration::from_millis(100)).await;
2075        let auth = captured_auth.lock().unwrap().take();
2076        assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2077    }
2078
2079    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2080        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2081        let addr = listener.local_addr().unwrap();
2082        let url = format!("http://127.0.0.1:{}", addr.port());
2083
2084        let handle = tokio::spawn(async move {
2085            loop {
2086                if let Ok((mut stream, _)) = listener.accept().await {
2087                    tokio::spawn(async move {
2088                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2089                        let mut buf = vec![0u8; 4096];
2090                        let n = stream.read(&mut buf).await.unwrap_or(0);
2091                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2092
2093                        let method = request.split_whitespace().next().unwrap_or("GET");
2094
2095                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2096                        let response = format!(
2097                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2098                            body.len(),
2099                            body
2100                        );
2101                        let _ = stream.write_all(response.as_bytes()).await;
2102                    });
2103                }
2104            }
2105        });
2106
2107        (url, handle)
2108    }
2109
2110    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2111        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2112        let addr = listener.local_addr().unwrap();
2113        let url = format!("http://127.0.0.1:{}", addr.port());
2114
2115        let handle = tokio::spawn(async move {
2116            loop {
2117                if let Ok((mut stream, _)) = listener.accept().await {
2118                    let status = status;
2119                    tokio::spawn(async move {
2120                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2121                        let mut buf = vec![0u8; 4096];
2122                        let _ = stream.read(&mut buf).await;
2123
2124                        let status_text = match status {
2125                            404 => "Not Found",
2126                            500 => "Internal Server Error",
2127                            _ => "Error",
2128                        };
2129                        let body = "error body";
2130                        let response = format!(
2131                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2132                            status,
2133                            status_text,
2134                            body.len(),
2135                            body
2136                        );
2137                        let _ = stream.write_all(response.as_bytes()).await;
2138                    });
2139                }
2140            }
2141        });
2142
2143        (url, handle)
2144    }
2145
2146    #[tokio::test]
2147    async fn test_http_producer_get_request() {
2148        use tower::ServiceExt;
2149
2150        let (url, _handle) = start_test_server().await;
2151        let ctx = test_producer_ctx();
2152
2153        let component = HttpComponent::new();
2154        let endpoint_ctx = NoOpComponentContext;
2155        let endpoint = component
2156            .create_endpoint(
2157                &format!("{url}/api/test?allowPrivateIps=true"),
2158                &endpoint_ctx,
2159            )
2160            .unwrap();
2161        let producer = endpoint.create_producer(&ctx).unwrap();
2162
2163        let exchange = Exchange::new(Message::default());
2164        let result = producer.oneshot(exchange).await.unwrap();
2165
2166        let status = result
2167            .input
2168            .header("CamelHttpResponseCode")
2169            .and_then(|v| v.as_u64())
2170            .unwrap();
2171        assert_eq!(status, 200);
2172
2173        assert!(!result.input.body.is_empty());
2174    }
2175
2176    #[tokio::test]
2177    async fn test_http_producer_post_with_body() {
2178        use tower::ServiceExt;
2179
2180        let (url, _handle) = start_test_server().await;
2181        let ctx = test_producer_ctx();
2182
2183        let component = HttpComponent::new();
2184        let endpoint_ctx = NoOpComponentContext;
2185        let endpoint = component
2186            .create_endpoint(
2187                &format!("{url}/api/data?allowPrivateIps=true"),
2188                &endpoint_ctx,
2189            )
2190            .unwrap();
2191        let producer = endpoint.create_producer(&ctx).unwrap();
2192
2193        let exchange = Exchange::new(Message::new("request body"));
2194        let result = producer.oneshot(exchange).await.unwrap();
2195
2196        let status = result
2197            .input
2198            .header("CamelHttpResponseCode")
2199            .and_then(|v| v.as_u64())
2200            .unwrap();
2201        assert_eq!(status, 200);
2202    }
2203
2204    #[tokio::test]
2205    async fn test_http_producer_method_from_header() {
2206        use tower::ServiceExt;
2207
2208        let (url, _handle) = start_test_server().await;
2209        let ctx = test_producer_ctx();
2210
2211        let component = HttpComponent::new();
2212        let endpoint_ctx = NoOpComponentContext;
2213        let endpoint = component
2214            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2215            .unwrap();
2216        let producer = endpoint.create_producer(&ctx).unwrap();
2217
2218        let mut exchange = Exchange::new(Message::default());
2219        exchange.input.set_header(
2220            "CamelHttpMethod",
2221            serde_json::Value::String("DELETE".to_string()),
2222        );
2223
2224        let result = producer.oneshot(exchange).await.unwrap();
2225        let status = result
2226            .input
2227            .header("CamelHttpResponseCode")
2228            .and_then(|v| v.as_u64())
2229            .unwrap();
2230        assert_eq!(status, 200);
2231    }
2232
2233    #[tokio::test]
2234    async fn test_http_producer_forced_method() {
2235        use tower::ServiceExt;
2236
2237        let (url, _handle) = start_test_server().await;
2238        let ctx = test_producer_ctx();
2239
2240        let component = HttpComponent::new();
2241        let endpoint_ctx = NoOpComponentContext;
2242        let endpoint = component
2243            .create_endpoint(
2244                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2245                &endpoint_ctx,
2246            )
2247            .unwrap();
2248        let producer = endpoint.create_producer(&ctx).unwrap();
2249
2250        let exchange = Exchange::new(Message::default());
2251        let result = producer.oneshot(exchange).await.unwrap();
2252
2253        let status = result
2254            .input
2255            .header("CamelHttpResponseCode")
2256            .and_then(|v| v.as_u64())
2257            .unwrap();
2258        assert_eq!(status, 200);
2259    }
2260
2261    #[tokio::test]
2262    async fn test_http_producer_throw_exception_on_failure() {
2263        use tower::ServiceExt;
2264
2265        let (url, _handle) = start_status_server(404).await;
2266        let ctx = test_producer_ctx();
2267
2268        let component = HttpComponent::new();
2269        let endpoint_ctx = NoOpComponentContext;
2270        let endpoint = component
2271            .create_endpoint(
2272                &format!("{url}/not-found?allowPrivateIps=true"),
2273                &endpoint_ctx,
2274            )
2275            .unwrap();
2276        let producer = endpoint.create_producer(&ctx).unwrap();
2277
2278        let exchange = Exchange::new(Message::default());
2279        let result = producer.oneshot(exchange).await;
2280        assert!(result.is_err());
2281
2282        match result.unwrap_err() {
2283            CamelError::HttpOperationFailed { status_code, .. } => {
2284                assert_eq!(status_code, 404);
2285            }
2286            e => panic!("Expected HttpOperationFailed, got: {e}"),
2287        }
2288    }
2289
2290    #[tokio::test]
2291    async fn test_http_producer_no_throw_on_failure() {
2292        use tower::ServiceExt;
2293
2294        let (url, _handle) = start_status_server(500).await;
2295        let ctx = test_producer_ctx();
2296
2297        let component = HttpComponent::new();
2298        let endpoint_ctx = NoOpComponentContext;
2299        let endpoint = component
2300            .create_endpoint(
2301                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2302                &endpoint_ctx,
2303            )
2304            .unwrap();
2305        let producer = endpoint.create_producer(&ctx).unwrap();
2306
2307        let exchange = Exchange::new(Message::default());
2308        let result = producer.oneshot(exchange).await.unwrap();
2309
2310        let status = result
2311            .input
2312            .header("CamelHttpResponseCode")
2313            .and_then(|v| v.as_u64())
2314            .unwrap();
2315        assert_eq!(status, 500);
2316    }
2317
2318    #[tokio::test]
2319    async fn test_http_producer_uri_override() {
2320        use tower::ServiceExt;
2321
2322        let (url, _handle) = start_test_server().await;
2323        let ctx = test_producer_ctx();
2324
2325        let component = HttpComponent::new();
2326        let endpoint_ctx = NoOpComponentContext;
2327        let endpoint = component
2328            .create_endpoint(
2329                "http://localhost:1/does-not-exist?allowPrivateIps=true",
2330                &endpoint_ctx,
2331            )
2332            .unwrap();
2333        let producer = endpoint.create_producer(&ctx).unwrap();
2334
2335        let mut exchange = Exchange::new(Message::default());
2336        exchange.input.set_header(
2337            "CamelHttpUri",
2338            serde_json::Value::String(format!("{url}/api")),
2339        );
2340
2341        let result = producer.oneshot(exchange).await.unwrap();
2342        let status = result
2343            .input
2344            .header("CamelHttpResponseCode")
2345            .and_then(|v| v.as_u64())
2346            .unwrap();
2347        assert_eq!(status, 200);
2348    }
2349
2350    #[tokio::test]
2351    async fn test_http_producer_response_headers_mapped() {
2352        use tower::ServiceExt;
2353
2354        let (url, _handle) = start_test_server().await;
2355        let ctx = test_producer_ctx();
2356
2357        let component = HttpComponent::new();
2358        let endpoint_ctx = NoOpComponentContext;
2359        let endpoint = component
2360            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2361            .unwrap();
2362        let producer = endpoint.create_producer(&ctx).unwrap();
2363
2364        let exchange = Exchange::new(Message::default());
2365        let result = producer.oneshot(exchange).await.unwrap();
2366
2367        assert!(
2368            result.input.header("Content-Type").is_some(),
2369            "Response should have Content-Type header"
2370        );
2371        assert!(result.input.header("CamelHttpResponseText").is_some());
2372    }
2373
2374    // -----------------------------------------------------------------------
2375    // Bug fix tests: Client configuration per-endpoint
2376    // -----------------------------------------------------------------------
2377
2378    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2379        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2380        let addr = listener.local_addr().unwrap();
2381        let url = format!("http://127.0.0.1:{}", addr.port());
2382
2383        let handle = tokio::spawn(async move {
2384            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2385            loop {
2386                if let Ok((mut stream, _)) = listener.accept().await {
2387                    tokio::spawn(async move {
2388                        let mut buf = vec![0u8; 4096];
2389                        let n = stream.read(&mut buf).await.unwrap_or(0);
2390                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2391
2392                        // Check if this is a request to /final
2393                        if request.contains("GET /final") {
2394                            let body = r#"{"status":"final"}"#;
2395                            let response = format!(
2396                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2397                                body.len(),
2398                                body
2399                            );
2400                            let _ = stream.write_all(response.as_bytes()).await;
2401                        } else {
2402                            // Redirect to /final
2403                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2404                            let _ = stream.write_all(response.as_bytes()).await;
2405                        }
2406                    });
2407                }
2408            }
2409        });
2410
2411        (url, handle)
2412    }
2413
2414    #[tokio::test]
2415    async fn test_follow_redirects_false_does_not_follow() {
2416        use tower::ServiceExt;
2417
2418        let (url, _handle) = start_redirect_server().await;
2419        let ctx = test_producer_ctx();
2420
2421        let component =
2422            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2423        let endpoint_ctx = NoOpComponentContext;
2424        let endpoint = component
2425            .create_endpoint(
2426                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2427                &endpoint_ctx,
2428            )
2429            .unwrap();
2430        let producer = endpoint.create_producer(&ctx).unwrap();
2431
2432        let exchange = Exchange::new(Message::default());
2433        let result = producer.oneshot(exchange).await.unwrap();
2434
2435        // Should get 302, NOT follow redirect to 200
2436        let status = result
2437            .input
2438            .header("CamelHttpResponseCode")
2439            .and_then(|v| v.as_u64())
2440            .unwrap();
2441        assert_eq!(
2442            status, 302,
2443            "Should NOT follow redirect when followRedirects=false"
2444        );
2445    }
2446
2447    #[tokio::test]
2448    async fn test_follow_redirects_true_follows_redirect() {
2449        use tower::ServiceExt;
2450
2451        let (url, _handle) = start_redirect_server().await;
2452        let ctx = test_producer_ctx();
2453
2454        let component =
2455            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2456        let endpoint_ctx = NoOpComponentContext;
2457        let endpoint = component
2458            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2459            .unwrap();
2460        let producer = endpoint.create_producer(&ctx).unwrap();
2461
2462        let exchange = Exchange::new(Message::default());
2463        let result = producer.oneshot(exchange).await.unwrap();
2464
2465        // Should follow redirect and get 200
2466        let status = result
2467            .input
2468            .header("CamelHttpResponseCode")
2469            .and_then(|v| v.as_u64())
2470            .unwrap();
2471        assert_eq!(
2472            status, 200,
2473            "Should follow redirect when followRedirects=true"
2474        );
2475    }
2476
2477    #[tokio::test]
2478    async fn test_query_params_forwarded_to_http_request() {
2479        use tower::ServiceExt;
2480
2481        let (url, _handle) = start_test_server().await;
2482        let ctx = test_producer_ctx();
2483
2484        let component = HttpComponent::new();
2485        let endpoint_ctx = NoOpComponentContext;
2486        // apiKey is NOT a Camel option, should be forwarded as query param
2487        let endpoint = component
2488            .create_endpoint(
2489                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2490                &endpoint_ctx,
2491            )
2492            .unwrap();
2493        let producer = endpoint.create_producer(&ctx).unwrap();
2494
2495        let exchange = Exchange::new(Message::default());
2496        let result = producer.oneshot(exchange).await.unwrap();
2497
2498        // The test server returns the request info in response
2499        // We just verify it succeeds (the query param was sent)
2500        let status = result
2501            .input
2502            .header("CamelHttpResponseCode")
2503            .and_then(|v| v.as_u64())
2504            .unwrap();
2505        assert_eq!(status, 200);
2506    }
2507
2508    #[tokio::test]
2509    async fn test_non_camel_query_params_are_forwarded() {
2510        // This test verifies Bug #3 fix: non-Camel options should be forwarded
2511        // We'll test the config parsing, not the actual HTTP call
2512        let config = HttpEndpointConfig::from_uri(
2513            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2514        )
2515        .unwrap();
2516
2517        // apiKey and token are NOT Camel options, should be forwarded
2518        assert!(
2519            config.query_params.contains_key("apiKey"),
2520            "apiKey should be preserved"
2521        );
2522        assert!(
2523            config.query_params.contains_key("token"),
2524            "token should be preserved"
2525        );
2526        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2527        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2528
2529        // httpMethod IS a Camel option, should NOT be in query_params
2530        assert!(
2531            !config.query_params.contains_key("httpMethod"),
2532            "httpMethod should not be forwarded"
2533        );
2534    }
2535
2536    #[test]
2537    fn test_query_params_are_url_encoded_when_resolving_url() {
2538        let config =
2539            HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2540        let exchange = Exchange::new(Message::default());
2541
2542        let url = HttpProducer::resolve_url(&exchange, &config);
2543
2544        assert!(url.contains("q=hello+world"), "url was: {url}");
2545        assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2546    }
2547
2548    // -----------------------------------------------------------------------
2549    // Timeout tests (HTTP-004)
2550    // -----------------------------------------------------------------------
2551
2552    async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2553        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2554        let addr = listener.local_addr().unwrap();
2555        let url = format!("http://127.0.0.1:{}", addr.port());
2556
2557        let handle = tokio::spawn(async move {
2558            loop {
2559                if let Ok((mut stream, _)) = listener.accept().await {
2560                    let delay = delay_ms;
2561                    tokio::spawn(async move {
2562                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2563                        let mut buf = vec![0u8; 4096];
2564                        let _ = stream.read(&mut buf).await;
2565                        // Send headers immediately (no Content-Length → chunked)
2566                        let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2567                        let _ = stream.write_all(headers.as_bytes()).await;
2568                        // Delay before sending body chunk
2569                        tokio::time::sleep(Duration::from_millis(delay)).await;
2570                        let body = r#"{"status":"slow"}"#;
2571                        let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2572                        let _ = stream.write_all(chunk.as_bytes()).await;
2573                    });
2574                }
2575            }
2576        });
2577
2578        (url, handle)
2579    }
2580
2581    #[tokio::test]
2582    async fn test_http_producer_timeout() {
2583        use tower::ServiceExt;
2584
2585        // Server delays 500ms, client timeout is 100ms → should timeout
2586        let (url, _handle) = start_slow_server(500).await;
2587        let ctx = test_producer_ctx();
2588
2589        let component = HttpComponent::with_config(
2590            HttpConfig::default()
2591                .with_read_timeout_ms(100)
2592                .with_response_timeout_ms(30_000), // generous response timeout
2593        );
2594        let endpoint_ctx = NoOpComponentContext;
2595        let endpoint = component
2596            .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2597            .unwrap();
2598        let producer = endpoint.create_producer(&ctx).unwrap();
2599
2600        let exchange = Exchange::new(Message::default());
2601        let result = producer.oneshot(exchange).await;
2602
2603        assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2604        let err = result.unwrap_err().to_string();
2605        assert!(
2606            err.contains("Read timeout") || err.contains("timeout"),
2607            "Error should mention timeout, got: {}",
2608            err
2609        );
2610    }
2611
2612    #[tokio::test]
2613    async fn test_http_producer_no_timeout_when_fast() {
2614        use tower::ServiceExt;
2615
2616        let (url, _handle) = start_test_server().await;
2617        let ctx = test_producer_ctx();
2618
2619        let component =
2620            HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2621        let endpoint_ctx = NoOpComponentContext;
2622        let endpoint = component
2623            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2624            .unwrap();
2625        let producer = endpoint.create_producer(&ctx).unwrap();
2626
2627        let exchange = Exchange::new(Message::default());
2628        let result = producer.oneshot(exchange).await.unwrap();
2629
2630        let status = result
2631            .input
2632            .header("CamelHttpResponseCode")
2633            .and_then(|v| v.as_u64())
2634            .unwrap();
2635        assert_eq!(status, 200);
2636    }
2637
2638    // -----------------------------------------------------------------------
2639    // SSRF Protection tests
2640    // -----------------------------------------------------------------------
2641
2642    #[tokio::test]
2643    async fn test_http_producer_blocks_metadata_endpoint() {
2644        use tower::ServiceExt;
2645
2646        let ctx = test_producer_ctx();
2647        let component = HttpComponent::new();
2648        let endpoint_ctx = NoOpComponentContext;
2649        let endpoint = component
2650            .create_endpoint(
2651                "http://example.com/api?allowPrivateIps=false",
2652                &endpoint_ctx,
2653            )
2654            .unwrap();
2655        let producer = endpoint.create_producer(&ctx).unwrap();
2656
2657        let mut exchange = Exchange::new(Message::default());
2658        exchange.input.set_header(
2659            "CamelHttpUri",
2660            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2661        );
2662
2663        let result = producer.oneshot(exchange).await;
2664        assert!(result.is_err(), "Should block AWS metadata endpoint");
2665
2666        let err = result.unwrap_err();
2667        assert!(
2668            err.to_string().contains("Private IP"),
2669            "Error should mention private IP blocking, got: {}",
2670            err
2671        );
2672    }
2673
2674    #[test]
2675    fn test_ssrf_config_defaults() {
2676        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2677        assert!(
2678            !config.allow_private_ips,
2679            "Private IPs should be blocked by default"
2680        );
2681        assert!(
2682            config.blocked_hosts.is_empty(),
2683            "Blocked hosts should be empty by default"
2684        );
2685    }
2686
2687    #[test]
2688    fn test_ssrf_config_allow_private_ips() {
2689        let config =
2690            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2691        assert!(
2692            config.allow_private_ips,
2693            "Private IPs should be allowed when explicitly set"
2694        );
2695    }
2696
2697    #[test]
2698    fn test_ssrf_config_blocked_hosts() {
2699        let config = HttpEndpointConfig::from_uri(
2700            "http://example.com/api?blockedHosts=evil.com,malware.net",
2701        )
2702        .unwrap();
2703        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2704    }
2705
2706    #[tokio::test]
2707    async fn test_http_producer_blocks_localhost() {
2708        use tower::ServiceExt;
2709
2710        let ctx = test_producer_ctx();
2711        let component = HttpComponent::new();
2712        let endpoint_ctx = NoOpComponentContext;
2713        let endpoint = component
2714            .create_endpoint("http://example.com/api", &endpoint_ctx)
2715            .unwrap();
2716        let producer = endpoint.create_producer(&ctx).unwrap();
2717
2718        let mut exchange = Exchange::new(Message::default());
2719        exchange.input.set_header(
2720            "CamelHttpUri",
2721            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2722        );
2723
2724        let result = producer.oneshot(exchange).await;
2725        assert!(result.is_err(), "Should block localhost");
2726    }
2727
2728    #[tokio::test]
2729    async fn test_http_producer_blocks_loopback_ip() {
2730        use tower::ServiceExt;
2731
2732        let ctx = test_producer_ctx();
2733        let component = HttpComponent::new();
2734        let endpoint_ctx = NoOpComponentContext;
2735        let endpoint = component
2736            .create_endpoint("http://example.com/api", &endpoint_ctx)
2737            .unwrap();
2738        let producer = endpoint.create_producer(&ctx).unwrap();
2739
2740        let mut exchange = Exchange::new(Message::default());
2741        exchange.input.set_header(
2742            "CamelHttpUri",
2743            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2744        );
2745
2746        let result = producer.oneshot(exchange).await;
2747        assert!(result.is_err(), "Should block loopback IP");
2748    }
2749
2750    #[tokio::test]
2751    async fn test_http_producer_allows_private_ip_when_enabled() {
2752        use tower::ServiceExt;
2753
2754        let ctx = test_producer_ctx();
2755        let component = HttpComponent::new();
2756        let endpoint_ctx = NoOpComponentContext;
2757        // With allowPrivateIps=true, the validation should pass
2758        // (actual connection will fail, but that's expected)
2759        let endpoint = component
2760            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2761            .unwrap();
2762        let producer = endpoint.create_producer(&ctx).unwrap();
2763
2764        let exchange = Exchange::new(Message::default());
2765
2766        // The request will fail because we can't connect, but it should NOT fail
2767        // due to SSRF protection
2768        let result = producer.oneshot(exchange).await;
2769        // We expect connection error, not SSRF error
2770        if let Err(ref e) = result {
2771            let err_str = e.to_string();
2772            assert!(
2773                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2774                "Should not be SSRF error, got: {}",
2775                err_str
2776            );
2777        }
2778    }
2779
2780    // -----------------------------------------------------------------------
2781    // HttpServerConfig tests
2782    // -----------------------------------------------------------------------
2783
2784    #[test]
2785    fn test_http_server_config_parse() {
2786        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2787        assert_eq!(cfg.host, "0.0.0.0");
2788        assert_eq!(cfg.port, 8080);
2789        assert_eq!(cfg.path, "/orders");
2790        assert_eq!(cfg.max_inflight_requests, 1024);
2791    }
2792
2793    #[test]
2794    fn test_http_server_config_scheme() {
2795        // UriConfig trait method returns "http" as primary scheme
2796        assert_eq!(HttpServerConfig::scheme(), "http");
2797    }
2798
2799    #[test]
2800    fn test_http_server_config_from_components() {
2801        // Test from_components directly (trait method)
2802        let components = camel_component_api::UriComponents {
2803            scheme: "https".to_string(),
2804            path: "//0.0.0.0:8443/api".to_string(),
2805            params: std::collections::HashMap::from([
2806                ("maxRequestBody".to_string(), "5242880".to_string()),
2807                ("maxInflightRequests".to_string(), "7".to_string()),
2808            ]),
2809        };
2810        let cfg = HttpServerConfig::from_components(components).unwrap();
2811        assert_eq!(cfg.host, "0.0.0.0");
2812        assert_eq!(cfg.port, 8443);
2813        assert_eq!(cfg.path, "/api");
2814        assert_eq!(cfg.max_request_body, 5242880);
2815        assert_eq!(cfg.max_inflight_requests, 7);
2816    }
2817
2818    #[test]
2819    fn test_http_server_config_default_path() {
2820        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2821        assert_eq!(cfg.path, "/");
2822    }
2823
2824    #[test]
2825    fn test_http_server_config_wrong_scheme() {
2826        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2827    }
2828
2829    #[test]
2830    fn test_http_server_config_invalid_port() {
2831        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2832    }
2833
2834    #[test]
2835    fn test_http_server_config_default_port_by_scheme() {
2836        // HTTP without explicit port should default to 80
2837        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2838        assert_eq!(cfg_http.port, 80);
2839
2840        // HTTPS without explicit port should default to 443
2841        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2842        assert_eq!(cfg_https.port, 443);
2843    }
2844
2845    #[test]
2846    fn test_request_envelope_and_reply_are_send() {
2847        fn assert_send<T: Send>() {}
2848        assert_send::<RequestEnvelope>();
2849        assert_send::<HttpReply>();
2850    }
2851
2852    // -----------------------------------------------------------------------
2853    // ServerRegistry tests
2854    // -----------------------------------------------------------------------
2855
2856    /// Serializes tests that mutate or depend on the global `ServerRegistry`.
2857    ///
2858    /// `ServerRegistry::global()` is a process-wide singleton that persists
2859    /// across tests. `ServerRegistry::reset()` clears ALL entries; if it races
2860    /// with another test that has a live server on a fixed port (e.g. 9991),
2861    /// the registry entry is removed while the OS socket is still bound, so
2862    /// the next `get_or_spawn` call on that port fails with "Address already
2863    /// in use". Holding this mutex for the full body of each affected test
2864    /// prevents the race without requiring `--test-threads=1`.
2865    static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
2866
2867    #[test]
2868    fn test_server_registry_global_is_singleton() {
2869        let r1 = ServerRegistry::global();
2870        let r2 = ServerRegistry::global();
2871        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2872    }
2873
2874    #[tokio::test]
2875    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2876        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2877        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2878        let port = listener.local_addr().unwrap().port();
2879        drop(listener);
2880
2881        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2882            Arc::new(std::sync::Mutex::new(Vec::new()));
2883
2884        let mut handles = Vec::new();
2885        for _ in 0..4 {
2886            let results = results.clone();
2887            handles.push(tokio::spawn(async move {
2888                let dispatch = ServerRegistry::global()
2889                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2890                    .await
2891                    .unwrap();
2892                results.lock().unwrap().push(dispatch);
2893            }));
2894        }
2895
2896        for h in handles {
2897            h.await.unwrap();
2898        }
2899
2900        let dispatches = results.lock().unwrap();
2901        assert_eq!(dispatches.len(), 4);
2902        for i in 1..dispatches.len() {
2903            assert!(
2904                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2905                "all concurrent callers should get the same dispatch table"
2906            );
2907        }
2908    }
2909
2910    #[test]
2911    fn test_server_registry_distinguishes_host_and_port() {
2912        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2913        let rt = tokio::runtime::Runtime::new().expect("runtime");
2914        rt.block_on(async {
2915            let registry = ServerRegistry::global();
2916            // Use two distinct host values with same configured port key.
2917            // Port 0 is acceptable here because the registry key uses the configured
2918            // tuple, not the OS-assigned ephemeral port.
2919            let d1 = registry
2920                .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2921                .await;
2922            let d2 = registry
2923                .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2924                .await;
2925            assert!(d1.is_ok());
2926            assert!(d2.is_ok());
2927            assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2928        });
2929    }
2930
2931    #[tokio::test]
2932    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2933        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2934        let registry = ServerRegistry::global();
2935        // First registration: maxRequestBody = 1 MB
2936        let d1 = registry
2937            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2938            .await;
2939        assert!(d1.is_ok());
2940
2941        // Second registration on same (host,port): maxRequestBody = 2 MB
2942        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2943        let d2 = registry
2944            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2945            .await;
2946        assert!(d2.is_err());
2947        let err = d2.unwrap_err();
2948        assert!(
2949            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2950            "Expected incompatible maxRequestBody error, got: {}",
2951            err
2952        );
2953    }
2954
2955    #[test]
2956    fn test_server_registry_reset_clears_entries() {
2957        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2958        let rt = tokio::runtime::Runtime::new().expect("runtime");
2959        rt.block_on(async {
2960            // Register something on a unique port
2961            let d1 = ServerRegistry::global()
2962                .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2963                .await;
2964            assert!(d1.is_ok());
2965
2966            // Verify entry exists
2967            let guard = ServerRegistry::global().inner.lock().expect("lock");
2968            assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2969            drop(guard);
2970
2971            // Reset
2972            ServerRegistry::reset();
2973
2974            // Verify cleared
2975            let guard = ServerRegistry::global().inner.lock().expect("lock");
2976            assert!(
2977                guard.is_empty(),
2978                "registry should be empty after reset, has {} entries",
2979                guard.len()
2980            );
2981        });
2982    }
2983
2984    // -----------------------------------------------------------------------
2985    // Axum dispatch handler tests
2986    // -----------------------------------------------------------------------
2987
2988    #[tokio::test]
2989    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2990        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2991        // Nothing registered in the dispatch table
2992        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2993        let port = listener.local_addr().unwrap().port();
2994        tokio::spawn(run_axum_server(
2995            listener,
2996            dispatch,
2997            2 * 1024 * 1024,
2998            10 * 1024 * 1024,
2999            Arc::new(tokio::sync::Semaphore::new(1024)),
3000        ));
3001
3002        // Wait for server to start
3003        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3004
3005        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3006            .await
3007            .unwrap();
3008        assert_eq!(resp.status().as_u16(), 404);
3009    }
3010
3011    // -----------------------------------------------------------------------
3012    // HttpConsumer tests
3013    // -----------------------------------------------------------------------
3014
3015    #[tokio::test]
3016    async fn test_http_consumer_start_registers_path() {
3017        use camel_component_api::ConsumerContext;
3018
3019        // Get an OS-assigned free port
3020        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3021        let port = listener.local_addr().unwrap().port();
3022        drop(listener); // Release port — ServerRegistry will rebind it
3023
3024        let consumer_cfg = HttpServerConfig {
3025            host: "127.0.0.1".to_string(),
3026            port,
3027            path: "/ping".to_string(),
3028            max_request_body: 2 * 1024 * 1024,
3029            max_response_body: 10 * 1024 * 1024,
3030            max_inflight_requests: 1024,
3031        };
3032        let mut consumer = HttpConsumer::new(consumer_cfg);
3033
3034        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3035        let token = tokio_util::sync::CancellationToken::new();
3036        let ctx = ConsumerContext::new(tx, token.clone());
3037
3038        tokio::spawn(async move {
3039            consumer.start(ctx).await.unwrap();
3040        });
3041
3042        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3043
3044        let client = reqwest::Client::new();
3045        let resp_future = client
3046            .post(format!("http://127.0.0.1:{port}/ping"))
3047            .body("hello world")
3048            .send();
3049
3050        let (http_result, _) = tokio::join!(resp_future, async {
3051            if let Some(mut envelope) = rx.recv().await {
3052                // Set a custom status code
3053                envelope.exchange.input.set_header(
3054                    "CamelHttpResponseCode",
3055                    serde_json::Value::Number(201.into()),
3056                );
3057                if let Some(reply_tx) = envelope.reply_tx {
3058                    let _ = reply_tx.send(Ok(envelope.exchange));
3059                }
3060            }
3061        });
3062
3063        let resp = http_result.unwrap();
3064        assert_eq!(resp.status().as_u16(), 201);
3065
3066        token.cancel();
3067    }
3068
3069    #[tokio::test]
3070    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3071        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3072
3073        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3074        let port = listener.local_addr().unwrap().port();
3075        drop(listener);
3076
3077        let consumer_cfg = HttpServerConfig {
3078            host: "127.0.0.1".to_string(),
3079            port,
3080            path: "/saturation".to_string(),
3081            max_request_body: 2 * 1024 * 1024,
3082            max_response_body: 10 * 1024 * 1024,
3083            max_inflight_requests: 1,
3084        };
3085        let mut consumer = HttpConsumer::new(consumer_cfg);
3086
3087        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3088        let token = tokio_util::sync::CancellationToken::new();
3089        let ctx = ConsumerContext::new(tx, token.clone());
3090        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3091        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3092
3093        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3094        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3095
3096        tokio::spawn(async move {
3097            let mut first_seen_tx = Some(first_seen_tx);
3098            let mut unblock_first_rx = Some(unblock_first_rx);
3099
3100            while let Some(envelope) = rx.recv().await {
3101                if let Some(tx) = first_seen_tx.take() {
3102                    let _ = tx.send(());
3103                    if let Some(rx_unblock) = unblock_first_rx.take() {
3104                        let _ = rx_unblock.await;
3105                    }
3106                }
3107
3108                if let Some(reply_tx) = envelope.reply_tx {
3109                    let _ = reply_tx.send(Ok(envelope.exchange));
3110                }
3111            }
3112        });
3113
3114        let client = reqwest::Client::new();
3115        let first_req = {
3116            let client = client.clone();
3117            async move {
3118                client
3119                    .get(format!("http://127.0.0.1:{port}/saturation"))
3120                    .send()
3121                    .await
3122                    .unwrap()
3123            }
3124        };
3125
3126        let first_handle = tokio::spawn(first_req);
3127        first_seen_rx.await.unwrap();
3128
3129        let second_resp = client
3130            .get(format!("http://127.0.0.1:{port}/saturation"))
3131            .send()
3132            .await
3133            .unwrap();
3134
3135        assert_eq!(second_resp.status().as_u16(), 503);
3136
3137        let _ = unblock_first_tx.send(());
3138        let first_resp = first_handle.await.unwrap();
3139        assert_eq!(first_resp.status().as_u16(), 200);
3140
3141        token.cancel();
3142    }
3143
3144    #[tokio::test]
3145    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3146        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3147
3148        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3149        let port = listener.local_addr().unwrap().port();
3150        drop(listener);
3151
3152        let consumer_cfg = HttpServerConfig {
3153            host: "127.0.0.1".to_string(),
3154            port,
3155            path: "/limit-bytes".to_string(),
3156            max_request_body: 2 * 1024 * 1024,
3157            max_response_body: 16,
3158            max_inflight_requests: 1024,
3159        };
3160        let mut consumer = HttpConsumer::new(consumer_cfg);
3161
3162        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3163        let token = tokio_util::sync::CancellationToken::new();
3164        let ctx = ConsumerContext::new(tx, token.clone());
3165        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3166        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3167
3168        let client = reqwest::Client::new();
3169        let send_fut = client
3170            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3171            .send();
3172
3173        let (http_result, _) = tokio::join!(send_fut, async {
3174            if let Some(mut envelope) = rx.recv().await {
3175                envelope.exchange.input.body =
3176                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3177                if let Some(reply_tx) = envelope.reply_tx {
3178                    let _ = reply_tx.send(Ok(envelope.exchange));
3179                }
3180            }
3181        });
3182
3183        let resp = http_result.unwrap();
3184        assert_eq!(resp.status().as_u16(), 500);
3185        let body = resp.text().await.unwrap();
3186        assert_eq!(body, "Response body exceeds configured limit");
3187        token.cancel();
3188    }
3189
3190    #[tokio::test]
3191    async fn test_http_consumer_enforces_max_response_body_for_json() {
3192        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3193
3194        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3195        let port = listener.local_addr().unwrap().port();
3196        drop(listener);
3197
3198        let consumer_cfg = HttpServerConfig {
3199            host: "127.0.0.1".to_string(),
3200            port,
3201            path: "/limit-json".to_string(),
3202            max_request_body: 2 * 1024 * 1024,
3203            max_response_body: 16,
3204            max_inflight_requests: 1024,
3205        };
3206        let mut consumer = HttpConsumer::new(consumer_cfg);
3207
3208        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3209        let token = tokio_util::sync::CancellationToken::new();
3210        let ctx = ConsumerContext::new(tx, token.clone());
3211        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3212        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3213
3214        let client = reqwest::Client::new();
3215        let send_fut = client
3216            .get(format!("http://127.0.0.1:{port}/limit-json"))
3217            .send();
3218
3219        let (http_result, _) = tokio::join!(send_fut, async {
3220            if let Some(mut envelope) = rx.recv().await {
3221                envelope.exchange.input.body = camel_component_api::Body::Json(
3222                    serde_json::json!({"message":"this response is bigger than sixteen"}),
3223                );
3224                if let Some(reply_tx) = envelope.reply_tx {
3225                    let _ = reply_tx.send(Ok(envelope.exchange));
3226                }
3227            }
3228        });
3229
3230        let resp = http_result.unwrap();
3231        assert_eq!(resp.status().as_u16(), 500);
3232        let body = resp.text().await.unwrap();
3233        assert_eq!(body, "Response body exceeds configured limit");
3234        token.cancel();
3235    }
3236
3237    #[tokio::test]
3238    async fn test_http_consumer_enforces_max_response_body_for_xml() {
3239        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3240
3241        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3242        let port = listener.local_addr().unwrap().port();
3243        drop(listener);
3244
3245        let consumer_cfg = HttpServerConfig {
3246            host: "127.0.0.1".to_string(),
3247            port,
3248            path: "/limit-xml".to_string(),
3249            max_request_body: 2 * 1024 * 1024,
3250            max_response_body: 16,
3251            max_inflight_requests: 1024,
3252        };
3253        let mut consumer = HttpConsumer::new(consumer_cfg);
3254
3255        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3256        let token = tokio_util::sync::CancellationToken::new();
3257        let ctx = ConsumerContext::new(tx, token.clone());
3258        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3259        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3260
3261        let client = reqwest::Client::new();
3262        let send_fut = client
3263            .get(format!("http://127.0.0.1:{port}/limit-xml"))
3264            .send();
3265
3266        let (http_result, _) = tokio::join!(send_fut, async {
3267            if let Some(mut envelope) = rx.recv().await {
3268                envelope.exchange.input.body = camel_component_api::Body::Xml(
3269                    "<root><value>way-too-large</value></root>".into(),
3270                );
3271                if let Some(reply_tx) = envelope.reply_tx {
3272                    let _ = reply_tx.send(Ok(envelope.exchange));
3273                }
3274            }
3275        });
3276
3277        let resp = http_result.unwrap();
3278        assert_eq!(resp.status().as_u16(), 500);
3279        let body = resp.text().await.unwrap();
3280        assert_eq!(body, "Response body exceeds configured limit");
3281        token.cancel();
3282    }
3283
3284    #[tokio::test]
3285    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3286        use camel_component_api::{
3287            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3288        };
3289        use futures::stream;
3290
3291        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3292        let port = listener.local_addr().unwrap().port();
3293        drop(listener);
3294
3295        let consumer_cfg = HttpServerConfig {
3296            host: "0.0.0.0".to_string(),
3297            port,
3298            path: "/limit-stream".to_string(),
3299            max_request_body: 2 * 1024 * 1024,
3300            max_response_body: 16,
3301            max_inflight_requests: 1024,
3302        };
3303        let mut consumer = HttpConsumer::new(consumer_cfg);
3304
3305        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3306        let token = tokio_util::sync::CancellationToken::new();
3307        let ctx = ConsumerContext::new(tx, token.clone());
3308        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3309        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3310
3311        let client = reqwest::Client::new();
3312        let send_fut = client
3313            .get(format!("http://127.0.0.1:{port}/limit-stream"))
3314            .send();
3315
3316        let (http_result, _) = tokio::join!(send_fut, async {
3317            if let Some(mut envelope) = rx.recv().await {
3318                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3319                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3320                let stream = Box::pin(stream::iter(chunks));
3321                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3322                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3323                    metadata: StreamMetadata {
3324                        size_hint: Some(32),
3325                        content_type: Some("application/octet-stream".into()),
3326                        origin: None,
3327                    },
3328                });
3329                if let Some(reply_tx) = envelope.reply_tx {
3330                    let _ = reply_tx.send(Ok(envelope.exchange));
3331                }
3332            }
3333        });
3334
3335        let resp = http_result.unwrap();
3336        assert_eq!(resp.status().as_u16(), 200);
3337        let body = resp.bytes().await.unwrap();
3338        assert_eq!(body.len(), 32);
3339        token.cancel();
3340    }
3341
3342    // -----------------------------------------------------------------------
3343    // Integration tests
3344    // -----------------------------------------------------------------------
3345
3346    #[tokio::test]
3347    async fn test_integration_single_consumer_round_trip() {
3348        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3349
3350        // Get an OS-assigned free port (ephemeral)
3351        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3352        let port = listener.local_addr().unwrap().port();
3353        drop(listener); // Release — ServerRegistry will rebind
3354
3355        let component = HttpComponent::new();
3356        let endpoint_ctx = NoOpComponentContext;
3357        let endpoint = component
3358            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3359            .unwrap();
3360        let mut consumer = endpoint.create_consumer().unwrap();
3361
3362        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3363        let token = tokio_util::sync::CancellationToken::new();
3364        let ctx = ConsumerContext::new(tx, token.clone());
3365
3366        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3367        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3368
3369        let client = reqwest::Client::new();
3370        let send_fut = client
3371            .post(format!("http://127.0.0.1:{port}/echo"))
3372            .header("Content-Type", "text/plain")
3373            .body("ping")
3374            .send();
3375
3376        let (http_result, _) = tokio::join!(send_fut, async {
3377            if let Some(mut envelope) = rx.recv().await {
3378                assert_eq!(
3379                    envelope.exchange.input.header("CamelHttpMethod"),
3380                    Some(&serde_json::Value::String("POST".into()))
3381                );
3382                assert_eq!(
3383                    envelope.exchange.input.header("CamelHttpPath"),
3384                    Some(&serde_json::Value::String("/echo".into()))
3385                );
3386                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3387                if let Some(reply_tx) = envelope.reply_tx {
3388                    let _ = reply_tx.send(Ok(envelope.exchange));
3389                }
3390            }
3391        });
3392
3393        let resp = http_result.unwrap();
3394        assert_eq!(resp.status().as_u16(), 200);
3395        let body = resp.text().await.unwrap();
3396        assert_eq!(body, "pong");
3397
3398        token.cancel();
3399    }
3400
3401    #[tokio::test]
3402    async fn test_integration_two_consumers_shared_port() {
3403        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3404
3405        // Get an OS-assigned free port (ephemeral)
3406        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3407        let port = listener.local_addr().unwrap().port();
3408        drop(listener);
3409
3410        let component = HttpComponent::new();
3411        let endpoint_ctx = NoOpComponentContext;
3412
3413        // Consumer A: /hello
3414        let endpoint_a = component
3415            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3416            .unwrap();
3417        let mut consumer_a = endpoint_a.create_consumer().unwrap();
3418
3419        // Consumer B: /world
3420        let endpoint_b = component
3421            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3422            .unwrap();
3423        let mut consumer_b = endpoint_b.create_consumer().unwrap();
3424
3425        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3426        let token_a = tokio_util::sync::CancellationToken::new();
3427        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3428
3429        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3430        let token_b = tokio_util::sync::CancellationToken::new();
3431        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3432
3433        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3434        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3435        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3436
3437        let client = reqwest::Client::new();
3438
3439        // Request to /hello
3440        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3441        let (resp_hello, _) = tokio::join!(fut_hello, async {
3442            if let Some(mut envelope) = rx_a.recv().await {
3443                envelope.exchange.input.body =
3444                    camel_component_api::Body::Text("hello-response".to_string());
3445                if let Some(reply_tx) = envelope.reply_tx {
3446                    let _ = reply_tx.send(Ok(envelope.exchange));
3447                }
3448            }
3449        });
3450
3451        // Request to /world
3452        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3453        let (resp_world, _) = tokio::join!(fut_world, async {
3454            if let Some(mut envelope) = rx_b.recv().await {
3455                envelope.exchange.input.body =
3456                    camel_component_api::Body::Text("world-response".to_string());
3457                if let Some(reply_tx) = envelope.reply_tx {
3458                    let _ = reply_tx.send(Ok(envelope.exchange));
3459                }
3460            }
3461        });
3462
3463        let body_a = resp_hello.unwrap().text().await.unwrap();
3464        let body_b = resp_world.unwrap().text().await.unwrap();
3465
3466        assert_eq!(body_a, "hello-response");
3467        assert_eq!(body_b, "world-response");
3468
3469        token_a.cancel();
3470        token_b.cancel();
3471    }
3472
3473    #[tokio::test]
3474    async fn test_integration_unregistered_path_returns_404() {
3475        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3476
3477        // Get an OS-assigned free port (ephemeral)
3478        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3479        let port = listener.local_addr().unwrap().port();
3480        drop(listener);
3481
3482        let component = HttpComponent::new();
3483        let endpoint_ctx = NoOpComponentContext;
3484        let endpoint = component
3485            .create_endpoint(
3486                &format!("http://127.0.0.1:{port}/registered"),
3487                &endpoint_ctx,
3488            )
3489            .unwrap();
3490        let mut consumer = endpoint.create_consumer().unwrap();
3491
3492        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3493        let token = tokio_util::sync::CancellationToken::new();
3494        let ctx = ConsumerContext::new(tx, token.clone());
3495
3496        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3497
3498        // Wait until the server is actually accepting connections (CI runners can be slow).
3499        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3500        loop {
3501            if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3502                .await
3503                .is_ok()
3504            {
3505                break;
3506            }
3507            if std::time::Instant::now() >= deadline {
3508                panic!("HTTP server did not start within 5s on port {port}");
3509            }
3510            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3511        }
3512
3513        let client = reqwest::Client::new();
3514        let resp = client
3515            .get(format!("http://127.0.0.1:{port}/not-there"))
3516            .send()
3517            .await
3518            .unwrap();
3519        assert_eq!(resp.status().as_u16(), 404);
3520
3521        token.cancel();
3522    }
3523
3524    #[test]
3525    fn test_http_consumer_declares_concurrent() {
3526        use camel_component_api::ConcurrencyModel;
3527
3528        let config = HttpServerConfig {
3529            host: "127.0.0.1".to_string(),
3530            port: 19999,
3531            path: "/test".to_string(),
3532            max_request_body: 2 * 1024 * 1024,
3533            max_response_body: 10 * 1024 * 1024,
3534            max_inflight_requests: 1024,
3535        };
3536        let consumer = HttpConsumer::new(config);
3537        assert_eq!(
3538            consumer.concurrency_model(),
3539            ConcurrencyModel::Concurrent { max: None }
3540        );
3541    }
3542
3543    // -----------------------------------------------------------------------
3544    // HttpReplyBody streaming tests
3545    // -----------------------------------------------------------------------
3546
3547    #[tokio::test]
3548    async fn test_http_reply_body_stream_variant_exists() {
3549        use bytes::Bytes;
3550        use camel_component_api::CamelError;
3551        use futures::stream;
3552
3553        let chunks: Vec<Result<Bytes, CamelError>> =
3554            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3555        let stream = Box::pin(stream::iter(chunks));
3556        let reply_body = HttpReplyBody::Stream(stream);
3557        // Si compila y el match funciona, el test pasa
3558        match reply_body {
3559            HttpReplyBody::Stream(_) => {}
3560            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3561        }
3562    }
3563
3564    // -----------------------------------------------------------------------
3565    // OpenTelemetry propagation tests (only compiled with "otel" feature)
3566    // -----------------------------------------------------------------------
3567
3568    #[cfg(feature = "otel")]
3569    mod otel_tests {
3570        use super::*;
3571        use camel_component_api::Message;
3572        use tower::ServiceExt;
3573
3574        #[tokio::test]
3575        async fn test_producer_injects_traceparent_header() {
3576            let (url, _handle) = start_test_server_with_header_capture().await;
3577            let ctx = test_producer_ctx();
3578
3579            let component = HttpComponent::new();
3580            let endpoint_ctx = NoOpComponentContext;
3581            let endpoint = component
3582                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3583                .unwrap();
3584            let producer = endpoint.create_producer(&ctx).unwrap();
3585
3586            // Create exchange with an OTel context by extracting from a traceparent header
3587            let mut exchange = Exchange::new(Message::default());
3588            let mut headers = std::collections::HashMap::new();
3589            headers.insert(
3590                "traceparent".to_string(),
3591                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3592            );
3593            camel_otel::extract_into_exchange(&mut exchange, &headers);
3594
3595            let result = producer.oneshot(exchange).await.unwrap();
3596
3597            // Verify request succeeded
3598            let status = result
3599                .input
3600                .header("CamelHttpResponseCode")
3601                .and_then(|v| v.as_u64())
3602                .unwrap();
3603            assert_eq!(status, 200);
3604
3605            // The test server echoes back the received traceparent header
3606            let traceparent = result.input.header("X-Received-Traceparent");
3607            assert!(
3608                traceparent.is_some(),
3609                "traceparent header should have been sent"
3610            );
3611
3612            let traceparent_str = traceparent.unwrap().as_str().unwrap();
3613            // Verify format: version-traceid-spanid-flags
3614            let parts: Vec<&str> = traceparent_str.split('-').collect();
3615            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3616            assert_eq!(parts[0], "00", "version should be 00");
3617            assert_eq!(
3618                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3619                "trace-id should match"
3620            );
3621            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3622            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3623        }
3624
3625        #[tokio::test]
3626        async fn test_consumer_extracts_traceparent_header() {
3627            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3628
3629            // Get an OS-assigned free port
3630            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3631            let port = listener.local_addr().unwrap().port();
3632            drop(listener);
3633
3634            let component = HttpComponent::new();
3635            let endpoint_ctx = NoOpComponentContext;
3636            let endpoint = component
3637                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3638                .unwrap();
3639            let mut consumer = endpoint.create_consumer().unwrap();
3640
3641            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3642            let token = tokio_util::sync::CancellationToken::new();
3643            let ctx = ConsumerContext::new(tx, token.clone());
3644
3645            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3646            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3647
3648            // Send request with traceparent header
3649            let client = reqwest::Client::new();
3650            let send_fut = client
3651                .post(format!("http://127.0.0.1:{port}/trace"))
3652                .header(
3653                    "traceparent",
3654                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3655                )
3656                .body("test")
3657                .send();
3658
3659            let (http_result, _) = tokio::join!(send_fut, async {
3660                if let Some(envelope) = rx.recv().await {
3661                    // Verify the exchange has a valid OTel context by re-injecting it
3662                    // and checking the traceparent matches
3663                    let mut injected_headers = std::collections::HashMap::new();
3664                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3665
3666                    assert!(
3667                        injected_headers.contains_key("traceparent"),
3668                        "Exchange should have traceparent after extraction"
3669                    );
3670
3671                    let traceparent = injected_headers.get("traceparent").unwrap();
3672                    let parts: Vec<&str> = traceparent.split('-').collect();
3673                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3674                    assert_eq!(
3675                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3676                        "Trace ID should match the original traceparent header"
3677                    );
3678
3679                    if let Some(reply_tx) = envelope.reply_tx {
3680                        let _ = reply_tx.send(Ok(envelope.exchange));
3681                    }
3682                }
3683            });
3684
3685            let resp = http_result.unwrap();
3686            assert_eq!(resp.status().as_u16(), 200);
3687
3688            token.cancel();
3689        }
3690
3691        #[tokio::test]
3692        async fn test_consumer_extracts_mixed_case_traceparent_header() {
3693            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3694
3695            // Get an OS-assigned free port
3696            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3697            let port = listener.local_addr().unwrap().port();
3698            drop(listener);
3699
3700            let component = HttpComponent::new();
3701            let endpoint_ctx = NoOpComponentContext;
3702            let endpoint = component
3703                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3704                .unwrap();
3705            let mut consumer = endpoint.create_consumer().unwrap();
3706
3707            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3708            let token = tokio_util::sync::CancellationToken::new();
3709            let ctx = ConsumerContext::new(tx, token.clone());
3710
3711            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3712            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3713
3714            // Send request with MIXED-CASE TraceParent header (not lowercase)
3715            let client = reqwest::Client::new();
3716            let send_fut = client
3717                .post(format!("http://127.0.0.1:{port}/trace"))
3718                .header(
3719                    "TraceParent",
3720                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3721                )
3722                .body("test")
3723                .send();
3724
3725            let (http_result, _) = tokio::join!(send_fut, async {
3726                if let Some(envelope) = rx.recv().await {
3727                    // Verify the exchange has a valid OTel context by re-injecting it
3728                    // and checking the traceparent matches
3729                    let mut injected_headers = HashMap::new();
3730                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3731
3732                    assert!(
3733                        injected_headers.contains_key("traceparent"),
3734                        "Exchange should have traceparent after extraction from mixed-case header"
3735                    );
3736
3737                    let traceparent = injected_headers.get("traceparent").unwrap();
3738                    let parts: Vec<&str> = traceparent.split('-').collect();
3739                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3740                    assert_eq!(
3741                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3742                        "Trace ID should match the original mixed-case TraceParent header"
3743                    );
3744
3745                    if let Some(reply_tx) = envelope.reply_tx {
3746                        let _ = reply_tx.send(Ok(envelope.exchange));
3747                    }
3748                }
3749            });
3750
3751            let resp = http_result.unwrap();
3752            assert_eq!(resp.status().as_u16(), 200);
3753
3754            token.cancel();
3755        }
3756
3757        #[tokio::test]
3758        async fn test_producer_no_trace_context_no_crash() {
3759            let (url, _handle) = start_test_server().await;
3760            let ctx = test_producer_ctx();
3761
3762            let component = HttpComponent::new();
3763            let endpoint_ctx = NoOpComponentContext;
3764            let endpoint = component
3765                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3766                .unwrap();
3767            let producer = endpoint.create_producer(&ctx).unwrap();
3768
3769            // Create exchange with default (empty) otel_context - no trace context
3770            let exchange = Exchange::new(Message::default());
3771
3772            // Should succeed without panic
3773            let result = producer.oneshot(exchange).await.unwrap();
3774
3775            // Verify request succeeded
3776            let status = result
3777                .input
3778                .header("CamelHttpResponseCode")
3779                .and_then(|v| v.as_u64())
3780                .unwrap();
3781            assert_eq!(status, 200);
3782        }
3783
3784        /// Test server that captures and echoes back the traceparent header
3785        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3786            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3787            let addr = listener.local_addr().unwrap();
3788            let url = format!("http://127.0.0.1:{}", addr.port());
3789
3790            let handle = tokio::spawn(async move {
3791                loop {
3792                    if let Ok((mut stream, _)) = listener.accept().await {
3793                        tokio::spawn(async move {
3794                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3795                            let mut buf = vec![0u8; 8192];
3796                            let n = stream.read(&mut buf).await.unwrap_or(0);
3797                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3798
3799                            // Extract traceparent header from request
3800                            let traceparent = request
3801                                .lines()
3802                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3803                                .map(|line| {
3804                                    line.split(':')
3805                                        .nth(1)
3806                                        .map(|s| s.trim().to_string())
3807                                        .unwrap_or_default()
3808                                })
3809                                .unwrap_or_default();
3810
3811                            let body =
3812                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3813                            let response = format!(
3814                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3815                                body.len(),
3816                                traceparent,
3817                                body
3818                            );
3819                            let _ = stream.write_all(response.as_bytes()).await;
3820                        });
3821                    }
3822                }
3823            });
3824
3825            (url, handle)
3826        }
3827    }
3828
3829    // -----------------------------------------------------------------------
3830    // Response streaming tests (Eje A - Task 2)
3831    // -----------------------------------------------------------------------
3832
3833    // -----------------------------------------------------------------------
3834    // Request streaming tests (Eje B - Task 3)
3835    // -----------------------------------------------------------------------
3836
3837    #[tokio::test]
3838    async fn test_request_body_arrives_as_stream() {
3839        use camel_component_api::Body;
3840        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3841
3842        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3843        let port = listener.local_addr().unwrap().port();
3844        drop(listener);
3845
3846        let component = HttpComponent::new();
3847        let endpoint_ctx = NoOpComponentContext;
3848        let endpoint = component
3849            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3850            .unwrap();
3851        let mut consumer = endpoint.create_consumer().unwrap();
3852
3853        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3854        let token = tokio_util::sync::CancellationToken::new();
3855        let ctx = ConsumerContext::new(tx, token.clone());
3856
3857        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3858        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3859
3860        let client = reqwest::Client::new();
3861        let send_fut = client
3862            .post(format!("http://127.0.0.1:{port}/upload"))
3863            .body("hello streaming world")
3864            .send();
3865
3866        let (http_result, _) = tokio::join!(send_fut, async {
3867            if let Some(mut envelope) = rx.recv().await {
3868                // Body must be Body::Stream, not Body::Text or Body::Bytes
3869                assert!(
3870                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3871                    "expected Body::Stream, got discriminant {:?}",
3872                    std::mem::discriminant(&envelope.exchange.input.body)
3873                );
3874                // Materialize to verify content
3875                let bytes = envelope
3876                    .exchange
3877                    .input
3878                    .body
3879                    .into_bytes(1024 * 1024)
3880                    .await
3881                    .unwrap();
3882                assert_eq!(&bytes[..], b"hello streaming world");
3883
3884                envelope.exchange.input.body = camel_component_api::Body::Empty;
3885                if let Some(reply_tx) = envelope.reply_tx {
3886                    let _ = reply_tx.send(Ok(envelope.exchange));
3887                }
3888            }
3889        });
3890
3891        let resp = http_result.unwrap();
3892        assert_eq!(resp.status().as_u16(), 200);
3893
3894        token.cancel();
3895    }
3896
3897    // -----------------------------------------------------------------------
3898    // Response streaming tests (Eje A - Task 2)
3899    // -----------------------------------------------------------------------
3900
3901    #[tokio::test]
3902    async fn test_streaming_response_chunked() {
3903        use bytes::Bytes;
3904        use camel_component_api::Body;
3905        use camel_component_api::CamelError;
3906        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3907        use camel_component_api::{StreamBody, StreamMetadata};
3908        use futures::stream;
3909        use std::sync::Arc;
3910        use tokio::sync::Mutex;
3911
3912        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3913        let port = listener.local_addr().unwrap().port();
3914        drop(listener);
3915
3916        let component = HttpComponent::new();
3917        let endpoint_ctx = NoOpComponentContext;
3918        let endpoint = component
3919            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3920            .unwrap();
3921        let mut consumer = endpoint.create_consumer().unwrap();
3922
3923        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3924        let token = tokio_util::sync::CancellationToken::new();
3925        let ctx = ConsumerContext::new(tx, token.clone());
3926
3927        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3928        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3929
3930        let client = reqwest::Client::new();
3931        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3932
3933        let (http_result, _) = tokio::join!(send_fut, async {
3934            if let Some(mut envelope) = rx.recv().await {
3935                // Respond with Body::Stream
3936                let chunks: Vec<Result<Bytes, CamelError>> =
3937                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3938                let stream = Box::pin(stream::iter(chunks));
3939                envelope.exchange.input.body = Body::Stream(StreamBody {
3940                    stream: Arc::new(Mutex::new(Some(stream))),
3941                    metadata: StreamMetadata::default(),
3942                });
3943                if let Some(reply_tx) = envelope.reply_tx {
3944                    let _ = reply_tx.send(Ok(envelope.exchange));
3945                }
3946            }
3947        });
3948
3949        let resp = http_result.unwrap();
3950        assert_eq!(resp.status().as_u16(), 200);
3951        let body = resp.text().await.unwrap();
3952        assert_eq!(body, "chunk1chunk2");
3953
3954        token.cancel();
3955    }
3956
3957    // -----------------------------------------------------------------------
3958    // 413 Content-Length limit test (Task 4)
3959    // -----------------------------------------------------------------------
3960
3961    #[tokio::test]
3962    async fn test_413_when_content_length_exceeds_limit() {
3963        use camel_component_api::ConsumerContext;
3964
3965        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3966        let port = listener.local_addr().unwrap().port();
3967        drop(listener);
3968
3969        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
3970        let component = HttpComponent::new();
3971        let endpoint_ctx = NoOpComponentContext;
3972        let endpoint = component
3973            .create_endpoint(
3974                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3975                &endpoint_ctx,
3976            )
3977            .unwrap();
3978        let mut consumer = endpoint.create_consumer().unwrap();
3979
3980        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3981        let token = tokio_util::sync::CancellationToken::new();
3982        let ctx = ConsumerContext::new(tx, token.clone());
3983
3984        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3985        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3986
3987        let client = reqwest::Client::new();
3988        let resp = client
3989            .post(format!("http://127.0.0.1:{port}/upload"))
3990            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
3991            .body("x".repeat(1000))
3992            .send()
3993            .await
3994            .unwrap();
3995
3996        assert_eq!(resp.status().as_u16(), 413);
3997
3998        token.cancel();
3999    }
4000
4001    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
4002    /// The spec says: "If there is no Content-Length, the limit does not apply at the
4003    /// consumer level — the route is responsible."
4004    #[tokio::test]
4005    async fn test_chunked_upload_without_content_length_bypasses_limit() {
4006        use bytes::Bytes;
4007        use camel_component_api::Body;
4008        use camel_component_api::ConsumerContext;
4009        use futures::stream;
4010
4011        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4012        let port = listener.local_addr().unwrap().port();
4013        drop(listener);
4014
4015        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
4016        let component = HttpComponent::new();
4017        let endpoint_ctx = NoOpComponentContext;
4018        let endpoint = component
4019            .create_endpoint(
4020                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4021                &endpoint_ctx,
4022            )
4023            .unwrap();
4024        let mut consumer = endpoint.create_consumer().unwrap();
4025
4026        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4027        let token = tokio_util::sync::CancellationToken::new();
4028        let ctx = ConsumerContext::new(tx, token.clone());
4029
4030        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4031        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4032
4033        let client = reqwest::Client::new();
4034
4035        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
4036        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
4037        // but since there's no Content-Length the 413 check must NOT fire.
4038        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4039            Ok(Bytes::from("y".repeat(50))),
4040            Ok(Bytes::from("y".repeat(50))),
4041        ];
4042        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4043        let send_fut = client
4044            .post(format!("http://127.0.0.1:{port}/upload"))
4045            .body(stream_body)
4046            .send();
4047
4048        let consumer_fut = async {
4049            // Use timeout to avoid deadlock if the handler rejects before enqueueing
4050            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4051                Ok(Some(mut envelope)) => {
4052                    assert!(
4053                        matches!(envelope.exchange.input.body, Body::Stream(_)),
4054                        "expected Body::Stream"
4055                    );
4056                    envelope.exchange.input.body = camel_component_api::Body::Empty;
4057                    if let Some(reply_tx) = envelope.reply_tx {
4058                        let _ = reply_tx.send(Ok(envelope.exchange));
4059                    }
4060                }
4061                Ok(None) => panic!("consumer channel closed unexpectedly"),
4062                Err(_) => {
4063                    // Timeout: the request was rejected before reaching the consumer.
4064                    // The HTTP response will carry the real status code (we check below).
4065                }
4066            }
4067        };
4068
4069        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4070
4071        let resp = http_result.unwrap();
4072        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
4073        assert_ne!(
4074            resp.status().as_u16(),
4075            413,
4076            "chunked upload must not be rejected by maxRequestBody"
4077        );
4078        assert_eq!(resp.status().as_u16(), 200);
4079
4080        token.cancel();
4081    }
4082
4083    #[test]
4084    fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4085        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4086        cfg.blocked_hosts = vec!["blocked.local".to_string()];
4087        cfg.allow_private_ips = false;
4088
4089        let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4090        assert!(blocked.is_err());
4091
4092        let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4093        assert!(private_ip.is_err());
4094
4095        cfg.allow_private_ips = true;
4096        let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4097        assert!(allowed.is_ok());
4098    }
4099
4100    #[test]
4101    fn test_is_private_ip_ranges() {
4102        assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); // allow-unwrap
4103        assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); // allow-unwrap
4104        assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); // allow-unwrap
4105        assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); // allow-unwrap
4106        assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); // allow-unwrap
4107        assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); // allow-unwrap
4108
4109        assert!(is_private_ip(&"::1".parse().unwrap())); // allow-unwrap
4110        assert!(is_private_ip(&"fc00::1".parse().unwrap())); // allow-unwrap
4111        assert!(is_private_ip(&"fd12::1".parse().unwrap())); // allow-unwrap
4112        assert!(is_private_ip(&"fe80::1".parse().unwrap())); // allow-unwrap
4113        // ::ffff:0:0/96 (IPv4-mapped): only private if the mapped IPv4 is private
4114        assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); // allow-unwrap
4115        assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); // allow-unwrap
4116        assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); // allow-unwrap
4117
4118        assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); // allow-unwrap
4119        assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); // allow-unwrap — public IPv4-mapped
4120        assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); // allow-unwrap
4121    }
4122
4123    #[tokio::test]
4124    async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4125        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); // allow-unwrap
4126        cfg.allow_private_ips = false;
4127
4128        let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4129            .await
4130            .expect_err("localhost must resolve to loopback and be blocked");
4131
4132        let msg = err.to_string();
4133        assert!(msg.contains("Target resolved to private IP"));
4134    }
4135
4136    #[test]
4137    fn test_title_case_header() {
4138        assert_eq!(title_case_header("content-type"), "Content-Type");
4139        assert_eq!(title_case_header("authorization"), "Authorization");
4140        assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4141        assert_eq!(title_case_header("host"), "Host");
4142        assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4143        assert_eq!(title_case_header("single"), "Single");
4144        assert_eq!(title_case_header(""), "");
4145    }
4146
4147    #[test]
4148    fn test_resolve_url_combines_path_and_query_sources() {
4149        let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4150        let mut exchange = Exchange::new(Message::default());
4151        exchange.input.set_header(
4152            "CamelHttpPath",
4153            serde_json::Value::String("next".to_string()),
4154        );
4155        let url = HttpProducer::resolve_url(&exchange, &cfg);
4156        assert!(url.starts_with("http://example.com/base/next?"));
4157        assert!(url.contains("foo=bar"));
4158
4159        exchange.input.set_header(
4160            "CamelHttpUri",
4161            serde_json::Value::String("http://other.test/root".to_string()),
4162        );
4163        exchange.input.set_header(
4164            "CamelHttpQuery",
4165            serde_json::Value::String("a=1&b=2".to_string()),
4166        );
4167
4168        let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4169        assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4170    }
4171
4172    #[test]
4173    fn test_http_producer_helpers_status_and_size_boundaries() {
4174        assert!(HttpProducer::is_ok_status(200, (200, 299)));
4175        assert!(HttpProducer::is_ok_status(299, (200, 299)));
4176        assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4177        assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4178
4179        assert!(!exceeds_max_response_body(10, 10));
4180        assert!(exceeds_max_response_body(11, 10));
4181    }
4182
4183    // -----------------------------------------------------------------------
4184    // Content-Type inference tests
4185    // -----------------------------------------------------------------------
4186
4187    async fn setup_consumer_on_free_port(
4188        path: &str,
4189    ) -> (
4190        u16,
4191        tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4192        tokio_util::sync::CancellationToken,
4193    ) {
4194        use camel_component_api::ConsumerContext;
4195
4196        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4197        let port = listener.local_addr().unwrap().port();
4198        drop(listener);
4199
4200        let consumer_cfg = HttpServerConfig {
4201            host: "127.0.0.1".to_string(),
4202            port,
4203            path: path.to_string(),
4204            max_request_body: 2 * 1024 * 1024,
4205            max_response_body: 10 * 1024 * 1024,
4206            max_inflight_requests: 1024,
4207        };
4208        let mut consumer = HttpConsumer::new(consumer_cfg);
4209
4210        let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4211        let token = tokio_util::sync::CancellationToken::new();
4212        let ctx = ConsumerContext::new(tx, token.clone());
4213
4214        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4215        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4216
4217        (port, rx, token)
4218    }
4219
4220    #[tokio::test]
4221    async fn test_content_type_inferred_for_json_body() {
4222        let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4223
4224        let client = reqwest::Client::new();
4225        let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4226
4227        let (http_result, _) = tokio::join!(send_fut, async {
4228            if let Some(mut envelope) = rx.recv().await {
4229                envelope.exchange.input.body =
4230                    camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4231                if let Some(reply_tx) = envelope.reply_tx {
4232                    let _ = reply_tx.send(Ok(envelope.exchange));
4233                }
4234            }
4235        });
4236
4237        let resp = http_result.unwrap();
4238        assert_eq!(resp.status().as_u16(), 200);
4239        let ct = resp
4240            .headers()
4241            .get("content-type")
4242            .expect("Content-Type header should be present");
4243        assert_eq!(ct, "application/json");
4244        let body = resp.text().await.unwrap();
4245        assert_eq!(body, r#"{"message":"hello"}"#);
4246
4247        token.cancel();
4248    }
4249
4250    #[tokio::test]
4251    async fn test_content_type_inferred_for_text_body() {
4252        let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4253
4254        let client = reqwest::Client::new();
4255        let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4256
4257        let (http_result, _) = tokio::join!(send_fut, async {
4258            if let Some(mut envelope) = rx.recv().await {
4259                envelope.exchange.input.body =
4260                    camel_component_api::Body::Text("plain text response".to_string());
4261                if let Some(reply_tx) = envelope.reply_tx {
4262                    let _ = reply_tx.send(Ok(envelope.exchange));
4263                }
4264            }
4265        });
4266
4267        let resp = http_result.unwrap();
4268        assert_eq!(resp.status().as_u16(), 200);
4269        let ct = resp
4270            .headers()
4271            .get("content-type")
4272            .expect("Content-Type header should be present");
4273        assert_eq!(ct, "text/plain; charset=utf-8");
4274        let body = resp.text().await.unwrap();
4275        assert_eq!(body, "plain text response");
4276
4277        token.cancel();
4278    }
4279
4280    #[tokio::test]
4281    async fn test_content_type_inferred_for_xml_body() {
4282        let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4283
4284        let client = reqwest::Client::new();
4285        let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4286
4287        let (http_result, _) = tokio::join!(send_fut, async {
4288            if let Some(mut envelope) = rx.recv().await {
4289                envelope.exchange.input.body =
4290                    camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4291                if let Some(reply_tx) = envelope.reply_tx {
4292                    let _ = reply_tx.send(Ok(envelope.exchange));
4293                }
4294            }
4295        });
4296
4297        let resp = http_result.unwrap();
4298        assert_eq!(resp.status().as_u16(), 200);
4299        let ct = resp
4300            .headers()
4301            .get("content-type")
4302            .expect("Content-Type header should be present");
4303        assert_eq!(ct, "application/xml");
4304        let body = resp.text().await.unwrap();
4305        assert_eq!(body, "<root><item>value</item></root>");
4306
4307        token.cancel();
4308    }
4309
4310    #[tokio::test]
4311    async fn test_no_content_type_for_empty_body() {
4312        let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4313
4314        let client = reqwest::Client::new();
4315        let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4316
4317        let (http_result, _) = tokio::join!(send_fut, async {
4318            if let Some(mut envelope) = rx.recv().await {
4319                envelope.exchange.input.body = camel_component_api::Body::Empty;
4320                if let Some(reply_tx) = envelope.reply_tx {
4321                    let _ = reply_tx.send(Ok(envelope.exchange));
4322                }
4323            }
4324        });
4325
4326        let resp = http_result.unwrap();
4327        assert_eq!(resp.status().as_u16(), 200);
4328        assert!(
4329            resp.headers().get("content-type").is_none(),
4330            "Empty body should not set Content-Type"
4331        );
4332
4333        token.cancel();
4334    }
4335
4336    #[tokio::test]
4337    async fn test_no_content_type_for_raw_bytes_body() {
4338        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4339
4340        let client = reqwest::Client::new();
4341        let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4342
4343        let (http_result, _) = tokio::join!(send_fut, async {
4344            if let Some(mut envelope) = rx.recv().await {
4345                envelope.exchange.input.body =
4346                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4347                if let Some(reply_tx) = envelope.reply_tx {
4348                    let _ = reply_tx.send(Ok(envelope.exchange));
4349                }
4350            }
4351        });
4352
4353        let resp = http_result.unwrap();
4354        assert_eq!(resp.status().as_u16(), 200);
4355        assert!(
4356            resp.headers().get("content-type").is_none(),
4357            "Raw Bytes body should not set Content-Type"
4358        );
4359
4360        token.cancel();
4361    }
4362
4363    #[tokio::test]
4364    async fn test_content_type_from_stream_metadata() {
4365        use camel_component_api::{StreamBody, StreamMetadata};
4366        use futures::stream;
4367
4368        let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4369
4370        let client = reqwest::Client::new();
4371        let send_fut = client
4372            .get(format!("http://127.0.0.1:{port}/stream-ct"))
4373            .send();
4374
4375        let (http_result, _) = tokio::join!(send_fut, async {
4376            if let Some(mut envelope) = rx.recv().await {
4377                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4378                    vec![Ok(bytes::Bytes::from("audio data"))];
4379                let stream = Box::pin(stream::iter(chunks));
4380                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4381                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4382                    metadata: StreamMetadata {
4383                        size_hint: None,
4384                        content_type: Some("audio/mpeg".to_string()),
4385                        origin: None,
4386                    },
4387                });
4388                if let Some(reply_tx) = envelope.reply_tx {
4389                    let _ = reply_tx.send(Ok(envelope.exchange));
4390                }
4391            }
4392        });
4393
4394        let resp = http_result.unwrap();
4395        assert_eq!(resp.status().as_u16(), 200);
4396        let ct = resp
4397            .headers()
4398            .get("content-type")
4399            .expect("Content-Type header should be present");
4400        assert_eq!(ct, "audio/mpeg");
4401        let body = resp.text().await.unwrap();
4402        assert_eq!(body, "audio data");
4403
4404        token.cancel();
4405    }
4406
4407    #[tokio::test]
4408    async fn test_user_content_type_overrides_inferred() {
4409        let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4410
4411        let client = reqwest::Client::new();
4412        let send_fut = client
4413            .get(format!("http://127.0.0.1:{port}/override-ct"))
4414            .send();
4415
4416        let (http_result, _) = tokio::join!(send_fut, async {
4417            if let Some(mut envelope) = rx.recv().await {
4418                envelope.exchange.input.body =
4419                    camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4420                envelope.exchange.input.set_header(
4421                    "Content-Type",
4422                    serde_json::Value::String("text/html".to_string()),
4423                );
4424                if let Some(reply_tx) = envelope.reply_tx {
4425                    let _ = reply_tx.send(Ok(envelope.exchange));
4426                }
4427            }
4428        });
4429
4430        let resp = http_result.unwrap();
4431        assert_eq!(resp.status().as_u16(), 200);
4432        let ct = resp
4433            .headers()
4434            .get("content-type")
4435            .expect("Content-Type header should be present");
4436        assert_eq!(
4437            ct, "text/html",
4438            "User-set Content-Type should take precedence over inferred type"
4439        );
4440
4441        token.cancel();
4442    }
4443
4444    #[tokio::test]
4445    async fn test_user_content_type_with_bytes_body() {
4446        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4447
4448        let client = reqwest::Client::new();
4449        let send_fut = client
4450            .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4451            .send();
4452
4453        let (http_result, _) = tokio::join!(send_fut, async {
4454            if let Some(mut envelope) = rx.recv().await {
4455                envelope.exchange.input.body =
4456                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4457                envelope.exchange.input.set_header(
4458                    "Content-Type",
4459                    serde_json::Value::String("application/json".to_string()),
4460                );
4461                if let Some(reply_tx) = envelope.reply_tx {
4462                    let _ = reply_tx.send(Ok(envelope.exchange));
4463                }
4464            }
4465        });
4466
4467        let resp = http_result.unwrap();
4468        assert_eq!(resp.status().as_u16(), 200);
4469        let ct = resp
4470            .headers()
4471            .get("content-type")
4472            .expect("Content-Type header should be present for Bytes body with user header");
4473        assert_eq!(
4474            ct, "application/json",
4475            "User Content-Type should be sent for Bytes body"
4476        );
4477
4478        token.cancel();
4479    }
4480
4481    // -----------------------------------------------------------------------
4482    // Server monitor tests (GRL-005)
4483    // -----------------------------------------------------------------------
4484
4485    #[tokio::test]
4486    async fn monitor_task_silent_on_clean_exit() {
4487        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4488        // Clean exit should complete without panicking or logging errors
4489        monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4490    }
4491
4492    #[tokio::test]
4493    async fn monitor_task_handles_panicked_task() {
4494        let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4495            panic!("simulated server crash");
4496        });
4497        // Should complete without panicking even though the inner task panicked
4498        monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4499    }
4500
4501    // -----------------------------------------------------------------------
4502    // Credential redaction tests
4503    // -----------------------------------------------------------------------
4504
4505    #[test]
4506    fn http_auth_basic_debug_redacts_password() {
4507        let auth = HttpAuth::Basic {
4508            username: "admin".to_string(),
4509            password: "hunter2".to_string(),
4510        };
4511        let debug = format!("{:?}", auth);
4512        assert!(
4513            !debug.contains("hunter2"),
4514            "password must be redacted: {debug}"
4515        );
4516        assert!(debug.contains("admin"), "username should appear: {debug}");
4517    }
4518
4519    #[test]
4520    fn http_auth_bearer_debug_redacts_token() {
4521        let auth = HttpAuth::Bearer {
4522            token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4523        };
4524        let debug = format!("{:?}", auth);
4525        assert!(
4526            !debug.contains("eyJhbGci"),
4527            "token must be redacted: {debug}"
4528        );
4529    }
4530
4531    #[test]
4532    fn http_auth_none_debug_shows_variant() {
4533        let debug = format!("{:?}", HttpAuth::None);
4534        assert!(
4535            debug.contains("None"),
4536            "None variant should appear: {debug}"
4537        );
4538    }
4539
4540    #[test]
4541    fn http_endpoint_config_debug_redacts_auth_credentials() {
4542        let config = HttpEndpointConfig::from_uri(
4543            "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4544        )
4545        .unwrap();
4546        let debug = format!("{:?}", config);
4547        assert!(
4548            !debug.contains("secret123"),
4549            "password must be redacted in HttpEndpointConfig debug: {debug}"
4550        );
4551    }
4552}