Skip to main content

camel_component_http/
lib.rs

1pub mod bundle;
2pub mod config;
3use crate::config::parse_ok_status_code_range;
4pub use bundle::HttpBundle;
5pub use config::HttpConfig;
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::IpAddr;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::task::{Context, Poll};
13use std::time::Duration;
14
15use tokio::sync::{OnceCell, RwLock};
16use tower::Service;
17use tracing::debug;
18
19use axum::body::BodyDataStream;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
21use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
22use camel_component_api::{UriComponents, UriConfig, parse_uri};
23use futures::TryStreamExt;
24use futures::stream::BoxStream;
25
26// ---------------------------------------------------------------------------
27// HttpEndpointConfig
28// ---------------------------------------------------------------------------
29
30/// Configuration for an HTTP client (producer) endpoint.
31///
32/// # Memory Limits
33///
34/// HTTP operations enforce conservative memory limits to prevent denial-of-service
35/// attacks from untrusted network sources. These limits are significantly lower than
36/// file component limits (100MB) because HTTP typically handles API responses rather
37/// than large file transfers, and clients may be untrusted.
38///
39/// ## Default Limits
40///
41/// - **HTTP client body**: 10MB (typical API responses)
42/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
43/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
44///
45/// ## Rationale
46///
47/// The 10MB limit for HTTP client responses is appropriate for most API interactions
48/// while providing protection against:
49/// - Malicious servers sending oversized responses
50/// - Runaway processes generating unexpectedly large payloads
51/// - Memory exhaustion attacks
52///
53/// The 2MB server request limit is even more conservative because it handles input
54/// from potentially untrusted clients on the public internet.
55///
56/// ## Overriding Limits
57///
58/// Override the default client body limit using the `maxBodySize` URI parameter:
59///
60/// ```text
61/// http://api.example.com/large-data?maxBodySize=52428800
62/// ```
63///
64/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
65///
66/// ```text
67/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
68/// ```
69///
70/// ## Behavior When Exceeded
71///
72/// When a body exceeds the configured limit:
73/// - An error is returned immediately
74/// - No memory is exhausted - the limit is checked before allocation
75/// - The HTTP connection is terminated cleanly
76///
77/// ## Security Considerations
78///
79/// HTTP endpoints should be treated with more caution than file endpoints because:
80/// - Clients may be unknown and untrusted
81/// - Network traffic can be spoofed or malicious
82/// - DoS attacks often exploit unbounded resource consumption
83///
84/// Only increase limits when you control both ends of the connection or when
85/// business requirements demand larger payloads.
86#[derive(Debug, Clone)]
87pub struct HttpEndpointConfig {
88    pub base_url: String,
89    pub http_method: Option<String>,
90    pub throw_exception_on_failure: bool,
91    pub ok_status_code_range: (u16, u16),
92    pub response_timeout: Option<Duration>,
93    pub query_params: HashMap<String, String>,
94    pub allow_private_ips: bool,
95    pub blocked_hosts: Vec<String>,
96    pub max_body_size: usize,
97    pub read_timeout_ms: u64,
98    pub max_response_bytes: usize,
99    pub auth: HttpAuth,
100    pub user_agent: Option<String>,
101    pub cookie_handling: CookieHandling,
102    pub bridge_endpoint: bool,
103    pub connection_close: bool,
104    pub skip_request_headers: Vec<String>,
105    pub skip_response_headers: Vec<String>,
106}
107
108#[derive(Debug, Clone, PartialEq)]
109pub enum HttpAuth {
110    None,
111    Basic { username: String, password: String },
112    Bearer { token: String },
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum CookieHandling {
117    Disabled,
118    InMemory,
119}
120
121/// Camel options that should NOT be forwarded as HTTP query params
122const HTTP_CAMEL_OPTIONS: &[&str] = &[
123    "httpMethod",
124    "throwExceptionOnFailure",
125    "okStatusCodeRange",
126    "followRedirects",
127    "connectTimeout",
128    "responseTimeout",
129    "allowPrivateIps",
130    "blockedHosts",
131    "maxBodySize",
132    "readTimeout",
133    "maxResponseBytes",
134    "authMethod",
135    "authUsername",
136    "authPassword",
137    "authBearerToken",
138    "userAgent",
139    "cookieHandling",
140    "bridgeEndpoint",
141    "connectionClose",
142    "skipRequestHeaders",
143    "skipResponseHeaders",
144];
145
146impl UriConfig for HttpEndpointConfig {
147    /// Returns "http" as the primary scheme (also accepts "https")
148    fn scheme() -> &'static str {
149        "http"
150    }
151
152    fn from_uri(uri: &str) -> Result<Self, CamelError> {
153        let parts = parse_uri(uri)?;
154        Self::from_components(parts)
155    }
156
157    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
158        // Validate scheme - accept both http and https
159        if parts.scheme != "http" && parts.scheme != "https" {
160            return Err(CamelError::InvalidUri(format!(
161                "expected scheme 'http' or 'https', got '{}'",
162                parts.scheme
163            )));
164        }
165
166        // Construct base_url from scheme + path
167        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
168        let base_url = format!("{}:{}", parts.scheme, parts.path);
169
170        let http_method = parts.params.get("httpMethod").cloned();
171
172        let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
173            Some(v) => parse_bool_param_http(v).map_err(|e| {
174                CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
175            })?,
176            None => true,
177        };
178
179        // Parse status code range from "start-end" format (e.g., "200-299")
180        let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
181            Some(v) => parse_ok_status_code_range(v)?,
182            None => (200, 299),
183        };
184
185        let response_timeout = match parts.params.get("responseTimeout") {
186            Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
187                CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
188            })?),
189            None => None,
190        };
191
192        // SSRF protection settings
193        let allow_private_ips = match parts.params.get("allowPrivateIps") {
194            Some(v) => parse_bool_param_http(v).map_err(|e| {
195                CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
196            })?,
197            None => false, // Default: block private IPs
198        };
199
200        // Parse comma-separated blocked hosts
201        let blocked_hosts = parts
202            .params
203            .get("blockedHosts")
204            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
205            .unwrap_or_default();
206
207        let max_body_size = match parts.params.get("maxBodySize") {
208            Some(v) => v.parse::<usize>().map_err(|e| {
209                CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
210            })?,
211            None => 10 * 1024 * 1024, // Default: 10MB
212        };
213
214        let read_timeout_ms = match parts.params.get("readTimeout") {
215            Some(v) => v.parse::<u64>().map_err(|e| {
216                CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
217            })?,
218            None => 30_000, // Default: 30s
219        };
220
221        let max_response_bytes = match parts.params.get("maxResponseBytes") {
222            Some(v) => v.parse::<usize>().map_err(|e| {
223                CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
224            })?,
225            None => 10 * 1024 * 1024, // Default: 10MB
226        };
227
228        let auth = parse_auth_from_params(&parts.params)?;
229
230        let user_agent = parts.params.get("userAgent").cloned();
231
232        let cookie_handling = match parts.params.get("cookieHandling") {
233            Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
234            Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
235            Some(v) => {
236                return Err(CamelError::InvalidUri(format!(
237                    "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
238                )));
239            }
240            None => CookieHandling::Disabled,
241        };
242
243        let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
244            Some(v) => parse_bool_param_http(v).map_err(|e| {
245                CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
246            })?,
247            None => false,
248        };
249
250        let connection_close = match parts.params.get("connectionClose") {
251            Some(v) => parse_bool_param_http(v).map_err(|e| {
252                CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
253            })?,
254            None => false,
255        };
256
257        let skip_request_headers = parts
258            .params
259            .get("skipRequestHeaders")
260            .map(|v| {
261                v.split(',')
262                    .map(str::trim)
263                    .filter(|s| !s.is_empty())
264                    .map(|s| s.to_ascii_lowercase())
265                    .collect::<Vec<_>>()
266            })
267            .unwrap_or_default();
268
269        let skip_response_headers = parts
270            .params
271            .get("skipResponseHeaders")
272            .map(|v| {
273                v.split(',')
274                    .map(str::trim)
275                    .filter(|s| !s.is_empty())
276                    .map(|s| s.to_ascii_lowercase())
277                    .collect::<Vec<_>>()
278            })
279            .unwrap_or_default();
280
281        // Collect remaining params (not Camel options) as query params
282        let query_params: HashMap<String, String> = parts
283            .params
284            .into_iter()
285            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
286            .collect();
287
288        Ok(Self {
289            base_url,
290            http_method,
291            throw_exception_on_failure,
292            ok_status_code_range,
293            response_timeout,
294            query_params,
295            allow_private_ips,
296            blocked_hosts,
297            max_body_size,
298            read_timeout_ms,
299            max_response_bytes,
300            auth,
301            user_agent,
302            cookie_handling,
303            bridge_endpoint,
304            connection_close,
305            skip_request_headers,
306            skip_response_headers,
307        })
308    }
309}
310
311fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
312    let Some(method) = params.get("authMethod") else {
313        return Ok(HttpAuth::None);
314    };
315
316    if method.eq_ignore_ascii_case("none") {
317        return Ok(HttpAuth::None);
318    }
319
320    if method.eq_ignore_ascii_case("basic") {
321        let username = params.get("authUsername").cloned().ok_or_else(|| {
322            CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
323        })?;
324        let password = params.get("authPassword").cloned().ok_or_else(|| {
325            CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
326        })?;
327        return Ok(HttpAuth::Basic { username, password });
328    }
329
330    if method.eq_ignore_ascii_case("bearer") {
331        let token = params.get("authBearerToken").cloned().ok_or_else(|| {
332            CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
333        })?;
334        return Ok(HttpAuth::Bearer { token });
335    }
336
337    Err(CamelError::InvalidUri(format!(
338        "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
339    )))
340}
341
342fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
343    match value.to_ascii_lowercase().as_str() {
344        "true" | "1" | "yes" => Ok(true),
345        "false" | "0" | "no" => Ok(false),
346        _ => Err(CamelError::InvalidUri(format!(
347            "invalid boolean value: '{value}'"
348        ))),
349    }
350}
351
352impl HttpEndpointConfig {
353    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
354        let parts = parse_uri(uri)?;
355        let mut endpoint = Self::from_components(parts.clone())?;
356        if endpoint.response_timeout.is_none() {
357            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
358        }
359        if !parts.params.contains_key("allowPrivateIps") {
360            endpoint.allow_private_ips = config.allow_private_ips;
361        }
362        if !parts.params.contains_key("blockedHosts") {
363            endpoint.blocked_hosts = config.blocked_hosts.clone();
364        }
365        if !parts.params.contains_key("maxBodySize") {
366            endpoint.max_body_size = config.max_body_size;
367        }
368        if !parts.params.contains_key("readTimeout") {
369            endpoint.read_timeout_ms = config.read_timeout_ms;
370        }
371        if !parts.params.contains_key("maxResponseBytes") {
372            endpoint.max_response_bytes = config.max_response_bytes;
373        }
374        if !parts.params.contains_key("okStatusCodeRange")
375            && let Some(range) = &config.ok_status_code_range
376        {
377            endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
378        }
379
380        Ok(endpoint)
381    }
382}
383
384// ---------------------------------------------------------------------------
385// HttpServerConfig
386// ---------------------------------------------------------------------------
387
388/// Configuration for an HTTP server (consumer) endpoint.
389#[derive(Debug, Clone)]
390pub struct HttpServerConfig {
391    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
392    pub host: String,
393    /// TCP port to listen on.
394    pub port: u16,
395    /// URL path this consumer handles, e.g. "/orders".
396    pub path: String,
397    /// Maximum request body size in bytes.
398    pub max_request_body: usize,
399    /// Maximum response body size for materializing streams in bytes.
400    pub max_response_body: usize,
401    /// Maximum number of in-flight requests handled concurrently by this server.
402    pub max_inflight_requests: usize,
403}
404
405impl UriConfig for HttpServerConfig {
406    /// Returns "http" as the primary scheme (also accepts "https")
407    fn scheme() -> &'static str {
408        "http"
409    }
410
411    fn from_uri(uri: &str) -> Result<Self, CamelError> {
412        let parts = parse_uri(uri)?;
413        Self::from_components(parts)
414    }
415
416    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
417        // Validate scheme - accept both http and https
418        if parts.scheme != "http" && parts.scheme != "https" {
419            return Err(CamelError::InvalidUri(format!(
420                "expected scheme 'http' or 'https', got '{}'",
421                parts.scheme
422            )));
423        }
424
425        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
426        // Strip leading "//"
427        let authority_and_path = parts.path.trim_start_matches('/');
428
429        // Split on the first "/" to separate "host:port" from "/path"
430        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
431            (&authority_and_path[..idx], &authority_and_path[idx..])
432        } else {
433            (authority_and_path, "/")
434        };
435
436        let path = if path_suffix.is_empty() {
437            "/"
438        } else {
439            path_suffix
440        }
441        .to_string();
442
443        // Parse host:port from authority
444        let (host, port) = if let Some(colon) = authority.rfind(':') {
445            let port_str = &authority[colon + 1..];
446            match port_str.parse::<u16>() {
447                Ok(p) => (authority[..colon].to_string(), p),
448                Err(_) => {
449                    return Err(CamelError::InvalidUri(format!(
450                        "invalid port '{}' in authority",
451                        port_str
452                    )));
453                }
454            }
455        } else {
456            // Default port based on scheme: 443 for https, 80 for http
457            let default_port = if parts.scheme == "https" { 443 } else { 80 };
458            (authority.to_string(), default_port)
459        };
460
461        let max_request_body = parts
462            .params
463            .get("maxRequestBody")
464            .and_then(|v| v.parse::<usize>().ok())
465            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
466
467        let max_response_body = parts
468            .params
469            .get("maxResponseBody")
470            .and_then(|v| v.parse::<usize>().ok())
471            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
472
473        let max_inflight_requests = parts
474            .params
475            .get("maxInflightRequests")
476            .and_then(|v| v.parse::<usize>().ok())
477            .unwrap_or(1024);
478
479        Ok(Self {
480            host,
481            port,
482            path,
483            max_request_body,
484            max_response_body,
485            max_inflight_requests,
486        })
487    }
488}
489
490impl HttpServerConfig {
491    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
492        let parts = parse_uri(uri)?;
493        let mut server = Self::from_components(parts.clone())?;
494        if !parts.params.contains_key("maxRequestBody") {
495            server.max_request_body = config.max_request_body;
496        }
497        if !parts.params.contains_key("maxResponseBody") {
498            // Default max_response_body is 10MB via HttpConfig::default().max_body_size.
499            server.max_response_body = config.max_body_size;
500        }
501        Ok(server)
502    }
503}
504
505// ---------------------------------------------------------------------------
506// RequestEnvelope / HttpReply
507// ---------------------------------------------------------------------------
508
509/// Body de la respuesta HTTP: bytes ya materializados o stream lazy.
510pub(crate) enum HttpReplyBody {
511    Bytes(bytes::Bytes),
512    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
513}
514
515/// An inbound HTTP request sent from the Axum dispatch handler to an
516/// `HttpConsumer` receive loop.
517pub(crate) struct RequestEnvelope {
518    pub(crate) method: String,
519    pub(crate) path: String,
520    pub(crate) query: String,
521    pub(crate) headers: http::HeaderMap,
522    pub(crate) body: StreamBody,
523    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
524}
525
526/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
527pub(crate) struct HttpReply {
528    pub(crate) status: u16,
529    pub(crate) headers: Vec<(String, String)>,
530    pub(crate) body: HttpReplyBody,
531}
532
533// ---------------------------------------------------------------------------
534// DispatchTable / ServerRegistry
535// ---------------------------------------------------------------------------
536
537/// Maps URL path → channel sender for the consumer that owns that path.
538pub(crate) type DispatchTable =
539    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
540
541type ServerKey = (String, u16);
542
543/// Handle to a running Axum server on one interface/port.
544#[allow(dead_code)]
545struct ServerHandle {
546    dispatch: DispatchTable,
547    max_request_body: usize,
548    max_response_body: usize,
549    max_inflight_requests: usize,
550    inflight: Arc<tokio::sync::Semaphore>,
551    /// Kept alive so the task isn't dropped; not used directly.
552    _task: tokio::task::JoinHandle<()>,
553}
554
555/// Process-global registry mapping (host, port) → running Axum server handle.
556pub struct ServerRegistry {
557    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
558}
559
560impl ServerRegistry {
561    /// Returns the global singleton.
562    pub fn global() -> &'static Self {
563        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
564        INSTANCE.get_or_init(|| ServerRegistry {
565            inner: Mutex::new(HashMap::new()),
566        })
567    }
568
569    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
570    /// none is running on that port yet.
571    pub(crate) async fn get_or_spawn(
572        &'static self,
573        host: &str,
574        port: u16,
575        max_request_body: usize,
576        max_response_body: usize,
577        max_inflight_requests: usize,
578    ) -> Result<DispatchTable, CamelError> {
579        let host_owned = host.to_string();
580
581        let cell = {
582            let mut guard = self.inner.lock().map_err(|_| {
583                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
584            })?;
585            let key = (host.to_string(), port);
586            guard
587                .entry(key)
588                .or_insert_with(|| Arc::new(OnceCell::new()))
589                .clone()
590        };
591
592        if let Some(existing) = cell.get()
593            && existing.max_request_body != max_request_body
594        {
595            return Err(CamelError::EndpointCreationFailed(format!(
596                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
597                existing.max_request_body, max_request_body
598            )));
599        }
600
601        if let Some(existing) = cell.get()
602            && existing.max_response_body != max_response_body
603        {
604            return Err(CamelError::EndpointCreationFailed(format!(
605                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
606                existing.max_response_body, max_response_body
607            )));
608        }
609
610        if let Some(existing) = cell.get()
611            && existing.max_inflight_requests != max_inflight_requests
612        {
613            return Err(CamelError::EndpointCreationFailed(format!(
614                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
615                existing.max_inflight_requests, max_inflight_requests
616            )));
617        }
618
619        let handle = cell
620            .get_or_try_init(|| async {
621                let addr = format!("{host_owned}:{port}");
622                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
623                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
624                })?;
625                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
626                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
627                let task = tokio::spawn(run_axum_server(
628                    listener,
629                    Arc::clone(&dispatch),
630                    max_request_body,
631                    max_response_body,
632                    Arc::clone(&inflight),
633                ));
634                Ok::<ServerHandle, CamelError>(ServerHandle {
635                    dispatch,
636                    max_request_body,
637                    max_response_body,
638                    max_inflight_requests,
639                    inflight,
640                    _task: task,
641                })
642            })
643            .await?;
644
645        Ok(Arc::clone(&handle.dispatch))
646    }
647
648    /// Reset the global registry — **test-only**.
649    ///
650    /// Clears all registered server handles so that tests can start from a clean
651    /// state. This is intentionally `#[cfg(test)]` because the registry is a
652    /// process-global singleton in production and resetting it would break
653    /// running servers.
654    #[cfg(test)]
655    pub fn reset() {
656        let instance = Self::global();
657        let mut guard = instance
658            .inner
659            .lock()
660            .expect("ServerRegistry lock poisoned during test reset");
661        guard.clear();
662    }
663}
664
665// ---------------------------------------------------------------------------
666// Axum server
667// ---------------------------------------------------------------------------
668
669use axum::{
670    Router,
671    body::Body as AxumBody,
672    extract::{Request, State},
673    http::{Response, StatusCode},
674    response::IntoResponse,
675};
676
677#[derive(Clone)]
678struct AppState {
679    dispatch: DispatchTable,
680    max_request_body: usize,
681    max_response_body: usize,
682    inflight: Arc<tokio::sync::Semaphore>,
683}
684
685async fn run_axum_server(
686    listener: tokio::net::TcpListener,
687    dispatch: DispatchTable,
688    max_request_body: usize,
689    max_response_body: usize,
690    inflight: Arc<tokio::sync::Semaphore>,
691) {
692    let state = AppState {
693        dispatch,
694        max_request_body,
695        max_response_body,
696        inflight,
697    };
698    let app = Router::new().fallback(dispatch_handler).with_state(state);
699
700    axum::serve(listener, app).await.unwrap_or_else(|e| {
701        tracing::error!(error = %e, "Axum server error");
702    });
703}
704
705async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
706    let method = req.method().to_string();
707    let path = req.uri().path().to_string();
708    let query = req.uri().query().unwrap_or("").to_string();
709    let headers = req.headers().clone();
710
711    // Check Content-Length against limit BEFORE opening the stream
712    let content_length: Option<u64> = headers
713        .get(http::header::CONTENT_LENGTH)
714        .and_then(|v| v.to_str().ok())
715        .and_then(|s| s.parse().ok());
716
717    if let Some(len) = content_length
718        && len > state.max_request_body as u64
719    {
720        return Response::builder()
721            .status(StatusCode::PAYLOAD_TOO_LARGE)
722            .body(AxumBody::from("Request body exceeds configured limit"))
723            .expect("infallible"); // allow-unwrap
724    }
725
726    let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
727        Ok(permit) => permit,
728        Err(_) => {
729            return Response::builder()
730                .status(StatusCode::SERVICE_UNAVAILABLE)
731                .body(AxumBody::from("Service Unavailable"))
732                .expect("infallible"); // allow-unwrap
733        }
734    };
735
736    // Build StreamBody from Axum body WITHOUT materializing
737    let content_type = headers
738        .get(http::header::CONTENT_TYPE)
739        .and_then(|v| v.to_str().ok())
740        .map(|s| s.to_string());
741
742    let data_stream: BodyDataStream = req.into_body().into_data_stream();
743    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
744    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
745
746    let stream_body = StreamBody {
747        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
748        metadata: StreamMetadata {
749            size_hint: content_length,
750            content_type,
751            origin: None,
752        },
753    };
754
755    // Look up handler for this path
756    let sender = {
757        let table = state.dispatch.read().await;
758        table.get(&path).cloned()
759    };
760    let Some(sender) = sender else {
761        return Response::builder()
762            .status(StatusCode::NOT_FOUND)
763            .body(AxumBody::from("No consumer registered for this path"))
764            .expect("infallible"); // allow-unwrap
765    };
766
767    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
768    let envelope = RequestEnvelope {
769        method,
770        path,
771        query,
772        headers,
773        body: stream_body,
774        reply_tx,
775    };
776
777    if sender.send(envelope).await.is_err() {
778        return Response::builder()
779            .status(StatusCode::SERVICE_UNAVAILABLE)
780            .body(AxumBody::from("Consumer unavailable"))
781            .expect("infallible"); // allow-unwrap
782    }
783
784    match reply_rx.await {
785        Ok(reply) => {
786            let reply = match reply.body {
787                HttpReplyBody::Bytes(b)
788                    if exceeds_max_response_body(b.len(), state.max_response_body) =>
789                {
790                    HttpReply {
791                        status: 500,
792                        headers: vec![],
793                        body: HttpReplyBody::Bytes(bytes::Bytes::from(
794                            "Response body exceeds configured limit",
795                        )),
796                    }
797                }
798                _ => reply,
799            };
800
801            let status =
802                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
803            let mut builder = Response::builder().status(status);
804            for (k, v) in &reply.headers {
805                builder = builder.header(k.as_str(), v.as_str());
806            }
807            match reply.body {
808                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
809                    Response::builder()
810                        .status(StatusCode::INTERNAL_SERVER_ERROR)
811                        .body(AxumBody::from("Invalid response headers from consumer"))
812                        .expect("infallible") // allow-unwrap
813                }),
814                HttpReplyBody::Stream(stream) => builder
815                    .body(AxumBody::from_stream(stream))
816                    .unwrap_or_else(|_| {
817                        Response::builder()
818                            .status(StatusCode::INTERNAL_SERVER_ERROR)
819                            .body(AxumBody::from("Invalid response headers from consumer"))
820                            .expect("infallible") // allow-unwrap
821                    }),
822            }
823        }
824        Err(_) => Response::builder()
825            .status(StatusCode::INTERNAL_SERVER_ERROR)
826            .body(AxumBody::from("Pipeline error"))
827            .expect("Response::builder() with a known-valid status code and body is infallible"), // allow-unwrap
828    }
829}
830
831fn exceeds_max_response_body(len: usize, max: usize) -> bool {
832    len > max
833}
834
835fn title_case_header(name: &str) -> String {
836    name.split('-')
837        .map(|part| {
838            let mut chars = part.chars();
839            match chars.next() {
840                None => String::new(),
841                Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
842            }
843        })
844        .collect::<Vec<_>>()
845        .join("-")
846}
847
848// ---------------------------------------------------------------------------
849// HttpConsumer
850// ---------------------------------------------------------------------------
851
852pub struct HttpConsumer {
853    config: HttpServerConfig,
854}
855
856impl HttpConsumer {
857    pub fn new(config: HttpServerConfig) -> Self {
858        Self { config }
859    }
860}
861
862#[async_trait::async_trait]
863impl Consumer for HttpConsumer {
864    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
865        use camel_component_api::{Body, Exchange, Message};
866
867        let dispatch = ServerRegistry::global()
868            .get_or_spawn(
869                &self.config.host,
870                self.config.port,
871                self.config.max_request_body,
872                self.config.max_response_body,
873                self.config.max_inflight_requests,
874            )
875            .await?;
876
877        // Create a channel for this path and register it
878        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
879        {
880            let mut table = dispatch.write().await;
881            table.insert(self.config.path.clone(), env_tx);
882        }
883
884        let path = self.config.path.clone();
885        let cancel_token = ctx.cancel_token();
886        loop {
887            tokio::select! {
888                _ = ctx.cancelled() => {
889                    break;
890                }
891                envelope = env_rx.recv() => {
892                    let Some(envelope) = envelope else { break; };
893
894                    // Build Exchange from HTTP request
895                    let mut msg = Message::default();
896
897                    // Set standard Camel HTTP headers
898                    msg.set_header("CamelHttpMethod",
899                        serde_json::Value::String(envelope.method.clone()));
900                    msg.set_header("CamelHttpPath",
901                        serde_json::Value::String(envelope.path.clone()));
902                    msg.set_header("CamelHttpQuery",
903                        serde_json::Value::String(envelope.query.clone()));
904
905                    // Forward HTTP headers with Title-Case names (hyper lowercases them)
906                    for (k, v) in &envelope.headers {
907                        if let Ok(val_str) = v.to_str() {
908                            msg.set_header(
909                                title_case_header(k.as_str()),
910                                serde_json::Value::String(val_str.to_string()),
911                            );
912                        }
913                    }
914
915                    // Body: always arrives as Body::Stream (native streaming)
916                    // Routes can call into_bytes() if they need to materialize
917                    msg.body = Body::Stream(envelope.body);
918
919                    #[allow(unused_mut)]
920                    let mut exchange = Exchange::new(msg);
921
922                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
923                    #[cfg(feature = "otel")]
924                    {
925                        let headers: HashMap<String, String> = envelope
926                            .headers
927                            .iter()
928                            .filter_map(|(k, v)| {
929                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
930                            })
931                            .collect();
932                        camel_otel::extract_into_exchange(&mut exchange, &headers);
933                    }
934
935                    let reply_tx = envelope.reply_tx;
936                    let sender = ctx.sender().clone();
937                    let path_clone = path.clone();
938                    let cancel = cancel_token.clone();
939
940                    // Spawn a task to handle this request concurrently
941                    //
942                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
943                    // true concurrent request processing. This change was introduced as part of the
944                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
945                    //
946                    // Rationale:
947                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
948                    //    the consumer's main loop until the pipeline processing completes
949                    // 2. This blocking would prevent multiple HTTP requests from being processed
950                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
951                    // 3. The channel would never have multiple exchanges buffered simultaneously,
952                    //    defeating the purpose of pipeline-side concurrency
953                    // 4. By spawning a task per request, we allow the consumer loop to continue
954                    //    accepting new requests while existing ones are processed in the pipeline
955                    //
956                    // This approach effectively decouples request acceptance from pipeline processing,
957                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
958                    // by the pipeline when ConcurrencyModel::Concurrent is active.
959                    tokio::spawn(async move {
960                        // Check for cancellation before sending to pipeline.
961                        // Returns 503 (Service Unavailable) instead of letting the request
962                        // enter a shutting-down pipeline. This is a behavioral change from
963                        // the pre-concurrency implementation where cancellation during
964                        // processing would result in a 500 (Internal Server Error).
965                        // 503 is more semantically correct: the server is temporarily
966                        // unable to handle the request due to shutdown.
967                        if cancel.is_cancelled() {
968                            let _ = reply_tx.send(HttpReply {
969                                status: 503,
970                                headers: vec![],
971                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
972                            });
973                            return;
974                        }
975
976                        // Send through pipeline and await result
977                        let (tx, rx) = tokio::sync::oneshot::channel();
978                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
979                            exchange,
980                            reply_tx: Some(tx),
981                        };
982
983                        let result = match sender.send(envelope).await {
984                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
985                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
986                        }
987                        .and_then(|r| r);
988
989                        let reply = match result {
990                            Ok(out) => {
991                                let status = out
992                                    .input
993                                    .header("CamelHttpResponseCode")
994                                    .and_then(|v| {
995                                        let raw = v.as_u64()
996                                            .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
997                                        let code = raw as u16;
998                                        (100..1000).contains(&code).then_some(code)
999                                    })
1000                                    .unwrap_or(200);
1001
1002                                let user_content_type = out
1003                                    .input
1004                                    .header("Content-Type")
1005                                    .and_then(|v| v.as_str().map(|s| s.to_string()));
1006
1007                                let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1008                                    Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1009                                    Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1010                                    Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1011                                    Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1012                                    Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1013                                        v.to_string().into_bytes(),
1014                                    )), Some("application/json".to_string())),
1015                                    Body::Stream(s) => {
1016                                        let ct = s.metadata.content_type.clone();
1017                                        match s.stream.lock().await.take() {
1018                                            Some(stream) => (
1019                                                HttpReplyBody::Stream(stream),
1020                                                ct,
1021                                            ),
1022                                            None => {
1023                                                tracing::error!(
1024                                                    "Body::Stream already consumed before HTTP reply — returning 500"
1025                                                );
1026                                                let error_reply = HttpReply {
1027                                                    status: 500,
1028                                                    headers: vec![],
1029                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1030                                                };
1031                                                if reply_tx.send(error_reply).is_err() {
1032                                                    debug!("reply_tx dropped before error reply could be sent");
1033                                                }
1034                                                return;
1035                                            }
1036                                        }
1037                                    }
1038                                };
1039
1040                                let mut resp_headers: Vec<(String, String)> = out
1041                                    .input
1042                                    .headers
1043                                    .iter()
1044                                    .filter(|(k, _)| !k.starts_with("Camel"))
1045                                    .filter(|(k, _)| {
1046                                        !matches!(
1047                                            k.to_lowercase().as_str(),
1048                                            "content-length"
1049                                            | "content-type"
1050                                            | "transfer-encoding"
1051                                            | "connection"
1052                                            | "cache-control"
1053                                            | "date"
1054                                            | "pragma"
1055                                            | "trailer"
1056                                            | "upgrade"
1057                                            | "via"
1058                                            | "warning"
1059                                            | "host"
1060                                            | "user-agent"
1061                                            | "accept"
1062                                            | "accept-encoding"
1063                                            | "accept-language"
1064                                            | "accept-charset"
1065                                            | "authorization"
1066                                            | "proxy-authorization"
1067                                            | "cookie"
1068                                            | "expect"
1069                                            | "from"
1070                                            | "if-match"
1071                                            | "if-modified-since"
1072                                            | "if-none-match"
1073                                            | "if-range"
1074                                            | "if-unmodified-since"
1075                                            | "max-forwards"
1076                                            | "proxy-connection"
1077                                            | "range"
1078                                            | "referer"
1079                                            | "te"
1080                                        )
1081                                    })
1082                                    .filter_map(|(k, v)| {
1083                                        v.as_str().map(|s| (k.clone(), s.to_string()))
1084                                    })
1085                                    .collect();
1086
1087                                let content_type = user_content_type
1088                                    .or(inferred_content_type);
1089                                if let Some(ct) = content_type {
1090                                    resp_headers.push(("Content-Type".to_string(), ct));
1091                                }
1092
1093                                HttpReply {
1094                                    status,
1095                                    headers: resp_headers,
1096                                    body: reply_body,
1097                                }
1098                            }
1099                            Err(CamelError::Stopped) => {
1100                                tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1101                                HttpReply {
1102                                    status: 204,
1103                                    headers: vec![],
1104                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1105                                }
1106                            }
1107                            Err(e) => {
1108                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1109                                HttpReply {
1110                                    status: 500,
1111                                    headers: vec![],
1112                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1113                                }
1114                            }
1115                        };
1116
1117                        // Reply to Axum handler (ignore error if client disconnected)
1118                        let _ = reply_tx.send(reply);
1119                    });
1120                }
1121            }
1122        }
1123
1124        // Deregister this path
1125        {
1126            let mut table = dispatch.write().await;
1127            table.remove(&path);
1128        }
1129
1130        Ok(())
1131    }
1132
1133    async fn stop(&mut self) -> Result<(), CamelError> {
1134        Ok(())
1135    }
1136
1137    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1138        camel_component_api::ConcurrencyModel::Concurrent { max: None }
1139    }
1140}
1141
1142// ---------------------------------------------------------------------------
1143// HttpComponent / HttpsComponent
1144// ---------------------------------------------------------------------------
1145
1146pub struct HttpComponent {
1147    config: HttpConfig,
1148}
1149
1150fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1151    let mut builder = reqwest::Client::builder()
1152        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1153        .pool_max_idle_per_host(config.pool_max_idle_per_host)
1154        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1155
1156    if !config.follow_redirects {
1157        builder = builder.redirect(reqwest::redirect::Policy::none());
1158    } else if let Some(max_redirects) = config.max_redirects {
1159        builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1160    }
1161
1162    if let Some(proxy_url) = &config.proxy_url
1163        && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1164    {
1165        builder = builder.proxy(proxy);
1166    }
1167
1168    if matches!(cookie_handling, CookieHandling::InMemory) {
1169        // TODO(HTTP-013): enable reqwest cookie jar once workspace reqwest features include cookie_store.
1170    }
1171
1172    if let Some(tls) = &config.tls
1173        && tls.enabled
1174    {
1175        if tls.insecure || !tls.verify_peer {
1176            builder = builder.danger_accept_invalid_certs(true);
1177        }
1178
1179        if let Some(ca_path) = &tls.ca_cert_path
1180            && let Ok(ca_bytes) = std::fs::read(ca_path)
1181        {
1182            let cert = reqwest::Certificate::from_pem(&ca_bytes)
1183                .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1184            if let Ok(ca_cert) = cert {
1185                builder = builder.add_root_certificate(ca_cert);
1186            }
1187        }
1188
1189        if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1190            && let (Ok(cert_bytes), Ok(key_bytes)) =
1191                (std::fs::read(cert_path), std::fs::read(key_path))
1192        {
1193            let mut identity_pem = cert_bytes;
1194            identity_pem.extend_from_slice(&key_bytes);
1195            if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1196                builder = builder.identity(identity);
1197            }
1198        }
1199    }
1200
1201    builder
1202        .build()
1203        .expect("reqwest::Client::build() with valid config should not fail") // allow-unwrap
1204}
1205
1206impl HttpComponent {
1207    pub fn new() -> Self {
1208        let config = HttpConfig::default();
1209        Self { config }
1210    }
1211
1212    pub fn with_config(config: HttpConfig) -> Self {
1213        Self { config }
1214    }
1215
1216    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1217        match config {
1218            Some(cfg) => Self::with_config(cfg),
1219            None => Self::new(),
1220        }
1221    }
1222}
1223
1224impl Default for HttpComponent {
1225    fn default() -> Self {
1226        Self::new()
1227    }
1228}
1229
1230impl Component for HttpComponent {
1231    fn scheme(&self) -> &str {
1232        "http"
1233    }
1234
1235    fn create_endpoint(
1236        &self,
1237        uri: &str,
1238        _ctx: &dyn camel_component_api::ComponentContext,
1239    ) -> Result<Box<dyn Endpoint>, CamelError> {
1240        self.config.validate()?;
1241        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1242        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1243        let client = build_client(&self.config, config.cookie_handling);
1244        Ok(Box::new(HttpEndpoint {
1245            uri: uri.to_string(),
1246            config,
1247            server_config,
1248            client,
1249        }))
1250    }
1251}
1252
1253pub struct HttpsComponent {
1254    config: HttpConfig,
1255}
1256
1257impl HttpsComponent {
1258    pub fn new() -> Self {
1259        let config = HttpConfig::default();
1260        Self { config }
1261    }
1262
1263    pub fn with_config(config: HttpConfig) -> Self {
1264        Self { config }
1265    }
1266
1267    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1268        match config {
1269            Some(cfg) => Self::with_config(cfg),
1270            None => Self::new(),
1271        }
1272    }
1273}
1274
1275impl Default for HttpsComponent {
1276    fn default() -> Self {
1277        Self::new()
1278    }
1279}
1280
1281impl Component for HttpsComponent {
1282    fn scheme(&self) -> &str {
1283        "https"
1284    }
1285
1286    fn create_endpoint(
1287        &self,
1288        uri: &str,
1289        _ctx: &dyn camel_component_api::ComponentContext,
1290    ) -> Result<Box<dyn Endpoint>, CamelError> {
1291        self.config.validate()?;
1292        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1293        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1294        let client = build_client(&self.config, config.cookie_handling);
1295        Ok(Box::new(HttpEndpoint {
1296            uri: uri.to_string(),
1297            config,
1298            server_config,
1299            client,
1300        }))
1301    }
1302}
1303
1304// ---------------------------------------------------------------------------
1305// HttpEndpoint
1306// ---------------------------------------------------------------------------
1307
1308struct HttpEndpoint {
1309    uri: String,
1310    config: HttpEndpointConfig,
1311    server_config: HttpServerConfig,
1312    client: reqwest::Client,
1313}
1314
1315impl Endpoint for HttpEndpoint {
1316    fn uri(&self) -> &str {
1317        &self.uri
1318    }
1319
1320    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1321        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1322    }
1323
1324    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1325        Ok(BoxProcessor::new(HttpProducer {
1326            config: Arc::new(self.config.clone()),
1327            client: self.client.clone(),
1328        }))
1329    }
1330}
1331
1332// ---------------------------------------------------------------------------
1333// SSRF Protection
1334// ---------------------------------------------------------------------------
1335
1336fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1337    let parsed = url::Url::parse(url)
1338        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1339
1340    // Check blocked hosts
1341    if let Some(host) = parsed.host_str()
1342        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1343    {
1344        return Err(CamelError::ProcessorError(format!(
1345            "Host '{}' is blocked",
1346            host
1347        )));
1348    }
1349
1350    // Check private IPs if not allowed
1351    if !config.allow_private_ips
1352        && let Some(host) = parsed.host()
1353    {
1354        match host {
1355            url::Host::Ipv4(ip) => {
1356                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1357                    return Err(CamelError::ProcessorError(format!(
1358                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1359                        ip
1360                    )));
1361                }
1362            }
1363            url::Host::Ipv6(ip) => {
1364                if ip.is_loopback() {
1365                    return Err(CamelError::ProcessorError(format!(
1366                        "Loopback IP '{}' not allowed",
1367                        ip
1368                    )));
1369                }
1370            }
1371            url::Host::Domain(domain) => {
1372                // Block common internal domains
1373                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1374                if blocked_domains.contains(&domain) {
1375                    return Err(CamelError::ProcessorError(format!(
1376                        "Domain '{}' is not allowed",
1377                        domain
1378                    )));
1379                }
1380            }
1381        }
1382    }
1383
1384    Ok(())
1385}
1386
1387fn is_private_ip(ip: &IpAddr) -> bool {
1388    match ip {
1389        IpAddr::V4(ipv4) => {
1390            ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1391        }
1392        IpAddr::V6(ipv6) => {
1393            let seg0 = ipv6.segments()[0];
1394            ipv6.is_loopback()
1395                // fc00::/7 (ULA)
1396                || (seg0 & 0xfe00) == 0xfc00
1397                // fe80::/10 (link-local)
1398                || (seg0 & 0xffc0) == 0xfe80
1399                // ::ffff:0:0/96 (IPv4-mapped): only block if the mapped IPv4 is private
1400                || ipv6
1401                    .to_ipv4_mapped()
1402                    .map(|v4| {
1403                        v4.is_private()
1404                            || v4.is_loopback()
1405                            || v4.is_link_local()
1406                            || v4.octets()[0] == 0
1407                    })
1408                    .unwrap_or(false)
1409        }
1410    }
1411}
1412
1413async fn validate_resolved_host_for_ssrf(
1414    url: &str,
1415    config: &HttpEndpointConfig,
1416) -> Result<(), CamelError> {
1417    if config.allow_private_ips {
1418        return Ok(());
1419    }
1420
1421    let parsed = url::Url::parse(url)
1422        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1423    let Some(host) = parsed.host_str() else {
1424        return Ok(());
1425    };
1426    let Some(port) = parsed.port_or_known_default() else {
1427        return Ok(());
1428    };
1429
1430    let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1431        CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1432    })?;
1433
1434    for addr in resolved {
1435        let ip = addr.ip();
1436        if is_private_ip(&ip) {
1437            return Err(CamelError::ProcessorError(format!(
1438                "Target resolved to private IP: {}",
1439                ip
1440            )));
1441        }
1442    }
1443
1444    Ok(())
1445}
1446
1447// ---------------------------------------------------------------------------
1448// HttpProducer
1449// ---------------------------------------------------------------------------
1450
1451#[derive(Clone)]
1452struct HttpProducer {
1453    config: Arc<HttpEndpointConfig>,
1454    client: reqwest::Client,
1455}
1456
1457impl HttpProducer {
1458    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1459        if let Some(ref method) = config.http_method {
1460            return method.to_uppercase();
1461        }
1462        if let Some(method) = exchange
1463            .input
1464            .header("CamelHttpMethod")
1465            .and_then(|v| v.as_str())
1466        {
1467            return method.to_uppercase();
1468        }
1469        if !exchange.input.body.is_empty() {
1470            return "POST".to_string();
1471        }
1472        "GET".to_string()
1473    }
1474
1475    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1476        if let Some(uri) = exchange
1477            .input
1478            .header("CamelHttpUri")
1479            .and_then(|v| v.as_str())
1480        {
1481            let mut url = uri.to_string();
1482            if let Some(path) = exchange
1483                .input
1484                .header("CamelHttpPath")
1485                .and_then(|v| v.as_str())
1486            {
1487                if !url.ends_with('/') && !path.starts_with('/') {
1488                    url.push('/');
1489                }
1490                url.push_str(path);
1491            }
1492            if let Some(query) = exchange
1493                .input
1494                .header("CamelHttpQuery")
1495                .and_then(|v| v.as_str())
1496            {
1497                url.push('?');
1498                url.push_str(query);
1499            }
1500            return url;
1501        }
1502
1503        let mut url = config.base_url.clone();
1504
1505        if let Some(path) = exchange
1506            .input
1507            .header("CamelHttpPath")
1508            .and_then(|v| v.as_str())
1509        {
1510            if !url.ends_with('/') && !path.starts_with('/') {
1511                url.push('/');
1512            }
1513            url.push_str(path);
1514        }
1515
1516        if let Some(query) = exchange
1517            .input
1518            .header("CamelHttpQuery")
1519            .and_then(|v| v.as_str())
1520        {
1521            url.push('?');
1522            url.push_str(query);
1523        } else if !config.query_params.is_empty() {
1524            let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); // allow-unwrap
1525            for (k, v) in &config.query_params {
1526                parsed.query_pairs_mut().append_pair(k, v);
1527            }
1528            url = parsed.to_string();
1529        }
1530
1531        url
1532    }
1533
1534    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1535        status >= range.0 && status <= range.1
1536    }
1537}
1538
1539impl Service<Exchange> for HttpProducer {
1540    type Response = Exchange;
1541    type Error = CamelError;
1542    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1543
1544    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1545        Poll::Ready(Ok(()))
1546    }
1547
1548    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1549        let config = self.config.clone();
1550        let client = self.client.clone();
1551
1552        Box::pin(async move {
1553            let method_str = HttpProducer::resolve_method(&exchange, &config);
1554            let url = HttpProducer::resolve_url(&exchange, &config);
1555
1556            // SECURITY: Validate URL for SSRF
1557            validate_url_for_ssrf(&url, &config)?;
1558            validate_resolved_host_for_ssrf(&url, &config).await?;
1559
1560            debug!(
1561                correlation_id = %exchange.correlation_id(),
1562                method = %method_str,
1563                url = %url,
1564                "HTTP request"
1565            );
1566
1567            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1568                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1569            })?;
1570
1571            let mut request = client.request(method, &url);
1572
1573            if let Some(timeout) = config.response_timeout {
1574                request = request.timeout(timeout);
1575            }
1576
1577            if let Some(user_agent) = &config.user_agent
1578                && !config.bridge_endpoint
1579            {
1580                request = request.header("User-Agent", user_agent.clone());
1581            }
1582
1583            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1584            #[cfg(feature = "otel")]
1585            let should_inject_otel = !config.bridge_endpoint;
1586            #[cfg(feature = "otel")]
1587            if should_inject_otel {
1588                let mut otel_headers = HashMap::new();
1589                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1590                for (k, v) in otel_headers {
1591                    if let (Ok(name), Ok(val)) = (
1592                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1593                        reqwest::header::HeaderValue::from_str(&v),
1594                    ) {
1595                        request = request.header(name, val);
1596                    }
1597                }
1598            }
1599
1600            for (key, value) in &exchange.input.headers {
1601                if !key.starts_with("Camel")
1602                    && !config
1603                        .skip_request_headers
1604                        .iter()
1605                        .any(|h| h.eq_ignore_ascii_case(key))
1606                    && let Some(val_str) = value.as_str()
1607                    && let (Ok(name), Ok(val)) = (
1608                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1609                        reqwest::header::HeaderValue::from_str(val_str),
1610                    )
1611                {
1612                    request = request.header(name, val);
1613                }
1614            }
1615
1616            if !config.bridge_endpoint {
1617                match &config.auth {
1618                    HttpAuth::None => {}
1619                    HttpAuth::Basic { username, password } => {
1620                        request = request.basic_auth(username, Some(password));
1621                    }
1622                    HttpAuth::Bearer { token } => {
1623                        request = request.bearer_auth(token);
1624                    }
1625                }
1626
1627                if config.connection_close {
1628                    request = request.header("Connection", "close");
1629                }
1630            }
1631
1632            match exchange.input.body {
1633                Body::Stream(ref s) => {
1634                    let mut stream_lock = s.stream.lock().await;
1635                    if let Some(stream) = stream_lock.take() {
1636                        request = request.body(reqwest::Body::wrap_stream(stream));
1637                    } else {
1638                        return Err(CamelError::AlreadyConsumed);
1639                    }
1640                }
1641                _ => {
1642                    // For other types, materialize with configured limit
1643                    let body = std::mem::take(&mut exchange.input.body);
1644                    let bytes = body.into_bytes(config.max_body_size).await?;
1645                    if !bytes.is_empty() {
1646                        request = request.body(bytes);
1647                    }
1648                }
1649            }
1650
1651            let response = request
1652                .send()
1653                .await
1654                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1655
1656            let status_code = response.status().as_u16();
1657            let status_text = response
1658                .status()
1659                .canonical_reason()
1660                .unwrap_or("Unknown")
1661                .to_string();
1662
1663            for (key, value) in response.headers() {
1664                if config
1665                    .skip_response_headers
1666                    .iter()
1667                    .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1668                {
1669                    continue;
1670                }
1671                if let Ok(val_str) = value.to_str() {
1672                    exchange.input.set_header(
1673                        title_case_header(key.as_str()),
1674                        serde_json::Value::String(val_str.to_string()),
1675                    );
1676                }
1677            }
1678
1679            exchange.input.set_header(
1680                "CamelHttpResponseCode",
1681                serde_json::Value::Number(status_code.into()),
1682            );
1683            exchange.input.set_header(
1684                "CamelHttpResponseText",
1685                serde_json::Value::String(status_text.clone()),
1686            );
1687
1688            // Read response body with timeout and size guard (HTTP-004, HTTP-005)
1689            let read_timeout = Duration::from_millis(config.read_timeout_ms);
1690            let response_body = tokio::time::timeout(read_timeout, async {
1691                // Check Content-Length header before allocating
1692                if let Some(content_len) = response.content_length()
1693                    && content_len > config.max_response_bytes as u64
1694                {
1695                    return Err(CamelError::ProcessorError(format!(
1696                        "Response body too large: {} bytes exceeds limit of {} bytes",
1697                        content_len, config.max_response_bytes
1698                    )));
1699                }
1700                // Use bytes_stream() for lazy streaming with size guard
1701                use futures::TryStreamExt;
1702                let mut stream = response.bytes_stream();
1703                let mut total: usize = 0;
1704                let mut collected = Vec::new();
1705                while let Some(chunk) = stream.try_next().await.map_err(|e| {
1706                    CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1707                })? {
1708                    total += chunk.len();
1709                    if total > config.max_response_bytes {
1710                        return Err(CamelError::ProcessorError(format!(
1711                            "Response body too large: {} bytes exceeds limit of {} bytes",
1712                            total, config.max_response_bytes
1713                        )));
1714                    }
1715                    collected.push(chunk);
1716                }
1717                let mut result = bytes::BytesMut::with_capacity(total);
1718                for chunk in collected {
1719                    result.extend_from_slice(&chunk);
1720                }
1721                Ok::<bytes::Bytes, CamelError>(result.freeze())
1722            })
1723            .await
1724            .map_err(|_| {
1725                CamelError::ProcessorError(format!(
1726                    "Read timeout after {}ms",
1727                    config.read_timeout_ms
1728                ))
1729            })??;
1730
1731            if config.throw_exception_on_failure
1732                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1733            {
1734                return Err(CamelError::HttpOperationFailed {
1735                    method: method_str,
1736                    url,
1737                    status_code,
1738                    status_text,
1739                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1740                });
1741            }
1742
1743            if !response_body.is_empty() {
1744                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1745            }
1746
1747            debug!(
1748                correlation_id = %exchange.correlation_id(),
1749                status = status_code,
1750                url = %url,
1751                "HTTP response"
1752            );
1753            Ok(exchange)
1754        })
1755    }
1756}
1757
1758#[cfg(test)]
1759mod tests {
1760    use super::*;
1761    use camel_component_api::{Message, NoOpComponentContext};
1762    use std::sync::Arc;
1763    use std::time::Duration;
1764
1765    fn test_producer_ctx() -> ProducerContext {
1766        ProducerContext::new()
1767    }
1768
1769    #[test]
1770    fn test_http_config_defaults() {
1771        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1772        assert_eq!(config.base_url, "http://localhost:8080/api");
1773        assert!(config.http_method.is_none());
1774        assert!(config.throw_exception_on_failure);
1775        assert_eq!(config.ok_status_code_range, (200, 299));
1776        assert!(config.response_timeout.is_none());
1777        assert!(matches!(config.auth, HttpAuth::None));
1778        assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1779        assert!(!config.bridge_endpoint);
1780        assert!(!config.connection_close);
1781    }
1782
1783    #[test]
1784    fn test_http_config_scheme() {
1785        // UriConfig trait method returns "http" as primary scheme
1786        assert_eq!(HttpEndpointConfig::scheme(), "http");
1787    }
1788
1789    #[test]
1790    fn test_http_config_from_components() {
1791        // Test from_components directly (trait method)
1792        let components = camel_component_api::UriComponents {
1793            scheme: "https".to_string(),
1794            path: "//api.example.com/v1".to_string(),
1795            params: std::collections::HashMap::from([(
1796                "httpMethod".to_string(),
1797                "POST".to_string(),
1798            )]),
1799        };
1800        let config = HttpEndpointConfig::from_components(components).unwrap();
1801        assert_eq!(config.base_url, "https://api.example.com/v1");
1802        assert_eq!(config.http_method, Some("POST".to_string()));
1803    }
1804
1805    #[test]
1806    fn test_http_config_with_options() {
1807        let config = HttpEndpointConfig::from_uri(
1808            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1809        ).unwrap();
1810        assert_eq!(config.base_url, "https://api.example.com/v1");
1811        assert_eq!(config.http_method, Some("PUT".to_string()));
1812        assert!(!config.throw_exception_on_failure);
1813        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1814    }
1815
1816    #[test]
1817    fn test_http_endpoint_config_auth_and_headers_options() {
1818        let config = HttpEndpointConfig::from_uri(
1819            "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1820        )
1821        .unwrap();
1822
1823        assert!(matches!(
1824            config.auth,
1825            HttpAuth::Basic { username, password } if username == "u" && password == "p"
1826        ));
1827        assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1828        assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1829        assert!(config.bridge_endpoint);
1830        assert!(config.connection_close);
1831        assert_eq!(
1832            config.skip_request_headers,
1833            vec!["authorization".to_string(), "x-secret".to_string()]
1834        );
1835        assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1836    }
1837
1838    #[test]
1839    fn test_http_endpoint_config_bearer_auth() {
1840        let config = HttpEndpointConfig::from_uri(
1841            "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1842        )
1843        .unwrap();
1844        assert!(matches!(
1845            config.auth,
1846            HttpAuth::Bearer { token } if token == "t"
1847        ));
1848    }
1849
1850    #[test]
1851    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1852        let config = HttpConfig::default()
1853            .with_response_timeout_ms(999)
1854            .with_allow_private_ips(true)
1855            .with_blocked_hosts(vec!["evil.com".to_string()])
1856            .with_max_body_size(12345);
1857        let endpoint =
1858            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1859        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1860        assert!(endpoint.allow_private_ips);
1861        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1862        assert_eq!(endpoint.max_body_size, 12345);
1863    }
1864
1865    #[test]
1866    fn test_from_uri_with_defaults_uri_overrides_config() {
1867        let config = HttpConfig::default()
1868            .with_response_timeout_ms(999)
1869            .with_allow_private_ips(true)
1870            .with_blocked_hosts(vec!["evil.com".to_string()])
1871            .with_max_body_size(12345);
1872        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1873            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1874            &config,
1875        )
1876        .unwrap();
1877        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1878        assert!(!endpoint.allow_private_ips);
1879        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1880        assert_eq!(endpoint.max_body_size, 99);
1881    }
1882
1883    #[test]
1884    fn test_http_config_ok_status_range() {
1885        let config =
1886            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1887        assert_eq!(config.ok_status_code_range, (200, 204));
1888    }
1889
1890    #[test]
1891    fn test_http_config_wrong_scheme() {
1892        let result = HttpEndpointConfig::from_uri("file:/tmp");
1893        assert!(result.is_err());
1894    }
1895
1896    #[test]
1897    fn test_http_component_scheme() {
1898        let component = HttpComponent::new();
1899        assert_eq!(component.scheme(), "http");
1900    }
1901
1902    #[test]
1903    fn test_https_component_scheme() {
1904        let component = HttpsComponent::new();
1905        assert_eq!(component.scheme(), "https");
1906    }
1907
1908    #[test]
1909    fn test_http_endpoint_creates_consumer() {
1910        let component = HttpComponent::new();
1911        let ctx = NoOpComponentContext;
1912        let endpoint = component
1913            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1914            .unwrap();
1915        assert!(endpoint.create_consumer().is_ok());
1916    }
1917
1918    #[test]
1919    fn test_https_endpoint_creates_consumer() {
1920        let component = HttpsComponent::new();
1921        let ctx = NoOpComponentContext;
1922        let endpoint = component
1923            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1924            .unwrap();
1925        assert!(endpoint.create_consumer().is_ok());
1926    }
1927
1928    #[test]
1929    fn test_http_endpoint_creates_producer() {
1930        let ctx = test_producer_ctx();
1931        let component = HttpComponent::new();
1932        let endpoint_ctx = NoOpComponentContext;
1933        let endpoint = component
1934            .create_endpoint("http://localhost/api", &endpoint_ctx)
1935            .unwrap();
1936        assert!(endpoint.create_producer(&ctx).is_ok());
1937    }
1938
1939    // -----------------------------------------------------------------------
1940    // Producer tests
1941    // -----------------------------------------------------------------------
1942
1943    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1944        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1945        let addr = listener.local_addr().unwrap();
1946        let url = format!("http://127.0.0.1:{}", addr.port());
1947
1948        let handle = tokio::spawn(async move {
1949            loop {
1950                if let Ok((mut stream, _)) = listener.accept().await {
1951                    tokio::spawn(async move {
1952                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953                        let mut buf = vec![0u8; 4096];
1954                        let n = stream.read(&mut buf).await.unwrap_or(0);
1955                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1956
1957                        let method = request.split_whitespace().next().unwrap_or("GET");
1958
1959                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1960                        let response = format!(
1961                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1962                            body.len(),
1963                            body
1964                        );
1965                        let _ = stream.write_all(response.as_bytes()).await;
1966                    });
1967                }
1968            }
1969        });
1970
1971        (url, handle)
1972    }
1973
1974    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1975        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1976        let addr = listener.local_addr().unwrap();
1977        let url = format!("http://127.0.0.1:{}", addr.port());
1978
1979        let handle = tokio::spawn(async move {
1980            loop {
1981                if let Ok((mut stream, _)) = listener.accept().await {
1982                    let status = status;
1983                    tokio::spawn(async move {
1984                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1985                        let mut buf = vec![0u8; 4096];
1986                        let _ = stream.read(&mut buf).await;
1987
1988                        let status_text = match status {
1989                            404 => "Not Found",
1990                            500 => "Internal Server Error",
1991                            _ => "Error",
1992                        };
1993                        let body = "error body";
1994                        let response = format!(
1995                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1996                            status,
1997                            status_text,
1998                            body.len(),
1999                            body
2000                        );
2001                        let _ = stream.write_all(response.as_bytes()).await;
2002                    });
2003                }
2004            }
2005        });
2006
2007        (url, handle)
2008    }
2009
2010    #[tokio::test]
2011    async fn test_http_producer_get_request() {
2012        use tower::ServiceExt;
2013
2014        let (url, _handle) = start_test_server().await;
2015        let ctx = test_producer_ctx();
2016
2017        let component = HttpComponent::new();
2018        let endpoint_ctx = NoOpComponentContext;
2019        let endpoint = component
2020            .create_endpoint(
2021                &format!("{url}/api/test?allowPrivateIps=true"),
2022                &endpoint_ctx,
2023            )
2024            .unwrap();
2025        let producer = endpoint.create_producer(&ctx).unwrap();
2026
2027        let exchange = Exchange::new(Message::default());
2028        let result = producer.oneshot(exchange).await.unwrap();
2029
2030        let status = result
2031            .input
2032            .header("CamelHttpResponseCode")
2033            .and_then(|v| v.as_u64())
2034            .unwrap();
2035        assert_eq!(status, 200);
2036
2037        assert!(!result.input.body.is_empty());
2038    }
2039
2040    #[tokio::test]
2041    async fn test_http_producer_post_with_body() {
2042        use tower::ServiceExt;
2043
2044        let (url, _handle) = start_test_server().await;
2045        let ctx = test_producer_ctx();
2046
2047        let component = HttpComponent::new();
2048        let endpoint_ctx = NoOpComponentContext;
2049        let endpoint = component
2050            .create_endpoint(
2051                &format!("{url}/api/data?allowPrivateIps=true"),
2052                &endpoint_ctx,
2053            )
2054            .unwrap();
2055        let producer = endpoint.create_producer(&ctx).unwrap();
2056
2057        let exchange = Exchange::new(Message::new("request body"));
2058        let result = producer.oneshot(exchange).await.unwrap();
2059
2060        let status = result
2061            .input
2062            .header("CamelHttpResponseCode")
2063            .and_then(|v| v.as_u64())
2064            .unwrap();
2065        assert_eq!(status, 200);
2066    }
2067
2068    #[tokio::test]
2069    async fn test_http_producer_method_from_header() {
2070        use tower::ServiceExt;
2071
2072        let (url, _handle) = start_test_server().await;
2073        let ctx = test_producer_ctx();
2074
2075        let component = HttpComponent::new();
2076        let endpoint_ctx = NoOpComponentContext;
2077        let endpoint = component
2078            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2079            .unwrap();
2080        let producer = endpoint.create_producer(&ctx).unwrap();
2081
2082        let mut exchange = Exchange::new(Message::default());
2083        exchange.input.set_header(
2084            "CamelHttpMethod",
2085            serde_json::Value::String("DELETE".to_string()),
2086        );
2087
2088        let result = producer.oneshot(exchange).await.unwrap();
2089        let status = result
2090            .input
2091            .header("CamelHttpResponseCode")
2092            .and_then(|v| v.as_u64())
2093            .unwrap();
2094        assert_eq!(status, 200);
2095    }
2096
2097    #[tokio::test]
2098    async fn test_http_producer_forced_method() {
2099        use tower::ServiceExt;
2100
2101        let (url, _handle) = start_test_server().await;
2102        let ctx = test_producer_ctx();
2103
2104        let component = HttpComponent::new();
2105        let endpoint_ctx = NoOpComponentContext;
2106        let endpoint = component
2107            .create_endpoint(
2108                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2109                &endpoint_ctx,
2110            )
2111            .unwrap();
2112        let producer = endpoint.create_producer(&ctx).unwrap();
2113
2114        let exchange = Exchange::new(Message::default());
2115        let result = producer.oneshot(exchange).await.unwrap();
2116
2117        let status = result
2118            .input
2119            .header("CamelHttpResponseCode")
2120            .and_then(|v| v.as_u64())
2121            .unwrap();
2122        assert_eq!(status, 200);
2123    }
2124
2125    #[tokio::test]
2126    async fn test_http_producer_throw_exception_on_failure() {
2127        use tower::ServiceExt;
2128
2129        let (url, _handle) = start_status_server(404).await;
2130        let ctx = test_producer_ctx();
2131
2132        let component = HttpComponent::new();
2133        let endpoint_ctx = NoOpComponentContext;
2134        let endpoint = component
2135            .create_endpoint(
2136                &format!("{url}/not-found?allowPrivateIps=true"),
2137                &endpoint_ctx,
2138            )
2139            .unwrap();
2140        let producer = endpoint.create_producer(&ctx).unwrap();
2141
2142        let exchange = Exchange::new(Message::default());
2143        let result = producer.oneshot(exchange).await;
2144        assert!(result.is_err());
2145
2146        match result.unwrap_err() {
2147            CamelError::HttpOperationFailed { status_code, .. } => {
2148                assert_eq!(status_code, 404);
2149            }
2150            e => panic!("Expected HttpOperationFailed, got: {e}"),
2151        }
2152    }
2153
2154    #[tokio::test]
2155    async fn test_http_producer_no_throw_on_failure() {
2156        use tower::ServiceExt;
2157
2158        let (url, _handle) = start_status_server(500).await;
2159        let ctx = test_producer_ctx();
2160
2161        let component = HttpComponent::new();
2162        let endpoint_ctx = NoOpComponentContext;
2163        let endpoint = component
2164            .create_endpoint(
2165                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2166                &endpoint_ctx,
2167            )
2168            .unwrap();
2169        let producer = endpoint.create_producer(&ctx).unwrap();
2170
2171        let exchange = Exchange::new(Message::default());
2172        let result = producer.oneshot(exchange).await.unwrap();
2173
2174        let status = result
2175            .input
2176            .header("CamelHttpResponseCode")
2177            .and_then(|v| v.as_u64())
2178            .unwrap();
2179        assert_eq!(status, 500);
2180    }
2181
2182    #[tokio::test]
2183    async fn test_http_producer_uri_override() {
2184        use tower::ServiceExt;
2185
2186        let (url, _handle) = start_test_server().await;
2187        let ctx = test_producer_ctx();
2188
2189        let component = HttpComponent::new();
2190        let endpoint_ctx = NoOpComponentContext;
2191        let endpoint = component
2192            .create_endpoint(
2193                "http://localhost:1/does-not-exist?allowPrivateIps=true",
2194                &endpoint_ctx,
2195            )
2196            .unwrap();
2197        let producer = endpoint.create_producer(&ctx).unwrap();
2198
2199        let mut exchange = Exchange::new(Message::default());
2200        exchange.input.set_header(
2201            "CamelHttpUri",
2202            serde_json::Value::String(format!("{url}/api")),
2203        );
2204
2205        let result = producer.oneshot(exchange).await.unwrap();
2206        let status = result
2207            .input
2208            .header("CamelHttpResponseCode")
2209            .and_then(|v| v.as_u64())
2210            .unwrap();
2211        assert_eq!(status, 200);
2212    }
2213
2214    #[tokio::test]
2215    async fn test_http_producer_response_headers_mapped() {
2216        use tower::ServiceExt;
2217
2218        let (url, _handle) = start_test_server().await;
2219        let ctx = test_producer_ctx();
2220
2221        let component = HttpComponent::new();
2222        let endpoint_ctx = NoOpComponentContext;
2223        let endpoint = component
2224            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2225            .unwrap();
2226        let producer = endpoint.create_producer(&ctx).unwrap();
2227
2228        let exchange = Exchange::new(Message::default());
2229        let result = producer.oneshot(exchange).await.unwrap();
2230
2231        assert!(
2232            result.input.header("Content-Type").is_some(),
2233            "Response should have Content-Type header"
2234        );
2235        assert!(result.input.header("CamelHttpResponseText").is_some());
2236    }
2237
2238    // -----------------------------------------------------------------------
2239    // Bug fix tests: Client configuration per-endpoint
2240    // -----------------------------------------------------------------------
2241
2242    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2243        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2244        let addr = listener.local_addr().unwrap();
2245        let url = format!("http://127.0.0.1:{}", addr.port());
2246
2247        let handle = tokio::spawn(async move {
2248            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2249            loop {
2250                if let Ok((mut stream, _)) = listener.accept().await {
2251                    tokio::spawn(async move {
2252                        let mut buf = vec![0u8; 4096];
2253                        let n = stream.read(&mut buf).await.unwrap_or(0);
2254                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
2255
2256                        // Check if this is a request to /final
2257                        if request.contains("GET /final") {
2258                            let body = r#"{"status":"final"}"#;
2259                            let response = format!(
2260                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2261                                body.len(),
2262                                body
2263                            );
2264                            let _ = stream.write_all(response.as_bytes()).await;
2265                        } else {
2266                            // Redirect to /final
2267                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2268                            let _ = stream.write_all(response.as_bytes()).await;
2269                        }
2270                    });
2271                }
2272            }
2273        });
2274
2275        (url, handle)
2276    }
2277
2278    #[tokio::test]
2279    async fn test_follow_redirects_false_does_not_follow() {
2280        use tower::ServiceExt;
2281
2282        let (url, _handle) = start_redirect_server().await;
2283        let ctx = test_producer_ctx();
2284
2285        let component =
2286            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2287        let endpoint_ctx = NoOpComponentContext;
2288        let endpoint = component
2289            .create_endpoint(
2290                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2291                &endpoint_ctx,
2292            )
2293            .unwrap();
2294        let producer = endpoint.create_producer(&ctx).unwrap();
2295
2296        let exchange = Exchange::new(Message::default());
2297        let result = producer.oneshot(exchange).await.unwrap();
2298
2299        // Should get 302, NOT follow redirect to 200
2300        let status = result
2301            .input
2302            .header("CamelHttpResponseCode")
2303            .and_then(|v| v.as_u64())
2304            .unwrap();
2305        assert_eq!(
2306            status, 302,
2307            "Should NOT follow redirect when followRedirects=false"
2308        );
2309    }
2310
2311    #[tokio::test]
2312    async fn test_follow_redirects_true_follows_redirect() {
2313        use tower::ServiceExt;
2314
2315        let (url, _handle) = start_redirect_server().await;
2316        let ctx = test_producer_ctx();
2317
2318        let component =
2319            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2320        let endpoint_ctx = NoOpComponentContext;
2321        let endpoint = component
2322            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2323            .unwrap();
2324        let producer = endpoint.create_producer(&ctx).unwrap();
2325
2326        let exchange = Exchange::new(Message::default());
2327        let result = producer.oneshot(exchange).await.unwrap();
2328
2329        // Should follow redirect and get 200
2330        let status = result
2331            .input
2332            .header("CamelHttpResponseCode")
2333            .and_then(|v| v.as_u64())
2334            .unwrap();
2335        assert_eq!(
2336            status, 200,
2337            "Should follow redirect when followRedirects=true"
2338        );
2339    }
2340
2341    #[tokio::test]
2342    async fn test_query_params_forwarded_to_http_request() {
2343        use tower::ServiceExt;
2344
2345        let (url, _handle) = start_test_server().await;
2346        let ctx = test_producer_ctx();
2347
2348        let component = HttpComponent::new();
2349        let endpoint_ctx = NoOpComponentContext;
2350        // apiKey is NOT a Camel option, should be forwarded as query param
2351        let endpoint = component
2352            .create_endpoint(
2353                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2354                &endpoint_ctx,
2355            )
2356            .unwrap();
2357        let producer = endpoint.create_producer(&ctx).unwrap();
2358
2359        let exchange = Exchange::new(Message::default());
2360        let result = producer.oneshot(exchange).await.unwrap();
2361
2362        // The test server returns the request info in response
2363        // We just verify it succeeds (the query param was sent)
2364        let status = result
2365            .input
2366            .header("CamelHttpResponseCode")
2367            .and_then(|v| v.as_u64())
2368            .unwrap();
2369        assert_eq!(status, 200);
2370    }
2371
2372    #[tokio::test]
2373    async fn test_non_camel_query_params_are_forwarded() {
2374        // This test verifies Bug #3 fix: non-Camel options should be forwarded
2375        // We'll test the config parsing, not the actual HTTP call
2376        let config = HttpEndpointConfig::from_uri(
2377            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2378        )
2379        .unwrap();
2380
2381        // apiKey and token are NOT Camel options, should be forwarded
2382        assert!(
2383            config.query_params.contains_key("apiKey"),
2384            "apiKey should be preserved"
2385        );
2386        assert!(
2387            config.query_params.contains_key("token"),
2388            "token should be preserved"
2389        );
2390        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2391        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2392
2393        // httpMethod IS a Camel option, should NOT be in query_params
2394        assert!(
2395            !config.query_params.contains_key("httpMethod"),
2396            "httpMethod should not be forwarded"
2397        );
2398    }
2399
2400    #[test]
2401    fn test_query_params_are_url_encoded_when_resolving_url() {
2402        let config =
2403            HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2404        let exchange = Exchange::new(Message::default());
2405
2406        let url = HttpProducer::resolve_url(&exchange, &config);
2407
2408        assert!(url.contains("q=hello+world"), "url was: {url}");
2409        assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2410    }
2411
2412    // -----------------------------------------------------------------------
2413    // Timeout tests (HTTP-004)
2414    // -----------------------------------------------------------------------
2415
2416    async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2417        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2418        let addr = listener.local_addr().unwrap();
2419        let url = format!("http://127.0.0.1:{}", addr.port());
2420
2421        let handle = tokio::spawn(async move {
2422            loop {
2423                if let Ok((mut stream, _)) = listener.accept().await {
2424                    let delay = delay_ms;
2425                    tokio::spawn(async move {
2426                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2427                        let mut buf = vec![0u8; 4096];
2428                        let _ = stream.read(&mut buf).await;
2429                        // Send headers immediately (no Content-Length → chunked)
2430                        let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2431                        let _ = stream.write_all(headers.as_bytes()).await;
2432                        // Delay before sending body chunk
2433                        tokio::time::sleep(Duration::from_millis(delay)).await;
2434                        let body = r#"{"status":"slow"}"#;
2435                        let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2436                        let _ = stream.write_all(chunk.as_bytes()).await;
2437                    });
2438                }
2439            }
2440        });
2441
2442        (url, handle)
2443    }
2444
2445    #[tokio::test]
2446    async fn test_http_producer_timeout() {
2447        use tower::ServiceExt;
2448
2449        // Server delays 500ms, client timeout is 100ms → should timeout
2450        let (url, _handle) = start_slow_server(500).await;
2451        let ctx = test_producer_ctx();
2452
2453        let component = HttpComponent::with_config(
2454            HttpConfig::default()
2455                .with_read_timeout_ms(100)
2456                .with_response_timeout_ms(30_000), // generous response timeout
2457        );
2458        let endpoint_ctx = NoOpComponentContext;
2459        let endpoint = component
2460            .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2461            .unwrap();
2462        let producer = endpoint.create_producer(&ctx).unwrap();
2463
2464        let exchange = Exchange::new(Message::default());
2465        let result = producer.oneshot(exchange).await;
2466
2467        assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2468        let err = result.unwrap_err().to_string();
2469        assert!(
2470            err.contains("Read timeout") || err.contains("timeout"),
2471            "Error should mention timeout, got: {}",
2472            err
2473        );
2474    }
2475
2476    #[tokio::test]
2477    async fn test_http_producer_no_timeout_when_fast() {
2478        use tower::ServiceExt;
2479
2480        let (url, _handle) = start_test_server().await;
2481        let ctx = test_producer_ctx();
2482
2483        let component =
2484            HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2485        let endpoint_ctx = NoOpComponentContext;
2486        let endpoint = component
2487            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2488            .unwrap();
2489        let producer = endpoint.create_producer(&ctx).unwrap();
2490
2491        let exchange = Exchange::new(Message::default());
2492        let result = producer.oneshot(exchange).await.unwrap();
2493
2494        let status = result
2495            .input
2496            .header("CamelHttpResponseCode")
2497            .and_then(|v| v.as_u64())
2498            .unwrap();
2499        assert_eq!(status, 200);
2500    }
2501
2502    // -----------------------------------------------------------------------
2503    // SSRF Protection tests
2504    // -----------------------------------------------------------------------
2505
2506    #[tokio::test]
2507    async fn test_http_producer_blocks_metadata_endpoint() {
2508        use tower::ServiceExt;
2509
2510        let ctx = test_producer_ctx();
2511        let component = HttpComponent::new();
2512        let endpoint_ctx = NoOpComponentContext;
2513        let endpoint = component
2514            .create_endpoint(
2515                "http://example.com/api?allowPrivateIps=false",
2516                &endpoint_ctx,
2517            )
2518            .unwrap();
2519        let producer = endpoint.create_producer(&ctx).unwrap();
2520
2521        let mut exchange = Exchange::new(Message::default());
2522        exchange.input.set_header(
2523            "CamelHttpUri",
2524            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2525        );
2526
2527        let result = producer.oneshot(exchange).await;
2528        assert!(result.is_err(), "Should block AWS metadata endpoint");
2529
2530        let err = result.unwrap_err();
2531        assert!(
2532            err.to_string().contains("Private IP"),
2533            "Error should mention private IP blocking, got: {}",
2534            err
2535        );
2536    }
2537
2538    #[test]
2539    fn test_ssrf_config_defaults() {
2540        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2541        assert!(
2542            !config.allow_private_ips,
2543            "Private IPs should be blocked by default"
2544        );
2545        assert!(
2546            config.blocked_hosts.is_empty(),
2547            "Blocked hosts should be empty by default"
2548        );
2549    }
2550
2551    #[test]
2552    fn test_ssrf_config_allow_private_ips() {
2553        let config =
2554            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2555        assert!(
2556            config.allow_private_ips,
2557            "Private IPs should be allowed when explicitly set"
2558        );
2559    }
2560
2561    #[test]
2562    fn test_ssrf_config_blocked_hosts() {
2563        let config = HttpEndpointConfig::from_uri(
2564            "http://example.com/api?blockedHosts=evil.com,malware.net",
2565        )
2566        .unwrap();
2567        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2568    }
2569
2570    #[tokio::test]
2571    async fn test_http_producer_blocks_localhost() {
2572        use tower::ServiceExt;
2573
2574        let ctx = test_producer_ctx();
2575        let component = HttpComponent::new();
2576        let endpoint_ctx = NoOpComponentContext;
2577        let endpoint = component
2578            .create_endpoint("http://example.com/api", &endpoint_ctx)
2579            .unwrap();
2580        let producer = endpoint.create_producer(&ctx).unwrap();
2581
2582        let mut exchange = Exchange::new(Message::default());
2583        exchange.input.set_header(
2584            "CamelHttpUri",
2585            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2586        );
2587
2588        let result = producer.oneshot(exchange).await;
2589        assert!(result.is_err(), "Should block localhost");
2590    }
2591
2592    #[tokio::test]
2593    async fn test_http_producer_blocks_loopback_ip() {
2594        use tower::ServiceExt;
2595
2596        let ctx = test_producer_ctx();
2597        let component = HttpComponent::new();
2598        let endpoint_ctx = NoOpComponentContext;
2599        let endpoint = component
2600            .create_endpoint("http://example.com/api", &endpoint_ctx)
2601            .unwrap();
2602        let producer = endpoint.create_producer(&ctx).unwrap();
2603
2604        let mut exchange = Exchange::new(Message::default());
2605        exchange.input.set_header(
2606            "CamelHttpUri",
2607            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2608        );
2609
2610        let result = producer.oneshot(exchange).await;
2611        assert!(result.is_err(), "Should block loopback IP");
2612    }
2613
2614    #[tokio::test]
2615    async fn test_http_producer_allows_private_ip_when_enabled() {
2616        use tower::ServiceExt;
2617
2618        let ctx = test_producer_ctx();
2619        let component = HttpComponent::new();
2620        let endpoint_ctx = NoOpComponentContext;
2621        // With allowPrivateIps=true, the validation should pass
2622        // (actual connection will fail, but that's expected)
2623        let endpoint = component
2624            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2625            .unwrap();
2626        let producer = endpoint.create_producer(&ctx).unwrap();
2627
2628        let exchange = Exchange::new(Message::default());
2629
2630        // The request will fail because we can't connect, but it should NOT fail
2631        // due to SSRF protection
2632        let result = producer.oneshot(exchange).await;
2633        // We expect connection error, not SSRF error
2634        if let Err(ref e) = result {
2635            let err_str = e.to_string();
2636            assert!(
2637                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2638                "Should not be SSRF error, got: {}",
2639                err_str
2640            );
2641        }
2642    }
2643
2644    // -----------------------------------------------------------------------
2645    // HttpServerConfig tests
2646    // -----------------------------------------------------------------------
2647
2648    #[test]
2649    fn test_http_server_config_parse() {
2650        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2651        assert_eq!(cfg.host, "0.0.0.0");
2652        assert_eq!(cfg.port, 8080);
2653        assert_eq!(cfg.path, "/orders");
2654        assert_eq!(cfg.max_inflight_requests, 1024);
2655    }
2656
2657    #[test]
2658    fn test_http_server_config_scheme() {
2659        // UriConfig trait method returns "http" as primary scheme
2660        assert_eq!(HttpServerConfig::scheme(), "http");
2661    }
2662
2663    #[test]
2664    fn test_http_server_config_from_components() {
2665        // Test from_components directly (trait method)
2666        let components = camel_component_api::UriComponents {
2667            scheme: "https".to_string(),
2668            path: "//0.0.0.0:8443/api".to_string(),
2669            params: std::collections::HashMap::from([
2670                ("maxRequestBody".to_string(), "5242880".to_string()),
2671                ("maxInflightRequests".to_string(), "7".to_string()),
2672            ]),
2673        };
2674        let cfg = HttpServerConfig::from_components(components).unwrap();
2675        assert_eq!(cfg.host, "0.0.0.0");
2676        assert_eq!(cfg.port, 8443);
2677        assert_eq!(cfg.path, "/api");
2678        assert_eq!(cfg.max_request_body, 5242880);
2679        assert_eq!(cfg.max_inflight_requests, 7);
2680    }
2681
2682    #[test]
2683    fn test_http_server_config_default_path() {
2684        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2685        assert_eq!(cfg.path, "/");
2686    }
2687
2688    #[test]
2689    fn test_http_server_config_wrong_scheme() {
2690        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2691    }
2692
2693    #[test]
2694    fn test_http_server_config_invalid_port() {
2695        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2696    }
2697
2698    #[test]
2699    fn test_http_server_config_default_port_by_scheme() {
2700        // HTTP without explicit port should default to 80
2701        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2702        assert_eq!(cfg_http.port, 80);
2703
2704        // HTTPS without explicit port should default to 443
2705        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2706        assert_eq!(cfg_https.port, 443);
2707    }
2708
2709    #[test]
2710    fn test_request_envelope_and_reply_are_send() {
2711        fn assert_send<T: Send>() {}
2712        assert_send::<RequestEnvelope>();
2713        assert_send::<HttpReply>();
2714    }
2715
2716    // -----------------------------------------------------------------------
2717    // ServerRegistry tests
2718    // -----------------------------------------------------------------------
2719
2720    /// Serializes tests that mutate or depend on the global `ServerRegistry`.
2721    ///
2722    /// `ServerRegistry::global()` is a process-wide singleton that persists
2723    /// across tests. `ServerRegistry::reset()` clears ALL entries; if it races
2724    /// with another test that has a live server on a fixed port (e.g. 9991),
2725    /// the registry entry is removed while the OS socket is still bound, so
2726    /// the next `get_or_spawn` call on that port fails with "Address already
2727    /// in use". Holding this mutex for the full body of each affected test
2728    /// prevents the race without requiring `--test-threads=1`.
2729    static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
2730
2731    #[test]
2732    fn test_server_registry_global_is_singleton() {
2733        let r1 = ServerRegistry::global();
2734        let r2 = ServerRegistry::global();
2735        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2736    }
2737
2738    #[tokio::test]
2739    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2740        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2741        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2742        let port = listener.local_addr().unwrap().port();
2743        drop(listener);
2744
2745        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2746            Arc::new(std::sync::Mutex::new(Vec::new()));
2747
2748        let mut handles = Vec::new();
2749        for _ in 0..4 {
2750            let results = results.clone();
2751            handles.push(tokio::spawn(async move {
2752                let dispatch = ServerRegistry::global()
2753                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2754                    .await
2755                    .unwrap();
2756                results.lock().unwrap().push(dispatch);
2757            }));
2758        }
2759
2760        for h in handles {
2761            h.await.unwrap();
2762        }
2763
2764        let dispatches = results.lock().unwrap();
2765        assert_eq!(dispatches.len(), 4);
2766        for i in 1..dispatches.len() {
2767            assert!(
2768                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2769                "all concurrent callers should get the same dispatch table"
2770            );
2771        }
2772    }
2773
2774    #[test]
2775    fn test_server_registry_distinguishes_host_and_port() {
2776        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2777        let rt = tokio::runtime::Runtime::new().expect("runtime");
2778        rt.block_on(async {
2779            let registry = ServerRegistry::global();
2780            // Use two distinct host values with same configured port key.
2781            // Port 0 is acceptable here because the registry key uses the configured
2782            // tuple, not the OS-assigned ephemeral port.
2783            let d1 = registry
2784                .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2785                .await;
2786            let d2 = registry
2787                .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2788                .await;
2789            assert!(d1.is_ok());
2790            assert!(d2.is_ok());
2791            assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2792        });
2793    }
2794
2795    #[tokio::test]
2796    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2797        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2798        let registry = ServerRegistry::global();
2799        // First registration: maxRequestBody = 1 MB
2800        let d1 = registry
2801            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2802            .await;
2803        assert!(d1.is_ok());
2804
2805        // Second registration on same (host,port): maxRequestBody = 2 MB
2806        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2807        let d2 = registry
2808            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2809            .await;
2810        assert!(d2.is_err());
2811        let err = d2.unwrap_err();
2812        assert!(
2813            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2814            "Expected incompatible maxRequestBody error, got: {}",
2815            err
2816        );
2817    }
2818
2819    #[test]
2820    fn test_server_registry_reset_clears_entries() {
2821        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2822        let rt = tokio::runtime::Runtime::new().expect("runtime");
2823        rt.block_on(async {
2824            // Register something on a unique port
2825            let d1 = ServerRegistry::global()
2826                .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2827                .await;
2828            assert!(d1.is_ok());
2829
2830            // Verify entry exists
2831            let guard = ServerRegistry::global().inner.lock().expect("lock");
2832            assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2833            drop(guard);
2834
2835            // Reset
2836            ServerRegistry::reset();
2837
2838            // Verify cleared
2839            let guard = ServerRegistry::global().inner.lock().expect("lock");
2840            assert!(
2841                guard.is_empty(),
2842                "registry should be empty after reset, has {} entries",
2843                guard.len()
2844            );
2845        });
2846    }
2847
2848    // -----------------------------------------------------------------------
2849    // Axum dispatch handler tests
2850    // -----------------------------------------------------------------------
2851
2852    #[tokio::test]
2853    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2854        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2855        // Nothing registered in the dispatch table
2856        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2857        let port = listener.local_addr().unwrap().port();
2858        tokio::spawn(run_axum_server(
2859            listener,
2860            dispatch,
2861            2 * 1024 * 1024,
2862            10 * 1024 * 1024,
2863            Arc::new(tokio::sync::Semaphore::new(1024)),
2864        ));
2865
2866        // Wait for server to start
2867        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2868
2869        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2870            .await
2871            .unwrap();
2872        assert_eq!(resp.status().as_u16(), 404);
2873    }
2874
2875    // -----------------------------------------------------------------------
2876    // HttpConsumer tests
2877    // -----------------------------------------------------------------------
2878
2879    #[tokio::test]
2880    async fn test_http_consumer_start_registers_path() {
2881        use camel_component_api::ConsumerContext;
2882
2883        // Get an OS-assigned free port
2884        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2885        let port = listener.local_addr().unwrap().port();
2886        drop(listener); // Release port — ServerRegistry will rebind it
2887
2888        let consumer_cfg = HttpServerConfig {
2889            host: "127.0.0.1".to_string(),
2890            port,
2891            path: "/ping".to_string(),
2892            max_request_body: 2 * 1024 * 1024,
2893            max_response_body: 10 * 1024 * 1024,
2894            max_inflight_requests: 1024,
2895        };
2896        let mut consumer = HttpConsumer::new(consumer_cfg);
2897
2898        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2899        let token = tokio_util::sync::CancellationToken::new();
2900        let ctx = ConsumerContext::new(tx, token.clone());
2901
2902        tokio::spawn(async move {
2903            consumer.start(ctx).await.unwrap();
2904        });
2905
2906        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2907
2908        let client = reqwest::Client::new();
2909        let resp_future = client
2910            .post(format!("http://127.0.0.1:{port}/ping"))
2911            .body("hello world")
2912            .send();
2913
2914        let (http_result, _) = tokio::join!(resp_future, async {
2915            if let Some(mut envelope) = rx.recv().await {
2916                // Set a custom status code
2917                envelope.exchange.input.set_header(
2918                    "CamelHttpResponseCode",
2919                    serde_json::Value::Number(201.into()),
2920                );
2921                if let Some(reply_tx) = envelope.reply_tx {
2922                    let _ = reply_tx.send(Ok(envelope.exchange));
2923                }
2924            }
2925        });
2926
2927        let resp = http_result.unwrap();
2928        assert_eq!(resp.status().as_u16(), 201);
2929
2930        token.cancel();
2931    }
2932
2933    #[tokio::test]
2934    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2935        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2936
2937        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2938        let port = listener.local_addr().unwrap().port();
2939        drop(listener);
2940
2941        let consumer_cfg = HttpServerConfig {
2942            host: "127.0.0.1".to_string(),
2943            port,
2944            path: "/saturation".to_string(),
2945            max_request_body: 2 * 1024 * 1024,
2946            max_response_body: 10 * 1024 * 1024,
2947            max_inflight_requests: 1,
2948        };
2949        let mut consumer = HttpConsumer::new(consumer_cfg);
2950
2951        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2952        let token = tokio_util::sync::CancellationToken::new();
2953        let ctx = ConsumerContext::new(tx, token.clone());
2954        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2955        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2956
2957        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2958        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2959
2960        tokio::spawn(async move {
2961            let mut first_seen_tx = Some(first_seen_tx);
2962            let mut unblock_first_rx = Some(unblock_first_rx);
2963
2964            while let Some(envelope) = rx.recv().await {
2965                if let Some(tx) = first_seen_tx.take() {
2966                    let _ = tx.send(());
2967                    if let Some(rx_unblock) = unblock_first_rx.take() {
2968                        let _ = rx_unblock.await;
2969                    }
2970                }
2971
2972                if let Some(reply_tx) = envelope.reply_tx {
2973                    let _ = reply_tx.send(Ok(envelope.exchange));
2974                }
2975            }
2976        });
2977
2978        let client = reqwest::Client::new();
2979        let first_req = {
2980            let client = client.clone();
2981            async move {
2982                client
2983                    .get(format!("http://127.0.0.1:{port}/saturation"))
2984                    .send()
2985                    .await
2986                    .unwrap()
2987            }
2988        };
2989
2990        let first_handle = tokio::spawn(first_req);
2991        first_seen_rx.await.unwrap();
2992
2993        let second_resp = client
2994            .get(format!("http://127.0.0.1:{port}/saturation"))
2995            .send()
2996            .await
2997            .unwrap();
2998
2999        assert_eq!(second_resp.status().as_u16(), 503);
3000
3001        let _ = unblock_first_tx.send(());
3002        let first_resp = first_handle.await.unwrap();
3003        assert_eq!(first_resp.status().as_u16(), 200);
3004
3005        token.cancel();
3006    }
3007
3008    #[tokio::test]
3009    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3010        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3011
3012        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3013        let port = listener.local_addr().unwrap().port();
3014        drop(listener);
3015
3016        let consumer_cfg = HttpServerConfig {
3017            host: "127.0.0.1".to_string(),
3018            port,
3019            path: "/limit-bytes".to_string(),
3020            max_request_body: 2 * 1024 * 1024,
3021            max_response_body: 16,
3022            max_inflight_requests: 1024,
3023        };
3024        let mut consumer = HttpConsumer::new(consumer_cfg);
3025
3026        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3027        let token = tokio_util::sync::CancellationToken::new();
3028        let ctx = ConsumerContext::new(tx, token.clone());
3029        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3030        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3031
3032        let client = reqwest::Client::new();
3033        let send_fut = client
3034            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3035            .send();
3036
3037        let (http_result, _) = tokio::join!(send_fut, async {
3038            if let Some(mut envelope) = rx.recv().await {
3039                envelope.exchange.input.body =
3040                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3041                if let Some(reply_tx) = envelope.reply_tx {
3042                    let _ = reply_tx.send(Ok(envelope.exchange));
3043                }
3044            }
3045        });
3046
3047        let resp = http_result.unwrap();
3048        assert_eq!(resp.status().as_u16(), 500);
3049        let body = resp.text().await.unwrap();
3050        assert_eq!(body, "Response body exceeds configured limit");
3051        token.cancel();
3052    }
3053
3054    #[tokio::test]
3055    async fn test_http_consumer_enforces_max_response_body_for_json() {
3056        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3057
3058        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3059        let port = listener.local_addr().unwrap().port();
3060        drop(listener);
3061
3062        let consumer_cfg = HttpServerConfig {
3063            host: "127.0.0.1".to_string(),
3064            port,
3065            path: "/limit-json".to_string(),
3066            max_request_body: 2 * 1024 * 1024,
3067            max_response_body: 16,
3068            max_inflight_requests: 1024,
3069        };
3070        let mut consumer = HttpConsumer::new(consumer_cfg);
3071
3072        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3073        let token = tokio_util::sync::CancellationToken::new();
3074        let ctx = ConsumerContext::new(tx, token.clone());
3075        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3076        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3077
3078        let client = reqwest::Client::new();
3079        let send_fut = client
3080            .get(format!("http://127.0.0.1:{port}/limit-json"))
3081            .send();
3082
3083        let (http_result, _) = tokio::join!(send_fut, async {
3084            if let Some(mut envelope) = rx.recv().await {
3085                envelope.exchange.input.body = camel_component_api::Body::Json(
3086                    serde_json::json!({"message":"this response is bigger than sixteen"}),
3087                );
3088                if let Some(reply_tx) = envelope.reply_tx {
3089                    let _ = reply_tx.send(Ok(envelope.exchange));
3090                }
3091            }
3092        });
3093
3094        let resp = http_result.unwrap();
3095        assert_eq!(resp.status().as_u16(), 500);
3096        let body = resp.text().await.unwrap();
3097        assert_eq!(body, "Response body exceeds configured limit");
3098        token.cancel();
3099    }
3100
3101    #[tokio::test]
3102    async fn test_http_consumer_enforces_max_response_body_for_xml() {
3103        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3104
3105        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3106        let port = listener.local_addr().unwrap().port();
3107        drop(listener);
3108
3109        let consumer_cfg = HttpServerConfig {
3110            host: "127.0.0.1".to_string(),
3111            port,
3112            path: "/limit-xml".to_string(),
3113            max_request_body: 2 * 1024 * 1024,
3114            max_response_body: 16,
3115            max_inflight_requests: 1024,
3116        };
3117        let mut consumer = HttpConsumer::new(consumer_cfg);
3118
3119        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3120        let token = tokio_util::sync::CancellationToken::new();
3121        let ctx = ConsumerContext::new(tx, token.clone());
3122        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3123        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3124
3125        let client = reqwest::Client::new();
3126        let send_fut = client
3127            .get(format!("http://127.0.0.1:{port}/limit-xml"))
3128            .send();
3129
3130        let (http_result, _) = tokio::join!(send_fut, async {
3131            if let Some(mut envelope) = rx.recv().await {
3132                envelope.exchange.input.body = camel_component_api::Body::Xml(
3133                    "<root><value>way-too-large</value></root>".into(),
3134                );
3135                if let Some(reply_tx) = envelope.reply_tx {
3136                    let _ = reply_tx.send(Ok(envelope.exchange));
3137                }
3138            }
3139        });
3140
3141        let resp = http_result.unwrap();
3142        assert_eq!(resp.status().as_u16(), 500);
3143        let body = resp.text().await.unwrap();
3144        assert_eq!(body, "Response body exceeds configured limit");
3145        token.cancel();
3146    }
3147
3148    #[tokio::test]
3149    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3150        use camel_component_api::{
3151            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3152        };
3153        use futures::stream;
3154
3155        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3156        let port = listener.local_addr().unwrap().port();
3157        drop(listener);
3158
3159        let consumer_cfg = HttpServerConfig {
3160            host: "0.0.0.0".to_string(),
3161            port,
3162            path: "/limit-stream".to_string(),
3163            max_request_body: 2 * 1024 * 1024,
3164            max_response_body: 16,
3165            max_inflight_requests: 1024,
3166        };
3167        let mut consumer = HttpConsumer::new(consumer_cfg);
3168
3169        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3170        let token = tokio_util::sync::CancellationToken::new();
3171        let ctx = ConsumerContext::new(tx, token.clone());
3172        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3173        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3174
3175        let client = reqwest::Client::new();
3176        let send_fut = client
3177            .get(format!("http://127.0.0.1:{port}/limit-stream"))
3178            .send();
3179
3180        let (http_result, _) = tokio::join!(send_fut, async {
3181            if let Some(mut envelope) = rx.recv().await {
3182                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3183                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3184                let stream = Box::pin(stream::iter(chunks));
3185                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3186                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3187                    metadata: StreamMetadata {
3188                        size_hint: Some(32),
3189                        content_type: Some("application/octet-stream".into()),
3190                        origin: None,
3191                    },
3192                });
3193                if let Some(reply_tx) = envelope.reply_tx {
3194                    let _ = reply_tx.send(Ok(envelope.exchange));
3195                }
3196            }
3197        });
3198
3199        let resp = http_result.unwrap();
3200        assert_eq!(resp.status().as_u16(), 200);
3201        let body = resp.bytes().await.unwrap();
3202        assert_eq!(body.len(), 32);
3203        token.cancel();
3204    }
3205
3206    // -----------------------------------------------------------------------
3207    // Integration tests
3208    // -----------------------------------------------------------------------
3209
3210    #[tokio::test]
3211    async fn test_integration_single_consumer_round_trip() {
3212        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3213
3214        // Get an OS-assigned free port (ephemeral)
3215        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3216        let port = listener.local_addr().unwrap().port();
3217        drop(listener); // Release — ServerRegistry will rebind
3218
3219        let component = HttpComponent::new();
3220        let endpoint_ctx = NoOpComponentContext;
3221        let endpoint = component
3222            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3223            .unwrap();
3224        let mut consumer = endpoint.create_consumer().unwrap();
3225
3226        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3227        let token = tokio_util::sync::CancellationToken::new();
3228        let ctx = ConsumerContext::new(tx, token.clone());
3229
3230        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3231        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3232
3233        let client = reqwest::Client::new();
3234        let send_fut = client
3235            .post(format!("http://127.0.0.1:{port}/echo"))
3236            .header("Content-Type", "text/plain")
3237            .body("ping")
3238            .send();
3239
3240        let (http_result, _) = tokio::join!(send_fut, async {
3241            if let Some(mut envelope) = rx.recv().await {
3242                assert_eq!(
3243                    envelope.exchange.input.header("CamelHttpMethod"),
3244                    Some(&serde_json::Value::String("POST".into()))
3245                );
3246                assert_eq!(
3247                    envelope.exchange.input.header("CamelHttpPath"),
3248                    Some(&serde_json::Value::String("/echo".into()))
3249                );
3250                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3251                if let Some(reply_tx) = envelope.reply_tx {
3252                    let _ = reply_tx.send(Ok(envelope.exchange));
3253                }
3254            }
3255        });
3256
3257        let resp = http_result.unwrap();
3258        assert_eq!(resp.status().as_u16(), 200);
3259        let body = resp.text().await.unwrap();
3260        assert_eq!(body, "pong");
3261
3262        token.cancel();
3263    }
3264
3265    #[tokio::test]
3266    async fn test_integration_two_consumers_shared_port() {
3267        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3268
3269        // Get an OS-assigned free port (ephemeral)
3270        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3271        let port = listener.local_addr().unwrap().port();
3272        drop(listener);
3273
3274        let component = HttpComponent::new();
3275        let endpoint_ctx = NoOpComponentContext;
3276
3277        // Consumer A: /hello
3278        let endpoint_a = component
3279            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3280            .unwrap();
3281        let mut consumer_a = endpoint_a.create_consumer().unwrap();
3282
3283        // Consumer B: /world
3284        let endpoint_b = component
3285            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3286            .unwrap();
3287        let mut consumer_b = endpoint_b.create_consumer().unwrap();
3288
3289        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3290        let token_a = tokio_util::sync::CancellationToken::new();
3291        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3292
3293        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3294        let token_b = tokio_util::sync::CancellationToken::new();
3295        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3296
3297        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3298        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3299        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3300
3301        let client = reqwest::Client::new();
3302
3303        // Request to /hello
3304        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3305        let (resp_hello, _) = tokio::join!(fut_hello, async {
3306            if let Some(mut envelope) = rx_a.recv().await {
3307                envelope.exchange.input.body =
3308                    camel_component_api::Body::Text("hello-response".to_string());
3309                if let Some(reply_tx) = envelope.reply_tx {
3310                    let _ = reply_tx.send(Ok(envelope.exchange));
3311                }
3312            }
3313        });
3314
3315        // Request to /world
3316        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3317        let (resp_world, _) = tokio::join!(fut_world, async {
3318            if let Some(mut envelope) = rx_b.recv().await {
3319                envelope.exchange.input.body =
3320                    camel_component_api::Body::Text("world-response".to_string());
3321                if let Some(reply_tx) = envelope.reply_tx {
3322                    let _ = reply_tx.send(Ok(envelope.exchange));
3323                }
3324            }
3325        });
3326
3327        let body_a = resp_hello.unwrap().text().await.unwrap();
3328        let body_b = resp_world.unwrap().text().await.unwrap();
3329
3330        assert_eq!(body_a, "hello-response");
3331        assert_eq!(body_b, "world-response");
3332
3333        token_a.cancel();
3334        token_b.cancel();
3335    }
3336
3337    #[tokio::test]
3338    async fn test_integration_unregistered_path_returns_404() {
3339        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3340
3341        // Get an OS-assigned free port (ephemeral)
3342        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3343        let port = listener.local_addr().unwrap().port();
3344        drop(listener);
3345
3346        let component = HttpComponent::new();
3347        let endpoint_ctx = NoOpComponentContext;
3348        let endpoint = component
3349            .create_endpoint(
3350                &format!("http://127.0.0.1:{port}/registered"),
3351                &endpoint_ctx,
3352            )
3353            .unwrap();
3354        let mut consumer = endpoint.create_consumer().unwrap();
3355
3356        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3357        let token = tokio_util::sync::CancellationToken::new();
3358        let ctx = ConsumerContext::new(tx, token.clone());
3359
3360        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3361
3362        // Wait until the server is actually accepting connections (CI runners can be slow).
3363        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3364        loop {
3365            if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3366                .await
3367                .is_ok()
3368            {
3369                break;
3370            }
3371            if std::time::Instant::now() >= deadline {
3372                panic!("HTTP server did not start within 5s on port {port}");
3373            }
3374            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3375        }
3376
3377        let client = reqwest::Client::new();
3378        let resp = client
3379            .get(format!("http://127.0.0.1:{port}/not-there"))
3380            .send()
3381            .await
3382            .unwrap();
3383        assert_eq!(resp.status().as_u16(), 404);
3384
3385        token.cancel();
3386    }
3387
3388    #[test]
3389    fn test_http_consumer_declares_concurrent() {
3390        use camel_component_api::ConcurrencyModel;
3391
3392        let config = HttpServerConfig {
3393            host: "127.0.0.1".to_string(),
3394            port: 19999,
3395            path: "/test".to_string(),
3396            max_request_body: 2 * 1024 * 1024,
3397            max_response_body: 10 * 1024 * 1024,
3398            max_inflight_requests: 1024,
3399        };
3400        let consumer = HttpConsumer::new(config);
3401        assert_eq!(
3402            consumer.concurrency_model(),
3403            ConcurrencyModel::Concurrent { max: None }
3404        );
3405    }
3406
3407    // -----------------------------------------------------------------------
3408    // HttpReplyBody streaming tests
3409    // -----------------------------------------------------------------------
3410
3411    #[tokio::test]
3412    async fn test_http_reply_body_stream_variant_exists() {
3413        use bytes::Bytes;
3414        use camel_component_api::CamelError;
3415        use futures::stream;
3416
3417        let chunks: Vec<Result<Bytes, CamelError>> =
3418            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3419        let stream = Box::pin(stream::iter(chunks));
3420        let reply_body = HttpReplyBody::Stream(stream);
3421        // Si compila y el match funciona, el test pasa
3422        match reply_body {
3423            HttpReplyBody::Stream(_) => {}
3424            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3425        }
3426    }
3427
3428    // -----------------------------------------------------------------------
3429    // OpenTelemetry propagation tests (only compiled with "otel" feature)
3430    // -----------------------------------------------------------------------
3431
3432    #[cfg(feature = "otel")]
3433    mod otel_tests {
3434        use super::*;
3435        use camel_component_api::Message;
3436        use tower::ServiceExt;
3437
3438        #[tokio::test]
3439        async fn test_producer_injects_traceparent_header() {
3440            let (url, _handle) = start_test_server_with_header_capture().await;
3441            let ctx = test_producer_ctx();
3442
3443            let component = HttpComponent::new();
3444            let endpoint_ctx = NoOpComponentContext;
3445            let endpoint = component
3446                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3447                .unwrap();
3448            let producer = endpoint.create_producer(&ctx).unwrap();
3449
3450            // Create exchange with an OTel context by extracting from a traceparent header
3451            let mut exchange = Exchange::new(Message::default());
3452            let mut headers = std::collections::HashMap::new();
3453            headers.insert(
3454                "traceparent".to_string(),
3455                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3456            );
3457            camel_otel::extract_into_exchange(&mut exchange, &headers);
3458
3459            let result = producer.oneshot(exchange).await.unwrap();
3460
3461            // Verify request succeeded
3462            let status = result
3463                .input
3464                .header("CamelHttpResponseCode")
3465                .and_then(|v| v.as_u64())
3466                .unwrap();
3467            assert_eq!(status, 200);
3468
3469            // The test server echoes back the received traceparent header
3470            let traceparent = result.input.header("X-Received-Traceparent");
3471            assert!(
3472                traceparent.is_some(),
3473                "traceparent header should have been sent"
3474            );
3475
3476            let traceparent_str = traceparent.unwrap().as_str().unwrap();
3477            // Verify format: version-traceid-spanid-flags
3478            let parts: Vec<&str> = traceparent_str.split('-').collect();
3479            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3480            assert_eq!(parts[0], "00", "version should be 00");
3481            assert_eq!(
3482                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3483                "trace-id should match"
3484            );
3485            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3486            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3487        }
3488
3489        #[tokio::test]
3490        async fn test_consumer_extracts_traceparent_header() {
3491            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3492
3493            // Get an OS-assigned free port
3494            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3495            let port = listener.local_addr().unwrap().port();
3496            drop(listener);
3497
3498            let component = HttpComponent::new();
3499            let endpoint_ctx = NoOpComponentContext;
3500            let endpoint = component
3501                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3502                .unwrap();
3503            let mut consumer = endpoint.create_consumer().unwrap();
3504
3505            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3506            let token = tokio_util::sync::CancellationToken::new();
3507            let ctx = ConsumerContext::new(tx, token.clone());
3508
3509            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3510            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3511
3512            // Send request with traceparent header
3513            let client = reqwest::Client::new();
3514            let send_fut = client
3515                .post(format!("http://127.0.0.1:{port}/trace"))
3516                .header(
3517                    "traceparent",
3518                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3519                )
3520                .body("test")
3521                .send();
3522
3523            let (http_result, _) = tokio::join!(send_fut, async {
3524                if let Some(envelope) = rx.recv().await {
3525                    // Verify the exchange has a valid OTel context by re-injecting it
3526                    // and checking the traceparent matches
3527                    let mut injected_headers = std::collections::HashMap::new();
3528                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3529
3530                    assert!(
3531                        injected_headers.contains_key("traceparent"),
3532                        "Exchange should have traceparent after extraction"
3533                    );
3534
3535                    let traceparent = injected_headers.get("traceparent").unwrap();
3536                    let parts: Vec<&str> = traceparent.split('-').collect();
3537                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3538                    assert_eq!(
3539                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3540                        "Trace ID should match the original traceparent header"
3541                    );
3542
3543                    if let Some(reply_tx) = envelope.reply_tx {
3544                        let _ = reply_tx.send(Ok(envelope.exchange));
3545                    }
3546                }
3547            });
3548
3549            let resp = http_result.unwrap();
3550            assert_eq!(resp.status().as_u16(), 200);
3551
3552            token.cancel();
3553        }
3554
3555        #[tokio::test]
3556        async fn test_consumer_extracts_mixed_case_traceparent_header() {
3557            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3558
3559            // Get an OS-assigned free port
3560            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3561            let port = listener.local_addr().unwrap().port();
3562            drop(listener);
3563
3564            let component = HttpComponent::new();
3565            let endpoint_ctx = NoOpComponentContext;
3566            let endpoint = component
3567                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3568                .unwrap();
3569            let mut consumer = endpoint.create_consumer().unwrap();
3570
3571            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3572            let token = tokio_util::sync::CancellationToken::new();
3573            let ctx = ConsumerContext::new(tx, token.clone());
3574
3575            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3576            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3577
3578            // Send request with MIXED-CASE TraceParent header (not lowercase)
3579            let client = reqwest::Client::new();
3580            let send_fut = client
3581                .post(format!("http://127.0.0.1:{port}/trace"))
3582                .header(
3583                    "TraceParent",
3584                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3585                )
3586                .body("test")
3587                .send();
3588
3589            let (http_result, _) = tokio::join!(send_fut, async {
3590                if let Some(envelope) = rx.recv().await {
3591                    // Verify the exchange has a valid OTel context by re-injecting it
3592                    // and checking the traceparent matches
3593                    let mut injected_headers = HashMap::new();
3594                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3595
3596                    assert!(
3597                        injected_headers.contains_key("traceparent"),
3598                        "Exchange should have traceparent after extraction from mixed-case header"
3599                    );
3600
3601                    let traceparent = injected_headers.get("traceparent").unwrap();
3602                    let parts: Vec<&str> = traceparent.split('-').collect();
3603                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3604                    assert_eq!(
3605                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3606                        "Trace ID should match the original mixed-case TraceParent header"
3607                    );
3608
3609                    if let Some(reply_tx) = envelope.reply_tx {
3610                        let _ = reply_tx.send(Ok(envelope.exchange));
3611                    }
3612                }
3613            });
3614
3615            let resp = http_result.unwrap();
3616            assert_eq!(resp.status().as_u16(), 200);
3617
3618            token.cancel();
3619        }
3620
3621        #[tokio::test]
3622        async fn test_producer_no_trace_context_no_crash() {
3623            let (url, _handle) = start_test_server().await;
3624            let ctx = test_producer_ctx();
3625
3626            let component = HttpComponent::new();
3627            let endpoint_ctx = NoOpComponentContext;
3628            let endpoint = component
3629                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3630                .unwrap();
3631            let producer = endpoint.create_producer(&ctx).unwrap();
3632
3633            // Create exchange with default (empty) otel_context - no trace context
3634            let exchange = Exchange::new(Message::default());
3635
3636            // Should succeed without panic
3637            let result = producer.oneshot(exchange).await.unwrap();
3638
3639            // Verify request succeeded
3640            let status = result
3641                .input
3642                .header("CamelHttpResponseCode")
3643                .and_then(|v| v.as_u64())
3644                .unwrap();
3645            assert_eq!(status, 200);
3646        }
3647
3648        /// Test server that captures and echoes back the traceparent header
3649        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3650            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3651            let addr = listener.local_addr().unwrap();
3652            let url = format!("http://127.0.0.1:{}", addr.port());
3653
3654            let handle = tokio::spawn(async move {
3655                loop {
3656                    if let Ok((mut stream, _)) = listener.accept().await {
3657                        tokio::spawn(async move {
3658                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3659                            let mut buf = vec![0u8; 8192];
3660                            let n = stream.read(&mut buf).await.unwrap_or(0);
3661                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3662
3663                            // Extract traceparent header from request
3664                            let traceparent = request
3665                                .lines()
3666                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3667                                .map(|line| {
3668                                    line.split(':')
3669                                        .nth(1)
3670                                        .map(|s| s.trim().to_string())
3671                                        .unwrap_or_default()
3672                                })
3673                                .unwrap_or_default();
3674
3675                            let body =
3676                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3677                            let response = format!(
3678                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3679                                body.len(),
3680                                traceparent,
3681                                body
3682                            );
3683                            let _ = stream.write_all(response.as_bytes()).await;
3684                        });
3685                    }
3686                }
3687            });
3688
3689            (url, handle)
3690        }
3691    }
3692
3693    // -----------------------------------------------------------------------
3694    // Response streaming tests (Eje A - Task 2)
3695    // -----------------------------------------------------------------------
3696
3697    // -----------------------------------------------------------------------
3698    // Request streaming tests (Eje B - Task 3)
3699    // -----------------------------------------------------------------------
3700
3701    #[tokio::test]
3702    async fn test_request_body_arrives_as_stream() {
3703        use camel_component_api::Body;
3704        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3705
3706        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3707        let port = listener.local_addr().unwrap().port();
3708        drop(listener);
3709
3710        let component = HttpComponent::new();
3711        let endpoint_ctx = NoOpComponentContext;
3712        let endpoint = component
3713            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3714            .unwrap();
3715        let mut consumer = endpoint.create_consumer().unwrap();
3716
3717        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3718        let token = tokio_util::sync::CancellationToken::new();
3719        let ctx = ConsumerContext::new(tx, token.clone());
3720
3721        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3722        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3723
3724        let client = reqwest::Client::new();
3725        let send_fut = client
3726            .post(format!("http://127.0.0.1:{port}/upload"))
3727            .body("hello streaming world")
3728            .send();
3729
3730        let (http_result, _) = tokio::join!(send_fut, async {
3731            if let Some(mut envelope) = rx.recv().await {
3732                // Body must be Body::Stream, not Body::Text or Body::Bytes
3733                assert!(
3734                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3735                    "expected Body::Stream, got discriminant {:?}",
3736                    std::mem::discriminant(&envelope.exchange.input.body)
3737                );
3738                // Materialize to verify content
3739                let bytes = envelope
3740                    .exchange
3741                    .input
3742                    .body
3743                    .into_bytes(1024 * 1024)
3744                    .await
3745                    .unwrap();
3746                assert_eq!(&bytes[..], b"hello streaming world");
3747
3748                envelope.exchange.input.body = camel_component_api::Body::Empty;
3749                if let Some(reply_tx) = envelope.reply_tx {
3750                    let _ = reply_tx.send(Ok(envelope.exchange));
3751                }
3752            }
3753        });
3754
3755        let resp = http_result.unwrap();
3756        assert_eq!(resp.status().as_u16(), 200);
3757
3758        token.cancel();
3759    }
3760
3761    // -----------------------------------------------------------------------
3762    // Response streaming tests (Eje A - Task 2)
3763    // -----------------------------------------------------------------------
3764
3765    #[tokio::test]
3766    async fn test_streaming_response_chunked() {
3767        use bytes::Bytes;
3768        use camel_component_api::Body;
3769        use camel_component_api::CamelError;
3770        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3771        use camel_component_api::{StreamBody, StreamMetadata};
3772        use futures::stream;
3773        use std::sync::Arc;
3774        use tokio::sync::Mutex;
3775
3776        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3777        let port = listener.local_addr().unwrap().port();
3778        drop(listener);
3779
3780        let component = HttpComponent::new();
3781        let endpoint_ctx = NoOpComponentContext;
3782        let endpoint = component
3783            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3784            .unwrap();
3785        let mut consumer = endpoint.create_consumer().unwrap();
3786
3787        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3788        let token = tokio_util::sync::CancellationToken::new();
3789        let ctx = ConsumerContext::new(tx, token.clone());
3790
3791        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3792        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3793
3794        let client = reqwest::Client::new();
3795        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3796
3797        let (http_result, _) = tokio::join!(send_fut, async {
3798            if let Some(mut envelope) = rx.recv().await {
3799                // Respond with Body::Stream
3800                let chunks: Vec<Result<Bytes, CamelError>> =
3801                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3802                let stream = Box::pin(stream::iter(chunks));
3803                envelope.exchange.input.body = Body::Stream(StreamBody {
3804                    stream: Arc::new(Mutex::new(Some(stream))),
3805                    metadata: StreamMetadata::default(),
3806                });
3807                if let Some(reply_tx) = envelope.reply_tx {
3808                    let _ = reply_tx.send(Ok(envelope.exchange));
3809                }
3810            }
3811        });
3812
3813        let resp = http_result.unwrap();
3814        assert_eq!(resp.status().as_u16(), 200);
3815        let body = resp.text().await.unwrap();
3816        assert_eq!(body, "chunk1chunk2");
3817
3818        token.cancel();
3819    }
3820
3821    // -----------------------------------------------------------------------
3822    // 413 Content-Length limit test (Task 4)
3823    // -----------------------------------------------------------------------
3824
3825    #[tokio::test]
3826    async fn test_413_when_content_length_exceeds_limit() {
3827        use camel_component_api::ConsumerContext;
3828
3829        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3830        let port = listener.local_addr().unwrap().port();
3831        drop(listener);
3832
3833        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
3834        let component = HttpComponent::new();
3835        let endpoint_ctx = NoOpComponentContext;
3836        let endpoint = component
3837            .create_endpoint(
3838                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3839                &endpoint_ctx,
3840            )
3841            .unwrap();
3842        let mut consumer = endpoint.create_consumer().unwrap();
3843
3844        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3845        let token = tokio_util::sync::CancellationToken::new();
3846        let ctx = ConsumerContext::new(tx, token.clone());
3847
3848        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3849        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3850
3851        let client = reqwest::Client::new();
3852        let resp = client
3853            .post(format!("http://127.0.0.1:{port}/upload"))
3854            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
3855            .body("x".repeat(1000))
3856            .send()
3857            .await
3858            .unwrap();
3859
3860        assert_eq!(resp.status().as_u16(), 413);
3861
3862        token.cancel();
3863    }
3864
3865    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
3866    /// The spec says: "If there is no Content-Length, the limit does not apply at the
3867    /// consumer level — the route is responsible."
3868    #[tokio::test]
3869    async fn test_chunked_upload_without_content_length_bypasses_limit() {
3870        use bytes::Bytes;
3871        use camel_component_api::Body;
3872        use camel_component_api::ConsumerContext;
3873        use futures::stream;
3874
3875        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3876        let port = listener.local_addr().unwrap().port();
3877        drop(listener);
3878
3879        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
3880        let component = HttpComponent::new();
3881        let endpoint_ctx = NoOpComponentContext;
3882        let endpoint = component
3883            .create_endpoint(
3884                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3885                &endpoint_ctx,
3886            )
3887            .unwrap();
3888        let mut consumer = endpoint.create_consumer().unwrap();
3889
3890        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3891        let token = tokio_util::sync::CancellationToken::new();
3892        let ctx = ConsumerContext::new(tx, token.clone());
3893
3894        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3895        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3896
3897        let client = reqwest::Client::new();
3898
3899        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
3900        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
3901        // but since there's no Content-Length the 413 check must NOT fire.
3902        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3903            Ok(Bytes::from("y".repeat(50))),
3904            Ok(Bytes::from("y".repeat(50))),
3905        ];
3906        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3907        let send_fut = client
3908            .post(format!("http://127.0.0.1:{port}/upload"))
3909            .body(stream_body)
3910            .send();
3911
3912        let consumer_fut = async {
3913            // Use timeout to avoid deadlock if the handler rejects before enqueueing
3914            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3915                Ok(Some(mut envelope)) => {
3916                    assert!(
3917                        matches!(envelope.exchange.input.body, Body::Stream(_)),
3918                        "expected Body::Stream"
3919                    );
3920                    envelope.exchange.input.body = camel_component_api::Body::Empty;
3921                    if let Some(reply_tx) = envelope.reply_tx {
3922                        let _ = reply_tx.send(Ok(envelope.exchange));
3923                    }
3924                }
3925                Ok(None) => panic!("consumer channel closed unexpectedly"),
3926                Err(_) => {
3927                    // Timeout: the request was rejected before reaching the consumer.
3928                    // The HTTP response will carry the real status code (we check below).
3929                }
3930            }
3931        };
3932
3933        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3934
3935        let resp = http_result.unwrap();
3936        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
3937        assert_ne!(
3938            resp.status().as_u16(),
3939            413,
3940            "chunked upload must not be rejected by maxRequestBody"
3941        );
3942        assert_eq!(resp.status().as_u16(), 200);
3943
3944        token.cancel();
3945    }
3946
3947    #[test]
3948    fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
3949        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
3950        cfg.blocked_hosts = vec!["blocked.local".to_string()];
3951        cfg.allow_private_ips = false;
3952
3953        let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
3954        assert!(blocked.is_err());
3955
3956        let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
3957        assert!(private_ip.is_err());
3958
3959        cfg.allow_private_ips = true;
3960        let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
3961        assert!(allowed.is_ok());
3962    }
3963
3964    #[test]
3965    fn test_is_private_ip_ranges() {
3966        assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); // allow-unwrap
3967        assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); // allow-unwrap
3968        assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); // allow-unwrap
3969        assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); // allow-unwrap
3970        assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); // allow-unwrap
3971        assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); // allow-unwrap
3972
3973        assert!(is_private_ip(&"::1".parse().unwrap())); // allow-unwrap
3974        assert!(is_private_ip(&"fc00::1".parse().unwrap())); // allow-unwrap
3975        assert!(is_private_ip(&"fd12::1".parse().unwrap())); // allow-unwrap
3976        assert!(is_private_ip(&"fe80::1".parse().unwrap())); // allow-unwrap
3977        // ::ffff:0:0/96 (IPv4-mapped): only private if the mapped IPv4 is private
3978        assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); // allow-unwrap
3979        assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); // allow-unwrap
3980        assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); // allow-unwrap
3981
3982        assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); // allow-unwrap
3983        assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); // allow-unwrap — public IPv4-mapped
3984        assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); // allow-unwrap
3985    }
3986
3987    #[tokio::test]
3988    async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
3989        let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); // allow-unwrap
3990        cfg.allow_private_ips = false;
3991
3992        let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
3993            .await
3994            .expect_err("localhost must resolve to loopback and be blocked");
3995
3996        let msg = err.to_string();
3997        assert!(msg.contains("Target resolved to private IP"));
3998    }
3999
4000    #[test]
4001    fn test_title_case_header() {
4002        assert_eq!(title_case_header("content-type"), "Content-Type");
4003        assert_eq!(title_case_header("authorization"), "Authorization");
4004        assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4005        assert_eq!(title_case_header("host"), "Host");
4006        assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4007        assert_eq!(title_case_header("single"), "Single");
4008        assert_eq!(title_case_header(""), "");
4009    }
4010
4011    #[test]
4012    fn test_resolve_url_combines_path_and_query_sources() {
4013        let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4014        let mut exchange = Exchange::new(Message::default());
4015        exchange.input.set_header(
4016            "CamelHttpPath",
4017            serde_json::Value::String("next".to_string()),
4018        );
4019        let url = HttpProducer::resolve_url(&exchange, &cfg);
4020        assert!(url.starts_with("http://example.com/base/next?"));
4021        assert!(url.contains("foo=bar"));
4022
4023        exchange.input.set_header(
4024            "CamelHttpUri",
4025            serde_json::Value::String("http://other.test/root".to_string()),
4026        );
4027        exchange.input.set_header(
4028            "CamelHttpQuery",
4029            serde_json::Value::String("a=1&b=2".to_string()),
4030        );
4031
4032        let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4033        assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4034    }
4035
4036    #[test]
4037    fn test_http_producer_helpers_status_and_size_boundaries() {
4038        assert!(HttpProducer::is_ok_status(200, (200, 299)));
4039        assert!(HttpProducer::is_ok_status(299, (200, 299)));
4040        assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4041        assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4042
4043        assert!(!exceeds_max_response_body(10, 10));
4044        assert!(exceeds_max_response_body(11, 10));
4045    }
4046
4047    // -----------------------------------------------------------------------
4048    // Content-Type inference tests
4049    // -----------------------------------------------------------------------
4050
4051    async fn setup_consumer_on_free_port(
4052        path: &str,
4053    ) -> (
4054        u16,
4055        tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4056        tokio_util::sync::CancellationToken,
4057    ) {
4058        use camel_component_api::ConsumerContext;
4059
4060        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4061        let port = listener.local_addr().unwrap().port();
4062        drop(listener);
4063
4064        let consumer_cfg = HttpServerConfig {
4065            host: "127.0.0.1".to_string(),
4066            port,
4067            path: path.to_string(),
4068            max_request_body: 2 * 1024 * 1024,
4069            max_response_body: 10 * 1024 * 1024,
4070            max_inflight_requests: 1024,
4071        };
4072        let mut consumer = HttpConsumer::new(consumer_cfg);
4073
4074        let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4075        let token = tokio_util::sync::CancellationToken::new();
4076        let ctx = ConsumerContext::new(tx, token.clone());
4077
4078        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4079        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4080
4081        (port, rx, token)
4082    }
4083
4084    #[tokio::test]
4085    async fn test_content_type_inferred_for_json_body() {
4086        let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4087
4088        let client = reqwest::Client::new();
4089        let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4090
4091        let (http_result, _) = tokio::join!(send_fut, async {
4092            if let Some(mut envelope) = rx.recv().await {
4093                envelope.exchange.input.body =
4094                    camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4095                if let Some(reply_tx) = envelope.reply_tx {
4096                    let _ = reply_tx.send(Ok(envelope.exchange));
4097                }
4098            }
4099        });
4100
4101        let resp = http_result.unwrap();
4102        assert_eq!(resp.status().as_u16(), 200);
4103        let ct = resp
4104            .headers()
4105            .get("content-type")
4106            .expect("Content-Type header should be present");
4107        assert_eq!(ct, "application/json");
4108        let body = resp.text().await.unwrap();
4109        assert_eq!(body, r#"{"message":"hello"}"#);
4110
4111        token.cancel();
4112    }
4113
4114    #[tokio::test]
4115    async fn test_content_type_inferred_for_text_body() {
4116        let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4117
4118        let client = reqwest::Client::new();
4119        let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4120
4121        let (http_result, _) = tokio::join!(send_fut, async {
4122            if let Some(mut envelope) = rx.recv().await {
4123                envelope.exchange.input.body =
4124                    camel_component_api::Body::Text("plain text response".to_string());
4125                if let Some(reply_tx) = envelope.reply_tx {
4126                    let _ = reply_tx.send(Ok(envelope.exchange));
4127                }
4128            }
4129        });
4130
4131        let resp = http_result.unwrap();
4132        assert_eq!(resp.status().as_u16(), 200);
4133        let ct = resp
4134            .headers()
4135            .get("content-type")
4136            .expect("Content-Type header should be present");
4137        assert_eq!(ct, "text/plain; charset=utf-8");
4138        let body = resp.text().await.unwrap();
4139        assert_eq!(body, "plain text response");
4140
4141        token.cancel();
4142    }
4143
4144    #[tokio::test]
4145    async fn test_content_type_inferred_for_xml_body() {
4146        let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4147
4148        let client = reqwest::Client::new();
4149        let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4150
4151        let (http_result, _) = tokio::join!(send_fut, async {
4152            if let Some(mut envelope) = rx.recv().await {
4153                envelope.exchange.input.body =
4154                    camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4155                if let Some(reply_tx) = envelope.reply_tx {
4156                    let _ = reply_tx.send(Ok(envelope.exchange));
4157                }
4158            }
4159        });
4160
4161        let resp = http_result.unwrap();
4162        assert_eq!(resp.status().as_u16(), 200);
4163        let ct = resp
4164            .headers()
4165            .get("content-type")
4166            .expect("Content-Type header should be present");
4167        assert_eq!(ct, "application/xml");
4168        let body = resp.text().await.unwrap();
4169        assert_eq!(body, "<root><item>value</item></root>");
4170
4171        token.cancel();
4172    }
4173
4174    #[tokio::test]
4175    async fn test_no_content_type_for_empty_body() {
4176        let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4177
4178        let client = reqwest::Client::new();
4179        let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4180
4181        let (http_result, _) = tokio::join!(send_fut, async {
4182            if let Some(mut envelope) = rx.recv().await {
4183                envelope.exchange.input.body = camel_component_api::Body::Empty;
4184                if let Some(reply_tx) = envelope.reply_tx {
4185                    let _ = reply_tx.send(Ok(envelope.exchange));
4186                }
4187            }
4188        });
4189
4190        let resp = http_result.unwrap();
4191        assert_eq!(resp.status().as_u16(), 200);
4192        assert!(
4193            resp.headers().get("content-type").is_none(),
4194            "Empty body should not set Content-Type"
4195        );
4196
4197        token.cancel();
4198    }
4199
4200    #[tokio::test]
4201    async fn test_no_content_type_for_raw_bytes_body() {
4202        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4203
4204        let client = reqwest::Client::new();
4205        let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4206
4207        let (http_result, _) = tokio::join!(send_fut, async {
4208            if let Some(mut envelope) = rx.recv().await {
4209                envelope.exchange.input.body =
4210                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4211                if let Some(reply_tx) = envelope.reply_tx {
4212                    let _ = reply_tx.send(Ok(envelope.exchange));
4213                }
4214            }
4215        });
4216
4217        let resp = http_result.unwrap();
4218        assert_eq!(resp.status().as_u16(), 200);
4219        assert!(
4220            resp.headers().get("content-type").is_none(),
4221            "Raw Bytes body should not set Content-Type"
4222        );
4223
4224        token.cancel();
4225    }
4226
4227    #[tokio::test]
4228    async fn test_content_type_from_stream_metadata() {
4229        use camel_component_api::{StreamBody, StreamMetadata};
4230        use futures::stream;
4231
4232        let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4233
4234        let client = reqwest::Client::new();
4235        let send_fut = client
4236            .get(format!("http://127.0.0.1:{port}/stream-ct"))
4237            .send();
4238
4239        let (http_result, _) = tokio::join!(send_fut, async {
4240            if let Some(mut envelope) = rx.recv().await {
4241                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4242                    vec![Ok(bytes::Bytes::from("audio data"))];
4243                let stream = Box::pin(stream::iter(chunks));
4244                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4245                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4246                    metadata: StreamMetadata {
4247                        size_hint: None,
4248                        content_type: Some("audio/mpeg".to_string()),
4249                        origin: None,
4250                    },
4251                });
4252                if let Some(reply_tx) = envelope.reply_tx {
4253                    let _ = reply_tx.send(Ok(envelope.exchange));
4254                }
4255            }
4256        });
4257
4258        let resp = http_result.unwrap();
4259        assert_eq!(resp.status().as_u16(), 200);
4260        let ct = resp
4261            .headers()
4262            .get("content-type")
4263            .expect("Content-Type header should be present");
4264        assert_eq!(ct, "audio/mpeg");
4265        let body = resp.text().await.unwrap();
4266        assert_eq!(body, "audio data");
4267
4268        token.cancel();
4269    }
4270
4271    #[tokio::test]
4272    async fn test_user_content_type_overrides_inferred() {
4273        let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4274
4275        let client = reqwest::Client::new();
4276        let send_fut = client
4277            .get(format!("http://127.0.0.1:{port}/override-ct"))
4278            .send();
4279
4280        let (http_result, _) = tokio::join!(send_fut, async {
4281            if let Some(mut envelope) = rx.recv().await {
4282                envelope.exchange.input.body =
4283                    camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4284                envelope.exchange.input.set_header(
4285                    "Content-Type",
4286                    serde_json::Value::String("text/html".to_string()),
4287                );
4288                if let Some(reply_tx) = envelope.reply_tx {
4289                    let _ = reply_tx.send(Ok(envelope.exchange));
4290                }
4291            }
4292        });
4293
4294        let resp = http_result.unwrap();
4295        assert_eq!(resp.status().as_u16(), 200);
4296        let ct = resp
4297            .headers()
4298            .get("content-type")
4299            .expect("Content-Type header should be present");
4300        assert_eq!(
4301            ct, "text/html",
4302            "User-set Content-Type should take precedence over inferred type"
4303        );
4304
4305        token.cancel();
4306    }
4307
4308    #[tokio::test]
4309    async fn test_user_content_type_with_bytes_body() {
4310        let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4311
4312        let client = reqwest::Client::new();
4313        let send_fut = client
4314            .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4315            .send();
4316
4317        let (http_result, _) = tokio::join!(send_fut, async {
4318            if let Some(mut envelope) = rx.recv().await {
4319                envelope.exchange.input.body =
4320                    camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4321                envelope.exchange.input.set_header(
4322                    "Content-Type",
4323                    serde_json::Value::String("application/json".to_string()),
4324                );
4325                if let Some(reply_tx) = envelope.reply_tx {
4326                    let _ = reply_tx.send(Ok(envelope.exchange));
4327                }
4328            }
4329        });
4330
4331        let resp = http_result.unwrap();
4332        assert_eq!(resp.status().as_u16(), 200);
4333        let ct = resp
4334            .headers()
4335            .get("content-type")
4336            .expect("Content-Type header should be present for Bytes body with user header");
4337        assert_eq!(
4338            ct, "application/json",
4339            "User Content-Type should be sent for Bytes body"
4340        );
4341
4342        token.cancel();
4343    }
4344}