Skip to main content

camel_component_http/
lib.rs

1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::RwLock;
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_api::{
17    BoxProcessor, CamelError, Exchange,
18    body::{Body, StreamBody, StreamMetadata},
19};
20use camel_component::{Component, Consumer, Endpoint, ProducerContext};
21use camel_endpoint::{UriComponents, UriConfig, parse_uri};
22use futures::TryStreamExt;
23use futures::stream::BoxStream;
24
25// ---------------------------------------------------------------------------
26// HttpEndpointConfig
27// ---------------------------------------------------------------------------
28
/// Settings for an HTTP client (producer) endpoint, as parsed from an
/// endpoint URI.
///
/// # Memory Limits
///
/// HTTP operations use conservative memory limits so that untrusted network
/// peers cannot exhaust memory. The limits are deliberately much lower than
/// the file component limit (100MB): HTTP traffic is usually API-sized
/// payloads rather than bulk file transfers, and the remote side may not be
/// trusted.
///
/// ## Default Limits
///
/// - **HTTP client body**: 10MB (typical API responses)
/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
///
/// ## Rationale
///
/// 10MB is enough for most API interactions on the client side while still
/// guarding against:
/// - Malicious servers sending oversized responses
/// - Runaway processes generating unexpectedly large payloads
/// - Memory exhaustion attacks
///
/// The server request limit (2MB) is stricter still, because that input may
/// come from untrusted clients on the public internet.
///
/// ## Overriding Limits
///
/// The client body limit is overridden with the `maxBodySize` URI parameter:
///
/// ```text
/// http://api.example.com/large-data?maxBodySize=52428800
/// ```
///
/// Server endpoints use the `maxRequestBody` and `maxResponseBody` parameters:
///
/// ```text
/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
/// ```
///
/// ## Behavior When Exceeded
///
/// Intended contract when a body exceeds the configured limit (enforced by
/// the code that reads bodies, not by this type):
/// - An error is returned immediately
/// - No memory is exhausted - the limit is checked before allocation
/// - The HTTP connection is terminated cleanly
///
/// ## Security Considerations
///
/// Treat HTTP endpoints with more caution than file endpoints:
/// - Clients may be unknown and untrusted
/// - Network traffic can be spoofed or malicious
/// - DoS attacks often exploit unbounded resource consumption
///
/// Raise the limits only when both ends of the connection are under your
/// control, or when business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL rebuilt as `<scheme>:<path>`, e.g. `http://localhost:8080/api`.
    pub base_url: String,
    /// Value of the `httpMethod` URI parameter, if one was given.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` parameter; `true` unless the value is exactly `false`.
    pub throw_exception_on_failure: bool,
    /// `(start, end)` parsed from `okStatusCodeRange=start-end`; defaults to `(200, 299)`.
    pub ok_status_code_range: (u16, u16),
    /// `followRedirects` parameter; `true` only when the value is exactly `true`.
    pub follow_redirects: bool,
    /// `connectTimeout` parameter in milliseconds; defaults to 30000 ms.
    pub connect_timeout: Duration,
    /// `responseTimeout` parameter in milliseconds; `None` when absent or unparsable.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not Camel options (see `HTTP_CAMEL_OPTIONS`).
    pub query_params: HashMap<String, String>,
    // Security settings
    /// `allowPrivateIps` parameter (SSRF protection); defaults to `false`.
    pub allow_private_ips: bool,
    /// Entries of the comma-separated `blockedHosts` parameter.
    pub blocked_hosts: Vec<String>,
    // Memory limit for body materialization (in bytes)
    /// `maxBodySize` parameter in bytes; defaults to 10MB.
    pub max_body_size: usize,
}
101
/// URI parameter names that configure the endpoint itself.
///
/// Parameters listed here are consumed while parsing the endpoint URI and are
/// excluded from the query parameters forwarded with the HTTP request.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    // Request/response handling
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    // Timeouts
    "connectTimeout",
    "responseTimeout",
    // Security and memory limits
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
114
115impl UriConfig for HttpEndpointConfig {
116    /// Returns "http" as the primary scheme (also accepts "https")
117    fn scheme() -> &'static str {
118        "http"
119    }
120
121    fn from_uri(uri: &str) -> Result<Self, CamelError> {
122        let parts = parse_uri(uri)?;
123        Self::from_components(parts)
124    }
125
126    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
127        // Validate scheme - accept both http and https
128        if parts.scheme != "http" && parts.scheme != "https" {
129            return Err(CamelError::InvalidUri(format!(
130                "expected scheme 'http' or 'https', got '{}'",
131                parts.scheme
132            )));
133        }
134
135        // Construct base_url from scheme + path
136        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
137        let base_url = format!("{}:{}", parts.scheme, parts.path);
138
139        let http_method = parts.params.get("httpMethod").cloned();
140
141        let throw_exception_on_failure = parts
142            .params
143            .get("throwExceptionOnFailure")
144            .map(|v| v != "false")
145            .unwrap_or(true);
146
147        // Parse status code range from "start-end" format (e.g., "200-299")
148        let ok_status_code_range = parts
149            .params
150            .get("okStatusCodeRange")
151            .and_then(|v| {
152                let (start, end) = v.split_once('-')?;
153                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
154            })
155            .unwrap_or((200, 299));
156
157        let follow_redirects = parts
158            .params
159            .get("followRedirects")
160            .map(|v| v == "true")
161            .unwrap_or(false);
162
163        let connect_timeout = parts
164            .params
165            .get("connectTimeout")
166            .and_then(|v| v.parse::<u64>().ok())
167            .map(Duration::from_millis)
168            .unwrap_or(Duration::from_millis(30000));
169
170        let response_timeout = parts
171            .params
172            .get("responseTimeout")
173            .and_then(|v| v.parse::<u64>().ok())
174            .map(Duration::from_millis);
175
176        // SSRF protection settings
177        let allow_private_ips = parts
178            .params
179            .get("allowPrivateIps")
180            .map(|v| v == "true")
181            .unwrap_or(false); // Default: block private IPs
182
183        // Parse comma-separated blocked hosts
184        let blocked_hosts = parts
185            .params
186            .get("blockedHosts")
187            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
188            .unwrap_or_default();
189
190        let max_body_size = parts
191            .params
192            .get("maxBodySize")
193            .and_then(|v| v.parse::<usize>().ok())
194            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
195
196        // Collect remaining params (not Camel options) as query params
197        let query_params: HashMap<String, String> = parts
198            .params
199            .into_iter()
200            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
201            .collect();
202
203        Ok(Self {
204            base_url,
205            http_method,
206            throw_exception_on_failure,
207            ok_status_code_range,
208            follow_redirects,
209            connect_timeout,
210            response_timeout,
211            query_params,
212            allow_private_ips,
213            blocked_hosts,
214            max_body_size,
215        })
216    }
217}
218
219// ---------------------------------------------------------------------------
220// HttpServerConfig
221// ---------------------------------------------------------------------------
222
/// Settings for an HTTP server (consumer) endpoint: where to listen, which
/// path to serve, and how large request/response bodies may be.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Address to bind to, for example "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port the server listens on.
    pub port: u16,
    /// URL path handled by this consumer, for example "/orders".
    pub path: String,
    /// Upper bound, in bytes, for an incoming request body.
    pub max_request_body: usize,
    /// Upper bound, in bytes, when a streamed response body is materialized.
    pub max_response_body: usize,
}
237
238impl UriConfig for HttpServerConfig {
239    /// Returns "http" as the primary scheme (also accepts "https")
240    fn scheme() -> &'static str {
241        "http"
242    }
243
244    fn from_uri(uri: &str) -> Result<Self, CamelError> {
245        let parts = parse_uri(uri)?;
246        Self::from_components(parts)
247    }
248
249    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
250        // Validate scheme - accept both http and https
251        if parts.scheme != "http" && parts.scheme != "https" {
252            return Err(CamelError::InvalidUri(format!(
253                "expected scheme 'http' or 'https', got '{}'",
254                parts.scheme
255            )));
256        }
257
258        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
259        // Strip leading "//"
260        let authority_and_path = parts.path.trim_start_matches('/');
261
262        // Split on the first "/" to separate "host:port" from "/path"
263        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
264            (&authority_and_path[..idx], &authority_and_path[idx..])
265        } else {
266            (authority_and_path, "/")
267        };
268
269        let path = if path_suffix.is_empty() {
270            "/"
271        } else {
272            path_suffix
273        }
274        .to_string();
275
276        // Parse host:port from authority
277        let (host, port) = if let Some(colon) = authority.rfind(':') {
278            let port_str = &authority[colon + 1..];
279            match port_str.parse::<u16>() {
280                Ok(p) => (authority[..colon].to_string(), p),
281                Err(_) => {
282                    return Err(CamelError::InvalidUri(format!(
283                        "invalid port '{}' in authority",
284                        port_str
285                    )));
286                }
287            }
288        } else {
289            // Default port based on scheme: 443 for https, 80 for http
290            let default_port = if parts.scheme == "https" { 443 } else { 80 };
291            (authority.to_string(), default_port)
292        };
293
294        let max_request_body = parts
295            .params
296            .get("maxRequestBody")
297            .and_then(|v| v.parse::<usize>().ok())
298            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
299
300        let max_response_body = parts
301            .params
302            .get("maxResponseBody")
303            .and_then(|v| v.parse::<usize>().ok())
304            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
305
306        Ok(Self {
307            host,
308            port,
309            path,
310            max_request_body,
311            max_response_body,
312        })
313    }
314}
315
316// ---------------------------------------------------------------------------
317// RequestEnvelope / HttpReply
318// ---------------------------------------------------------------------------
319
320/// Body de la respuesta HTTP: bytes ya materializados o stream lazy.
321pub(crate) enum HttpReplyBody {
322    Bytes(bytes::Bytes),
323    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
324}
325
326/// An inbound HTTP request sent from the Axum dispatch handler to an
327/// `HttpConsumer` receive loop.
328pub(crate) struct RequestEnvelope {
329    pub(crate) method: String,
330    pub(crate) path: String,
331    pub(crate) query: String,
332    pub(crate) headers: http::HeaderMap,
333    pub(crate) body: StreamBody,
334    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
335}
336
337/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
338pub(crate) struct HttpReply {
339    pub(crate) status: u16,
340    pub(crate) headers: Vec<(String, String)>,
341    pub(crate) body: HttpReplyBody,
342}
343
344// ---------------------------------------------------------------------------
345// DispatchTable / ServerRegistry
346// ---------------------------------------------------------------------------
347
348/// Maps URL path → channel sender for the consumer that owns that path.
349pub(crate) type DispatchTable =
350    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
351
352/// Handle to a running Axum server on one port.
353struct ServerHandle {
354    dispatch: DispatchTable,
355    /// Kept alive so the task isn't dropped; not used directly.
356    _task: tokio::task::JoinHandle<()>,
357}
358
359/// Process-global registry mapping port → running Axum server handle.
360pub struct ServerRegistry {
361    inner: Mutex<HashMap<u16, ServerHandle>>,
362}
363
364impl ServerRegistry {
365    /// Returns the global singleton.
366    pub fn global() -> &'static Self {
367        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
368        INSTANCE.get_or_init(|| ServerRegistry {
369            inner: Mutex::new(HashMap::new()),
370        })
371    }
372
373    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
374    /// none is running on that port yet.
375    pub(crate) async fn get_or_spawn(
376        &'static self,
377        host: &str,
378        port: u16,
379        max_request_body: usize,
380    ) -> Result<DispatchTable, CamelError> {
381        // Fast path: check without spawning.
382        {
383            let guard = self.inner.lock().map_err(|_| {
384                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
385            })?;
386            if let Some(handle) = guard.get(&port) {
387                return Ok(Arc::clone(&handle.dispatch));
388            }
389        }
390
391        // Slow path: need to bind and spawn.
392        let addr = format!("{}:{}", host, port);
393        let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
394            CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
395        })?;
396
397        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
398        let dispatch_for_server = Arc::clone(&dispatch);
399        let task = tokio::spawn(run_axum_server(
400            listener,
401            dispatch_for_server,
402            max_request_body,
403        ));
404
405        // Re-acquire lock to insert — handle the race where another task won.
406        let mut guard = self.inner.lock().map_err(|_| {
407            CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
408        })?;
409        // If another task already registered this port while we were binding,
410        // abort our task (it has no routes) and use the winner's dispatch table.
411        if let Some(existing) = guard.get(&port) {
412            task.abort();
413            return Ok(Arc::clone(&existing.dispatch));
414        }
415        guard.insert(
416            port,
417            ServerHandle {
418                dispatch: Arc::clone(&dispatch),
419                _task: task,
420            },
421        );
422        Ok(dispatch)
423    }
424}
425
426// ---------------------------------------------------------------------------
427// Axum server
428// ---------------------------------------------------------------------------
429
430use axum::{
431    Router,
432    body::Body as AxumBody,
433    extract::{Request, State},
434    http::{Response, StatusCode},
435    response::IntoResponse,
436};
437
438#[derive(Clone)]
439struct AppState {
440    dispatch: DispatchTable,
441    max_request_body: usize,
442}
443
444async fn run_axum_server(
445    listener: tokio::net::TcpListener,
446    dispatch: DispatchTable,
447    max_request_body: usize,
448) {
449    let state = AppState {
450        dispatch,
451        max_request_body,
452    };
453    let app = Router::new().fallback(dispatch_handler).with_state(state);
454
455    axum::serve(listener, app).await.unwrap_or_else(|e| {
456        tracing::error!(error = %e, "Axum server error");
457    });
458}
459
460async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
461    let method = req.method().to_string();
462    let path = req.uri().path().to_string();
463    let query = req.uri().query().unwrap_or("").to_string();
464    let headers = req.headers().clone();
465
466    // Check Content-Length against limit BEFORE opening the stream
467    let content_length: Option<u64> = headers
468        .get(http::header::CONTENT_LENGTH)
469        .and_then(|v| v.to_str().ok())
470        .and_then(|s| s.parse().ok());
471
472    if let Some(len) = content_length
473        && len > state.max_request_body as u64
474    {
475        return Response::builder()
476            .status(StatusCode::PAYLOAD_TOO_LARGE)
477            .body(AxumBody::from("Request body exceeds configured limit"))
478            .expect("infallible");
479    }
480
481    // Build StreamBody from Axum body WITHOUT materializing
482    let content_type = headers
483        .get(http::header::CONTENT_TYPE)
484        .and_then(|v| v.to_str().ok())
485        .map(|s| s.to_string());
486
487    let data_stream: BodyDataStream = req.into_body().into_data_stream();
488    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
489    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
490
491    let stream_body = StreamBody {
492        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
493        metadata: StreamMetadata {
494            size_hint: content_length,
495            content_type,
496            origin: None,
497        },
498    };
499
500    // Look up handler for this path
501    let sender = {
502        let table = state.dispatch.read().await;
503        table.get(&path).cloned()
504    };
505    let Some(sender) = sender else {
506        return Response::builder()
507            .status(StatusCode::NOT_FOUND)
508            .body(AxumBody::from("No consumer registered for this path"))
509            .expect("infallible");
510    };
511
512    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
513    let envelope = RequestEnvelope {
514        method,
515        path,
516        query,
517        headers,
518        body: stream_body,
519        reply_tx,
520    };
521
522    if sender.send(envelope).await.is_err() {
523        return Response::builder()
524            .status(StatusCode::SERVICE_UNAVAILABLE)
525            .body(AxumBody::from("Consumer unavailable"))
526            .expect("infallible");
527    }
528
529    match reply_rx.await {
530        Ok(reply) => {
531            let status =
532                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
533            let mut builder = Response::builder().status(status);
534            for (k, v) in &reply.headers {
535                builder = builder.header(k.as_str(), v.as_str());
536            }
537            match reply.body {
538                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
539                    Response::builder()
540                        .status(StatusCode::INTERNAL_SERVER_ERROR)
541                        .body(AxumBody::from("Invalid response headers from consumer"))
542                        .expect("infallible")
543                }),
544                HttpReplyBody::Stream(stream) => builder
545                    .body(AxumBody::from_stream(stream))
546                    .unwrap_or_else(|_| {
547                        Response::builder()
548                            .status(StatusCode::INTERNAL_SERVER_ERROR)
549                            .body(AxumBody::from("Invalid response headers from consumer"))
550                            .expect("infallible")
551                    }),
552            }
553        }
554        Err(_) => Response::builder()
555            .status(StatusCode::INTERNAL_SERVER_ERROR)
556            .body(AxumBody::from("Pipeline error"))
557            .expect("Response::builder() with a known-valid status code and body is infallible"),
558    }
559}
560
561// ---------------------------------------------------------------------------
562// HttpConsumer
563// ---------------------------------------------------------------------------
564
565pub struct HttpConsumer {
566    config: HttpServerConfig,
567}
568
569impl HttpConsumer {
570    pub fn new(config: HttpServerConfig) -> Self {
571        Self { config }
572    }
573}
574
575#[async_trait::async_trait]
576impl Consumer for HttpConsumer {
577    async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
578        use camel_api::{Exchange, Message, body::Body};
579
580        let dispatch = ServerRegistry::global()
581            .get_or_spawn(
582                &self.config.host,
583                self.config.port,
584                self.config.max_request_body,
585            )
586            .await?;
587
588        // Create a channel for this path and register it
589        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
590        {
591            let mut table = dispatch.write().await;
592            table.insert(self.config.path.clone(), env_tx);
593        }
594
595        let path = self.config.path.clone();
596        let cancel_token = ctx.cancel_token();
597        let _max_response_body = self.config.max_response_body;
598
599        loop {
600            tokio::select! {
601                _ = ctx.cancelled() => {
602                    break;
603                }
604                envelope = env_rx.recv() => {
605                    let Some(envelope) = envelope else { break; };
606
607                    // Build Exchange from HTTP request
608                    let mut msg = Message::default();
609
610                    // Set standard Camel HTTP headers
611                    msg.set_header("CamelHttpMethod",
612                        serde_json::Value::String(envelope.method.clone()));
613                    msg.set_header("CamelHttpPath",
614                        serde_json::Value::String(envelope.path.clone()));
615                    msg.set_header("CamelHttpQuery",
616                        serde_json::Value::String(envelope.query.clone()));
617
618                    // Forward HTTP headers (skip pseudo-headers)
619                    for (k, v) in &envelope.headers {
620                        if let Ok(val_str) = v.to_str() {
621                            msg.set_header(
622                                k.as_str(),
623                                serde_json::Value::String(val_str.to_string()),
624                            );
625                        }
626                    }
627
628                    // Body: always arrives as Body::Stream (native streaming)
629                    // Routes can call into_bytes() if they need to materialize
630                    msg.body = Body::Stream(envelope.body);
631
632                    #[allow(unused_mut)]
633                    let mut exchange = Exchange::new(msg);
634
635                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
636                    #[cfg(feature = "otel")]
637                    {
638                        let headers: HashMap<String, String> = envelope
639                            .headers
640                            .iter()
641                            .filter_map(|(k, v)| {
642                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
643                            })
644                            .collect();
645                        camel_otel::extract_into_exchange(&mut exchange, &headers);
646                    }
647
648                    let reply_tx = envelope.reply_tx;
649                    let sender = ctx.sender().clone();
650                    let path_clone = path.clone();
651                    let cancel = cancel_token.clone();
652
653                    // Spawn a task to handle this request concurrently
654                    //
655                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
656                    // true concurrent request processing. This change was introduced as part of the
657                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
658                    //
659                    // Rationale:
660                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
661                    //    the consumer's main loop until the pipeline processing completes
662                    // 2. This blocking would prevent multiple HTTP requests from being processed
663                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
664                    // 3. The channel would never have multiple exchanges buffered simultaneously,
665                    //    defeating the purpose of pipeline-side concurrency
666                    // 4. By spawning a task per request, we allow the consumer loop to continue
667                    //    accepting new requests while existing ones are processed in the pipeline
668                    //
669                    // This approach effectively decouples request acceptance from pipeline processing,
670                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
671                    // by the pipeline when ConcurrencyModel::Concurrent is active.
672                    tokio::spawn(async move {
673                        // Check for cancellation before sending to pipeline.
674                        // Returns 503 (Service Unavailable) instead of letting the request
675                        // enter a shutting-down pipeline. This is a behavioral change from
676                        // the pre-concurrency implementation where cancellation during
677                        // processing would result in a 500 (Internal Server Error).
678                        // 503 is more semantically correct: the server is temporarily
679                        // unable to handle the request due to shutdown.
680                        if cancel.is_cancelled() {
681                            let _ = reply_tx.send(HttpReply {
682                                status: 503,
683                                headers: vec![],
684                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
685                            });
686                            return;
687                        }
688
689                        // Send through pipeline and await result
690                        let (tx, rx) = tokio::sync::oneshot::channel();
691                        let envelope = camel_component::consumer::ExchangeEnvelope {
692                            exchange,
693                            reply_tx: Some(tx),
694                        };
695
696                        let result = match sender.send(envelope).await {
697                            Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
698                            Err(_) => Err(camel_api::CamelError::ChannelClosed),
699                        }
700                        .and_then(|r| r);
701
702                        let reply = match result {
703                            Ok(out) => {
704                                let status = out
705                                    .input
706                                    .header("CamelHttpResponseCode")
707                                    .and_then(|v| v.as_u64())
708                                    .map(|s| s as u16)
709                                    .unwrap_or(200);
710
711                                let reply_body: HttpReplyBody = match out.input.body {
712                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
713                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
714                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
715                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
716                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
717                                        v.to_string().into_bytes(),
718                                    )),
719                                    Body::Stream(s) => {
720                                        match s.stream.lock().await.take() {
721                                            Some(stream) => HttpReplyBody::Stream(stream),
722                                            None => {
723                                                tracing::error!(
724                                                    "Body::Stream already consumed before HTTP reply — returning 500"
725                                                );
726                                                let error_reply = HttpReply {
727                                                    status: 500,
728                                                    headers: vec![],
729                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
730                                                };
731                                                if reply_tx.send(error_reply).is_err() {
732                                                    debug!("reply_tx dropped before error reply could be sent");
733                                                }
734                                                return;
735                                            }
736                                        }
737                                    }
738                                };
739
740                                let resp_headers: Vec<(String, String)> = out
741                                    .input
742                                    .headers
743                                    .iter()
744                                    // Filter Camel internal headers
745                                    .filter(|(k, _)| !k.starts_with("Camel"))
746                                    // Filter hop-by-hop and request-only headers
747                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
748                                    .filter(|(k, _)| {
749                                        !matches!(
750                                            k.to_lowercase().as_str(),
751                                            // RFC 2616 Section 4.5 - General headers
752                                            "content-length" |      // Auto-calculated by framework
753                                            "content-type" |        // Auto-calculated from body
754                                            "transfer-encoding" |   // Hop-by-hop
755                                            "connection" |          // Hop-by-hop
756                                            "cache-control" |       // Hop-by-hop
757                                            "date" |                // Auto-generated
758                                            "pragma" |              // Hop-by-hop
759                                            "trailer" |             // Hop-by-hop
760                                            "upgrade" |             // Hop-by-hop
761                                            "via" |                 // Hop-by-hop
762                                            "warning" |             // Hop-by-hop
763                                            // Request-only headers
764                                            "host" |                // Request-only
765                                            "user-agent" |          // Request-only
766                                            "accept" |              // Request-only
767                                            "accept-encoding" |     // Request-only
768                                            "accept-language" |     // Request-only
769                                            "accept-charset" |      // Request-only
770                                            "authorization" |       // Request-only (security)
771                                            "proxy-authorization" | // Request-only (security)
772                                            "cookie" |              // Request-only
773                                            "expect" |              // Request-only
774                                            "from" |                // Request-only
775                                            "if-match" |            // Request-only
776                                            "if-modified-since" |   // Request-only
777                                            "if-none-match" |       // Request-only
778                                            "if-range" |            // Request-only
779                                            "if-unmodified-since" | // Request-only
780                                            "max-forwards" |        // Request-only
781                                            "proxy-connection" |    // Request-only
782                                            "range" |               // Request-only
783                                            "referer" |             // Request-only
784                                            "te"                    // Request-only
785                                        )
786                                    })
787                                    .filter_map(|(k, v)| {
788                                        v.as_str().map(|s| (k.clone(), s.to_string()))
789                                    })
790                                    .collect();
791
792                                HttpReply {
793                                    status,
794                                    headers: resp_headers,
795                                    body: reply_body,
796                                }
797                            }
798                            Err(e) => {
799                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
800                                HttpReply {
801                                    status: 500,
802                                    headers: vec![],
803                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
804                                }
805                            }
806                        };
807
808                        // Reply to Axum handler (ignore error if client disconnected)
809                        let _ = reply_tx.send(reply);
810                    });
811                }
812            }
813        }
814
815        // Deregister this path
816        {
817            let mut table = dispatch.write().await;
818            table.remove(&path);
819        }
820
821        Ok(())
822    }
823
824    async fn stop(&mut self) -> Result<(), CamelError> {
825        Ok(())
826    }
827
828    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
829        camel_component::ConcurrencyModel::Concurrent { max: None }
830    }
831}
832
833// ---------------------------------------------------------------------------
834// HttpComponent / HttpsComponent
835// ---------------------------------------------------------------------------
836
/// Component handling the `http` URI scheme.
///
/// This is a stateless unit type, so it is cheap to construct, copy and
/// compare; all per-endpoint state lives in the endpoints it creates.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpComponent;

impl HttpComponent {
    /// Creates a new `HttpComponent`.
    ///
    /// Equivalent to [`HttpComponent::default`].
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
850
851impl Component for HttpComponent {
852    fn scheme(&self) -> &str {
853        "http"
854    }
855
856    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
857        let config = HttpEndpointConfig::from_uri(uri)?;
858        let server_config = HttpServerConfig::from_uri(uri)?;
859        let client = build_client(&config)?;
860        Ok(Box::new(HttpEndpoint {
861            uri: uri.to_string(),
862            config,
863            server_config,
864            client,
865        }))
866    }
867}
868
/// Component handling the `https` URI scheme.
///
/// This is a stateless unit type, so it is cheap to construct, copy and
/// compare; all per-endpoint state lives in the endpoints it creates.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpsComponent;

impl HttpsComponent {
    /// Creates a new `HttpsComponent`.
    ///
    /// Equivalent to [`HttpsComponent::default`].
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
882
883impl Component for HttpsComponent {
884    fn scheme(&self) -> &str {
885        "https"
886    }
887
888    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
889        let config = HttpEndpointConfig::from_uri(uri)?;
890        let server_config = HttpServerConfig::from_uri(uri)?;
891        let client = build_client(&config)?;
892        Ok(Box::new(HttpEndpoint {
893            uri: uri.to_string(),
894            config,
895            server_config,
896            client,
897        }))
898    }
899}
900
901fn build_client(config: &HttpEndpointConfig) -> Result<reqwest::Client, CamelError> {
902    let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
903
904    if !config.follow_redirects {
905        builder = builder.redirect(reqwest::redirect::Policy::none());
906    }
907
908    builder.build().map_err(|e| {
909        CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
910    })
911}
912
913// ---------------------------------------------------------------------------
914// HttpEndpoint
915// ---------------------------------------------------------------------------
916
917struct HttpEndpoint {
918    uri: String,
919    config: HttpEndpointConfig,
920    server_config: HttpServerConfig,
921    client: reqwest::Client,
922}
923
924impl Endpoint for HttpEndpoint {
925    fn uri(&self) -> &str {
926        &self.uri
927    }
928
929    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
930        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
931    }
932
933    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
934        Ok(BoxProcessor::new(HttpProducer {
935            config: Arc::new(self.config.clone()),
936            client: self.client.clone(),
937        }))
938    }
939}
940
941// ---------------------------------------------------------------------------
942// SSRF Protection
943// ---------------------------------------------------------------------------
944
945fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
946    let parsed = url::Url::parse(url)
947        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
948
949    // Check blocked hosts
950    if let Some(host) = parsed.host_str()
951        && config.blocked_hosts.iter().any(|blocked| host == blocked)
952    {
953        return Err(CamelError::ProcessorError(format!(
954            "Host '{}' is blocked",
955            host
956        )));
957    }
958
959    // Check private IPs if not allowed
960    if !config.allow_private_ips
961        && let Some(host) = parsed.host()
962    {
963        match host {
964            url::Host::Ipv4(ip) => {
965                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
966                    return Err(CamelError::ProcessorError(format!(
967                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
968                        ip
969                    )));
970                }
971            }
972            url::Host::Ipv6(ip) => {
973                if ip.is_loopback() {
974                    return Err(CamelError::ProcessorError(format!(
975                        "Loopback IP '{}' not allowed",
976                        ip
977                    )));
978                }
979            }
980            url::Host::Domain(domain) => {
981                // Block common internal domains
982                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
983                if blocked_domains.contains(&domain) {
984                    return Err(CamelError::ProcessorError(format!(
985                        "Domain '{}' is not allowed",
986                        domain
987                    )));
988                }
989            }
990        }
991    }
992
993    Ok(())
994}
995
996// ---------------------------------------------------------------------------
997// HttpProducer
998// ---------------------------------------------------------------------------
999
1000#[derive(Clone)]
1001struct HttpProducer {
1002    config: Arc<HttpEndpointConfig>,
1003    client: reqwest::Client,
1004}
1005
1006impl HttpProducer {
1007    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1008        if let Some(ref method) = config.http_method {
1009            return method.to_uppercase();
1010        }
1011        if let Some(method) = exchange
1012            .input
1013            .header("CamelHttpMethod")
1014            .and_then(|v| v.as_str())
1015        {
1016            return method.to_uppercase();
1017        }
1018        if !exchange.input.body.is_empty() {
1019            return "POST".to_string();
1020        }
1021        "GET".to_string()
1022    }
1023
1024    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1025        if let Some(uri) = exchange
1026            .input
1027            .header("CamelHttpUri")
1028            .and_then(|v| v.as_str())
1029        {
1030            let mut url = uri.to_string();
1031            if let Some(path) = exchange
1032                .input
1033                .header("CamelHttpPath")
1034                .and_then(|v| v.as_str())
1035            {
1036                if !url.ends_with('/') && !path.starts_with('/') {
1037                    url.push('/');
1038                }
1039                url.push_str(path);
1040            }
1041            if let Some(query) = exchange
1042                .input
1043                .header("CamelHttpQuery")
1044                .and_then(|v| v.as_str())
1045            {
1046                url.push('?');
1047                url.push_str(query);
1048            }
1049            return url;
1050        }
1051
1052        let mut url = config.base_url.clone();
1053
1054        if let Some(path) = exchange
1055            .input
1056            .header("CamelHttpPath")
1057            .and_then(|v| v.as_str())
1058        {
1059            if !url.ends_with('/') && !path.starts_with('/') {
1060                url.push('/');
1061            }
1062            url.push_str(path);
1063        }
1064
1065        if let Some(query) = exchange
1066            .input
1067            .header("CamelHttpQuery")
1068            .and_then(|v| v.as_str())
1069        {
1070            url.push('?');
1071            url.push_str(query);
1072        } else if !config.query_params.is_empty() {
1073            // Forward non-Camel query params from config
1074            url.push('?');
1075            let query_string: String = config
1076                .query_params
1077                .iter()
1078                .map(|(k, v)| format!("{k}={v}"))
1079                .collect::<Vec<_>>()
1080                .join("&");
1081            url.push_str(&query_string);
1082        }
1083
1084        url
1085    }
1086
1087    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1088        status >= range.0 && status <= range.1
1089    }
1090}
1091
1092impl Service<Exchange> for HttpProducer {
1093    type Response = Exchange;
1094    type Error = CamelError;
1095    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1096
1097    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1098        Poll::Ready(Ok(()))
1099    }
1100
1101    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1102        let config = self.config.clone();
1103        let client = self.client.clone();
1104
1105        Box::pin(async move {
1106            let method_str = HttpProducer::resolve_method(&exchange, &config);
1107            let url = HttpProducer::resolve_url(&exchange, &config);
1108
1109            // SECURITY: Validate URL for SSRF
1110            validate_url_for_ssrf(&url, &config)?;
1111
1112            debug!(
1113                correlation_id = %exchange.correlation_id(),
1114                method = %method_str,
1115                url = %url,
1116                "HTTP request"
1117            );
1118
1119            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1120                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1121            })?;
1122
1123            let mut request = client.request(method, &url);
1124
1125            if let Some(timeout) = config.response_timeout {
1126                request = request.timeout(timeout);
1127            }
1128
1129            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1130            #[cfg(feature = "otel")]
1131            {
1132                let mut otel_headers = HashMap::new();
1133                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1134                for (k, v) in otel_headers {
1135                    if let (Ok(name), Ok(val)) = (
1136                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1137                        reqwest::header::HeaderValue::from_str(&v),
1138                    ) {
1139                        request = request.header(name, val);
1140                    }
1141                }
1142            }
1143
1144            for (key, value) in &exchange.input.headers {
1145                if !key.starts_with("Camel")
1146                    && let Some(val_str) = value.as_str()
1147                    && let (Ok(name), Ok(val)) = (
1148                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1149                        reqwest::header::HeaderValue::from_str(val_str),
1150                    )
1151                {
1152                    request = request.header(name, val);
1153                }
1154            }
1155
1156            match exchange.input.body {
1157                Body::Stream(ref s) => {
1158                    let mut stream_lock = s.stream.lock().await;
1159                    if let Some(stream) = stream_lock.take() {
1160                        request = request.body(reqwest::Body::wrap_stream(stream));
1161                    } else {
1162                        return Err(CamelError::AlreadyConsumed);
1163                    }
1164                }
1165                _ => {
1166                    // For other types, materialize with configured limit
1167                    let body = std::mem::take(&mut exchange.input.body);
1168                    let bytes = body.into_bytes(config.max_body_size).await?;
1169                    if !bytes.is_empty() {
1170                        request = request.body(bytes);
1171                    }
1172                }
1173            }
1174
1175            let response = request
1176                .send()
1177                .await
1178                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1179
1180            let status_code = response.status().as_u16();
1181            let status_text = response
1182                .status()
1183                .canonical_reason()
1184                .unwrap_or("Unknown")
1185                .to_string();
1186
1187            for (key, value) in response.headers() {
1188                if let Ok(val_str) = value.to_str() {
1189                    exchange
1190                        .input
1191                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1192                }
1193            }
1194
1195            exchange.input.set_header(
1196                "CamelHttpResponseCode",
1197                serde_json::Value::Number(status_code.into()),
1198            );
1199            exchange.input.set_header(
1200                "CamelHttpResponseText",
1201                serde_json::Value::String(status_text.clone()),
1202            );
1203
1204            let response_body = response.bytes().await.map_err(|e| {
1205                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1206            })?;
1207
1208            if config.throw_exception_on_failure
1209                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1210            {
1211                return Err(CamelError::HttpOperationFailed {
1212                    method: method_str,
1213                    url,
1214                    status_code,
1215                    status_text,
1216                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1217                });
1218            }
1219
1220            if !response_body.is_empty() {
1221                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1222            }
1223
1224            debug!(
1225                correlation_id = %exchange.correlation_id(),
1226                status = status_code,
1227                url = %url,
1228                "HTTP response"
1229            );
1230            Ok(exchange)
1231        })
1232    }
1233}
1234
1235#[cfg(test)]
1236mod tests {
1237    use super::*;
1238    use camel_api::Message;
1239    use std::sync::Arc;
1240    use std::time::Duration;
1241
1242    fn test_producer_ctx() -> ProducerContext {
1243        ProducerContext::new()
1244    }
1245
1246    #[test]
1247    fn test_http_config_defaults() {
1248        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1249        assert_eq!(config.base_url, "http://localhost:8080/api");
1250        assert!(config.http_method.is_none());
1251        assert!(config.throw_exception_on_failure);
1252        assert_eq!(config.ok_status_code_range, (200, 299));
1253        assert!(!config.follow_redirects);
1254        assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1255        assert!(config.response_timeout.is_none());
1256    }
1257
1258    #[test]
1259    fn test_http_config_scheme() {
1260        // UriConfig trait method returns "http" as primary scheme
1261        assert_eq!(HttpEndpointConfig::scheme(), "http");
1262    }
1263
1264    #[test]
1265    fn test_http_config_from_components() {
1266        // Test from_components directly (trait method)
1267        let components = camel_endpoint::UriComponents {
1268            scheme: "https".to_string(),
1269            path: "//api.example.com/v1".to_string(),
1270            params: std::collections::HashMap::from([(
1271                "httpMethod".to_string(),
1272                "POST".to_string(),
1273            )]),
1274        };
1275        let config = HttpEndpointConfig::from_components(components).unwrap();
1276        assert_eq!(config.base_url, "https://api.example.com/v1");
1277        assert_eq!(config.http_method, Some("POST".to_string()));
1278    }
1279
1280    #[test]
1281    fn test_http_config_with_options() {
1282        let config = HttpEndpointConfig::from_uri(
1283            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1284        ).unwrap();
1285        assert_eq!(config.base_url, "https://api.example.com/v1");
1286        assert_eq!(config.http_method, Some("PUT".to_string()));
1287        assert!(!config.throw_exception_on_failure);
1288        assert!(config.follow_redirects);
1289        assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1290        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1291    }
1292
1293    #[test]
1294    fn test_http_config_ok_status_range() {
1295        let config =
1296            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1297        assert_eq!(config.ok_status_code_range, (200, 204));
1298    }
1299
1300    #[test]
1301    fn test_http_config_wrong_scheme() {
1302        let result = HttpEndpointConfig::from_uri("file:/tmp");
1303        assert!(result.is_err());
1304    }
1305
1306    #[test]
1307    fn test_http_component_scheme() {
1308        let component = HttpComponent::new();
1309        assert_eq!(component.scheme(), "http");
1310    }
1311
1312    #[test]
1313    fn test_https_component_scheme() {
1314        let component = HttpsComponent::new();
1315        assert_eq!(component.scheme(), "https");
1316    }
1317
1318    #[test]
1319    fn test_http_endpoint_creates_consumer() {
1320        let component = HttpComponent::new();
1321        let endpoint = component
1322            .create_endpoint("http://0.0.0.0:19100/test")
1323            .unwrap();
1324        assert!(endpoint.create_consumer().is_ok());
1325    }
1326
1327    #[test]
1328    fn test_https_endpoint_creates_consumer() {
1329        let component = HttpsComponent::new();
1330        let endpoint = component
1331            .create_endpoint("https://0.0.0.0:8443/test")
1332            .unwrap();
1333        assert!(endpoint.create_consumer().is_ok());
1334    }
1335
1336    #[test]
1337    fn test_http_endpoint_creates_producer() {
1338        let ctx = test_producer_ctx();
1339        let component = HttpComponent::new();
1340        let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1341        assert!(endpoint.create_producer(&ctx).is_ok());
1342    }
1343
1344    // -----------------------------------------------------------------------
1345    // Producer tests
1346    // -----------------------------------------------------------------------
1347
1348    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1349        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1350        let addr = listener.local_addr().unwrap();
1351        let url = format!("http://127.0.0.1:{}", addr.port());
1352
1353        let handle = tokio::spawn(async move {
1354            loop {
1355                if let Ok((mut stream, _)) = listener.accept().await {
1356                    tokio::spawn(async move {
1357                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1358                        let mut buf = vec![0u8; 4096];
1359                        let n = stream.read(&mut buf).await.unwrap_or(0);
1360                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1361
1362                        let method = request.split_whitespace().next().unwrap_or("GET");
1363
1364                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1365                        let response = format!(
1366                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1367                            body.len(),
1368                            body
1369                        );
1370                        let _ = stream.write_all(response.as_bytes()).await;
1371                    });
1372                }
1373            }
1374        });
1375
1376        (url, handle)
1377    }
1378
1379    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1380        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1381        let addr = listener.local_addr().unwrap();
1382        let url = format!("http://127.0.0.1:{}", addr.port());
1383
1384        let handle = tokio::spawn(async move {
1385            loop {
1386                if let Ok((mut stream, _)) = listener.accept().await {
1387                    let status = status;
1388                    tokio::spawn(async move {
1389                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1390                        let mut buf = vec![0u8; 4096];
1391                        let _ = stream.read(&mut buf).await;
1392
1393                        let status_text = match status {
1394                            404 => "Not Found",
1395                            500 => "Internal Server Error",
1396                            _ => "Error",
1397                        };
1398                        let body = "error body";
1399                        let response = format!(
1400                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1401                            status,
1402                            status_text,
1403                            body.len(),
1404                            body
1405                        );
1406                        let _ = stream.write_all(response.as_bytes()).await;
1407                    });
1408                }
1409            }
1410        });
1411
1412        (url, handle)
1413    }
1414
1415    #[tokio::test]
1416    async fn test_http_producer_get_request() {
1417        use tower::ServiceExt;
1418
1419        let (url, _handle) = start_test_server().await;
1420        let ctx = test_producer_ctx();
1421
1422        let component = HttpComponent::new();
1423        let endpoint = component
1424            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1425            .unwrap();
1426        let producer = endpoint.create_producer(&ctx).unwrap();
1427
1428        let exchange = Exchange::new(Message::default());
1429        let result = producer.oneshot(exchange).await.unwrap();
1430
1431        let status = result
1432            .input
1433            .header("CamelHttpResponseCode")
1434            .and_then(|v| v.as_u64())
1435            .unwrap();
1436        assert_eq!(status, 200);
1437
1438        assert!(!result.input.body.is_empty());
1439    }
1440
1441    #[tokio::test]
1442    async fn test_http_producer_post_with_body() {
1443        use tower::ServiceExt;
1444
1445        let (url, _handle) = start_test_server().await;
1446        let ctx = test_producer_ctx();
1447
1448        let component = HttpComponent::new();
1449        let endpoint = component
1450            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1451            .unwrap();
1452        let producer = endpoint.create_producer(&ctx).unwrap();
1453
1454        let exchange = Exchange::new(Message::new("request body"));
1455        let result = producer.oneshot(exchange).await.unwrap();
1456
1457        let status = result
1458            .input
1459            .header("CamelHttpResponseCode")
1460            .and_then(|v| v.as_u64())
1461            .unwrap();
1462        assert_eq!(status, 200);
1463    }
1464
1465    #[tokio::test]
1466    async fn test_http_producer_method_from_header() {
1467        use tower::ServiceExt;
1468
1469        let (url, _handle) = start_test_server().await;
1470        let ctx = test_producer_ctx();
1471
1472        let component = HttpComponent::new();
1473        let endpoint = component
1474            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1475            .unwrap();
1476        let producer = endpoint.create_producer(&ctx).unwrap();
1477
1478        let mut exchange = Exchange::new(Message::default());
1479        exchange.input.set_header(
1480            "CamelHttpMethod",
1481            serde_json::Value::String("DELETE".to_string()),
1482        );
1483
1484        let result = producer.oneshot(exchange).await.unwrap();
1485        let status = result
1486            .input
1487            .header("CamelHttpResponseCode")
1488            .and_then(|v| v.as_u64())
1489            .unwrap();
1490        assert_eq!(status, 200);
1491    }
1492
1493    #[tokio::test]
1494    async fn test_http_producer_forced_method() {
1495        use tower::ServiceExt;
1496
1497        let (url, _handle) = start_test_server().await;
1498        let ctx = test_producer_ctx();
1499
1500        let component = HttpComponent::new();
1501        let endpoint = component
1502            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1503            .unwrap();
1504        let producer = endpoint.create_producer(&ctx).unwrap();
1505
1506        let exchange = Exchange::new(Message::default());
1507        let result = producer.oneshot(exchange).await.unwrap();
1508
1509        let status = result
1510            .input
1511            .header("CamelHttpResponseCode")
1512            .and_then(|v| v.as_u64())
1513            .unwrap();
1514        assert_eq!(status, 200);
1515    }
1516
1517    #[tokio::test]
1518    async fn test_http_producer_throw_exception_on_failure() {
1519        use tower::ServiceExt;
1520
1521        let (url, _handle) = start_status_server(404).await;
1522        let ctx = test_producer_ctx();
1523
1524        let component = HttpComponent::new();
1525        let endpoint = component
1526            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1527            .unwrap();
1528        let producer = endpoint.create_producer(&ctx).unwrap();
1529
1530        let exchange = Exchange::new(Message::default());
1531        let result = producer.oneshot(exchange).await;
1532        assert!(result.is_err());
1533
1534        match result.unwrap_err() {
1535            CamelError::HttpOperationFailed { status_code, .. } => {
1536                assert_eq!(status_code, 404);
1537            }
1538            e => panic!("Expected HttpOperationFailed, got: {e}"),
1539        }
1540    }
1541
1542    #[tokio::test]
1543    async fn test_http_producer_no_throw_on_failure() {
1544        use tower::ServiceExt;
1545
1546        let (url, _handle) = start_status_server(500).await;
1547        let ctx = test_producer_ctx();
1548
1549        let component = HttpComponent::new();
1550        let endpoint = component
1551            .create_endpoint(&format!(
1552                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1553            ))
1554            .unwrap();
1555        let producer = endpoint.create_producer(&ctx).unwrap();
1556
1557        let exchange = Exchange::new(Message::default());
1558        let result = producer.oneshot(exchange).await.unwrap();
1559
1560        let status = result
1561            .input
1562            .header("CamelHttpResponseCode")
1563            .and_then(|v| v.as_u64())
1564            .unwrap();
1565        assert_eq!(status, 500);
1566    }
1567
1568    #[tokio::test]
1569    async fn test_http_producer_uri_override() {
1570        use tower::ServiceExt;
1571
1572        let (url, _handle) = start_test_server().await;
1573        let ctx = test_producer_ctx();
1574
1575        let component = HttpComponent::new();
1576        let endpoint = component
1577            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1578            .unwrap();
1579        let producer = endpoint.create_producer(&ctx).unwrap();
1580
1581        let mut exchange = Exchange::new(Message::default());
1582        exchange.input.set_header(
1583            "CamelHttpUri",
1584            serde_json::Value::String(format!("{url}/api")),
1585        );
1586
1587        let result = producer.oneshot(exchange).await.unwrap();
1588        let status = result
1589            .input
1590            .header("CamelHttpResponseCode")
1591            .and_then(|v| v.as_u64())
1592            .unwrap();
1593        assert_eq!(status, 200);
1594    }
1595
1596    #[tokio::test]
1597    async fn test_http_producer_response_headers_mapped() {
1598        use tower::ServiceExt;
1599
1600        let (url, _handle) = start_test_server().await;
1601        let ctx = test_producer_ctx();
1602
1603        let component = HttpComponent::new();
1604        let endpoint = component
1605            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1606            .unwrap();
1607        let producer = endpoint.create_producer(&ctx).unwrap();
1608
1609        let exchange = Exchange::new(Message::default());
1610        let result = producer.oneshot(exchange).await.unwrap();
1611
1612        assert!(
1613            result.input.header("content-type").is_some()
1614                || result.input.header("Content-Type").is_some()
1615        );
1616        assert!(result.input.header("CamelHttpResponseText").is_some());
1617    }
1618
1619    // -----------------------------------------------------------------------
1620    // Bug fix tests: Client configuration per-endpoint
1621    // -----------------------------------------------------------------------
1622
1623    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1624        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1625        let addr = listener.local_addr().unwrap();
1626        let url = format!("http://127.0.0.1:{}", addr.port());
1627
1628        let handle = tokio::spawn(async move {
1629            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1630            loop {
1631                if let Ok((mut stream, _)) = listener.accept().await {
1632                    tokio::spawn(async move {
1633                        let mut buf = vec![0u8; 4096];
1634                        let n = stream.read(&mut buf).await.unwrap_or(0);
1635                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1636
1637                        // Check if this is a request to /final
1638                        if request.contains("GET /final") {
1639                            let body = r#"{"status":"final"}"#;
1640                            let response = format!(
1641                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1642                                body.len(),
1643                                body
1644                            );
1645                            let _ = stream.write_all(response.as_bytes()).await;
1646                        } else {
1647                            // Redirect to /final
1648                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1649                            let _ = stream.write_all(response.as_bytes()).await;
1650                        }
1651                    });
1652                }
1653            }
1654        });
1655
1656        (url, handle)
1657    }
1658
1659    #[tokio::test]
1660    async fn test_follow_redirects_false_does_not_follow() {
1661        use tower::ServiceExt;
1662
1663        let (url, _handle) = start_redirect_server().await;
1664        let ctx = test_producer_ctx();
1665
1666        let component = HttpComponent::new();
1667        let endpoint = component
1668            .create_endpoint(&format!(
1669                "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1670            ))
1671            .unwrap();
1672        let producer = endpoint.create_producer(&ctx).unwrap();
1673
1674        let exchange = Exchange::new(Message::default());
1675        let result = producer.oneshot(exchange).await.unwrap();
1676
1677        // Should get 302, NOT follow redirect to 200
1678        let status = result
1679            .input
1680            .header("CamelHttpResponseCode")
1681            .and_then(|v| v.as_u64())
1682            .unwrap();
1683        assert_eq!(
1684            status, 302,
1685            "Should NOT follow redirect when followRedirects=false"
1686        );
1687    }
1688
1689    #[tokio::test]
1690    async fn test_follow_redirects_true_follows_redirect() {
1691        use tower::ServiceExt;
1692
1693        let (url, _handle) = start_redirect_server().await;
1694        let ctx = test_producer_ctx();
1695
1696        let component = HttpComponent::new();
1697        let endpoint = component
1698            .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1699            .unwrap();
1700        let producer = endpoint.create_producer(&ctx).unwrap();
1701
1702        let exchange = Exchange::new(Message::default());
1703        let result = producer.oneshot(exchange).await.unwrap();
1704
1705        // Should follow redirect and get 200
1706        let status = result
1707            .input
1708            .header("CamelHttpResponseCode")
1709            .and_then(|v| v.as_u64())
1710            .unwrap();
1711        assert_eq!(
1712            status, 200,
1713            "Should follow redirect when followRedirects=true"
1714        );
1715    }
1716
1717    #[tokio::test]
1718    async fn test_query_params_forwarded_to_http_request() {
1719        use tower::ServiceExt;
1720
1721        let (url, _handle) = start_test_server().await;
1722        let ctx = test_producer_ctx();
1723
1724        let component = HttpComponent::new();
1725        // apiKey is NOT a Camel option, should be forwarded as query param
1726        let endpoint = component
1727            .create_endpoint(&format!(
1728                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1729            ))
1730            .unwrap();
1731        let producer = endpoint.create_producer(&ctx).unwrap();
1732
1733        let exchange = Exchange::new(Message::default());
1734        let result = producer.oneshot(exchange).await.unwrap();
1735
1736        // The test server returns the request info in response
1737        // We just verify it succeeds (the query param was sent)
1738        let status = result
1739            .input
1740            .header("CamelHttpResponseCode")
1741            .and_then(|v| v.as_u64())
1742            .unwrap();
1743        assert_eq!(status, 200);
1744    }
1745
1746    #[tokio::test]
1747    async fn test_non_camel_query_params_are_forwarded() {
1748        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1749        // We'll test the config parsing, not the actual HTTP call
1750        let config = HttpEndpointConfig::from_uri(
1751            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1752        )
1753        .unwrap();
1754
1755        // apiKey and token are NOT Camel options, should be forwarded
1756        assert!(
1757            config.query_params.contains_key("apiKey"),
1758            "apiKey should be preserved"
1759        );
1760        assert!(
1761            config.query_params.contains_key("token"),
1762            "token should be preserved"
1763        );
1764        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1765        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1766
1767        // httpMethod IS a Camel option, should NOT be in query_params
1768        assert!(
1769            !config.query_params.contains_key("httpMethod"),
1770            "httpMethod should not be forwarded"
1771        );
1772    }
1773
1774    // -----------------------------------------------------------------------
1775    // SSRF Protection tests
1776    // -----------------------------------------------------------------------
1777
1778    #[tokio::test]
1779    async fn test_http_producer_blocks_metadata_endpoint() {
1780        use tower::ServiceExt;
1781
1782        let ctx = test_producer_ctx();
1783        let component = HttpComponent::new();
1784        let endpoint = component
1785            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1786            .unwrap();
1787        let producer = endpoint.create_producer(&ctx).unwrap();
1788
1789        let mut exchange = Exchange::new(Message::default());
1790        exchange.input.set_header(
1791            "CamelHttpUri",
1792            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1793        );
1794
1795        let result = producer.oneshot(exchange).await;
1796        assert!(result.is_err(), "Should block AWS metadata endpoint");
1797
1798        let err = result.unwrap_err();
1799        assert!(
1800            err.to_string().contains("Private IP"),
1801            "Error should mention private IP blocking, got: {}",
1802            err
1803        );
1804    }
1805
1806    #[test]
1807    fn test_ssrf_config_defaults() {
1808        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
1809        assert!(
1810            !config.allow_private_ips,
1811            "Private IPs should be blocked by default"
1812        );
1813        assert!(
1814            config.blocked_hosts.is_empty(),
1815            "Blocked hosts should be empty by default"
1816        );
1817    }
1818
1819    #[test]
1820    fn test_ssrf_config_allow_private_ips() {
1821        let config =
1822            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1823        assert!(
1824            config.allow_private_ips,
1825            "Private IPs should be allowed when explicitly set"
1826        );
1827    }
1828
1829    #[test]
1830    fn test_ssrf_config_blocked_hosts() {
1831        let config = HttpEndpointConfig::from_uri(
1832            "http://example.com/api?blockedHosts=evil.com,malware.net",
1833        )
1834        .unwrap();
1835        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1836    }
1837
1838    #[tokio::test]
1839    async fn test_http_producer_blocks_localhost() {
1840        use tower::ServiceExt;
1841
1842        let ctx = test_producer_ctx();
1843        let component = HttpComponent::new();
1844        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1845        let producer = endpoint.create_producer(&ctx).unwrap();
1846
1847        let mut exchange = Exchange::new(Message::default());
1848        exchange.input.set_header(
1849            "CamelHttpUri",
1850            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1851        );
1852
1853        let result = producer.oneshot(exchange).await;
1854        assert!(result.is_err(), "Should block localhost");
1855    }
1856
1857    #[tokio::test]
1858    async fn test_http_producer_blocks_loopback_ip() {
1859        use tower::ServiceExt;
1860
1861        let ctx = test_producer_ctx();
1862        let component = HttpComponent::new();
1863        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1864        let producer = endpoint.create_producer(&ctx).unwrap();
1865
1866        let mut exchange = Exchange::new(Message::default());
1867        exchange.input.set_header(
1868            "CamelHttpUri",
1869            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1870        );
1871
1872        let result = producer.oneshot(exchange).await;
1873        assert!(result.is_err(), "Should block loopback IP");
1874    }
1875
1876    #[tokio::test]
1877    async fn test_http_producer_allows_private_ip_when_enabled() {
1878        use tower::ServiceExt;
1879
1880        let ctx = test_producer_ctx();
1881        let component = HttpComponent::new();
1882        // With allowPrivateIps=true, the validation should pass
1883        // (actual connection will fail, but that's expected)
1884        let endpoint = component
1885            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1886            .unwrap();
1887        let producer = endpoint.create_producer(&ctx).unwrap();
1888
1889        let exchange = Exchange::new(Message::default());
1890
1891        // The request will fail because we can't connect, but it should NOT fail
1892        // due to SSRF protection
1893        let result = producer.oneshot(exchange).await;
1894        // We expect connection error, not SSRF error
1895        if let Err(ref e) = result {
1896            let err_str = e.to_string();
1897            assert!(
1898                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1899                "Should not be SSRF error, got: {}",
1900                err_str
1901            );
1902        }
1903    }
1904
1905    // -----------------------------------------------------------------------
1906    // HttpServerConfig tests
1907    // -----------------------------------------------------------------------
1908
1909    #[test]
1910    fn test_http_server_config_parse() {
1911        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1912        assert_eq!(cfg.host, "0.0.0.0");
1913        assert_eq!(cfg.port, 8080);
1914        assert_eq!(cfg.path, "/orders");
1915    }
1916
1917    #[test]
1918    fn test_http_server_config_scheme() {
1919        // UriConfig trait method returns "http" as primary scheme
1920        assert_eq!(HttpServerConfig::scheme(), "http");
1921    }
1922
1923    #[test]
1924    fn test_http_server_config_from_components() {
1925        // Test from_components directly (trait method)
1926        let components = camel_endpoint::UriComponents {
1927            scheme: "https".to_string(),
1928            path: "//0.0.0.0:8443/api".to_string(),
1929            params: std::collections::HashMap::from([(
1930                "maxRequestBody".to_string(),
1931                "5242880".to_string(),
1932            )]),
1933        };
1934        let cfg = HttpServerConfig::from_components(components).unwrap();
1935        assert_eq!(cfg.host, "0.0.0.0");
1936        assert_eq!(cfg.port, 8443);
1937        assert_eq!(cfg.path, "/api");
1938        assert_eq!(cfg.max_request_body, 5242880);
1939    }
1940
1941    #[test]
1942    fn test_http_server_config_default_path() {
1943        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1944        assert_eq!(cfg.path, "/");
1945    }
1946
1947    #[test]
1948    fn test_http_server_config_wrong_scheme() {
1949        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1950    }
1951
1952    #[test]
1953    fn test_http_server_config_invalid_port() {
1954        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1955    }
1956
1957    #[test]
1958    fn test_http_server_config_default_port_by_scheme() {
1959        // HTTP without explicit port should default to 80
1960        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
1961        assert_eq!(cfg_http.port, 80);
1962
1963        // HTTPS without explicit port should default to 443
1964        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
1965        assert_eq!(cfg_https.port, 443);
1966    }
1967
1968    #[test]
1969    fn test_request_envelope_and_reply_are_send() {
1970        fn assert_send<T: Send>() {}
1971        assert_send::<RequestEnvelope>();
1972        assert_send::<HttpReply>();
1973    }
1974
1975    // -----------------------------------------------------------------------
1976    // ServerRegistry tests
1977    // -----------------------------------------------------------------------
1978
1979    #[test]
1980    fn test_server_registry_global_is_singleton() {
1981        let r1 = ServerRegistry::global();
1982        let r2 = ServerRegistry::global();
1983        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1984    }
1985
1986    // -----------------------------------------------------------------------
1987    // Axum dispatch handler tests
1988    // -----------------------------------------------------------------------
1989
1990    #[tokio::test]
1991    async fn test_dispatch_handler_returns_404_for_unknown_path() {
1992        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1993        // Nothing registered in the dispatch table
1994        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1995        let port = listener.local_addr().unwrap().port();
1996        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1997
1998        // Wait for server to start
1999        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2000
2001        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2002            .await
2003            .unwrap();
2004        assert_eq!(resp.status().as_u16(), 404);
2005    }
2006
2007    // -----------------------------------------------------------------------
2008    // HttpConsumer tests
2009    // -----------------------------------------------------------------------
2010
2011    #[tokio::test]
2012    async fn test_http_consumer_start_registers_path() {
2013        use camel_component::ConsumerContext;
2014
2015        // Get an OS-assigned free port
2016        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2017        let port = listener.local_addr().unwrap().port();
2018        drop(listener); // Release port — ServerRegistry will rebind it
2019
2020        let consumer_cfg = HttpServerConfig {
2021            host: "127.0.0.1".to_string(),
2022            port,
2023            path: "/ping".to_string(),
2024            max_request_body: 2 * 1024 * 1024,
2025            max_response_body: 10 * 1024 * 1024,
2026        };
2027        let mut consumer = HttpConsumer::new(consumer_cfg);
2028
2029        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2030        let token = tokio_util::sync::CancellationToken::new();
2031        let ctx = ConsumerContext::new(tx, token.clone());
2032
2033        tokio::spawn(async move {
2034            consumer.start(ctx).await.unwrap();
2035        });
2036
2037        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2038
2039        let client = reqwest::Client::new();
2040        let resp_future = client
2041            .post(format!("http://127.0.0.1:{port}/ping"))
2042            .body("hello world")
2043            .send();
2044
2045        let (http_result, _) = tokio::join!(resp_future, async {
2046            if let Some(mut envelope) = rx.recv().await {
2047                // Set a custom status code
2048                envelope.exchange.input.set_header(
2049                    "CamelHttpResponseCode",
2050                    serde_json::Value::Number(201.into()),
2051                );
2052                if let Some(reply_tx) = envelope.reply_tx {
2053                    let _ = reply_tx.send(Ok(envelope.exchange));
2054                }
2055            }
2056        });
2057
2058        let resp = http_result.unwrap();
2059        assert_eq!(resp.status().as_u16(), 201);
2060
2061        token.cancel();
2062    }
2063
2064    // -----------------------------------------------------------------------
2065    // Integration tests
2066    // -----------------------------------------------------------------------
2067
2068    #[tokio::test]
2069    async fn test_integration_single_consumer_round_trip() {
2070        use camel_component::{ConsumerContext, ExchangeEnvelope};
2071
2072        // Get an OS-assigned free port (ephemeral)
2073        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2074        let port = listener.local_addr().unwrap().port();
2075        drop(listener); // Release — ServerRegistry will rebind
2076
2077        let component = HttpComponent::new();
2078        let endpoint = component
2079            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2080            .unwrap();
2081        let mut consumer = endpoint.create_consumer().unwrap();
2082
2083        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2084        let token = tokio_util::sync::CancellationToken::new();
2085        let ctx = ConsumerContext::new(tx, token.clone());
2086
2087        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2088        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2089
2090        let client = reqwest::Client::new();
2091        let send_fut = client
2092            .post(format!("http://127.0.0.1:{port}/echo"))
2093            .header("Content-Type", "text/plain")
2094            .body("ping")
2095            .send();
2096
2097        let (http_result, _) = tokio::join!(send_fut, async {
2098            if let Some(mut envelope) = rx.recv().await {
2099                assert_eq!(
2100                    envelope.exchange.input.header("CamelHttpMethod"),
2101                    Some(&serde_json::Value::String("POST".into()))
2102                );
2103                assert_eq!(
2104                    envelope.exchange.input.header("CamelHttpPath"),
2105                    Some(&serde_json::Value::String("/echo".into()))
2106                );
2107                envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2108                if let Some(reply_tx) = envelope.reply_tx {
2109                    let _ = reply_tx.send(Ok(envelope.exchange));
2110                }
2111            }
2112        });
2113
2114        let resp = http_result.unwrap();
2115        assert_eq!(resp.status().as_u16(), 200);
2116        let body = resp.text().await.unwrap();
2117        assert_eq!(body, "pong");
2118
2119        token.cancel();
2120    }
2121
2122    #[tokio::test]
2123    async fn test_integration_two_consumers_shared_port() {
2124        use camel_component::{ConsumerContext, ExchangeEnvelope};
2125
2126        // Get an OS-assigned free port (ephemeral)
2127        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2128        let port = listener.local_addr().unwrap().port();
2129        drop(listener);
2130
2131        let component = HttpComponent::new();
2132
2133        // Consumer A: /hello
2134        let endpoint_a = component
2135            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2136            .unwrap();
2137        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2138
2139        // Consumer B: /world
2140        let endpoint_b = component
2141            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2142            .unwrap();
2143        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2144
2145        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2146        let token_a = tokio_util::sync::CancellationToken::new();
2147        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2148
2149        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2150        let token_b = tokio_util::sync::CancellationToken::new();
2151        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2152
2153        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2154        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2155        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2156
2157        let client = reqwest::Client::new();
2158
2159        // Request to /hello
2160        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2161        let (resp_hello, _) = tokio::join!(fut_hello, async {
2162            if let Some(mut envelope) = rx_a.recv().await {
2163                envelope.exchange.input.body =
2164                    camel_api::body::Body::Text("hello-response".to_string());
2165                if let Some(reply_tx) = envelope.reply_tx {
2166                    let _ = reply_tx.send(Ok(envelope.exchange));
2167                }
2168            }
2169        });
2170
2171        // Request to /world
2172        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2173        let (resp_world, _) = tokio::join!(fut_world, async {
2174            if let Some(mut envelope) = rx_b.recv().await {
2175                envelope.exchange.input.body =
2176                    camel_api::body::Body::Text("world-response".to_string());
2177                if let Some(reply_tx) = envelope.reply_tx {
2178                    let _ = reply_tx.send(Ok(envelope.exchange));
2179                }
2180            }
2181        });
2182
2183        let body_a = resp_hello.unwrap().text().await.unwrap();
2184        let body_b = resp_world.unwrap().text().await.unwrap();
2185
2186        assert_eq!(body_a, "hello-response");
2187        assert_eq!(body_b, "world-response");
2188
2189        token_a.cancel();
2190        token_b.cancel();
2191    }
2192
2193    #[tokio::test]
2194    async fn test_integration_unregistered_path_returns_404() {
2195        use camel_component::{ConsumerContext, ExchangeEnvelope};
2196
2197        // Get an OS-assigned free port (ephemeral)
2198        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2199        let port = listener.local_addr().unwrap().port();
2200        drop(listener);
2201
2202        let component = HttpComponent::new();
2203        let endpoint = component
2204            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2205            .unwrap();
2206        let mut consumer = endpoint.create_consumer().unwrap();
2207
2208        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2209        let token = tokio_util::sync::CancellationToken::new();
2210        let ctx = ConsumerContext::new(tx, token.clone());
2211
2212        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2213        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2214
2215        let client = reqwest::Client::new();
2216        let resp = client
2217            .get(format!("http://127.0.0.1:{port}/not-there"))
2218            .send()
2219            .await
2220            .unwrap();
2221        assert_eq!(resp.status().as_u16(), 404);
2222
2223        token.cancel();
2224    }
2225
2226    #[test]
2227    fn test_http_consumer_declares_concurrent() {
2228        use camel_component::ConcurrencyModel;
2229
2230        let config = HttpServerConfig {
2231            host: "127.0.0.1".to_string(),
2232            port: 19999,
2233            path: "/test".to_string(),
2234            max_request_body: 2 * 1024 * 1024,
2235            max_response_body: 10 * 1024 * 1024,
2236        };
2237        let consumer = HttpConsumer::new(config);
2238        assert_eq!(
2239            consumer.concurrency_model(),
2240            ConcurrencyModel::Concurrent { max: None }
2241        );
2242    }
2243
2244    // -----------------------------------------------------------------------
2245    // HttpReplyBody streaming tests
2246    // -----------------------------------------------------------------------
2247
2248    #[tokio::test]
2249    async fn test_http_reply_body_stream_variant_exists() {
2250        use bytes::Bytes;
2251        use camel_api::CamelError;
2252        use futures::stream;
2253
2254        let chunks: Vec<Result<Bytes, CamelError>> =
2255            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2256        let stream = Box::pin(stream::iter(chunks));
2257        let reply_body = HttpReplyBody::Stream(stream);
2258        // Si compila y el match funciona, el test pasa
2259        match reply_body {
2260            HttpReplyBody::Stream(_) => {}
2261            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2262        }
2263    }
2264
2265    // -----------------------------------------------------------------------
2266    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2267    // -----------------------------------------------------------------------
2268
2269    #[cfg(feature = "otel")]
2270    mod otel_tests {
2271        use super::*;
2272        use camel_api::Message;
2273        use tower::ServiceExt;
2274
2275        #[tokio::test]
2276        async fn test_producer_injects_traceparent_header() {
2277            let (url, _handle) = start_test_server_with_header_capture().await;
2278            let ctx = test_producer_ctx();
2279
2280            let component = HttpComponent::new();
2281            let endpoint = component
2282                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2283                .unwrap();
2284            let producer = endpoint.create_producer(&ctx).unwrap();
2285
2286            // Create exchange with an OTel context by extracting from a traceparent header
2287            let mut exchange = Exchange::new(Message::default());
2288            let mut headers = std::collections::HashMap::new();
2289            headers.insert(
2290                "traceparent".to_string(),
2291                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2292            );
2293            camel_otel::extract_into_exchange(&mut exchange, &headers);
2294
2295            let result = producer.oneshot(exchange).await.unwrap();
2296
2297            // Verify request succeeded
2298            let status = result
2299                .input
2300                .header("CamelHttpResponseCode")
2301                .and_then(|v| v.as_u64())
2302                .unwrap();
2303            assert_eq!(status, 200);
2304
2305            // The test server echoes back the received traceparent header
2306            let traceparent = result.input.header("x-received-traceparent");
2307            assert!(
2308                traceparent.is_some(),
2309                "traceparent header should have been sent"
2310            );
2311
2312            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2313            // Verify format: version-traceid-spanid-flags
2314            let parts: Vec<&str> = traceparent_str.split('-').collect();
2315            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2316            assert_eq!(parts[0], "00", "version should be 00");
2317            assert_eq!(
2318                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2319                "trace-id should match"
2320            );
2321            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2322            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2323        }
2324
2325        #[tokio::test]
2326        async fn test_consumer_extracts_traceparent_header() {
2327            use camel_component::{ConsumerContext, ExchangeEnvelope};
2328
2329            // Get an OS-assigned free port
2330            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2331            let port = listener.local_addr().unwrap().port();
2332            drop(listener);
2333
2334            let component = HttpComponent::new();
2335            let endpoint = component
2336                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2337                .unwrap();
2338            let mut consumer = endpoint.create_consumer().unwrap();
2339
2340            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2341            let token = tokio_util::sync::CancellationToken::new();
2342            let ctx = ConsumerContext::new(tx, token.clone());
2343
2344            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2345            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2346
2347            // Send request with traceparent header
2348            let client = reqwest::Client::new();
2349            let send_fut = client
2350                .post(format!("http://127.0.0.1:{port}/trace"))
2351                .header(
2352                    "traceparent",
2353                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2354                )
2355                .body("test")
2356                .send();
2357
2358            let (http_result, _) = tokio::join!(send_fut, async {
2359                if let Some(envelope) = rx.recv().await {
2360                    // Verify the exchange has a valid OTel context by re-injecting it
2361                    // and checking the traceparent matches
2362                    let mut injected_headers = std::collections::HashMap::new();
2363                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2364
2365                    assert!(
2366                        injected_headers.contains_key("traceparent"),
2367                        "Exchange should have traceparent after extraction"
2368                    );
2369
2370                    let traceparent = injected_headers.get("traceparent").unwrap();
2371                    let parts: Vec<&str> = traceparent.split('-').collect();
2372                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2373                    assert_eq!(
2374                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2375                        "Trace ID should match the original traceparent header"
2376                    );
2377
2378                    if let Some(reply_tx) = envelope.reply_tx {
2379                        let _ = reply_tx.send(Ok(envelope.exchange));
2380                    }
2381                }
2382            });
2383
2384            let resp = http_result.unwrap();
2385            assert_eq!(resp.status().as_u16(), 200);
2386
2387            token.cancel();
2388        }
2389
2390        #[tokio::test]
2391        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2392            use camel_component::{ConsumerContext, ExchangeEnvelope};
2393
2394            // Get an OS-assigned free port
2395            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2396            let port = listener.local_addr().unwrap().port();
2397            drop(listener);
2398
2399            let component = HttpComponent::new();
2400            let endpoint = component
2401                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2402                .unwrap();
2403            let mut consumer = endpoint.create_consumer().unwrap();
2404
2405            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2406            let token = tokio_util::sync::CancellationToken::new();
2407            let ctx = ConsumerContext::new(tx, token.clone());
2408
2409            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2410            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2411
2412            // Send request with MIXED-CASE TraceParent header (not lowercase)
2413            let client = reqwest::Client::new();
2414            let send_fut = client
2415                .post(format!("http://127.0.0.1:{port}/trace"))
2416                .header(
2417                    "TraceParent",
2418                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2419                )
2420                .body("test")
2421                .send();
2422
2423            let (http_result, _) = tokio::join!(send_fut, async {
2424                if let Some(envelope) = rx.recv().await {
2425                    // Verify the exchange has a valid OTel context by re-injecting it
2426                    // and checking the traceparent matches
2427                    let mut injected_headers = HashMap::new();
2428                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2429
2430                    assert!(
2431                        injected_headers.contains_key("traceparent"),
2432                        "Exchange should have traceparent after extraction from mixed-case header"
2433                    );
2434
2435                    let traceparent = injected_headers.get("traceparent").unwrap();
2436                    let parts: Vec<&str> = traceparent.split('-').collect();
2437                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2438                    assert_eq!(
2439                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2440                        "Trace ID should match the original mixed-case TraceParent header"
2441                    );
2442
2443                    if let Some(reply_tx) = envelope.reply_tx {
2444                        let _ = reply_tx.send(Ok(envelope.exchange));
2445                    }
2446                }
2447            });
2448
2449            let resp = http_result.unwrap();
2450            assert_eq!(resp.status().as_u16(), 200);
2451
2452            token.cancel();
2453        }
2454
2455        #[tokio::test]
2456        async fn test_producer_no_trace_context_no_crash() {
2457            let (url, _handle) = start_test_server().await;
2458            let ctx = test_producer_ctx();
2459
2460            let component = HttpComponent::new();
2461            let endpoint = component
2462                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2463                .unwrap();
2464            let producer = endpoint.create_producer(&ctx).unwrap();
2465
2466            // Create exchange with default (empty) otel_context - no trace context
2467            let exchange = Exchange::new(Message::default());
2468
2469            // Should succeed without panic
2470            let result = producer.oneshot(exchange).await.unwrap();
2471
2472            // Verify request succeeded
2473            let status = result
2474                .input
2475                .header("CamelHttpResponseCode")
2476                .and_then(|v| v.as_u64())
2477                .unwrap();
2478            assert_eq!(status, 200);
2479        }
2480
2481        /// Test server that captures and echoes back the traceparent header
2482        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2483            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2484            let addr = listener.local_addr().unwrap();
2485            let url = format!("http://127.0.0.1:{}", addr.port());
2486
2487            let handle = tokio::spawn(async move {
2488                loop {
2489                    if let Ok((mut stream, _)) = listener.accept().await {
2490                        tokio::spawn(async move {
2491                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2492                            let mut buf = vec![0u8; 8192];
2493                            let n = stream.read(&mut buf).await.unwrap_or(0);
2494                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
2495
2496                            // Extract traceparent header from request
2497                            let traceparent = request
2498                                .lines()
2499                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
2500                                .map(|line| {
2501                                    line.split(':')
2502                                        .nth(1)
2503                                        .map(|s| s.trim().to_string())
2504                                        .unwrap_or_default()
2505                                })
2506                                .unwrap_or_default();
2507
2508                            let body =
2509                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2510                            let response = format!(
2511                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2512                                body.len(),
2513                                traceparent,
2514                                body
2515                            );
2516                            let _ = stream.write_all(response.as_bytes()).await;
2517                        });
2518                    }
2519                }
2520            });
2521
2522            (url, handle)
2523        }
2524    }
2525
    // -----------------------------------------------------------------------
    // Request streaming tests (Eje B - Task 3)
    // -----------------------------------------------------------------------
2533
2534    #[tokio::test]
2535    async fn test_request_body_arrives_as_stream() {
2536        use camel_api::body::Body;
2537        use camel_component::{ConsumerContext, ExchangeEnvelope};
2538
2539        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2540        let port = listener.local_addr().unwrap().port();
2541        drop(listener);
2542
2543        let component = HttpComponent::new();
2544        let endpoint = component
2545            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2546            .unwrap();
2547        let mut consumer = endpoint.create_consumer().unwrap();
2548
2549        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2550        let token = tokio_util::sync::CancellationToken::new();
2551        let ctx = ConsumerContext::new(tx, token.clone());
2552
2553        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2554        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2555
2556        let client = reqwest::Client::new();
2557        let send_fut = client
2558            .post(format!("http://127.0.0.1:{port}/upload"))
2559            .body("hello streaming world")
2560            .send();
2561
2562        let (http_result, _) = tokio::join!(send_fut, async {
2563            if let Some(mut envelope) = rx.recv().await {
2564                // Body must be Body::Stream, not Body::Text or Body::Bytes
2565                assert!(
2566                    matches!(envelope.exchange.input.body, Body::Stream(_)),
2567                    "expected Body::Stream, got discriminant {:?}",
2568                    std::mem::discriminant(&envelope.exchange.input.body)
2569                );
2570                // Materialize to verify content
2571                let bytes = envelope
2572                    .exchange
2573                    .input
2574                    .body
2575                    .into_bytes(1024 * 1024)
2576                    .await
2577                    .unwrap();
2578                assert_eq!(&bytes[..], b"hello streaming world");
2579
2580                envelope.exchange.input.body = camel_api::body::Body::Empty;
2581                if let Some(reply_tx) = envelope.reply_tx {
2582                    let _ = reply_tx.send(Ok(envelope.exchange));
2583                }
2584            }
2585        });
2586
2587        let resp = http_result.unwrap();
2588        assert_eq!(resp.status().as_u16(), 200);
2589
2590        token.cancel();
2591    }
2592
2593    // -----------------------------------------------------------------------
2594    // Response streaming tests (Eje A - Task 2)
2595    // -----------------------------------------------------------------------
2596
2597    #[tokio::test]
2598    async fn test_streaming_response_chunked() {
2599        use bytes::Bytes;
2600        use camel_api::CamelError;
2601        use camel_api::body::{Body, StreamBody, StreamMetadata};
2602        use camel_component::{ConsumerContext, ExchangeEnvelope};
2603        use futures::stream;
2604        use std::sync::Arc;
2605        use tokio::sync::Mutex;
2606
2607        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2608        let port = listener.local_addr().unwrap().port();
2609        drop(listener);
2610
2611        let component = HttpComponent::new();
2612        let endpoint = component
2613            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2614            .unwrap();
2615        let mut consumer = endpoint.create_consumer().unwrap();
2616
2617        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2618        let token = tokio_util::sync::CancellationToken::new();
2619        let ctx = ConsumerContext::new(tx, token.clone());
2620
2621        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2622        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2623
2624        let client = reqwest::Client::new();
2625        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2626
2627        let (http_result, _) = tokio::join!(send_fut, async {
2628            if let Some(mut envelope) = rx.recv().await {
2629                // Respond with Body::Stream
2630                let chunks: Vec<Result<Bytes, CamelError>> =
2631                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2632                let stream = Box::pin(stream::iter(chunks));
2633                envelope.exchange.input.body = Body::Stream(StreamBody {
2634                    stream: Arc::new(Mutex::new(Some(stream))),
2635                    metadata: StreamMetadata::default(),
2636                });
2637                if let Some(reply_tx) = envelope.reply_tx {
2638                    let _ = reply_tx.send(Ok(envelope.exchange));
2639                }
2640            }
2641        });
2642
2643        let resp = http_result.unwrap();
2644        assert_eq!(resp.status().as_u16(), 200);
2645        let body = resp.text().await.unwrap();
2646        assert_eq!(body, "chunk1chunk2");
2647
2648        token.cancel();
2649    }
2650
2651    // -----------------------------------------------------------------------
2652    // 413 Content-Length limit test (Task 4)
2653    // -----------------------------------------------------------------------
2654
2655    #[tokio::test]
2656    async fn test_413_when_content_length_exceeds_limit() {
2657        use camel_component::ConsumerContext;
2658
2659        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2660        let port = listener.local_addr().unwrap().port();
2661        drop(listener);
2662
2663        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
2664        let component = HttpComponent::new();
2665        let endpoint = component
2666            .create_endpoint(&format!(
2667                "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2668            ))
2669            .unwrap();
2670        let mut consumer = endpoint.create_consumer().unwrap();
2671
2672        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2673        let token = tokio_util::sync::CancellationToken::new();
2674        let ctx = ConsumerContext::new(tx, token.clone());
2675
2676        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2677        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2678
2679        let client = reqwest::Client::new();
2680        let resp = client
2681            .post(format!("http://127.0.0.1:{port}/upload"))
2682            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
2683            .body("x".repeat(1000))
2684            .send()
2685            .await
2686            .unwrap();
2687
2688        assert_eq!(resp.status().as_u16(), 413);
2689
2690        token.cancel();
2691    }
2692
2693    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
2694    /// The spec says: "If there is no Content-Length, the limit does not apply at the
2695    /// consumer level — the route is responsible."
2696    #[tokio::test]
2697    async fn test_chunked_upload_without_content_length_bypasses_limit() {
2698        use bytes::Bytes;
2699        use camel_api::body::Body;
2700        use camel_component::ConsumerContext;
2701        use futures::stream;
2702
2703        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2704        let port = listener.local_addr().unwrap().port();
2705        drop(listener);
2706
2707        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
2708        let component = HttpComponent::new();
2709        let endpoint = component
2710            .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2711            .unwrap();
2712        let mut consumer = endpoint.create_consumer().unwrap();
2713
2714        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2715        let token = tokio_util::sync::CancellationToken::new();
2716        let ctx = ConsumerContext::new(tx, token.clone());
2717
2718        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2719        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2720
2721        let client = reqwest::Client::new();
2722
2723        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
2724        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
2725        // but since there's no Content-Length the 413 check must NOT fire.
2726        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2727            Ok(Bytes::from("y".repeat(50))),
2728            Ok(Bytes::from("y".repeat(50))),
2729        ];
2730        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2731        let send_fut = client
2732            .post(format!("http://127.0.0.1:{port}/upload"))
2733            .body(stream_body)
2734            .send();
2735
2736        let consumer_fut = async {
2737            // Use timeout to avoid deadlock if the handler rejects before enqueueing
2738            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2739                Ok(Some(mut envelope)) => {
2740                    assert!(
2741                        matches!(envelope.exchange.input.body, Body::Stream(_)),
2742                        "expected Body::Stream"
2743                    );
2744                    envelope.exchange.input.body = camel_api::body::Body::Empty;
2745                    if let Some(reply_tx) = envelope.reply_tx {
2746                        let _ = reply_tx.send(Ok(envelope.exchange));
2747                    }
2748                }
2749                Ok(None) => panic!("consumer channel closed unexpectedly"),
2750                Err(_) => {
2751                    // Timeout: the request was rejected before reaching the consumer.
2752                    // The HTTP response will carry the real status code (we check below).
2753                }
2754            }
2755        };
2756
2757        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2758
2759        let resp = http_result.unwrap();
2760        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
2761        assert_ne!(
2762            resp.status().as_u16(),
2763            413,
2764            "chunked upload must not be rejected by maxRequestBody"
2765        );
2766        assert_eq!(resp.status().as_u16(), 200);
2767
2768        token.cancel();
2769    }
2770}