Skip to main content

camel_component_http/
lib.rs

1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::{OnceCell, RwLock};
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_api::{
17    BoxProcessor, CamelError, Exchange,
18    body::{Body, StreamBody, StreamMetadata},
19};
20use camel_component::{Component, Consumer, Endpoint, ProducerContext};
21use camel_endpoint::{UriComponents, UriConfig, parse_uri};
22use futures::TryStreamExt;
23use futures::stream::BoxStream;
24
25// ---------------------------------------------------------------------------
26// HttpEndpointConfig
27// ---------------------------------------------------------------------------
28
29/// Configuration for an HTTP client (producer) endpoint.
30///
31/// # Memory Limits
32///
33/// HTTP operations enforce conservative memory limits to prevent denial-of-service
34/// attacks from untrusted network sources. These limits are significantly lower than
35/// file component limits (100MB) because HTTP typically handles API responses rather
36/// than large file transfers, and clients may be untrusted.
37///
38/// ## Default Limits
39///
40/// - **HTTP client body**: 10MB (typical API responses)
41/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
42/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
43///
44/// ## Rationale
45///
46/// The 10MB limit for HTTP client responses is appropriate for most API interactions
47/// while providing protection against:
48/// - Malicious servers sending oversized responses
49/// - Runaway processes generating unexpectedly large payloads
50/// - Memory exhaustion attacks
51///
52/// The 2MB server request limit is even more conservative because it handles input
53/// from potentially untrusted clients on the public internet.
54///
55/// ## Overriding Limits
56///
57/// Override the default client body limit using the `maxBodySize` URI parameter:
58///
59/// ```text
60/// http://api.example.com/large-data?maxBodySize=52428800
61/// ```
62///
63/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
64///
65/// ```text
66/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
67/// ```
68///
69/// ## Behavior When Exceeded
70///
71/// When a body exceeds the configured limit:
72/// - An error is returned immediately
73/// - No memory is exhausted - the limit is checked before allocation
74/// - The HTTP connection is terminated cleanly
75///
76/// ## Security Considerations
77///
78/// HTTP endpoints should be treated with more caution than file endpoints because:
79/// - Clients may be unknown and untrusted
80/// - Network traffic can be spoofed or malicious
81/// - DoS attacks often exploit unbounded resource consumption
82///
83/// Only increase limits when you control both ends of the connection or when
84/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL rebuilt as `<scheme>:<path>` from the endpoint URI.
    pub base_url: String,
    /// Value of the `httpMethod` option, when one was supplied.
    pub http_method: Option<String>,
    /// Value of the `throwExceptionOnFailure` option.
    pub throw_exception_on_failure: bool,
    /// Inclusive `(start, end)` pair parsed from the `okStatusCodeRange` option.
    pub ok_status_code_range: (u16, u16),
    /// Value of the `responseTimeout` option (milliseconds) as a `Duration`.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not Camel options.
    pub query_params: HashMap<String, String>,
    /// Value of the `allowPrivateIps` option (SSRF protection switch).
    pub allow_private_ips: bool,
    /// Entries of the comma-separated `blockedHosts` option.
    pub blocked_hosts: Vec<String>,
    /// Value of the `maxBodySize` option, in bytes.
    pub max_body_size: usize,
}
97
/// URI parameter names that configure the endpoint itself.
///
/// Parameters with one of these names are Camel options and must NOT be
/// forwarded to the remote server as HTTP query parameters.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
110
111impl UriConfig for HttpEndpointConfig {
112    /// Returns "http" as the primary scheme (also accepts "https")
113    fn scheme() -> &'static str {
114        "http"
115    }
116
117    fn from_uri(uri: &str) -> Result<Self, CamelError> {
118        let parts = parse_uri(uri)?;
119        Self::from_components(parts)
120    }
121
122    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
123        // Validate scheme - accept both http and https
124        if parts.scheme != "http" && parts.scheme != "https" {
125            return Err(CamelError::InvalidUri(format!(
126                "expected scheme 'http' or 'https', got '{}'",
127                parts.scheme
128            )));
129        }
130
131        // Construct base_url from scheme + path
132        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
133        let base_url = format!("{}:{}", parts.scheme, parts.path);
134
135        let http_method = parts.params.get("httpMethod").cloned();
136
137        let throw_exception_on_failure = parts
138            .params
139            .get("throwExceptionOnFailure")
140            .map(|v| v != "false")
141            .unwrap_or(true);
142
143        // Parse status code range from "start-end" format (e.g., "200-299")
144        let ok_status_code_range = parts
145            .params
146            .get("okStatusCodeRange")
147            .and_then(|v| {
148                let (start, end) = v.split_once('-')?;
149                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
150            })
151            .unwrap_or((200, 299));
152
153        let response_timeout = parts
154            .params
155            .get("responseTimeout")
156            .and_then(|v| v.parse::<u64>().ok())
157            .map(Duration::from_millis);
158
159        // SSRF protection settings
160        let allow_private_ips = parts
161            .params
162            .get("allowPrivateIps")
163            .map(|v| v == "true")
164            .unwrap_or(false); // Default: block private IPs
165
166        // Parse comma-separated blocked hosts
167        let blocked_hosts = parts
168            .params
169            .get("blockedHosts")
170            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
171            .unwrap_or_default();
172
173        let max_body_size = parts
174            .params
175            .get("maxBodySize")
176            .and_then(|v| v.parse::<usize>().ok())
177            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
178
179        // Collect remaining params (not Camel options) as query params
180        let query_params: HashMap<String, String> = parts
181            .params
182            .into_iter()
183            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
184            .collect();
185
186        Ok(Self {
187            base_url,
188            http_method,
189            throw_exception_on_failure,
190            ok_status_code_range,
191            response_timeout,
192            query_params,
193            allow_private_ips,
194            blocked_hosts,
195            max_body_size,
196        })
197    }
198}
199
200impl HttpEndpointConfig {
201    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
202        let parts = parse_uri(uri)?;
203        let mut endpoint = Self::from_components(parts.clone())?;
204        if endpoint.response_timeout.is_none() {
205            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
206        }
207        if !parts.params.contains_key("allowPrivateIps") {
208            endpoint.allow_private_ips = config.allow_private_ips;
209        }
210        if !parts.params.contains_key("blockedHosts") {
211            endpoint.blocked_hosts = config.blocked_hosts.clone();
212        }
213        if !parts.params.contains_key("maxBodySize") {
214            endpoint.max_body_size = config.max_body_size;
215        }
216        Ok(endpoint)
217    }
218}
219
220// ---------------------------------------------------------------------------
221// HttpServerConfig
222// ---------------------------------------------------------------------------
223
/// Settings of one HTTP server (consumer) endpoint.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Address to bind to, such as "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port the server listens on.
    pub port: u16,
    /// URL path served by this consumer, such as "/orders".
    pub path: String,
    /// Upper bound for the size of a request body, in bytes.
    pub max_request_body: usize,
    /// Upper bound, in bytes, for a response body when a stream is materialized.
    pub max_response_body: usize,
}
238
239impl UriConfig for HttpServerConfig {
240    /// Returns "http" as the primary scheme (also accepts "https")
241    fn scheme() -> &'static str {
242        "http"
243    }
244
245    fn from_uri(uri: &str) -> Result<Self, CamelError> {
246        let parts = parse_uri(uri)?;
247        Self::from_components(parts)
248    }
249
250    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
251        // Validate scheme - accept both http and https
252        if parts.scheme != "http" && parts.scheme != "https" {
253            return Err(CamelError::InvalidUri(format!(
254                "expected scheme 'http' or 'https', got '{}'",
255                parts.scheme
256            )));
257        }
258
259        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
260        // Strip leading "//"
261        let authority_and_path = parts.path.trim_start_matches('/');
262
263        // Split on the first "/" to separate "host:port" from "/path"
264        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
265            (&authority_and_path[..idx], &authority_and_path[idx..])
266        } else {
267            (authority_and_path, "/")
268        };
269
270        let path = if path_suffix.is_empty() {
271            "/"
272        } else {
273            path_suffix
274        }
275        .to_string();
276
277        // Parse host:port from authority
278        let (host, port) = if let Some(colon) = authority.rfind(':') {
279            let port_str = &authority[colon + 1..];
280            match port_str.parse::<u16>() {
281                Ok(p) => (authority[..colon].to_string(), p),
282                Err(_) => {
283                    return Err(CamelError::InvalidUri(format!(
284                        "invalid port '{}' in authority",
285                        port_str
286                    )));
287                }
288            }
289        } else {
290            // Default port based on scheme: 443 for https, 80 for http
291            let default_port = if parts.scheme == "https" { 443 } else { 80 };
292            (authority.to_string(), default_port)
293        };
294
295        let max_request_body = parts
296            .params
297            .get("maxRequestBody")
298            .and_then(|v| v.parse::<usize>().ok())
299            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
300
301        let max_response_body = parts
302            .params
303            .get("maxResponseBody")
304            .and_then(|v| v.parse::<usize>().ok())
305            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
306
307        Ok(Self {
308            host,
309            port,
310            path,
311            max_request_body,
312            max_response_body,
313        })
314    }
315}
316
317impl HttpServerConfig {
318    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
319        let parts = parse_uri(uri)?;
320        let mut server = Self::from_components(parts.clone())?;
321        if !parts.params.contains_key("maxRequestBody") {
322            server.max_request_body = config.max_request_body;
323        }
324        if !parts.params.contains_key("maxResponseBody") {
325            server.max_response_body = config.max_body_size;
326        }
327        Ok(server)
328    }
329}
330
331// ---------------------------------------------------------------------------
332// RequestEnvelope / HttpReply
333// ---------------------------------------------------------------------------
334
335/// Body de la respuesta HTTP: bytes ya materializados o stream lazy.
336pub(crate) enum HttpReplyBody {
337    Bytes(bytes::Bytes),
338    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
339}
340
341/// An inbound HTTP request sent from the Axum dispatch handler to an
342/// `HttpConsumer` receive loop.
343pub(crate) struct RequestEnvelope {
344    pub(crate) method: String,
345    pub(crate) path: String,
346    pub(crate) query: String,
347    pub(crate) headers: http::HeaderMap,
348    pub(crate) body: StreamBody,
349    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
350}
351
352/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
353pub(crate) struct HttpReply {
354    pub(crate) status: u16,
355    pub(crate) headers: Vec<(String, String)>,
356    pub(crate) body: HttpReplyBody,
357}
358
359// ---------------------------------------------------------------------------
360// DispatchTable / ServerRegistry
361// ---------------------------------------------------------------------------
362
363/// Maps URL path → channel sender for the consumer that owns that path.
364pub(crate) type DispatchTable =
365    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
366
367/// Handle to a running Axum server on one port.
368struct ServerHandle {
369    dispatch: DispatchTable,
370    /// Kept alive so the task isn't dropped; not used directly.
371    _task: tokio::task::JoinHandle<()>,
372}
373
374/// Process-global registry mapping port → running Axum server handle.
375pub struct ServerRegistry {
376    inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
377}
378
379impl ServerRegistry {
380    /// Returns the global singleton.
381    pub fn global() -> &'static Self {
382        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
383        INSTANCE.get_or_init(|| ServerRegistry {
384            inner: Mutex::new(HashMap::new()),
385        })
386    }
387
388    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
389    /// none is running on that port yet.
390    pub(crate) async fn get_or_spawn(
391        &'static self,
392        host: &str,
393        port: u16,
394        max_request_body: usize,
395    ) -> Result<DispatchTable, CamelError> {
396        let host_owned = host.to_string();
397
398        let cell = {
399            let mut guard = self.inner.lock().map_err(|_| {
400                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
401            })?;
402            guard
403                .entry(port)
404                .or_insert_with(|| Arc::new(OnceCell::new()))
405                .clone()
406        };
407
408        let handle = cell
409            .get_or_try_init(|| async {
410                let addr = format!("{host_owned}:{port}");
411                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
412                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
413                })?;
414                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
415                let task = tokio::spawn(run_axum_server(
416                    listener,
417                    Arc::clone(&dispatch),
418                    max_request_body,
419                ));
420                Ok::<ServerHandle, CamelError>(ServerHandle {
421                    dispatch,
422                    _task: task,
423                })
424            })
425            .await?;
426
427        Ok(Arc::clone(&handle.dispatch))
428    }
429}
430
431// ---------------------------------------------------------------------------
432// Axum server
433// ---------------------------------------------------------------------------
434
435use axum::{
436    Router,
437    body::Body as AxumBody,
438    extract::{Request, State},
439    http::{Response, StatusCode},
440    response::IntoResponse,
441};
442
443#[derive(Clone)]
444struct AppState {
445    dispatch: DispatchTable,
446    max_request_body: usize,
447}
448
449async fn run_axum_server(
450    listener: tokio::net::TcpListener,
451    dispatch: DispatchTable,
452    max_request_body: usize,
453) {
454    let state = AppState {
455        dispatch,
456        max_request_body,
457    };
458    let app = Router::new().fallback(dispatch_handler).with_state(state);
459
460    axum::serve(listener, app).await.unwrap_or_else(|e| {
461        tracing::error!(error = %e, "Axum server error");
462    });
463}
464
465async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
466    let method = req.method().to_string();
467    let path = req.uri().path().to_string();
468    let query = req.uri().query().unwrap_or("").to_string();
469    let headers = req.headers().clone();
470
471    // Check Content-Length against limit BEFORE opening the stream
472    let content_length: Option<u64> = headers
473        .get(http::header::CONTENT_LENGTH)
474        .and_then(|v| v.to_str().ok())
475        .and_then(|s| s.parse().ok());
476
477    if let Some(len) = content_length
478        && len > state.max_request_body as u64
479    {
480        return Response::builder()
481            .status(StatusCode::PAYLOAD_TOO_LARGE)
482            .body(AxumBody::from("Request body exceeds configured limit"))
483            .expect("infallible");
484    }
485
486    // Build StreamBody from Axum body WITHOUT materializing
487    let content_type = headers
488        .get(http::header::CONTENT_TYPE)
489        .and_then(|v| v.to_str().ok())
490        .map(|s| s.to_string());
491
492    let data_stream: BodyDataStream = req.into_body().into_data_stream();
493    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
494    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
495
496    let stream_body = StreamBody {
497        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
498        metadata: StreamMetadata {
499            size_hint: content_length,
500            content_type,
501            origin: None,
502        },
503    };
504
505    // Look up handler for this path
506    let sender = {
507        let table = state.dispatch.read().await;
508        table.get(&path).cloned()
509    };
510    let Some(sender) = sender else {
511        return Response::builder()
512            .status(StatusCode::NOT_FOUND)
513            .body(AxumBody::from("No consumer registered for this path"))
514            .expect("infallible");
515    };
516
517    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
518    let envelope = RequestEnvelope {
519        method,
520        path,
521        query,
522        headers,
523        body: stream_body,
524        reply_tx,
525    };
526
527    if sender.send(envelope).await.is_err() {
528        return Response::builder()
529            .status(StatusCode::SERVICE_UNAVAILABLE)
530            .body(AxumBody::from("Consumer unavailable"))
531            .expect("infallible");
532    }
533
534    match reply_rx.await {
535        Ok(reply) => {
536            let status =
537                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
538            let mut builder = Response::builder().status(status);
539            for (k, v) in &reply.headers {
540                builder = builder.header(k.as_str(), v.as_str());
541            }
542            match reply.body {
543                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
544                    Response::builder()
545                        .status(StatusCode::INTERNAL_SERVER_ERROR)
546                        .body(AxumBody::from("Invalid response headers from consumer"))
547                        .expect("infallible")
548                }),
549                HttpReplyBody::Stream(stream) => builder
550                    .body(AxumBody::from_stream(stream))
551                    .unwrap_or_else(|_| {
552                        Response::builder()
553                            .status(StatusCode::INTERNAL_SERVER_ERROR)
554                            .body(AxumBody::from("Invalid response headers from consumer"))
555                            .expect("infallible")
556                    }),
557            }
558        }
559        Err(_) => Response::builder()
560            .status(StatusCode::INTERNAL_SERVER_ERROR)
561            .body(AxumBody::from("Pipeline error"))
562            .expect("Response::builder() with a known-valid status code and body is infallible"),
563    }
564}
565
566// ---------------------------------------------------------------------------
567// HttpConsumer
568// ---------------------------------------------------------------------------
569
570pub struct HttpConsumer {
571    config: HttpServerConfig,
572}
573
574impl HttpConsumer {
575    pub fn new(config: HttpServerConfig) -> Self {
576        Self { config }
577    }
578}
579
580#[async_trait::async_trait]
581impl Consumer for HttpConsumer {
582    async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
583        use camel_api::{Exchange, Message, body::Body};
584
585        let dispatch = ServerRegistry::global()
586            .get_or_spawn(
587                &self.config.host,
588                self.config.port,
589                self.config.max_request_body,
590            )
591            .await?;
592
593        // Create a channel for this path and register it
594        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
595        {
596            let mut table = dispatch.write().await;
597            table.insert(self.config.path.clone(), env_tx);
598        }
599
600        let path = self.config.path.clone();
601        let cancel_token = ctx.cancel_token();
602        let _max_response_body = self.config.max_response_body;
603
604        loop {
605            tokio::select! {
606                _ = ctx.cancelled() => {
607                    break;
608                }
609                envelope = env_rx.recv() => {
610                    let Some(envelope) = envelope else { break; };
611
612                    // Build Exchange from HTTP request
613                    let mut msg = Message::default();
614
615                    // Set standard Camel HTTP headers
616                    msg.set_header("CamelHttpMethod",
617                        serde_json::Value::String(envelope.method.clone()));
618                    msg.set_header("CamelHttpPath",
619                        serde_json::Value::String(envelope.path.clone()));
620                    msg.set_header("CamelHttpQuery",
621                        serde_json::Value::String(envelope.query.clone()));
622
623                    // Forward HTTP headers (skip pseudo-headers)
624                    for (k, v) in &envelope.headers {
625                        if let Ok(val_str) = v.to_str() {
626                            msg.set_header(
627                                k.as_str(),
628                                serde_json::Value::String(val_str.to_string()),
629                            );
630                        }
631                    }
632
633                    // Body: always arrives as Body::Stream (native streaming)
634                    // Routes can call into_bytes() if they need to materialize
635                    msg.body = Body::Stream(envelope.body);
636
637                    #[allow(unused_mut)]
638                    let mut exchange = Exchange::new(msg);
639
640                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
641                    #[cfg(feature = "otel")]
642                    {
643                        let headers: HashMap<String, String> = envelope
644                            .headers
645                            .iter()
646                            .filter_map(|(k, v)| {
647                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
648                            })
649                            .collect();
650                        camel_otel::extract_into_exchange(&mut exchange, &headers);
651                    }
652
653                    let reply_tx = envelope.reply_tx;
654                    let sender = ctx.sender().clone();
655                    let path_clone = path.clone();
656                    let cancel = cancel_token.clone();
657
658                    // Spawn a task to handle this request concurrently
659                    //
660                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
661                    // true concurrent request processing. This change was introduced as part of the
662                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
663                    //
664                    // Rationale:
665                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
666                    //    the consumer's main loop until the pipeline processing completes
667                    // 2. This blocking would prevent multiple HTTP requests from being processed
668                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
669                    // 3. The channel would never have multiple exchanges buffered simultaneously,
670                    //    defeating the purpose of pipeline-side concurrency
671                    // 4. By spawning a task per request, we allow the consumer loop to continue
672                    //    accepting new requests while existing ones are processed in the pipeline
673                    //
674                    // This approach effectively decouples request acceptance from pipeline processing,
675                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
676                    // by the pipeline when ConcurrencyModel::Concurrent is active.
677                    tokio::spawn(async move {
678                        // Check for cancellation before sending to pipeline.
679                        // Returns 503 (Service Unavailable) instead of letting the request
680                        // enter a shutting-down pipeline. This is a behavioral change from
681                        // the pre-concurrency implementation where cancellation during
682                        // processing would result in a 500 (Internal Server Error).
683                        // 503 is more semantically correct: the server is temporarily
684                        // unable to handle the request due to shutdown.
685                        if cancel.is_cancelled() {
686                            let _ = reply_tx.send(HttpReply {
687                                status: 503,
688                                headers: vec![],
689                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
690                            });
691                            return;
692                        }
693
694                        // Send through pipeline and await result
695                        let (tx, rx) = tokio::sync::oneshot::channel();
696                        let envelope = camel_component::consumer::ExchangeEnvelope {
697                            exchange,
698                            reply_tx: Some(tx),
699                        };
700
701                        let result = match sender.send(envelope).await {
702                            Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
703                            Err(_) => Err(camel_api::CamelError::ChannelClosed),
704                        }
705                        .and_then(|r| r);
706
707                        let reply = match result {
708                            Ok(out) => {
709                                let status = out
710                                    .input
711                                    .header("CamelHttpResponseCode")
712                                    .and_then(|v| v.as_u64())
713                                    .map(|s| s as u16)
714                                    .unwrap_or(200);
715
716                                let reply_body: HttpReplyBody = match out.input.body {
717                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
718                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
719                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
720                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
721                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
722                                        v.to_string().into_bytes(),
723                                    )),
724                                    Body::Stream(s) => {
725                                        match s.stream.lock().await.take() {
726                                            Some(stream) => HttpReplyBody::Stream(stream),
727                                            None => {
728                                                tracing::error!(
729                                                    "Body::Stream already consumed before HTTP reply — returning 500"
730                                                );
731                                                let error_reply = HttpReply {
732                                                    status: 500,
733                                                    headers: vec![],
734                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
735                                                };
736                                                if reply_tx.send(error_reply).is_err() {
737                                                    debug!("reply_tx dropped before error reply could be sent");
738                                                }
739                                                return;
740                                            }
741                                        }
742                                    }
743                                };
744
745                                let resp_headers: Vec<(String, String)> = out
746                                    .input
747                                    .headers
748                                    .iter()
749                                    // Filter Camel internal headers
750                                    .filter(|(k, _)| !k.starts_with("Camel"))
751                                    // Filter hop-by-hop and request-only headers
752                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
753                                    .filter(|(k, _)| {
754                                        !matches!(
755                                            k.to_lowercase().as_str(),
756                                            // RFC 2616 Section 4.5 - General headers
757                                            "content-length" |      // Auto-calculated by framework
758                                            "content-type" |        // Auto-calculated from body
759                                            "transfer-encoding" |   // Hop-by-hop
760                                            "connection" |          // Hop-by-hop
761                                            "cache-control" |       // Hop-by-hop
762                                            "date" |                // Auto-generated
763                                            "pragma" |              // Hop-by-hop
764                                            "trailer" |             // Hop-by-hop
765                                            "upgrade" |             // Hop-by-hop
766                                            "via" |                 // Hop-by-hop
767                                            "warning" |             // Hop-by-hop
768                                            // Request-only headers
769                                            "host" |                // Request-only
770                                            "user-agent" |          // Request-only
771                                            "accept" |              // Request-only
772                                            "accept-encoding" |     // Request-only
773                                            "accept-language" |     // Request-only
774                                            "accept-charset" |      // Request-only
775                                            "authorization" |       // Request-only (security)
776                                            "proxy-authorization" | // Request-only (security)
777                                            "cookie" |              // Request-only
778                                            "expect" |              // Request-only
779                                            "from" |                // Request-only
780                                            "if-match" |            // Request-only
781                                            "if-modified-since" |   // Request-only
782                                            "if-none-match" |       // Request-only
783                                            "if-range" |            // Request-only
784                                            "if-unmodified-since" | // Request-only
785                                            "max-forwards" |        // Request-only
786                                            "proxy-connection" |    // Request-only
787                                            "range" |               // Request-only
788                                            "referer" |             // Request-only
789                                            "te"                    // Request-only
790                                        )
791                                    })
792                                    .filter_map(|(k, v)| {
793                                        v.as_str().map(|s| (k.clone(), s.to_string()))
794                                    })
795                                    .collect();
796
797                                HttpReply {
798                                    status,
799                                    headers: resp_headers,
800                                    body: reply_body,
801                                }
802                            }
803                            Err(e) => {
804                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
805                                HttpReply {
806                                    status: 500,
807                                    headers: vec![],
808                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
809                                }
810                            }
811                        };
812
813                        // Reply to Axum handler (ignore error if client disconnected)
814                        let _ = reply_tx.send(reply);
815                    });
816                }
817            }
818        }
819
820        // Deregister this path
821        {
822            let mut table = dispatch.write().await;
823            table.remove(&path);
824        }
825
826        Ok(())
827    }
828
829    async fn stop(&mut self) -> Result<(), CamelError> {
830        Ok(())
831    }
832
833    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
834        camel_component::ConcurrencyModel::Concurrent { max: None }
835    }
836}
837
838// ---------------------------------------------------------------------------
839// HttpComponent / HttpsComponent
840// ---------------------------------------------------------------------------
841
842pub struct HttpComponent {
843    client: reqwest::Client,
844    config: HttpConfig,
845}
846
847fn build_client(config: &HttpConfig) -> reqwest::Client {
848    let mut builder = reqwest::Client::builder()
849        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
850        .pool_max_idle_per_host(config.pool_max_idle_per_host)
851        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
852
853    if !config.follow_redirects {
854        builder = builder.redirect(reqwest::redirect::Policy::none());
855    }
856
857    builder
858        .build()
859        .expect("reqwest::Client::build() with valid config should not fail")
860}
861
862impl HttpComponent {
863    pub fn new() -> Self {
864        let config = HttpConfig::default();
865        let client = build_client(&config);
866        Self { client, config }
867    }
868
869    pub fn with_config(config: HttpConfig) -> Self {
870        let client = build_client(&config);
871        Self { client, config }
872    }
873
874    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
875        match config {
876            Some(cfg) => Self::with_config(cfg),
877            None => Self::new(),
878        }
879    }
880}
881
882impl Default for HttpComponent {
883    fn default() -> Self {
884        Self::new()
885    }
886}
887
888impl Component for HttpComponent {
889    fn scheme(&self) -> &str {
890        "http"
891    }
892
893    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
894        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
895        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
896        Ok(Box::new(HttpEndpoint {
897            uri: uri.to_string(),
898            config,
899            server_config,
900            client: self.client.clone(),
901        }))
902    }
903}
904
905pub struct HttpsComponent {
906    client: reqwest::Client,
907    config: HttpConfig,
908}
909
910impl HttpsComponent {
911    pub fn new() -> Self {
912        let config = HttpConfig::default();
913        let client = build_client(&config);
914        Self { client, config }
915    }
916
917    pub fn with_config(config: HttpConfig) -> Self {
918        let client = build_client(&config);
919        Self { client, config }
920    }
921
922    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
923        match config {
924            Some(cfg) => Self::with_config(cfg),
925            None => Self::new(),
926        }
927    }
928}
929
930impl Default for HttpsComponent {
931    fn default() -> Self {
932        Self::new()
933    }
934}
935
936impl Component for HttpsComponent {
937    fn scheme(&self) -> &str {
938        "https"
939    }
940
941    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
942        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
943        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
944        Ok(Box::new(HttpEndpoint {
945            uri: uri.to_string(),
946            config,
947            server_config,
948            client: self.client.clone(),
949        }))
950    }
951}
952
953// ---------------------------------------------------------------------------
954// HttpEndpoint
955// ---------------------------------------------------------------------------
956
957struct HttpEndpoint {
958    uri: String,
959    config: HttpEndpointConfig,
960    server_config: HttpServerConfig,
961    client: reqwest::Client,
962}
963
964impl Endpoint for HttpEndpoint {
965    fn uri(&self) -> &str {
966        &self.uri
967    }
968
969    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
970        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
971    }
972
973    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
974        Ok(BoxProcessor::new(HttpProducer {
975            config: Arc::new(self.config.clone()),
976            client: self.client.clone(),
977        }))
978    }
979}
980
981// ---------------------------------------------------------------------------
982// SSRF Protection
983// ---------------------------------------------------------------------------
984
985fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
986    let parsed = url::Url::parse(url)
987        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
988
989    // Check blocked hosts
990    if let Some(host) = parsed.host_str()
991        && config.blocked_hosts.iter().any(|blocked| host == blocked)
992    {
993        return Err(CamelError::ProcessorError(format!(
994            "Host '{}' is blocked",
995            host
996        )));
997    }
998
999    // Check private IPs if not allowed
1000    if !config.allow_private_ips
1001        && let Some(host) = parsed.host()
1002    {
1003        match host {
1004            url::Host::Ipv4(ip) => {
1005                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1006                    return Err(CamelError::ProcessorError(format!(
1007                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1008                        ip
1009                    )));
1010                }
1011            }
1012            url::Host::Ipv6(ip) => {
1013                if ip.is_loopback() {
1014                    return Err(CamelError::ProcessorError(format!(
1015                        "Loopback IP '{}' not allowed",
1016                        ip
1017                    )));
1018                }
1019            }
1020            url::Host::Domain(domain) => {
1021                // Block common internal domains
1022                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1023                if blocked_domains.contains(&domain) {
1024                    return Err(CamelError::ProcessorError(format!(
1025                        "Domain '{}' is not allowed",
1026                        domain
1027                    )));
1028                }
1029            }
1030        }
1031    }
1032
1033    Ok(())
1034}
1035
1036// ---------------------------------------------------------------------------
1037// HttpProducer
1038// ---------------------------------------------------------------------------
1039
1040#[derive(Clone)]
1041struct HttpProducer {
1042    config: Arc<HttpEndpointConfig>,
1043    client: reqwest::Client,
1044}
1045
1046impl HttpProducer {
1047    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1048        if let Some(ref method) = config.http_method {
1049            return method.to_uppercase();
1050        }
1051        if let Some(method) = exchange
1052            .input
1053            .header("CamelHttpMethod")
1054            .and_then(|v| v.as_str())
1055        {
1056            return method.to_uppercase();
1057        }
1058        if !exchange.input.body.is_empty() {
1059            return "POST".to_string();
1060        }
1061        "GET".to_string()
1062    }
1063
1064    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1065        if let Some(uri) = exchange
1066            .input
1067            .header("CamelHttpUri")
1068            .and_then(|v| v.as_str())
1069        {
1070            let mut url = uri.to_string();
1071            if let Some(path) = exchange
1072                .input
1073                .header("CamelHttpPath")
1074                .and_then(|v| v.as_str())
1075            {
1076                if !url.ends_with('/') && !path.starts_with('/') {
1077                    url.push('/');
1078                }
1079                url.push_str(path);
1080            }
1081            if let Some(query) = exchange
1082                .input
1083                .header("CamelHttpQuery")
1084                .and_then(|v| v.as_str())
1085            {
1086                url.push('?');
1087                url.push_str(query);
1088            }
1089            return url;
1090        }
1091
1092        let mut url = config.base_url.clone();
1093
1094        if let Some(path) = exchange
1095            .input
1096            .header("CamelHttpPath")
1097            .and_then(|v| v.as_str())
1098        {
1099            if !url.ends_with('/') && !path.starts_with('/') {
1100                url.push('/');
1101            }
1102            url.push_str(path);
1103        }
1104
1105        if let Some(query) = exchange
1106            .input
1107            .header("CamelHttpQuery")
1108            .and_then(|v| v.as_str())
1109        {
1110            url.push('?');
1111            url.push_str(query);
1112        } else if !config.query_params.is_empty() {
1113            // Forward non-Camel query params from config
1114            url.push('?');
1115            let query_string: String = config
1116                .query_params
1117                .iter()
1118                .map(|(k, v)| format!("{k}={v}"))
1119                .collect::<Vec<_>>()
1120                .join("&");
1121            url.push_str(&query_string);
1122        }
1123
1124        url
1125    }
1126
1127    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1128        status >= range.0 && status <= range.1
1129    }
1130}
1131
1132impl Service<Exchange> for HttpProducer {
1133    type Response = Exchange;
1134    type Error = CamelError;
1135    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1136
1137    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1138        Poll::Ready(Ok(()))
1139    }
1140
1141    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1142        let config = self.config.clone();
1143        let client = self.client.clone();
1144
1145        Box::pin(async move {
1146            let method_str = HttpProducer::resolve_method(&exchange, &config);
1147            let url = HttpProducer::resolve_url(&exchange, &config);
1148
1149            // SECURITY: Validate URL for SSRF
1150            validate_url_for_ssrf(&url, &config)?;
1151
1152            debug!(
1153                correlation_id = %exchange.correlation_id(),
1154                method = %method_str,
1155                url = %url,
1156                "HTTP request"
1157            );
1158
1159            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1160                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1161            })?;
1162
1163            let mut request = client.request(method, &url);
1164
1165            if let Some(timeout) = config.response_timeout {
1166                request = request.timeout(timeout);
1167            }
1168
1169            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1170            #[cfg(feature = "otel")]
1171            {
1172                let mut otel_headers = HashMap::new();
1173                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1174                for (k, v) in otel_headers {
1175                    if let (Ok(name), Ok(val)) = (
1176                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1177                        reqwest::header::HeaderValue::from_str(&v),
1178                    ) {
1179                        request = request.header(name, val);
1180                    }
1181                }
1182            }
1183
1184            for (key, value) in &exchange.input.headers {
1185                if !key.starts_with("Camel")
1186                    && let Some(val_str) = value.as_str()
1187                    && let (Ok(name), Ok(val)) = (
1188                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1189                        reqwest::header::HeaderValue::from_str(val_str),
1190                    )
1191                {
1192                    request = request.header(name, val);
1193                }
1194            }
1195
1196            match exchange.input.body {
1197                Body::Stream(ref s) => {
1198                    let mut stream_lock = s.stream.lock().await;
1199                    if let Some(stream) = stream_lock.take() {
1200                        request = request.body(reqwest::Body::wrap_stream(stream));
1201                    } else {
1202                        return Err(CamelError::AlreadyConsumed);
1203                    }
1204                }
1205                _ => {
1206                    // For other types, materialize with configured limit
1207                    let body = std::mem::take(&mut exchange.input.body);
1208                    let bytes = body.into_bytes(config.max_body_size).await?;
1209                    if !bytes.is_empty() {
1210                        request = request.body(bytes);
1211                    }
1212                }
1213            }
1214
1215            let response = request
1216                .send()
1217                .await
1218                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1219
1220            let status_code = response.status().as_u16();
1221            let status_text = response
1222                .status()
1223                .canonical_reason()
1224                .unwrap_or("Unknown")
1225                .to_string();
1226
1227            for (key, value) in response.headers() {
1228                if let Ok(val_str) = value.to_str() {
1229                    exchange
1230                        .input
1231                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1232                }
1233            }
1234
1235            exchange.input.set_header(
1236                "CamelHttpResponseCode",
1237                serde_json::Value::Number(status_code.into()),
1238            );
1239            exchange.input.set_header(
1240                "CamelHttpResponseText",
1241                serde_json::Value::String(status_text.clone()),
1242            );
1243
1244            let response_body = response.bytes().await.map_err(|e| {
1245                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1246            })?;
1247
1248            if config.throw_exception_on_failure
1249                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1250            {
1251                return Err(CamelError::HttpOperationFailed {
1252                    method: method_str,
1253                    url,
1254                    status_code,
1255                    status_text,
1256                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1257                });
1258            }
1259
1260            if !response_body.is_empty() {
1261                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1262            }
1263
1264            debug!(
1265                correlation_id = %exchange.correlation_id(),
1266                status = status_code,
1267                url = %url,
1268                "HTTP response"
1269            );
1270            Ok(exchange)
1271        })
1272    }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277    use super::*;
1278    use camel_api::Message;
1279    use std::sync::Arc;
1280    use std::time::Duration;
1281
1282    fn test_producer_ctx() -> ProducerContext {
1283        ProducerContext::new()
1284    }
1285
1286    #[test]
1287    fn test_http_config_defaults() {
1288        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1289        assert_eq!(config.base_url, "http://localhost:8080/api");
1290        assert!(config.http_method.is_none());
1291        assert!(config.throw_exception_on_failure);
1292        assert_eq!(config.ok_status_code_range, (200, 299));
1293        assert!(config.response_timeout.is_none());
1294    }
1295
1296    #[test]
1297    fn test_http_config_scheme() {
1298        // UriConfig trait method returns "http" as primary scheme
1299        assert_eq!(HttpEndpointConfig::scheme(), "http");
1300    }
1301
1302    #[test]
1303    fn test_http_config_from_components() {
1304        // Test from_components directly (trait method)
1305        let components = camel_endpoint::UriComponents {
1306            scheme: "https".to_string(),
1307            path: "//api.example.com/v1".to_string(),
1308            params: std::collections::HashMap::from([(
1309                "httpMethod".to_string(),
1310                "POST".to_string(),
1311            )]),
1312        };
1313        let config = HttpEndpointConfig::from_components(components).unwrap();
1314        assert_eq!(config.base_url, "https://api.example.com/v1");
1315        assert_eq!(config.http_method, Some("POST".to_string()));
1316    }
1317
1318    #[test]
1319    fn test_http_config_with_options() {
1320        let config = HttpEndpointConfig::from_uri(
1321            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1322        ).unwrap();
1323        assert_eq!(config.base_url, "https://api.example.com/v1");
1324        assert_eq!(config.http_method, Some("PUT".to_string()));
1325        assert!(!config.throw_exception_on_failure);
1326        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1327    }
1328
1329    #[test]
1330    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1331        let config = HttpConfig::default()
1332            .with_response_timeout_ms(999)
1333            .with_allow_private_ips(true)
1334            .with_blocked_hosts(vec!["evil.com".to_string()])
1335            .with_max_body_size(12345);
1336        let endpoint =
1337            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1338        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1339        assert!(endpoint.allow_private_ips);
1340        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1341        assert_eq!(endpoint.max_body_size, 12345);
1342    }
1343
1344    #[test]
1345    fn test_from_uri_with_defaults_uri_overrides_config() {
1346        let config = HttpConfig::default()
1347            .with_response_timeout_ms(999)
1348            .with_allow_private_ips(true)
1349            .with_blocked_hosts(vec!["evil.com".to_string()])
1350            .with_max_body_size(12345);
1351        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1352            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1353            &config,
1354        )
1355        .unwrap();
1356        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1357        assert!(!endpoint.allow_private_ips);
1358        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1359        assert_eq!(endpoint.max_body_size, 99);
1360    }
1361
1362    #[test]
1363    fn test_http_config_ok_status_range() {
1364        let config =
1365            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1366        assert_eq!(config.ok_status_code_range, (200, 204));
1367    }
1368
1369    #[test]
1370    fn test_http_config_wrong_scheme() {
1371        let result = HttpEndpointConfig::from_uri("file:/tmp");
1372        assert!(result.is_err());
1373    }
1374
1375    #[test]
1376    fn test_http_component_scheme() {
1377        let component = HttpComponent::new();
1378        assert_eq!(component.scheme(), "http");
1379    }
1380
1381    #[test]
1382    fn test_https_component_scheme() {
1383        let component = HttpsComponent::new();
1384        assert_eq!(component.scheme(), "https");
1385    }
1386
1387    #[test]
1388    fn test_http_endpoint_creates_consumer() {
1389        let component = HttpComponent::new();
1390        let endpoint = component
1391            .create_endpoint("http://0.0.0.0:19100/test")
1392            .unwrap();
1393        assert!(endpoint.create_consumer().is_ok());
1394    }
1395
1396    #[test]
1397    fn test_https_endpoint_creates_consumer() {
1398        let component = HttpsComponent::new();
1399        let endpoint = component
1400            .create_endpoint("https://0.0.0.0:8443/test")
1401            .unwrap();
1402        assert!(endpoint.create_consumer().is_ok());
1403    }
1404
1405    #[test]
1406    fn test_http_endpoint_creates_producer() {
1407        let ctx = test_producer_ctx();
1408        let component = HttpComponent::new();
1409        let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1410        assert!(endpoint.create_producer(&ctx).is_ok());
1411    }
1412
1413    // -----------------------------------------------------------------------
1414    // Producer tests
1415    // -----------------------------------------------------------------------
1416
1417    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1418        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1419        let addr = listener.local_addr().unwrap();
1420        let url = format!("http://127.0.0.1:{}", addr.port());
1421
1422        let handle = tokio::spawn(async move {
1423            loop {
1424                if let Ok((mut stream, _)) = listener.accept().await {
1425                    tokio::spawn(async move {
1426                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1427                        let mut buf = vec![0u8; 4096];
1428                        let n = stream.read(&mut buf).await.unwrap_or(0);
1429                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1430
1431                        let method = request.split_whitespace().next().unwrap_or("GET");
1432
1433                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1434                        let response = format!(
1435                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1436                            body.len(),
1437                            body
1438                        );
1439                        let _ = stream.write_all(response.as_bytes()).await;
1440                    });
1441                }
1442            }
1443        });
1444
1445        (url, handle)
1446    }
1447
1448    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1449        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1450        let addr = listener.local_addr().unwrap();
1451        let url = format!("http://127.0.0.1:{}", addr.port());
1452
1453        let handle = tokio::spawn(async move {
1454            loop {
1455                if let Ok((mut stream, _)) = listener.accept().await {
1456                    let status = status;
1457                    tokio::spawn(async move {
1458                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1459                        let mut buf = vec![0u8; 4096];
1460                        let _ = stream.read(&mut buf).await;
1461
1462                        let status_text = match status {
1463                            404 => "Not Found",
1464                            500 => "Internal Server Error",
1465                            _ => "Error",
1466                        };
1467                        let body = "error body";
1468                        let response = format!(
1469                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1470                            status,
1471                            status_text,
1472                            body.len(),
1473                            body
1474                        );
1475                        let _ = stream.write_all(response.as_bytes()).await;
1476                    });
1477                }
1478            }
1479        });
1480
1481        (url, handle)
1482    }
1483
1484    #[tokio::test]
1485    async fn test_http_producer_get_request() {
1486        use tower::ServiceExt;
1487
1488        let (url, _handle) = start_test_server().await;
1489        let ctx = test_producer_ctx();
1490
1491        let component = HttpComponent::new();
1492        let endpoint = component
1493            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1494            .unwrap();
1495        let producer = endpoint.create_producer(&ctx).unwrap();
1496
1497        let exchange = Exchange::new(Message::default());
1498        let result = producer.oneshot(exchange).await.unwrap();
1499
1500        let status = result
1501            .input
1502            .header("CamelHttpResponseCode")
1503            .and_then(|v| v.as_u64())
1504            .unwrap();
1505        assert_eq!(status, 200);
1506
1507        assert!(!result.input.body.is_empty());
1508    }
1509
1510    #[tokio::test]
1511    async fn test_http_producer_post_with_body() {
1512        use tower::ServiceExt;
1513
1514        let (url, _handle) = start_test_server().await;
1515        let ctx = test_producer_ctx();
1516
1517        let component = HttpComponent::new();
1518        let endpoint = component
1519            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1520            .unwrap();
1521        let producer = endpoint.create_producer(&ctx).unwrap();
1522
1523        let exchange = Exchange::new(Message::new("request body"));
1524        let result = producer.oneshot(exchange).await.unwrap();
1525
1526        let status = result
1527            .input
1528            .header("CamelHttpResponseCode")
1529            .and_then(|v| v.as_u64())
1530            .unwrap();
1531        assert_eq!(status, 200);
1532    }
1533
1534    #[tokio::test]
1535    async fn test_http_producer_method_from_header() {
1536        use tower::ServiceExt;
1537
1538        let (url, _handle) = start_test_server().await;
1539        let ctx = test_producer_ctx();
1540
1541        let component = HttpComponent::new();
1542        let endpoint = component
1543            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1544            .unwrap();
1545        let producer = endpoint.create_producer(&ctx).unwrap();
1546
1547        let mut exchange = Exchange::new(Message::default());
1548        exchange.input.set_header(
1549            "CamelHttpMethod",
1550            serde_json::Value::String("DELETE".to_string()),
1551        );
1552
1553        let result = producer.oneshot(exchange).await.unwrap();
1554        let status = result
1555            .input
1556            .header("CamelHttpResponseCode")
1557            .and_then(|v| v.as_u64())
1558            .unwrap();
1559        assert_eq!(status, 200);
1560    }
1561
1562    #[tokio::test]
1563    async fn test_http_producer_forced_method() {
1564        use tower::ServiceExt;
1565
1566        let (url, _handle) = start_test_server().await;
1567        let ctx = test_producer_ctx();
1568
1569        let component = HttpComponent::new();
1570        let endpoint = component
1571            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1572            .unwrap();
1573        let producer = endpoint.create_producer(&ctx).unwrap();
1574
1575        let exchange = Exchange::new(Message::default());
1576        let result = producer.oneshot(exchange).await.unwrap();
1577
1578        let status = result
1579            .input
1580            .header("CamelHttpResponseCode")
1581            .and_then(|v| v.as_u64())
1582            .unwrap();
1583        assert_eq!(status, 200);
1584    }
1585
1586    #[tokio::test]
1587    async fn test_http_producer_throw_exception_on_failure() {
1588        use tower::ServiceExt;
1589
1590        let (url, _handle) = start_status_server(404).await;
1591        let ctx = test_producer_ctx();
1592
1593        let component = HttpComponent::new();
1594        let endpoint = component
1595            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1596            .unwrap();
1597        let producer = endpoint.create_producer(&ctx).unwrap();
1598
1599        let exchange = Exchange::new(Message::default());
1600        let result = producer.oneshot(exchange).await;
1601        assert!(result.is_err());
1602
1603        match result.unwrap_err() {
1604            CamelError::HttpOperationFailed { status_code, .. } => {
1605                assert_eq!(status_code, 404);
1606            }
1607            e => panic!("Expected HttpOperationFailed, got: {e}"),
1608        }
1609    }
1610
1611    #[tokio::test]
1612    async fn test_http_producer_no_throw_on_failure() {
1613        use tower::ServiceExt;
1614
1615        let (url, _handle) = start_status_server(500).await;
1616        let ctx = test_producer_ctx();
1617
1618        let component = HttpComponent::new();
1619        let endpoint = component
1620            .create_endpoint(&format!(
1621                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1622            ))
1623            .unwrap();
1624        let producer = endpoint.create_producer(&ctx).unwrap();
1625
1626        let exchange = Exchange::new(Message::default());
1627        let result = producer.oneshot(exchange).await.unwrap();
1628
1629        let status = result
1630            .input
1631            .header("CamelHttpResponseCode")
1632            .and_then(|v| v.as_u64())
1633            .unwrap();
1634        assert_eq!(status, 500);
1635    }
1636
1637    #[tokio::test]
1638    async fn test_http_producer_uri_override() {
1639        use tower::ServiceExt;
1640
1641        let (url, _handle) = start_test_server().await;
1642        let ctx = test_producer_ctx();
1643
1644        let component = HttpComponent::new();
1645        let endpoint = component
1646            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1647            .unwrap();
1648        let producer = endpoint.create_producer(&ctx).unwrap();
1649
1650        let mut exchange = Exchange::new(Message::default());
1651        exchange.input.set_header(
1652            "CamelHttpUri",
1653            serde_json::Value::String(format!("{url}/api")),
1654        );
1655
1656        let result = producer.oneshot(exchange).await.unwrap();
1657        let status = result
1658            .input
1659            .header("CamelHttpResponseCode")
1660            .and_then(|v| v.as_u64())
1661            .unwrap();
1662        assert_eq!(status, 200);
1663    }
1664
1665    #[tokio::test]
1666    async fn test_http_producer_response_headers_mapped() {
1667        use tower::ServiceExt;
1668
1669        let (url, _handle) = start_test_server().await;
1670        let ctx = test_producer_ctx();
1671
1672        let component = HttpComponent::new();
1673        let endpoint = component
1674            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1675            .unwrap();
1676        let producer = endpoint.create_producer(&ctx).unwrap();
1677
1678        let exchange = Exchange::new(Message::default());
1679        let result = producer.oneshot(exchange).await.unwrap();
1680
1681        assert!(
1682            result.input.header("content-type").is_some()
1683                || result.input.header("Content-Type").is_some()
1684        );
1685        assert!(result.input.header("CamelHttpResponseText").is_some());
1686    }
1687
1688    // -----------------------------------------------------------------------
1689    // Bug fix tests: Client configuration per-endpoint
1690    // -----------------------------------------------------------------------
1691
1692    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1693        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1694        let addr = listener.local_addr().unwrap();
1695        let url = format!("http://127.0.0.1:{}", addr.port());
1696
1697        let handle = tokio::spawn(async move {
1698            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1699            loop {
1700                if let Ok((mut stream, _)) = listener.accept().await {
1701                    tokio::spawn(async move {
1702                        let mut buf = vec![0u8; 4096];
1703                        let n = stream.read(&mut buf).await.unwrap_or(0);
1704                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1705
1706                        // Check if this is a request to /final
1707                        if request.contains("GET /final") {
1708                            let body = r#"{"status":"final"}"#;
1709                            let response = format!(
1710                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1711                                body.len(),
1712                                body
1713                            );
1714                            let _ = stream.write_all(response.as_bytes()).await;
1715                        } else {
1716                            // Redirect to /final
1717                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1718                            let _ = stream.write_all(response.as_bytes()).await;
1719                        }
1720                    });
1721                }
1722            }
1723        });
1724
1725        (url, handle)
1726    }
1727
1728    #[tokio::test]
1729    async fn test_follow_redirects_false_does_not_follow() {
1730        use tower::ServiceExt;
1731
1732        let (url, _handle) = start_redirect_server().await;
1733        let ctx = test_producer_ctx();
1734
1735        let component =
1736            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1737        let endpoint = component
1738            .create_endpoint(&format!(
1739                "{url}?throwExceptionOnFailure=false&allowPrivateIps=true"
1740            ))
1741            .unwrap();
1742        let producer = endpoint.create_producer(&ctx).unwrap();
1743
1744        let exchange = Exchange::new(Message::default());
1745        let result = producer.oneshot(exchange).await.unwrap();
1746
1747        // Should get 302, NOT follow redirect to 200
1748        let status = result
1749            .input
1750            .header("CamelHttpResponseCode")
1751            .and_then(|v| v.as_u64())
1752            .unwrap();
1753        assert_eq!(
1754            status, 302,
1755            "Should NOT follow redirect when followRedirects=false"
1756        );
1757    }
1758
1759    #[tokio::test]
1760    async fn test_follow_redirects_true_follows_redirect() {
1761        use tower::ServiceExt;
1762
1763        let (url, _handle) = start_redirect_server().await;
1764        let ctx = test_producer_ctx();
1765
1766        let component =
1767            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1768        let endpoint = component
1769            .create_endpoint(&format!("{url}?allowPrivateIps=true"))
1770            .unwrap();
1771        let producer = endpoint.create_producer(&ctx).unwrap();
1772
1773        let exchange = Exchange::new(Message::default());
1774        let result = producer.oneshot(exchange).await.unwrap();
1775
1776        // Should follow redirect and get 200
1777        let status = result
1778            .input
1779            .header("CamelHttpResponseCode")
1780            .and_then(|v| v.as_u64())
1781            .unwrap();
1782        assert_eq!(
1783            status, 200,
1784            "Should follow redirect when followRedirects=true"
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_query_params_forwarded_to_http_request() {
1790        use tower::ServiceExt;
1791
1792        let (url, _handle) = start_test_server().await;
1793        let ctx = test_producer_ctx();
1794
1795        let component = HttpComponent::new();
1796        // apiKey is NOT a Camel option, should be forwarded as query param
1797        let endpoint = component
1798            .create_endpoint(&format!(
1799                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1800            ))
1801            .unwrap();
1802        let producer = endpoint.create_producer(&ctx).unwrap();
1803
1804        let exchange = Exchange::new(Message::default());
1805        let result = producer.oneshot(exchange).await.unwrap();
1806
1807        // The test server returns the request info in response
1808        // We just verify it succeeds (the query param was sent)
1809        let status = result
1810            .input
1811            .header("CamelHttpResponseCode")
1812            .and_then(|v| v.as_u64())
1813            .unwrap();
1814        assert_eq!(status, 200);
1815    }
1816
1817    #[tokio::test]
1818    async fn test_non_camel_query_params_are_forwarded() {
1819        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1820        // We'll test the config parsing, not the actual HTTP call
1821        let config = HttpEndpointConfig::from_uri(
1822            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1823        )
1824        .unwrap();
1825
1826        // apiKey and token are NOT Camel options, should be forwarded
1827        assert!(
1828            config.query_params.contains_key("apiKey"),
1829            "apiKey should be preserved"
1830        );
1831        assert!(
1832            config.query_params.contains_key("token"),
1833            "token should be preserved"
1834        );
1835        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1836        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1837
1838        // httpMethod IS a Camel option, should NOT be in query_params
1839        assert!(
1840            !config.query_params.contains_key("httpMethod"),
1841            "httpMethod should not be forwarded"
1842        );
1843    }
1844
1845    // -----------------------------------------------------------------------
1846    // SSRF Protection tests
1847    // -----------------------------------------------------------------------
1848
1849    #[tokio::test]
1850    async fn test_http_producer_blocks_metadata_endpoint() {
1851        use tower::ServiceExt;
1852
1853        let ctx = test_producer_ctx();
1854        let component = HttpComponent::new();
1855        let endpoint = component
1856            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1857            .unwrap();
1858        let producer = endpoint.create_producer(&ctx).unwrap();
1859
1860        let mut exchange = Exchange::new(Message::default());
1861        exchange.input.set_header(
1862            "CamelHttpUri",
1863            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1864        );
1865
1866        let result = producer.oneshot(exchange).await;
1867        assert!(result.is_err(), "Should block AWS metadata endpoint");
1868
1869        let err = result.unwrap_err();
1870        assert!(
1871            err.to_string().contains("Private IP"),
1872            "Error should mention private IP blocking, got: {}",
1873            err
1874        );
1875    }
1876
/// A URI without SSRF-related parameters must parse to a config that disallows
/// private IPs and has no blocked hosts.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        !private_ips_allowed,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = parsed.blocked_hosts.is_empty();
    assert!(
        no_blocked_hosts,
        "Blocked hosts should be empty by default"
    );
}
1889
/// `allowPrivateIps=true` in the URI must set `allow_private_ips` on the
/// parsed config.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();
    assert!(
        parsed.allow_private_ips,
        "Private IPs should be allowed when explicitly set"
    );
}
1899
/// A comma-separated `blockedHosts` parameter must parse into one
/// `blocked_hosts` entry per host, in the order given.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    let expected = vec!["evil.com", "malware.net"];
    assert_eq!(parsed.blocked_hosts, expected);
}
1908
1909    #[tokio::test]
1910    async fn test_http_producer_blocks_localhost() {
1911        use tower::ServiceExt;
1912
1913        let ctx = test_producer_ctx();
1914        let component = HttpComponent::new();
1915        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1916        let producer = endpoint.create_producer(&ctx).unwrap();
1917
1918        let mut exchange = Exchange::new(Message::default());
1919        exchange.input.set_header(
1920            "CamelHttpUri",
1921            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1922        );
1923
1924        let result = producer.oneshot(exchange).await;
1925        assert!(result.is_err(), "Should block localhost");
1926    }
1927
1928    #[tokio::test]
1929    async fn test_http_producer_blocks_loopback_ip() {
1930        use tower::ServiceExt;
1931
1932        let ctx = test_producer_ctx();
1933        let component = HttpComponent::new();
1934        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1935        let producer = endpoint.create_producer(&ctx).unwrap();
1936
1937        let mut exchange = Exchange::new(Message::default());
1938        exchange.input.set_header(
1939            "CamelHttpUri",
1940            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1941        );
1942
1943        let result = producer.oneshot(exchange).await;
1944        assert!(result.is_err(), "Should block loopback IP");
1945    }
1946
1947    #[tokio::test]
1948    async fn test_http_producer_allows_private_ip_when_enabled() {
1949        use tower::ServiceExt;
1950
1951        let ctx = test_producer_ctx();
1952        let component = HttpComponent::new();
1953        // With allowPrivateIps=true, the validation should pass
1954        // (actual connection will fail, but that's expected)
1955        let endpoint = component
1956            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1957            .unwrap();
1958        let producer = endpoint.create_producer(&ctx).unwrap();
1959
1960        let exchange = Exchange::new(Message::default());
1961
1962        // The request will fail because we can't connect, but it should NOT fail
1963        // due to SSRF protection
1964        let result = producer.oneshot(exchange).await;
1965        // We expect connection error, not SSRF error
1966        if let Err(ref e) = result {
1967            let err_str = e.to_string();
1968            assert!(
1969                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1970                "Should not be SSRF error, got: {}",
1971                err_str
1972            );
1973        }
1974    }
1975
1976    // -----------------------------------------------------------------------
1977    // HttpServerConfig tests
1978    // -----------------------------------------------------------------------
1979
/// Host, port and path must be extracted from a fully specified server URI.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8080);
    assert_eq!(parsed.path, "/orders");
}
1987
/// `HttpServerConfig::scheme()` must report `"http"`.
#[test]
fn test_http_server_config_scheme() {
    let primary_scheme = HttpServerConfig::scheme();
    assert_eq!(primary_scheme, "http");
}
1993
/// Builds `UriComponents` by hand (https scheme, explicit port, a
/// `maxRequestBody` parameter) and checks every field that
/// `HttpServerConfig::from_components` derives from them.
#[test]
fn test_http_server_config_from_components() {
    let mut params = std::collections::HashMap::new();
    params.insert("maxRequestBody".to_string(), "5242880".to_string());

    let parsed = HttpServerConfig::from_components(camel_endpoint::UriComponents {
        scheme: "https".to_string(),
        path: "//0.0.0.0:8443/api".to_string(),
        params,
    })
    .unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8443);
    assert_eq!(parsed.path, "/api");
    assert_eq!(parsed.max_request_body, 5242880);
}
2011
/// A server URI without a path component must yield the path `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    assert_eq!(parsed.path, "/");
}
2017
/// A URI with a non-HTTP scheme (`file:`) must be rejected.
#[test]
fn test_http_server_config_wrong_scheme() {
    let parsed = HttpServerConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
2022
/// A URI whose port is not numeric must be rejected.
#[test]
fn test_http_server_config_invalid_port() {
    let parsed = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(parsed.is_err());
}
2027
/// When the URI omits the port, it must default to 80 for `http` and 443 for
/// `https`.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let plain = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
    let secure = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();

    assert_eq!(plain.port, 80);
    assert_eq!(secure.port, 443);
}
2038
/// Compile-time check: `RequestEnvelope` and `HttpReply` must both be `Send`.
/// The test body does nothing at runtime; it fails to compile otherwise.
#[test]
fn test_request_envelope_and_reply_are_send() {
    fn requires_send<T>()
    where
        T: Send,
    {
    }

    requires_send::<RequestEnvelope>();
    requires_send::<HttpReply>();
}
2045
2046    // -----------------------------------------------------------------------
2047    // ServerRegistry tests
2048    // -----------------------------------------------------------------------
2049
/// Two calls to `ServerRegistry::global()` must return references to the very
/// same instance (pointer equality, not value equality).
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    let same_instance = std::ptr::eq(first, second);
    assert!(same_instance);
}
2056
2057    #[tokio::test]
2058    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2059        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2060        let port = listener.local_addr().unwrap().port();
2061        drop(listener);
2062
2063        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2064            Arc::new(std::sync::Mutex::new(Vec::new()));
2065
2066        let mut handles = Vec::new();
2067        for _ in 0..4 {
2068            let results = results.clone();
2069            handles.push(tokio::spawn(async move {
2070                let dispatch = ServerRegistry::global()
2071                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2072                    .await
2073                    .unwrap();
2074                results.lock().unwrap().push(dispatch);
2075            }));
2076        }
2077
2078        for h in handles {
2079            h.await.unwrap();
2080        }
2081
2082        let dispatches = results.lock().unwrap();
2083        assert_eq!(dispatches.len(), 4);
2084        for i in 1..dispatches.len() {
2085            assert!(
2086                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2087                "all concurrent callers should get the same dispatch table"
2088            );
2089        }
2090    }
2091
2092    // -----------------------------------------------------------------------
2093    // Axum dispatch handler tests
2094    // -----------------------------------------------------------------------
2095
2096    #[tokio::test]
2097    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2098        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2099        // Nothing registered in the dispatch table
2100        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2101        let port = listener.local_addr().unwrap().port();
2102        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2103
2104        // Wait for server to start
2105        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2106
2107        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2108            .await
2109            .unwrap();
2110        assert_eq!(resp.status().as_u16(), 404);
2111    }
2112
2113    // -----------------------------------------------------------------------
2114    // HttpConsumer tests
2115    // -----------------------------------------------------------------------
2116
2117    #[tokio::test]
2118    async fn test_http_consumer_start_registers_path() {
2119        use camel_component::ConsumerContext;
2120
2121        // Get an OS-assigned free port
2122        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2123        let port = listener.local_addr().unwrap().port();
2124        drop(listener); // Release port — ServerRegistry will rebind it
2125
2126        let consumer_cfg = HttpServerConfig {
2127            host: "127.0.0.1".to_string(),
2128            port,
2129            path: "/ping".to_string(),
2130            max_request_body: 2 * 1024 * 1024,
2131            max_response_body: 10 * 1024 * 1024,
2132        };
2133        let mut consumer = HttpConsumer::new(consumer_cfg);
2134
2135        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2136        let token = tokio_util::sync::CancellationToken::new();
2137        let ctx = ConsumerContext::new(tx, token.clone());
2138
2139        tokio::spawn(async move {
2140            consumer.start(ctx).await.unwrap();
2141        });
2142
2143        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2144
2145        let client = reqwest::Client::new();
2146        let resp_future = client
2147            .post(format!("http://127.0.0.1:{port}/ping"))
2148            .body("hello world")
2149            .send();
2150
2151        let (http_result, _) = tokio::join!(resp_future, async {
2152            if let Some(mut envelope) = rx.recv().await {
2153                // Set a custom status code
2154                envelope.exchange.input.set_header(
2155                    "CamelHttpResponseCode",
2156                    serde_json::Value::Number(201.into()),
2157                );
2158                if let Some(reply_tx) = envelope.reply_tx {
2159                    let _ = reply_tx.send(Ok(envelope.exchange));
2160                }
2161            }
2162        });
2163
2164        let resp = http_result.unwrap();
2165        assert_eq!(resp.status().as_u16(), 201);
2166
2167        token.cancel();
2168    }
2169
2170    // -----------------------------------------------------------------------
2171    // Integration tests
2172    // -----------------------------------------------------------------------
2173
2174    #[tokio::test]
2175    async fn test_integration_single_consumer_round_trip() {
2176        use camel_component::{ConsumerContext, ExchangeEnvelope};
2177
2178        // Get an OS-assigned free port (ephemeral)
2179        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2180        let port = listener.local_addr().unwrap().port();
2181        drop(listener); // Release — ServerRegistry will rebind
2182
2183        let component = HttpComponent::new();
2184        let endpoint = component
2185            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2186            .unwrap();
2187        let mut consumer = endpoint.create_consumer().unwrap();
2188
2189        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2190        let token = tokio_util::sync::CancellationToken::new();
2191        let ctx = ConsumerContext::new(tx, token.clone());
2192
2193        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2194        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2195
2196        let client = reqwest::Client::new();
2197        let send_fut = client
2198            .post(format!("http://127.0.0.1:{port}/echo"))
2199            .header("Content-Type", "text/plain")
2200            .body("ping")
2201            .send();
2202
2203        let (http_result, _) = tokio::join!(send_fut, async {
2204            if let Some(mut envelope) = rx.recv().await {
2205                assert_eq!(
2206                    envelope.exchange.input.header("CamelHttpMethod"),
2207                    Some(&serde_json::Value::String("POST".into()))
2208                );
2209                assert_eq!(
2210                    envelope.exchange.input.header("CamelHttpPath"),
2211                    Some(&serde_json::Value::String("/echo".into()))
2212                );
2213                envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2214                if let Some(reply_tx) = envelope.reply_tx {
2215                    let _ = reply_tx.send(Ok(envelope.exchange));
2216                }
2217            }
2218        });
2219
2220        let resp = http_result.unwrap();
2221        assert_eq!(resp.status().as_u16(), 200);
2222        let body = resp.text().await.unwrap();
2223        assert_eq!(body, "pong");
2224
2225        token.cancel();
2226    }
2227
2228    #[tokio::test]
2229    async fn test_integration_two_consumers_shared_port() {
2230        use camel_component::{ConsumerContext, ExchangeEnvelope};
2231
2232        // Get an OS-assigned free port (ephemeral)
2233        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2234        let port = listener.local_addr().unwrap().port();
2235        drop(listener);
2236
2237        let component = HttpComponent::new();
2238
2239        // Consumer A: /hello
2240        let endpoint_a = component
2241            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2242            .unwrap();
2243        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2244
2245        // Consumer B: /world
2246        let endpoint_b = component
2247            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2248            .unwrap();
2249        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2250
2251        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2252        let token_a = tokio_util::sync::CancellationToken::new();
2253        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2254
2255        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2256        let token_b = tokio_util::sync::CancellationToken::new();
2257        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2258
2259        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2260        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2261        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2262
2263        let client = reqwest::Client::new();
2264
2265        // Request to /hello
2266        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2267        let (resp_hello, _) = tokio::join!(fut_hello, async {
2268            if let Some(mut envelope) = rx_a.recv().await {
2269                envelope.exchange.input.body =
2270                    camel_api::body::Body::Text("hello-response".to_string());
2271                if let Some(reply_tx) = envelope.reply_tx {
2272                    let _ = reply_tx.send(Ok(envelope.exchange));
2273                }
2274            }
2275        });
2276
2277        // Request to /world
2278        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2279        let (resp_world, _) = tokio::join!(fut_world, async {
2280            if let Some(mut envelope) = rx_b.recv().await {
2281                envelope.exchange.input.body =
2282                    camel_api::body::Body::Text("world-response".to_string());
2283                if let Some(reply_tx) = envelope.reply_tx {
2284                    let _ = reply_tx.send(Ok(envelope.exchange));
2285                }
2286            }
2287        });
2288
2289        let body_a = resp_hello.unwrap().text().await.unwrap();
2290        let body_b = resp_world.unwrap().text().await.unwrap();
2291
2292        assert_eq!(body_a, "hello-response");
2293        assert_eq!(body_b, "world-response");
2294
2295        token_a.cancel();
2296        token_b.cancel();
2297    }
2298
2299    #[tokio::test]
2300    async fn test_integration_unregistered_path_returns_404() {
2301        use camel_component::{ConsumerContext, ExchangeEnvelope};
2302
2303        // Get an OS-assigned free port (ephemeral)
2304        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2305        let port = listener.local_addr().unwrap().port();
2306        drop(listener);
2307
2308        let component = HttpComponent::new();
2309        let endpoint = component
2310            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2311            .unwrap();
2312        let mut consumer = endpoint.create_consumer().unwrap();
2313
2314        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2315        let token = tokio_util::sync::CancellationToken::new();
2316        let ctx = ConsumerContext::new(tx, token.clone());
2317
2318        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2319        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2320
2321        let client = reqwest::Client::new();
2322        let resp = client
2323            .get(format!("http://127.0.0.1:{port}/not-there"))
2324            .send()
2325            .await
2326            .unwrap();
2327        assert_eq!(resp.status().as_u16(), 404);
2328
2329        token.cancel();
2330    }
2331
/// An `HttpConsumer` must report `ConcurrencyModel::Concurrent { max: None }`
/// from `concurrency_model()`.
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component::ConcurrencyModel;

    let consumer = HttpConsumer::new(HttpServerConfig {
        host: "127.0.0.1".to_string(),
        port: 19999,
        path: "/test".to_string(),
        max_request_body: 2 * 1024 * 1024,
        max_response_body: 10 * 1024 * 1024,
    });

    let declared = consumer.concurrency_model();
    assert_eq!(declared, ConcurrencyModel::Concurrent { max: None });
}
2349
2350    // -----------------------------------------------------------------------
2351    // HttpReplyBody streaming tests
2352    // -----------------------------------------------------------------------
2353
2354    #[tokio::test]
2355    async fn test_http_reply_body_stream_variant_exists() {
2356        use bytes::Bytes;
2357        use camel_api::CamelError;
2358        use futures::stream;
2359
2360        let chunks: Vec<Result<Bytes, CamelError>> =
2361            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2362        let stream = Box::pin(stream::iter(chunks));
2363        let reply_body = HttpReplyBody::Stream(stream);
2364        // Si compila y el match funciona, el test pasa
2365        match reply_body {
2366            HttpReplyBody::Stream(_) => {}
2367            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2368        }
2369    }
2370
2371    // -----------------------------------------------------------------------
2372    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2373    // -----------------------------------------------------------------------
2374
2375    #[cfg(feature = "otel")]
2376    mod otel_tests {
2377        use super::*;
2378        use camel_api::Message;
2379        use tower::ServiceExt;
2380
2381        #[tokio::test]
2382        async fn test_producer_injects_traceparent_header() {
2383            let (url, _handle) = start_test_server_with_header_capture().await;
2384            let ctx = test_producer_ctx();
2385
2386            let component = HttpComponent::new();
2387            let endpoint = component
2388                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2389                .unwrap();
2390            let producer = endpoint.create_producer(&ctx).unwrap();
2391
2392            // Create exchange with an OTel context by extracting from a traceparent header
2393            let mut exchange = Exchange::new(Message::default());
2394            let mut headers = std::collections::HashMap::new();
2395            headers.insert(
2396                "traceparent".to_string(),
2397                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2398            );
2399            camel_otel::extract_into_exchange(&mut exchange, &headers);
2400
2401            let result = producer.oneshot(exchange).await.unwrap();
2402
2403            // Verify request succeeded
2404            let status = result
2405                .input
2406                .header("CamelHttpResponseCode")
2407                .and_then(|v| v.as_u64())
2408                .unwrap();
2409            assert_eq!(status, 200);
2410
2411            // The test server echoes back the received traceparent header
2412            let traceparent = result.input.header("x-received-traceparent");
2413            assert!(
2414                traceparent.is_some(),
2415                "traceparent header should have been sent"
2416            );
2417
2418            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2419            // Verify format: version-traceid-spanid-flags
2420            let parts: Vec<&str> = traceparent_str.split('-').collect();
2421            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2422            assert_eq!(parts[0], "00", "version should be 00");
2423            assert_eq!(
2424                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2425                "trace-id should match"
2426            );
2427            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2428            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2429        }
2430
2431        #[tokio::test]
2432        async fn test_consumer_extracts_traceparent_header() {
2433            use camel_component::{ConsumerContext, ExchangeEnvelope};
2434
2435            // Get an OS-assigned free port
2436            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2437            let port = listener.local_addr().unwrap().port();
2438            drop(listener);
2439
2440            let component = HttpComponent::new();
2441            let endpoint = component
2442                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2443                .unwrap();
2444            let mut consumer = endpoint.create_consumer().unwrap();
2445
2446            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2447            let token = tokio_util::sync::CancellationToken::new();
2448            let ctx = ConsumerContext::new(tx, token.clone());
2449
2450            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2451            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2452
2453            // Send request with traceparent header
2454            let client = reqwest::Client::new();
2455            let send_fut = client
2456                .post(format!("http://127.0.0.1:{port}/trace"))
2457                .header(
2458                    "traceparent",
2459                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2460                )
2461                .body("test")
2462                .send();
2463
2464            let (http_result, _) = tokio::join!(send_fut, async {
2465                if let Some(envelope) = rx.recv().await {
2466                    // Verify the exchange has a valid OTel context by re-injecting it
2467                    // and checking the traceparent matches
2468                    let mut injected_headers = std::collections::HashMap::new();
2469                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2470
2471                    assert!(
2472                        injected_headers.contains_key("traceparent"),
2473                        "Exchange should have traceparent after extraction"
2474                    );
2475
2476                    let traceparent = injected_headers.get("traceparent").unwrap();
2477                    let parts: Vec<&str> = traceparent.split('-').collect();
2478                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2479                    assert_eq!(
2480                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2481                        "Trace ID should match the original traceparent header"
2482                    );
2483
2484                    if let Some(reply_tx) = envelope.reply_tx {
2485                        let _ = reply_tx.send(Ok(envelope.exchange));
2486                    }
2487                }
2488            });
2489
2490            let resp = http_result.unwrap();
2491            assert_eq!(resp.status().as_u16(), 200);
2492
2493            token.cancel();
2494        }
2495
2496        #[tokio::test]
2497        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2498            use camel_component::{ConsumerContext, ExchangeEnvelope};
2499
2500            // Get an OS-assigned free port
2501            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2502            let port = listener.local_addr().unwrap().port();
2503            drop(listener);
2504
2505            let component = HttpComponent::new();
2506            let endpoint = component
2507                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2508                .unwrap();
2509            let mut consumer = endpoint.create_consumer().unwrap();
2510
2511            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2512            let token = tokio_util::sync::CancellationToken::new();
2513            let ctx = ConsumerContext::new(tx, token.clone());
2514
2515            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2516            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2517
2518            // Send request with MIXED-CASE TraceParent header (not lowercase)
2519            let client = reqwest::Client::new();
2520            let send_fut = client
2521                .post(format!("http://127.0.0.1:{port}/trace"))
2522                .header(
2523                    "TraceParent",
2524                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2525                )
2526                .body("test")
2527                .send();
2528
2529            let (http_result, _) = tokio::join!(send_fut, async {
2530                if let Some(envelope) = rx.recv().await {
2531                    // Verify the exchange has a valid OTel context by re-injecting it
2532                    // and checking the traceparent matches
2533                    let mut injected_headers = HashMap::new();
2534                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2535
2536                    assert!(
2537                        injected_headers.contains_key("traceparent"),
2538                        "Exchange should have traceparent after extraction from mixed-case header"
2539                    );
2540
2541                    let traceparent = injected_headers.get("traceparent").unwrap();
2542                    let parts: Vec<&str> = traceparent.split('-').collect();
2543                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2544                    assert_eq!(
2545                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2546                        "Trace ID should match the original mixed-case TraceParent header"
2547                    );
2548
2549                    if let Some(reply_tx) = envelope.reply_tx {
2550                        let _ = reply_tx.send(Ok(envelope.exchange));
2551                    }
2552                }
2553            });
2554
2555            let resp = http_result.unwrap();
2556            assert_eq!(resp.status().as_u16(), 200);
2557
2558            token.cancel();
2559        }
2560
2561        #[tokio::test]
2562        async fn test_producer_no_trace_context_no_crash() {
2563            let (url, _handle) = start_test_server().await;
2564            let ctx = test_producer_ctx();
2565
2566            let component = HttpComponent::new();
2567            let endpoint = component
2568                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2569                .unwrap();
2570            let producer = endpoint.create_producer(&ctx).unwrap();
2571
2572            // Create exchange with default (empty) otel_context - no trace context
2573            let exchange = Exchange::new(Message::default());
2574
2575            // Should succeed without panic
2576            let result = producer.oneshot(exchange).await.unwrap();
2577
2578            // Verify request succeeded
2579            let status = result
2580                .input
2581                .header("CamelHttpResponseCode")
2582                .and_then(|v| v.as_u64())
2583                .unwrap();
2584            assert_eq!(status, 200);
2585        }
2586
2587        /// Test server that captures and echoes back the traceparent header
2588        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2589            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2590            let addr = listener.local_addr().unwrap();
2591            let url = format!("http://127.0.0.1:{}", addr.port());
2592
2593            let handle = tokio::spawn(async move {
2594                loop {
2595                    if let Ok((mut stream, _)) = listener.accept().await {
2596                        tokio::spawn(async move {
2597                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2598                            let mut buf = vec![0u8; 8192];
2599                            let n = stream.read(&mut buf).await.unwrap_or(0);
2600                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
2601
2602                            // Extract traceparent header from request
2603                            let traceparent = request
2604                                .lines()
2605                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
2606                                .map(|line| {
2607                                    line.split(':')
2608                                        .nth(1)
2609                                        .map(|s| s.trim().to_string())
2610                                        .unwrap_or_default()
2611                                })
2612                                .unwrap_or_default();
2613
2614                            let body =
2615                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2616                            let response = format!(
2617                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2618                                body.len(),
2619                                traceparent,
2620                                body
2621                            );
2622                            let _ = stream.write_all(response.as_bytes()).await;
2623                        });
2624                    }
2625                }
2626            });
2627
2628            (url, handle)
2629        }
2630    }
2631
    // -----------------------------------------------------------------------
    // Request streaming tests (Axis B - Task 3)
    // -----------------------------------------------------------------------
2639
2640    #[tokio::test]
2641    async fn test_request_body_arrives_as_stream() {
2642        use camel_api::body::Body;
2643        use camel_component::{ConsumerContext, ExchangeEnvelope};
2644
2645        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646        let port = listener.local_addr().unwrap().port();
2647        drop(listener);
2648
2649        let component = HttpComponent::new();
2650        let endpoint = component
2651            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2652            .unwrap();
2653        let mut consumer = endpoint.create_consumer().unwrap();
2654
2655        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2656        let token = tokio_util::sync::CancellationToken::new();
2657        let ctx = ConsumerContext::new(tx, token.clone());
2658
2659        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2660        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2661
2662        let client = reqwest::Client::new();
2663        let send_fut = client
2664            .post(format!("http://127.0.0.1:{port}/upload"))
2665            .body("hello streaming world")
2666            .send();
2667
2668        let (http_result, _) = tokio::join!(send_fut, async {
2669            if let Some(mut envelope) = rx.recv().await {
2670                // Body must be Body::Stream, not Body::Text or Body::Bytes
2671                assert!(
2672                    matches!(envelope.exchange.input.body, Body::Stream(_)),
2673                    "expected Body::Stream, got discriminant {:?}",
2674                    std::mem::discriminant(&envelope.exchange.input.body)
2675                );
2676                // Materialize to verify content
2677                let bytes = envelope
2678                    .exchange
2679                    .input
2680                    .body
2681                    .into_bytes(1024 * 1024)
2682                    .await
2683                    .unwrap();
2684                assert_eq!(&bytes[..], b"hello streaming world");
2685
2686                envelope.exchange.input.body = camel_api::body::Body::Empty;
2687                if let Some(reply_tx) = envelope.reply_tx {
2688                    let _ = reply_tx.send(Ok(envelope.exchange));
2689                }
2690            }
2691        });
2692
2693        let resp = http_result.unwrap();
2694        assert_eq!(resp.status().as_u16(), 200);
2695
2696        token.cancel();
2697    }
2698
    // -----------------------------------------------------------------------
    // Response streaming tests (Axis A - Task 2)
    // -----------------------------------------------------------------------
2702
2703    #[tokio::test]
2704    async fn test_streaming_response_chunked() {
2705        use bytes::Bytes;
2706        use camel_api::CamelError;
2707        use camel_api::body::{Body, StreamBody, StreamMetadata};
2708        use camel_component::{ConsumerContext, ExchangeEnvelope};
2709        use futures::stream;
2710        use std::sync::Arc;
2711        use tokio::sync::Mutex;
2712
2713        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2714        let port = listener.local_addr().unwrap().port();
2715        drop(listener);
2716
2717        let component = HttpComponent::new();
2718        let endpoint = component
2719            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2720            .unwrap();
2721        let mut consumer = endpoint.create_consumer().unwrap();
2722
2723        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2724        let token = tokio_util::sync::CancellationToken::new();
2725        let ctx = ConsumerContext::new(tx, token.clone());
2726
2727        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2728        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729
2730        let client = reqwest::Client::new();
2731        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2732
2733        let (http_result, _) = tokio::join!(send_fut, async {
2734            if let Some(mut envelope) = rx.recv().await {
2735                // Respond with Body::Stream
2736                let chunks: Vec<Result<Bytes, CamelError>> =
2737                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2738                let stream = Box::pin(stream::iter(chunks));
2739                envelope.exchange.input.body = Body::Stream(StreamBody {
2740                    stream: Arc::new(Mutex::new(Some(stream))),
2741                    metadata: StreamMetadata::default(),
2742                });
2743                if let Some(reply_tx) = envelope.reply_tx {
2744                    let _ = reply_tx.send(Ok(envelope.exchange));
2745                }
2746            }
2747        });
2748
2749        let resp = http_result.unwrap();
2750        assert_eq!(resp.status().as_u16(), 200);
2751        let body = resp.text().await.unwrap();
2752        assert_eq!(body, "chunk1chunk2");
2753
2754        token.cancel();
2755    }
2756
2757    // -----------------------------------------------------------------------
2758    // 413 Content-Length limit test (Task 4)
2759    // -----------------------------------------------------------------------
2760
2761    #[tokio::test]
2762    async fn test_413_when_content_length_exceeds_limit() {
2763        use camel_component::ConsumerContext;
2764
2765        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2766        let port = listener.local_addr().unwrap().port();
2767        drop(listener);
2768
2769        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
2770        let component = HttpComponent::new();
2771        let endpoint = component
2772            .create_endpoint(&format!(
2773                "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2774            ))
2775            .unwrap();
2776        let mut consumer = endpoint.create_consumer().unwrap();
2777
2778        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2779        let token = tokio_util::sync::CancellationToken::new();
2780        let ctx = ConsumerContext::new(tx, token.clone());
2781
2782        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2783        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2784
2785        let client = reqwest::Client::new();
2786        let resp = client
2787            .post(format!("http://127.0.0.1:{port}/upload"))
2788            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
2789            .body("x".repeat(1000))
2790            .send()
2791            .await
2792            .unwrap();
2793
2794        assert_eq!(resp.status().as_u16(), 413);
2795
2796        token.cancel();
2797    }
2798
2799    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
2800    /// The spec says: "If there is no Content-Length, the limit does not apply at the
2801    /// consumer level — the route is responsible."
2802    #[tokio::test]
2803    async fn test_chunked_upload_without_content_length_bypasses_limit() {
2804        use bytes::Bytes;
2805        use camel_api::body::Body;
2806        use camel_component::ConsumerContext;
2807        use futures::stream;
2808
2809        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2810        let port = listener.local_addr().unwrap().port();
2811        drop(listener);
2812
2813        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
2814        let component = HttpComponent::new();
2815        let endpoint = component
2816            .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2817            .unwrap();
2818        let mut consumer = endpoint.create_consumer().unwrap();
2819
2820        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2821        let token = tokio_util::sync::CancellationToken::new();
2822        let ctx = ConsumerContext::new(tx, token.clone());
2823
2824        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2825        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2826
2827        let client = reqwest::Client::new();
2828
2829        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
2830        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
2831        // but since there's no Content-Length the 413 check must NOT fire.
2832        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2833            Ok(Bytes::from("y".repeat(50))),
2834            Ok(Bytes::from("y".repeat(50))),
2835        ];
2836        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2837        let send_fut = client
2838            .post(format!("http://127.0.0.1:{port}/upload"))
2839            .body(stream_body)
2840            .send();
2841
2842        let consumer_fut = async {
2843            // Use timeout to avoid deadlock if the handler rejects before enqueueing
2844            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2845                Ok(Some(mut envelope)) => {
2846                    assert!(
2847                        matches!(envelope.exchange.input.body, Body::Stream(_)),
2848                        "expected Body::Stream"
2849                    );
2850                    envelope.exchange.input.body = camel_api::body::Body::Empty;
2851                    if let Some(reply_tx) = envelope.reply_tx {
2852                        let _ = reply_tx.send(Ok(envelope.exchange));
2853                    }
2854                }
2855                Ok(None) => panic!("consumer channel closed unexpectedly"),
2856                Err(_) => {
2857                    // Timeout: the request was rejected before reaching the consumer.
2858                    // The HTTP response will carry the real status code (we check below).
2859                }
2860            }
2861        };
2862
2863        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2864
2865        let resp = http_result.unwrap();
2866        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
2867        assert_ne!(
2868            resp.status().as_u16(),
2869            413,
2870            "chunked upload must not be rejected by maxRequestBody"
2871        );
2872        assert_eq!(resp.status().as_u16(), 200);
2873
2874        token.cancel();
2875    }
2876}