Skip to main content

camel_component_http/
lib.rs

1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
24// ---------------------------------------------------------------------------
25// HttpEndpointConfig
26// ---------------------------------------------------------------------------
27
/// Configuration for an HTTP client (producer) endpoint.
///
/// # Memory Limits
///
/// HTTP operations enforce conservative memory limits to prevent denial-of-service
/// attacks from untrusted network sources. These limits are significantly lower than
/// file component limits (100MB) because HTTP typically handles API responses rather
/// than large file transfers, and clients may be untrusted.
///
/// ## Default Limits
///
/// - **HTTP client body**: 10MB (typical API responses)
/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
///
/// ## Rationale
///
/// The 10MB limit for HTTP client responses is appropriate for most API interactions
/// while providing protection against:
/// - Malicious servers sending oversized responses
/// - Runaway processes generating unexpectedly large payloads
/// - Memory exhaustion attacks
///
/// The 2MB server request limit is even more conservative because it handles input
/// from potentially untrusted clients on the public internet.
///
/// ## Overriding Limits
///
/// Override the default client body limit using the `maxBodySize` URI parameter:
///
/// ```text
/// http://api.example.com/large-data?maxBodySize=52428800
/// ```
///
/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
///
/// ```text
/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
/// ```
///
/// ## Behavior When Exceeded
///
/// When a body exceeds the configured limit:
/// - An error is returned immediately
/// - No memory is exhausted - the limit is checked before allocation
/// - The HTTP connection is terminated cleanly
///
/// ## Security Considerations
///
/// HTTP endpoints should be treated with more caution than file endpoints because:
/// - Clients may be unknown and untrusted
/// - Network traffic can be spoofed or malicious
/// - DoS attacks often exploit unbounded resource consumption
///
/// Only increase limits when you control both ends of the connection or when
/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL without query parameters, built as `{scheme}:{path}` from the
    /// endpoint URI (e.g. `http://localhost:8080/api`).
    pub base_url: String,
    /// Value of the `httpMethod` URI parameter, if one was supplied.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` URI parameter; `true` unless the parameter is
    /// exactly `"false"`.
    pub throw_exception_on_failure: bool,
    /// `(start, end)` parsed from the `okStatusCodeRange` parameter in
    /// `start-end` form; `(200, 299)` when absent or unparsable.
    pub ok_status_code_range: (u16, u16),
    /// `responseTimeout` URI parameter interpreted as milliseconds; `None` when
    /// absent or unparsable.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
    pub query_params: HashMap<String, String>,
    /// SSRF protection switch from `allowPrivateIps`; `true` only when the
    /// parameter is exactly `"true"`, otherwise `false`.
    pub allow_private_ips: bool,
    /// Host names from the comma-separated `blockedHosts` parameter, each
    /// trimmed of surrounding whitespace.
    pub blocked_hosts: Vec<String>,
    /// `maxBodySize` URI parameter in bytes; 10MB when absent or unparsable.
    pub max_body_size: usize,
}
96
/// Camel options that should NOT be forwarded as HTTP query params.
///
/// `HttpEndpointConfig::from_components` drops every URI parameter whose name
/// appears here when it collects `query_params`; the comparison is an exact,
/// case-sensitive string match.
// NOTE(review): `followRedirects` and `connectTimeout` are filtered here but are
// not parsed by `HttpEndpointConfig::from_components` — presumably they are
// consumed elsewhere; confirm.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
109
110impl UriConfig for HttpEndpointConfig {
111    /// Returns "http" as the primary scheme (also accepts "https")
112    fn scheme() -> &'static str {
113        "http"
114    }
115
116    fn from_uri(uri: &str) -> Result<Self, CamelError> {
117        let parts = parse_uri(uri)?;
118        Self::from_components(parts)
119    }
120
121    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122        // Validate scheme - accept both http and https
123        if parts.scheme != "http" && parts.scheme != "https" {
124            return Err(CamelError::InvalidUri(format!(
125                "expected scheme 'http' or 'https', got '{}'",
126                parts.scheme
127            )));
128        }
129
130        // Construct base_url from scheme + path
131        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
132        let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134        let http_method = parts.params.get("httpMethod").cloned();
135
136        let throw_exception_on_failure = parts
137            .params
138            .get("throwExceptionOnFailure")
139            .map(|v| v != "false")
140            .unwrap_or(true);
141
142        // Parse status code range from "start-end" format (e.g., "200-299")
143        let ok_status_code_range = parts
144            .params
145            .get("okStatusCodeRange")
146            .and_then(|v| {
147                let (start, end) = v.split_once('-')?;
148                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149            })
150            .unwrap_or((200, 299));
151
152        let response_timeout = parts
153            .params
154            .get("responseTimeout")
155            .and_then(|v| v.parse::<u64>().ok())
156            .map(Duration::from_millis);
157
158        // SSRF protection settings
159        let allow_private_ips = parts
160            .params
161            .get("allowPrivateIps")
162            .map(|v| v == "true")
163            .unwrap_or(false); // Default: block private IPs
164
165        // Parse comma-separated blocked hosts
166        let blocked_hosts = parts
167            .params
168            .get("blockedHosts")
169            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170            .unwrap_or_default();
171
172        let max_body_size = parts
173            .params
174            .get("maxBodySize")
175            .and_then(|v| v.parse::<usize>().ok())
176            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
177
178        // Collect remaining params (not Camel options) as query params
179        let query_params: HashMap<String, String> = parts
180            .params
181            .into_iter()
182            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183            .collect();
184
185        Ok(Self {
186            base_url,
187            http_method,
188            throw_exception_on_failure,
189            ok_status_code_range,
190            response_timeout,
191            query_params,
192            allow_private_ips,
193            blocked_hosts,
194            max_body_size,
195        })
196    }
197}
198
199impl HttpEndpointConfig {
200    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201        let parts = parse_uri(uri)?;
202        let mut endpoint = Self::from_components(parts.clone())?;
203        if endpoint.response_timeout.is_none() {
204            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205        }
206        if !parts.params.contains_key("allowPrivateIps") {
207            endpoint.allow_private_ips = config.allow_private_ips;
208        }
209        if !parts.params.contains_key("blockedHosts") {
210            endpoint.blocked_hosts = config.blocked_hosts.clone();
211        }
212        if !parts.params.contains_key("maxBodySize") {
213            endpoint.max_body_size = config.max_body_size;
214        }
215        Ok(endpoint)
216    }
217}
218
219// ---------------------------------------------------------------------------
220// HttpServerConfig
221// ---------------------------------------------------------------------------
222
/// Configuration for an HTTP server (consumer) endpoint.
///
/// Built from a URI such as `http://0.0.0.0:8080/orders?maxRequestBody=…`;
/// see the `UriConfig` implementation for the parsing rules and defaults.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port to listen on. When the URI has no port, 443 is used for
    /// `https` and 80 otherwise.
    pub port: u16,
    /// URL path this consumer handles, e.g. "/orders". "/" when the URI has no path.
    pub path: String,
    /// Maximum request body size in bytes (`maxRequestBody`, default 2MB).
    pub max_request_body: usize,
    /// Maximum response body size for materializing streams in bytes
    /// (`maxResponseBody`, default 10MB).
    pub max_response_body: usize,
    /// Maximum number of in-flight requests handled concurrently by this server
    /// (`maxInflightRequests`, default 1024).
    pub max_inflight_requests: usize,
}
239
240impl UriConfig for HttpServerConfig {
241    /// Returns "http" as the primary scheme (also accepts "https")
242    fn scheme() -> &'static str {
243        "http"
244    }
245
246    fn from_uri(uri: &str) -> Result<Self, CamelError> {
247        let parts = parse_uri(uri)?;
248        Self::from_components(parts)
249    }
250
251    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
252        // Validate scheme - accept both http and https
253        if parts.scheme != "http" && parts.scheme != "https" {
254            return Err(CamelError::InvalidUri(format!(
255                "expected scheme 'http' or 'https', got '{}'",
256                parts.scheme
257            )));
258        }
259
260        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
261        // Strip leading "//"
262        let authority_and_path = parts.path.trim_start_matches('/');
263
264        // Split on the first "/" to separate "host:port" from "/path"
265        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
266            (&authority_and_path[..idx], &authority_and_path[idx..])
267        } else {
268            (authority_and_path, "/")
269        };
270
271        let path = if path_suffix.is_empty() {
272            "/"
273        } else {
274            path_suffix
275        }
276        .to_string();
277
278        // Parse host:port from authority
279        let (host, port) = if let Some(colon) = authority.rfind(':') {
280            let port_str = &authority[colon + 1..];
281            match port_str.parse::<u16>() {
282                Ok(p) => (authority[..colon].to_string(), p),
283                Err(_) => {
284                    return Err(CamelError::InvalidUri(format!(
285                        "invalid port '{}' in authority",
286                        port_str
287                    )));
288                }
289            }
290        } else {
291            // Default port based on scheme: 443 for https, 80 for http
292            let default_port = if parts.scheme == "https" { 443 } else { 80 };
293            (authority.to_string(), default_port)
294        };
295
296        let max_request_body = parts
297            .params
298            .get("maxRequestBody")
299            .and_then(|v| v.parse::<usize>().ok())
300            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
301
302        let max_response_body = parts
303            .params
304            .get("maxResponseBody")
305            .and_then(|v| v.parse::<usize>().ok())
306            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
307
308        let max_inflight_requests = parts
309            .params
310            .get("maxInflightRequests")
311            .and_then(|v| v.parse::<usize>().ok())
312            .unwrap_or(1024);
313
314        Ok(Self {
315            host,
316            port,
317            path,
318            max_request_body,
319            max_response_body,
320            max_inflight_requests,
321        })
322    }
323}
324
325impl HttpServerConfig {
326    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
327        let parts = parse_uri(uri)?;
328        let mut server = Self::from_components(parts.clone())?;
329        if !parts.params.contains_key("maxRequestBody") {
330            server.max_request_body = config.max_request_body;
331        }
332        if !parts.params.contains_key("maxResponseBody") {
333            server.max_response_body = config.max_body_size;
334        }
335        Ok(server)
336    }
337}
338
339// ---------------------------------------------------------------------------
340// RequestEnvelope / HttpReply
341// ---------------------------------------------------------------------------
342
/// HTTP response body: either already-materialized bytes or a lazy stream.
pub(crate) enum HttpReplyBody {
    /// Fully buffered body; the dispatch handler checks its length against the
    /// server's `max_response_body` before sending.
    Bytes(bytes::Bytes),
    /// Body produced chunk by chunk; forwarded to the client as a streaming body.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
348
/// An inbound HTTP request sent from the Axum dispatch handler to an
/// `HttpConsumer` receive loop.
pub(crate) struct RequestEnvelope {
    /// HTTP method of the request, as text.
    pub(crate) method: String,
    /// Path component of the request URI.
    pub(crate) path: String,
    /// Raw query string of the request URI; empty when the URI has none.
    pub(crate) query: String,
    /// Copy of the request headers.
    pub(crate) headers: http::HeaderMap,
    /// Request body as a not-yet-consumed stream.
    pub(crate) body: StreamBody,
    /// One-shot channel on which the consumer sends the response back to the handler.
    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
359
/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
pub(crate) struct HttpReply {
    /// Numeric HTTP status code; the handler responds with 500 if the value is
    /// not a valid status code.
    pub(crate) status: u16,
    /// Response headers as `(name, value)` pairs, added to the response in order.
    pub(crate) headers: Vec<(String, String)>,
    /// Response body, buffered or streaming.
    pub(crate) body: HttpReplyBody,
}
366
367// ---------------------------------------------------------------------------
368// DispatchTable / ServerRegistry
369// ---------------------------------------------------------------------------
370
/// Maps URL path → channel sender for the consumer that owns that path.
///
/// The dispatch handler looks up the request path as-is, so a consumer only
/// receives requests whose path matches its registered key exactly.
pub(crate) type DispatchTable =
    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;

/// Registry key identifying one listening server: `(host, port)`.
type ServerKey = (String, u16);
376
/// Handle to a running Axum server on one interface/port.
///
/// The limits recorded here are the ones the server was started with; they are
/// compared against the limits requested by later consumers sharing the server.
// NOTE(review): `dead_code` is presumably allowed because `inflight` is only
// held, never read, through this handle — confirm.
#[allow(dead_code)]
struct ServerHandle {
    /// Path → consumer routing table shared with the request handler.
    dispatch: DispatchTable,
    /// Request body limit (bytes) the server was started with.
    max_request_body: usize,
    /// Response body limit (bytes) the server was started with.
    max_response_body: usize,
    /// In-flight request limit the server was started with.
    max_inflight_requests: usize,
    /// Semaphore shared with the request handler to cap in-flight requests.
    inflight: Arc<tokio::sync::Semaphore>,
    /// Kept alive so the task isn't dropped; not used directly.
    _task: tokio::task::JoinHandle<()>,
}
388
/// Process-global registry mapping (host, port) → running Axum server handle.
///
/// Each entry is a `OnceCell`, so the server for a given key is started at most
/// once and then shared by every consumer that asks for the same key.
pub struct ServerRegistry {
    /// Guarded map of lazily-initialized server handles.
    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
}
393
394impl ServerRegistry {
395    /// Returns the global singleton.
396    pub fn global() -> &'static Self {
397        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
398        INSTANCE.get_or_init(|| ServerRegistry {
399            inner: Mutex::new(HashMap::new()),
400        })
401    }
402
403    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
404    /// none is running on that port yet.
405    pub(crate) async fn get_or_spawn(
406        &'static self,
407        host: &str,
408        port: u16,
409        max_request_body: usize,
410        max_response_body: usize,
411        max_inflight_requests: usize,
412    ) -> Result<DispatchTable, CamelError> {
413        let host_owned = host.to_string();
414
415        let cell = {
416            let mut guard = self.inner.lock().map_err(|_| {
417                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
418            })?;
419            let key = (host.to_string(), port);
420            guard
421                .entry(key)
422                .or_insert_with(|| Arc::new(OnceCell::new()))
423                .clone()
424        };
425
426        if let Some(existing) = cell.get()
427            && existing.max_request_body != max_request_body
428        {
429            return Err(CamelError::EndpointCreationFailed(format!(
430                "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
431                existing.max_request_body, max_request_body
432            )));
433        }
434
435        if let Some(existing) = cell.get()
436            && existing.max_response_body != max_response_body
437        {
438            return Err(CamelError::EndpointCreationFailed(format!(
439                "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
440                existing.max_response_body, max_response_body
441            )));
442        }
443
444        if let Some(existing) = cell.get()
445            && existing.max_inflight_requests != max_inflight_requests
446        {
447            return Err(CamelError::EndpointCreationFailed(format!(
448                "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
449                existing.max_inflight_requests, max_inflight_requests
450            )));
451        }
452
453        let handle = cell
454            .get_or_try_init(|| async {
455                let addr = format!("{host_owned}:{port}");
456                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
457                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
458                })?;
459                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
460                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
461                let task = tokio::spawn(run_axum_server(
462                    listener,
463                    Arc::clone(&dispatch),
464                    max_request_body,
465                    max_response_body,
466                    Arc::clone(&inflight),
467                ));
468                Ok::<ServerHandle, CamelError>(ServerHandle {
469                    dispatch,
470                    max_request_body,
471                    max_response_body,
472                    max_inflight_requests,
473                    inflight,
474                    _task: task,
475                })
476            })
477            .await?;
478
479        Ok(Arc::clone(&handle.dispatch))
480    }
481}
482
483// ---------------------------------------------------------------------------
484// Axum server
485// ---------------------------------------------------------------------------
486
487use axum::{
488    Router,
489    body::Body as AxumBody,
490    extract::{Request, State},
491    http::{Response, StatusCode},
492    response::IntoResponse,
493};
494
/// State shared with the Axum request handler of one server.
#[derive(Clone)]
struct AppState {
    /// Path → consumer routing table consulted for every request.
    dispatch: DispatchTable,
    /// Largest accepted request body in bytes.
    max_request_body: usize,
    /// Largest buffered response body in bytes the handler will send.
    max_response_body: usize,
    /// Semaphore capping the number of requests handled at the same time.
    inflight: Arc<tokio::sync::Semaphore>,
}
502
503async fn run_axum_server(
504    listener: tokio::net::TcpListener,
505    dispatch: DispatchTable,
506    max_request_body: usize,
507    max_response_body: usize,
508    inflight: Arc<tokio::sync::Semaphore>,
509) {
510    let state = AppState {
511        dispatch,
512        max_request_body,
513        max_response_body,
514        inflight,
515    };
516    let app = Router::new().fallback(dispatch_handler).with_state(state);
517
518    axum::serve(listener, app).await.unwrap_or_else(|e| {
519        tracing::error!(error = %e, "Axum server error");
520    });
521}
522
523async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
524    let method = req.method().to_string();
525    let path = req.uri().path().to_string();
526    let query = req.uri().query().unwrap_or("").to_string();
527    let headers = req.headers().clone();
528
529    // Check Content-Length against limit BEFORE opening the stream
530    let content_length: Option<u64> = headers
531        .get(http::header::CONTENT_LENGTH)
532        .and_then(|v| v.to_str().ok())
533        .and_then(|s| s.parse().ok());
534
535    if let Some(len) = content_length
536        && len > state.max_request_body as u64
537    {
538        return Response::builder()
539            .status(StatusCode::PAYLOAD_TOO_LARGE)
540            .body(AxumBody::from("Request body exceeds configured limit"))
541            .expect("infallible");
542    }
543
544    let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
545        Ok(permit) => permit,
546        Err(_) => {
547            return Response::builder()
548                .status(StatusCode::SERVICE_UNAVAILABLE)
549                .body(AxumBody::from("Service Unavailable"))
550                .expect("infallible");
551        }
552    };
553
554    // Build StreamBody from Axum body WITHOUT materializing
555    let content_type = headers
556        .get(http::header::CONTENT_TYPE)
557        .and_then(|v| v.to_str().ok())
558        .map(|s| s.to_string());
559
560    let data_stream: BodyDataStream = req.into_body().into_data_stream();
561    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
562    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
563
564    let stream_body = StreamBody {
565        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
566        metadata: StreamMetadata {
567            size_hint: content_length,
568            content_type,
569            origin: None,
570        },
571    };
572
573    // Look up handler for this path
574    let sender = {
575        let table = state.dispatch.read().await;
576        table.get(&path).cloned()
577    };
578    let Some(sender) = sender else {
579        return Response::builder()
580            .status(StatusCode::NOT_FOUND)
581            .body(AxumBody::from("No consumer registered for this path"))
582            .expect("infallible");
583    };
584
585    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
586    let envelope = RequestEnvelope {
587        method,
588        path,
589        query,
590        headers,
591        body: stream_body,
592        reply_tx,
593    };
594
595    if sender.send(envelope).await.is_err() {
596        return Response::builder()
597            .status(StatusCode::SERVICE_UNAVAILABLE)
598            .body(AxumBody::from("Consumer unavailable"))
599            .expect("infallible");
600    }
601
602    match reply_rx.await {
603        Ok(reply) => {
604            let reply = match reply.body {
605                HttpReplyBody::Bytes(b)
606                    if exceeds_max_response_body(b.len(), state.max_response_body) =>
607                {
608                    HttpReply {
609                        status: 500,
610                        headers: vec![],
611                        body: HttpReplyBody::Bytes(bytes::Bytes::from(
612                            "Response body exceeds configured limit",
613                        )),
614                    }
615                }
616                _ => reply,
617            };
618
619            let status =
620                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
621            let mut builder = Response::builder().status(status);
622            for (k, v) in &reply.headers {
623                builder = builder.header(k.as_str(), v.as_str());
624            }
625            match reply.body {
626                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
627                    Response::builder()
628                        .status(StatusCode::INTERNAL_SERVER_ERROR)
629                        .body(AxumBody::from("Invalid response headers from consumer"))
630                        .expect("infallible")
631                }),
632                HttpReplyBody::Stream(stream) => builder
633                    .body(AxumBody::from_stream(stream))
634                    .unwrap_or_else(|_| {
635                        Response::builder()
636                            .status(StatusCode::INTERNAL_SERVER_ERROR)
637                            .body(AxumBody::from("Invalid response headers from consumer"))
638                            .expect("infallible")
639                    }),
640            }
641        }
642        Err(_) => Response::builder()
643            .status(StatusCode::INTERNAL_SERVER_ERROR)
644            .body(AxumBody::from("Pipeline error"))
645            .expect("Response::builder() with a known-valid status code and body is infallible"),
646    }
647}
648
/// Returns `true` when a body of `len` bytes is strictly larger than the
/// `max` allowed; a body of exactly `max` bytes is accepted.
fn exceeds_max_response_body(len: usize, max: usize) -> bool {
    max < len
}
652
653// ---------------------------------------------------------------------------
654// HttpConsumer
655// ---------------------------------------------------------------------------
656
/// Consumer side of the HTTP component: receives the requests for one path of
/// a shared HTTP server.
pub struct HttpConsumer {
    /// Bind address, path and limits this consumer was created with.
    config: HttpServerConfig,
}
660
661impl HttpConsumer {
662    pub fn new(config: HttpServerConfig) -> Self {
663        Self { config }
664    }
665}
666
667#[async_trait::async_trait]
668impl Consumer for HttpConsumer {
669    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
670        use camel_component_api::{Body, Exchange, Message};
671
672        let dispatch = ServerRegistry::global()
673            .get_or_spawn(
674                &self.config.host,
675                self.config.port,
676                self.config.max_request_body,
677                self.config.max_response_body,
678                self.config.max_inflight_requests,
679            )
680            .await?;
681
682        // Create a channel for this path and register it
683        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
684        {
685            let mut table = dispatch.write().await;
686            table.insert(self.config.path.clone(), env_tx);
687        }
688
689        let path = self.config.path.clone();
690        let cancel_token = ctx.cancel_token();
691        loop {
692            tokio::select! {
693                _ = ctx.cancelled() => {
694                    break;
695                }
696                envelope = env_rx.recv() => {
697                    let Some(envelope) = envelope else { break; };
698
699                    // Build Exchange from HTTP request
700                    let mut msg = Message::default();
701
702                    // Set standard Camel HTTP headers
703                    msg.set_header("CamelHttpMethod",
704                        serde_json::Value::String(envelope.method.clone()));
705                    msg.set_header("CamelHttpPath",
706                        serde_json::Value::String(envelope.path.clone()));
707                    msg.set_header("CamelHttpQuery",
708                        serde_json::Value::String(envelope.query.clone()));
709
710                    // Forward HTTP headers (skip pseudo-headers)
711                    for (k, v) in &envelope.headers {
712                        if let Ok(val_str) = v.to_str() {
713                            msg.set_header(
714                                k.as_str(),
715                                serde_json::Value::String(val_str.to_string()),
716                            );
717                        }
718                    }
719
720                    // Body: always arrives as Body::Stream (native streaming)
721                    // Routes can call into_bytes() if they need to materialize
722                    msg.body = Body::Stream(envelope.body);
723
724                    #[allow(unused_mut)]
725                    let mut exchange = Exchange::new(msg);
726
727                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
728                    #[cfg(feature = "otel")]
729                    {
730                        let headers: HashMap<String, String> = envelope
731                            .headers
732                            .iter()
733                            .filter_map(|(k, v)| {
734                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
735                            })
736                            .collect();
737                        camel_otel::extract_into_exchange(&mut exchange, &headers);
738                    }
739
740                    let reply_tx = envelope.reply_tx;
741                    let sender = ctx.sender().clone();
742                    let path_clone = path.clone();
743                    let cancel = cancel_token.clone();
744
745                    // Spawn a task to handle this request concurrently
746                    //
747                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
748                    // true concurrent request processing. This change was introduced as part of the
749                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
750                    //
751                    // Rationale:
752                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
753                    //    the consumer's main loop until the pipeline processing completes
754                    // 2. This blocking would prevent multiple HTTP requests from being processed
755                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
756                    // 3. The channel would never have multiple exchanges buffered simultaneously,
757                    //    defeating the purpose of pipeline-side concurrency
758                    // 4. By spawning a task per request, we allow the consumer loop to continue
759                    //    accepting new requests while existing ones are processed in the pipeline
760                    //
761                    // This approach effectively decouples request acceptance from pipeline processing,
762                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
763                    // by the pipeline when ConcurrencyModel::Concurrent is active.
764                    tokio::spawn(async move {
765                        // Check for cancellation before sending to pipeline.
766                        // Returns 503 (Service Unavailable) instead of letting the request
767                        // enter a shutting-down pipeline. This is a behavioral change from
768                        // the pre-concurrency implementation where cancellation during
769                        // processing would result in a 500 (Internal Server Error).
770                        // 503 is more semantically correct: the server is temporarily
771                        // unable to handle the request due to shutdown.
772                        if cancel.is_cancelled() {
773                            let _ = reply_tx.send(HttpReply {
774                                status: 503,
775                                headers: vec![],
776                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
777                            });
778                            return;
779                        }
780
781                        // Send through pipeline and await result
782                        let (tx, rx) = tokio::sync::oneshot::channel();
783                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
784                            exchange,
785                            reply_tx: Some(tx),
786                        };
787
788                        let result = match sender.send(envelope).await {
789                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
790                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
791                        }
792                        .and_then(|r| r);
793
794                        let reply = match result {
795                            Ok(out) => {
796                                let status = out
797                                    .input
798                                    .header("CamelHttpResponseCode")
799                                    .and_then(|v| v.as_u64())
800                                    .map(|s| s as u16)
801                                    .unwrap_or(200);
802
803                                let reply_body: HttpReplyBody = match out.input.body {
804                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
805                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
806                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
807                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
808                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
809                                        v.to_string().into_bytes(),
810                                    )),
811                                    Body::Stream(s) => {
812                                        match s.stream.lock().await.take() {
813                                            Some(stream) => HttpReplyBody::Stream(stream),
814                                            None => {
815                                                tracing::error!(
816                                                    "Body::Stream already consumed before HTTP reply — returning 500"
817                                                );
818                                                let error_reply = HttpReply {
819                                                    status: 500,
820                                                    headers: vec![],
821                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
822                                                };
823                                                if reply_tx.send(error_reply).is_err() {
824                                                    debug!("reply_tx dropped before error reply could be sent");
825                                                }
826                                                return;
827                                            }
828                                        }
829                                    }
830                                };
831
832                                let resp_headers: Vec<(String, String)> = out
833                                    .input
834                                    .headers
835                                    .iter()
836                                    // Filter Camel internal headers
837                                    .filter(|(k, _)| !k.starts_with("Camel"))
838                                    // Filter hop-by-hop and request-only headers
839                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
840                                    .filter(|(k, _)| {
841                                        !matches!(
842                                            k.to_lowercase().as_str(),
843                                            // RFC 2616 Section 4.5 - General headers
844                                            "content-length" |      // Auto-calculated by framework
845                                            "content-type" |        // Auto-calculated from body
846                                            "transfer-encoding" |   // Hop-by-hop
847                                            "connection" |          // Hop-by-hop
848                                            "cache-control" |       // Hop-by-hop
849                                            "date" |                // Auto-generated
850                                            "pragma" |              // Hop-by-hop
851                                            "trailer" |             // Hop-by-hop
852                                            "upgrade" |             // Hop-by-hop
853                                            "via" |                 // Hop-by-hop
854                                            "warning" |             // Hop-by-hop
855                                            // Request-only headers
856                                            "host" |                // Request-only
857                                            "user-agent" |          // Request-only
858                                            "accept" |              // Request-only
859                                            "accept-encoding" |     // Request-only
860                                            "accept-language" |     // Request-only
861                                            "accept-charset" |      // Request-only
862                                            "authorization" |       // Request-only (security)
863                                            "proxy-authorization" | // Request-only (security)
864                                            "cookie" |              // Request-only
865                                            "expect" |              // Request-only
866                                            "from" |                // Request-only
867                                            "if-match" |            // Request-only
868                                            "if-modified-since" |   // Request-only
869                                            "if-none-match" |       // Request-only
870                                            "if-range" |            // Request-only
871                                            "if-unmodified-since" | // Request-only
872                                            "max-forwards" |        // Request-only
873                                            "proxy-connection" |    // Request-only
874                                            "range" |               // Request-only
875                                            "referer" |             // Request-only
876                                            "te"                    // Request-only
877                                        )
878                                    })
879                                    .filter_map(|(k, v)| {
880                                        v.as_str().map(|s| (k.clone(), s.to_string()))
881                                    })
882                                    .collect();
883
884                                HttpReply {
885                                    status,
886                                    headers: resp_headers,
887                                    body: reply_body,
888                                }
889                            }
890                            Err(e) => {
891                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
892                                HttpReply {
893                                    status: 500,
894                                    headers: vec![],
895                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
896                                }
897                            }
898                        };
899
900                        // Reply to Axum handler (ignore error if client disconnected)
901                        let _ = reply_tx.send(reply);
902                    });
903                }
904            }
905        }
906
907        // Deregister this path
908        {
909            let mut table = dispatch.write().await;
910            table.remove(&path);
911        }
912
913        Ok(())
914    }
915
916    async fn stop(&mut self) -> Result<(), CamelError> {
917        Ok(())
918    }
919
920    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
921        camel_component_api::ConcurrencyModel::Concurrent { max: None }
922    }
923}
924
925// ---------------------------------------------------------------------------
926// HttpComponent / HttpsComponent
927// ---------------------------------------------------------------------------
928
929pub struct HttpComponent {
930    client: reqwest::Client,
931    config: HttpConfig,
932}
933
934fn build_client(config: &HttpConfig) -> reqwest::Client {
935    let mut builder = reqwest::Client::builder()
936        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
937        .pool_max_idle_per_host(config.pool_max_idle_per_host)
938        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
939
940    if !config.follow_redirects {
941        builder = builder.redirect(reqwest::redirect::Policy::none());
942    }
943
944    builder
945        .build()
946        .expect("reqwest::Client::build() with valid config should not fail")
947}
948
949impl HttpComponent {
950    pub fn new() -> Self {
951        let config = HttpConfig::default();
952        let client = build_client(&config);
953        Self { client, config }
954    }
955
956    pub fn with_config(config: HttpConfig) -> Self {
957        let client = build_client(&config);
958        Self { client, config }
959    }
960
961    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
962        match config {
963            Some(cfg) => Self::with_config(cfg),
964            None => Self::new(),
965        }
966    }
967}
968
969impl Default for HttpComponent {
970    fn default() -> Self {
971        Self::new()
972    }
973}
974
975impl Component for HttpComponent {
976    fn scheme(&self) -> &str {
977        "http"
978    }
979
980    fn create_endpoint(
981        &self,
982        uri: &str,
983        _ctx: &dyn camel_component_api::ComponentContext,
984    ) -> Result<Box<dyn Endpoint>, CamelError> {
985        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
986        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
987        Ok(Box::new(HttpEndpoint {
988            uri: uri.to_string(),
989            config,
990            server_config,
991            client: self.client.clone(),
992        }))
993    }
994}
995
996pub struct HttpsComponent {
997    client: reqwest::Client,
998    config: HttpConfig,
999}
1000
1001impl HttpsComponent {
1002    pub fn new() -> Self {
1003        let config = HttpConfig::default();
1004        let client = build_client(&config);
1005        Self { client, config }
1006    }
1007
1008    pub fn with_config(config: HttpConfig) -> Self {
1009        let client = build_client(&config);
1010        Self { client, config }
1011    }
1012
1013    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1014        match config {
1015            Some(cfg) => Self::with_config(cfg),
1016            None => Self::new(),
1017        }
1018    }
1019}
1020
1021impl Default for HttpsComponent {
1022    fn default() -> Self {
1023        Self::new()
1024    }
1025}
1026
1027impl Component for HttpsComponent {
1028    fn scheme(&self) -> &str {
1029        "https"
1030    }
1031
1032    fn create_endpoint(
1033        &self,
1034        uri: &str,
1035        _ctx: &dyn camel_component_api::ComponentContext,
1036    ) -> Result<Box<dyn Endpoint>, CamelError> {
1037        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1038        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1039        Ok(Box::new(HttpEndpoint {
1040            uri: uri.to_string(),
1041            config,
1042            server_config,
1043            client: self.client.clone(),
1044        }))
1045    }
1046}
1047
1048// ---------------------------------------------------------------------------
1049// HttpEndpoint
1050// ---------------------------------------------------------------------------
1051
1052struct HttpEndpoint {
1053    uri: String,
1054    config: HttpEndpointConfig,
1055    server_config: HttpServerConfig,
1056    client: reqwest::Client,
1057}
1058
1059impl Endpoint for HttpEndpoint {
1060    fn uri(&self) -> &str {
1061        &self.uri
1062    }
1063
1064    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1065        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1066    }
1067
1068    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1069        Ok(BoxProcessor::new(HttpProducer {
1070            config: Arc::new(self.config.clone()),
1071            client: self.client.clone(),
1072        }))
1073    }
1074}
1075
1076// ---------------------------------------------------------------------------
1077// SSRF Protection
1078// ---------------------------------------------------------------------------
1079
1080fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1081    let parsed = url::Url::parse(url)
1082        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1083
1084    // Check blocked hosts
1085    if let Some(host) = parsed.host_str()
1086        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1087    {
1088        return Err(CamelError::ProcessorError(format!(
1089            "Host '{}' is blocked",
1090            host
1091        )));
1092    }
1093
1094    // Check private IPs if not allowed
1095    if !config.allow_private_ips
1096        && let Some(host) = parsed.host()
1097    {
1098        match host {
1099            url::Host::Ipv4(ip) => {
1100                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1101                    return Err(CamelError::ProcessorError(format!(
1102                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1103                        ip
1104                    )));
1105                }
1106            }
1107            url::Host::Ipv6(ip) => {
1108                if ip.is_loopback() {
1109                    return Err(CamelError::ProcessorError(format!(
1110                        "Loopback IP '{}' not allowed",
1111                        ip
1112                    )));
1113                }
1114            }
1115            url::Host::Domain(domain) => {
1116                // Block common internal domains
1117                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1118                if blocked_domains.contains(&domain) {
1119                    return Err(CamelError::ProcessorError(format!(
1120                        "Domain '{}' is not allowed",
1121                        domain
1122                    )));
1123                }
1124            }
1125        }
1126    }
1127
1128    Ok(())
1129}
1130
1131// ---------------------------------------------------------------------------
1132// HttpProducer
1133// ---------------------------------------------------------------------------
1134
1135#[derive(Clone)]
1136struct HttpProducer {
1137    config: Arc<HttpEndpointConfig>,
1138    client: reqwest::Client,
1139}
1140
1141impl HttpProducer {
1142    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1143        if let Some(ref method) = config.http_method {
1144            return method.to_uppercase();
1145        }
1146        if let Some(method) = exchange
1147            .input
1148            .header("CamelHttpMethod")
1149            .and_then(|v| v.as_str())
1150        {
1151            return method.to_uppercase();
1152        }
1153        if !exchange.input.body.is_empty() {
1154            return "POST".to_string();
1155        }
1156        "GET".to_string()
1157    }
1158
1159    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1160        if let Some(uri) = exchange
1161            .input
1162            .header("CamelHttpUri")
1163            .and_then(|v| v.as_str())
1164        {
1165            let mut url = uri.to_string();
1166            if let Some(path) = exchange
1167                .input
1168                .header("CamelHttpPath")
1169                .and_then(|v| v.as_str())
1170            {
1171                if !url.ends_with('/') && !path.starts_with('/') {
1172                    url.push('/');
1173                }
1174                url.push_str(path);
1175            }
1176            if let Some(query) = exchange
1177                .input
1178                .header("CamelHttpQuery")
1179                .and_then(|v| v.as_str())
1180            {
1181                url.push('?');
1182                url.push_str(query);
1183            }
1184            return url;
1185        }
1186
1187        let mut url = config.base_url.clone();
1188
1189        if let Some(path) = exchange
1190            .input
1191            .header("CamelHttpPath")
1192            .and_then(|v| v.as_str())
1193        {
1194            if !url.ends_with('/') && !path.starts_with('/') {
1195                url.push('/');
1196            }
1197            url.push_str(path);
1198        }
1199
1200        if let Some(query) = exchange
1201            .input
1202            .header("CamelHttpQuery")
1203            .and_then(|v| v.as_str())
1204        {
1205            url.push('?');
1206            url.push_str(query);
1207        } else if !config.query_params.is_empty() {
1208            // Forward non-Camel query params from config
1209            url.push('?');
1210            let query_string: String = config
1211                .query_params
1212                .iter()
1213                .map(|(k, v)| format!("{k}={v}"))
1214                .collect::<Vec<_>>()
1215                .join("&");
1216            url.push_str(&query_string);
1217        }
1218
1219        url
1220    }
1221
1222    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1223        status >= range.0 && status <= range.1
1224    }
1225}
1226
1227impl Service<Exchange> for HttpProducer {
1228    type Response = Exchange;
1229    type Error = CamelError;
1230    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1231
1232    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1233        Poll::Ready(Ok(()))
1234    }
1235
1236    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1237        let config = self.config.clone();
1238        let client = self.client.clone();
1239
1240        Box::pin(async move {
1241            let method_str = HttpProducer::resolve_method(&exchange, &config);
1242            let url = HttpProducer::resolve_url(&exchange, &config);
1243
1244            // SECURITY: Validate URL for SSRF
1245            validate_url_for_ssrf(&url, &config)?;
1246
1247            debug!(
1248                correlation_id = %exchange.correlation_id(),
1249                method = %method_str,
1250                url = %url,
1251                "HTTP request"
1252            );
1253
1254            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1255                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1256            })?;
1257
1258            let mut request = client.request(method, &url);
1259
1260            if let Some(timeout) = config.response_timeout {
1261                request = request.timeout(timeout);
1262            }
1263
1264            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1265            #[cfg(feature = "otel")]
1266            {
1267                let mut otel_headers = HashMap::new();
1268                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1269                for (k, v) in otel_headers {
1270                    if let (Ok(name), Ok(val)) = (
1271                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1272                        reqwest::header::HeaderValue::from_str(&v),
1273                    ) {
1274                        request = request.header(name, val);
1275                    }
1276                }
1277            }
1278
1279            for (key, value) in &exchange.input.headers {
1280                if !key.starts_with("Camel")
1281                    && let Some(val_str) = value.as_str()
1282                    && let (Ok(name), Ok(val)) = (
1283                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1284                        reqwest::header::HeaderValue::from_str(val_str),
1285                    )
1286                {
1287                    request = request.header(name, val);
1288                }
1289            }
1290
1291            match exchange.input.body {
1292                Body::Stream(ref s) => {
1293                    let mut stream_lock = s.stream.lock().await;
1294                    if let Some(stream) = stream_lock.take() {
1295                        request = request.body(reqwest::Body::wrap_stream(stream));
1296                    } else {
1297                        return Err(CamelError::AlreadyConsumed);
1298                    }
1299                }
1300                _ => {
1301                    // For other types, materialize with configured limit
1302                    let body = std::mem::take(&mut exchange.input.body);
1303                    let bytes = body.into_bytes(config.max_body_size).await?;
1304                    if !bytes.is_empty() {
1305                        request = request.body(bytes);
1306                    }
1307                }
1308            }
1309
1310            let response = request
1311                .send()
1312                .await
1313                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1314
1315            let status_code = response.status().as_u16();
1316            let status_text = response
1317                .status()
1318                .canonical_reason()
1319                .unwrap_or("Unknown")
1320                .to_string();
1321
1322            for (key, value) in response.headers() {
1323                if let Ok(val_str) = value.to_str() {
1324                    exchange
1325                        .input
1326                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1327                }
1328            }
1329
1330            exchange.input.set_header(
1331                "CamelHttpResponseCode",
1332                serde_json::Value::Number(status_code.into()),
1333            );
1334            exchange.input.set_header(
1335                "CamelHttpResponseText",
1336                serde_json::Value::String(status_text.clone()),
1337            );
1338
1339            let response_body = response.bytes().await.map_err(|e| {
1340                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1341            })?;
1342
1343            if config.throw_exception_on_failure
1344                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1345            {
1346                return Err(CamelError::HttpOperationFailed {
1347                    method: method_str,
1348                    url,
1349                    status_code,
1350                    status_text,
1351                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1352                });
1353            }
1354
1355            if !response_body.is_empty() {
1356                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1357            }
1358
1359            debug!(
1360                correlation_id = %exchange.correlation_id(),
1361                status = status_code,
1362                url = %url,
1363                "HTTP response"
1364            );
1365            Ok(exchange)
1366        })
1367    }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372    use super::*;
1373    use camel_component_api::{Message, NoOpComponentContext};
1374    use std::sync::Arc;
1375    use std::time::Duration;
1376
1377    fn test_producer_ctx() -> ProducerContext {
1378        ProducerContext::new()
1379    }
1380
1381    #[test]
1382    fn test_http_config_defaults() {
1383        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1384        assert_eq!(config.base_url, "http://localhost:8080/api");
1385        assert!(config.http_method.is_none());
1386        assert!(config.throw_exception_on_failure);
1387        assert_eq!(config.ok_status_code_range, (200, 299));
1388        assert!(config.response_timeout.is_none());
1389    }
1390
1391    #[test]
1392    fn test_http_config_scheme() {
1393        // UriConfig trait method returns "http" as primary scheme
1394        assert_eq!(HttpEndpointConfig::scheme(), "http");
1395    }
1396
1397    #[test]
1398    fn test_http_config_from_components() {
1399        // Test from_components directly (trait method)
1400        let components = camel_component_api::UriComponents {
1401            scheme: "https".to_string(),
1402            path: "//api.example.com/v1".to_string(),
1403            params: std::collections::HashMap::from([(
1404                "httpMethod".to_string(),
1405                "POST".to_string(),
1406            )]),
1407        };
1408        let config = HttpEndpointConfig::from_components(components).unwrap();
1409        assert_eq!(config.base_url, "https://api.example.com/v1");
1410        assert_eq!(config.http_method, Some("POST".to_string()));
1411    }
1412
1413    #[test]
1414    fn test_http_config_with_options() {
1415        let config = HttpEndpointConfig::from_uri(
1416            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1417        ).unwrap();
1418        assert_eq!(config.base_url, "https://api.example.com/v1");
1419        assert_eq!(config.http_method, Some("PUT".to_string()));
1420        assert!(!config.throw_exception_on_failure);
1421        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1422    }
1423
1424    #[test]
1425    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1426        let config = HttpConfig::default()
1427            .with_response_timeout_ms(999)
1428            .with_allow_private_ips(true)
1429            .with_blocked_hosts(vec!["evil.com".to_string()])
1430            .with_max_body_size(12345);
1431        let endpoint =
1432            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1433        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1434        assert!(endpoint.allow_private_ips);
1435        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1436        assert_eq!(endpoint.max_body_size, 12345);
1437    }
1438
1439    #[test]
1440    fn test_from_uri_with_defaults_uri_overrides_config() {
1441        let config = HttpConfig::default()
1442            .with_response_timeout_ms(999)
1443            .with_allow_private_ips(true)
1444            .with_blocked_hosts(vec!["evil.com".to_string()])
1445            .with_max_body_size(12345);
1446        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1447            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1448            &config,
1449        )
1450        .unwrap();
1451        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1452        assert!(!endpoint.allow_private_ips);
1453        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1454        assert_eq!(endpoint.max_body_size, 99);
1455    }
1456
1457    #[test]
1458    fn test_http_config_ok_status_range() {
1459        let config =
1460            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1461        assert_eq!(config.ok_status_code_range, (200, 204));
1462    }
1463
1464    #[test]
1465    fn test_http_config_wrong_scheme() {
1466        let result = HttpEndpointConfig::from_uri("file:/tmp");
1467        assert!(result.is_err());
1468    }
1469
1470    #[test]
1471    fn test_http_component_scheme() {
1472        let component = HttpComponent::new();
1473        assert_eq!(component.scheme(), "http");
1474    }
1475
1476    #[test]
1477    fn test_https_component_scheme() {
1478        let component = HttpsComponent::new();
1479        assert_eq!(component.scheme(), "https");
1480    }
1481
1482    #[test]
1483    fn test_http_endpoint_creates_consumer() {
1484        let component = HttpComponent::new();
1485        let ctx = NoOpComponentContext;
1486        let endpoint = component
1487            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1488            .unwrap();
1489        assert!(endpoint.create_consumer().is_ok());
1490    }
1491
1492    #[test]
1493    fn test_https_endpoint_creates_consumer() {
1494        let component = HttpsComponent::new();
1495        let ctx = NoOpComponentContext;
1496        let endpoint = component
1497            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1498            .unwrap();
1499        assert!(endpoint.create_consumer().is_ok());
1500    }
1501
1502    #[test]
1503    fn test_http_endpoint_creates_producer() {
1504        let ctx = test_producer_ctx();
1505        let component = HttpComponent::new();
1506        let endpoint_ctx = NoOpComponentContext;
1507        let endpoint = component
1508            .create_endpoint("http://localhost/api", &endpoint_ctx)
1509            .unwrap();
1510        assert!(endpoint.create_producer(&ctx).is_ok());
1511    }
1512
1513    // -----------------------------------------------------------------------
1514    // Producer tests
1515    // -----------------------------------------------------------------------
1516
1517    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1518        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1519        let addr = listener.local_addr().unwrap();
1520        let url = format!("http://127.0.0.1:{}", addr.port());
1521
1522        let handle = tokio::spawn(async move {
1523            loop {
1524                if let Ok((mut stream, _)) = listener.accept().await {
1525                    tokio::spawn(async move {
1526                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1527                        let mut buf = vec![0u8; 4096];
1528                        let n = stream.read(&mut buf).await.unwrap_or(0);
1529                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1530
1531                        let method = request.split_whitespace().next().unwrap_or("GET");
1532
1533                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1534                        let response = format!(
1535                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1536                            body.len(),
1537                            body
1538                        );
1539                        let _ = stream.write_all(response.as_bytes()).await;
1540                    });
1541                }
1542            }
1543        });
1544
1545        (url, handle)
1546    }
1547
1548    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1549        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1550        let addr = listener.local_addr().unwrap();
1551        let url = format!("http://127.0.0.1:{}", addr.port());
1552
1553        let handle = tokio::spawn(async move {
1554            loop {
1555                if let Ok((mut stream, _)) = listener.accept().await {
1556                    let status = status;
1557                    tokio::spawn(async move {
1558                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1559                        let mut buf = vec![0u8; 4096];
1560                        let _ = stream.read(&mut buf).await;
1561
1562                        let status_text = match status {
1563                            404 => "Not Found",
1564                            500 => "Internal Server Error",
1565                            _ => "Error",
1566                        };
1567                        let body = "error body";
1568                        let response = format!(
1569                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1570                            status,
1571                            status_text,
1572                            body.len(),
1573                            body
1574                        );
1575                        let _ = stream.write_all(response.as_bytes()).await;
1576                    });
1577                }
1578            }
1579        });
1580
1581        (url, handle)
1582    }
1583
1584    #[tokio::test]
1585    async fn test_http_producer_get_request() {
1586        use tower::ServiceExt;
1587
1588        let (url, _handle) = start_test_server().await;
1589        let ctx = test_producer_ctx();
1590
1591        let component = HttpComponent::new();
1592        let endpoint_ctx = NoOpComponentContext;
1593        let endpoint = component
1594            .create_endpoint(
1595                &format!("{url}/api/test?allowPrivateIps=true"),
1596                &endpoint_ctx,
1597            )
1598            .unwrap();
1599        let producer = endpoint.create_producer(&ctx).unwrap();
1600
1601        let exchange = Exchange::new(Message::default());
1602        let result = producer.oneshot(exchange).await.unwrap();
1603
1604        let status = result
1605            .input
1606            .header("CamelHttpResponseCode")
1607            .and_then(|v| v.as_u64())
1608            .unwrap();
1609        assert_eq!(status, 200);
1610
1611        assert!(!result.input.body.is_empty());
1612    }
1613
1614    #[tokio::test]
1615    async fn test_http_producer_post_with_body() {
1616        use tower::ServiceExt;
1617
1618        let (url, _handle) = start_test_server().await;
1619        let ctx = test_producer_ctx();
1620
1621        let component = HttpComponent::new();
1622        let endpoint_ctx = NoOpComponentContext;
1623        let endpoint = component
1624            .create_endpoint(
1625                &format!("{url}/api/data?allowPrivateIps=true"),
1626                &endpoint_ctx,
1627            )
1628            .unwrap();
1629        let producer = endpoint.create_producer(&ctx).unwrap();
1630
1631        let exchange = Exchange::new(Message::new("request body"));
1632        let result = producer.oneshot(exchange).await.unwrap();
1633
1634        let status = result
1635            .input
1636            .header("CamelHttpResponseCode")
1637            .and_then(|v| v.as_u64())
1638            .unwrap();
1639        assert_eq!(status, 200);
1640    }
1641
1642    #[tokio::test]
1643    async fn test_http_producer_method_from_header() {
1644        use tower::ServiceExt;
1645
1646        let (url, _handle) = start_test_server().await;
1647        let ctx = test_producer_ctx();
1648
1649        let component = HttpComponent::new();
1650        let endpoint_ctx = NoOpComponentContext;
1651        let endpoint = component
1652            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1653            .unwrap();
1654        let producer = endpoint.create_producer(&ctx).unwrap();
1655
1656        let mut exchange = Exchange::new(Message::default());
1657        exchange.input.set_header(
1658            "CamelHttpMethod",
1659            serde_json::Value::String("DELETE".to_string()),
1660        );
1661
1662        let result = producer.oneshot(exchange).await.unwrap();
1663        let status = result
1664            .input
1665            .header("CamelHttpResponseCode")
1666            .and_then(|v| v.as_u64())
1667            .unwrap();
1668        assert_eq!(status, 200);
1669    }
1670
1671    #[tokio::test]
1672    async fn test_http_producer_forced_method() {
1673        use tower::ServiceExt;
1674
1675        let (url, _handle) = start_test_server().await;
1676        let ctx = test_producer_ctx();
1677
1678        let component = HttpComponent::new();
1679        let endpoint_ctx = NoOpComponentContext;
1680        let endpoint = component
1681            .create_endpoint(
1682                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1683                &endpoint_ctx,
1684            )
1685            .unwrap();
1686        let producer = endpoint.create_producer(&ctx).unwrap();
1687
1688        let exchange = Exchange::new(Message::default());
1689        let result = producer.oneshot(exchange).await.unwrap();
1690
1691        let status = result
1692            .input
1693            .header("CamelHttpResponseCode")
1694            .and_then(|v| v.as_u64())
1695            .unwrap();
1696        assert_eq!(status, 200);
1697    }
1698
1699    #[tokio::test]
1700    async fn test_http_producer_throw_exception_on_failure() {
1701        use tower::ServiceExt;
1702
1703        let (url, _handle) = start_status_server(404).await;
1704        let ctx = test_producer_ctx();
1705
1706        let component = HttpComponent::new();
1707        let endpoint_ctx = NoOpComponentContext;
1708        let endpoint = component
1709            .create_endpoint(
1710                &format!("{url}/not-found?allowPrivateIps=true"),
1711                &endpoint_ctx,
1712            )
1713            .unwrap();
1714        let producer = endpoint.create_producer(&ctx).unwrap();
1715
1716        let exchange = Exchange::new(Message::default());
1717        let result = producer.oneshot(exchange).await;
1718        assert!(result.is_err());
1719
1720        match result.unwrap_err() {
1721            CamelError::HttpOperationFailed { status_code, .. } => {
1722                assert_eq!(status_code, 404);
1723            }
1724            e => panic!("Expected HttpOperationFailed, got: {e}"),
1725        }
1726    }
1727
1728    #[tokio::test]
1729    async fn test_http_producer_no_throw_on_failure() {
1730        use tower::ServiceExt;
1731
1732        let (url, _handle) = start_status_server(500).await;
1733        let ctx = test_producer_ctx();
1734
1735        let component = HttpComponent::new();
1736        let endpoint_ctx = NoOpComponentContext;
1737        let endpoint = component
1738            .create_endpoint(
1739                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1740                &endpoint_ctx,
1741            )
1742            .unwrap();
1743        let producer = endpoint.create_producer(&ctx).unwrap();
1744
1745        let exchange = Exchange::new(Message::default());
1746        let result = producer.oneshot(exchange).await.unwrap();
1747
1748        let status = result
1749            .input
1750            .header("CamelHttpResponseCode")
1751            .and_then(|v| v.as_u64())
1752            .unwrap();
1753        assert_eq!(status, 500);
1754    }
1755
1756    #[tokio::test]
1757    async fn test_http_producer_uri_override() {
1758        use tower::ServiceExt;
1759
1760        let (url, _handle) = start_test_server().await;
1761        let ctx = test_producer_ctx();
1762
1763        let component = HttpComponent::new();
1764        let endpoint_ctx = NoOpComponentContext;
1765        let endpoint = component
1766            .create_endpoint(
1767                "http://localhost:1/does-not-exist?allowPrivateIps=true",
1768                &endpoint_ctx,
1769            )
1770            .unwrap();
1771        let producer = endpoint.create_producer(&ctx).unwrap();
1772
1773        let mut exchange = Exchange::new(Message::default());
1774        exchange.input.set_header(
1775            "CamelHttpUri",
1776            serde_json::Value::String(format!("{url}/api")),
1777        );
1778
1779        let result = producer.oneshot(exchange).await.unwrap();
1780        let status = result
1781            .input
1782            .header("CamelHttpResponseCode")
1783            .and_then(|v| v.as_u64())
1784            .unwrap();
1785        assert_eq!(status, 200);
1786    }
1787
1788    #[tokio::test]
1789    async fn test_http_producer_response_headers_mapped() {
1790        use tower::ServiceExt;
1791
1792        let (url, _handle) = start_test_server().await;
1793        let ctx = test_producer_ctx();
1794
1795        let component = HttpComponent::new();
1796        let endpoint_ctx = NoOpComponentContext;
1797        let endpoint = component
1798            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1799            .unwrap();
1800        let producer = endpoint.create_producer(&ctx).unwrap();
1801
1802        let exchange = Exchange::new(Message::default());
1803        let result = producer.oneshot(exchange).await.unwrap();
1804
1805        assert!(
1806            result.input.header("content-type").is_some()
1807                || result.input.header("Content-Type").is_some()
1808        );
1809        assert!(result.input.header("CamelHttpResponseText").is_some());
1810    }
1811
1812    // -----------------------------------------------------------------------
1813    // Bug fix tests: Client configuration per-endpoint
1814    // -----------------------------------------------------------------------
1815
1816    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1817        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1818        let addr = listener.local_addr().unwrap();
1819        let url = format!("http://127.0.0.1:{}", addr.port());
1820
1821        let handle = tokio::spawn(async move {
1822            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1823            loop {
1824                if let Ok((mut stream, _)) = listener.accept().await {
1825                    tokio::spawn(async move {
1826                        let mut buf = vec![0u8; 4096];
1827                        let n = stream.read(&mut buf).await.unwrap_or(0);
1828                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1829
1830                        // Check if this is a request to /final
1831                        if request.contains("GET /final") {
1832                            let body = r#"{"status":"final"}"#;
1833                            let response = format!(
1834                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1835                                body.len(),
1836                                body
1837                            );
1838                            let _ = stream.write_all(response.as_bytes()).await;
1839                        } else {
1840                            // Redirect to /final
1841                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1842                            let _ = stream.write_all(response.as_bytes()).await;
1843                        }
1844                    });
1845                }
1846            }
1847        });
1848
1849        (url, handle)
1850    }
1851
1852    #[tokio::test]
1853    async fn test_follow_redirects_false_does_not_follow() {
1854        use tower::ServiceExt;
1855
1856        let (url, _handle) = start_redirect_server().await;
1857        let ctx = test_producer_ctx();
1858
1859        let component =
1860            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1861        let endpoint_ctx = NoOpComponentContext;
1862        let endpoint = component
1863            .create_endpoint(
1864                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1865                &endpoint_ctx,
1866            )
1867            .unwrap();
1868        let producer = endpoint.create_producer(&ctx).unwrap();
1869
1870        let exchange = Exchange::new(Message::default());
1871        let result = producer.oneshot(exchange).await.unwrap();
1872
1873        // Should get 302, NOT follow redirect to 200
1874        let status = result
1875            .input
1876            .header("CamelHttpResponseCode")
1877            .and_then(|v| v.as_u64())
1878            .unwrap();
1879        assert_eq!(
1880            status, 302,
1881            "Should NOT follow redirect when followRedirects=false"
1882        );
1883    }
1884
1885    #[tokio::test]
1886    async fn test_follow_redirects_true_follows_redirect() {
1887        use tower::ServiceExt;
1888
1889        let (url, _handle) = start_redirect_server().await;
1890        let ctx = test_producer_ctx();
1891
1892        let component =
1893            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1894        let endpoint_ctx = NoOpComponentContext;
1895        let endpoint = component
1896            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1897            .unwrap();
1898        let producer = endpoint.create_producer(&ctx).unwrap();
1899
1900        let exchange = Exchange::new(Message::default());
1901        let result = producer.oneshot(exchange).await.unwrap();
1902
1903        // Should follow redirect and get 200
1904        let status = result
1905            .input
1906            .header("CamelHttpResponseCode")
1907            .and_then(|v| v.as_u64())
1908            .unwrap();
1909        assert_eq!(
1910            status, 200,
1911            "Should follow redirect when followRedirects=true"
1912        );
1913    }
1914
1915    #[tokio::test]
1916    async fn test_query_params_forwarded_to_http_request() {
1917        use tower::ServiceExt;
1918
1919        let (url, _handle) = start_test_server().await;
1920        let ctx = test_producer_ctx();
1921
1922        let component = HttpComponent::new();
1923        let endpoint_ctx = NoOpComponentContext;
1924        // apiKey is NOT a Camel option, should be forwarded as query param
1925        let endpoint = component
1926            .create_endpoint(
1927                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1928                &endpoint_ctx,
1929            )
1930            .unwrap();
1931        let producer = endpoint.create_producer(&ctx).unwrap();
1932
1933        let exchange = Exchange::new(Message::default());
1934        let result = producer.oneshot(exchange).await.unwrap();
1935
1936        // The test server returns the request info in response
1937        // We just verify it succeeds (the query param was sent)
1938        let status = result
1939            .input
1940            .header("CamelHttpResponseCode")
1941            .and_then(|v| v.as_u64())
1942            .unwrap();
1943        assert_eq!(status, 200);
1944    }
1945
/// Regression test for Bug #3: query parameters that are not Camel options must be
/// kept in `query_params` so they can be forwarded, while Camel options are consumed.
///
/// Only URI parsing is exercised here (no HTTP call is made), so this is a plain
/// synchronous test and does not need an async runtime.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpEndpointConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // apiKey and token are NOT Camel options, should be forwarded
    assert!(
        config.query_params.contains_key("apiKey"),
        "apiKey should be preserved"
    );
    assert!(
        config.query_params.contains_key("token"),
        "token should be preserved"
    );
    assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
    assert_eq!(config.query_params.get("token").unwrap(), "abc456");

    // httpMethod IS a Camel option, should NOT be in query_params
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
1973
1974    // -----------------------------------------------------------------------
1975    // SSRF Protection tests
1976    // -----------------------------------------------------------------------
1977
1978    #[tokio::test]
1979    async fn test_http_producer_blocks_metadata_endpoint() {
1980        use tower::ServiceExt;
1981
1982        let ctx = test_producer_ctx();
1983        let component = HttpComponent::new();
1984        let endpoint_ctx = NoOpComponentContext;
1985        let endpoint = component
1986            .create_endpoint(
1987                "http://example.com/api?allowPrivateIps=false",
1988                &endpoint_ctx,
1989            )
1990            .unwrap();
1991        let producer = endpoint.create_producer(&ctx).unwrap();
1992
1993        let mut exchange = Exchange::new(Message::default());
1994        exchange.input.set_header(
1995            "CamelHttpUri",
1996            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1997        );
1998
1999        let result = producer.oneshot(exchange).await;
2000        assert!(result.is_err(), "Should block AWS metadata endpoint");
2001
2002        let err = result.unwrap_err();
2003        assert!(
2004            err.to_string().contains("Private IP"),
2005            "Error should mention private IP blocking, got: {}",
2006            err
2007        );
2008    }
2009
/// A URI without SSRF options must parse to the defaults: private IPs blocked and
/// an empty blocked-host list.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        !private_ips_allowed,
        "Private IPs should be blocked by default"
    );

    let blocked = &parsed.blocked_hosts;
    assert!(
        blocked.is_empty(),
        "Blocked hosts should be empty by default"
    );
}
2022
/// `allowPrivateIps=true` in the URI must switch the parsed flag on.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        private_ips_allowed,
        "Private IPs should be allowed when explicitly set"
    );
}
2032
/// A comma-separated `blockedHosts` value must be split into individual host names,
/// in order.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    let expected = vec!["evil.com", "malware.net"];
    assert_eq!(parsed.blocked_hosts, expected);
}
2041
2042    #[tokio::test]
2043    async fn test_http_producer_blocks_localhost() {
2044        use tower::ServiceExt;
2045
2046        let ctx = test_producer_ctx();
2047        let component = HttpComponent::new();
2048        let endpoint_ctx = NoOpComponentContext;
2049        let endpoint = component
2050            .create_endpoint("http://example.com/api", &endpoint_ctx)
2051            .unwrap();
2052        let producer = endpoint.create_producer(&ctx).unwrap();
2053
2054        let mut exchange = Exchange::new(Message::default());
2055        exchange.input.set_header(
2056            "CamelHttpUri",
2057            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2058        );
2059
2060        let result = producer.oneshot(exchange).await;
2061        assert!(result.is_err(), "Should block localhost");
2062    }
2063
2064    #[tokio::test]
2065    async fn test_http_producer_blocks_loopback_ip() {
2066        use tower::ServiceExt;
2067
2068        let ctx = test_producer_ctx();
2069        let component = HttpComponent::new();
2070        let endpoint_ctx = NoOpComponentContext;
2071        let endpoint = component
2072            .create_endpoint("http://example.com/api", &endpoint_ctx)
2073            .unwrap();
2074        let producer = endpoint.create_producer(&ctx).unwrap();
2075
2076        let mut exchange = Exchange::new(Message::default());
2077        exchange.input.set_header(
2078            "CamelHttpUri",
2079            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2080        );
2081
2082        let result = producer.oneshot(exchange).await;
2083        assert!(result.is_err(), "Should block loopback IP");
2084    }
2085
2086    #[tokio::test]
2087    async fn test_http_producer_allows_private_ip_when_enabled() {
2088        use tower::ServiceExt;
2089
2090        let ctx = test_producer_ctx();
2091        let component = HttpComponent::new();
2092        let endpoint_ctx = NoOpComponentContext;
2093        // With allowPrivateIps=true, the validation should pass
2094        // (actual connection will fail, but that's expected)
2095        let endpoint = component
2096            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2097            .unwrap();
2098        let producer = endpoint.create_producer(&ctx).unwrap();
2099
2100        let exchange = Exchange::new(Message::default());
2101
2102        // The request will fail because we can't connect, but it should NOT fail
2103        // due to SSRF protection
2104        let result = producer.oneshot(exchange).await;
2105        // We expect connection error, not SSRF error
2106        if let Err(ref e) = result {
2107            let err_str = e.to_string();
2108            assert!(
2109                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2110                "Should not be SSRF error, got: {}",
2111                err_str
2112            );
2113        }
2114    }
2115
2116    // -----------------------------------------------------------------------
2117    // HttpServerConfig tests
2118    // -----------------------------------------------------------------------
2119
/// A full server URI must parse into host, port and path, with the in-flight limit
/// left at 1024.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    let location = (parsed.host.as_str(), parsed.port, parsed.path.as_str());
    assert_eq!(location, ("0.0.0.0", 8080, "/orders"));
    assert_eq!(parsed.max_inflight_requests, 1024);
}
2128
/// The `scheme()` associated function must report `http` as the primary scheme.
#[test]
fn test_http_server_config_scheme() {
    let primary = HttpServerConfig::scheme();
    assert_eq!(primary, "http");
}
2134
/// Builds the config straight from pre-split URI components and checks that host,
/// port, path and both numeric parameters are picked up.
#[test]
fn test_http_server_config_from_components() {
    let params: HashMap<String, String> = [
        ("maxRequestBody", "5242880"),
        ("maxInflightRequests", "7"),
    ]
    .iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();

    let parts = camel_component_api::UriComponents {
        scheme: "https".to_string(),
        path: "//0.0.0.0:8443/api".to_string(),
        params,
    };

    let parsed = HttpServerConfig::from_components(parts).unwrap();

    let location = (parsed.host.as_str(), parsed.port, parsed.path.as_str());
    assert_eq!(location, ("0.0.0.0", 8443, "/api"));
    assert_eq!(parsed.max_request_body, 5242880);
    assert_eq!(parsed.max_inflight_requests, 7);
}
2153
/// A URI with no path component must default to `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    let path = parsed.path.as_str();
    assert_eq!(path, "/");
}
2159
/// A non-HTTP scheme must be rejected.
#[test]
fn test_http_server_config_wrong_scheme() {
    let parsed = HttpServerConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
2164
/// A non-numeric port must be rejected.
#[test]
fn test_http_server_config_invalid_port() {
    let parsed = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(parsed.is_err());
}
2169
/// Without an explicit port, `http` must default to 80 and `https` to 443.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let cases = [
        ("http://0.0.0.0/orders", 80),
        ("https://0.0.0.0/orders", 443),
    ];

    for (uri, expected_port) in cases {
        let parsed = HttpServerConfig::from_uri(uri).unwrap();
        assert_eq!(parsed.port, expected_port);
    }
}
2180
/// Compile-time check that `RequestEnvelope` and `HttpReply` are `Send`.
#[test]
fn test_request_envelope_and_reply_are_send() {
    // Instantiating this for a non-`Send` type fails to compile.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
2187
2188    // -----------------------------------------------------------------------
2189    // ServerRegistry tests
2190    // -----------------------------------------------------------------------
2191
/// Two calls to `ServerRegistry::global()` must hand back the same instance.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    // Compare addresses, not contents.
    let same_instance = std::ptr::eq(first, second);
    assert!(same_instance);
}
2198
2199    #[tokio::test]
2200    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2201        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2202        let port = listener.local_addr().unwrap().port();
2203        drop(listener);
2204
2205        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2206            Arc::new(std::sync::Mutex::new(Vec::new()));
2207
2208        let mut handles = Vec::new();
2209        for _ in 0..4 {
2210            let results = results.clone();
2211            handles.push(tokio::spawn(async move {
2212                let dispatch = ServerRegistry::global()
2213                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2214                    .await
2215                    .unwrap();
2216                results.lock().unwrap().push(dispatch);
2217            }));
2218        }
2219
2220        for h in handles {
2221            h.await.unwrap();
2222        }
2223
2224        let dispatches = results.lock().unwrap();
2225        assert_eq!(dispatches.len(), 4);
2226        for i in 1..dispatches.len() {
2227            assert!(
2228                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2229                "all concurrent callers should get the same dispatch table"
2230            );
2231        }
2232    }
2233
/// Registrations that differ only in host must get distinct dispatch tables.
#[test]
fn test_server_registry_distinguishes_host_and_port() {
    let runtime = tokio::runtime::Runtime::new().expect("runtime");
    runtime.block_on(async {
        let registry = ServerRegistry::global();

        // Two different hosts, same configured port. Port 0 is acceptable because
        // the registry key uses the configured tuple, not the OS-assigned
        // ephemeral port.
        let loopback = registry
            .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await;
        let wildcard = registry
            .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await;

        assert!(loopback.is_ok());
        assert!(wildcard.is_ok());

        let (loopback, wildcard) = (loopback.unwrap(), wildcard.unwrap());
        assert!(!Arc::ptr_eq(&loopback, &wildcard));
    });
}
2253
2254    #[tokio::test]
2255    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2256        let registry = ServerRegistry::global();
2257        // First registration: maxRequestBody = 1 MB
2258        let d1 = registry
2259            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2260            .await;
2261        assert!(d1.is_ok());
2262
2263        // Second registration on same (host,port): maxRequestBody = 2 MB
2264        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2265        let d2 = registry
2266            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2267            .await;
2268        assert!(d2.is_err());
2269        let err = d2.unwrap_err();
2270        assert!(
2271            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2272            "Expected incompatible maxRequestBody error, got: {}",
2273            err
2274        );
2275    }
2276
2277    // -----------------------------------------------------------------------
2278    // Axum dispatch handler tests
2279    // -----------------------------------------------------------------------
2280
2281    #[tokio::test]
2282    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2283        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2284        // Nothing registered in the dispatch table
2285        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2286        let port = listener.local_addr().unwrap().port();
2287        tokio::spawn(run_axum_server(
2288            listener,
2289            dispatch,
2290            2 * 1024 * 1024,
2291            10 * 1024 * 1024,
2292            Arc::new(tokio::sync::Semaphore::new(1024)),
2293        ));
2294
2295        // Wait for server to start
2296        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2297
2298        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2299            .await
2300            .unwrap();
2301        assert_eq!(resp.status().as_u16(), 404);
2302    }
2303
2304    // -----------------------------------------------------------------------
2305    // HttpConsumer tests
2306    // -----------------------------------------------------------------------
2307
2308    #[tokio::test]
2309    async fn test_http_consumer_start_registers_path() {
2310        use camel_component_api::ConsumerContext;
2311
2312        // Get an OS-assigned free port
2313        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2314        let port = listener.local_addr().unwrap().port();
2315        drop(listener); // Release port — ServerRegistry will rebind it
2316
2317        let consumer_cfg = HttpServerConfig {
2318            host: "127.0.0.1".to_string(),
2319            port,
2320            path: "/ping".to_string(),
2321            max_request_body: 2 * 1024 * 1024,
2322            max_response_body: 10 * 1024 * 1024,
2323            max_inflight_requests: 1024,
2324        };
2325        let mut consumer = HttpConsumer::new(consumer_cfg);
2326
2327        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2328        let token = tokio_util::sync::CancellationToken::new();
2329        let ctx = ConsumerContext::new(tx, token.clone());
2330
2331        tokio::spawn(async move {
2332            consumer.start(ctx).await.unwrap();
2333        });
2334
2335        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2336
2337        let client = reqwest::Client::new();
2338        let resp_future = client
2339            .post(format!("http://127.0.0.1:{port}/ping"))
2340            .body("hello world")
2341            .send();
2342
2343        let (http_result, _) = tokio::join!(resp_future, async {
2344            if let Some(mut envelope) = rx.recv().await {
2345                // Set a custom status code
2346                envelope.exchange.input.set_header(
2347                    "CamelHttpResponseCode",
2348                    serde_json::Value::Number(201.into()),
2349                );
2350                if let Some(reply_tx) = envelope.reply_tx {
2351                    let _ = reply_tx.send(Ok(envelope.exchange));
2352                }
2353            }
2354        });
2355
2356        let resp = http_result.unwrap();
2357        assert_eq!(resp.status().as_u16(), 201);
2358
2359        token.cancel();
2360    }
2361
2362    #[tokio::test]
2363    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2364        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2365
2366        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2367        let port = listener.local_addr().unwrap().port();
2368        drop(listener);
2369
2370        let consumer_cfg = HttpServerConfig {
2371            host: "127.0.0.1".to_string(),
2372            port,
2373            path: "/saturation".to_string(),
2374            max_request_body: 2 * 1024 * 1024,
2375            max_response_body: 10 * 1024 * 1024,
2376            max_inflight_requests: 1,
2377        };
2378        let mut consumer = HttpConsumer::new(consumer_cfg);
2379
2380        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2381        let token = tokio_util::sync::CancellationToken::new();
2382        let ctx = ConsumerContext::new(tx, token.clone());
2383        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2384        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2385
2386        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2387        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2388
2389        tokio::spawn(async move {
2390            let mut first_seen_tx = Some(first_seen_tx);
2391            let mut unblock_first_rx = Some(unblock_first_rx);
2392
2393            while let Some(envelope) = rx.recv().await {
2394                if let Some(tx) = first_seen_tx.take() {
2395                    let _ = tx.send(());
2396                    if let Some(rx_unblock) = unblock_first_rx.take() {
2397                        let _ = rx_unblock.await;
2398                    }
2399                }
2400
2401                if let Some(reply_tx) = envelope.reply_tx {
2402                    let _ = reply_tx.send(Ok(envelope.exchange));
2403                }
2404            }
2405        });
2406
2407        let client = reqwest::Client::new();
2408        let first_req = {
2409            let client = client.clone();
2410            async move {
2411                client
2412                    .get(format!("http://127.0.0.1:{port}/saturation"))
2413                    .send()
2414                    .await
2415                    .unwrap()
2416            }
2417        };
2418
2419        let first_handle = tokio::spawn(first_req);
2420        first_seen_rx.await.unwrap();
2421
2422        let second_resp = client
2423            .get(format!("http://127.0.0.1:{port}/saturation"))
2424            .send()
2425            .await
2426            .unwrap();
2427
2428        assert_eq!(second_resp.status().as_u16(), 503);
2429
2430        let _ = unblock_first_tx.send(());
2431        let first_resp = first_handle.await.unwrap();
2432        assert_eq!(first_resp.status().as_u16(), 200);
2433
2434        token.cancel();
2435    }
2436
2437    #[tokio::test]
2438    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
2439        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2440
2441        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2442        let port = listener.local_addr().unwrap().port();
2443        drop(listener);
2444
2445        let consumer_cfg = HttpServerConfig {
2446            host: "127.0.0.1".to_string(),
2447            port,
2448            path: "/limit-bytes".to_string(),
2449            max_request_body: 2 * 1024 * 1024,
2450            max_response_body: 16,
2451            max_inflight_requests: 1024,
2452        };
2453        let mut consumer = HttpConsumer::new(consumer_cfg);
2454
2455        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2456        let token = tokio_util::sync::CancellationToken::new();
2457        let ctx = ConsumerContext::new(tx, token.clone());
2458        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2459        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2460
2461        let client = reqwest::Client::new();
2462        let send_fut = client
2463            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
2464            .send();
2465
2466        let (http_result, _) = tokio::join!(send_fut, async {
2467            if let Some(mut envelope) = rx.recv().await {
2468                envelope.exchange.input.body =
2469                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
2470                if let Some(reply_tx) = envelope.reply_tx {
2471                    let _ = reply_tx.send(Ok(envelope.exchange));
2472                }
2473            }
2474        });
2475
2476        let resp = http_result.unwrap();
2477        assert_eq!(resp.status().as_u16(), 500);
2478        let body = resp.text().await.unwrap();
2479        assert_eq!(body, "Response body exceeds configured limit");
2480        token.cancel();
2481    }
2482
2483    #[tokio::test]
2484    async fn test_http_consumer_enforces_max_response_body_for_json() {
2485        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2486
2487        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2488        let port = listener.local_addr().unwrap().port();
2489        drop(listener);
2490
2491        let consumer_cfg = HttpServerConfig {
2492            host: "127.0.0.1".to_string(),
2493            port,
2494            path: "/limit-json".to_string(),
2495            max_request_body: 2 * 1024 * 1024,
2496            max_response_body: 16,
2497            max_inflight_requests: 1024,
2498        };
2499        let mut consumer = HttpConsumer::new(consumer_cfg);
2500
2501        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2502        let token = tokio_util::sync::CancellationToken::new();
2503        let ctx = ConsumerContext::new(tx, token.clone());
2504        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2505        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2506
2507        let client = reqwest::Client::new();
2508        let send_fut = client
2509            .get(format!("http://127.0.0.1:{port}/limit-json"))
2510            .send();
2511
2512        let (http_result, _) = tokio::join!(send_fut, async {
2513            if let Some(mut envelope) = rx.recv().await {
2514                envelope.exchange.input.body = camel_component_api::Body::Json(
2515                    serde_json::json!({"message":"this response is bigger than sixteen"}),
2516                );
2517                if let Some(reply_tx) = envelope.reply_tx {
2518                    let _ = reply_tx.send(Ok(envelope.exchange));
2519                }
2520            }
2521        });
2522
2523        let resp = http_result.unwrap();
2524        assert_eq!(resp.status().as_u16(), 500);
2525        let body = resp.text().await.unwrap();
2526        assert_eq!(body, "Response body exceeds configured limit");
2527        token.cancel();
2528    }
2529
2530    #[tokio::test]
2531    async fn test_http_consumer_enforces_max_response_body_for_xml() {
2532        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2533
2534        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2535        let port = listener.local_addr().unwrap().port();
2536        drop(listener);
2537
2538        let consumer_cfg = HttpServerConfig {
2539            host: "127.0.0.1".to_string(),
2540            port,
2541            path: "/limit-xml".to_string(),
2542            max_request_body: 2 * 1024 * 1024,
2543            max_response_body: 16,
2544            max_inflight_requests: 1024,
2545        };
2546        let mut consumer = HttpConsumer::new(consumer_cfg);
2547
2548        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2549        let token = tokio_util::sync::CancellationToken::new();
2550        let ctx = ConsumerContext::new(tx, token.clone());
2551        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2552        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2553
2554        let client = reqwest::Client::new();
2555        let send_fut = client
2556            .get(format!("http://127.0.0.1:{port}/limit-xml"))
2557            .send();
2558
2559        let (http_result, _) = tokio::join!(send_fut, async {
2560            if let Some(mut envelope) = rx.recv().await {
2561                envelope.exchange.input.body = camel_component_api::Body::Xml(
2562                    "<root><value>way-too-large</value></root>".into(),
2563                );
2564                if let Some(reply_tx) = envelope.reply_tx {
2565                    let _ = reply_tx.send(Ok(envelope.exchange));
2566                }
2567            }
2568        });
2569
2570        let resp = http_result.unwrap();
2571        assert_eq!(resp.status().as_u16(), 500);
2572        let body = resp.text().await.unwrap();
2573        assert_eq!(body, "Response body exceeds configured limit");
2574        token.cancel();
2575    }
2576
2577    #[tokio::test]
2578    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
2579        use camel_component_api::{
2580            CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
2581        };
2582        use futures::stream;
2583
2584        let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
2585        let port = listener.local_addr().unwrap().port();
2586        drop(listener);
2587
2588        let consumer_cfg = HttpServerConfig {
2589            host: "0.0.0.0".to_string(),
2590            port,
2591            path: "/limit-stream".to_string(),
2592            max_request_body: 2 * 1024 * 1024,
2593            max_response_body: 16,
2594            max_inflight_requests: 1024,
2595        };
2596        let mut consumer = HttpConsumer::new(consumer_cfg);
2597
2598        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2599        let token = tokio_util::sync::CancellationToken::new();
2600        let ctx = ConsumerContext::new(tx, token.clone());
2601        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2602        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2603
2604        let client = reqwest::Client::new();
2605        let send_fut = client
2606            .get(format!("http://127.0.0.1:{port}/limit-stream"))
2607            .send();
2608
2609        let (http_result, _) = tokio::join!(send_fut, async {
2610            if let Some(mut envelope) = rx.recv().await {
2611                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
2612                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
2613                let stream = Box::pin(stream::iter(chunks));
2614                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
2615                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
2616                    metadata: StreamMetadata {
2617                        size_hint: Some(32),
2618                        content_type: Some("application/octet-stream".into()),
2619                        origin: None,
2620                    },
2621                });
2622                if let Some(reply_tx) = envelope.reply_tx {
2623                    let _ = reply_tx.send(Ok(envelope.exchange));
2624                }
2625            }
2626        });
2627
2628        let resp = http_result.unwrap();
2629        assert_eq!(resp.status().as_u16(), 200);
2630        let body = resp.bytes().await.unwrap();
2631        assert_eq!(body.len(), 32);
2632        token.cancel();
2633    }
2634
2635    // -----------------------------------------------------------------------
2636    // Integration tests
2637    // -----------------------------------------------------------------------
2638
2639    #[tokio::test]
2640    async fn test_integration_single_consumer_round_trip() {
2641        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2642
2643        // Get an OS-assigned free port (ephemeral)
2644        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2645        let port = listener.local_addr().unwrap().port();
2646        drop(listener); // Release — ServerRegistry will rebind
2647
2648        let component = HttpComponent::new();
2649        let endpoint_ctx = NoOpComponentContext;
2650        let endpoint = component
2651            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2652            .unwrap();
2653        let mut consumer = endpoint.create_consumer().unwrap();
2654
2655        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2656        let token = tokio_util::sync::CancellationToken::new();
2657        let ctx = ConsumerContext::new(tx, token.clone());
2658
2659        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2660        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2661
2662        let client = reqwest::Client::new();
2663        let send_fut = client
2664            .post(format!("http://127.0.0.1:{port}/echo"))
2665            .header("Content-Type", "text/plain")
2666            .body("ping")
2667            .send();
2668
2669        let (http_result, _) = tokio::join!(send_fut, async {
2670            if let Some(mut envelope) = rx.recv().await {
2671                assert_eq!(
2672                    envelope.exchange.input.header("CamelHttpMethod"),
2673                    Some(&serde_json::Value::String("POST".into()))
2674                );
2675                assert_eq!(
2676                    envelope.exchange.input.header("CamelHttpPath"),
2677                    Some(&serde_json::Value::String("/echo".into()))
2678                );
2679                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2680                if let Some(reply_tx) = envelope.reply_tx {
2681                    let _ = reply_tx.send(Ok(envelope.exchange));
2682                }
2683            }
2684        });
2685
2686        let resp = http_result.unwrap();
2687        assert_eq!(resp.status().as_u16(), 200);
2688        let body = resp.text().await.unwrap();
2689        assert_eq!(body, "pong");
2690
2691        token.cancel();
2692    }
2693
2694    #[tokio::test]
2695    async fn test_integration_two_consumers_shared_port() {
2696        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2697
2698        // Get an OS-assigned free port (ephemeral)
2699        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2700        let port = listener.local_addr().unwrap().port();
2701        drop(listener);
2702
2703        let component = HttpComponent::new();
2704        let endpoint_ctx = NoOpComponentContext;
2705
2706        // Consumer A: /hello
2707        let endpoint_a = component
2708            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2709            .unwrap();
2710        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2711
2712        // Consumer B: /world
2713        let endpoint_b = component
2714            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2715            .unwrap();
2716        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2717
2718        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2719        let token_a = tokio_util::sync::CancellationToken::new();
2720        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2721
2722        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2723        let token_b = tokio_util::sync::CancellationToken::new();
2724        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2725
2726        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2727        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2728        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729
2730        let client = reqwest::Client::new();
2731
2732        // Request to /hello
2733        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2734        let (resp_hello, _) = tokio::join!(fut_hello, async {
2735            if let Some(mut envelope) = rx_a.recv().await {
2736                envelope.exchange.input.body =
2737                    camel_component_api::Body::Text("hello-response".to_string());
2738                if let Some(reply_tx) = envelope.reply_tx {
2739                    let _ = reply_tx.send(Ok(envelope.exchange));
2740                }
2741            }
2742        });
2743
2744        // Request to /world
2745        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2746        let (resp_world, _) = tokio::join!(fut_world, async {
2747            if let Some(mut envelope) = rx_b.recv().await {
2748                envelope.exchange.input.body =
2749                    camel_component_api::Body::Text("world-response".to_string());
2750                if let Some(reply_tx) = envelope.reply_tx {
2751                    let _ = reply_tx.send(Ok(envelope.exchange));
2752                }
2753            }
2754        });
2755
2756        let body_a = resp_hello.unwrap().text().await.unwrap();
2757        let body_b = resp_world.unwrap().text().await.unwrap();
2758
2759        assert_eq!(body_a, "hello-response");
2760        assert_eq!(body_b, "world-response");
2761
2762        token_a.cancel();
2763        token_b.cancel();
2764    }
2765
2766    #[tokio::test]
2767    async fn test_integration_unregistered_path_returns_404() {
2768        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2769
2770        // Get an OS-assigned free port (ephemeral)
2771        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2772        let port = listener.local_addr().unwrap().port();
2773        drop(listener);
2774
2775        let component = HttpComponent::new();
2776        let endpoint_ctx = NoOpComponentContext;
2777        let endpoint = component
2778            .create_endpoint(
2779                &format!("http://127.0.0.1:{port}/registered"),
2780                &endpoint_ctx,
2781            )
2782            .unwrap();
2783        let mut consumer = endpoint.create_consumer().unwrap();
2784
2785        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2786        let token = tokio_util::sync::CancellationToken::new();
2787        let ctx = ConsumerContext::new(tx, token.clone());
2788
2789        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2790        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2791
2792        let client = reqwest::Client::new();
2793        let resp = client
2794            .get(format!("http://127.0.0.1:{port}/not-there"))
2795            .send()
2796            .await
2797            .unwrap();
2798        assert_eq!(resp.status().as_u16(), 404);
2799
2800        token.cancel();
2801    }
2802
2803    #[test]
2804    fn test_http_consumer_declares_concurrent() {
2805        use camel_component_api::ConcurrencyModel;
2806
2807        let config = HttpServerConfig {
2808            host: "127.0.0.1".to_string(),
2809            port: 19999,
2810            path: "/test".to_string(),
2811            max_request_body: 2 * 1024 * 1024,
2812            max_response_body: 10 * 1024 * 1024,
2813            max_inflight_requests: 1024,
2814        };
2815        let consumer = HttpConsumer::new(config);
2816        assert_eq!(
2817            consumer.concurrency_model(),
2818            ConcurrencyModel::Concurrent { max: None }
2819        );
2820    }
2821
2822    // -----------------------------------------------------------------------
2823    // HttpReplyBody streaming tests
2824    // -----------------------------------------------------------------------
2825
2826    #[tokio::test]
2827    async fn test_http_reply_body_stream_variant_exists() {
2828        use bytes::Bytes;
2829        use camel_component_api::CamelError;
2830        use futures::stream;
2831
2832        let chunks: Vec<Result<Bytes, CamelError>> =
2833            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2834        let stream = Box::pin(stream::iter(chunks));
2835        let reply_body = HttpReplyBody::Stream(stream);
2836        // Si compila y el match funciona, el test pasa
2837        match reply_body {
2838            HttpReplyBody::Stream(_) => {}
2839            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2840        }
2841    }
2842
2843    // -----------------------------------------------------------------------
2844    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2845    // -----------------------------------------------------------------------
2846
2847    #[cfg(feature = "otel")]
2848    mod otel_tests {
2849        use super::*;
2850        use camel_component_api::Message;
2851        use tower::ServiceExt;
2852
2853        #[tokio::test]
2854        async fn test_producer_injects_traceparent_header() {
2855            let (url, _handle) = start_test_server_with_header_capture().await;
2856            let ctx = test_producer_ctx();
2857
2858            let component = HttpComponent::new();
2859            let endpoint_ctx = NoOpComponentContext;
2860            let endpoint = component
2861                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2862                .unwrap();
2863            let producer = endpoint.create_producer(&ctx).unwrap();
2864
2865            // Create exchange with an OTel context by extracting from a traceparent header
2866            let mut exchange = Exchange::new(Message::default());
2867            let mut headers = std::collections::HashMap::new();
2868            headers.insert(
2869                "traceparent".to_string(),
2870                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2871            );
2872            camel_otel::extract_into_exchange(&mut exchange, &headers);
2873
2874            let result = producer.oneshot(exchange).await.unwrap();
2875
2876            // Verify request succeeded
2877            let status = result
2878                .input
2879                .header("CamelHttpResponseCode")
2880                .and_then(|v| v.as_u64())
2881                .unwrap();
2882            assert_eq!(status, 200);
2883
2884            // The test server echoes back the received traceparent header
2885            let traceparent = result.input.header("x-received-traceparent");
2886            assert!(
2887                traceparent.is_some(),
2888                "traceparent header should have been sent"
2889            );
2890
2891            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2892            // Verify format: version-traceid-spanid-flags
2893            let parts: Vec<&str> = traceparent_str.split('-').collect();
2894            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2895            assert_eq!(parts[0], "00", "version should be 00");
2896            assert_eq!(
2897                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2898                "trace-id should match"
2899            );
2900            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2901            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2902        }
2903
2904        #[tokio::test]
2905        async fn test_consumer_extracts_traceparent_header() {
2906            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2907
2908            // Get an OS-assigned free port
2909            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2910            let port = listener.local_addr().unwrap().port();
2911            drop(listener);
2912
2913            let component = HttpComponent::new();
2914            let endpoint_ctx = NoOpComponentContext;
2915            let endpoint = component
2916                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2917                .unwrap();
2918            let mut consumer = endpoint.create_consumer().unwrap();
2919
2920            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2921            let token = tokio_util::sync::CancellationToken::new();
2922            let ctx = ConsumerContext::new(tx, token.clone());
2923
2924            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2925            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2926
2927            // Send request with traceparent header
2928            let client = reqwest::Client::new();
2929            let send_fut = client
2930                .post(format!("http://127.0.0.1:{port}/trace"))
2931                .header(
2932                    "traceparent",
2933                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2934                )
2935                .body("test")
2936                .send();
2937
2938            let (http_result, _) = tokio::join!(send_fut, async {
2939                if let Some(envelope) = rx.recv().await {
2940                    // Verify the exchange has a valid OTel context by re-injecting it
2941                    // and checking the traceparent matches
2942                    let mut injected_headers = std::collections::HashMap::new();
2943                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2944
2945                    assert!(
2946                        injected_headers.contains_key("traceparent"),
2947                        "Exchange should have traceparent after extraction"
2948                    );
2949
2950                    let traceparent = injected_headers.get("traceparent").unwrap();
2951                    let parts: Vec<&str> = traceparent.split('-').collect();
2952                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2953                    assert_eq!(
2954                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2955                        "Trace ID should match the original traceparent header"
2956                    );
2957
2958                    if let Some(reply_tx) = envelope.reply_tx {
2959                        let _ = reply_tx.send(Ok(envelope.exchange));
2960                    }
2961                }
2962            });
2963
2964            let resp = http_result.unwrap();
2965            assert_eq!(resp.status().as_u16(), 200);
2966
2967            token.cancel();
2968        }
2969
2970        #[tokio::test]
2971        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2972            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2973
2974            // Get an OS-assigned free port
2975            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2976            let port = listener.local_addr().unwrap().port();
2977            drop(listener);
2978
2979            let component = HttpComponent::new();
2980            let endpoint_ctx = NoOpComponentContext;
2981            let endpoint = component
2982                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2983                .unwrap();
2984            let mut consumer = endpoint.create_consumer().unwrap();
2985
2986            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2987            let token = tokio_util::sync::CancellationToken::new();
2988            let ctx = ConsumerContext::new(tx, token.clone());
2989
2990            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2991            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2992
2993            // Send request with MIXED-CASE TraceParent header (not lowercase)
2994            let client = reqwest::Client::new();
2995            let send_fut = client
2996                .post(format!("http://127.0.0.1:{port}/trace"))
2997                .header(
2998                    "TraceParent",
2999                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3000                )
3001                .body("test")
3002                .send();
3003
3004            let (http_result, _) = tokio::join!(send_fut, async {
3005                if let Some(envelope) = rx.recv().await {
3006                    // Verify the exchange has a valid OTel context by re-injecting it
3007                    // and checking the traceparent matches
3008                    let mut injected_headers = HashMap::new();
3009                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3010
3011                    assert!(
3012                        injected_headers.contains_key("traceparent"),
3013                        "Exchange should have traceparent after extraction from mixed-case header"
3014                    );
3015
3016                    let traceparent = injected_headers.get("traceparent").unwrap();
3017                    let parts: Vec<&str> = traceparent.split('-').collect();
3018                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3019                    assert_eq!(
3020                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3021                        "Trace ID should match the original mixed-case TraceParent header"
3022                    );
3023
3024                    if let Some(reply_tx) = envelope.reply_tx {
3025                        let _ = reply_tx.send(Ok(envelope.exchange));
3026                    }
3027                }
3028            });
3029
3030            let resp = http_result.unwrap();
3031            assert_eq!(resp.status().as_u16(), 200);
3032
3033            token.cancel();
3034        }
3035
3036        #[tokio::test]
3037        async fn test_producer_no_trace_context_no_crash() {
3038            let (url, _handle) = start_test_server().await;
3039            let ctx = test_producer_ctx();
3040
3041            let component = HttpComponent::new();
3042            let endpoint_ctx = NoOpComponentContext;
3043            let endpoint = component
3044                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3045                .unwrap();
3046            let producer = endpoint.create_producer(&ctx).unwrap();
3047
3048            // Create exchange with default (empty) otel_context - no trace context
3049            let exchange = Exchange::new(Message::default());
3050
3051            // Should succeed without panic
3052            let result = producer.oneshot(exchange).await.unwrap();
3053
3054            // Verify request succeeded
3055            let status = result
3056                .input
3057                .header("CamelHttpResponseCode")
3058                .and_then(|v| v.as_u64())
3059                .unwrap();
3060            assert_eq!(status, 200);
3061        }
3062
3063        /// Test server that captures and echoes back the traceparent header
3064        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3065            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3066            let addr = listener.local_addr().unwrap();
3067            let url = format!("http://127.0.0.1:{}", addr.port());
3068
3069            let handle = tokio::spawn(async move {
3070                loop {
3071                    if let Ok((mut stream, _)) = listener.accept().await {
3072                        tokio::spawn(async move {
3073                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3074                            let mut buf = vec![0u8; 8192];
3075                            let n = stream.read(&mut buf).await.unwrap_or(0);
3076                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3077
3078                            // Extract traceparent header from request
3079                            let traceparent = request
3080                                .lines()
3081                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3082                                .map(|line| {
3083                                    line.split(':')
3084                                        .nth(1)
3085                                        .map(|s| s.trim().to_string())
3086                                        .unwrap_or_default()
3087                                })
3088                                .unwrap_or_default();
3089
3090                            let body =
3091                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3092                            let response = format!(
3093                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3094                                body.len(),
3095                                traceparent,
3096                                body
3097                            );
3098                            let _ = stream.write_all(response.as_bytes()).await;
3099                        });
3100                    }
3101                }
3102            });
3103
3104            (url, handle)
3105        }
3106    }
3107
3108    // -----------------------------------------------------------------------
3109    // Response streaming tests (Eje A - Task 2)
3110    // -----------------------------------------------------------------------
3111
3112    // -----------------------------------------------------------------------
3113    // Request streaming tests (Eje B - Task 3)
3114    // -----------------------------------------------------------------------
3115
3116    #[tokio::test]
3117    async fn test_request_body_arrives_as_stream() {
3118        use camel_component_api::Body;
3119        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3120
3121        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3122        let port = listener.local_addr().unwrap().port();
3123        drop(listener);
3124
3125        let component = HttpComponent::new();
3126        let endpoint_ctx = NoOpComponentContext;
3127        let endpoint = component
3128            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3129            .unwrap();
3130        let mut consumer = endpoint.create_consumer().unwrap();
3131
3132        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3133        let token = tokio_util::sync::CancellationToken::new();
3134        let ctx = ConsumerContext::new(tx, token.clone());
3135
3136        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3137        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3138
3139        let client = reqwest::Client::new();
3140        let send_fut = client
3141            .post(format!("http://127.0.0.1:{port}/upload"))
3142            .body("hello streaming world")
3143            .send();
3144
3145        let (http_result, _) = tokio::join!(send_fut, async {
3146            if let Some(mut envelope) = rx.recv().await {
3147                // Body must be Body::Stream, not Body::Text or Body::Bytes
3148                assert!(
3149                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3150                    "expected Body::Stream, got discriminant {:?}",
3151                    std::mem::discriminant(&envelope.exchange.input.body)
3152                );
3153                // Materialize to verify content
3154                let bytes = envelope
3155                    .exchange
3156                    .input
3157                    .body
3158                    .into_bytes(1024 * 1024)
3159                    .await
3160                    .unwrap();
3161                assert_eq!(&bytes[..], b"hello streaming world");
3162
3163                envelope.exchange.input.body = camel_component_api::Body::Empty;
3164                if let Some(reply_tx) = envelope.reply_tx {
3165                    let _ = reply_tx.send(Ok(envelope.exchange));
3166                }
3167            }
3168        });
3169
3170        let resp = http_result.unwrap();
3171        assert_eq!(resp.status().as_u16(), 200);
3172
3173        token.cancel();
3174    }
3175
3176    // -----------------------------------------------------------------------
3177    // Response streaming tests (Eje A - Task 2)
3178    // -----------------------------------------------------------------------
3179
3180    #[tokio::test]
3181    async fn test_streaming_response_chunked() {
3182        use bytes::Bytes;
3183        use camel_component_api::Body;
3184        use camel_component_api::CamelError;
3185        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3186        use camel_component_api::{StreamBody, StreamMetadata};
3187        use futures::stream;
3188        use std::sync::Arc;
3189        use tokio::sync::Mutex;
3190
3191        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3192        let port = listener.local_addr().unwrap().port();
3193        drop(listener);
3194
3195        let component = HttpComponent::new();
3196        let endpoint_ctx = NoOpComponentContext;
3197        let endpoint = component
3198            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3199            .unwrap();
3200        let mut consumer = endpoint.create_consumer().unwrap();
3201
3202        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3203        let token = tokio_util::sync::CancellationToken::new();
3204        let ctx = ConsumerContext::new(tx, token.clone());
3205
3206        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3207        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3208
3209        let client = reqwest::Client::new();
3210        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3211
3212        let (http_result, _) = tokio::join!(send_fut, async {
3213            if let Some(mut envelope) = rx.recv().await {
3214                // Respond with Body::Stream
3215                let chunks: Vec<Result<Bytes, CamelError>> =
3216                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3217                let stream = Box::pin(stream::iter(chunks));
3218                envelope.exchange.input.body = Body::Stream(StreamBody {
3219                    stream: Arc::new(Mutex::new(Some(stream))),
3220                    metadata: StreamMetadata::default(),
3221                });
3222                if let Some(reply_tx) = envelope.reply_tx {
3223                    let _ = reply_tx.send(Ok(envelope.exchange));
3224                }
3225            }
3226        });
3227
3228        let resp = http_result.unwrap();
3229        assert_eq!(resp.status().as_u16(), 200);
3230        let body = resp.text().await.unwrap();
3231        assert_eq!(body, "chunk1chunk2");
3232
3233        token.cancel();
3234    }
3235
3236    // -----------------------------------------------------------------------
3237    // 413 Content-Length limit test (Task 4)
3238    // -----------------------------------------------------------------------
3239
3240    #[tokio::test]
3241    async fn test_413_when_content_length_exceeds_limit() {
3242        use camel_component_api::ConsumerContext;
3243
3244        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3245        let port = listener.local_addr().unwrap().port();
3246        drop(listener);
3247
3248        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
3249        let component = HttpComponent::new();
3250        let endpoint_ctx = NoOpComponentContext;
3251        let endpoint = component
3252            .create_endpoint(
3253                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3254                &endpoint_ctx,
3255            )
3256            .unwrap();
3257        let mut consumer = endpoint.create_consumer().unwrap();
3258
3259        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3260        let token = tokio_util::sync::CancellationToken::new();
3261        let ctx = ConsumerContext::new(tx, token.clone());
3262
3263        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3264        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3265
3266        let client = reqwest::Client::new();
3267        let resp = client
3268            .post(format!("http://127.0.0.1:{port}/upload"))
3269            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
3270            .body("x".repeat(1000))
3271            .send()
3272            .await
3273            .unwrap();
3274
3275        assert_eq!(resp.status().as_u16(), 413);
3276
3277        token.cancel();
3278    }
3279
3280    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
3281    /// The spec says: "If there is no Content-Length, the limit does not apply at the
3282    /// consumer level — the route is responsible."
3283    #[tokio::test]
3284    async fn test_chunked_upload_without_content_length_bypasses_limit() {
3285        use bytes::Bytes;
3286        use camel_component_api::Body;
3287        use camel_component_api::ConsumerContext;
3288        use futures::stream;
3289
3290        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3291        let port = listener.local_addr().unwrap().port();
3292        drop(listener);
3293
3294        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
3295        let component = HttpComponent::new();
3296        let endpoint_ctx = NoOpComponentContext;
3297        let endpoint = component
3298            .create_endpoint(
3299                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3300                &endpoint_ctx,
3301            )
3302            .unwrap();
3303        let mut consumer = endpoint.create_consumer().unwrap();
3304
3305        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3306        let token = tokio_util::sync::CancellationToken::new();
3307        let ctx = ConsumerContext::new(tx, token.clone());
3308
3309        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3310        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3311
3312        let client = reqwest::Client::new();
3313
3314        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
3315        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
3316        // but since there's no Content-Length the 413 check must NOT fire.
3317        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3318            Ok(Bytes::from("y".repeat(50))),
3319            Ok(Bytes::from("y".repeat(50))),
3320        ];
3321        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3322        let send_fut = client
3323            .post(format!("http://127.0.0.1:{port}/upload"))
3324            .body(stream_body)
3325            .send();
3326
3327        let consumer_fut = async {
3328            // Use timeout to avoid deadlock if the handler rejects before enqueueing
3329            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3330                Ok(Some(mut envelope)) => {
3331                    assert!(
3332                        matches!(envelope.exchange.input.body, Body::Stream(_)),
3333                        "expected Body::Stream"
3334                    );
3335                    envelope.exchange.input.body = camel_component_api::Body::Empty;
3336                    if let Some(reply_tx) = envelope.reply_tx {
3337                        let _ = reply_tx.send(Ok(envelope.exchange));
3338                    }
3339                }
3340                Ok(None) => panic!("consumer channel closed unexpectedly"),
3341                Err(_) => {
3342                    // Timeout: the request was rejected before reaching the consumer.
3343                    // The HTTP response will carry the real status code (we check below).
3344                }
3345            }
3346        };
3347
3348        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3349
3350        let resp = http_result.unwrap();
3351        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
3352        assert_ne!(
3353            resp.status().as_u16(),
3354            413,
3355            "chunked upload must not be rejected by maxRequestBody"
3356        );
3357        assert_eq!(resp.status().as_u16(), 200);
3358
3359        token.cancel();
3360    }
3361}