Skip to main content

camel_component_http/
lib.rs

1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::{OnceCell, RwLock};
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
17use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
18use camel_component_api::{UriComponents, UriConfig, parse_uri};
19use futures::TryStreamExt;
20use futures::stream::BoxStream;
21
22// ---------------------------------------------------------------------------
23// HttpEndpointConfig
24// ---------------------------------------------------------------------------
25
26/// Configuration for an HTTP client (producer) endpoint.
27///
28/// # Memory Limits
29///
30/// HTTP operations enforce conservative memory limits to prevent denial-of-service
31/// attacks from untrusted network sources. These limits are significantly lower than
32/// file component limits (100MB) because HTTP typically handles API responses rather
33/// than large file transfers, and clients may be untrusted.
34///
35/// ## Default Limits
36///
37/// - **HTTP client body**: 10MB (typical API responses)
38/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
39/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
40///
41/// ## Rationale
42///
43/// The 10MB limit for HTTP client responses is appropriate for most API interactions
44/// while providing protection against:
45/// - Malicious servers sending oversized responses
46/// - Runaway processes generating unexpectedly large payloads
47/// - Memory exhaustion attacks
48///
49/// The 2MB server request limit is even more conservative because it handles input
50/// from potentially untrusted clients on the public internet.
51///
52/// ## Overriding Limits
53///
54/// Override the default client body limit using the `maxBodySize` URI parameter:
55///
56/// ```text
57/// http://api.example.com/large-data?maxBodySize=52428800
58/// ```
59///
60/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
61///
62/// ```text
63/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
64/// ```
65///
66/// ## Behavior When Exceeded
67///
68/// When a body exceeds the configured limit:
69/// - An error is returned immediately
70/// - No memory is exhausted - the limit is checked before allocation
71/// - The HTTP connection is terminated cleanly
72///
73/// ## Security Considerations
74///
75/// HTTP endpoints should be treated with more caution than file endpoints because:
76/// - Clients may be unknown and untrusted
77/// - Network traffic can be spoofed or malicious
78/// - DoS attacks often exploit unbounded resource consumption
79///
80/// Only increase limits when you control both ends of the connection or when
81/// business requirements demand larger payloads.
82#[derive(Debug, Clone)]
83pub struct HttpEndpointConfig {
84    pub base_url: String,
85    pub http_method: Option<String>,
86    pub throw_exception_on_failure: bool,
87    pub ok_status_code_range: (u16, u16),
88    pub response_timeout: Option<Duration>,
89    pub query_params: HashMap<String, String>,
90    pub allow_private_ips: bool,
91    pub blocked_hosts: Vec<String>,
92    pub max_body_size: usize,
93}
94
95/// Camel options that should NOT be forwarded as HTTP query params
96const HTTP_CAMEL_OPTIONS: &[&str] = &[
97    "httpMethod",
98    "throwExceptionOnFailure",
99    "okStatusCodeRange",
100    "followRedirects",
101    "connectTimeout",
102    "responseTimeout",
103    "allowPrivateIps",
104    "blockedHosts",
105    "maxBodySize",
106];
107
108impl UriConfig for HttpEndpointConfig {
109    /// Returns "http" as the primary scheme (also accepts "https")
110    fn scheme() -> &'static str {
111        "http"
112    }
113
114    fn from_uri(uri: &str) -> Result<Self, CamelError> {
115        let parts = parse_uri(uri)?;
116        Self::from_components(parts)
117    }
118
119    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
120        // Validate scheme - accept both http and https
121        if parts.scheme != "http" && parts.scheme != "https" {
122            return Err(CamelError::InvalidUri(format!(
123                "expected scheme 'http' or 'https', got '{}'",
124                parts.scheme
125            )));
126        }
127
128        // Construct base_url from scheme + path
129        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
130        let base_url = format!("{}:{}", parts.scheme, parts.path);
131
132        let http_method = parts.params.get("httpMethod").cloned();
133
134        let throw_exception_on_failure = parts
135            .params
136            .get("throwExceptionOnFailure")
137            .map(|v| v != "false")
138            .unwrap_or(true);
139
140        // Parse status code range from "start-end" format (e.g., "200-299")
141        let ok_status_code_range = parts
142            .params
143            .get("okStatusCodeRange")
144            .and_then(|v| {
145                let (start, end) = v.split_once('-')?;
146                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
147            })
148            .unwrap_or((200, 299));
149
150        let response_timeout = parts
151            .params
152            .get("responseTimeout")
153            .and_then(|v| v.parse::<u64>().ok())
154            .map(Duration::from_millis);
155
156        // SSRF protection settings
157        let allow_private_ips = parts
158            .params
159            .get("allowPrivateIps")
160            .map(|v| v == "true")
161            .unwrap_or(false); // Default: block private IPs
162
163        // Parse comma-separated blocked hosts
164        let blocked_hosts = parts
165            .params
166            .get("blockedHosts")
167            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
168            .unwrap_or_default();
169
170        let max_body_size = parts
171            .params
172            .get("maxBodySize")
173            .and_then(|v| v.parse::<usize>().ok())
174            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
175
176        // Collect remaining params (not Camel options) as query params
177        let query_params: HashMap<String, String> = parts
178            .params
179            .into_iter()
180            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
181            .collect();
182
183        Ok(Self {
184            base_url,
185            http_method,
186            throw_exception_on_failure,
187            ok_status_code_range,
188            response_timeout,
189            query_params,
190            allow_private_ips,
191            blocked_hosts,
192            max_body_size,
193        })
194    }
195}
196
197impl HttpEndpointConfig {
198    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
199        let parts = parse_uri(uri)?;
200        let mut endpoint = Self::from_components(parts.clone())?;
201        if endpoint.response_timeout.is_none() {
202            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
203        }
204        if !parts.params.contains_key("allowPrivateIps") {
205            endpoint.allow_private_ips = config.allow_private_ips;
206        }
207        if !parts.params.contains_key("blockedHosts") {
208            endpoint.blocked_hosts = config.blocked_hosts.clone();
209        }
210        if !parts.params.contains_key("maxBodySize") {
211            endpoint.max_body_size = config.max_body_size;
212        }
213        Ok(endpoint)
214    }
215}
216
217// ---------------------------------------------------------------------------
218// HttpServerConfig
219// ---------------------------------------------------------------------------
220
221/// Configuration for an HTTP server (consumer) endpoint.
222#[derive(Debug, Clone)]
223pub struct HttpServerConfig {
224    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
225    pub host: String,
226    /// TCP port to listen on.
227    pub port: u16,
228    /// URL path this consumer handles, e.g. "/orders".
229    pub path: String,
230    /// Maximum request body size in bytes.
231    pub max_request_body: usize,
232    /// Maximum response body size for materializing streams in bytes.
233    pub max_response_body: usize,
234}
235
236impl UriConfig for HttpServerConfig {
237    /// Returns "http" as the primary scheme (also accepts "https")
238    fn scheme() -> &'static str {
239        "http"
240    }
241
242    fn from_uri(uri: &str) -> Result<Self, CamelError> {
243        let parts = parse_uri(uri)?;
244        Self::from_components(parts)
245    }
246
247    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
248        // Validate scheme - accept both http and https
249        if parts.scheme != "http" && parts.scheme != "https" {
250            return Err(CamelError::InvalidUri(format!(
251                "expected scheme 'http' or 'https', got '{}'",
252                parts.scheme
253            )));
254        }
255
256        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
257        // Strip leading "//"
258        let authority_and_path = parts.path.trim_start_matches('/');
259
260        // Split on the first "/" to separate "host:port" from "/path"
261        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
262            (&authority_and_path[..idx], &authority_and_path[idx..])
263        } else {
264            (authority_and_path, "/")
265        };
266
267        let path = if path_suffix.is_empty() {
268            "/"
269        } else {
270            path_suffix
271        }
272        .to_string();
273
274        // Parse host:port from authority
275        let (host, port) = if let Some(colon) = authority.rfind(':') {
276            let port_str = &authority[colon + 1..];
277            match port_str.parse::<u16>() {
278                Ok(p) => (authority[..colon].to_string(), p),
279                Err(_) => {
280                    return Err(CamelError::InvalidUri(format!(
281                        "invalid port '{}' in authority",
282                        port_str
283                    )));
284                }
285            }
286        } else {
287            // Default port based on scheme: 443 for https, 80 for http
288            let default_port = if parts.scheme == "https" { 443 } else { 80 };
289            (authority.to_string(), default_port)
290        };
291
292        let max_request_body = parts
293            .params
294            .get("maxRequestBody")
295            .and_then(|v| v.parse::<usize>().ok())
296            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
297
298        let max_response_body = parts
299            .params
300            .get("maxResponseBody")
301            .and_then(|v| v.parse::<usize>().ok())
302            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
303
304        Ok(Self {
305            host,
306            port,
307            path,
308            max_request_body,
309            max_response_body,
310        })
311    }
312}
313
314impl HttpServerConfig {
315    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
316        let parts = parse_uri(uri)?;
317        let mut server = Self::from_components(parts.clone())?;
318        if !parts.params.contains_key("maxRequestBody") {
319            server.max_request_body = config.max_request_body;
320        }
321        if !parts.params.contains_key("maxResponseBody") {
322            server.max_response_body = config.max_body_size;
323        }
324        Ok(server)
325    }
326}
327
328// ---------------------------------------------------------------------------
329// RequestEnvelope / HttpReply
330// ---------------------------------------------------------------------------
331
332/// Body de la respuesta HTTP: bytes ya materializados o stream lazy.
333pub(crate) enum HttpReplyBody {
334    Bytes(bytes::Bytes),
335    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
336}
337
338/// An inbound HTTP request sent from the Axum dispatch handler to an
339/// `HttpConsumer` receive loop.
340pub(crate) struct RequestEnvelope {
341    pub(crate) method: String,
342    pub(crate) path: String,
343    pub(crate) query: String,
344    pub(crate) headers: http::HeaderMap,
345    pub(crate) body: StreamBody,
346    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
347}
348
349/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
350pub(crate) struct HttpReply {
351    pub(crate) status: u16,
352    pub(crate) headers: Vec<(String, String)>,
353    pub(crate) body: HttpReplyBody,
354}
355
356// ---------------------------------------------------------------------------
357// DispatchTable / ServerRegistry
358// ---------------------------------------------------------------------------
359
360/// Maps URL path → channel sender for the consumer that owns that path.
361pub(crate) type DispatchTable =
362    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
363
364/// Handle to a running Axum server on one port.
365struct ServerHandle {
366    dispatch: DispatchTable,
367    /// Kept alive so the task isn't dropped; not used directly.
368    _task: tokio::task::JoinHandle<()>,
369}
370
371/// Process-global registry mapping port → running Axum server handle.
372pub struct ServerRegistry {
373    inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
374}
375
376impl ServerRegistry {
377    /// Returns the global singleton.
378    pub fn global() -> &'static Self {
379        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
380        INSTANCE.get_or_init(|| ServerRegistry {
381            inner: Mutex::new(HashMap::new()),
382        })
383    }
384
385    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
386    /// none is running on that port yet.
387    pub(crate) async fn get_or_spawn(
388        &'static self,
389        host: &str,
390        port: u16,
391        max_request_body: usize,
392    ) -> Result<DispatchTable, CamelError> {
393        let host_owned = host.to_string();
394
395        let cell = {
396            let mut guard = self.inner.lock().map_err(|_| {
397                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
398            })?;
399            guard
400                .entry(port)
401                .or_insert_with(|| Arc::new(OnceCell::new()))
402                .clone()
403        };
404
405        let handle = cell
406            .get_or_try_init(|| async {
407                let addr = format!("{host_owned}:{port}");
408                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
409                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
410                })?;
411                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
412                let task = tokio::spawn(run_axum_server(
413                    listener,
414                    Arc::clone(&dispatch),
415                    max_request_body,
416                ));
417                Ok::<ServerHandle, CamelError>(ServerHandle {
418                    dispatch,
419                    _task: task,
420                })
421            })
422            .await?;
423
424        Ok(Arc::clone(&handle.dispatch))
425    }
426}
427
428// ---------------------------------------------------------------------------
429// Axum server
430// ---------------------------------------------------------------------------
431
432use axum::{
433    Router,
434    body::Body as AxumBody,
435    extract::{Request, State},
436    http::{Response, StatusCode},
437    response::IntoResponse,
438};
439
440#[derive(Clone)]
441struct AppState {
442    dispatch: DispatchTable,
443    max_request_body: usize,
444}
445
446async fn run_axum_server(
447    listener: tokio::net::TcpListener,
448    dispatch: DispatchTable,
449    max_request_body: usize,
450) {
451    let state = AppState {
452        dispatch,
453        max_request_body,
454    };
455    let app = Router::new().fallback(dispatch_handler).with_state(state);
456
457    axum::serve(listener, app).await.unwrap_or_else(|e| {
458        tracing::error!(error = %e, "Axum server error");
459    });
460}
461
462async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
463    let method = req.method().to_string();
464    let path = req.uri().path().to_string();
465    let query = req.uri().query().unwrap_or("").to_string();
466    let headers = req.headers().clone();
467
468    // Check Content-Length against limit BEFORE opening the stream
469    let content_length: Option<u64> = headers
470        .get(http::header::CONTENT_LENGTH)
471        .and_then(|v| v.to_str().ok())
472        .and_then(|s| s.parse().ok());
473
474    if let Some(len) = content_length
475        && len > state.max_request_body as u64
476    {
477        return Response::builder()
478            .status(StatusCode::PAYLOAD_TOO_LARGE)
479            .body(AxumBody::from("Request body exceeds configured limit"))
480            .expect("infallible");
481    }
482
483    // Build StreamBody from Axum body WITHOUT materializing
484    let content_type = headers
485        .get(http::header::CONTENT_TYPE)
486        .and_then(|v| v.to_str().ok())
487        .map(|s| s.to_string());
488
489    let data_stream: BodyDataStream = req.into_body().into_data_stream();
490    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
491    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
492
493    let stream_body = StreamBody {
494        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
495        metadata: StreamMetadata {
496            size_hint: content_length,
497            content_type,
498            origin: None,
499        },
500    };
501
502    // Look up handler for this path
503    let sender = {
504        let table = state.dispatch.read().await;
505        table.get(&path).cloned()
506    };
507    let Some(sender) = sender else {
508        return Response::builder()
509            .status(StatusCode::NOT_FOUND)
510            .body(AxumBody::from("No consumer registered for this path"))
511            .expect("infallible");
512    };
513
514    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
515    let envelope = RequestEnvelope {
516        method,
517        path,
518        query,
519        headers,
520        body: stream_body,
521        reply_tx,
522    };
523
524    if sender.send(envelope).await.is_err() {
525        return Response::builder()
526            .status(StatusCode::SERVICE_UNAVAILABLE)
527            .body(AxumBody::from("Consumer unavailable"))
528            .expect("infallible");
529    }
530
531    match reply_rx.await {
532        Ok(reply) => {
533            let status =
534                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
535            let mut builder = Response::builder().status(status);
536            for (k, v) in &reply.headers {
537                builder = builder.header(k.as_str(), v.as_str());
538            }
539            match reply.body {
540                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
541                    Response::builder()
542                        .status(StatusCode::INTERNAL_SERVER_ERROR)
543                        .body(AxumBody::from("Invalid response headers from consumer"))
544                        .expect("infallible")
545                }),
546                HttpReplyBody::Stream(stream) => builder
547                    .body(AxumBody::from_stream(stream))
548                    .unwrap_or_else(|_| {
549                        Response::builder()
550                            .status(StatusCode::INTERNAL_SERVER_ERROR)
551                            .body(AxumBody::from("Invalid response headers from consumer"))
552                            .expect("infallible")
553                    }),
554            }
555        }
556        Err(_) => Response::builder()
557            .status(StatusCode::INTERNAL_SERVER_ERROR)
558            .body(AxumBody::from("Pipeline error"))
559            .expect("Response::builder() with a known-valid status code and body is infallible"),
560    }
561}
562
563// ---------------------------------------------------------------------------
564// HttpConsumer
565// ---------------------------------------------------------------------------
566
567pub struct HttpConsumer {
568    config: HttpServerConfig,
569}
570
571impl HttpConsumer {
572    pub fn new(config: HttpServerConfig) -> Self {
573        Self { config }
574    }
575}
576
577#[async_trait::async_trait]
578impl Consumer for HttpConsumer {
579    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
580        use camel_component_api::{Body, Exchange, Message};
581
582        let dispatch = ServerRegistry::global()
583            .get_or_spawn(
584                &self.config.host,
585                self.config.port,
586                self.config.max_request_body,
587            )
588            .await?;
589
590        // Create a channel for this path and register it
591        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
592        {
593            let mut table = dispatch.write().await;
594            table.insert(self.config.path.clone(), env_tx);
595        }
596
597        let path = self.config.path.clone();
598        let cancel_token = ctx.cancel_token();
599        let _max_response_body = self.config.max_response_body;
600
601        loop {
602            tokio::select! {
603                _ = ctx.cancelled() => {
604                    break;
605                }
606                envelope = env_rx.recv() => {
607                    let Some(envelope) = envelope else { break; };
608
609                    // Build Exchange from HTTP request
610                    let mut msg = Message::default();
611
612                    // Set standard Camel HTTP headers
613                    msg.set_header("CamelHttpMethod",
614                        serde_json::Value::String(envelope.method.clone()));
615                    msg.set_header("CamelHttpPath",
616                        serde_json::Value::String(envelope.path.clone()));
617                    msg.set_header("CamelHttpQuery",
618                        serde_json::Value::String(envelope.query.clone()));
619
620                    // Forward HTTP headers (skip pseudo-headers)
621                    for (k, v) in &envelope.headers {
622                        if let Ok(val_str) = v.to_str() {
623                            msg.set_header(
624                                k.as_str(),
625                                serde_json::Value::String(val_str.to_string()),
626                            );
627                        }
628                    }
629
630                    // Body: always arrives as Body::Stream (native streaming)
631                    // Routes can call into_bytes() if they need to materialize
632                    msg.body = Body::Stream(envelope.body);
633
634                    #[allow(unused_mut)]
635                    let mut exchange = Exchange::new(msg);
636
637                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
638                    #[cfg(feature = "otel")]
639                    {
640                        let headers: HashMap<String, String> = envelope
641                            .headers
642                            .iter()
643                            .filter_map(|(k, v)| {
644                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
645                            })
646                            .collect();
647                        camel_otel::extract_into_exchange(&mut exchange, &headers);
648                    }
649
650                    let reply_tx = envelope.reply_tx;
651                    let sender = ctx.sender().clone();
652                    let path_clone = path.clone();
653                    let cancel = cancel_token.clone();
654
655                    // Spawn a task to handle this request concurrently
656                    //
657                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
658                    // true concurrent request processing. This change was introduced as part of the
659                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
660                    //
661                    // Rationale:
662                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
663                    //    the consumer's main loop until the pipeline processing completes
664                    // 2. This blocking would prevent multiple HTTP requests from being processed
665                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
666                    // 3. The channel would never have multiple exchanges buffered simultaneously,
667                    //    defeating the purpose of pipeline-side concurrency
668                    // 4. By spawning a task per request, we allow the consumer loop to continue
669                    //    accepting new requests while existing ones are processed in the pipeline
670                    //
671                    // This approach effectively decouples request acceptance from pipeline processing,
672                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
673                    // by the pipeline when ConcurrencyModel::Concurrent is active.
674                    tokio::spawn(async move {
675                        // Check for cancellation before sending to pipeline.
676                        // Returns 503 (Service Unavailable) instead of letting the request
677                        // enter a shutting-down pipeline. This is a behavioral change from
678                        // the pre-concurrency implementation where cancellation during
679                        // processing would result in a 500 (Internal Server Error).
680                        // 503 is more semantically correct: the server is temporarily
681                        // unable to handle the request due to shutdown.
682                        if cancel.is_cancelled() {
683                            let _ = reply_tx.send(HttpReply {
684                                status: 503,
685                                headers: vec![],
686                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
687                            });
688                            return;
689                        }
690
691                        // Send through pipeline and await result
692                        let (tx, rx) = tokio::sync::oneshot::channel();
693                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
694                            exchange,
695                            reply_tx: Some(tx),
696                        };
697
698                        let result = match sender.send(envelope).await {
699                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
700                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
701                        }
702                        .and_then(|r| r);
703
704                        let reply = match result {
705                            Ok(out) => {
706                                let status = out
707                                    .input
708                                    .header("CamelHttpResponseCode")
709                                    .and_then(|v| v.as_u64())
710                                    .map(|s| s as u16)
711                                    .unwrap_or(200);
712
713                                let reply_body: HttpReplyBody = match out.input.body {
714                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
715                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
716                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
717                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
718                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
719                                        v.to_string().into_bytes(),
720                                    )),
721                                    Body::Stream(s) => {
722                                        match s.stream.lock().await.take() {
723                                            Some(stream) => HttpReplyBody::Stream(stream),
724                                            None => {
725                                                tracing::error!(
726                                                    "Body::Stream already consumed before HTTP reply — returning 500"
727                                                );
728                                                let error_reply = HttpReply {
729                                                    status: 500,
730                                                    headers: vec![],
731                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
732                                                };
733                                                if reply_tx.send(error_reply).is_err() {
734                                                    debug!("reply_tx dropped before error reply could be sent");
735                                                }
736                                                return;
737                                            }
738                                        }
739                                    }
740                                };
741
742                                let resp_headers: Vec<(String, String)> = out
743                                    .input
744                                    .headers
745                                    .iter()
746                                    // Filter Camel internal headers
747                                    .filter(|(k, _)| !k.starts_with("Camel"))
748                                    // Filter hop-by-hop and request-only headers
749                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
750                                    .filter(|(k, _)| {
751                                        !matches!(
752                                            k.to_lowercase().as_str(),
753                                            // RFC 2616 Section 4.5 - General headers
754                                            "content-length" |      // Auto-calculated by framework
755                                            "content-type" |        // Auto-calculated from body
756                                            "transfer-encoding" |   // Hop-by-hop
757                                            "connection" |          // Hop-by-hop
758                                            "cache-control" |       // Hop-by-hop
759                                            "date" |                // Auto-generated
760                                            "pragma" |              // Hop-by-hop
761                                            "trailer" |             // Hop-by-hop
762                                            "upgrade" |             // Hop-by-hop
763                                            "via" |                 // Hop-by-hop
764                                            "warning" |             // Hop-by-hop
765                                            // Request-only headers
766                                            "host" |                // Request-only
767                                            "user-agent" |          // Request-only
768                                            "accept" |              // Request-only
769                                            "accept-encoding" |     // Request-only
770                                            "accept-language" |     // Request-only
771                                            "accept-charset" |      // Request-only
772                                            "authorization" |       // Request-only (security)
773                                            "proxy-authorization" | // Request-only (security)
774                                            "cookie" |              // Request-only
775                                            "expect" |              // Request-only
776                                            "from" |                // Request-only
777                                            "if-match" |            // Request-only
778                                            "if-modified-since" |   // Request-only
779                                            "if-none-match" |       // Request-only
780                                            "if-range" |            // Request-only
781                                            "if-unmodified-since" | // Request-only
782                                            "max-forwards" |        // Request-only
783                                            "proxy-connection" |    // Request-only
784                                            "range" |               // Request-only
785                                            "referer" |             // Request-only
786                                            "te"                    // Request-only
787                                        )
788                                    })
789                                    .filter_map(|(k, v)| {
790                                        v.as_str().map(|s| (k.clone(), s.to_string()))
791                                    })
792                                    .collect();
793
794                                HttpReply {
795                                    status,
796                                    headers: resp_headers,
797                                    body: reply_body,
798                                }
799                            }
800                            Err(e) => {
801                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
802                                HttpReply {
803                                    status: 500,
804                                    headers: vec![],
805                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
806                                }
807                            }
808                        };
809
810                        // Reply to Axum handler (ignore error if client disconnected)
811                        let _ = reply_tx.send(reply);
812                    });
813                }
814            }
815        }
816
817        // Deregister this path
818        {
819            let mut table = dispatch.write().await;
820            table.remove(&path);
821        }
822
823        Ok(())
824    }
825
826    async fn stop(&mut self) -> Result<(), CamelError> {
827        Ok(())
828    }
829
830    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
831        camel_component_api::ConcurrencyModel::Concurrent { max: None }
832    }
833}
834
835// ---------------------------------------------------------------------------
836// HttpComponent / HttpsComponent
837// ---------------------------------------------------------------------------
838
839pub struct HttpComponent {
840    client: reqwest::Client,
841    config: HttpConfig,
842}
843
844fn build_client(config: &HttpConfig) -> reqwest::Client {
845    let mut builder = reqwest::Client::builder()
846        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
847        .pool_max_idle_per_host(config.pool_max_idle_per_host)
848        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
849
850    if !config.follow_redirects {
851        builder = builder.redirect(reqwest::redirect::Policy::none());
852    }
853
854    builder
855        .build()
856        .expect("reqwest::Client::build() with valid config should not fail")
857}
858
859impl HttpComponent {
860    pub fn new() -> Self {
861        let config = HttpConfig::default();
862        let client = build_client(&config);
863        Self { client, config }
864    }
865
866    pub fn with_config(config: HttpConfig) -> Self {
867        let client = build_client(&config);
868        Self { client, config }
869    }
870
871    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
872        match config {
873            Some(cfg) => Self::with_config(cfg),
874            None => Self::new(),
875        }
876    }
877}
878
879impl Default for HttpComponent {
880    fn default() -> Self {
881        Self::new()
882    }
883}
884
885impl Component for HttpComponent {
886    fn scheme(&self) -> &str {
887        "http"
888    }
889
890    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
891        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
892        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
893        Ok(Box::new(HttpEndpoint {
894            uri: uri.to_string(),
895            config,
896            server_config,
897            client: self.client.clone(),
898        }))
899    }
900}
901
902pub struct HttpsComponent {
903    client: reqwest::Client,
904    config: HttpConfig,
905}
906
907impl HttpsComponent {
908    pub fn new() -> Self {
909        let config = HttpConfig::default();
910        let client = build_client(&config);
911        Self { client, config }
912    }
913
914    pub fn with_config(config: HttpConfig) -> Self {
915        let client = build_client(&config);
916        Self { client, config }
917    }
918
919    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
920        match config {
921            Some(cfg) => Self::with_config(cfg),
922            None => Self::new(),
923        }
924    }
925}
926
927impl Default for HttpsComponent {
928    fn default() -> Self {
929        Self::new()
930    }
931}
932
933impl Component for HttpsComponent {
934    fn scheme(&self) -> &str {
935        "https"
936    }
937
938    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
939        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
940        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
941        Ok(Box::new(HttpEndpoint {
942            uri: uri.to_string(),
943            config,
944            server_config,
945            client: self.client.clone(),
946        }))
947    }
948}
949
950// ---------------------------------------------------------------------------
951// HttpEndpoint
952// ---------------------------------------------------------------------------
953
954struct HttpEndpoint {
955    uri: String,
956    config: HttpEndpointConfig,
957    server_config: HttpServerConfig,
958    client: reqwest::Client,
959}
960
961impl Endpoint for HttpEndpoint {
962    fn uri(&self) -> &str {
963        &self.uri
964    }
965
966    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
967        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
968    }
969
970    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
971        Ok(BoxProcessor::new(HttpProducer {
972            config: Arc::new(self.config.clone()),
973            client: self.client.clone(),
974        }))
975    }
976}
977
978// ---------------------------------------------------------------------------
979// SSRF Protection
980// ---------------------------------------------------------------------------
981
982fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
983    let parsed = url::Url::parse(url)
984        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
985
986    // Check blocked hosts
987    if let Some(host) = parsed.host_str()
988        && config.blocked_hosts.iter().any(|blocked| host == blocked)
989    {
990        return Err(CamelError::ProcessorError(format!(
991            "Host '{}' is blocked",
992            host
993        )));
994    }
995
996    // Check private IPs if not allowed
997    if !config.allow_private_ips
998        && let Some(host) = parsed.host()
999    {
1000        match host {
1001            url::Host::Ipv4(ip) => {
1002                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1003                    return Err(CamelError::ProcessorError(format!(
1004                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1005                        ip
1006                    )));
1007                }
1008            }
1009            url::Host::Ipv6(ip) => {
1010                if ip.is_loopback() {
1011                    return Err(CamelError::ProcessorError(format!(
1012                        "Loopback IP '{}' not allowed",
1013                        ip
1014                    )));
1015                }
1016            }
1017            url::Host::Domain(domain) => {
1018                // Block common internal domains
1019                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1020                if blocked_domains.contains(&domain) {
1021                    return Err(CamelError::ProcessorError(format!(
1022                        "Domain '{}' is not allowed",
1023                        domain
1024                    )));
1025                }
1026            }
1027        }
1028    }
1029
1030    Ok(())
1031}
1032
1033// ---------------------------------------------------------------------------
1034// HttpProducer
1035// ---------------------------------------------------------------------------
1036
1037#[derive(Clone)]
1038struct HttpProducer {
1039    config: Arc<HttpEndpointConfig>,
1040    client: reqwest::Client,
1041}
1042
1043impl HttpProducer {
1044    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1045        if let Some(ref method) = config.http_method {
1046            return method.to_uppercase();
1047        }
1048        if let Some(method) = exchange
1049            .input
1050            .header("CamelHttpMethod")
1051            .and_then(|v| v.as_str())
1052        {
1053            return method.to_uppercase();
1054        }
1055        if !exchange.input.body.is_empty() {
1056            return "POST".to_string();
1057        }
1058        "GET".to_string()
1059    }
1060
1061    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1062        if let Some(uri) = exchange
1063            .input
1064            .header("CamelHttpUri")
1065            .and_then(|v| v.as_str())
1066        {
1067            let mut url = uri.to_string();
1068            if let Some(path) = exchange
1069                .input
1070                .header("CamelHttpPath")
1071                .and_then(|v| v.as_str())
1072            {
1073                if !url.ends_with('/') && !path.starts_with('/') {
1074                    url.push('/');
1075                }
1076                url.push_str(path);
1077            }
1078            if let Some(query) = exchange
1079                .input
1080                .header("CamelHttpQuery")
1081                .and_then(|v| v.as_str())
1082            {
1083                url.push('?');
1084                url.push_str(query);
1085            }
1086            return url;
1087        }
1088
1089        let mut url = config.base_url.clone();
1090
1091        if let Some(path) = exchange
1092            .input
1093            .header("CamelHttpPath")
1094            .and_then(|v| v.as_str())
1095        {
1096            if !url.ends_with('/') && !path.starts_with('/') {
1097                url.push('/');
1098            }
1099            url.push_str(path);
1100        }
1101
1102        if let Some(query) = exchange
1103            .input
1104            .header("CamelHttpQuery")
1105            .and_then(|v| v.as_str())
1106        {
1107            url.push('?');
1108            url.push_str(query);
1109        } else if !config.query_params.is_empty() {
1110            // Forward non-Camel query params from config
1111            url.push('?');
1112            let query_string: String = config
1113                .query_params
1114                .iter()
1115                .map(|(k, v)| format!("{k}={v}"))
1116                .collect::<Vec<_>>()
1117                .join("&");
1118            url.push_str(&query_string);
1119        }
1120
1121        url
1122    }
1123
1124    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1125        status >= range.0 && status <= range.1
1126    }
1127}
1128
1129impl Service<Exchange> for HttpProducer {
1130    type Response = Exchange;
1131    type Error = CamelError;
1132    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1133
1134    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1135        Poll::Ready(Ok(()))
1136    }
1137
1138    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1139        let config = self.config.clone();
1140        let client = self.client.clone();
1141
1142        Box::pin(async move {
1143            let method_str = HttpProducer::resolve_method(&exchange, &config);
1144            let url = HttpProducer::resolve_url(&exchange, &config);
1145
1146            // SECURITY: Validate URL for SSRF
1147            validate_url_for_ssrf(&url, &config)?;
1148
1149            debug!(
1150                correlation_id = %exchange.correlation_id(),
1151                method = %method_str,
1152                url = %url,
1153                "HTTP request"
1154            );
1155
1156            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1157                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1158            })?;
1159
1160            let mut request = client.request(method, &url);
1161
1162            if let Some(timeout) = config.response_timeout {
1163                request = request.timeout(timeout);
1164            }
1165
1166            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1167            #[cfg(feature = "otel")]
1168            {
1169                let mut otel_headers = HashMap::new();
1170                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1171                for (k, v) in otel_headers {
1172                    if let (Ok(name), Ok(val)) = (
1173                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1174                        reqwest::header::HeaderValue::from_str(&v),
1175                    ) {
1176                        request = request.header(name, val);
1177                    }
1178                }
1179            }
1180
1181            for (key, value) in &exchange.input.headers {
1182                if !key.starts_with("Camel")
1183                    && let Some(val_str) = value.as_str()
1184                    && let (Ok(name), Ok(val)) = (
1185                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1186                        reqwest::header::HeaderValue::from_str(val_str),
1187                    )
1188                {
1189                    request = request.header(name, val);
1190                }
1191            }
1192
1193            match exchange.input.body {
1194                Body::Stream(ref s) => {
1195                    let mut stream_lock = s.stream.lock().await;
1196                    if let Some(stream) = stream_lock.take() {
1197                        request = request.body(reqwest::Body::wrap_stream(stream));
1198                    } else {
1199                        return Err(CamelError::AlreadyConsumed);
1200                    }
1201                }
1202                _ => {
1203                    // For other types, materialize with configured limit
1204                    let body = std::mem::take(&mut exchange.input.body);
1205                    let bytes = body.into_bytes(config.max_body_size).await?;
1206                    if !bytes.is_empty() {
1207                        request = request.body(bytes);
1208                    }
1209                }
1210            }
1211
1212            let response = request
1213                .send()
1214                .await
1215                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1216
1217            let status_code = response.status().as_u16();
1218            let status_text = response
1219                .status()
1220                .canonical_reason()
1221                .unwrap_or("Unknown")
1222                .to_string();
1223
1224            for (key, value) in response.headers() {
1225                if let Ok(val_str) = value.to_str() {
1226                    exchange
1227                        .input
1228                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1229                }
1230            }
1231
1232            exchange.input.set_header(
1233                "CamelHttpResponseCode",
1234                serde_json::Value::Number(status_code.into()),
1235            );
1236            exchange.input.set_header(
1237                "CamelHttpResponseText",
1238                serde_json::Value::String(status_text.clone()),
1239            );
1240
1241            let response_body = response.bytes().await.map_err(|e| {
1242                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1243            })?;
1244
1245            if config.throw_exception_on_failure
1246                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1247            {
1248                return Err(CamelError::HttpOperationFailed {
1249                    method: method_str,
1250                    url,
1251                    status_code,
1252                    status_text,
1253                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1254                });
1255            }
1256
1257            if !response_body.is_empty() {
1258                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1259            }
1260
1261            debug!(
1262                correlation_id = %exchange.correlation_id(),
1263                status = status_code,
1264                url = %url,
1265                "HTTP response"
1266            );
1267            Ok(exchange)
1268        })
1269    }
1270}
1271
1272#[cfg(test)]
1273mod tests {
1274    use super::*;
1275    use camel_component_api::Message;
1276    use std::sync::Arc;
1277    use std::time::Duration;
1278
1279    fn test_producer_ctx() -> ProducerContext {
1280        ProducerContext::new()
1281    }
1282
1283    #[test]
1284    fn test_http_config_defaults() {
1285        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1286        assert_eq!(config.base_url, "http://localhost:8080/api");
1287        assert!(config.http_method.is_none());
1288        assert!(config.throw_exception_on_failure);
1289        assert_eq!(config.ok_status_code_range, (200, 299));
1290        assert!(config.response_timeout.is_none());
1291    }
1292
1293    #[test]
1294    fn test_http_config_scheme() {
1295        // UriConfig trait method returns "http" as primary scheme
1296        assert_eq!(HttpEndpointConfig::scheme(), "http");
1297    }
1298
1299    #[test]
1300    fn test_http_config_from_components() {
1301        // Test from_components directly (trait method)
1302        let components = camel_component_api::UriComponents {
1303            scheme: "https".to_string(),
1304            path: "//api.example.com/v1".to_string(),
1305            params: std::collections::HashMap::from([(
1306                "httpMethod".to_string(),
1307                "POST".to_string(),
1308            )]),
1309        };
1310        let config = HttpEndpointConfig::from_components(components).unwrap();
1311        assert_eq!(config.base_url, "https://api.example.com/v1");
1312        assert_eq!(config.http_method, Some("POST".to_string()));
1313    }
1314
1315    #[test]
1316    fn test_http_config_with_options() {
1317        let config = HttpEndpointConfig::from_uri(
1318            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1319        ).unwrap();
1320        assert_eq!(config.base_url, "https://api.example.com/v1");
1321        assert_eq!(config.http_method, Some("PUT".to_string()));
1322        assert!(!config.throw_exception_on_failure);
1323        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1324    }
1325
1326    #[test]
1327    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1328        let config = HttpConfig::default()
1329            .with_response_timeout_ms(999)
1330            .with_allow_private_ips(true)
1331            .with_blocked_hosts(vec!["evil.com".to_string()])
1332            .with_max_body_size(12345);
1333        let endpoint =
1334            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1335        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1336        assert!(endpoint.allow_private_ips);
1337        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1338        assert_eq!(endpoint.max_body_size, 12345);
1339    }
1340
1341    #[test]
1342    fn test_from_uri_with_defaults_uri_overrides_config() {
1343        let config = HttpConfig::default()
1344            .with_response_timeout_ms(999)
1345            .with_allow_private_ips(true)
1346            .with_blocked_hosts(vec!["evil.com".to_string()])
1347            .with_max_body_size(12345);
1348        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1349            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1350            &config,
1351        )
1352        .unwrap();
1353        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1354        assert!(!endpoint.allow_private_ips);
1355        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1356        assert_eq!(endpoint.max_body_size, 99);
1357    }
1358
1359    #[test]
1360    fn test_http_config_ok_status_range() {
1361        let config =
1362            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1363        assert_eq!(config.ok_status_code_range, (200, 204));
1364    }
1365
1366    #[test]
1367    fn test_http_config_wrong_scheme() {
1368        let result = HttpEndpointConfig::from_uri("file:/tmp");
1369        assert!(result.is_err());
1370    }
1371
1372    #[test]
1373    fn test_http_component_scheme() {
1374        let component = HttpComponent::new();
1375        assert_eq!(component.scheme(), "http");
1376    }
1377
1378    #[test]
1379    fn test_https_component_scheme() {
1380        let component = HttpsComponent::new();
1381        assert_eq!(component.scheme(), "https");
1382    }
1383
1384    #[test]
1385    fn test_http_endpoint_creates_consumer() {
1386        let component = HttpComponent::new();
1387        let endpoint = component
1388            .create_endpoint("http://0.0.0.0:19100/test")
1389            .unwrap();
1390        assert!(endpoint.create_consumer().is_ok());
1391    }
1392
1393    #[test]
1394    fn test_https_endpoint_creates_consumer() {
1395        let component = HttpsComponent::new();
1396        let endpoint = component
1397            .create_endpoint("https://0.0.0.0:8443/test")
1398            .unwrap();
1399        assert!(endpoint.create_consumer().is_ok());
1400    }
1401
1402    #[test]
1403    fn test_http_endpoint_creates_producer() {
1404        let ctx = test_producer_ctx();
1405        let component = HttpComponent::new();
1406        let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1407        assert!(endpoint.create_producer(&ctx).is_ok());
1408    }
1409
1410    // -----------------------------------------------------------------------
1411    // Producer tests
1412    // -----------------------------------------------------------------------
1413
1414    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1415        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1416        let addr = listener.local_addr().unwrap();
1417        let url = format!("http://127.0.0.1:{}", addr.port());
1418
1419        let handle = tokio::spawn(async move {
1420            loop {
1421                if let Ok((mut stream, _)) = listener.accept().await {
1422                    tokio::spawn(async move {
1423                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1424                        let mut buf = vec![0u8; 4096];
1425                        let n = stream.read(&mut buf).await.unwrap_or(0);
1426                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1427
1428                        let method = request.split_whitespace().next().unwrap_or("GET");
1429
1430                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1431                        let response = format!(
1432                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1433                            body.len(),
1434                            body
1435                        );
1436                        let _ = stream.write_all(response.as_bytes()).await;
1437                    });
1438                }
1439            }
1440        });
1441
1442        (url, handle)
1443    }
1444
1445    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1446        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1447        let addr = listener.local_addr().unwrap();
1448        let url = format!("http://127.0.0.1:{}", addr.port());
1449
1450        let handle = tokio::spawn(async move {
1451            loop {
1452                if let Ok((mut stream, _)) = listener.accept().await {
1453                    let status = status;
1454                    tokio::spawn(async move {
1455                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1456                        let mut buf = vec![0u8; 4096];
1457                        let _ = stream.read(&mut buf).await;
1458
1459                        let status_text = match status {
1460                            404 => "Not Found",
1461                            500 => "Internal Server Error",
1462                            _ => "Error",
1463                        };
1464                        let body = "error body";
1465                        let response = format!(
1466                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1467                            status,
1468                            status_text,
1469                            body.len(),
1470                            body
1471                        );
1472                        let _ = stream.write_all(response.as_bytes()).await;
1473                    });
1474                }
1475            }
1476        });
1477
1478        (url, handle)
1479    }
1480
1481    #[tokio::test]
1482    async fn test_http_producer_get_request() {
1483        use tower::ServiceExt;
1484
1485        let (url, _handle) = start_test_server().await;
1486        let ctx = test_producer_ctx();
1487
1488        let component = HttpComponent::new();
1489        let endpoint = component
1490            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1491            .unwrap();
1492        let producer = endpoint.create_producer(&ctx).unwrap();
1493
1494        let exchange = Exchange::new(Message::default());
1495        let result = producer.oneshot(exchange).await.unwrap();
1496
1497        let status = result
1498            .input
1499            .header("CamelHttpResponseCode")
1500            .and_then(|v| v.as_u64())
1501            .unwrap();
1502        assert_eq!(status, 200);
1503
1504        assert!(!result.input.body.is_empty());
1505    }
1506
1507    #[tokio::test]
1508    async fn test_http_producer_post_with_body() {
1509        use tower::ServiceExt;
1510
1511        let (url, _handle) = start_test_server().await;
1512        let ctx = test_producer_ctx();
1513
1514        let component = HttpComponent::new();
1515        let endpoint = component
1516            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1517            .unwrap();
1518        let producer = endpoint.create_producer(&ctx).unwrap();
1519
1520        let exchange = Exchange::new(Message::new("request body"));
1521        let result = producer.oneshot(exchange).await.unwrap();
1522
1523        let status = result
1524            .input
1525            .header("CamelHttpResponseCode")
1526            .and_then(|v| v.as_u64())
1527            .unwrap();
1528        assert_eq!(status, 200);
1529    }
1530
1531    #[tokio::test]
1532    async fn test_http_producer_method_from_header() {
1533        use tower::ServiceExt;
1534
1535        let (url, _handle) = start_test_server().await;
1536        let ctx = test_producer_ctx();
1537
1538        let component = HttpComponent::new();
1539        let endpoint = component
1540            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1541            .unwrap();
1542        let producer = endpoint.create_producer(&ctx).unwrap();
1543
1544        let mut exchange = Exchange::new(Message::default());
1545        exchange.input.set_header(
1546            "CamelHttpMethod",
1547            serde_json::Value::String("DELETE".to_string()),
1548        );
1549
1550        let result = producer.oneshot(exchange).await.unwrap();
1551        let status = result
1552            .input
1553            .header("CamelHttpResponseCode")
1554            .and_then(|v| v.as_u64())
1555            .unwrap();
1556        assert_eq!(status, 200);
1557    }
1558
1559    #[tokio::test]
1560    async fn test_http_producer_forced_method() {
1561        use tower::ServiceExt;
1562
1563        let (url, _handle) = start_test_server().await;
1564        let ctx = test_producer_ctx();
1565
1566        let component = HttpComponent::new();
1567        let endpoint = component
1568            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1569            .unwrap();
1570        let producer = endpoint.create_producer(&ctx).unwrap();
1571
1572        let exchange = Exchange::new(Message::default());
1573        let result = producer.oneshot(exchange).await.unwrap();
1574
1575        let status = result
1576            .input
1577            .header("CamelHttpResponseCode")
1578            .and_then(|v| v.as_u64())
1579            .unwrap();
1580        assert_eq!(status, 200);
1581    }
1582
1583    #[tokio::test]
1584    async fn test_http_producer_throw_exception_on_failure() {
1585        use tower::ServiceExt;
1586
1587        let (url, _handle) = start_status_server(404).await;
1588        let ctx = test_producer_ctx();
1589
1590        let component = HttpComponent::new();
1591        let endpoint = component
1592            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1593            .unwrap();
1594        let producer = endpoint.create_producer(&ctx).unwrap();
1595
1596        let exchange = Exchange::new(Message::default());
1597        let result = producer.oneshot(exchange).await;
1598        assert!(result.is_err());
1599
1600        match result.unwrap_err() {
1601            CamelError::HttpOperationFailed { status_code, .. } => {
1602                assert_eq!(status_code, 404);
1603            }
1604            e => panic!("Expected HttpOperationFailed, got: {e}"),
1605        }
1606    }
1607
1608    #[tokio::test]
1609    async fn test_http_producer_no_throw_on_failure() {
1610        use tower::ServiceExt;
1611
1612        let (url, _handle) = start_status_server(500).await;
1613        let ctx = test_producer_ctx();
1614
1615        let component = HttpComponent::new();
1616        let endpoint = component
1617            .create_endpoint(&format!(
1618                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1619            ))
1620            .unwrap();
1621        let producer = endpoint.create_producer(&ctx).unwrap();
1622
1623        let exchange = Exchange::new(Message::default());
1624        let result = producer.oneshot(exchange).await.unwrap();
1625
1626        let status = result
1627            .input
1628            .header("CamelHttpResponseCode")
1629            .and_then(|v| v.as_u64())
1630            .unwrap();
1631        assert_eq!(status, 500);
1632    }
1633
1634    #[tokio::test]
1635    async fn test_http_producer_uri_override() {
1636        use tower::ServiceExt;
1637
1638        let (url, _handle) = start_test_server().await;
1639        let ctx = test_producer_ctx();
1640
1641        let component = HttpComponent::new();
1642        let endpoint = component
1643            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1644            .unwrap();
1645        let producer = endpoint.create_producer(&ctx).unwrap();
1646
1647        let mut exchange = Exchange::new(Message::default());
1648        exchange.input.set_header(
1649            "CamelHttpUri",
1650            serde_json::Value::String(format!("{url}/api")),
1651        );
1652
1653        let result = producer.oneshot(exchange).await.unwrap();
1654        let status = result
1655            .input
1656            .header("CamelHttpResponseCode")
1657            .and_then(|v| v.as_u64())
1658            .unwrap();
1659        assert_eq!(status, 200);
1660    }
1661
1662    #[tokio::test]
1663    async fn test_http_producer_response_headers_mapped() {
1664        use tower::ServiceExt;
1665
1666        let (url, _handle) = start_test_server().await;
1667        let ctx = test_producer_ctx();
1668
1669        let component = HttpComponent::new();
1670        let endpoint = component
1671            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1672            .unwrap();
1673        let producer = endpoint.create_producer(&ctx).unwrap();
1674
1675        let exchange = Exchange::new(Message::default());
1676        let result = producer.oneshot(exchange).await.unwrap();
1677
1678        assert!(
1679            result.input.header("content-type").is_some()
1680                || result.input.header("Content-Type").is_some()
1681        );
1682        assert!(result.input.header("CamelHttpResponseText").is_some());
1683    }
1684
1685    // -----------------------------------------------------------------------
1686    // Bug fix tests: Client configuration per-endpoint
1687    // -----------------------------------------------------------------------
1688
1689    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1690        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1691        let addr = listener.local_addr().unwrap();
1692        let url = format!("http://127.0.0.1:{}", addr.port());
1693
1694        let handle = tokio::spawn(async move {
1695            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1696            loop {
1697                if let Ok((mut stream, _)) = listener.accept().await {
1698                    tokio::spawn(async move {
1699                        let mut buf = vec![0u8; 4096];
1700                        let n = stream.read(&mut buf).await.unwrap_or(0);
1701                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1702
1703                        // Check if this is a request to /final
1704                        if request.contains("GET /final") {
1705                            let body = r#"{"status":"final"}"#;
1706                            let response = format!(
1707                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1708                                body.len(),
1709                                body
1710                            );
1711                            let _ = stream.write_all(response.as_bytes()).await;
1712                        } else {
1713                            // Redirect to /final
1714                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1715                            let _ = stream.write_all(response.as_bytes()).await;
1716                        }
1717                    });
1718                }
1719            }
1720        });
1721
1722        (url, handle)
1723    }
1724
1725    #[tokio::test]
1726    async fn test_follow_redirects_false_does_not_follow() {
1727        use tower::ServiceExt;
1728
1729        let (url, _handle) = start_redirect_server().await;
1730        let ctx = test_producer_ctx();
1731
1732        let component =
1733            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1734        let endpoint = component
1735            .create_endpoint(&format!(
1736                "{url}?throwExceptionOnFailure=false&allowPrivateIps=true"
1737            ))
1738            .unwrap();
1739        let producer = endpoint.create_producer(&ctx).unwrap();
1740
1741        let exchange = Exchange::new(Message::default());
1742        let result = producer.oneshot(exchange).await.unwrap();
1743
1744        // Should get 302, NOT follow redirect to 200
1745        let status = result
1746            .input
1747            .header("CamelHttpResponseCode")
1748            .and_then(|v| v.as_u64())
1749            .unwrap();
1750        assert_eq!(
1751            status, 302,
1752            "Should NOT follow redirect when followRedirects=false"
1753        );
1754    }
1755
1756    #[tokio::test]
1757    async fn test_follow_redirects_true_follows_redirect() {
1758        use tower::ServiceExt;
1759
1760        let (url, _handle) = start_redirect_server().await;
1761        let ctx = test_producer_ctx();
1762
1763        let component =
1764            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1765        let endpoint = component
1766            .create_endpoint(&format!("{url}?allowPrivateIps=true"))
1767            .unwrap();
1768        let producer = endpoint.create_producer(&ctx).unwrap();
1769
1770        let exchange = Exchange::new(Message::default());
1771        let result = producer.oneshot(exchange).await.unwrap();
1772
1773        // Should follow redirect and get 200
1774        let status = result
1775            .input
1776            .header("CamelHttpResponseCode")
1777            .and_then(|v| v.as_u64())
1778            .unwrap();
1779        assert_eq!(
1780            status, 200,
1781            "Should follow redirect when followRedirects=true"
1782        );
1783    }
1784
1785    #[tokio::test]
1786    async fn test_query_params_forwarded_to_http_request() {
1787        use tower::ServiceExt;
1788
1789        let (url, _handle) = start_test_server().await;
1790        let ctx = test_producer_ctx();
1791
1792        let component = HttpComponent::new();
1793        // apiKey is NOT a Camel option, should be forwarded as query param
1794        let endpoint = component
1795            .create_endpoint(&format!(
1796                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1797            ))
1798            .unwrap();
1799        let producer = endpoint.create_producer(&ctx).unwrap();
1800
1801        let exchange = Exchange::new(Message::default());
1802        let result = producer.oneshot(exchange).await.unwrap();
1803
1804        // The test server returns the request info in response
1805        // We just verify it succeeds (the query param was sent)
1806        let status = result
1807            .input
1808            .header("CamelHttpResponseCode")
1809            .and_then(|v| v.as_u64())
1810            .unwrap();
1811        assert_eq!(status, 200);
1812    }
1813
1814    #[tokio::test]
1815    async fn test_non_camel_query_params_are_forwarded() {
1816        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1817        // We'll test the config parsing, not the actual HTTP call
1818        let config = HttpEndpointConfig::from_uri(
1819            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1820        )
1821        .unwrap();
1822
1823        // apiKey and token are NOT Camel options, should be forwarded
1824        assert!(
1825            config.query_params.contains_key("apiKey"),
1826            "apiKey should be preserved"
1827        );
1828        assert!(
1829            config.query_params.contains_key("token"),
1830            "token should be preserved"
1831        );
1832        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1833        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1834
1835        // httpMethod IS a Camel option, should NOT be in query_params
1836        assert!(
1837            !config.query_params.contains_key("httpMethod"),
1838            "httpMethod should not be forwarded"
1839        );
1840    }
1841
1842    // -----------------------------------------------------------------------
1843    // SSRF Protection tests
1844    // -----------------------------------------------------------------------
1845
1846    #[tokio::test]
1847    async fn test_http_producer_blocks_metadata_endpoint() {
1848        use tower::ServiceExt;
1849
1850        let ctx = test_producer_ctx();
1851        let component = HttpComponent::new();
1852        let endpoint = component
1853            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1854            .unwrap();
1855        let producer = endpoint.create_producer(&ctx).unwrap();
1856
1857        let mut exchange = Exchange::new(Message::default());
1858        exchange.input.set_header(
1859            "CamelHttpUri",
1860            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1861        );
1862
1863        let result = producer.oneshot(exchange).await;
1864        assert!(result.is_err(), "Should block AWS metadata endpoint");
1865
1866        let err = result.unwrap_err();
1867        assert!(
1868            err.to_string().contains("Private IP"),
1869            "Error should mention private IP blocking, got: {}",
1870            err
1871        );
1872    }
1873
1874    #[test]
1875    fn test_ssrf_config_defaults() {
1876        let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
1877        assert!(
1878            !config.allow_private_ips,
1879            "Private IPs should be blocked by default"
1880        );
1881        assert!(
1882            config.blocked_hosts.is_empty(),
1883            "Blocked hosts should be empty by default"
1884        );
1885    }
1886
1887    #[test]
1888    fn test_ssrf_config_allow_private_ips() {
1889        let config =
1890            HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1891        assert!(
1892            config.allow_private_ips,
1893            "Private IPs should be allowed when explicitly set"
1894        );
1895    }
1896
1897    #[test]
1898    fn test_ssrf_config_blocked_hosts() {
1899        let config = HttpEndpointConfig::from_uri(
1900            "http://example.com/api?blockedHosts=evil.com,malware.net",
1901        )
1902        .unwrap();
1903        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1904    }
1905
1906    #[tokio::test]
1907    async fn test_http_producer_blocks_localhost() {
1908        use tower::ServiceExt;
1909
1910        let ctx = test_producer_ctx();
1911        let component = HttpComponent::new();
1912        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1913        let producer = endpoint.create_producer(&ctx).unwrap();
1914
1915        let mut exchange = Exchange::new(Message::default());
1916        exchange.input.set_header(
1917            "CamelHttpUri",
1918            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1919        );
1920
1921        let result = producer.oneshot(exchange).await;
1922        assert!(result.is_err(), "Should block localhost");
1923    }
1924
1925    #[tokio::test]
1926    async fn test_http_producer_blocks_loopback_ip() {
1927        use tower::ServiceExt;
1928
1929        let ctx = test_producer_ctx();
1930        let component = HttpComponent::new();
1931        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1932        let producer = endpoint.create_producer(&ctx).unwrap();
1933
1934        let mut exchange = Exchange::new(Message::default());
1935        exchange.input.set_header(
1936            "CamelHttpUri",
1937            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1938        );
1939
1940        let result = producer.oneshot(exchange).await;
1941        assert!(result.is_err(), "Should block loopback IP");
1942    }
1943
1944    #[tokio::test]
1945    async fn test_http_producer_allows_private_ip_when_enabled() {
1946        use tower::ServiceExt;
1947
1948        let ctx = test_producer_ctx();
1949        let component = HttpComponent::new();
1950        // With allowPrivateIps=true, the validation should pass
1951        // (actual connection will fail, but that's expected)
1952        let endpoint = component
1953            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1954            .unwrap();
1955        let producer = endpoint.create_producer(&ctx).unwrap();
1956
1957        let exchange = Exchange::new(Message::default());
1958
1959        // The request will fail because we can't connect, but it should NOT fail
1960        // due to SSRF protection
1961        let result = producer.oneshot(exchange).await;
1962        // We expect connection error, not SSRF error
1963        if let Err(ref e) = result {
1964            let err_str = e.to_string();
1965            assert!(
1966                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1967                "Should not be SSRF error, got: {}",
1968                err_str
1969            );
1970        }
1971    }
1972
1973    // -----------------------------------------------------------------------
1974    // HttpServerConfig tests
1975    // -----------------------------------------------------------------------
1976
1977    #[test]
1978    fn test_http_server_config_parse() {
1979        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1980        assert_eq!(cfg.host, "0.0.0.0");
1981        assert_eq!(cfg.port, 8080);
1982        assert_eq!(cfg.path, "/orders");
1983    }
1984
1985    #[test]
1986    fn test_http_server_config_scheme() {
1987        // UriConfig trait method returns "http" as primary scheme
1988        assert_eq!(HttpServerConfig::scheme(), "http");
1989    }
1990
1991    #[test]
1992    fn test_http_server_config_from_components() {
1993        // Test from_components directly (trait method)
1994        let components = camel_component_api::UriComponents {
1995            scheme: "https".to_string(),
1996            path: "//0.0.0.0:8443/api".to_string(),
1997            params: std::collections::HashMap::from([(
1998                "maxRequestBody".to_string(),
1999                "5242880".to_string(),
2000            )]),
2001        };
2002        let cfg = HttpServerConfig::from_components(components).unwrap();
2003        assert_eq!(cfg.host, "0.0.0.0");
2004        assert_eq!(cfg.port, 8443);
2005        assert_eq!(cfg.path, "/api");
2006        assert_eq!(cfg.max_request_body, 5242880);
2007    }
2008
2009    #[test]
2010    fn test_http_server_config_default_path() {
2011        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2012        assert_eq!(cfg.path, "/");
2013    }
2014
2015    #[test]
2016    fn test_http_server_config_wrong_scheme() {
2017        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2018    }
2019
2020    #[test]
2021    fn test_http_server_config_invalid_port() {
2022        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2023    }
2024
2025    #[test]
2026    fn test_http_server_config_default_port_by_scheme() {
2027        // HTTP without explicit port should default to 80
2028        let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2029        assert_eq!(cfg_http.port, 80);
2030
2031        // HTTPS without explicit port should default to 443
2032        let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2033        assert_eq!(cfg_https.port, 443);
2034    }
2035
2036    #[test]
2037    fn test_request_envelope_and_reply_are_send() {
2038        fn assert_send<T: Send>() {}
2039        assert_send::<RequestEnvelope>();
2040        assert_send::<HttpReply>();
2041    }
2042
2043    // -----------------------------------------------------------------------
2044    // ServerRegistry tests
2045    // -----------------------------------------------------------------------
2046
2047    #[test]
2048    fn test_server_registry_global_is_singleton() {
2049        let r1 = ServerRegistry::global();
2050        let r2 = ServerRegistry::global();
2051        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2052    }
2053
2054    #[tokio::test]
2055    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2056        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2057        let port = listener.local_addr().unwrap().port();
2058        drop(listener);
2059
2060        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2061            Arc::new(std::sync::Mutex::new(Vec::new()));
2062
2063        let mut handles = Vec::new();
2064        for _ in 0..4 {
2065            let results = results.clone();
2066            handles.push(tokio::spawn(async move {
2067                let dispatch = ServerRegistry::global()
2068                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2069                    .await
2070                    .unwrap();
2071                results.lock().unwrap().push(dispatch);
2072            }));
2073        }
2074
2075        for h in handles {
2076            h.await.unwrap();
2077        }
2078
2079        let dispatches = results.lock().unwrap();
2080        assert_eq!(dispatches.len(), 4);
2081        for i in 1..dispatches.len() {
2082            assert!(
2083                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2084                "all concurrent callers should get the same dispatch table"
2085            );
2086        }
2087    }
2088
2089    // -----------------------------------------------------------------------
2090    // Axum dispatch handler tests
2091    // -----------------------------------------------------------------------
2092
2093    #[tokio::test]
2094    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2095        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2096        // Nothing registered in the dispatch table
2097        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2098        let port = listener.local_addr().unwrap().port();
2099        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2100
2101        // Wait for server to start
2102        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2103
2104        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2105            .await
2106            .unwrap();
2107        assert_eq!(resp.status().as_u16(), 404);
2108    }
2109
2110    // -----------------------------------------------------------------------
2111    // HttpConsumer tests
2112    // -----------------------------------------------------------------------
2113
2114    #[tokio::test]
2115    async fn test_http_consumer_start_registers_path() {
2116        use camel_component_api::ConsumerContext;
2117
2118        // Get an OS-assigned free port
2119        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2120        let port = listener.local_addr().unwrap().port();
2121        drop(listener); // Release port — ServerRegistry will rebind it
2122
2123        let consumer_cfg = HttpServerConfig {
2124            host: "127.0.0.1".to_string(),
2125            port,
2126            path: "/ping".to_string(),
2127            max_request_body: 2 * 1024 * 1024,
2128            max_response_body: 10 * 1024 * 1024,
2129        };
2130        let mut consumer = HttpConsumer::new(consumer_cfg);
2131
2132        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2133        let token = tokio_util::sync::CancellationToken::new();
2134        let ctx = ConsumerContext::new(tx, token.clone());
2135
2136        tokio::spawn(async move {
2137            consumer.start(ctx).await.unwrap();
2138        });
2139
2140        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2141
2142        let client = reqwest::Client::new();
2143        let resp_future = client
2144            .post(format!("http://127.0.0.1:{port}/ping"))
2145            .body("hello world")
2146            .send();
2147
2148        let (http_result, _) = tokio::join!(resp_future, async {
2149            if let Some(mut envelope) = rx.recv().await {
2150                // Set a custom status code
2151                envelope.exchange.input.set_header(
2152                    "CamelHttpResponseCode",
2153                    serde_json::Value::Number(201.into()),
2154                );
2155                if let Some(reply_tx) = envelope.reply_tx {
2156                    let _ = reply_tx.send(Ok(envelope.exchange));
2157                }
2158            }
2159        });
2160
2161        let resp = http_result.unwrap();
2162        assert_eq!(resp.status().as_u16(), 201);
2163
2164        token.cancel();
2165    }
2166
2167    // -----------------------------------------------------------------------
2168    // Integration tests
2169    // -----------------------------------------------------------------------
2170
2171    #[tokio::test]
2172    async fn test_integration_single_consumer_round_trip() {
2173        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2174
2175        // Get an OS-assigned free port (ephemeral)
2176        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2177        let port = listener.local_addr().unwrap().port();
2178        drop(listener); // Release — ServerRegistry will rebind
2179
2180        let component = HttpComponent::new();
2181        let endpoint = component
2182            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2183            .unwrap();
2184        let mut consumer = endpoint.create_consumer().unwrap();
2185
2186        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2187        let token = tokio_util::sync::CancellationToken::new();
2188        let ctx = ConsumerContext::new(tx, token.clone());
2189
2190        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2191        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2192
2193        let client = reqwest::Client::new();
2194        let send_fut = client
2195            .post(format!("http://127.0.0.1:{port}/echo"))
2196            .header("Content-Type", "text/plain")
2197            .body("ping")
2198            .send();
2199
2200        let (http_result, _) = tokio::join!(send_fut, async {
2201            if let Some(mut envelope) = rx.recv().await {
2202                assert_eq!(
2203                    envelope.exchange.input.header("CamelHttpMethod"),
2204                    Some(&serde_json::Value::String("POST".into()))
2205                );
2206                assert_eq!(
2207                    envelope.exchange.input.header("CamelHttpPath"),
2208                    Some(&serde_json::Value::String("/echo".into()))
2209                );
2210                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2211                if let Some(reply_tx) = envelope.reply_tx {
2212                    let _ = reply_tx.send(Ok(envelope.exchange));
2213                }
2214            }
2215        });
2216
2217        let resp = http_result.unwrap();
2218        assert_eq!(resp.status().as_u16(), 200);
2219        let body = resp.text().await.unwrap();
2220        assert_eq!(body, "pong");
2221
2222        token.cancel();
2223    }
2224
2225    #[tokio::test]
2226    async fn test_integration_two_consumers_shared_port() {
2227        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2228
2229        // Get an OS-assigned free port (ephemeral)
2230        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2231        let port = listener.local_addr().unwrap().port();
2232        drop(listener);
2233
2234        let component = HttpComponent::new();
2235
2236        // Consumer A: /hello
2237        let endpoint_a = component
2238            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2239            .unwrap();
2240        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2241
2242        // Consumer B: /world
2243        let endpoint_b = component
2244            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2245            .unwrap();
2246        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2247
2248        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2249        let token_a = tokio_util::sync::CancellationToken::new();
2250        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2251
2252        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2253        let token_b = tokio_util::sync::CancellationToken::new();
2254        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2255
2256        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2257        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2258        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2259
2260        let client = reqwest::Client::new();
2261
2262        // Request to /hello
2263        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2264        let (resp_hello, _) = tokio::join!(fut_hello, async {
2265            if let Some(mut envelope) = rx_a.recv().await {
2266                envelope.exchange.input.body =
2267                    camel_component_api::Body::Text("hello-response".to_string());
2268                if let Some(reply_tx) = envelope.reply_tx {
2269                    let _ = reply_tx.send(Ok(envelope.exchange));
2270                }
2271            }
2272        });
2273
2274        // Request to /world
2275        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2276        let (resp_world, _) = tokio::join!(fut_world, async {
2277            if let Some(mut envelope) = rx_b.recv().await {
2278                envelope.exchange.input.body =
2279                    camel_component_api::Body::Text("world-response".to_string());
2280                if let Some(reply_tx) = envelope.reply_tx {
2281                    let _ = reply_tx.send(Ok(envelope.exchange));
2282                }
2283            }
2284        });
2285
2286        let body_a = resp_hello.unwrap().text().await.unwrap();
2287        let body_b = resp_world.unwrap().text().await.unwrap();
2288
2289        assert_eq!(body_a, "hello-response");
2290        assert_eq!(body_b, "world-response");
2291
2292        token_a.cancel();
2293        token_b.cancel();
2294    }
2295
2296    #[tokio::test]
2297    async fn test_integration_unregistered_path_returns_404() {
2298        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2299
2300        // Get an OS-assigned free port (ephemeral)
2301        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2302        let port = listener.local_addr().unwrap().port();
2303        drop(listener);
2304
2305        let component = HttpComponent::new();
2306        let endpoint = component
2307            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2308            .unwrap();
2309        let mut consumer = endpoint.create_consumer().unwrap();
2310
2311        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2312        let token = tokio_util::sync::CancellationToken::new();
2313        let ctx = ConsumerContext::new(tx, token.clone());
2314
2315        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2316        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2317
2318        let client = reqwest::Client::new();
2319        let resp = client
2320            .get(format!("http://127.0.0.1:{port}/not-there"))
2321            .send()
2322            .await
2323            .unwrap();
2324        assert_eq!(resp.status().as_u16(), 404);
2325
2326        token.cancel();
2327    }
2328
2329    #[test]
2330    fn test_http_consumer_declares_concurrent() {
2331        use camel_component_api::ConcurrencyModel;
2332
2333        let config = HttpServerConfig {
2334            host: "127.0.0.1".to_string(),
2335            port: 19999,
2336            path: "/test".to_string(),
2337            max_request_body: 2 * 1024 * 1024,
2338            max_response_body: 10 * 1024 * 1024,
2339        };
2340        let consumer = HttpConsumer::new(config);
2341        assert_eq!(
2342            consumer.concurrency_model(),
2343            ConcurrencyModel::Concurrent { max: None }
2344        );
2345    }
2346
2347    // -----------------------------------------------------------------------
2348    // HttpReplyBody streaming tests
2349    // -----------------------------------------------------------------------
2350
2351    #[tokio::test]
2352    async fn test_http_reply_body_stream_variant_exists() {
2353        use bytes::Bytes;
2354        use camel_component_api::CamelError;
2355        use futures::stream;
2356
2357        let chunks: Vec<Result<Bytes, CamelError>> =
2358            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2359        let stream = Box::pin(stream::iter(chunks));
2360        let reply_body = HttpReplyBody::Stream(stream);
2361        // Si compila y el match funciona, el test pasa
2362        match reply_body {
2363            HttpReplyBody::Stream(_) => {}
2364            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2365        }
2366    }
2367
2368    // -----------------------------------------------------------------------
2369    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2370    // -----------------------------------------------------------------------
2371
2372    #[cfg(feature = "otel")]
2373    mod otel_tests {
2374        use super::*;
2375        use camel_component_api::Message;
2376        use tower::ServiceExt;
2377
2378        #[tokio::test]
2379        async fn test_producer_injects_traceparent_header() {
2380            let (url, _handle) = start_test_server_with_header_capture().await;
2381            let ctx = test_producer_ctx();
2382
2383            let component = HttpComponent::new();
2384            let endpoint = component
2385                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2386                .unwrap();
2387            let producer = endpoint.create_producer(&ctx).unwrap();
2388
2389            // Create exchange with an OTel context by extracting from a traceparent header
2390            let mut exchange = Exchange::new(Message::default());
2391            let mut headers = std::collections::HashMap::new();
2392            headers.insert(
2393                "traceparent".to_string(),
2394                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2395            );
2396            camel_otel::extract_into_exchange(&mut exchange, &headers);
2397
2398            let result = producer.oneshot(exchange).await.unwrap();
2399
2400            // Verify request succeeded
2401            let status = result
2402                .input
2403                .header("CamelHttpResponseCode")
2404                .and_then(|v| v.as_u64())
2405                .unwrap();
2406            assert_eq!(status, 200);
2407
2408            // The test server echoes back the received traceparent header
2409            let traceparent = result.input.header("x-received-traceparent");
2410            assert!(
2411                traceparent.is_some(),
2412                "traceparent header should have been sent"
2413            );
2414
2415            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2416            // Verify format: version-traceid-spanid-flags
2417            let parts: Vec<&str> = traceparent_str.split('-').collect();
2418            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2419            assert_eq!(parts[0], "00", "version should be 00");
2420            assert_eq!(
2421                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2422                "trace-id should match"
2423            );
2424            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2425            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2426        }
2427
2428        #[tokio::test]
2429        async fn test_consumer_extracts_traceparent_header() {
2430            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2431
2432            // Get an OS-assigned free port
2433            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2434            let port = listener.local_addr().unwrap().port();
2435            drop(listener);
2436
2437            let component = HttpComponent::new();
2438            let endpoint = component
2439                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2440                .unwrap();
2441            let mut consumer = endpoint.create_consumer().unwrap();
2442
2443            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2444            let token = tokio_util::sync::CancellationToken::new();
2445            let ctx = ConsumerContext::new(tx, token.clone());
2446
2447            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2448            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2449
2450            // Send request with traceparent header
2451            let client = reqwest::Client::new();
2452            let send_fut = client
2453                .post(format!("http://127.0.0.1:{port}/trace"))
2454                .header(
2455                    "traceparent",
2456                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2457                )
2458                .body("test")
2459                .send();
2460
2461            let (http_result, _) = tokio::join!(send_fut, async {
2462                if let Some(envelope) = rx.recv().await {
2463                    // Verify the exchange has a valid OTel context by re-injecting it
2464                    // and checking the traceparent matches
2465                    let mut injected_headers = std::collections::HashMap::new();
2466                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2467
2468                    assert!(
2469                        injected_headers.contains_key("traceparent"),
2470                        "Exchange should have traceparent after extraction"
2471                    );
2472
2473                    let traceparent = injected_headers.get("traceparent").unwrap();
2474                    let parts: Vec<&str> = traceparent.split('-').collect();
2475                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2476                    assert_eq!(
2477                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2478                        "Trace ID should match the original traceparent header"
2479                    );
2480
2481                    if let Some(reply_tx) = envelope.reply_tx {
2482                        let _ = reply_tx.send(Ok(envelope.exchange));
2483                    }
2484                }
2485            });
2486
2487            let resp = http_result.unwrap();
2488            assert_eq!(resp.status().as_u16(), 200);
2489
2490            token.cancel();
2491        }
2492
2493        #[tokio::test]
2494        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2495            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2496
2497            // Get an OS-assigned free port
2498            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2499            let port = listener.local_addr().unwrap().port();
2500            drop(listener);
2501
2502            let component = HttpComponent::new();
2503            let endpoint = component
2504                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2505                .unwrap();
2506            let mut consumer = endpoint.create_consumer().unwrap();
2507
2508            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2509            let token = tokio_util::sync::CancellationToken::new();
2510            let ctx = ConsumerContext::new(tx, token.clone());
2511
2512            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2513            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2514
2515            // Send request with MIXED-CASE TraceParent header (not lowercase)
2516            let client = reqwest::Client::new();
2517            let send_fut = client
2518                .post(format!("http://127.0.0.1:{port}/trace"))
2519                .header(
2520                    "TraceParent",
2521                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2522                )
2523                .body("test")
2524                .send();
2525
2526            let (http_result, _) = tokio::join!(send_fut, async {
2527                if let Some(envelope) = rx.recv().await {
2528                    // Verify the exchange has a valid OTel context by re-injecting it
2529                    // and checking the traceparent matches
2530                    let mut injected_headers = HashMap::new();
2531                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2532
2533                    assert!(
2534                        injected_headers.contains_key("traceparent"),
2535                        "Exchange should have traceparent after extraction from mixed-case header"
2536                    );
2537
2538                    let traceparent = injected_headers.get("traceparent").unwrap();
2539                    let parts: Vec<&str> = traceparent.split('-').collect();
2540                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2541                    assert_eq!(
2542                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2543                        "Trace ID should match the original mixed-case TraceParent header"
2544                    );
2545
2546                    if let Some(reply_tx) = envelope.reply_tx {
2547                        let _ = reply_tx.send(Ok(envelope.exchange));
2548                    }
2549                }
2550            });
2551
2552            let resp = http_result.unwrap();
2553            assert_eq!(resp.status().as_u16(), 200);
2554
2555            token.cancel();
2556        }
2557
2558        #[tokio::test]
2559        async fn test_producer_no_trace_context_no_crash() {
2560            let (url, _handle) = start_test_server().await;
2561            let ctx = test_producer_ctx();
2562
2563            let component = HttpComponent::new();
2564            let endpoint = component
2565                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2566                .unwrap();
2567            let producer = endpoint.create_producer(&ctx).unwrap();
2568
2569            // Create exchange with default (empty) otel_context - no trace context
2570            let exchange = Exchange::new(Message::default());
2571
2572            // Should succeed without panic
2573            let result = producer.oneshot(exchange).await.unwrap();
2574
2575            // Verify request succeeded
2576            let status = result
2577                .input
2578                .header("CamelHttpResponseCode")
2579                .and_then(|v| v.as_u64())
2580                .unwrap();
2581            assert_eq!(status, 200);
2582        }
2583
2584        /// Test server that captures and echoes back the traceparent header
2585        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2586            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2587            let addr = listener.local_addr().unwrap();
2588            let url = format!("http://127.0.0.1:{}", addr.port());
2589
2590            let handle = tokio::spawn(async move {
2591                loop {
2592                    if let Ok((mut stream, _)) = listener.accept().await {
2593                        tokio::spawn(async move {
2594                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2595                            let mut buf = vec![0u8; 8192];
2596                            let n = stream.read(&mut buf).await.unwrap_or(0);
2597                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
2598
2599                            // Extract traceparent header from request
2600                            let traceparent = request
2601                                .lines()
2602                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
2603                                .map(|line| {
2604                                    line.split(':')
2605                                        .nth(1)
2606                                        .map(|s| s.trim().to_string())
2607                                        .unwrap_or_default()
2608                                })
2609                                .unwrap_or_default();
2610
2611                            let body =
2612                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2613                            let response = format!(
2614                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2615                                body.len(),
2616                                traceparent,
2617                                body
2618                            );
2619                            let _ = stream.write_all(response.as_bytes()).await;
2620                        });
2621                    }
2622                }
2623            });
2624
2625            (url, handle)
2626        }
2627    }
2628
2629    // -----------------------------------------------------------------------
2630    // Response streaming tests (Eje A - Task 2)
2631    // -----------------------------------------------------------------------
2632
2633    // -----------------------------------------------------------------------
2634    // Request streaming tests (Eje B - Task 3)
2635    // -----------------------------------------------------------------------
2636
2637    #[tokio::test]
2638    async fn test_request_body_arrives_as_stream() {
2639        use camel_component_api::Body;
2640        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2641
2642        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2643        let port = listener.local_addr().unwrap().port();
2644        drop(listener);
2645
2646        let component = HttpComponent::new();
2647        let endpoint = component
2648            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2649            .unwrap();
2650        let mut consumer = endpoint.create_consumer().unwrap();
2651
2652        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2653        let token = tokio_util::sync::CancellationToken::new();
2654        let ctx = ConsumerContext::new(tx, token.clone());
2655
2656        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2657        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2658
2659        let client = reqwest::Client::new();
2660        let send_fut = client
2661            .post(format!("http://127.0.0.1:{port}/upload"))
2662            .body("hello streaming world")
2663            .send();
2664
2665        let (http_result, _) = tokio::join!(send_fut, async {
2666            if let Some(mut envelope) = rx.recv().await {
2667                // Body must be Body::Stream, not Body::Text or Body::Bytes
2668                assert!(
2669                    matches!(envelope.exchange.input.body, Body::Stream(_)),
2670                    "expected Body::Stream, got discriminant {:?}",
2671                    std::mem::discriminant(&envelope.exchange.input.body)
2672                );
2673                // Materialize to verify content
2674                let bytes = envelope
2675                    .exchange
2676                    .input
2677                    .body
2678                    .into_bytes(1024 * 1024)
2679                    .await
2680                    .unwrap();
2681                assert_eq!(&bytes[..], b"hello streaming world");
2682
2683                envelope.exchange.input.body = camel_component_api::Body::Empty;
2684                if let Some(reply_tx) = envelope.reply_tx {
2685                    let _ = reply_tx.send(Ok(envelope.exchange));
2686                }
2687            }
2688        });
2689
2690        let resp = http_result.unwrap();
2691        assert_eq!(resp.status().as_u16(), 200);
2692
2693        token.cancel();
2694    }
2695
2696    // -----------------------------------------------------------------------
2697    // Response streaming tests (Eje A - Task 2)
2698    // -----------------------------------------------------------------------
2699
2700    #[tokio::test]
2701    async fn test_streaming_response_chunked() {
2702        use bytes::Bytes;
2703        use camel_component_api::Body;
2704        use camel_component_api::CamelError;
2705        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2706        use camel_component_api::{StreamBody, StreamMetadata};
2707        use futures::stream;
2708        use std::sync::Arc;
2709        use tokio::sync::Mutex;
2710
2711        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2712        let port = listener.local_addr().unwrap().port();
2713        drop(listener);
2714
2715        let component = HttpComponent::new();
2716        let endpoint = component
2717            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2718            .unwrap();
2719        let mut consumer = endpoint.create_consumer().unwrap();
2720
2721        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2722        let token = tokio_util::sync::CancellationToken::new();
2723        let ctx = ConsumerContext::new(tx, token.clone());
2724
2725        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2726        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2727
2728        let client = reqwest::Client::new();
2729        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2730
2731        let (http_result, _) = tokio::join!(send_fut, async {
2732            if let Some(mut envelope) = rx.recv().await {
2733                // Respond with Body::Stream
2734                let chunks: Vec<Result<Bytes, CamelError>> =
2735                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2736                let stream = Box::pin(stream::iter(chunks));
2737                envelope.exchange.input.body = Body::Stream(StreamBody {
2738                    stream: Arc::new(Mutex::new(Some(stream))),
2739                    metadata: StreamMetadata::default(),
2740                });
2741                if let Some(reply_tx) = envelope.reply_tx {
2742                    let _ = reply_tx.send(Ok(envelope.exchange));
2743                }
2744            }
2745        });
2746
2747        let resp = http_result.unwrap();
2748        assert_eq!(resp.status().as_u16(), 200);
2749        let body = resp.text().await.unwrap();
2750        assert_eq!(body, "chunk1chunk2");
2751
2752        token.cancel();
2753    }
2754
2755    // -----------------------------------------------------------------------
2756    // 413 Content-Length limit test (Task 4)
2757    // -----------------------------------------------------------------------
2758
2759    #[tokio::test]
2760    async fn test_413_when_content_length_exceeds_limit() {
2761        use camel_component_api::ConsumerContext;
2762
2763        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2764        let port = listener.local_addr().unwrap().port();
2765        drop(listener);
2766
2767        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
2768        let component = HttpComponent::new();
2769        let endpoint = component
2770            .create_endpoint(&format!(
2771                "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2772            ))
2773            .unwrap();
2774        let mut consumer = endpoint.create_consumer().unwrap();
2775
2776        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2777        let token = tokio_util::sync::CancellationToken::new();
2778        let ctx = ConsumerContext::new(tx, token.clone());
2779
2780        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2781        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2782
2783        let client = reqwest::Client::new();
2784        let resp = client
2785            .post(format!("http://127.0.0.1:{port}/upload"))
2786            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
2787            .body("x".repeat(1000))
2788            .send()
2789            .await
2790            .unwrap();
2791
2792        assert_eq!(resp.status().as_u16(), 413);
2793
2794        token.cancel();
2795    }
2796
2797    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
2798    /// The spec says: "If there is no Content-Length, the limit does not apply at the
2799    /// consumer level — the route is responsible."
2800    #[tokio::test]
2801    async fn test_chunked_upload_without_content_length_bypasses_limit() {
2802        use bytes::Bytes;
2803        use camel_component_api::Body;
2804        use camel_component_api::ConsumerContext;
2805        use futures::stream;
2806
2807        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2808        let port = listener.local_addr().unwrap().port();
2809        drop(listener);
2810
2811        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
2812        let component = HttpComponent::new();
2813        let endpoint = component
2814            .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2815            .unwrap();
2816        let mut consumer = endpoint.create_consumer().unwrap();
2817
2818        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2819        let token = tokio_util::sync::CancellationToken::new();
2820        let ctx = ConsumerContext::new(tx, token.clone());
2821
2822        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2823        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2824
2825        let client = reqwest::Client::new();
2826
2827        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
2828        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
2829        // but since there's no Content-Length the 413 check must NOT fire.
2830        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2831            Ok(Bytes::from("y".repeat(50))),
2832            Ok(Bytes::from("y".repeat(50))),
2833        ];
2834        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2835        let send_fut = client
2836            .post(format!("http://127.0.0.1:{port}/upload"))
2837            .body(stream_body)
2838            .send();
2839
2840        let consumer_fut = async {
2841            // Use timeout to avoid deadlock if the handler rejects before enqueueing
2842            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2843                Ok(Some(mut envelope)) => {
2844                    assert!(
2845                        matches!(envelope.exchange.input.body, Body::Stream(_)),
2846                        "expected Body::Stream"
2847                    );
2848                    envelope.exchange.input.body = camel_component_api::Body::Empty;
2849                    if let Some(reply_tx) = envelope.reply_tx {
2850                        let _ = reply_tx.send(Ok(envelope.exchange));
2851                    }
2852                }
2853                Ok(None) => panic!("consumer channel closed unexpectedly"),
2854                Err(_) => {
2855                    // Timeout: the request was rejected before reaching the consumer.
2856                    // The HTTP response will carry the real status code (we check below).
2857                }
2858            }
2859        };
2860
2861        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2862
2863        let resp = http_result.unwrap();
2864        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
2865        assert_ne!(
2866            resp.status().as_u16(),
2867            413,
2868            "chunked upload must not be rejected by maxRequestBody"
2869        );
2870        assert_eq!(resp.status().as_u16(), 200);
2871
2872        token.cancel();
2873    }
2874}