Skip to main content

camel_component_http/
lib.rs

1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
24// ---------------------------------------------------------------------------
25// HttpEndpointConfig
26// ---------------------------------------------------------------------------
27
28/// Configuration for an HTTP client (producer) endpoint.
29///
30/// # Memory Limits
31///
32/// HTTP operations enforce conservative memory limits to prevent denial-of-service
33/// attacks from untrusted network sources. These limits are significantly lower than
34/// file component limits (100MB) because HTTP typically handles API responses rather
35/// than large file transfers, and clients may be untrusted.
36///
37/// ## Default Limits
38///
39/// - **HTTP client body**: 10MB (typical API responses)
40/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
41/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
42///
43/// ## Rationale
44///
45/// The 10MB limit for HTTP client responses is appropriate for most API interactions
46/// while providing protection against:
47/// - Malicious servers sending oversized responses
48/// - Runaway processes generating unexpectedly large payloads
49/// - Memory exhaustion attacks
50///
51/// The 2MB server request limit is even more conservative because it handles input
52/// from potentially untrusted clients on the public internet.
53///
54/// ## Overriding Limits
55///
56/// Override the default client body limit using the `maxBodySize` URI parameter:
57///
58/// ```text
59/// http://api.example.com/large-data?maxBodySize=52428800
60/// ```
61///
62/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
63///
64/// ```text
65/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
66/// ```
67///
68/// ## Behavior When Exceeded
69///
70/// When a body exceeds the configured limit:
71/// - An error is returned immediately
72/// - No memory is exhausted - the limit is checked before allocation
73/// - The HTTP connection is terminated cleanly
74///
75/// ## Security Considerations
76///
77/// HTTP endpoints should be treated with more caution than file endpoints because:
78/// - Clients may be unknown and untrusted
79/// - Network traffic can be spoofed or malicious
80/// - DoS attacks often exploit unbounded resource consumption
81///
82/// Only increase limits when you control both ends of the connection or when
83/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL rebuilt as `"{scheme}:{path}"` from the parsed URI components.
    pub base_url: String,
    /// Value of the `httpMethod` URI parameter, if one was supplied.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` parameter; `true` unless the value is exactly `"false"`.
    pub throw_exception_on_failure: bool,
    /// `(start, end)` pair parsed from `okStatusCodeRange` (`"start-end"`);
    /// falls back to `(200, 299)` when absent or unparsable.
    pub ok_status_code_range: (u16, u16),
    /// `responseTimeout` parameter interpreted as milliseconds, if present and numeric.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
    pub query_params: HashMap<String, String>,
    /// `allowPrivateIps` parameter; `true` only when the value is exactly `"true"`.
    pub allow_private_ips: bool,
    /// Entries of the comma-separated `blockedHosts` parameter.
    pub blocked_hosts: Vec<String>,
    /// `maxBodySize` parameter in bytes; 10 MiB when absent or unparsable.
    pub max_body_size: usize,
}
96
/// Camel options that should NOT be forwarded as HTTP query params.
///
/// Any URI parameter whose name appears here is excluded from
/// `HttpEndpointConfig::query_params`. Names are matched exactly
/// (case-sensitive).
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    // NOTE(review): `followRedirects` and `connectTimeout` are filtered out here
    // but are not read by `HttpEndpointConfig::from_components`; presumably they
    // are consumed elsewhere — confirm.
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
109
110impl UriConfig for HttpEndpointConfig {
111    /// Returns "http" as the primary scheme (also accepts "https")
112    fn scheme() -> &'static str {
113        "http"
114    }
115
116    fn from_uri(uri: &str) -> Result<Self, CamelError> {
117        let parts = parse_uri(uri)?;
118        Self::from_components(parts)
119    }
120
121    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122        // Validate scheme - accept both http and https
123        if parts.scheme != "http" && parts.scheme != "https" {
124            return Err(CamelError::InvalidUri(format!(
125                "expected scheme 'http' or 'https', got '{}'",
126                parts.scheme
127            )));
128        }
129
130        // Construct base_url from scheme + path
131        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
132        let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134        let http_method = parts.params.get("httpMethod").cloned();
135
136        let throw_exception_on_failure = parts
137            .params
138            .get("throwExceptionOnFailure")
139            .map(|v| v != "false")
140            .unwrap_or(true);
141
142        // Parse status code range from "start-end" format (e.g., "200-299")
143        let ok_status_code_range = parts
144            .params
145            .get("okStatusCodeRange")
146            .and_then(|v| {
147                let (start, end) = v.split_once('-')?;
148                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149            })
150            .unwrap_or((200, 299));
151
152        let response_timeout = parts
153            .params
154            .get("responseTimeout")
155            .and_then(|v| v.parse::<u64>().ok())
156            .map(Duration::from_millis);
157
158        // SSRF protection settings
159        let allow_private_ips = parts
160            .params
161            .get("allowPrivateIps")
162            .map(|v| v == "true")
163            .unwrap_or(false); // Default: block private IPs
164
165        // Parse comma-separated blocked hosts
166        let blocked_hosts = parts
167            .params
168            .get("blockedHosts")
169            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170            .unwrap_or_default();
171
172        let max_body_size = parts
173            .params
174            .get("maxBodySize")
175            .and_then(|v| v.parse::<usize>().ok())
176            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
177
178        // Collect remaining params (not Camel options) as query params
179        let query_params: HashMap<String, String> = parts
180            .params
181            .into_iter()
182            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183            .collect();
184
185        Ok(Self {
186            base_url,
187            http_method,
188            throw_exception_on_failure,
189            ok_status_code_range,
190            response_timeout,
191            query_params,
192            allow_private_ips,
193            blocked_hosts,
194            max_body_size,
195        })
196    }
197}
198
199impl HttpEndpointConfig {
200    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201        let parts = parse_uri(uri)?;
202        let mut endpoint = Self::from_components(parts.clone())?;
203        if endpoint.response_timeout.is_none() {
204            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205        }
206        if !parts.params.contains_key("allowPrivateIps") {
207            endpoint.allow_private_ips = config.allow_private_ips;
208        }
209        if !parts.params.contains_key("blockedHosts") {
210            endpoint.blocked_hosts = config.blocked_hosts.clone();
211        }
212        if !parts.params.contains_key("maxBodySize") {
213            endpoint.max_body_size = config.max_body_size;
214        }
215        Ok(endpoint)
216    }
217}
218
219// ---------------------------------------------------------------------------
220// HttpServerConfig
221// ---------------------------------------------------------------------------
222
/// Configuration for an HTTP server (consumer) endpoint.
///
/// Built from a URI of the form `http://host:port/path?options`; see the
/// `UriConfig` implementation for parsing details and defaults.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port to listen on.
    pub port: u16,
    /// URL path this consumer handles, e.g. "/orders".
    pub path: String,
    /// Maximum request body size in bytes (`maxRequestBody`, 2 MiB when not set).
    pub max_request_body: usize,
    /// Maximum response body size for materializing streams in bytes
    /// (`maxResponseBody`, 10 MiB when not set).
    pub max_response_body: usize,
    /// Maximum number of in-flight requests handled concurrently by this server
    /// (`maxInflightRequests`, 1024 when not set).
    pub max_inflight_requests: usize,
}
239
240impl UriConfig for HttpServerConfig {
241    /// Returns "http" as the primary scheme (also accepts "https")
242    fn scheme() -> &'static str {
243        "http"
244    }
245
246    fn from_uri(uri: &str) -> Result<Self, CamelError> {
247        let parts = parse_uri(uri)?;
248        Self::from_components(parts)
249    }
250
251    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
252        // Validate scheme - accept both http and https
253        if parts.scheme != "http" && parts.scheme != "https" {
254            return Err(CamelError::InvalidUri(format!(
255                "expected scheme 'http' or 'https', got '{}'",
256                parts.scheme
257            )));
258        }
259
260        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
261        // Strip leading "//"
262        let authority_and_path = parts.path.trim_start_matches('/');
263
264        // Split on the first "/" to separate "host:port" from "/path"
265        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
266            (&authority_and_path[..idx], &authority_and_path[idx..])
267        } else {
268            (authority_and_path, "/")
269        };
270
271        let path = if path_suffix.is_empty() {
272            "/"
273        } else {
274            path_suffix
275        }
276        .to_string();
277
278        // Parse host:port from authority
279        let (host, port) = if let Some(colon) = authority.rfind(':') {
280            let port_str = &authority[colon + 1..];
281            match port_str.parse::<u16>() {
282                Ok(p) => (authority[..colon].to_string(), p),
283                Err(_) => {
284                    return Err(CamelError::InvalidUri(format!(
285                        "invalid port '{}' in authority",
286                        port_str
287                    )));
288                }
289            }
290        } else {
291            // Default port based on scheme: 443 for https, 80 for http
292            let default_port = if parts.scheme == "https" { 443 } else { 80 };
293            (authority.to_string(), default_port)
294        };
295
296        let max_request_body = parts
297            .params
298            .get("maxRequestBody")
299            .and_then(|v| v.parse::<usize>().ok())
300            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
301
302        let max_response_body = parts
303            .params
304            .get("maxResponseBody")
305            .and_then(|v| v.parse::<usize>().ok())
306            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
307
308        let max_inflight_requests = parts
309            .params
310            .get("maxInflightRequests")
311            .and_then(|v| v.parse::<usize>().ok())
312            .unwrap_or(1024);
313
314        Ok(Self {
315            host,
316            port,
317            path,
318            max_request_body,
319            max_response_body,
320            max_inflight_requests,
321        })
322    }
323}
324
325impl HttpServerConfig {
326    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
327        let parts = parse_uri(uri)?;
328        let mut server = Self::from_components(parts.clone())?;
329        if !parts.params.contains_key("maxRequestBody") {
330            server.max_request_body = config.max_request_body;
331        }
332        if !parts.params.contains_key("maxResponseBody") {
333            server.max_response_body = config.max_body_size;
334        }
335        Ok(server)
336    }
337}
338
339// ---------------------------------------------------------------------------
340// RequestEnvelope / HttpReply
341// ---------------------------------------------------------------------------
342
/// Body of the HTTP response: either already-materialized bytes or a lazy stream.
pub(crate) enum HttpReplyBody {
    /// Fully buffered response payload.
    Bytes(bytes::Bytes),
    /// Response payload produced incrementally as a stream of byte chunks.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
348
/// An inbound HTTP request sent from the Axum dispatch handler to an
/// `HttpConsumer` receive loop.
pub(crate) struct RequestEnvelope {
    /// HTTP method of the request, rendered as a string.
    pub(crate) method: String,
    /// Path component of the request URI.
    pub(crate) path: String,
    /// Raw query string of the request URI; empty when there is none.
    pub(crate) query: String,
    /// Copy of the request headers.
    pub(crate) headers: http::HeaderMap,
    /// Request body, handed over as a stream without being buffered first.
    pub(crate) body: StreamBody,
    /// One-shot channel on which the consumer sends the reply for this request.
    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
359
/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
pub(crate) struct HttpReply {
    /// Numeric HTTP status code; the handler answers 500 if it is not a valid code.
    pub(crate) status: u16,
    /// Response headers as `(name, value)` pairs.
    pub(crate) headers: Vec<(String, String)>,
    /// Response payload.
    pub(crate) body: HttpReplyBody,
}
366
367// ---------------------------------------------------------------------------
368// DispatchTable / ServerRegistry
369// ---------------------------------------------------------------------------
370
/// Maps URL path → channel sender for the consumer that owns that path.
///
/// Shared between the Axum handler (readers) and consumers registering
/// their path (writers).
pub(crate) type DispatchTable =
    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;

/// Registry key identifying one server: `(host, port)`.
type ServerKey = (String, u16);
376
/// Handle to a running Axum server on one interface/port.
///
/// Besides the dispatch table it records the limits the server was started
/// with, so later callers sharing the same (host, port) can be checked for
/// compatibility.
#[allow(dead_code)]
struct ServerHandle {
    /// Path → consumer routing table shared with the running server.
    dispatch: DispatchTable,
    /// Request body limit (bytes) the server was started with.
    max_request_body: usize,
    /// Response body limit (bytes) the server was started with.
    max_response_body: usize,
    /// In-flight request limit the server was started with.
    max_inflight_requests: usize,
    /// Semaphore shared with the server to bound concurrent requests.
    inflight: Arc<tokio::sync::Semaphore>,
    /// Join handle of the spawned server task; stored but not used directly.
    _task: tokio::task::JoinHandle<()>,
}
388
/// Process-global registry mapping (host, port) → running Axum server handle.
pub struct ServerRegistry {
    /// One lazily-initialized cell per (host, port); the cell is filled the
    /// first time a server for that key is successfully started.
    inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
}
393
394impl ServerRegistry {
395    /// Returns the global singleton.
396    pub fn global() -> &'static Self {
397        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
398        INSTANCE.get_or_init(|| ServerRegistry {
399            inner: Mutex::new(HashMap::new()),
400        })
401    }
402
403    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
404    /// none is running on that port yet.
405    pub(crate) async fn get_or_spawn(
406        &'static self,
407        host: &str,
408        port: u16,
409        max_request_body: usize,
410        max_response_body: usize,
411        max_inflight_requests: usize,
412    ) -> Result<DispatchTable, CamelError> {
413        let host_owned = host.to_string();
414
415        let cell = {
416            let mut guard = self.inner.lock().map_err(|_| {
417                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
418            })?;
419            let key = (host.to_string(), port);
420            guard
421                .entry(key)
422                .or_insert_with(|| Arc::new(OnceCell::new()))
423                .clone()
424        };
425
426        if let Some(existing) = cell.get()
427            && existing.max_request_body != max_request_body
428        {
429            return Err(CamelError::EndpointCreationFailed(
430                format!(
431                    "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
432                    existing.max_request_body, max_request_body
433                ),
434            ));
435        }
436
437        if let Some(existing) = cell.get()
438            && existing.max_response_body != max_response_body
439        {
440            return Err(CamelError::EndpointCreationFailed(
441                format!(
442                    "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
443                    existing.max_response_body, max_response_body
444                ),
445            ));
446        }
447
448        if let Some(existing) = cell.get()
449            && existing.max_inflight_requests != max_inflight_requests
450        {
451            return Err(CamelError::EndpointCreationFailed(
452                format!(
453                    "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
454                    existing.max_inflight_requests, max_inflight_requests
455                ),
456            ));
457        }
458
459        let handle = cell
460            .get_or_try_init(|| async {
461                let addr = format!("{host_owned}:{port}");
462                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
463                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
464                })?;
465                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
466                let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
467                let task = tokio::spawn(run_axum_server(
468                    listener,
469                    Arc::clone(&dispatch),
470                    max_request_body,
471                    max_response_body,
472                    Arc::clone(&inflight),
473                ));
474                Ok::<ServerHandle, CamelError>(ServerHandle {
475                    dispatch,
476                    max_request_body,
477                    max_response_body,
478                    max_inflight_requests,
479                    inflight,
480                    _task: task,
481                })
482            })
483            .await?;
484
485        Ok(Arc::clone(&handle.dispatch))
486    }
487}
488
489// ---------------------------------------------------------------------------
490// Axum server
491// ---------------------------------------------------------------------------
492
493use axum::{
494    Router,
495    body::Body as AxumBody,
496    extract::{Request, State},
497    http::{Response, StatusCode},
498    response::IntoResponse,
499};
500
/// State shared with the Axum fallback handler.
#[derive(Clone)]
struct AppState {
    /// Path → consumer routing table consulted for every request.
    dispatch: DispatchTable,
    /// Requests whose `Content-Length` exceeds this many bytes are rejected.
    max_request_body: usize,
    /// Buffered replies larger than this many bytes are replaced by an error reply.
    max_response_body: usize,
    /// Semaphore from which each request must obtain a permit before dispatch.
    inflight: Arc<tokio::sync::Semaphore>,
}
508
509async fn run_axum_server(
510    listener: tokio::net::TcpListener,
511    dispatch: DispatchTable,
512    max_request_body: usize,
513    max_response_body: usize,
514    inflight: Arc<tokio::sync::Semaphore>,
515) {
516    let state = AppState {
517        dispatch,
518        max_request_body,
519        max_response_body,
520        inflight,
521    };
522    let app = Router::new().fallback(dispatch_handler).with_state(state);
523
524    axum::serve(listener, app).await.unwrap_or_else(|e| {
525        tracing::error!(error = %e, "Axum server error");
526    });
527}
528
529async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
530    let method = req.method().to_string();
531    let path = req.uri().path().to_string();
532    let query = req.uri().query().unwrap_or("").to_string();
533    let headers = req.headers().clone();
534
535    // Check Content-Length against limit BEFORE opening the stream
536    let content_length: Option<u64> = headers
537        .get(http::header::CONTENT_LENGTH)
538        .and_then(|v| v.to_str().ok())
539        .and_then(|s| s.parse().ok());
540
541    if let Some(len) = content_length
542        && len > state.max_request_body as u64
543    {
544        return Response::builder()
545            .status(StatusCode::PAYLOAD_TOO_LARGE)
546            .body(AxumBody::from("Request body exceeds configured limit"))
547            .expect("infallible");
548    }
549
550    let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
551        Ok(permit) => permit,
552        Err(_) => {
553            return Response::builder()
554                .status(StatusCode::SERVICE_UNAVAILABLE)
555                .body(AxumBody::from("Service Unavailable"))
556                .expect("infallible");
557        }
558    };
559
560    // Build StreamBody from Axum body WITHOUT materializing
561    let content_type = headers
562        .get(http::header::CONTENT_TYPE)
563        .and_then(|v| v.to_str().ok())
564        .map(|s| s.to_string());
565
566    let data_stream: BodyDataStream = req.into_body().into_data_stream();
567    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
568    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
569
570    let stream_body = StreamBody {
571        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
572        metadata: StreamMetadata {
573            size_hint: content_length,
574            content_type,
575            origin: None,
576        },
577    };
578
579    // Look up handler for this path
580    let sender = {
581        let table = state.dispatch.read().await;
582        table.get(&path).cloned()
583    };
584    let Some(sender) = sender else {
585        return Response::builder()
586            .status(StatusCode::NOT_FOUND)
587            .body(AxumBody::from("No consumer registered for this path"))
588            .expect("infallible");
589    };
590
591    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
592    let envelope = RequestEnvelope {
593        method,
594        path,
595        query,
596        headers,
597        body: stream_body,
598        reply_tx,
599    };
600
601    if sender.send(envelope).await.is_err() {
602        return Response::builder()
603            .status(StatusCode::SERVICE_UNAVAILABLE)
604            .body(AxumBody::from("Consumer unavailable"))
605            .expect("infallible");
606    }
607
608    match reply_rx.await {
609        Ok(reply) => {
610            let reply = match reply.body {
611                HttpReplyBody::Bytes(b) if exceeds_max_response_body(b.len(), state.max_response_body) => {
612                    HttpReply {
613                        status: 500,
614                        headers: vec![],
615                        body: HttpReplyBody::Bytes(bytes::Bytes::from("Response body exceeds configured limit")),
616                    }
617                }
618                _ => reply,
619            };
620
621            let status =
622                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
623            let mut builder = Response::builder().status(status);
624            for (k, v) in &reply.headers {
625                builder = builder.header(k.as_str(), v.as_str());
626            }
627            match reply.body {
628                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
629                    Response::builder()
630                        .status(StatusCode::INTERNAL_SERVER_ERROR)
631                        .body(AxumBody::from("Invalid response headers from consumer"))
632                        .expect("infallible")
633                }),
634                HttpReplyBody::Stream(stream) => builder
635                    .body(AxumBody::from_stream(stream))
636                    .unwrap_or_else(|_| {
637                        Response::builder()
638                            .status(StatusCode::INTERNAL_SERVER_ERROR)
639                            .body(AxumBody::from("Invalid response headers from consumer"))
640                            .expect("infallible")
641                    }),
642            }
643        }
644        Err(_) => Response::builder()
645            .status(StatusCode::INTERNAL_SERVER_ERROR)
646            .body(AxumBody::from("Pipeline error"))
647            .expect("Response::builder() with a known-valid status code and body is infallible"),
648    }
649}
650
/// Returns `true` when a body of `len` bytes is strictly larger than `max`;
/// a body of exactly `max` bytes is still allowed.
fn exceeds_max_response_body(len: usize, max: usize) -> bool {
    max < len
}
654
655// ---------------------------------------------------------------------------
656// HttpConsumer
657// ---------------------------------------------------------------------------
658
/// Consumer that receives HTTP requests for one configured path.
pub struct HttpConsumer {
    /// Host, port, path and limits this consumer was created with.
    config: HttpServerConfig,
}

impl HttpConsumer {
    /// Creates a consumer for `config`; nothing is bound or registered here.
    pub fn new(config: HttpServerConfig) -> Self {
        Self { config }
    }
}
668
669#[async_trait::async_trait]
670impl Consumer for HttpConsumer {
671    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
672        use camel_component_api::{Body, Exchange, Message};
673
674        let dispatch = ServerRegistry::global()
675            .get_or_spawn(
676                &self.config.host,
677                self.config.port,
678                self.config.max_request_body,
679                self.config.max_response_body,
680                self.config.max_inflight_requests,
681            )
682            .await?;
683
684        // Create a channel for this path and register it
685        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
686        {
687            let mut table = dispatch.write().await;
688            table.insert(self.config.path.clone(), env_tx);
689        }
690
691        let path = self.config.path.clone();
692        let cancel_token = ctx.cancel_token();
693        loop {
694            tokio::select! {
695                _ = ctx.cancelled() => {
696                    break;
697                }
698                envelope = env_rx.recv() => {
699                    let Some(envelope) = envelope else { break; };
700
701                    // Build Exchange from HTTP request
702                    let mut msg = Message::default();
703
704                    // Set standard Camel HTTP headers
705                    msg.set_header("CamelHttpMethod",
706                        serde_json::Value::String(envelope.method.clone()));
707                    msg.set_header("CamelHttpPath",
708                        serde_json::Value::String(envelope.path.clone()));
709                    msg.set_header("CamelHttpQuery",
710                        serde_json::Value::String(envelope.query.clone()));
711
712                    // Forward HTTP headers (skip pseudo-headers)
713                    for (k, v) in &envelope.headers {
714                        if let Ok(val_str) = v.to_str() {
715                            msg.set_header(
716                                k.as_str(),
717                                serde_json::Value::String(val_str.to_string()),
718                            );
719                        }
720                    }
721
722                    // Body: always arrives as Body::Stream (native streaming)
723                    // Routes can call into_bytes() if they need to materialize
724                    msg.body = Body::Stream(envelope.body);
725
726                    #[allow(unused_mut)]
727                    let mut exchange = Exchange::new(msg);
728
729                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
730                    #[cfg(feature = "otel")]
731                    {
732                        let headers: HashMap<String, String> = envelope
733                            .headers
734                            .iter()
735                            .filter_map(|(k, v)| {
736                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
737                            })
738                            .collect();
739                        camel_otel::extract_into_exchange(&mut exchange, &headers);
740                    }
741
742                    let reply_tx = envelope.reply_tx;
743                    let sender = ctx.sender().clone();
744                    let path_clone = path.clone();
745                    let cancel = cancel_token.clone();
746
747                    // Spawn a task to handle this request concurrently
748                    //
749                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
750                    // true concurrent request processing. This change was introduced as part of the
751                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
752                    //
753                    // Rationale:
754                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
755                    //    the consumer's main loop until the pipeline processing completes
756                    // 2. This blocking would prevent multiple HTTP requests from being processed
757                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
758                    // 3. The channel would never have multiple exchanges buffered simultaneously,
759                    //    defeating the purpose of pipeline-side concurrency
760                    // 4. By spawning a task per request, we allow the consumer loop to continue
761                    //    accepting new requests while existing ones are processed in the pipeline
762                    //
763                    // This approach effectively decouples request acceptance from pipeline processing,
764                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
765                    // by the pipeline when ConcurrencyModel::Concurrent is active.
766                    tokio::spawn(async move {
767                        // Check for cancellation before sending to pipeline.
768                        // Returns 503 (Service Unavailable) instead of letting the request
769                        // enter a shutting-down pipeline. This is a behavioral change from
770                        // the pre-concurrency implementation where cancellation during
771                        // processing would result in a 500 (Internal Server Error).
772                        // 503 is more semantically correct: the server is temporarily
773                        // unable to handle the request due to shutdown.
774                        if cancel.is_cancelled() {
775                            let _ = reply_tx.send(HttpReply {
776                                status: 503,
777                                headers: vec![],
778                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
779                            });
780                            return;
781                        }
782
783                        // Send through pipeline and await result
784                        let (tx, rx) = tokio::sync::oneshot::channel();
785                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
786                            exchange,
787                            reply_tx: Some(tx),
788                        };
789
790                        let result = match sender.send(envelope).await {
791                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
792                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
793                        }
794                        .and_then(|r| r);
795
796                        let reply = match result {
797                            Ok(out) => {
798                                let status = out
799                                    .input
800                                    .header("CamelHttpResponseCode")
801                                    .and_then(|v| v.as_u64())
802                                    .map(|s| s as u16)
803                                    .unwrap_or(200);
804
805                                let reply_body: HttpReplyBody = match out.input.body {
806                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
807                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
808                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
809                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
810                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
811                                        v.to_string().into_bytes(),
812                                    )),
813                                    Body::Stream(s) => {
814                                        match s.stream.lock().await.take() {
815                                            Some(stream) => HttpReplyBody::Stream(stream),
816                                            None => {
817                                                tracing::error!(
818                                                    "Body::Stream already consumed before HTTP reply — returning 500"
819                                                );
820                                                let error_reply = HttpReply {
821                                                    status: 500,
822                                                    headers: vec![],
823                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
824                                                };
825                                                if reply_tx.send(error_reply).is_err() {
826                                                    debug!("reply_tx dropped before error reply could be sent");
827                                                }
828                                                return;
829                                            }
830                                        }
831                                    }
832                                };
833
834                                let resp_headers: Vec<(String, String)> = out
835                                    .input
836                                    .headers
837                                    .iter()
838                                    // Filter Camel internal headers
839                                    .filter(|(k, _)| !k.starts_with("Camel"))
840                                    // Filter hop-by-hop and request-only headers
841                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
842                                    .filter(|(k, _)| {
843                                        !matches!(
844                                            k.to_lowercase().as_str(),
845                                            // RFC 2616 Section 4.5 - General headers
846                                            "content-length" |      // Auto-calculated by framework
847                                            "content-type" |        // Auto-calculated from body
848                                            "transfer-encoding" |   // Hop-by-hop
849                                            "connection" |          // Hop-by-hop
850                                            "cache-control" |       // Hop-by-hop
851                                            "date" |                // Auto-generated
852                                            "pragma" |              // Hop-by-hop
853                                            "trailer" |             // Hop-by-hop
854                                            "upgrade" |             // Hop-by-hop
855                                            "via" |                 // Hop-by-hop
856                                            "warning" |             // Hop-by-hop
857                                            // Request-only headers
858                                            "host" |                // Request-only
859                                            "user-agent" |          // Request-only
860                                            "accept" |              // Request-only
861                                            "accept-encoding" |     // Request-only
862                                            "accept-language" |     // Request-only
863                                            "accept-charset" |      // Request-only
864                                            "authorization" |       // Request-only (security)
865                                            "proxy-authorization" | // Request-only (security)
866                                            "cookie" |              // Request-only
867                                            "expect" |              // Request-only
868                                            "from" |                // Request-only
869                                            "if-match" |            // Request-only
870                                            "if-modified-since" |   // Request-only
871                                            "if-none-match" |       // Request-only
872                                            "if-range" |            // Request-only
873                                            "if-unmodified-since" | // Request-only
874                                            "max-forwards" |        // Request-only
875                                            "proxy-connection" |    // Request-only
876                                            "range" |               // Request-only
877                                            "referer" |             // Request-only
878                                            "te"                    // Request-only
879                                        )
880                                    })
881                                    .filter_map(|(k, v)| {
882                                        v.as_str().map(|s| (k.clone(), s.to_string()))
883                                    })
884                                    .collect();
885
886                                HttpReply {
887                                    status,
888                                    headers: resp_headers,
889                                    body: reply_body,
890                                }
891                            }
892                            Err(e) => {
893                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
894                                HttpReply {
895                                    status: 500,
896                                    headers: vec![],
897                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
898                                }
899                            }
900                        };
901
902                        // Reply to Axum handler (ignore error if client disconnected)
903                        let _ = reply_tx.send(reply);
904                    });
905                }
906            }
907        }
908
909        // Deregister this path
910        {
911            let mut table = dispatch.write().await;
912            table.remove(&path);
913        }
914
915        Ok(())
916    }
917
918    async fn stop(&mut self) -> Result<(), CamelError> {
919        Ok(())
920    }
921
922    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
923        camel_component_api::ConcurrencyModel::Concurrent { max: None }
924    }
925}
926
927// ---------------------------------------------------------------------------
928// HttpComponent / HttpsComponent
929// ---------------------------------------------------------------------------
930
931pub struct HttpComponent {
932    client: reqwest::Client,
933    config: HttpConfig,
934}
935
936fn build_client(config: &HttpConfig) -> reqwest::Client {
937    let mut builder = reqwest::Client::builder()
938        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
939        .pool_max_idle_per_host(config.pool_max_idle_per_host)
940        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
941
942    if !config.follow_redirects {
943        builder = builder.redirect(reqwest::redirect::Policy::none());
944    }
945
946    builder
947        .build()
948        .expect("reqwest::Client::build() with valid config should not fail")
949}
950
951impl HttpComponent {
952    pub fn new() -> Self {
953        let config = HttpConfig::default();
954        let client = build_client(&config);
955        Self { client, config }
956    }
957
958    pub fn with_config(config: HttpConfig) -> Self {
959        let client = build_client(&config);
960        Self { client, config }
961    }
962
963    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
964        match config {
965            Some(cfg) => Self::with_config(cfg),
966            None => Self::new(),
967        }
968    }
969}
970
971impl Default for HttpComponent {
972    fn default() -> Self {
973        Self::new()
974    }
975}
976
977impl Component for HttpComponent {
978    fn scheme(&self) -> &str {
979        "http"
980    }
981
982    fn create_endpoint(
983        &self,
984        uri: &str,
985        _ctx: &dyn camel_component_api::ComponentContext,
986    ) -> Result<Box<dyn Endpoint>, CamelError> {
987        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
988        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
989        Ok(Box::new(HttpEndpoint {
990            uri: uri.to_string(),
991            config,
992            server_config,
993            client: self.client.clone(),
994        }))
995    }
996}
997
998pub struct HttpsComponent {
999    client: reqwest::Client,
1000    config: HttpConfig,
1001}
1002
1003impl HttpsComponent {
1004    pub fn new() -> Self {
1005        let config = HttpConfig::default();
1006        let client = build_client(&config);
1007        Self { client, config }
1008    }
1009
1010    pub fn with_config(config: HttpConfig) -> Self {
1011        let client = build_client(&config);
1012        Self { client, config }
1013    }
1014
1015    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1016        match config {
1017            Some(cfg) => Self::with_config(cfg),
1018            None => Self::new(),
1019        }
1020    }
1021}
1022
1023impl Default for HttpsComponent {
1024    fn default() -> Self {
1025        Self::new()
1026    }
1027}
1028
1029impl Component for HttpsComponent {
1030    fn scheme(&self) -> &str {
1031        "https"
1032    }
1033
1034    fn create_endpoint(
1035        &self,
1036        uri: &str,
1037        _ctx: &dyn camel_component_api::ComponentContext,
1038    ) -> Result<Box<dyn Endpoint>, CamelError> {
1039        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1040        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1041        Ok(Box::new(HttpEndpoint {
1042            uri: uri.to_string(),
1043            config,
1044            server_config,
1045            client: self.client.clone(),
1046        }))
1047    }
1048}
1049
1050// ---------------------------------------------------------------------------
1051// HttpEndpoint
1052// ---------------------------------------------------------------------------
1053
1054struct HttpEndpoint {
1055    uri: String,
1056    config: HttpEndpointConfig,
1057    server_config: HttpServerConfig,
1058    client: reqwest::Client,
1059}
1060
1061impl Endpoint for HttpEndpoint {
1062    fn uri(&self) -> &str {
1063        &self.uri
1064    }
1065
1066    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1067        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1068    }
1069
1070    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1071        Ok(BoxProcessor::new(HttpProducer {
1072            config: Arc::new(self.config.clone()),
1073            client: self.client.clone(),
1074        }))
1075    }
1076}
1077
1078// ---------------------------------------------------------------------------
1079// SSRF Protection
1080// ---------------------------------------------------------------------------
1081
1082fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1083    let parsed = url::Url::parse(url)
1084        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1085
1086    // Check blocked hosts
1087    if let Some(host) = parsed.host_str()
1088        && config.blocked_hosts.iter().any(|blocked| host == blocked)
1089    {
1090        return Err(CamelError::ProcessorError(format!(
1091            "Host '{}' is blocked",
1092            host
1093        )));
1094    }
1095
1096    // Check private IPs if not allowed
1097    if !config.allow_private_ips
1098        && let Some(host) = parsed.host()
1099    {
1100        match host {
1101            url::Host::Ipv4(ip) => {
1102                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1103                    return Err(CamelError::ProcessorError(format!(
1104                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1105                        ip
1106                    )));
1107                }
1108            }
1109            url::Host::Ipv6(ip) => {
1110                if ip.is_loopback() {
1111                    return Err(CamelError::ProcessorError(format!(
1112                        "Loopback IP '{}' not allowed",
1113                        ip
1114                    )));
1115                }
1116            }
1117            url::Host::Domain(domain) => {
1118                // Block common internal domains
1119                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1120                if blocked_domains.contains(&domain) {
1121                    return Err(CamelError::ProcessorError(format!(
1122                        "Domain '{}' is not allowed",
1123                        domain
1124                    )));
1125                }
1126            }
1127        }
1128    }
1129
1130    Ok(())
1131}
1132
1133// ---------------------------------------------------------------------------
1134// HttpProducer
1135// ---------------------------------------------------------------------------
1136
1137#[derive(Clone)]
1138struct HttpProducer {
1139    config: Arc<HttpEndpointConfig>,
1140    client: reqwest::Client,
1141}
1142
1143impl HttpProducer {
1144    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1145        if let Some(ref method) = config.http_method {
1146            return method.to_uppercase();
1147        }
1148        if let Some(method) = exchange
1149            .input
1150            .header("CamelHttpMethod")
1151            .and_then(|v| v.as_str())
1152        {
1153            return method.to_uppercase();
1154        }
1155        if !exchange.input.body.is_empty() {
1156            return "POST".to_string();
1157        }
1158        "GET".to_string()
1159    }
1160
1161    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1162        if let Some(uri) = exchange
1163            .input
1164            .header("CamelHttpUri")
1165            .and_then(|v| v.as_str())
1166        {
1167            let mut url = uri.to_string();
1168            if let Some(path) = exchange
1169                .input
1170                .header("CamelHttpPath")
1171                .and_then(|v| v.as_str())
1172            {
1173                if !url.ends_with('/') && !path.starts_with('/') {
1174                    url.push('/');
1175                }
1176                url.push_str(path);
1177            }
1178            if let Some(query) = exchange
1179                .input
1180                .header("CamelHttpQuery")
1181                .and_then(|v| v.as_str())
1182            {
1183                url.push('?');
1184                url.push_str(query);
1185            }
1186            return url;
1187        }
1188
1189        let mut url = config.base_url.clone();
1190
1191        if let Some(path) = exchange
1192            .input
1193            .header("CamelHttpPath")
1194            .and_then(|v| v.as_str())
1195        {
1196            if !url.ends_with('/') && !path.starts_with('/') {
1197                url.push('/');
1198            }
1199            url.push_str(path);
1200        }
1201
1202        if let Some(query) = exchange
1203            .input
1204            .header("CamelHttpQuery")
1205            .and_then(|v| v.as_str())
1206        {
1207            url.push('?');
1208            url.push_str(query);
1209        } else if !config.query_params.is_empty() {
1210            // Forward non-Camel query params from config
1211            url.push('?');
1212            let query_string: String = config
1213                .query_params
1214                .iter()
1215                .map(|(k, v)| format!("{k}={v}"))
1216                .collect::<Vec<_>>()
1217                .join("&");
1218            url.push_str(&query_string);
1219        }
1220
1221        url
1222    }
1223
1224    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1225        status >= range.0 && status <= range.1
1226    }
1227}
1228
1229impl Service<Exchange> for HttpProducer {
1230    type Response = Exchange;
1231    type Error = CamelError;
1232    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1233
1234    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1235        Poll::Ready(Ok(()))
1236    }
1237
1238    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1239        let config = self.config.clone();
1240        let client = self.client.clone();
1241
1242        Box::pin(async move {
1243            let method_str = HttpProducer::resolve_method(&exchange, &config);
1244            let url = HttpProducer::resolve_url(&exchange, &config);
1245
1246            // SECURITY: Validate URL for SSRF
1247            validate_url_for_ssrf(&url, &config)?;
1248
1249            debug!(
1250                correlation_id = %exchange.correlation_id(),
1251                method = %method_str,
1252                url = %url,
1253                "HTTP request"
1254            );
1255
1256            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1257                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1258            })?;
1259
1260            let mut request = client.request(method, &url);
1261
1262            if let Some(timeout) = config.response_timeout {
1263                request = request.timeout(timeout);
1264            }
1265
1266            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1267            #[cfg(feature = "otel")]
1268            {
1269                let mut otel_headers = HashMap::new();
1270                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1271                for (k, v) in otel_headers {
1272                    if let (Ok(name), Ok(val)) = (
1273                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1274                        reqwest::header::HeaderValue::from_str(&v),
1275                    ) {
1276                        request = request.header(name, val);
1277                    }
1278                }
1279            }
1280
1281            for (key, value) in &exchange.input.headers {
1282                if !key.starts_with("Camel")
1283                    && let Some(val_str) = value.as_str()
1284                    && let (Ok(name), Ok(val)) = (
1285                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1286                        reqwest::header::HeaderValue::from_str(val_str),
1287                    )
1288                {
1289                    request = request.header(name, val);
1290                }
1291            }
1292
1293            match exchange.input.body {
1294                Body::Stream(ref s) => {
1295                    let mut stream_lock = s.stream.lock().await;
1296                    if let Some(stream) = stream_lock.take() {
1297                        request = request.body(reqwest::Body::wrap_stream(stream));
1298                    } else {
1299                        return Err(CamelError::AlreadyConsumed);
1300                    }
1301                }
1302                _ => {
1303                    // For other types, materialize with configured limit
1304                    let body = std::mem::take(&mut exchange.input.body);
1305                    let bytes = body.into_bytes(config.max_body_size).await?;
1306                    if !bytes.is_empty() {
1307                        request = request.body(bytes);
1308                    }
1309                }
1310            }
1311
1312            let response = request
1313                .send()
1314                .await
1315                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1316
1317            let status_code = response.status().as_u16();
1318            let status_text = response
1319                .status()
1320                .canonical_reason()
1321                .unwrap_or("Unknown")
1322                .to_string();
1323
1324            for (key, value) in response.headers() {
1325                if let Ok(val_str) = value.to_str() {
1326                    exchange
1327                        .input
1328                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1329                }
1330            }
1331
1332            exchange.input.set_header(
1333                "CamelHttpResponseCode",
1334                serde_json::Value::Number(status_code.into()),
1335            );
1336            exchange.input.set_header(
1337                "CamelHttpResponseText",
1338                serde_json::Value::String(status_text.clone()),
1339            );
1340
1341            let response_body = response.bytes().await.map_err(|e| {
1342                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1343            })?;
1344
1345            if config.throw_exception_on_failure
1346                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1347            {
1348                return Err(CamelError::HttpOperationFailed {
1349                    method: method_str,
1350                    url,
1351                    status_code,
1352                    status_text,
1353                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1354                });
1355            }
1356
1357            if !response_body.is_empty() {
1358                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1359            }
1360
1361            debug!(
1362                correlation_id = %exchange.correlation_id(),
1363                status = status_code,
1364                url = %url,
1365                "HTTP response"
1366            );
1367            Ok(exchange)
1368        })
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    use super::*;
1375    use camel_component_api::{Message, NoOpComponentContext};
1376    use std::sync::Arc;
1377    use std::time::Duration;
1378
1379    fn test_producer_ctx() -> ProducerContext {
1380        ProducerContext::new()
1381    }
1382
1383    #[test]
1384    fn test_http_config_defaults() {
1385        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1386        assert_eq!(config.base_url, "http://localhost:8080/api");
1387        assert!(config.http_method.is_none());
1388        assert!(config.throw_exception_on_failure);
1389        assert_eq!(config.ok_status_code_range, (200, 299));
1390        assert!(config.response_timeout.is_none());
1391    }
1392
1393    #[test]
1394    fn test_http_config_scheme() {
1395        // UriConfig trait method returns "http" as primary scheme
1396        assert_eq!(HttpEndpointConfig::scheme(), "http");
1397    }
1398
1399    #[test]
1400    fn test_http_config_from_components() {
1401        // Test from_components directly (trait method)
1402        let components = camel_component_api::UriComponents {
1403            scheme: "https".to_string(),
1404            path: "//api.example.com/v1".to_string(),
1405            params: std::collections::HashMap::from([(
1406                "httpMethod".to_string(),
1407                "POST".to_string(),
1408            )]),
1409        };
1410        let config = HttpEndpointConfig::from_components(components).unwrap();
1411        assert_eq!(config.base_url, "https://api.example.com/v1");
1412        assert_eq!(config.http_method, Some("POST".to_string()));
1413    }
1414
1415    #[test]
1416    fn test_http_config_with_options() {
1417        let config = HttpEndpointConfig::from_uri(
1418            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1419        ).unwrap();
1420        assert_eq!(config.base_url, "https://api.example.com/v1");
1421        assert_eq!(config.http_method, Some("PUT".to_string()));
1422        assert!(!config.throw_exception_on_failure);
1423        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1424    }
1425
1426    #[test]
1427    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1428        let config = HttpConfig::default()
1429            .with_response_timeout_ms(999)
1430            .with_allow_private_ips(true)
1431            .with_blocked_hosts(vec!["evil.com".to_string()])
1432            .with_max_body_size(12345);
1433        let endpoint =
1434            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1435        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1436        assert!(endpoint.allow_private_ips);
1437        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1438        assert_eq!(endpoint.max_body_size, 12345);
1439    }
1440
1441    #[test]
1442    fn test_from_uri_with_defaults_uri_overrides_config() {
1443        let config = HttpConfig::default()
1444            .with_response_timeout_ms(999)
1445            .with_allow_private_ips(true)
1446            .with_blocked_hosts(vec!["evil.com".to_string()])
1447            .with_max_body_size(12345);
1448        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1449            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1450            &config,
1451        )
1452        .unwrap();
1453        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1454        assert!(!endpoint.allow_private_ips);
1455        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1456        assert_eq!(endpoint.max_body_size, 99);
1457    }
1458
1459    #[test]
1460    fn test_http_config_ok_status_range() {
1461        let config =
1462            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1463        assert_eq!(config.ok_status_code_range, (200, 204));
1464    }
1465
1466    #[test]
1467    fn test_http_config_wrong_scheme() {
1468        let result = HttpEndpointConfig::from_uri("file:/tmp");
1469        assert!(result.is_err());
1470    }
1471
1472    #[test]
1473    fn test_http_component_scheme() {
1474        let component = HttpComponent::new();
1475        assert_eq!(component.scheme(), "http");
1476    }
1477
1478    #[test]
1479    fn test_https_component_scheme() {
1480        let component = HttpsComponent::new();
1481        assert_eq!(component.scheme(), "https");
1482    }
1483
1484    #[test]
1485    fn test_http_endpoint_creates_consumer() {
1486        let component = HttpComponent::new();
1487        let ctx = NoOpComponentContext;
1488        let endpoint = component
1489            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1490            .unwrap();
1491        assert!(endpoint.create_consumer().is_ok());
1492    }
1493
1494    #[test]
1495    fn test_https_endpoint_creates_consumer() {
1496        let component = HttpsComponent::new();
1497        let ctx = NoOpComponentContext;
1498        let endpoint = component
1499            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1500            .unwrap();
1501        assert!(endpoint.create_consumer().is_ok());
1502    }
1503
1504    #[test]
1505    fn test_http_endpoint_creates_producer() {
1506        let ctx = test_producer_ctx();
1507        let component = HttpComponent::new();
1508        let endpoint_ctx = NoOpComponentContext;
1509        let endpoint = component
1510            .create_endpoint("http://localhost/api", &endpoint_ctx)
1511            .unwrap();
1512        assert!(endpoint.create_producer(&ctx).is_ok());
1513    }
1514
1515    // -----------------------------------------------------------------------
1516    // Producer tests
1517    // -----------------------------------------------------------------------
1518
1519    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1520        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1521        let addr = listener.local_addr().unwrap();
1522        let url = format!("http://127.0.0.1:{}", addr.port());
1523
1524        let handle = tokio::spawn(async move {
1525            loop {
1526                if let Ok((mut stream, _)) = listener.accept().await {
1527                    tokio::spawn(async move {
1528                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1529                        let mut buf = vec![0u8; 4096];
1530                        let n = stream.read(&mut buf).await.unwrap_or(0);
1531                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1532
1533                        let method = request.split_whitespace().next().unwrap_or("GET");
1534
1535                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1536                        let response = format!(
1537                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1538                            body.len(),
1539                            body
1540                        );
1541                        let _ = stream.write_all(response.as_bytes()).await;
1542                    });
1543                }
1544            }
1545        });
1546
1547        (url, handle)
1548    }
1549
1550    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1551        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1552        let addr = listener.local_addr().unwrap();
1553        let url = format!("http://127.0.0.1:{}", addr.port());
1554
1555        let handle = tokio::spawn(async move {
1556            loop {
1557                if let Ok((mut stream, _)) = listener.accept().await {
1558                    let status = status;
1559                    tokio::spawn(async move {
1560                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561                        let mut buf = vec![0u8; 4096];
1562                        let _ = stream.read(&mut buf).await;
1563
1564                        let status_text = match status {
1565                            404 => "Not Found",
1566                            500 => "Internal Server Error",
1567                            _ => "Error",
1568                        };
1569                        let body = "error body";
1570                        let response = format!(
1571                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1572                            status,
1573                            status_text,
1574                            body.len(),
1575                            body
1576                        );
1577                        let _ = stream.write_all(response.as_bytes()).await;
1578                    });
1579                }
1580            }
1581        });
1582
1583        (url, handle)
1584    }
1585
1586    #[tokio::test]
1587    async fn test_http_producer_get_request() {
1588        use tower::ServiceExt;
1589
1590        let (url, _handle) = start_test_server().await;
1591        let ctx = test_producer_ctx();
1592
1593        let component = HttpComponent::new();
1594        let endpoint_ctx = NoOpComponentContext;
1595        let endpoint = component
1596            .create_endpoint(
1597                &format!("{url}/api/test?allowPrivateIps=true"),
1598                &endpoint_ctx,
1599            )
1600            .unwrap();
1601        let producer = endpoint.create_producer(&ctx).unwrap();
1602
1603        let exchange = Exchange::new(Message::default());
1604        let result = producer.oneshot(exchange).await.unwrap();
1605
1606        let status = result
1607            .input
1608            .header("CamelHttpResponseCode")
1609            .and_then(|v| v.as_u64())
1610            .unwrap();
1611        assert_eq!(status, 200);
1612
1613        assert!(!result.input.body.is_empty());
1614    }
1615
1616    #[tokio::test]
1617    async fn test_http_producer_post_with_body() {
1618        use tower::ServiceExt;
1619
1620        let (url, _handle) = start_test_server().await;
1621        let ctx = test_producer_ctx();
1622
1623        let component = HttpComponent::new();
1624        let endpoint_ctx = NoOpComponentContext;
1625        let endpoint = component
1626            .create_endpoint(
1627                &format!("{url}/api/data?allowPrivateIps=true"),
1628                &endpoint_ctx,
1629            )
1630            .unwrap();
1631        let producer = endpoint.create_producer(&ctx).unwrap();
1632
1633        let exchange = Exchange::new(Message::new("request body"));
1634        let result = producer.oneshot(exchange).await.unwrap();
1635
1636        let status = result
1637            .input
1638            .header("CamelHttpResponseCode")
1639            .and_then(|v| v.as_u64())
1640            .unwrap();
1641        assert_eq!(status, 200);
1642    }
1643
1644    #[tokio::test]
1645    async fn test_http_producer_method_from_header() {
1646        use tower::ServiceExt;
1647
1648        let (url, _handle) = start_test_server().await;
1649        let ctx = test_producer_ctx();
1650
1651        let component = HttpComponent::new();
1652        let endpoint_ctx = NoOpComponentContext;
1653        let endpoint = component
1654            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1655            .unwrap();
1656        let producer = endpoint.create_producer(&ctx).unwrap();
1657
1658        let mut exchange = Exchange::new(Message::default());
1659        exchange.input.set_header(
1660            "CamelHttpMethod",
1661            serde_json::Value::String("DELETE".to_string()),
1662        );
1663
1664        let result = producer.oneshot(exchange).await.unwrap();
1665        let status = result
1666            .input
1667            .header("CamelHttpResponseCode")
1668            .and_then(|v| v.as_u64())
1669            .unwrap();
1670        assert_eq!(status, 200);
1671    }
1672
1673    #[tokio::test]
1674    async fn test_http_producer_forced_method() {
1675        use tower::ServiceExt;
1676
1677        let (url, _handle) = start_test_server().await;
1678        let ctx = test_producer_ctx();
1679
1680        let component = HttpComponent::new();
1681        let endpoint_ctx = NoOpComponentContext;
1682        let endpoint = component
1683            .create_endpoint(
1684                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1685                &endpoint_ctx,
1686            )
1687            .unwrap();
1688        let producer = endpoint.create_producer(&ctx).unwrap();
1689
1690        let exchange = Exchange::new(Message::default());
1691        let result = producer.oneshot(exchange).await.unwrap();
1692
1693        let status = result
1694            .input
1695            .header("CamelHttpResponseCode")
1696            .and_then(|v| v.as_u64())
1697            .unwrap();
1698        assert_eq!(status, 200);
1699    }
1700
1701    #[tokio::test]
1702    async fn test_http_producer_throw_exception_on_failure() {
1703        use tower::ServiceExt;
1704
1705        let (url, _handle) = start_status_server(404).await;
1706        let ctx = test_producer_ctx();
1707
1708        let component = HttpComponent::new();
1709        let endpoint_ctx = NoOpComponentContext;
1710        let endpoint = component
1711            .create_endpoint(
1712                &format!("{url}/not-found?allowPrivateIps=true"),
1713                &endpoint_ctx,
1714            )
1715            .unwrap();
1716        let producer = endpoint.create_producer(&ctx).unwrap();
1717
1718        let exchange = Exchange::new(Message::default());
1719        let result = producer.oneshot(exchange).await;
1720        assert!(result.is_err());
1721
1722        match result.unwrap_err() {
1723            CamelError::HttpOperationFailed { status_code, .. } => {
1724                assert_eq!(status_code, 404);
1725            }
1726            e => panic!("Expected HttpOperationFailed, got: {e}"),
1727        }
1728    }
1729
1730    #[tokio::test]
1731    async fn test_http_producer_no_throw_on_failure() {
1732        use tower::ServiceExt;
1733
1734        let (url, _handle) = start_status_server(500).await;
1735        let ctx = test_producer_ctx();
1736
1737        let component = HttpComponent::new();
1738        let endpoint_ctx = NoOpComponentContext;
1739        let endpoint = component
1740            .create_endpoint(
1741                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1742                &endpoint_ctx,
1743            )
1744            .unwrap();
1745        let producer = endpoint.create_producer(&ctx).unwrap();
1746
1747        let exchange = Exchange::new(Message::default());
1748        let result = producer.oneshot(exchange).await.unwrap();
1749
1750        let status = result
1751            .input
1752            .header("CamelHttpResponseCode")
1753            .and_then(|v| v.as_u64())
1754            .unwrap();
1755        assert_eq!(status, 500);
1756    }
1757
1758    #[tokio::test]
1759    async fn test_http_producer_uri_override() {
1760        use tower::ServiceExt;
1761
1762        let (url, _handle) = start_test_server().await;
1763        let ctx = test_producer_ctx();
1764
1765        let component = HttpComponent::new();
1766        let endpoint_ctx = NoOpComponentContext;
1767        let endpoint = component
1768            .create_endpoint(
1769                "http://localhost:1/does-not-exist?allowPrivateIps=true",
1770                &endpoint_ctx,
1771            )
1772            .unwrap();
1773        let producer = endpoint.create_producer(&ctx).unwrap();
1774
1775        let mut exchange = Exchange::new(Message::default());
1776        exchange.input.set_header(
1777            "CamelHttpUri",
1778            serde_json::Value::String(format!("{url}/api")),
1779        );
1780
1781        let result = producer.oneshot(exchange).await.unwrap();
1782        let status = result
1783            .input
1784            .header("CamelHttpResponseCode")
1785            .and_then(|v| v.as_u64())
1786            .unwrap();
1787        assert_eq!(status, 200);
1788    }
1789
1790    #[tokio::test]
1791    async fn test_http_producer_response_headers_mapped() {
1792        use tower::ServiceExt;
1793
1794        let (url, _handle) = start_test_server().await;
1795        let ctx = test_producer_ctx();
1796
1797        let component = HttpComponent::new();
1798        let endpoint_ctx = NoOpComponentContext;
1799        let endpoint = component
1800            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1801            .unwrap();
1802        let producer = endpoint.create_producer(&ctx).unwrap();
1803
1804        let exchange = Exchange::new(Message::default());
1805        let result = producer.oneshot(exchange).await.unwrap();
1806
1807        assert!(
1808            result.input.header("content-type").is_some()
1809                || result.input.header("Content-Type").is_some()
1810        );
1811        assert!(result.input.header("CamelHttpResponseText").is_some());
1812    }
1813
1814    // -----------------------------------------------------------------------
1815    // Bug fix tests: Client configuration per-endpoint
1816    // -----------------------------------------------------------------------
1817
1818    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1819        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1820        let addr = listener.local_addr().unwrap();
1821        let url = format!("http://127.0.0.1:{}", addr.port());
1822
1823        let handle = tokio::spawn(async move {
1824            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1825            loop {
1826                if let Ok((mut stream, _)) = listener.accept().await {
1827                    tokio::spawn(async move {
1828                        let mut buf = vec![0u8; 4096];
1829                        let n = stream.read(&mut buf).await.unwrap_or(0);
1830                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1831
1832                        // Check if this is a request to /final
1833                        if request.contains("GET /final") {
1834                            let body = r#"{"status":"final"}"#;
1835                            let response = format!(
1836                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1837                                body.len(),
1838                                body
1839                            );
1840                            let _ = stream.write_all(response.as_bytes()).await;
1841                        } else {
1842                            // Redirect to /final
1843                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1844                            let _ = stream.write_all(response.as_bytes()).await;
1845                        }
1846                    });
1847                }
1848            }
1849        });
1850
1851        (url, handle)
1852    }
1853
1854    #[tokio::test]
1855    async fn test_follow_redirects_false_does_not_follow() {
1856        use tower::ServiceExt;
1857
1858        let (url, _handle) = start_redirect_server().await;
1859        let ctx = test_producer_ctx();
1860
1861        let component =
1862            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1863        let endpoint_ctx = NoOpComponentContext;
1864        let endpoint = component
1865            .create_endpoint(
1866                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1867                &endpoint_ctx,
1868            )
1869            .unwrap();
1870        let producer = endpoint.create_producer(&ctx).unwrap();
1871
1872        let exchange = Exchange::new(Message::default());
1873        let result = producer.oneshot(exchange).await.unwrap();
1874
1875        // Should get 302, NOT follow redirect to 200
1876        let status = result
1877            .input
1878            .header("CamelHttpResponseCode")
1879            .and_then(|v| v.as_u64())
1880            .unwrap();
1881        assert_eq!(
1882            status, 302,
1883            "Should NOT follow redirect when followRedirects=false"
1884        );
1885    }
1886
1887    #[tokio::test]
1888    async fn test_follow_redirects_true_follows_redirect() {
1889        use tower::ServiceExt;
1890
1891        let (url, _handle) = start_redirect_server().await;
1892        let ctx = test_producer_ctx();
1893
1894        let component =
1895            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1896        let endpoint_ctx = NoOpComponentContext;
1897        let endpoint = component
1898            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1899            .unwrap();
1900        let producer = endpoint.create_producer(&ctx).unwrap();
1901
1902        let exchange = Exchange::new(Message::default());
1903        let result = producer.oneshot(exchange).await.unwrap();
1904
1905        // Should follow redirect and get 200
1906        let status = result
1907            .input
1908            .header("CamelHttpResponseCode")
1909            .and_then(|v| v.as_u64())
1910            .unwrap();
1911        assert_eq!(
1912            status, 200,
1913            "Should follow redirect when followRedirects=true"
1914        );
1915    }
1916
1917    #[tokio::test]
1918    async fn test_query_params_forwarded_to_http_request() {
1919        use tower::ServiceExt;
1920
1921        let (url, _handle) = start_test_server().await;
1922        let ctx = test_producer_ctx();
1923
1924        let component = HttpComponent::new();
1925        let endpoint_ctx = NoOpComponentContext;
1926        // apiKey is NOT a Camel option, should be forwarded as query param
1927        let endpoint = component
1928            .create_endpoint(
1929                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1930                &endpoint_ctx,
1931            )
1932            .unwrap();
1933        let producer = endpoint.create_producer(&ctx).unwrap();
1934
1935        let exchange = Exchange::new(Message::default());
1936        let result = producer.oneshot(exchange).await.unwrap();
1937
1938        // The test server returns the request info in response
1939        // We just verify it succeeds (the query param was sent)
1940        let status = result
1941            .input
1942            .header("CamelHttpResponseCode")
1943            .and_then(|v| v.as_u64())
1944            .unwrap();
1945        assert_eq!(status, 200);
1946    }
1947
1948    #[tokio::test]
1949    async fn test_non_camel_query_params_are_forwarded() {
1950        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1951        // We'll test the config parsing, not the actual HTTP call
1952        let config = HttpEndpointConfig::from_uri(
1953            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1954        )
1955        .unwrap();
1956
1957        // apiKey and token are NOT Camel options, should be forwarded
1958        assert!(
1959            config.query_params.contains_key("apiKey"),
1960            "apiKey should be preserved"
1961        );
1962        assert!(
1963            config.query_params.contains_key("token"),
1964            "token should be preserved"
1965        );
1966        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1967        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1968
1969        // httpMethod IS a Camel option, should NOT be in query_params
1970        assert!(
1971            !config.query_params.contains_key("httpMethod"),
1972            "httpMethod should not be forwarded"
1973        );
1974    }
1975
1976    // -----------------------------------------------------------------------
1977    // SSRF Protection tests
1978    // -----------------------------------------------------------------------
1979
1980    #[tokio::test]
1981    async fn test_http_producer_blocks_metadata_endpoint() {
1982        use tower::ServiceExt;
1983
1984        let ctx = test_producer_ctx();
1985        let component = HttpComponent::new();
1986        let endpoint_ctx = NoOpComponentContext;
1987        let endpoint = component
1988            .create_endpoint(
1989                "http://example.com/api?allowPrivateIps=false",
1990                &endpoint_ctx,
1991            )
1992            .unwrap();
1993        let producer = endpoint.create_producer(&ctx).unwrap();
1994
1995        let mut exchange = Exchange::new(Message::default());
1996        exchange.input.set_header(
1997            "CamelHttpUri",
1998            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1999        );
2000
2001        let result = producer.oneshot(exchange).await;
2002        assert!(result.is_err(), "Should block AWS metadata endpoint");
2003
2004        let err = result.unwrap_err();
2005        assert!(
2006            err.to_string().contains("Private IP"),
2007            "Error should mention private IP blocking, got: {}",
2008            err
2009        );
2010    }
2011
2012    #[test]
2013    fn test_ssrf_config_defaults() {
2014        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2015        assert!(
2016            !config.allow_private_ips,
2017            "Private IPs should be blocked by default"
2018        );
2019        assert!(
2020            config.blocked_hosts.is_empty(),
2021            "Blocked hosts should be empty by default"
2022        );
2023    }
2024
2025    #[test]
2026    fn test_ssrf_config_allow_private_ips() {
2027        let config =
2028            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2029        assert!(
2030            config.allow_private_ips,
2031            "Private IPs should be allowed when explicitly set"
2032        );
2033    }
2034
2035    #[test]
2036    fn test_ssrf_config_blocked_hosts() {
2037        let config = HttpEndpointConfig::from_uri(
2038            "http://example.com/api?blockedHosts=evil.com,malware.net",
2039        )
2040        .unwrap();
2041        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2042    }
2043
2044    #[tokio::test]
2045    async fn test_http_producer_blocks_localhost() {
2046        use tower::ServiceExt;
2047
2048        let ctx = test_producer_ctx();
2049        let component = HttpComponent::new();
2050        let endpoint_ctx = NoOpComponentContext;
2051        let endpoint = component
2052            .create_endpoint("http://example.com/api", &endpoint_ctx)
2053            .unwrap();
2054        let producer = endpoint.create_producer(&ctx).unwrap();
2055
2056        let mut exchange = Exchange::new(Message::default());
2057        exchange.input.set_header(
2058            "CamelHttpUri",
2059            serde_json::Value::String("http://localhost:8080/internal".to_string()),
2060        );
2061
2062        let result = producer.oneshot(exchange).await;
2063        assert!(result.is_err(), "Should block localhost");
2064    }
2065
2066    #[tokio::test]
2067    async fn test_http_producer_blocks_loopback_ip() {
2068        use tower::ServiceExt;
2069
2070        let ctx = test_producer_ctx();
2071        let component = HttpComponent::new();
2072        let endpoint_ctx = NoOpComponentContext;
2073        let endpoint = component
2074            .create_endpoint("http://example.com/api", &endpoint_ctx)
2075            .unwrap();
2076        let producer = endpoint.create_producer(&ctx).unwrap();
2077
2078        let mut exchange = Exchange::new(Message::default());
2079        exchange.input.set_header(
2080            "CamelHttpUri",
2081            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2082        );
2083
2084        let result = producer.oneshot(exchange).await;
2085        assert!(result.is_err(), "Should block loopback IP");
2086    }
2087
2088    #[tokio::test]
2089    async fn test_http_producer_allows_private_ip_when_enabled() {
2090        use tower::ServiceExt;
2091
2092        let ctx = test_producer_ctx();
2093        let component = HttpComponent::new();
2094        let endpoint_ctx = NoOpComponentContext;
2095        // With allowPrivateIps=true, the validation should pass
2096        // (actual connection will fail, but that's expected)
2097        let endpoint = component
2098            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2099            .unwrap();
2100        let producer = endpoint.create_producer(&ctx).unwrap();
2101
2102        let exchange = Exchange::new(Message::default());
2103
2104        // The request will fail because we can't connect, but it should NOT fail
2105        // due to SSRF protection
2106        let result = producer.oneshot(exchange).await;
2107        // We expect connection error, not SSRF error
2108        if let Err(ref e) = result {
2109            let err_str = e.to_string();
2110            assert!(
2111                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2112                "Should not be SSRF error, got: {}",
2113                err_str
2114            );
2115        }
2116    }
2117
2118    // -----------------------------------------------------------------------
2119    // HttpServerConfig tests
2120    // -----------------------------------------------------------------------
2121
2122    #[test]
2123    fn test_http_server_config_parse() {
2124        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2125        assert_eq!(cfg.host, "0.0.0.0");
2126        assert_eq!(cfg.port, 8080);
2127        assert_eq!(cfg.path, "/orders");
2128        assert_eq!(cfg.max_inflight_requests, 1024);
2129    }
2130
2131    #[test]
2132    fn test_http_server_config_scheme() {
2133        // UriConfig trait method returns "http" as primary scheme
2134        assert_eq!(HttpServerConfig::scheme(), "http");
2135    }
2136
2137    #[test]
2138    fn test_http_server_config_from_components() {
2139        // Test from_components directly (trait method)
2140        let components = camel_component_api::UriComponents {
2141            scheme: "https".to_string(),
2142            path: "//0.0.0.0:8443/api".to_string(),
2143            params: std::collections::HashMap::from([
2144                ("maxRequestBody".to_string(), "5242880".to_string()),
2145                ("maxInflightRequests".to_string(), "7".to_string()),
2146            ]),
2147        };
2148        let cfg = HttpServerConfig::from_components(components).unwrap();
2149        assert_eq!(cfg.host, "0.0.0.0");
2150        assert_eq!(cfg.port, 8443);
2151        assert_eq!(cfg.path, "/api");
2152        assert_eq!(cfg.max_request_body, 5242880);
2153        assert_eq!(cfg.max_inflight_requests, 7);
2154    }
2155
2156    #[test]
2157    fn test_http_server_config_default_path() {
2158        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2159        assert_eq!(cfg.path, "/");
2160    }
2161
2162    #[test]
2163    fn test_http_server_config_wrong_scheme() {
2164        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2165    }
2166
2167    #[test]
2168    fn test_http_server_config_invalid_port() {
2169        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2170    }
2171
2172    #[test]
2173    fn test_http_server_config_default_port_by_scheme() {
2174        // HTTP without explicit port should default to 80
2175        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2176        assert_eq!(cfg_http.port, 80);
2177
2178        // HTTPS without explicit port should default to 443
2179        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2180        assert_eq!(cfg_https.port, 443);
2181    }
2182
2183    #[test]
2184    fn test_request_envelope_and_reply_are_send() {
2185        fn assert_send<T: Send>() {}
2186        assert_send::<RequestEnvelope>();
2187        assert_send::<HttpReply>();
2188    }
2189
2190    // -----------------------------------------------------------------------
2191    // ServerRegistry tests
2192    // -----------------------------------------------------------------------
2193
2194    #[test]
2195    fn test_server_registry_global_is_singleton() {
2196        let r1 = ServerRegistry::global();
2197        let r2 = ServerRegistry::global();
2198        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2199    }
2200
2201    #[tokio::test]
2202    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2203        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2204        let port = listener.local_addr().unwrap().port();
2205        drop(listener);
2206
2207        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2208            Arc::new(std::sync::Mutex::new(Vec::new()));
2209
2210        let mut handles = Vec::new();
2211        for _ in 0..4 {
2212            let results = results.clone();
2213            handles.push(tokio::spawn(async move {
2214                let dispatch = ServerRegistry::global()
2215                    .get_or_spawn(
2216                        "127.0.0.1",
2217                        port,
2218                        2 * 1024 * 1024,
2219                        10 * 1024 * 1024,
2220                        1024,
2221                    )
2222                    .await
2223                    .unwrap();
2224                results.lock().unwrap().push(dispatch);
2225            }));
2226        }
2227
2228        for h in handles {
2229            h.await.unwrap();
2230        }
2231
2232        let dispatches = results.lock().unwrap();
2233        assert_eq!(dispatches.len(), 4);
2234        for i in 1..dispatches.len() {
2235            assert!(
2236                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2237                "all concurrent callers should get the same dispatch table"
2238            );
2239        }
2240    }
2241
2242    #[test]
2243    fn test_server_registry_distinguishes_host_and_port() {
2244        let rt = tokio::runtime::Runtime::new().expect("runtime");
2245        rt.block_on(async {
2246            let registry = ServerRegistry::global();
2247            // Use two distinct host values with same configured port key.
2248            // Port 0 is acceptable here because the registry key uses the configured
2249            // tuple, not the OS-assigned ephemeral port.
2250            let d1 = registry
2251                .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2252                .await;
2253            let d2 = registry
2254                .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2255                .await;
2256            assert!(d1.is_ok());
2257            assert!(d2.is_ok());
2258            assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2259        });
2260    }
2261
2262    #[tokio::test]
2263    async fn test_shared_server_max_request_body_policy_is_deterministic() {
2264        let registry = ServerRegistry::global();
2265        // First registration: maxRequestBody = 1 MB
2266        let d1 = registry
2267            .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2268            .await;
2269        assert!(d1.is_ok());
2270
2271        // Second registration on same (host,port): maxRequestBody = 2 MB
2272        // Expected: explicit EndpointCreationFailed about incompatible maxRequestBody
2273        let d2 = registry
2274            .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2275            .await;
2276        assert!(d2.is_err());
2277        let err = d2.unwrap_err();
2278        assert!(
2279            err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2280            "Expected incompatible maxRequestBody error, got: {}",
2281            err
2282        );
2283    }
2284
2285    // -----------------------------------------------------------------------
2286    // Axum dispatch handler tests
2287    // -----------------------------------------------------------------------
2288
2289    #[tokio::test]
2290    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2291        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2292        // Nothing registered in the dispatch table
2293        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2294        let port = listener.local_addr().unwrap().port();
2295        tokio::spawn(run_axum_server(
2296            listener,
2297            dispatch,
2298            2 * 1024 * 1024,
2299            10 * 1024 * 1024,
2300            Arc::new(tokio::sync::Semaphore::new(1024)),
2301        ));
2302
2303        // Wait for server to start
2304        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2305
2306        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2307            .await
2308            .unwrap();
2309        assert_eq!(resp.status().as_u16(), 404);
2310    }
2311
2312    // -----------------------------------------------------------------------
2313    // HttpConsumer tests
2314    // -----------------------------------------------------------------------
2315
2316    #[tokio::test]
2317    async fn test_http_consumer_start_registers_path() {
2318        use camel_component_api::ConsumerContext;
2319
2320        // Get an OS-assigned free port
2321        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2322        let port = listener.local_addr().unwrap().port();
2323        drop(listener); // Release port — ServerRegistry will rebind it
2324
2325        let consumer_cfg = HttpServerConfig {
2326            host: "127.0.0.1".to_string(),
2327            port,
2328            path: "/ping".to_string(),
2329            max_request_body: 2 * 1024 * 1024,
2330            max_response_body: 10 * 1024 * 1024,
2331            max_inflight_requests: 1024,
2332        };
2333        let mut consumer = HttpConsumer::new(consumer_cfg);
2334
2335        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2336        let token = tokio_util::sync::CancellationToken::new();
2337        let ctx = ConsumerContext::new(tx, token.clone());
2338
2339        tokio::spawn(async move {
2340            consumer.start(ctx).await.unwrap();
2341        });
2342
2343        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2344
2345        let client = reqwest::Client::new();
2346        let resp_future = client
2347            .post(format!("http://127.0.0.1:{port}/ping"))
2348            .body("hello world")
2349            .send();
2350
2351        let (http_result, _) = tokio::join!(resp_future, async {
2352            if let Some(mut envelope) = rx.recv().await {
2353                // Set a custom status code
2354                envelope.exchange.input.set_header(
2355                    "CamelHttpResponseCode",
2356                    serde_json::Value::Number(201.into()),
2357                );
2358                if let Some(reply_tx) = envelope.reply_tx {
2359                    let _ = reply_tx.send(Ok(envelope.exchange));
2360                }
2361            }
2362        });
2363
2364        let resp = http_result.unwrap();
2365        assert_eq!(resp.status().as_u16(), 201);
2366
2367        token.cancel();
2368    }
2369
2370    #[tokio::test]
2371    async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2372        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2373
2374        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2375        let port = listener.local_addr().unwrap().port();
2376        drop(listener);
2377
2378        let consumer_cfg = HttpServerConfig {
2379            host: "127.0.0.1".to_string(),
2380            port,
2381            path: "/saturation".to_string(),
2382            max_request_body: 2 * 1024 * 1024,
2383            max_response_body: 10 * 1024 * 1024,
2384            max_inflight_requests: 1,
2385        };
2386        let mut consumer = HttpConsumer::new(consumer_cfg);
2387
2388        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2389        let token = tokio_util::sync::CancellationToken::new();
2390        let ctx = ConsumerContext::new(tx, token.clone());
2391        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2392        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2393
2394        let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2395        let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2396
2397        tokio::spawn(async move {
2398            let mut first_seen_tx = Some(first_seen_tx);
2399            let mut unblock_first_rx = Some(unblock_first_rx);
2400
2401            while let Some(envelope) = rx.recv().await {
2402                if let Some(tx) = first_seen_tx.take() {
2403                    let _ = tx.send(());
2404                    if let Some(rx_unblock) = unblock_first_rx.take() {
2405                        let _ = rx_unblock.await;
2406                    }
2407                }
2408
2409                if let Some(reply_tx) = envelope.reply_tx {
2410                    let _ = reply_tx.send(Ok(envelope.exchange));
2411                }
2412            }
2413        });
2414
2415        let client = reqwest::Client::new();
2416        let first_req = {
2417            let client = client.clone();
2418            async move {
2419                client
2420                    .get(format!("http://127.0.0.1:{port}/saturation"))
2421                    .send()
2422                    .await
2423                    .unwrap()
2424            }
2425        };
2426
2427        let first_handle = tokio::spawn(first_req);
2428        first_seen_rx.await.unwrap();
2429
2430        let second_resp = client
2431            .get(format!("http://127.0.0.1:{port}/saturation"))
2432            .send()
2433            .await
2434            .unwrap();
2435
2436        assert_eq!(second_resp.status().as_u16(), 503);
2437
2438        let _ = unblock_first_tx.send(());
2439        let first_resp = first_handle.await.unwrap();
2440        assert_eq!(first_resp.status().as_u16(), 200);
2441
2442        token.cancel();
2443    }
2444
2445    #[tokio::test]
2446    async fn test_http_consumer_enforces_max_response_body_for_bytes() {
2447        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2448
2449        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2450        let port = listener.local_addr().unwrap().port();
2451        drop(listener);
2452
2453        let consumer_cfg = HttpServerConfig {
2454            host: "127.0.0.1".to_string(),
2455            port,
2456            path: "/limit-bytes".to_string(),
2457            max_request_body: 2 * 1024 * 1024,
2458            max_response_body: 16,
2459            max_inflight_requests: 1024,
2460        };
2461        let mut consumer = HttpConsumer::new(consumer_cfg);
2462
2463        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2464        let token = tokio_util::sync::CancellationToken::new();
2465        let ctx = ConsumerContext::new(tx, token.clone());
2466        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2467        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2468
2469        let client = reqwest::Client::new();
2470        let send_fut = client
2471            .get(format!("http://127.0.0.1:{port}/limit-bytes"))
2472            .send();
2473
2474        let (http_result, _) = tokio::join!(send_fut, async {
2475            if let Some(mut envelope) = rx.recv().await {
2476                envelope.exchange.input.body =
2477                    camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
2478                if let Some(reply_tx) = envelope.reply_tx {
2479                    let _ = reply_tx.send(Ok(envelope.exchange));
2480                }
2481            }
2482        });
2483
2484        let resp = http_result.unwrap();
2485        assert_eq!(resp.status().as_u16(), 500);
2486        let body = resp.text().await.unwrap();
2487        assert_eq!(body, "Response body exceeds configured limit");
2488        token.cancel();
2489    }
2490
2491    #[tokio::test]
2492    async fn test_http_consumer_enforces_max_response_body_for_json() {
2493        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2494
2495        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2496        let port = listener.local_addr().unwrap().port();
2497        drop(listener);
2498
2499        let consumer_cfg = HttpServerConfig {
2500            host: "127.0.0.1".to_string(),
2501            port,
2502            path: "/limit-json".to_string(),
2503            max_request_body: 2 * 1024 * 1024,
2504            max_response_body: 16,
2505            max_inflight_requests: 1024,
2506        };
2507        let mut consumer = HttpConsumer::new(consumer_cfg);
2508
2509        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2510        let token = tokio_util::sync::CancellationToken::new();
2511        let ctx = ConsumerContext::new(tx, token.clone());
2512        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2513        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2514
2515        let client = reqwest::Client::new();
2516        let send_fut = client.get(format!("http://127.0.0.1:{port}/limit-json")).send();
2517
2518        let (http_result, _) = tokio::join!(send_fut, async {
2519            if let Some(mut envelope) = rx.recv().await {
2520                envelope.exchange.input.body = camel_component_api::Body::Json(
2521                    serde_json::json!({"message":"this response is bigger than sixteen"}),
2522                );
2523                if let Some(reply_tx) = envelope.reply_tx {
2524                    let _ = reply_tx.send(Ok(envelope.exchange));
2525                }
2526            }
2527        });
2528
2529        let resp = http_result.unwrap();
2530        assert_eq!(resp.status().as_u16(), 500);
2531        let body = resp.text().await.unwrap();
2532        assert_eq!(body, "Response body exceeds configured limit");
2533        token.cancel();
2534    }
2535
2536    #[tokio::test]
2537    async fn test_http_consumer_enforces_max_response_body_for_xml() {
2538        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2539
2540        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2541        let port = listener.local_addr().unwrap().port();
2542        drop(listener);
2543
2544        let consumer_cfg = HttpServerConfig {
2545            host: "127.0.0.1".to_string(),
2546            port,
2547            path: "/limit-xml".to_string(),
2548            max_request_body: 2 * 1024 * 1024,
2549            max_response_body: 16,
2550            max_inflight_requests: 1024,
2551        };
2552        let mut consumer = HttpConsumer::new(consumer_cfg);
2553
2554        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2555        let token = tokio_util::sync::CancellationToken::new();
2556        let ctx = ConsumerContext::new(tx, token.clone());
2557        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2558        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2559
2560        let client = reqwest::Client::new();
2561        let send_fut = client.get(format!("http://127.0.0.1:{port}/limit-xml")).send();
2562
2563        let (http_result, _) = tokio::join!(send_fut, async {
2564            if let Some(mut envelope) = rx.recv().await {
2565                envelope.exchange.input.body =
2566                    camel_component_api::Body::Xml("<root><value>way-too-large</value></root>".into());
2567                if let Some(reply_tx) = envelope.reply_tx {
2568                    let _ = reply_tx.send(Ok(envelope.exchange));
2569                }
2570            }
2571        });
2572
2573        let resp = http_result.unwrap();
2574        assert_eq!(resp.status().as_u16(), 500);
2575        let body = resp.text().await.unwrap();
2576        assert_eq!(body, "Response body exceeds configured limit");
2577        token.cancel();
2578    }
2579
2580    #[tokio::test]
2581    async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
2582        use camel_component_api::{CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata};
2583        use futures::stream;
2584
2585        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2586        let port = listener.local_addr().unwrap().port();
2587        drop(listener);
2588
2589        let consumer_cfg = HttpServerConfig {
2590            host: "127.0.0.1".to_string(),
2591            port,
2592            path: "/limit-stream".to_string(),
2593            max_request_body: 2 * 1024 * 1024,
2594            max_response_body: 16,
2595            max_inflight_requests: 1024,
2596        };
2597        let mut consumer = HttpConsumer::new(consumer_cfg);
2598
2599        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2600        let token = tokio_util::sync::CancellationToken::new();
2601        let ctx = ConsumerContext::new(tx, token.clone());
2602        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2603        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2604
2605        let client = reqwest::Client::new();
2606        let send_fut = client
2607            .get(format!("http://127.0.0.1:{port}/limit-stream"))
2608            .send();
2609
2610        let (http_result, _) = tokio::join!(send_fut, async {
2611            if let Some(mut envelope) = rx.recv().await {
2612                let chunks: Vec<Result<bytes::Bytes, CamelError>> =
2613                    vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
2614                let stream = Box::pin(stream::iter(chunks));
2615                envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
2616                    stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
2617                    metadata: StreamMetadata {
2618                        size_hint: Some(32),
2619                        content_type: Some("application/octet-stream".into()),
2620                        origin: None,
2621                    },
2622                });
2623                if let Some(reply_tx) = envelope.reply_tx {
2624                    let _ = reply_tx.send(Ok(envelope.exchange));
2625                }
2626            }
2627        });
2628
2629        let resp = http_result.unwrap();
2630        assert_eq!(resp.status().as_u16(), 200);
2631        let body = resp.bytes().await.unwrap();
2632        assert_eq!(body.len(), 32);
2633        token.cancel();
2634    }
2635
2636    // -----------------------------------------------------------------------
2637    // Integration tests
2638    // -----------------------------------------------------------------------
2639
2640    #[tokio::test]
2641    async fn test_integration_single_consumer_round_trip() {
2642        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2643
2644        // Get an OS-assigned free port (ephemeral)
2645        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646        let port = listener.local_addr().unwrap().port();
2647        drop(listener); // Release — ServerRegistry will rebind
2648
2649        let component = HttpComponent::new();
2650        let endpoint_ctx = NoOpComponentContext;
2651        let endpoint = component
2652            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2653            .unwrap();
2654        let mut consumer = endpoint.create_consumer().unwrap();
2655
2656        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2657        let token = tokio_util::sync::CancellationToken::new();
2658        let ctx = ConsumerContext::new(tx, token.clone());
2659
2660        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2661        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2662
2663        let client = reqwest::Client::new();
2664        let send_fut = client
2665            .post(format!("http://127.0.0.1:{port}/echo"))
2666            .header("Content-Type", "text/plain")
2667            .body("ping")
2668            .send();
2669
2670        let (http_result, _) = tokio::join!(send_fut, async {
2671            if let Some(mut envelope) = rx.recv().await {
2672                assert_eq!(
2673                    envelope.exchange.input.header("CamelHttpMethod"),
2674                    Some(&serde_json::Value::String("POST".into()))
2675                );
2676                assert_eq!(
2677                    envelope.exchange.input.header("CamelHttpPath"),
2678                    Some(&serde_json::Value::String("/echo".into()))
2679                );
2680                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2681                if let Some(reply_tx) = envelope.reply_tx {
2682                    let _ = reply_tx.send(Ok(envelope.exchange));
2683                }
2684            }
2685        });
2686
2687        let resp = http_result.unwrap();
2688        assert_eq!(resp.status().as_u16(), 200);
2689        let body = resp.text().await.unwrap();
2690        assert_eq!(body, "pong");
2691
2692        token.cancel();
2693    }
2694
2695    #[tokio::test]
2696    async fn test_integration_two_consumers_shared_port() {
2697        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2698
2699        // Get an OS-assigned free port (ephemeral)
2700        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2701        let port = listener.local_addr().unwrap().port();
2702        drop(listener);
2703
2704        let component = HttpComponent::new();
2705        let endpoint_ctx = NoOpComponentContext;
2706
2707        // Consumer A: /hello
2708        let endpoint_a = component
2709            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2710            .unwrap();
2711        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2712
2713        // Consumer B: /world
2714        let endpoint_b = component
2715            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2716            .unwrap();
2717        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2718
2719        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2720        let token_a = tokio_util::sync::CancellationToken::new();
2721        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2722
2723        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2724        let token_b = tokio_util::sync::CancellationToken::new();
2725        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2726
2727        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2728        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2729        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2730
2731        let client = reqwest::Client::new();
2732
2733        // Request to /hello
2734        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2735        let (resp_hello, _) = tokio::join!(fut_hello, async {
2736            if let Some(mut envelope) = rx_a.recv().await {
2737                envelope.exchange.input.body =
2738                    camel_component_api::Body::Text("hello-response".to_string());
2739                if let Some(reply_tx) = envelope.reply_tx {
2740                    let _ = reply_tx.send(Ok(envelope.exchange));
2741                }
2742            }
2743        });
2744
2745        // Request to /world
2746        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2747        let (resp_world, _) = tokio::join!(fut_world, async {
2748            if let Some(mut envelope) = rx_b.recv().await {
2749                envelope.exchange.input.body =
2750                    camel_component_api::Body::Text("world-response".to_string());
2751                if let Some(reply_tx) = envelope.reply_tx {
2752                    let _ = reply_tx.send(Ok(envelope.exchange));
2753                }
2754            }
2755        });
2756
2757        let body_a = resp_hello.unwrap().text().await.unwrap();
2758        let body_b = resp_world.unwrap().text().await.unwrap();
2759
2760        assert_eq!(body_a, "hello-response");
2761        assert_eq!(body_b, "world-response");
2762
2763        token_a.cancel();
2764        token_b.cancel();
2765    }
2766
2767    #[tokio::test]
2768    async fn test_integration_unregistered_path_returns_404() {
2769        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2770
2771        // Get an OS-assigned free port (ephemeral)
2772        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2773        let port = listener.local_addr().unwrap().port();
2774        drop(listener);
2775
2776        let component = HttpComponent::new();
2777        let endpoint_ctx = NoOpComponentContext;
2778        let endpoint = component
2779            .create_endpoint(
2780                &format!("http://127.0.0.1:{port}/registered"),
2781                &endpoint_ctx,
2782            )
2783            .unwrap();
2784        let mut consumer = endpoint.create_consumer().unwrap();
2785
2786        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2787        let token = tokio_util::sync::CancellationToken::new();
2788        let ctx = ConsumerContext::new(tx, token.clone());
2789
2790        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2791        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2792
2793        let client = reqwest::Client::new();
2794        let resp = client
2795            .get(format!("http://127.0.0.1:{port}/not-there"))
2796            .send()
2797            .await
2798            .unwrap();
2799        assert_eq!(resp.status().as_u16(), 404);
2800
2801        token.cancel();
2802    }
2803
2804    #[test]
2805    fn test_http_consumer_declares_concurrent() {
2806        use camel_component_api::ConcurrencyModel;
2807
2808        let config = HttpServerConfig {
2809            host: "127.0.0.1".to_string(),
2810            port: 19999,
2811            path: "/test".to_string(),
2812            max_request_body: 2 * 1024 * 1024,
2813            max_response_body: 10 * 1024 * 1024,
2814            max_inflight_requests: 1024,
2815        };
2816        let consumer = HttpConsumer::new(config);
2817        assert_eq!(
2818            consumer.concurrency_model(),
2819            ConcurrencyModel::Concurrent { max: None }
2820        );
2821    }
2822
2823    // -----------------------------------------------------------------------
2824    // HttpReplyBody streaming tests
2825    // -----------------------------------------------------------------------
2826
2827    #[tokio::test]
2828    async fn test_http_reply_body_stream_variant_exists() {
2829        use bytes::Bytes;
2830        use camel_component_api::CamelError;
2831        use futures::stream;
2832
2833        let chunks: Vec<Result<Bytes, CamelError>> =
2834            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2835        let stream = Box::pin(stream::iter(chunks));
2836        let reply_body = HttpReplyBody::Stream(stream);
2837        // Si compila y el match funciona, el test pasa
2838        match reply_body {
2839            HttpReplyBody::Stream(_) => {}
2840            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2841        }
2842    }
2843
2844    // -----------------------------------------------------------------------
2845    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2846    // -----------------------------------------------------------------------
2847
2848    #[cfg(feature = "otel")]
2849    mod otel_tests {
2850        use super::*;
2851        use camel_component_api::Message;
2852        use tower::ServiceExt;
2853
2854        #[tokio::test]
2855        async fn test_producer_injects_traceparent_header() {
2856            let (url, _handle) = start_test_server_with_header_capture().await;
2857            let ctx = test_producer_ctx();
2858
2859            let component = HttpComponent::new();
2860            let endpoint_ctx = NoOpComponentContext;
2861            let endpoint = component
2862                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2863                .unwrap();
2864            let producer = endpoint.create_producer(&ctx).unwrap();
2865
2866            // Create exchange with an OTel context by extracting from a traceparent header
2867            let mut exchange = Exchange::new(Message::default());
2868            let mut headers = std::collections::HashMap::new();
2869            headers.insert(
2870                "traceparent".to_string(),
2871                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2872            );
2873            camel_otel::extract_into_exchange(&mut exchange, &headers);
2874
2875            let result = producer.oneshot(exchange).await.unwrap();
2876
2877            // Verify request succeeded
2878            let status = result
2879                .input
2880                .header("CamelHttpResponseCode")
2881                .and_then(|v| v.as_u64())
2882                .unwrap();
2883            assert_eq!(status, 200);
2884
2885            // The test server echoes back the received traceparent header
2886            let traceparent = result.input.header("x-received-traceparent");
2887            assert!(
2888                traceparent.is_some(),
2889                "traceparent header should have been sent"
2890            );
2891
2892            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2893            // Verify format: version-traceid-spanid-flags
2894            let parts: Vec<&str> = traceparent_str.split('-').collect();
2895            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2896            assert_eq!(parts[0], "00", "version should be 00");
2897            assert_eq!(
2898                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2899                "trace-id should match"
2900            );
2901            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2902            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2903        }
2904
2905        #[tokio::test]
2906        async fn test_consumer_extracts_traceparent_header() {
2907            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2908
2909            // Get an OS-assigned free port
2910            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2911            let port = listener.local_addr().unwrap().port();
2912            drop(listener);
2913
2914            let component = HttpComponent::new();
2915            let endpoint_ctx = NoOpComponentContext;
2916            let endpoint = component
2917                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2918                .unwrap();
2919            let mut consumer = endpoint.create_consumer().unwrap();
2920
2921            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2922            let token = tokio_util::sync::CancellationToken::new();
2923            let ctx = ConsumerContext::new(tx, token.clone());
2924
2925            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2926            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2927
2928            // Send request with traceparent header
2929            let client = reqwest::Client::new();
2930            let send_fut = client
2931                .post(format!("http://127.0.0.1:{port}/trace"))
2932                .header(
2933                    "traceparent",
2934                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2935                )
2936                .body("test")
2937                .send();
2938
2939            let (http_result, _) = tokio::join!(send_fut, async {
2940                if let Some(envelope) = rx.recv().await {
2941                    // Verify the exchange has a valid OTel context by re-injecting it
2942                    // and checking the traceparent matches
2943                    let mut injected_headers = std::collections::HashMap::new();
2944                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2945
2946                    assert!(
2947                        injected_headers.contains_key("traceparent"),
2948                        "Exchange should have traceparent after extraction"
2949                    );
2950
2951                    let traceparent = injected_headers.get("traceparent").unwrap();
2952                    let parts: Vec<&str> = traceparent.split('-').collect();
2953                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2954                    assert_eq!(
2955                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2956                        "Trace ID should match the original traceparent header"
2957                    );
2958
2959                    if let Some(reply_tx) = envelope.reply_tx {
2960                        let _ = reply_tx.send(Ok(envelope.exchange));
2961                    }
2962                }
2963            });
2964
2965            let resp = http_result.unwrap();
2966            assert_eq!(resp.status().as_u16(), 200);
2967
2968            token.cancel();
2969        }
2970
2971        #[tokio::test]
2972        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2973            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2974
2975            // Get an OS-assigned free port
2976            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2977            let port = listener.local_addr().unwrap().port();
2978            drop(listener);
2979
2980            let component = HttpComponent::new();
2981            let endpoint_ctx = NoOpComponentContext;
2982            let endpoint = component
2983                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2984                .unwrap();
2985            let mut consumer = endpoint.create_consumer().unwrap();
2986
2987            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2988            let token = tokio_util::sync::CancellationToken::new();
2989            let ctx = ConsumerContext::new(tx, token.clone());
2990
2991            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2992            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2993
2994            // Send request with MIXED-CASE TraceParent header (not lowercase)
2995            let client = reqwest::Client::new();
2996            let send_fut = client
2997                .post(format!("http://127.0.0.1:{port}/trace"))
2998                .header(
2999                    "TraceParent",
3000                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3001                )
3002                .body("test")
3003                .send();
3004
3005            let (http_result, _) = tokio::join!(send_fut, async {
3006                if let Some(envelope) = rx.recv().await {
3007                    // Verify the exchange has a valid OTel context by re-injecting it
3008                    // and checking the traceparent matches
3009                    let mut injected_headers = HashMap::new();
3010                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3011
3012                    assert!(
3013                        injected_headers.contains_key("traceparent"),
3014                        "Exchange should have traceparent after extraction from mixed-case header"
3015                    );
3016
3017                    let traceparent = injected_headers.get("traceparent").unwrap();
3018                    let parts: Vec<&str> = traceparent.split('-').collect();
3019                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3020                    assert_eq!(
3021                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3022                        "Trace ID should match the original mixed-case TraceParent header"
3023                    );
3024
3025                    if let Some(reply_tx) = envelope.reply_tx {
3026                        let _ = reply_tx.send(Ok(envelope.exchange));
3027                    }
3028                }
3029            });
3030
3031            let resp = http_result.unwrap();
3032            assert_eq!(resp.status().as_u16(), 200);
3033
3034            token.cancel();
3035        }
3036
3037        #[tokio::test]
3038        async fn test_producer_no_trace_context_no_crash() {
3039            let (url, _handle) = start_test_server().await;
3040            let ctx = test_producer_ctx();
3041
3042            let component = HttpComponent::new();
3043            let endpoint_ctx = NoOpComponentContext;
3044            let endpoint = component
3045                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3046                .unwrap();
3047            let producer = endpoint.create_producer(&ctx).unwrap();
3048
3049            // Create exchange with default (empty) otel_context - no trace context
3050            let exchange = Exchange::new(Message::default());
3051
3052            // Should succeed without panic
3053            let result = producer.oneshot(exchange).await.unwrap();
3054
3055            // Verify request succeeded
3056            let status = result
3057                .input
3058                .header("CamelHttpResponseCode")
3059                .and_then(|v| v.as_u64())
3060                .unwrap();
3061            assert_eq!(status, 200);
3062        }
3063
3064        /// Test server that captures and echoes back the traceparent header
3065        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3066            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3067            let addr = listener.local_addr().unwrap();
3068            let url = format!("http://127.0.0.1:{}", addr.port());
3069
3070            let handle = tokio::spawn(async move {
3071                loop {
3072                    if let Ok((mut stream, _)) = listener.accept().await {
3073                        tokio::spawn(async move {
3074                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
3075                            let mut buf = vec![0u8; 8192];
3076                            let n = stream.read(&mut buf).await.unwrap_or(0);
3077                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
3078
3079                            // Extract traceparent header from request
3080                            let traceparent = request
3081                                .lines()
3082                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
3083                                .map(|line| {
3084                                    line.split(':')
3085                                        .nth(1)
3086                                        .map(|s| s.trim().to_string())
3087                                        .unwrap_or_default()
3088                                })
3089                                .unwrap_or_default();
3090
3091                            let body =
3092                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3093                            let response = format!(
3094                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3095                                body.len(),
3096                                traceparent,
3097                                body
3098                            );
3099                            let _ = stream.write_all(response.as_bytes()).await;
3100                        });
3101                    }
3102                }
3103            });
3104
3105            (url, handle)
3106        }
3107    }
3108
3109    // -----------------------------------------------------------------------
3110    // Response streaming tests (Eje A - Task 2)
3111    // -----------------------------------------------------------------------
3112
3113    // -----------------------------------------------------------------------
3114    // Request streaming tests (Eje B - Task 3)
3115    // -----------------------------------------------------------------------
3116
3117    #[tokio::test]
3118    async fn test_request_body_arrives_as_stream() {
3119        use camel_component_api::Body;
3120        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3121
3122        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3123        let port = listener.local_addr().unwrap().port();
3124        drop(listener);
3125
3126        let component = HttpComponent::new();
3127        let endpoint_ctx = NoOpComponentContext;
3128        let endpoint = component
3129            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3130            .unwrap();
3131        let mut consumer = endpoint.create_consumer().unwrap();
3132
3133        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3134        let token = tokio_util::sync::CancellationToken::new();
3135        let ctx = ConsumerContext::new(tx, token.clone());
3136
3137        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3138        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3139
3140        let client = reqwest::Client::new();
3141        let send_fut = client
3142            .post(format!("http://127.0.0.1:{port}/upload"))
3143            .body("hello streaming world")
3144            .send();
3145
3146        let (http_result, _) = tokio::join!(send_fut, async {
3147            if let Some(mut envelope) = rx.recv().await {
3148                // Body must be Body::Stream, not Body::Text or Body::Bytes
3149                assert!(
3150                    matches!(envelope.exchange.input.body, Body::Stream(_)),
3151                    "expected Body::Stream, got discriminant {:?}",
3152                    std::mem::discriminant(&envelope.exchange.input.body)
3153                );
3154                // Materialize to verify content
3155                let bytes = envelope
3156                    .exchange
3157                    .input
3158                    .body
3159                    .into_bytes(1024 * 1024)
3160                    .await
3161                    .unwrap();
3162                assert_eq!(&bytes[..], b"hello streaming world");
3163
3164                envelope.exchange.input.body = camel_component_api::Body::Empty;
3165                if let Some(reply_tx) = envelope.reply_tx {
3166                    let _ = reply_tx.send(Ok(envelope.exchange));
3167                }
3168            }
3169        });
3170
3171        let resp = http_result.unwrap();
3172        assert_eq!(resp.status().as_u16(), 200);
3173
3174        token.cancel();
3175    }
3176
3177    // -----------------------------------------------------------------------
3178    // Response streaming tests (Eje A - Task 2)
3179    // -----------------------------------------------------------------------
3180
3181    #[tokio::test]
3182    async fn test_streaming_response_chunked() {
3183        use bytes::Bytes;
3184        use camel_component_api::Body;
3185        use camel_component_api::CamelError;
3186        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3187        use camel_component_api::{StreamBody, StreamMetadata};
3188        use futures::stream;
3189        use std::sync::Arc;
3190        use tokio::sync::Mutex;
3191
3192        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3193        let port = listener.local_addr().unwrap().port();
3194        drop(listener);
3195
3196        let component = HttpComponent::new();
3197        let endpoint_ctx = NoOpComponentContext;
3198        let endpoint = component
3199            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3200            .unwrap();
3201        let mut consumer = endpoint.create_consumer().unwrap();
3202
3203        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3204        let token = tokio_util::sync::CancellationToken::new();
3205        let ctx = ConsumerContext::new(tx, token.clone());
3206
3207        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3208        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3209
3210        let client = reqwest::Client::new();
3211        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3212
3213        let (http_result, _) = tokio::join!(send_fut, async {
3214            if let Some(mut envelope) = rx.recv().await {
3215                // Respond with Body::Stream
3216                let chunks: Vec<Result<Bytes, CamelError>> =
3217                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3218                let stream = Box::pin(stream::iter(chunks));
3219                envelope.exchange.input.body = Body::Stream(StreamBody {
3220                    stream: Arc::new(Mutex::new(Some(stream))),
3221                    metadata: StreamMetadata::default(),
3222                });
3223                if let Some(reply_tx) = envelope.reply_tx {
3224                    let _ = reply_tx.send(Ok(envelope.exchange));
3225                }
3226            }
3227        });
3228
3229        let resp = http_result.unwrap();
3230        assert_eq!(resp.status().as_u16(), 200);
3231        let body = resp.text().await.unwrap();
3232        assert_eq!(body, "chunk1chunk2");
3233
3234        token.cancel();
3235    }
3236
3237    // -----------------------------------------------------------------------
3238    // 413 Content-Length limit test (Task 4)
3239    // -----------------------------------------------------------------------
3240
3241    #[tokio::test]
3242    async fn test_413_when_content_length_exceeds_limit() {
3243        use camel_component_api::ConsumerContext;
3244
3245        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3246        let port = listener.local_addr().unwrap().port();
3247        drop(listener);
3248
3249        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
3250        let component = HttpComponent::new();
3251        let endpoint_ctx = NoOpComponentContext;
3252        let endpoint = component
3253            .create_endpoint(
3254                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3255                &endpoint_ctx,
3256            )
3257            .unwrap();
3258        let mut consumer = endpoint.create_consumer().unwrap();
3259
3260        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3261        let token = tokio_util::sync::CancellationToken::new();
3262        let ctx = ConsumerContext::new(tx, token.clone());
3263
3264        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3265        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3266
3267        let client = reqwest::Client::new();
3268        let resp = client
3269            .post(format!("http://127.0.0.1:{port}/upload"))
3270            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
3271            .body("x".repeat(1000))
3272            .send()
3273            .await
3274            .unwrap();
3275
3276        assert_eq!(resp.status().as_u16(), 413);
3277
3278        token.cancel();
3279    }
3280
3281    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
3282    /// The spec says: "If there is no Content-Length, the limit does not apply at the
3283    /// consumer level — the route is responsible."
3284    #[tokio::test]
3285    async fn test_chunked_upload_without_content_length_bypasses_limit() {
3286        use bytes::Bytes;
3287        use camel_component_api::Body;
3288        use camel_component_api::ConsumerContext;
3289        use futures::stream;
3290
3291        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3292        let port = listener.local_addr().unwrap().port();
3293        drop(listener);
3294
3295        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
3296        let component = HttpComponent::new();
3297        let endpoint_ctx = NoOpComponentContext;
3298        let endpoint = component
3299            .create_endpoint(
3300                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3301                &endpoint_ctx,
3302            )
3303            .unwrap();
3304        let mut consumer = endpoint.create_consumer().unwrap();
3305
3306        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3307        let token = tokio_util::sync::CancellationToken::new();
3308        let ctx = ConsumerContext::new(tx, token.clone());
3309
3310        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3311        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3312
3313        let client = reqwest::Client::new();
3314
3315        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
3316        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
3317        // but since there's no Content-Length the 413 check must NOT fire.
3318        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3319            Ok(Bytes::from("y".repeat(50))),
3320            Ok(Bytes::from("y".repeat(50))),
3321        ];
3322        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3323        let send_fut = client
3324            .post(format!("http://127.0.0.1:{port}/upload"))
3325            .body(stream_body)
3326            .send();
3327
3328        let consumer_fut = async {
3329            // Use timeout to avoid deadlock if the handler rejects before enqueueing
3330            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3331                Ok(Some(mut envelope)) => {
3332                    assert!(
3333                        matches!(envelope.exchange.input.body, Body::Stream(_)),
3334                        "expected Body::Stream"
3335                    );
3336                    envelope.exchange.input.body = camel_component_api::Body::Empty;
3337                    if let Some(reply_tx) = envelope.reply_tx {
3338                        let _ = reply_tx.send(Ok(envelope.exchange));
3339                    }
3340                }
3341                Ok(None) => panic!("consumer channel closed unexpectedly"),
3342                Err(_) => {
3343                    // Timeout: the request was rejected before reaching the consumer.
3344                    // The HTTP response will carry the real status code (we check below).
3345                }
3346            }
3347        };
3348
3349        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3350
3351        let resp = http_result.unwrap();
3352        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
3353        assert_ne!(
3354            resp.status().as_u16(),
3355            413,
3356            "chunked upload must not be rejected by maxRequestBody"
3357        );
3358        assert_eq!(resp.status().as_u16(), 200);
3359
3360        token.cancel();
3361    }
3362}