Skip to main content

camel_component_http/
lib.rs

1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
24// ---------------------------------------------------------------------------
25// HttpEndpointConfig
26// ---------------------------------------------------------------------------
27
28/// Configuration for an HTTP client (producer) endpoint.
29///
30/// # Memory Limits
31///
32/// HTTP operations enforce conservative memory limits to prevent denial-of-service
33/// attacks from untrusted network sources. These limits are significantly lower than
34/// file component limits (100MB) because HTTP typically handles API responses rather
35/// than large file transfers, and clients may be untrusted.
36///
37/// ## Default Limits
38///
39/// - **HTTP client body**: 10MB (typical API responses)
40/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
41/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
42///
43/// ## Rationale
44///
45/// The 10MB limit for HTTP client responses is appropriate for most API interactions
46/// while providing protection against:
47/// - Malicious servers sending oversized responses
48/// - Runaway processes generating unexpectedly large payloads
49/// - Memory exhaustion attacks
50///
51/// The 2MB server request limit is even more conservative because it handles input
52/// from potentially untrusted clients on the public internet.
53///
54/// ## Overriding Limits
55///
56/// Override the default client body limit using the `maxBodySize` URI parameter:
57///
58/// ```text
59/// http://api.example.com/large-data?maxBodySize=52428800
60/// ```
61///
62/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
63///
64/// ```text
65/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
66/// ```
67///
68/// ## Behavior When Exceeded
69///
70/// When a body exceeds the configured limit:
71/// - An error is returned immediately
72/// - No memory is exhausted - the limit is checked before allocation
73/// - The HTTP connection is terminated cleanly
74///
75/// ## Security Considerations
76///
77/// HTTP endpoints should be treated with more caution than file endpoints because:
78/// - Clients may be unknown and untrusted
79/// - Network traffic can be spoofed or malicious
80/// - DoS attacks often exploit unbounded resource consumption
81///
82/// Only increase limits when you control both ends of the connection or when
83/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL without query string, rebuilt as `"{scheme}:{path}"` from the parsed URI.
    pub base_url: String,
    /// Value of the `httpMethod` URI parameter, if one was supplied.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` URI parameter; `true` unless the value is exactly `"false"`.
    pub throw_exception_on_failure: bool,
    /// `(start, end)` parsed from `okStatusCodeRange` (format `start-end`); defaults to `(200, 299)`.
    pub ok_status_code_range: (u16, u16),
    /// `responseTimeout` URI parameter interpreted as milliseconds; `None` when absent or unparsable.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
    pub query_params: HashMap<String, String>,
    /// `allowPrivateIps` URI parameter; `true` only when the value is exactly `"true"`.
    pub allow_private_ips: bool,
    /// Hosts parsed from the comma-separated `blockedHosts` URI parameter.
    pub blocked_hosts: Vec<String>,
    /// `maxBodySize` URI parameter in bytes; defaults to 10 MiB when absent or unparsable.
    pub max_body_size: usize,
}
96
/// Camel options that should NOT be forwarded as HTTP query params.
///
/// Any URI parameter whose name appears here is excluded when
/// `HttpEndpointConfig::from_components` collects `query_params`.
///
/// NOTE(review): `followRedirects` and `connectTimeout` are filtered out here but are
/// not read by `HttpEndpointConfig::from_components` — presumably they are consumed
/// elsewhere; confirm.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
109
110impl UriConfig for HttpEndpointConfig {
111    /// Returns "http" as the primary scheme (also accepts "https")
112    fn scheme() -> &'static str {
113        "http"
114    }
115
116    fn from_uri(uri: &str) -> Result<Self, CamelError> {
117        let parts = parse_uri(uri)?;
118        Self::from_components(parts)
119    }
120
121    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122        // Validate scheme - accept both http and https
123        if parts.scheme != "http" && parts.scheme != "https" {
124            return Err(CamelError::InvalidUri(format!(
125                "expected scheme 'http' or 'https', got '{}'",
126                parts.scheme
127            )));
128        }
129
130        // Construct base_url from scheme + path
131        // e.g., "http://localhost:8080/api" from scheme "http" and path "//localhost:8080/api"
132        let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134        let http_method = parts.params.get("httpMethod").cloned();
135
136        let throw_exception_on_failure = parts
137            .params
138            .get("throwExceptionOnFailure")
139            .map(|v| v != "false")
140            .unwrap_or(true);
141
142        // Parse status code range from "start-end" format (e.g., "200-299")
143        let ok_status_code_range = parts
144            .params
145            .get("okStatusCodeRange")
146            .and_then(|v| {
147                let (start, end) = v.split_once('-')?;
148                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149            })
150            .unwrap_or((200, 299));
151
152        let response_timeout = parts
153            .params
154            .get("responseTimeout")
155            .and_then(|v| v.parse::<u64>().ok())
156            .map(Duration::from_millis);
157
158        // SSRF protection settings
159        let allow_private_ips = parts
160            .params
161            .get("allowPrivateIps")
162            .map(|v| v == "true")
163            .unwrap_or(false); // Default: block private IPs
164
165        // Parse comma-separated blocked hosts
166        let blocked_hosts = parts
167            .params
168            .get("blockedHosts")
169            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170            .unwrap_or_default();
171
172        let max_body_size = parts
173            .params
174            .get("maxBodySize")
175            .and_then(|v| v.parse::<usize>().ok())
176            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
177
178        // Collect remaining params (not Camel options) as query params
179        let query_params: HashMap<String, String> = parts
180            .params
181            .into_iter()
182            .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183            .collect();
184
185        Ok(Self {
186            base_url,
187            http_method,
188            throw_exception_on_failure,
189            ok_status_code_range,
190            response_timeout,
191            query_params,
192            allow_private_ips,
193            blocked_hosts,
194            max_body_size,
195        })
196    }
197}
198
199impl HttpEndpointConfig {
200    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201        let parts = parse_uri(uri)?;
202        let mut endpoint = Self::from_components(parts.clone())?;
203        if endpoint.response_timeout.is_none() {
204            endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205        }
206        if !parts.params.contains_key("allowPrivateIps") {
207            endpoint.allow_private_ips = config.allow_private_ips;
208        }
209        if !parts.params.contains_key("blockedHosts") {
210            endpoint.blocked_hosts = config.blocked_hosts.clone();
211        }
212        if !parts.params.contains_key("maxBodySize") {
213            endpoint.max_body_size = config.max_body_size;
214        }
215        Ok(endpoint)
216    }
217}
218
219// ---------------------------------------------------------------------------
220// HttpServerConfig
221// ---------------------------------------------------------------------------
222
/// Configuration for an HTTP server (consumer) endpoint.
///
/// Built from a URI of the form `http://host:port/path?maxRequestBody=N&maxResponseBody=M`.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port to listen on. Defaults to 443 for `https` and 80 otherwise
    /// when the URI carries no port.
    pub port: u16,
    /// URL path this consumer handles, e.g. "/orders". Defaults to "/".
    pub path: String,
    /// Maximum request body size in bytes (`maxRequestBody`, default 2 MiB).
    pub max_request_body: usize,
    /// Maximum response body size for materializing streams in bytes
    /// (`maxResponseBody`, default 10 MiB).
    pub max_response_body: usize,
}
237
238impl UriConfig for HttpServerConfig {
239    /// Returns "http" as the primary scheme (also accepts "https")
240    fn scheme() -> &'static str {
241        "http"
242    }
243
244    fn from_uri(uri: &str) -> Result<Self, CamelError> {
245        let parts = parse_uri(uri)?;
246        Self::from_components(parts)
247    }
248
249    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
250        // Validate scheme - accept both http and https
251        if parts.scheme != "http" && parts.scheme != "https" {
252            return Err(CamelError::InvalidUri(format!(
253                "expected scheme 'http' or 'https', got '{}'",
254                parts.scheme
255            )));
256        }
257
258        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
259        // Strip leading "//"
260        let authority_and_path = parts.path.trim_start_matches('/');
261
262        // Split on the first "/" to separate "host:port" from "/path"
263        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
264            (&authority_and_path[..idx], &authority_and_path[idx..])
265        } else {
266            (authority_and_path, "/")
267        };
268
269        let path = if path_suffix.is_empty() {
270            "/"
271        } else {
272            path_suffix
273        }
274        .to_string();
275
276        // Parse host:port from authority
277        let (host, port) = if let Some(colon) = authority.rfind(':') {
278            let port_str = &authority[colon + 1..];
279            match port_str.parse::<u16>() {
280                Ok(p) => (authority[..colon].to_string(), p),
281                Err(_) => {
282                    return Err(CamelError::InvalidUri(format!(
283                        "invalid port '{}' in authority",
284                        port_str
285                    )));
286                }
287            }
288        } else {
289            // Default port based on scheme: 443 for https, 80 for http
290            let default_port = if parts.scheme == "https" { 443 } else { 80 };
291            (authority.to_string(), default_port)
292        };
293
294        let max_request_body = parts
295            .params
296            .get("maxRequestBody")
297            .and_then(|v| v.parse::<usize>().ok())
298            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
299
300        let max_response_body = parts
301            .params
302            .get("maxResponseBody")
303            .and_then(|v| v.parse::<usize>().ok())
304            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
305
306        Ok(Self {
307            host,
308            port,
309            path,
310            max_request_body,
311            max_response_body,
312        })
313    }
314}
315
316impl HttpServerConfig {
317    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
318        let parts = parse_uri(uri)?;
319        let mut server = Self::from_components(parts.clone())?;
320        if !parts.params.contains_key("maxRequestBody") {
321            server.max_request_body = config.max_request_body;
322        }
323        if !parts.params.contains_key("maxResponseBody") {
324            server.max_response_body = config.max_body_size;
325        }
326        Ok(server)
327    }
328}
329
330// ---------------------------------------------------------------------------
331// RequestEnvelope / HttpReply
332// ---------------------------------------------------------------------------
333
/// Body of the HTTP response: either already-materialized bytes or a lazy stream.
pub(crate) enum HttpReplyBody {
    /// The complete body, held in memory.
    Bytes(bytes::Bytes),
    /// A stream of body chunks; each item is a chunk or an error.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
339
/// An inbound HTTP request sent from the Axum dispatch handler to an
/// `HttpConsumer` receive loop.
pub(crate) struct RequestEnvelope {
    /// HTTP method of the request, as text.
    pub(crate) method: String,
    /// Path component of the request URI.
    pub(crate) path: String,
    /// Query string of the request URI; empty when the URI has none.
    pub(crate) query: String,
    /// Copy of the request headers.
    pub(crate) headers: http::HeaderMap,
    /// Request body, wrapped as a stream rather than materialized bytes.
    pub(crate) body: StreamBody,
    /// One-shot channel on which the consumer sends the reply for this request.
    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
350
/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
pub(crate) struct HttpReply {
    /// Numeric status code; the dispatch handler substitutes 500 when the
    /// value is not a valid HTTP status.
    pub(crate) status: u16,
    /// Response headers as `(name, value)` pairs.
    pub(crate) headers: Vec<(String, String)>,
    /// Response body, either in-memory bytes or a stream.
    pub(crate) body: HttpReplyBody,
}
357
358// ---------------------------------------------------------------------------
359// DispatchTable / ServerRegistry
360// ---------------------------------------------------------------------------
361
/// Maps URL path → channel sender for the consumer that owns that path.
///
/// The dispatch handler looks requests up by their exact path string.
pub(crate) type DispatchTable =
    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
365
/// Handle to a running Axum server on one port.
struct ServerHandle {
    /// Path → consumer table shared with the server's dispatch handler.
    dispatch: DispatchTable,
    /// Kept alive so the task isn't dropped; not used directly.
    _task: tokio::task::JoinHandle<()>,
}
372
/// Process-global registry mapping port → running Axum server handle.
pub struct ServerRegistry {
    /// One lazily-initialized server slot per port. The map is keyed by port
    /// only, so the host is not part of a server's identity.
    inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
}
377
378impl ServerRegistry {
379    /// Returns the global singleton.
380    pub fn global() -> &'static Self {
381        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
382        INSTANCE.get_or_init(|| ServerRegistry {
383            inner: Mutex::new(HashMap::new()),
384        })
385    }
386
387    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
388    /// none is running on that port yet.
389    pub(crate) async fn get_or_spawn(
390        &'static self,
391        host: &str,
392        port: u16,
393        max_request_body: usize,
394    ) -> Result<DispatchTable, CamelError> {
395        let host_owned = host.to_string();
396
397        let cell = {
398            let mut guard = self.inner.lock().map_err(|_| {
399                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
400            })?;
401            guard
402                .entry(port)
403                .or_insert_with(|| Arc::new(OnceCell::new()))
404                .clone()
405        };
406
407        let handle = cell
408            .get_or_try_init(|| async {
409                let addr = format!("{host_owned}:{port}");
410                let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
411                    CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
412                })?;
413                let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
414                let task = tokio::spawn(run_axum_server(
415                    listener,
416                    Arc::clone(&dispatch),
417                    max_request_body,
418                ));
419                Ok::<ServerHandle, CamelError>(ServerHandle {
420                    dispatch,
421                    _task: task,
422                })
423            })
424            .await?;
425
426        Ok(Arc::clone(&handle.dispatch))
427    }
428}
429
430// ---------------------------------------------------------------------------
431// Axum server
432// ---------------------------------------------------------------------------
433
434use axum::{
435    Router,
436    body::Body as AxumBody,
437    extract::{Request, State},
438    http::{Response, StatusCode},
439    response::IntoResponse,
440};
441
/// State shared with the Axum dispatch handler of one server.
#[derive(Clone)]
struct AppState {
    /// Path → consumer table used to route incoming requests.
    dispatch: DispatchTable,
    /// Request body size limit in bytes, compared against `Content-Length`.
    max_request_body: usize,
}
447
448async fn run_axum_server(
449    listener: tokio::net::TcpListener,
450    dispatch: DispatchTable,
451    max_request_body: usize,
452) {
453    let state = AppState {
454        dispatch,
455        max_request_body,
456    };
457    let app = Router::new().fallback(dispatch_handler).with_state(state);
458
459    axum::serve(listener, app).await.unwrap_or_else(|e| {
460        tracing::error!(error = %e, "Axum server error");
461    });
462}
463
464async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
465    let method = req.method().to_string();
466    let path = req.uri().path().to_string();
467    let query = req.uri().query().unwrap_or("").to_string();
468    let headers = req.headers().clone();
469
470    // Check Content-Length against limit BEFORE opening the stream
471    let content_length: Option<u64> = headers
472        .get(http::header::CONTENT_LENGTH)
473        .and_then(|v| v.to_str().ok())
474        .and_then(|s| s.parse().ok());
475
476    if let Some(len) = content_length
477        && len > state.max_request_body as u64
478    {
479        return Response::builder()
480            .status(StatusCode::PAYLOAD_TOO_LARGE)
481            .body(AxumBody::from("Request body exceeds configured limit"))
482            .expect("infallible");
483    }
484
485    // Build StreamBody from Axum body WITHOUT materializing
486    let content_type = headers
487        .get(http::header::CONTENT_TYPE)
488        .and_then(|v| v.to_str().ok())
489        .map(|s| s.to_string());
490
491    let data_stream: BodyDataStream = req.into_body().into_data_stream();
492    let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
493    let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
494
495    let stream_body = StreamBody {
496        stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
497        metadata: StreamMetadata {
498            size_hint: content_length,
499            content_type,
500            origin: None,
501        },
502    };
503
504    // Look up handler for this path
505    let sender = {
506        let table = state.dispatch.read().await;
507        table.get(&path).cloned()
508    };
509    let Some(sender) = sender else {
510        return Response::builder()
511            .status(StatusCode::NOT_FOUND)
512            .body(AxumBody::from("No consumer registered for this path"))
513            .expect("infallible");
514    };
515
516    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
517    let envelope = RequestEnvelope {
518        method,
519        path,
520        query,
521        headers,
522        body: stream_body,
523        reply_tx,
524    };
525
526    if sender.send(envelope).await.is_err() {
527        return Response::builder()
528            .status(StatusCode::SERVICE_UNAVAILABLE)
529            .body(AxumBody::from("Consumer unavailable"))
530            .expect("infallible");
531    }
532
533    match reply_rx.await {
534        Ok(reply) => {
535            let status =
536                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
537            let mut builder = Response::builder().status(status);
538            for (k, v) in &reply.headers {
539                builder = builder.header(k.as_str(), v.as_str());
540            }
541            match reply.body {
542                HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
543                    Response::builder()
544                        .status(StatusCode::INTERNAL_SERVER_ERROR)
545                        .body(AxumBody::from("Invalid response headers from consumer"))
546                        .expect("infallible")
547                }),
548                HttpReplyBody::Stream(stream) => builder
549                    .body(AxumBody::from_stream(stream))
550                    .unwrap_or_else(|_| {
551                        Response::builder()
552                            .status(StatusCode::INTERNAL_SERVER_ERROR)
553                            .body(AxumBody::from("Invalid response headers from consumer"))
554                            .expect("infallible")
555                    }),
556            }
557        }
558        Err(_) => Response::builder()
559            .status(StatusCode::INTERNAL_SERVER_ERROR)
560            .body(AxumBody::from("Pipeline error"))
561            .expect("Response::builder() with a known-valid status code and body is infallible"),
562    }
563}
564
565// ---------------------------------------------------------------------------
566// HttpConsumer
567// ---------------------------------------------------------------------------
568
/// HTTP server-side consumer for a single host/port/path endpoint.
pub struct HttpConsumer {
    /// Bind address, path and body-size limits for this consumer.
    config: HttpServerConfig,
}
572
573impl HttpConsumer {
574    pub fn new(config: HttpServerConfig) -> Self {
575        Self { config }
576    }
577}
578
579#[async_trait::async_trait]
580impl Consumer for HttpConsumer {
581    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
582        use camel_component_api::{Body, Exchange, Message};
583
584        let dispatch = ServerRegistry::global()
585            .get_or_spawn(
586                &self.config.host,
587                self.config.port,
588                self.config.max_request_body,
589            )
590            .await?;
591
592        // Create a channel for this path and register it
593        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
594        {
595            let mut table = dispatch.write().await;
596            table.insert(self.config.path.clone(), env_tx);
597        }
598
599        let path = self.config.path.clone();
600        let cancel_token = ctx.cancel_token();
601        let _max_response_body = self.config.max_response_body;
602
603        loop {
604            tokio::select! {
605                _ = ctx.cancelled() => {
606                    break;
607                }
608                envelope = env_rx.recv() => {
609                    let Some(envelope) = envelope else { break; };
610
611                    // Build Exchange from HTTP request
612                    let mut msg = Message::default();
613
614                    // Set standard Camel HTTP headers
615                    msg.set_header("CamelHttpMethod",
616                        serde_json::Value::String(envelope.method.clone()));
617                    msg.set_header("CamelHttpPath",
618                        serde_json::Value::String(envelope.path.clone()));
619                    msg.set_header("CamelHttpQuery",
620                        serde_json::Value::String(envelope.query.clone()));
621
622                    // Forward HTTP headers (skip pseudo-headers)
623                    for (k, v) in &envelope.headers {
624                        if let Ok(val_str) = v.to_str() {
625                            msg.set_header(
626                                k.as_str(),
627                                serde_json::Value::String(val_str.to_string()),
628                            );
629                        }
630                    }
631
632                    // Body: always arrives as Body::Stream (native streaming)
633                    // Routes can call into_bytes() if they need to materialize
634                    msg.body = Body::Stream(envelope.body);
635
636                    #[allow(unused_mut)]
637                    let mut exchange = Exchange::new(msg);
638
639                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
640                    #[cfg(feature = "otel")]
641                    {
642                        let headers: HashMap<String, String> = envelope
643                            .headers
644                            .iter()
645                            .filter_map(|(k, v)| {
646                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
647                            })
648                            .collect();
649                        camel_otel::extract_into_exchange(&mut exchange, &headers);
650                    }
651
652                    let reply_tx = envelope.reply_tx;
653                    let sender = ctx.sender().clone();
654                    let path_clone = path.clone();
655                    let cancel = cancel_token.clone();
656
657                    // Spawn a task to handle this request concurrently
658                    //
659                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
660                    // true concurrent request processing. This change was introduced as part of the
661                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
662                    //
663                    // Rationale:
664                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
665                    //    the consumer's main loop until the pipeline processing completes
666                    // 2. This blocking would prevent multiple HTTP requests from being processed
667                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
668                    // 3. The channel would never have multiple exchanges buffered simultaneously,
669                    //    defeating the purpose of pipeline-side concurrency
670                    // 4. By spawning a task per request, we allow the consumer loop to continue
671                    //    accepting new requests while existing ones are processed in the pipeline
672                    //
673                    // This approach effectively decouples request acceptance from pipeline processing,
674                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
675                    // by the pipeline when ConcurrencyModel::Concurrent is active.
676                    tokio::spawn(async move {
677                        // Check for cancellation before sending to pipeline.
678                        // Returns 503 (Service Unavailable) instead of letting the request
679                        // enter a shutting-down pipeline. This is a behavioral change from
680                        // the pre-concurrency implementation where cancellation during
681                        // processing would result in a 500 (Internal Server Error).
682                        // 503 is more semantically correct: the server is temporarily
683                        // unable to handle the request due to shutdown.
684                        if cancel.is_cancelled() {
685                            let _ = reply_tx.send(HttpReply {
686                                status: 503,
687                                headers: vec![],
688                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
689                            });
690                            return;
691                        }
692
693                        // Send through pipeline and await result
694                        let (tx, rx) = tokio::sync::oneshot::channel();
695                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
696                            exchange,
697                            reply_tx: Some(tx),
698                        };
699
700                        let result = match sender.send(envelope).await {
701                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
702                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
703                        }
704                        .and_then(|r| r);
705
706                        let reply = match result {
707                            Ok(out) => {
708                                let status = out
709                                    .input
710                                    .header("CamelHttpResponseCode")
711                                    .and_then(|v| v.as_u64())
712                                    .map(|s| s as u16)
713                                    .unwrap_or(200);
714
715                                let reply_body: HttpReplyBody = match out.input.body {
716                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
717                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
718                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
719                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
720                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
721                                        v.to_string().into_bytes(),
722                                    )),
723                                    Body::Stream(s) => {
724                                        match s.stream.lock().await.take() {
725                                            Some(stream) => HttpReplyBody::Stream(stream),
726                                            None => {
727                                                tracing::error!(
728                                                    "Body::Stream already consumed before HTTP reply — returning 500"
729                                                );
730                                                let error_reply = HttpReply {
731                                                    status: 500,
732                                                    headers: vec![],
733                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
734                                                };
735                                                if reply_tx.send(error_reply).is_err() {
736                                                    debug!("reply_tx dropped before error reply could be sent");
737                                                }
738                                                return;
739                                            }
740                                        }
741                                    }
742                                };
743
744                                let resp_headers: Vec<(String, String)> = out
745                                    .input
746                                    .headers
747                                    .iter()
748                                    // Filter Camel internal headers
749                                    .filter(|(k, _)| !k.starts_with("Camel"))
750                                    // Filter hop-by-hop and request-only headers
751                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
752                                    .filter(|(k, _)| {
753                                        !matches!(
754                                            k.to_lowercase().as_str(),
755                                            // RFC 2616 Section 4.5 - General headers
756                                            "content-length" |      // Auto-calculated by framework
757                                            "content-type" |        // Auto-calculated from body
758                                            "transfer-encoding" |   // Hop-by-hop
759                                            "connection" |          // Hop-by-hop
760                                            "cache-control" |       // Hop-by-hop
761                                            "date" |                // Auto-generated
762                                            "pragma" |              // Hop-by-hop
763                                            "trailer" |             // Hop-by-hop
764                                            "upgrade" |             // Hop-by-hop
765                                            "via" |                 // Hop-by-hop
766                                            "warning" |             // Hop-by-hop
767                                            // Request-only headers
768                                            "host" |                // Request-only
769                                            "user-agent" |          // Request-only
770                                            "accept" |              // Request-only
771                                            "accept-encoding" |     // Request-only
772                                            "accept-language" |     // Request-only
773                                            "accept-charset" |      // Request-only
774                                            "authorization" |       // Request-only (security)
775                                            "proxy-authorization" | // Request-only (security)
776                                            "cookie" |              // Request-only
777                                            "expect" |              // Request-only
778                                            "from" |                // Request-only
779                                            "if-match" |            // Request-only
780                                            "if-modified-since" |   // Request-only
781                                            "if-none-match" |       // Request-only
782                                            "if-range" |            // Request-only
783                                            "if-unmodified-since" | // Request-only
784                                            "max-forwards" |        // Request-only
785                                            "proxy-connection" |    // Request-only
786                                            "range" |               // Request-only
787                                            "referer" |             // Request-only
788                                            "te"                    // Request-only
789                                        )
790                                    })
791                                    .filter_map(|(k, v)| {
792                                        v.as_str().map(|s| (k.clone(), s.to_string()))
793                                    })
794                                    .collect();
795
796                                HttpReply {
797                                    status,
798                                    headers: resp_headers,
799                                    body: reply_body,
800                                }
801                            }
802                            Err(e) => {
803                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
804                                HttpReply {
805                                    status: 500,
806                                    headers: vec![],
807                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
808                                }
809                            }
810                        };
811
812                        // Reply to Axum handler (ignore error if client disconnected)
813                        let _ = reply_tx.send(reply);
814                    });
815                }
816            }
817        }
818
819        // Deregister this path
820        {
821            let mut table = dispatch.write().await;
822            table.remove(&path);
823        }
824
825        Ok(())
826    }
827
828    async fn stop(&mut self) -> Result<(), CamelError> {
829        Ok(())
830    }
831
832    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
833        camel_component_api::ConcurrencyModel::Concurrent { max: None }
834    }
835}
836
837// ---------------------------------------------------------------------------
838// HttpComponent / HttpsComponent
839// ---------------------------------------------------------------------------
840
841pub struct HttpComponent {
842    client: reqwest::Client,
843    config: HttpConfig,
844}
845
846fn build_client(config: &HttpConfig) -> reqwest::Client {
847    let mut builder = reqwest::Client::builder()
848        .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
849        .pool_max_idle_per_host(config.pool_max_idle_per_host)
850        .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
851
852    if !config.follow_redirects {
853        builder = builder.redirect(reqwest::redirect::Policy::none());
854    }
855
856    builder
857        .build()
858        .expect("reqwest::Client::build() with valid config should not fail")
859}
860
861impl HttpComponent {
862    pub fn new() -> Self {
863        let config = HttpConfig::default();
864        let client = build_client(&config);
865        Self { client, config }
866    }
867
868    pub fn with_config(config: HttpConfig) -> Self {
869        let client = build_client(&config);
870        Self { client, config }
871    }
872
873    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
874        match config {
875            Some(cfg) => Self::with_config(cfg),
876            None => Self::new(),
877        }
878    }
879}
880
881impl Default for HttpComponent {
882    fn default() -> Self {
883        Self::new()
884    }
885}
886
887impl Component for HttpComponent {
888    fn scheme(&self) -> &str {
889        "http"
890    }
891
892    fn create_endpoint(
893        &self,
894        uri: &str,
895        _ctx: &dyn camel_component_api::ComponentContext,
896    ) -> Result<Box<dyn Endpoint>, CamelError> {
897        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
898        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
899        Ok(Box::new(HttpEndpoint {
900            uri: uri.to_string(),
901            config,
902            server_config,
903            client: self.client.clone(),
904        }))
905    }
906}
907
908pub struct HttpsComponent {
909    client: reqwest::Client,
910    config: HttpConfig,
911}
912
913impl HttpsComponent {
914    pub fn new() -> Self {
915        let config = HttpConfig::default();
916        let client = build_client(&config);
917        Self { client, config }
918    }
919
920    pub fn with_config(config: HttpConfig) -> Self {
921        let client = build_client(&config);
922        Self { client, config }
923    }
924
925    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
926        match config {
927            Some(cfg) => Self::with_config(cfg),
928            None => Self::new(),
929        }
930    }
931}
932
933impl Default for HttpsComponent {
934    fn default() -> Self {
935        Self::new()
936    }
937}
938
939impl Component for HttpsComponent {
940    fn scheme(&self) -> &str {
941        "https"
942    }
943
944    fn create_endpoint(
945        &self,
946        uri: &str,
947        _ctx: &dyn camel_component_api::ComponentContext,
948    ) -> Result<Box<dyn Endpoint>, CamelError> {
949        let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
950        let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
951        Ok(Box::new(HttpEndpoint {
952            uri: uri.to_string(),
953            config,
954            server_config,
955            client: self.client.clone(),
956        }))
957    }
958}
959
960// ---------------------------------------------------------------------------
961// HttpEndpoint
962// ---------------------------------------------------------------------------
963
964struct HttpEndpoint {
965    uri: String,
966    config: HttpEndpointConfig,
967    server_config: HttpServerConfig,
968    client: reqwest::Client,
969}
970
971impl Endpoint for HttpEndpoint {
972    fn uri(&self) -> &str {
973        &self.uri
974    }
975
976    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
977        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
978    }
979
980    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
981        Ok(BoxProcessor::new(HttpProducer {
982            config: Arc::new(self.config.clone()),
983            client: self.client.clone(),
984        }))
985    }
986}
987
988// ---------------------------------------------------------------------------
989// SSRF Protection
990// ---------------------------------------------------------------------------
991
992fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
993    let parsed = url::Url::parse(url)
994        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
995
996    // Check blocked hosts
997    if let Some(host) = parsed.host_str()
998        && config.blocked_hosts.iter().any(|blocked| host == blocked)
999    {
1000        return Err(CamelError::ProcessorError(format!(
1001            "Host '{}' is blocked",
1002            host
1003        )));
1004    }
1005
1006    // Check private IPs if not allowed
1007    if !config.allow_private_ips
1008        && let Some(host) = parsed.host()
1009    {
1010        match host {
1011            url::Host::Ipv4(ip) => {
1012                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1013                    return Err(CamelError::ProcessorError(format!(
1014                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1015                        ip
1016                    )));
1017                }
1018            }
1019            url::Host::Ipv6(ip) => {
1020                if ip.is_loopback() {
1021                    return Err(CamelError::ProcessorError(format!(
1022                        "Loopback IP '{}' not allowed",
1023                        ip
1024                    )));
1025                }
1026            }
1027            url::Host::Domain(domain) => {
1028                // Block common internal domains
1029                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1030                if blocked_domains.contains(&domain) {
1031                    return Err(CamelError::ProcessorError(format!(
1032                        "Domain '{}' is not allowed",
1033                        domain
1034                    )));
1035                }
1036            }
1037        }
1038    }
1039
1040    Ok(())
1041}
1042
1043// ---------------------------------------------------------------------------
1044// HttpProducer
1045// ---------------------------------------------------------------------------
1046
1047#[derive(Clone)]
1048struct HttpProducer {
1049    config: Arc<HttpEndpointConfig>,
1050    client: reqwest::Client,
1051}
1052
1053impl HttpProducer {
1054    fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1055        if let Some(ref method) = config.http_method {
1056            return method.to_uppercase();
1057        }
1058        if let Some(method) = exchange
1059            .input
1060            .header("CamelHttpMethod")
1061            .and_then(|v| v.as_str())
1062        {
1063            return method.to_uppercase();
1064        }
1065        if !exchange.input.body.is_empty() {
1066            return "POST".to_string();
1067        }
1068        "GET".to_string()
1069    }
1070
1071    fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1072        if let Some(uri) = exchange
1073            .input
1074            .header("CamelHttpUri")
1075            .and_then(|v| v.as_str())
1076        {
1077            let mut url = uri.to_string();
1078            if let Some(path) = exchange
1079                .input
1080                .header("CamelHttpPath")
1081                .and_then(|v| v.as_str())
1082            {
1083                if !url.ends_with('/') && !path.starts_with('/') {
1084                    url.push('/');
1085                }
1086                url.push_str(path);
1087            }
1088            if let Some(query) = exchange
1089                .input
1090                .header("CamelHttpQuery")
1091                .and_then(|v| v.as_str())
1092            {
1093                url.push('?');
1094                url.push_str(query);
1095            }
1096            return url;
1097        }
1098
1099        let mut url = config.base_url.clone();
1100
1101        if let Some(path) = exchange
1102            .input
1103            .header("CamelHttpPath")
1104            .and_then(|v| v.as_str())
1105        {
1106            if !url.ends_with('/') && !path.starts_with('/') {
1107                url.push('/');
1108            }
1109            url.push_str(path);
1110        }
1111
1112        if let Some(query) = exchange
1113            .input
1114            .header("CamelHttpQuery")
1115            .and_then(|v| v.as_str())
1116        {
1117            url.push('?');
1118            url.push_str(query);
1119        } else if !config.query_params.is_empty() {
1120            // Forward non-Camel query params from config
1121            url.push('?');
1122            let query_string: String = config
1123                .query_params
1124                .iter()
1125                .map(|(k, v)| format!("{k}={v}"))
1126                .collect::<Vec<_>>()
1127                .join("&");
1128            url.push_str(&query_string);
1129        }
1130
1131        url
1132    }
1133
1134    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1135        status >= range.0 && status <= range.1
1136    }
1137}
1138
1139impl Service<Exchange> for HttpProducer {
1140    type Response = Exchange;
1141    type Error = CamelError;
1142    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1143
1144    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1145        Poll::Ready(Ok(()))
1146    }
1147
1148    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1149        let config = self.config.clone();
1150        let client = self.client.clone();
1151
1152        Box::pin(async move {
1153            let method_str = HttpProducer::resolve_method(&exchange, &config);
1154            let url = HttpProducer::resolve_url(&exchange, &config);
1155
1156            // SECURITY: Validate URL for SSRF
1157            validate_url_for_ssrf(&url, &config)?;
1158
1159            debug!(
1160                correlation_id = %exchange.correlation_id(),
1161                method = %method_str,
1162                url = %url,
1163                "HTTP request"
1164            );
1165
1166            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1167                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1168            })?;
1169
1170            let mut request = client.request(method, &url);
1171
1172            if let Some(timeout) = config.response_timeout {
1173                request = request.timeout(timeout);
1174            }
1175
1176            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1177            #[cfg(feature = "otel")]
1178            {
1179                let mut otel_headers = HashMap::new();
1180                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1181                for (k, v) in otel_headers {
1182                    if let (Ok(name), Ok(val)) = (
1183                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1184                        reqwest::header::HeaderValue::from_str(&v),
1185                    ) {
1186                        request = request.header(name, val);
1187                    }
1188                }
1189            }
1190
1191            for (key, value) in &exchange.input.headers {
1192                if !key.starts_with("Camel")
1193                    && let Some(val_str) = value.as_str()
1194                    && let (Ok(name), Ok(val)) = (
1195                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1196                        reqwest::header::HeaderValue::from_str(val_str),
1197                    )
1198                {
1199                    request = request.header(name, val);
1200                }
1201            }
1202
1203            match exchange.input.body {
1204                Body::Stream(ref s) => {
1205                    let mut stream_lock = s.stream.lock().await;
1206                    if let Some(stream) = stream_lock.take() {
1207                        request = request.body(reqwest::Body::wrap_stream(stream));
1208                    } else {
1209                        return Err(CamelError::AlreadyConsumed);
1210                    }
1211                }
1212                _ => {
1213                    // For other types, materialize with configured limit
1214                    let body = std::mem::take(&mut exchange.input.body);
1215                    let bytes = body.into_bytes(config.max_body_size).await?;
1216                    if !bytes.is_empty() {
1217                        request = request.body(bytes);
1218                    }
1219                }
1220            }
1221
1222            let response = request
1223                .send()
1224                .await
1225                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1226
1227            let status_code = response.status().as_u16();
1228            let status_text = response
1229                .status()
1230                .canonical_reason()
1231                .unwrap_or("Unknown")
1232                .to_string();
1233
1234            for (key, value) in response.headers() {
1235                if let Ok(val_str) = value.to_str() {
1236                    exchange
1237                        .input
1238                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1239                }
1240            }
1241
1242            exchange.input.set_header(
1243                "CamelHttpResponseCode",
1244                serde_json::Value::Number(status_code.into()),
1245            );
1246            exchange.input.set_header(
1247                "CamelHttpResponseText",
1248                serde_json::Value::String(status_text.clone()),
1249            );
1250
1251            let response_body = response.bytes().await.map_err(|e| {
1252                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1253            })?;
1254
1255            if config.throw_exception_on_failure
1256                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1257            {
1258                return Err(CamelError::HttpOperationFailed {
1259                    method: method_str,
1260                    url,
1261                    status_code,
1262                    status_text,
1263                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1264                });
1265            }
1266
1267            if !response_body.is_empty() {
1268                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1269            }
1270
1271            debug!(
1272                correlation_id = %exchange.correlation_id(),
1273                status = status_code,
1274                url = %url,
1275                "HTTP response"
1276            );
1277            Ok(exchange)
1278        })
1279    }
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284    use super::*;
1285    use camel_component_api::{Message, NoOpComponentContext};
1286    use std::sync::Arc;
1287    use std::time::Duration;
1288
1289    fn test_producer_ctx() -> ProducerContext {
1290        ProducerContext::new()
1291    }
1292
1293    #[test]
1294    fn test_http_config_defaults() {
1295        let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1296        assert_eq!(config.base_url, "http://localhost:8080/api");
1297        assert!(config.http_method.is_none());
1298        assert!(config.throw_exception_on_failure);
1299        assert_eq!(config.ok_status_code_range, (200, 299));
1300        assert!(config.response_timeout.is_none());
1301    }
1302
1303    #[test]
1304    fn test_http_config_scheme() {
1305        // UriConfig trait method returns "http" as primary scheme
1306        assert_eq!(HttpEndpointConfig::scheme(), "http");
1307    }
1308
1309    #[test]
1310    fn test_http_config_from_components() {
1311        // Test from_components directly (trait method)
1312        let components = camel_component_api::UriComponents {
1313            scheme: "https".to_string(),
1314            path: "//api.example.com/v1".to_string(),
1315            params: std::collections::HashMap::from([(
1316                "httpMethod".to_string(),
1317                "POST".to_string(),
1318            )]),
1319        };
1320        let config = HttpEndpointConfig::from_components(components).unwrap();
1321        assert_eq!(config.base_url, "https://api.example.com/v1");
1322        assert_eq!(config.http_method, Some("POST".to_string()));
1323    }
1324
1325    #[test]
1326    fn test_http_config_with_options() {
1327        let config = HttpEndpointConfig::from_uri(
1328            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1329        ).unwrap();
1330        assert_eq!(config.base_url, "https://api.example.com/v1");
1331        assert_eq!(config.http_method, Some("PUT".to_string()));
1332        assert!(!config.throw_exception_on_failure);
1333        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1334    }
1335
1336    #[test]
1337    fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1338        let config = HttpConfig::default()
1339            .with_response_timeout_ms(999)
1340            .with_allow_private_ips(true)
1341            .with_blocked_hosts(vec!["evil.com".to_string()])
1342            .with_max_body_size(12345);
1343        let endpoint =
1344            HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1345        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1346        assert!(endpoint.allow_private_ips);
1347        assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1348        assert_eq!(endpoint.max_body_size, 12345);
1349    }
1350
1351    #[test]
1352    fn test_from_uri_with_defaults_uri_overrides_config() {
1353        let config = HttpConfig::default()
1354            .with_response_timeout_ms(999)
1355            .with_allow_private_ips(true)
1356            .with_blocked_hosts(vec!["evil.com".to_string()])
1357            .with_max_body_size(12345);
1358        let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1359            "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1360            &config,
1361        )
1362        .unwrap();
1363        assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1364        assert!(!endpoint.allow_private_ips);
1365        assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1366        assert_eq!(endpoint.max_body_size, 99);
1367    }
1368
1369    #[test]
1370    fn test_http_config_ok_status_range() {
1371        let config =
1372            HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1373        assert_eq!(config.ok_status_code_range, (200, 204));
1374    }
1375
1376    #[test]
1377    fn test_http_config_wrong_scheme() {
1378        let result = HttpEndpointConfig::from_uri("file:/tmp");
1379        assert!(result.is_err());
1380    }
1381
1382    #[test]
1383    fn test_http_component_scheme() {
1384        let component = HttpComponent::new();
1385        assert_eq!(component.scheme(), "http");
1386    }
1387
1388    #[test]
1389    fn test_https_component_scheme() {
1390        let component = HttpsComponent::new();
1391        assert_eq!(component.scheme(), "https");
1392    }
1393
1394    #[test]
1395    fn test_http_endpoint_creates_consumer() {
1396        let component = HttpComponent::new();
1397        let ctx = NoOpComponentContext;
1398        let endpoint = component
1399            .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1400            .unwrap();
1401        assert!(endpoint.create_consumer().is_ok());
1402    }
1403
1404    #[test]
1405    fn test_https_endpoint_creates_consumer() {
1406        let component = HttpsComponent::new();
1407        let ctx = NoOpComponentContext;
1408        let endpoint = component
1409            .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1410            .unwrap();
1411        assert!(endpoint.create_consumer().is_ok());
1412    }
1413
1414    #[test]
1415    fn test_http_endpoint_creates_producer() {
1416        let ctx = test_producer_ctx();
1417        let component = HttpComponent::new();
1418        let endpoint_ctx = NoOpComponentContext;
1419        let endpoint = component
1420            .create_endpoint("http://localhost/api", &endpoint_ctx)
1421            .unwrap();
1422        assert!(endpoint.create_producer(&ctx).is_ok());
1423    }
1424
1425    // -----------------------------------------------------------------------
1426    // Producer tests
1427    // -----------------------------------------------------------------------
1428
1429    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1430        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1431        let addr = listener.local_addr().unwrap();
1432        let url = format!("http://127.0.0.1:{}", addr.port());
1433
1434        let handle = tokio::spawn(async move {
1435            loop {
1436                if let Ok((mut stream, _)) = listener.accept().await {
1437                    tokio::spawn(async move {
1438                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1439                        let mut buf = vec![0u8; 4096];
1440                        let n = stream.read(&mut buf).await.unwrap_or(0);
1441                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1442
1443                        let method = request.split_whitespace().next().unwrap_or("GET");
1444
1445                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1446                        let response = format!(
1447                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1448                            body.len(),
1449                            body
1450                        );
1451                        let _ = stream.write_all(response.as_bytes()).await;
1452                    });
1453                }
1454            }
1455        });
1456
1457        (url, handle)
1458    }
1459
1460    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1461        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1462        let addr = listener.local_addr().unwrap();
1463        let url = format!("http://127.0.0.1:{}", addr.port());
1464
1465        let handle = tokio::spawn(async move {
1466            loop {
1467                if let Ok((mut stream, _)) = listener.accept().await {
1468                    let status = status;
1469                    tokio::spawn(async move {
1470                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1471                        let mut buf = vec![0u8; 4096];
1472                        let _ = stream.read(&mut buf).await;
1473
1474                        let status_text = match status {
1475                            404 => "Not Found",
1476                            500 => "Internal Server Error",
1477                            _ => "Error",
1478                        };
1479                        let body = "error body";
1480                        let response = format!(
1481                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1482                            status,
1483                            status_text,
1484                            body.len(),
1485                            body
1486                        );
1487                        let _ = stream.write_all(response.as_bytes()).await;
1488                    });
1489                }
1490            }
1491        });
1492
1493        (url, handle)
1494    }
1495
1496    #[tokio::test]
1497    async fn test_http_producer_get_request() {
1498        use tower::ServiceExt;
1499
1500        let (url, _handle) = start_test_server().await;
1501        let ctx = test_producer_ctx();
1502
1503        let component = HttpComponent::new();
1504        let endpoint_ctx = NoOpComponentContext;
1505        let endpoint = component
1506            .create_endpoint(
1507                &format!("{url}/api/test?allowPrivateIps=true"),
1508                &endpoint_ctx,
1509            )
1510            .unwrap();
1511        let producer = endpoint.create_producer(&ctx).unwrap();
1512
1513        let exchange = Exchange::new(Message::default());
1514        let result = producer.oneshot(exchange).await.unwrap();
1515
1516        let status = result
1517            .input
1518            .header("CamelHttpResponseCode")
1519            .and_then(|v| v.as_u64())
1520            .unwrap();
1521        assert_eq!(status, 200);
1522
1523        assert!(!result.input.body.is_empty());
1524    }
1525
1526    #[tokio::test]
1527    async fn test_http_producer_post_with_body() {
1528        use tower::ServiceExt;
1529
1530        let (url, _handle) = start_test_server().await;
1531        let ctx = test_producer_ctx();
1532
1533        let component = HttpComponent::new();
1534        let endpoint_ctx = NoOpComponentContext;
1535        let endpoint = component
1536            .create_endpoint(
1537                &format!("{url}/api/data?allowPrivateIps=true"),
1538                &endpoint_ctx,
1539            )
1540            .unwrap();
1541        let producer = endpoint.create_producer(&ctx).unwrap();
1542
1543        let exchange = Exchange::new(Message::new("request body"));
1544        let result = producer.oneshot(exchange).await.unwrap();
1545
1546        let status = result
1547            .input
1548            .header("CamelHttpResponseCode")
1549            .and_then(|v| v.as_u64())
1550            .unwrap();
1551        assert_eq!(status, 200);
1552    }
1553
1554    #[tokio::test]
1555    async fn test_http_producer_method_from_header() {
1556        use tower::ServiceExt;
1557
1558        let (url, _handle) = start_test_server().await;
1559        let ctx = test_producer_ctx();
1560
1561        let component = HttpComponent::new();
1562        let endpoint_ctx = NoOpComponentContext;
1563        let endpoint = component
1564            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1565            .unwrap();
1566        let producer = endpoint.create_producer(&ctx).unwrap();
1567
1568        let mut exchange = Exchange::new(Message::default());
1569        exchange.input.set_header(
1570            "CamelHttpMethod",
1571            serde_json::Value::String("DELETE".to_string()),
1572        );
1573
1574        let result = producer.oneshot(exchange).await.unwrap();
1575        let status = result
1576            .input
1577            .header("CamelHttpResponseCode")
1578            .and_then(|v| v.as_u64())
1579            .unwrap();
1580        assert_eq!(status, 200);
1581    }
1582
1583    #[tokio::test]
1584    async fn test_http_producer_forced_method() {
1585        use tower::ServiceExt;
1586
1587        let (url, _handle) = start_test_server().await;
1588        let ctx = test_producer_ctx();
1589
1590        let component = HttpComponent::new();
1591        let endpoint_ctx = NoOpComponentContext;
1592        let endpoint = component
1593            .create_endpoint(
1594                &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1595                &endpoint_ctx,
1596            )
1597            .unwrap();
1598        let producer = endpoint.create_producer(&ctx).unwrap();
1599
1600        let exchange = Exchange::new(Message::default());
1601        let result = producer.oneshot(exchange).await.unwrap();
1602
1603        let status = result
1604            .input
1605            .header("CamelHttpResponseCode")
1606            .and_then(|v| v.as_u64())
1607            .unwrap();
1608        assert_eq!(status, 200);
1609    }
1610
1611    #[tokio::test]
1612    async fn test_http_producer_throw_exception_on_failure() {
1613        use tower::ServiceExt;
1614
1615        let (url, _handle) = start_status_server(404).await;
1616        let ctx = test_producer_ctx();
1617
1618        let component = HttpComponent::new();
1619        let endpoint_ctx = NoOpComponentContext;
1620        let endpoint = component
1621            .create_endpoint(
1622                &format!("{url}/not-found?allowPrivateIps=true"),
1623                &endpoint_ctx,
1624            )
1625            .unwrap();
1626        let producer = endpoint.create_producer(&ctx).unwrap();
1627
1628        let exchange = Exchange::new(Message::default());
1629        let result = producer.oneshot(exchange).await;
1630        assert!(result.is_err());
1631
1632        match result.unwrap_err() {
1633            CamelError::HttpOperationFailed { status_code, .. } => {
1634                assert_eq!(status_code, 404);
1635            }
1636            e => panic!("Expected HttpOperationFailed, got: {e}"),
1637        }
1638    }
1639
1640    #[tokio::test]
1641    async fn test_http_producer_no_throw_on_failure() {
1642        use tower::ServiceExt;
1643
1644        let (url, _handle) = start_status_server(500).await;
1645        let ctx = test_producer_ctx();
1646
1647        let component = HttpComponent::new();
1648        let endpoint_ctx = NoOpComponentContext;
1649        let endpoint = component
1650            .create_endpoint(
1651                &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1652                &endpoint_ctx,
1653            )
1654            .unwrap();
1655        let producer = endpoint.create_producer(&ctx).unwrap();
1656
1657        let exchange = Exchange::new(Message::default());
1658        let result = producer.oneshot(exchange).await.unwrap();
1659
1660        let status = result
1661            .input
1662            .header("CamelHttpResponseCode")
1663            .and_then(|v| v.as_u64())
1664            .unwrap();
1665        assert_eq!(status, 500);
1666    }
1667
1668    #[tokio::test]
1669    async fn test_http_producer_uri_override() {
1670        use tower::ServiceExt;
1671
1672        let (url, _handle) = start_test_server().await;
1673        let ctx = test_producer_ctx();
1674
1675        let component = HttpComponent::new();
1676        let endpoint_ctx = NoOpComponentContext;
1677        let endpoint = component
1678            .create_endpoint(
1679                "http://localhost:1/does-not-exist?allowPrivateIps=true",
1680                &endpoint_ctx,
1681            )
1682            .unwrap();
1683        let producer = endpoint.create_producer(&ctx).unwrap();
1684
1685        let mut exchange = Exchange::new(Message::default());
1686        exchange.input.set_header(
1687            "CamelHttpUri",
1688            serde_json::Value::String(format!("{url}/api")),
1689        );
1690
1691        let result = producer.oneshot(exchange).await.unwrap();
1692        let status = result
1693            .input
1694            .header("CamelHttpResponseCode")
1695            .and_then(|v| v.as_u64())
1696            .unwrap();
1697        assert_eq!(status, 200);
1698    }
1699
1700    #[tokio::test]
1701    async fn test_http_producer_response_headers_mapped() {
1702        use tower::ServiceExt;
1703
1704        let (url, _handle) = start_test_server().await;
1705        let ctx = test_producer_ctx();
1706
1707        let component = HttpComponent::new();
1708        let endpoint_ctx = NoOpComponentContext;
1709        let endpoint = component
1710            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1711            .unwrap();
1712        let producer = endpoint.create_producer(&ctx).unwrap();
1713
1714        let exchange = Exchange::new(Message::default());
1715        let result = producer.oneshot(exchange).await.unwrap();
1716
1717        assert!(
1718            result.input.header("content-type").is_some()
1719                || result.input.header("Content-Type").is_some()
1720        );
1721        assert!(result.input.header("CamelHttpResponseText").is_some());
1722    }
1723
1724    // -----------------------------------------------------------------------
1725    // Bug fix tests: Client configuration per-endpoint
1726    // -----------------------------------------------------------------------
1727
1728    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1729        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1730        let addr = listener.local_addr().unwrap();
1731        let url = format!("http://127.0.0.1:{}", addr.port());
1732
1733        let handle = tokio::spawn(async move {
1734            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1735            loop {
1736                if let Ok((mut stream, _)) = listener.accept().await {
1737                    tokio::spawn(async move {
1738                        let mut buf = vec![0u8; 4096];
1739                        let n = stream.read(&mut buf).await.unwrap_or(0);
1740                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1741
1742                        // Check if this is a request to /final
1743                        if request.contains("GET /final") {
1744                            let body = r#"{"status":"final"}"#;
1745                            let response = format!(
1746                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1747                                body.len(),
1748                                body
1749                            );
1750                            let _ = stream.write_all(response.as_bytes()).await;
1751                        } else {
1752                            // Redirect to /final
1753                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1754                            let _ = stream.write_all(response.as_bytes()).await;
1755                        }
1756                    });
1757                }
1758            }
1759        });
1760
1761        (url, handle)
1762    }
1763
1764    #[tokio::test]
1765    async fn test_follow_redirects_false_does_not_follow() {
1766        use tower::ServiceExt;
1767
1768        let (url, _handle) = start_redirect_server().await;
1769        let ctx = test_producer_ctx();
1770
1771        let component =
1772            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1773        let endpoint_ctx = NoOpComponentContext;
1774        let endpoint = component
1775            .create_endpoint(
1776                &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1777                &endpoint_ctx,
1778            )
1779            .unwrap();
1780        let producer = endpoint.create_producer(&ctx).unwrap();
1781
1782        let exchange = Exchange::new(Message::default());
1783        let result = producer.oneshot(exchange).await.unwrap();
1784
1785        // Should get 302, NOT follow redirect to 200
1786        let status = result
1787            .input
1788            .header("CamelHttpResponseCode")
1789            .and_then(|v| v.as_u64())
1790            .unwrap();
1791        assert_eq!(
1792            status, 302,
1793            "Should NOT follow redirect when followRedirects=false"
1794        );
1795    }
1796
1797    #[tokio::test]
1798    async fn test_follow_redirects_true_follows_redirect() {
1799        use tower::ServiceExt;
1800
1801        let (url, _handle) = start_redirect_server().await;
1802        let ctx = test_producer_ctx();
1803
1804        let component =
1805            HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1806        let endpoint_ctx = NoOpComponentContext;
1807        let endpoint = component
1808            .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1809            .unwrap();
1810        let producer = endpoint.create_producer(&ctx).unwrap();
1811
1812        let exchange = Exchange::new(Message::default());
1813        let result = producer.oneshot(exchange).await.unwrap();
1814
1815        // Should follow redirect and get 200
1816        let status = result
1817            .input
1818            .header("CamelHttpResponseCode")
1819            .and_then(|v| v.as_u64())
1820            .unwrap();
1821        assert_eq!(
1822            status, 200,
1823            "Should follow redirect when followRedirects=true"
1824        );
1825    }
1826
1827    #[tokio::test]
1828    async fn test_query_params_forwarded_to_http_request() {
1829        use tower::ServiceExt;
1830
1831        let (url, _handle) = start_test_server().await;
1832        let ctx = test_producer_ctx();
1833
1834        let component = HttpComponent::new();
1835        let endpoint_ctx = NoOpComponentContext;
1836        // apiKey is NOT a Camel option, should be forwarded as query param
1837        let endpoint = component
1838            .create_endpoint(
1839                &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1840                &endpoint_ctx,
1841            )
1842            .unwrap();
1843        let producer = endpoint.create_producer(&ctx).unwrap();
1844
1845        let exchange = Exchange::new(Message::default());
1846        let result = producer.oneshot(exchange).await.unwrap();
1847
1848        // The test server returns the request info in response
1849        // We just verify it succeeds (the query param was sent)
1850        let status = result
1851            .input
1852            .header("CamelHttpResponseCode")
1853            .and_then(|v| v.as_u64())
1854            .unwrap();
1855        assert_eq!(status, 200);
1856    }
1857
/// Verifies the Bug #3 fix at the config-parsing level: query parameters that
/// are not Camel options (`apiKey`, `token`) are kept in `query_params`, while
/// the Camel option `httpMethod` is not.
///
/// This is a plain synchronous test: it only parses a URI and performs no
/// HTTP call, so no async runtime is needed.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpEndpointConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // Non-Camel parameters must be preserved with their original values.
    for (key, expected) in [("apiKey", "secret123"), ("token", "abc456")] {
        assert!(
            config.query_params.contains_key(key),
            "{key} should be preserved"
        );
        assert_eq!(config.query_params.get(key).unwrap(), expected);
    }

    // `httpMethod` is a Camel option and must not be forwarded.
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
1885
1886    // -----------------------------------------------------------------------
1887    // SSRF Protection tests
1888    // -----------------------------------------------------------------------
1889
1890    #[tokio::test]
1891    async fn test_http_producer_blocks_metadata_endpoint() {
1892        use tower::ServiceExt;
1893
1894        let ctx = test_producer_ctx();
1895        let component = HttpComponent::new();
1896        let endpoint_ctx = NoOpComponentContext;
1897        let endpoint = component
1898            .create_endpoint(
1899                "http://example.com/api?allowPrivateIps=false",
1900                &endpoint_ctx,
1901            )
1902            .unwrap();
1903        let producer = endpoint.create_producer(&ctx).unwrap();
1904
1905        let mut exchange = Exchange::new(Message::default());
1906        exchange.input.set_header(
1907            "CamelHttpUri",
1908            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1909        );
1910
1911        let result = producer.oneshot(exchange).await;
1912        assert!(result.is_err(), "Should block AWS metadata endpoint");
1913
1914        let err = result.unwrap_err();
1915        assert!(
1916            err.to_string().contains("Private IP"),
1917            "Error should mention private IP blocking, got: {}",
1918            err
1919        );
1920    }
1921
/// A URI without SSRF-related options must parse to a config that disallows
/// private IPs and has an empty blocked-host list.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
    let HttpEndpointConfig {
        allow_private_ips,
        blocked_hosts,
        ..
    } = &parsed;

    assert!(
        !*allow_private_ips,
        "Private IPs should be blocked by default"
    );
    assert!(
        blocked_hosts.is_empty(),
        "Blocked hosts should be empty by default"
    );
}
1934
/// `allowPrivateIps=true` in the URI must set `allow_private_ips` on the
/// parsed config.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();
    assert!(
        parsed.allow_private_ips,
        "Private IPs should be allowed when explicitly set"
    );
}
1944
/// A comma-separated `blockedHosts` parameter must parse into the individual
/// host names, in order.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();
    assert_eq!(parsed.blocked_hosts, ["evil.com", "malware.net"]);
}
1953
1954    #[tokio::test]
1955    async fn test_http_producer_blocks_localhost() {
1956        use tower::ServiceExt;
1957
1958        let ctx = test_producer_ctx();
1959        let component = HttpComponent::new();
1960        let endpoint_ctx = NoOpComponentContext;
1961        let endpoint = component
1962            .create_endpoint("http://example.com/api", &endpoint_ctx)
1963            .unwrap();
1964        let producer = endpoint.create_producer(&ctx).unwrap();
1965
1966        let mut exchange = Exchange::new(Message::default());
1967        exchange.input.set_header(
1968            "CamelHttpUri",
1969            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1970        );
1971
1972        let result = producer.oneshot(exchange).await;
1973        assert!(result.is_err(), "Should block localhost");
1974    }
1975
1976    #[tokio::test]
1977    async fn test_http_producer_blocks_loopback_ip() {
1978        use tower::ServiceExt;
1979
1980        let ctx = test_producer_ctx();
1981        let component = HttpComponent::new();
1982        let endpoint_ctx = NoOpComponentContext;
1983        let endpoint = component
1984            .create_endpoint("http://example.com/api", &endpoint_ctx)
1985            .unwrap();
1986        let producer = endpoint.create_producer(&ctx).unwrap();
1987
1988        let mut exchange = Exchange::new(Message::default());
1989        exchange.input.set_header(
1990            "CamelHttpUri",
1991            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1992        );
1993
1994        let result = producer.oneshot(exchange).await;
1995        assert!(result.is_err(), "Should block loopback IP");
1996    }
1997
1998    #[tokio::test]
1999    async fn test_http_producer_allows_private_ip_when_enabled() {
2000        use tower::ServiceExt;
2001
2002        let ctx = test_producer_ctx();
2003        let component = HttpComponent::new();
2004        let endpoint_ctx = NoOpComponentContext;
2005        // With allowPrivateIps=true, the validation should pass
2006        // (actual connection will fail, but that's expected)
2007        let endpoint = component
2008            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2009            .unwrap();
2010        let producer = endpoint.create_producer(&ctx).unwrap();
2011
2012        let exchange = Exchange::new(Message::default());
2013
2014        // The request will fail because we can't connect, but it should NOT fail
2015        // due to SSRF protection
2016        let result = producer.oneshot(exchange).await;
2017        // We expect connection error, not SSRF error
2018        if let Err(ref e) = result {
2019            let err_str = e.to_string();
2020            assert!(
2021                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2022                "Should not be SSRF error, got: {}",
2023                err_str
2024            );
2025        }
2026    }
2027
2028    // -----------------------------------------------------------------------
2029    // HttpServerConfig tests
2030    // -----------------------------------------------------------------------
2031
/// Parsing `http://0.0.0.0:8080/orders` must yield the host, port and path
/// found in the URI.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8080);
    assert_eq!(parsed.path, "/orders");
}
2039
/// `HttpServerConfig::scheme()` must report `"http"` as the primary scheme.
#[test]
fn test_http_server_config_scheme() {
    let primary = HttpServerConfig::scheme();
    assert_eq!(primary, "http");
}
2045
/// Calls `HttpServerConfig::from_components` directly with hand-built URI
/// components (https scheme, `//0.0.0.0:8443/api`, `maxRequestBody=5242880`)
/// and checks every derived field.
#[test]
fn test_http_server_config_from_components() {
    let mut params = std::collections::HashMap::new();
    params.insert("maxRequestBody".to_string(), "5242880".to_string());

    let components = camel_component_api::UriComponents {
        scheme: "https".to_string(),
        path: "//0.0.0.0:8443/api".to_string(),
        params,
    };

    let parsed = HttpServerConfig::from_components(components).unwrap();
    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8443);
    assert_eq!(parsed.path, "/api");
    assert_eq!(parsed.max_request_body, 5_242_880);
}
2063
/// A URI with no path component must parse to the root path `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    assert_eq!(parsed.path, "/");
}
2069
/// A URI with a `file` scheme must be rejected by `HttpServerConfig::from_uri`.
#[test]
fn test_http_server_config_wrong_scheme() {
    let outcome = HttpServerConfig::from_uri("file:/tmp");
    assert!(outcome.is_err());
}
2074
/// A non-numeric port (`abc`) must make `HttpServerConfig::from_uri` fail.
#[test]
fn test_http_server_config_invalid_port() {
    let outcome = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(outcome.is_err());
}
2079
/// When the URI carries no explicit port, the port must default by scheme:
/// 80 for `http` and 443 for `https`.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let plain = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
    let secure = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();

    assert_eq!(plain.port, 80);
    assert_eq!(secure.port, 443);
}
2090
/// Compile-time check that `RequestEnvelope` and `HttpReply` implement `Send`.
/// The test body has no runtime assertions; it fails to compile if either
/// bound is not met.
#[test]
fn test_request_envelope_and_reply_are_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
2097
2098    // -----------------------------------------------------------------------
2099    // ServerRegistry tests
2100    // -----------------------------------------------------------------------
2101
/// Two calls to `ServerRegistry::global()` must return references to the
/// same object (pointer equality).
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();
    assert!(std::ptr::eq(first, second));
}
2108
2109    #[tokio::test]
2110    async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2111        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2112        let port = listener.local_addr().unwrap().port();
2113        drop(listener);
2114
2115        let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2116            Arc::new(std::sync::Mutex::new(Vec::new()));
2117
2118        let mut handles = Vec::new();
2119        for _ in 0..4 {
2120            let results = results.clone();
2121            handles.push(tokio::spawn(async move {
2122                let dispatch = ServerRegistry::global()
2123                    .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2124                    .await
2125                    .unwrap();
2126                results.lock().unwrap().push(dispatch);
2127            }));
2128        }
2129
2130        for h in handles {
2131            h.await.unwrap();
2132        }
2133
2134        let dispatches = results.lock().unwrap();
2135        assert_eq!(dispatches.len(), 4);
2136        for i in 1..dispatches.len() {
2137            assert!(
2138                Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2139                "all concurrent callers should get the same dispatch table"
2140            );
2141        }
2142    }
2143
2144    // -----------------------------------------------------------------------
2145    // Axum dispatch handler tests
2146    // -----------------------------------------------------------------------
2147
2148    #[tokio::test]
2149    async fn test_dispatch_handler_returns_404_for_unknown_path() {
2150        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2151        // Nothing registered in the dispatch table
2152        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2153        let port = listener.local_addr().unwrap().port();
2154        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2155
2156        // Wait for server to start
2157        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2158
2159        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2160            .await
2161            .unwrap();
2162        assert_eq!(resp.status().as_u16(), 404);
2163    }
2164
2165    // -----------------------------------------------------------------------
2166    // HttpConsumer tests
2167    // -----------------------------------------------------------------------
2168
2169    #[tokio::test]
2170    async fn test_http_consumer_start_registers_path() {
2171        use camel_component_api::ConsumerContext;
2172
2173        // Get an OS-assigned free port
2174        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2175        let port = listener.local_addr().unwrap().port();
2176        drop(listener); // Release port — ServerRegistry will rebind it
2177
2178        let consumer_cfg = HttpServerConfig {
2179            host: "127.0.0.1".to_string(),
2180            port,
2181            path: "/ping".to_string(),
2182            max_request_body: 2 * 1024 * 1024,
2183            max_response_body: 10 * 1024 * 1024,
2184        };
2185        let mut consumer = HttpConsumer::new(consumer_cfg);
2186
2187        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2188        let token = tokio_util::sync::CancellationToken::new();
2189        let ctx = ConsumerContext::new(tx, token.clone());
2190
2191        tokio::spawn(async move {
2192            consumer.start(ctx).await.unwrap();
2193        });
2194
2195        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2196
2197        let client = reqwest::Client::new();
2198        let resp_future = client
2199            .post(format!("http://127.0.0.1:{port}/ping"))
2200            .body("hello world")
2201            .send();
2202
2203        let (http_result, _) = tokio::join!(resp_future, async {
2204            if let Some(mut envelope) = rx.recv().await {
2205                // Set a custom status code
2206                envelope.exchange.input.set_header(
2207                    "CamelHttpResponseCode",
2208                    serde_json::Value::Number(201.into()),
2209                );
2210                if let Some(reply_tx) = envelope.reply_tx {
2211                    let _ = reply_tx.send(Ok(envelope.exchange));
2212                }
2213            }
2214        });
2215
2216        let resp = http_result.unwrap();
2217        assert_eq!(resp.status().as_u16(), 201);
2218
2219        token.cancel();
2220    }
2221
2222    // -----------------------------------------------------------------------
2223    // Integration tests
2224    // -----------------------------------------------------------------------
2225
2226    #[tokio::test]
2227    async fn test_integration_single_consumer_round_trip() {
2228        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2229
2230        // Get an OS-assigned free port (ephemeral)
2231        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2232        let port = listener.local_addr().unwrap().port();
2233        drop(listener); // Release — ServerRegistry will rebind
2234
2235        let component = HttpComponent::new();
2236        let endpoint_ctx = NoOpComponentContext;
2237        let endpoint = component
2238            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2239            .unwrap();
2240        let mut consumer = endpoint.create_consumer().unwrap();
2241
2242        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2243        let token = tokio_util::sync::CancellationToken::new();
2244        let ctx = ConsumerContext::new(tx, token.clone());
2245
2246        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2247        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2248
2249        let client = reqwest::Client::new();
2250        let send_fut = client
2251            .post(format!("http://127.0.0.1:{port}/echo"))
2252            .header("Content-Type", "text/plain")
2253            .body("ping")
2254            .send();
2255
2256        let (http_result, _) = tokio::join!(send_fut, async {
2257            if let Some(mut envelope) = rx.recv().await {
2258                assert_eq!(
2259                    envelope.exchange.input.header("CamelHttpMethod"),
2260                    Some(&serde_json::Value::String("POST".into()))
2261                );
2262                assert_eq!(
2263                    envelope.exchange.input.header("CamelHttpPath"),
2264                    Some(&serde_json::Value::String("/echo".into()))
2265                );
2266                envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2267                if let Some(reply_tx) = envelope.reply_tx {
2268                    let _ = reply_tx.send(Ok(envelope.exchange));
2269                }
2270            }
2271        });
2272
2273        let resp = http_result.unwrap();
2274        assert_eq!(resp.status().as_u16(), 200);
2275        let body = resp.text().await.unwrap();
2276        assert_eq!(body, "pong");
2277
2278        token.cancel();
2279    }
2280
2281    #[tokio::test]
2282    async fn test_integration_two_consumers_shared_port() {
2283        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2284
2285        // Get an OS-assigned free port (ephemeral)
2286        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287        let port = listener.local_addr().unwrap().port();
2288        drop(listener);
2289
2290        let component = HttpComponent::new();
2291        let endpoint_ctx = NoOpComponentContext;
2292
2293        // Consumer A: /hello
2294        let endpoint_a = component
2295            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2296            .unwrap();
2297        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2298
2299        // Consumer B: /world
2300        let endpoint_b = component
2301            .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2302            .unwrap();
2303        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2304
2305        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2306        let token_a = tokio_util::sync::CancellationToken::new();
2307        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2308
2309        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2310        let token_b = tokio_util::sync::CancellationToken::new();
2311        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2312
2313        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2314        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2315        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2316
2317        let client = reqwest::Client::new();
2318
2319        // Request to /hello
2320        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2321        let (resp_hello, _) = tokio::join!(fut_hello, async {
2322            if let Some(mut envelope) = rx_a.recv().await {
2323                envelope.exchange.input.body =
2324                    camel_component_api::Body::Text("hello-response".to_string());
2325                if let Some(reply_tx) = envelope.reply_tx {
2326                    let _ = reply_tx.send(Ok(envelope.exchange));
2327                }
2328            }
2329        });
2330
2331        // Request to /world
2332        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2333        let (resp_world, _) = tokio::join!(fut_world, async {
2334            if let Some(mut envelope) = rx_b.recv().await {
2335                envelope.exchange.input.body =
2336                    camel_component_api::Body::Text("world-response".to_string());
2337                if let Some(reply_tx) = envelope.reply_tx {
2338                    let _ = reply_tx.send(Ok(envelope.exchange));
2339                }
2340            }
2341        });
2342
2343        let body_a = resp_hello.unwrap().text().await.unwrap();
2344        let body_b = resp_world.unwrap().text().await.unwrap();
2345
2346        assert_eq!(body_a, "hello-response");
2347        assert_eq!(body_b, "world-response");
2348
2349        token_a.cancel();
2350        token_b.cancel();
2351    }
2352
2353    #[tokio::test]
2354    async fn test_integration_unregistered_path_returns_404() {
2355        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2356
2357        // Get an OS-assigned free port (ephemeral)
2358        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2359        let port = listener.local_addr().unwrap().port();
2360        drop(listener);
2361
2362        let component = HttpComponent::new();
2363        let endpoint_ctx = NoOpComponentContext;
2364        let endpoint = component
2365            .create_endpoint(
2366                &format!("http://127.0.0.1:{port}/registered"),
2367                &endpoint_ctx,
2368            )
2369            .unwrap();
2370        let mut consumer = endpoint.create_consumer().unwrap();
2371
2372        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2373        let token = tokio_util::sync::CancellationToken::new();
2374        let ctx = ConsumerContext::new(tx, token.clone());
2375
2376        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2377        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2378
2379        let client = reqwest::Client::new();
2380        let resp = client
2381            .get(format!("http://127.0.0.1:{port}/not-there"))
2382            .send()
2383            .await
2384            .unwrap();
2385        assert_eq!(resp.status().as_u16(), 404);
2386
2387        token.cancel();
2388    }
2389
2390    #[test]
2391    fn test_http_consumer_declares_concurrent() {
2392        use camel_component_api::ConcurrencyModel;
2393
2394        let config = HttpServerConfig {
2395            host: "127.0.0.1".to_string(),
2396            port: 19999,
2397            path: "/test".to_string(),
2398            max_request_body: 2 * 1024 * 1024,
2399            max_response_body: 10 * 1024 * 1024,
2400        };
2401        let consumer = HttpConsumer::new(config);
2402        assert_eq!(
2403            consumer.concurrency_model(),
2404            ConcurrencyModel::Concurrent { max: None }
2405        );
2406    }
2407
2408    // -----------------------------------------------------------------------
2409    // HttpReplyBody streaming tests
2410    // -----------------------------------------------------------------------
2411
2412    #[tokio::test]
2413    async fn test_http_reply_body_stream_variant_exists() {
2414        use bytes::Bytes;
2415        use camel_component_api::CamelError;
2416        use futures::stream;
2417
2418        let chunks: Vec<Result<Bytes, CamelError>> =
2419            vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2420        let stream = Box::pin(stream::iter(chunks));
2421        let reply_body = HttpReplyBody::Stream(stream);
2422        // Si compila y el match funciona, el test pasa
2423        match reply_body {
2424            HttpReplyBody::Stream(_) => {}
2425            HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2426        }
2427    }
2428
2429    // -----------------------------------------------------------------------
2430    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2431    // -----------------------------------------------------------------------
2432
2433    #[cfg(feature = "otel")]
2434    mod otel_tests {
2435        use super::*;
2436        use camel_component_api::Message;
2437        use tower::ServiceExt;
2438
2439        #[tokio::test]
2440        async fn test_producer_injects_traceparent_header() {
2441            let (url, _handle) = start_test_server_with_header_capture().await;
2442            let ctx = test_producer_ctx();
2443
2444            let component = HttpComponent::new();
2445            let endpoint_ctx = NoOpComponentContext;
2446            let endpoint = component
2447                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2448                .unwrap();
2449            let producer = endpoint.create_producer(&ctx).unwrap();
2450
2451            // Create exchange with an OTel context by extracting from a traceparent header
2452            let mut exchange = Exchange::new(Message::default());
2453            let mut headers = std::collections::HashMap::new();
2454            headers.insert(
2455                "traceparent".to_string(),
2456                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2457            );
2458            camel_otel::extract_into_exchange(&mut exchange, &headers);
2459
2460            let result = producer.oneshot(exchange).await.unwrap();
2461
2462            // Verify request succeeded
2463            let status = result
2464                .input
2465                .header("CamelHttpResponseCode")
2466                .and_then(|v| v.as_u64())
2467                .unwrap();
2468            assert_eq!(status, 200);
2469
2470            // The test server echoes back the received traceparent header
2471            let traceparent = result.input.header("x-received-traceparent");
2472            assert!(
2473                traceparent.is_some(),
2474                "traceparent header should have been sent"
2475            );
2476
2477            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2478            // Verify format: version-traceid-spanid-flags
2479            let parts: Vec<&str> = traceparent_str.split('-').collect();
2480            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2481            assert_eq!(parts[0], "00", "version should be 00");
2482            assert_eq!(
2483                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2484                "trace-id should match"
2485            );
2486            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2487            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2488        }
2489
2490        #[tokio::test]
2491        async fn test_consumer_extracts_traceparent_header() {
2492            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2493
2494            // Get an OS-assigned free port
2495            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2496            let port = listener.local_addr().unwrap().port();
2497            drop(listener);
2498
2499            let component = HttpComponent::new();
2500            let endpoint_ctx = NoOpComponentContext;
2501            let endpoint = component
2502                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2503                .unwrap();
2504            let mut consumer = endpoint.create_consumer().unwrap();
2505
2506            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2507            let token = tokio_util::sync::CancellationToken::new();
2508            let ctx = ConsumerContext::new(tx, token.clone());
2509
2510            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2511            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2512
2513            // Send request with traceparent header
2514            let client = reqwest::Client::new();
2515            let send_fut = client
2516                .post(format!("http://127.0.0.1:{port}/trace"))
2517                .header(
2518                    "traceparent",
2519                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2520                )
2521                .body("test")
2522                .send();
2523
2524            let (http_result, _) = tokio::join!(send_fut, async {
2525                if let Some(envelope) = rx.recv().await {
2526                    // Verify the exchange has a valid OTel context by re-injecting it
2527                    // and checking the traceparent matches
2528                    let mut injected_headers = std::collections::HashMap::new();
2529                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2530
2531                    assert!(
2532                        injected_headers.contains_key("traceparent"),
2533                        "Exchange should have traceparent after extraction"
2534                    );
2535
2536                    let traceparent = injected_headers.get("traceparent").unwrap();
2537                    let parts: Vec<&str> = traceparent.split('-').collect();
2538                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2539                    assert_eq!(
2540                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2541                        "Trace ID should match the original traceparent header"
2542                    );
2543
2544                    if let Some(reply_tx) = envelope.reply_tx {
2545                        let _ = reply_tx.send(Ok(envelope.exchange));
2546                    }
2547                }
2548            });
2549
2550            let resp = http_result.unwrap();
2551            assert_eq!(resp.status().as_u16(), 200);
2552
2553            token.cancel();
2554        }
2555
2556        #[tokio::test]
2557        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2558            use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2559
2560            // Get an OS-assigned free port
2561            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2562            let port = listener.local_addr().unwrap().port();
2563            drop(listener);
2564
2565            let component = HttpComponent::new();
2566            let endpoint_ctx = NoOpComponentContext;
2567            let endpoint = component
2568                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2569                .unwrap();
2570            let mut consumer = endpoint.create_consumer().unwrap();
2571
2572            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2573            let token = tokio_util::sync::CancellationToken::new();
2574            let ctx = ConsumerContext::new(tx, token.clone());
2575
2576            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2577            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2578
2579            // Send request with MIXED-CASE TraceParent header (not lowercase)
2580            let client = reqwest::Client::new();
2581            let send_fut = client
2582                .post(format!("http://127.0.0.1:{port}/trace"))
2583                .header(
2584                    "TraceParent",
2585                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2586                )
2587                .body("test")
2588                .send();
2589
2590            let (http_result, _) = tokio::join!(send_fut, async {
2591                if let Some(envelope) = rx.recv().await {
2592                    // Verify the exchange has a valid OTel context by re-injecting it
2593                    // and checking the traceparent matches
2594                    let mut injected_headers = HashMap::new();
2595                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2596
2597                    assert!(
2598                        injected_headers.contains_key("traceparent"),
2599                        "Exchange should have traceparent after extraction from mixed-case header"
2600                    );
2601
2602                    let traceparent = injected_headers.get("traceparent").unwrap();
2603                    let parts: Vec<&str> = traceparent.split('-').collect();
2604                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2605                    assert_eq!(
2606                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2607                        "Trace ID should match the original mixed-case TraceParent header"
2608                    );
2609
2610                    if let Some(reply_tx) = envelope.reply_tx {
2611                        let _ = reply_tx.send(Ok(envelope.exchange));
2612                    }
2613                }
2614            });
2615
2616            let resp = http_result.unwrap();
2617            assert_eq!(resp.status().as_u16(), 200);
2618
2619            token.cancel();
2620        }
2621
2622        #[tokio::test]
2623        async fn test_producer_no_trace_context_no_crash() {
2624            let (url, _handle) = start_test_server().await;
2625            let ctx = test_producer_ctx();
2626
2627            let component = HttpComponent::new();
2628            let endpoint_ctx = NoOpComponentContext;
2629            let endpoint = component
2630                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2631                .unwrap();
2632            let producer = endpoint.create_producer(&ctx).unwrap();
2633
2634            // Create exchange with default (empty) otel_context - no trace context
2635            let exchange = Exchange::new(Message::default());
2636
2637            // Should succeed without panic
2638            let result = producer.oneshot(exchange).await.unwrap();
2639
2640            // Verify request succeeded
2641            let status = result
2642                .input
2643                .header("CamelHttpResponseCode")
2644                .and_then(|v| v.as_u64())
2645                .unwrap();
2646            assert_eq!(status, 200);
2647        }
2648
2649        /// Test server that captures and echoes back the traceparent header
2650        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2651            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2652            let addr = listener.local_addr().unwrap();
2653            let url = format!("http://127.0.0.1:{}", addr.port());
2654
2655            let handle = tokio::spawn(async move {
2656                loop {
2657                    if let Ok((mut stream, _)) = listener.accept().await {
2658                        tokio::spawn(async move {
2659                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2660                            let mut buf = vec![0u8; 8192];
2661                            let n = stream.read(&mut buf).await.unwrap_or(0);
2662                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
2663
2664                            // Extract traceparent header from request
2665                            let traceparent = request
2666                                .lines()
2667                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
2668                                .map(|line| {
2669                                    line.split(':')
2670                                        .nth(1)
2671                                        .map(|s| s.trim().to_string())
2672                                        .unwrap_or_default()
2673                                })
2674                                .unwrap_or_default();
2675
2676                            let body =
2677                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2678                            let response = format!(
2679                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2680                                body.len(),
2681                                traceparent,
2682                                body
2683                            );
2684                            let _ = stream.write_all(response.as_bytes()).await;
2685                        });
2686                    }
2687                }
2688            });
2689
2690            (url, handle)
2691        }
2692    }
2693
2694    // -----------------------------------------------------------------------
2695    // Response streaming tests (Eje A - Task 2)
2696    // -----------------------------------------------------------------------
2697
2698    // -----------------------------------------------------------------------
2699    // Request streaming tests (Eje B - Task 3)
2700    // -----------------------------------------------------------------------
2701
2702    #[tokio::test]
2703    async fn test_request_body_arrives_as_stream() {
2704        use camel_component_api::Body;
2705        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2706
2707        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2708        let port = listener.local_addr().unwrap().port();
2709        drop(listener);
2710
2711        let component = HttpComponent::new();
2712        let endpoint_ctx = NoOpComponentContext;
2713        let endpoint = component
2714            .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
2715            .unwrap();
2716        let mut consumer = endpoint.create_consumer().unwrap();
2717
2718        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2719        let token = tokio_util::sync::CancellationToken::new();
2720        let ctx = ConsumerContext::new(tx, token.clone());
2721
2722        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2723        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2724
2725        let client = reqwest::Client::new();
2726        let send_fut = client
2727            .post(format!("http://127.0.0.1:{port}/upload"))
2728            .body("hello streaming world")
2729            .send();
2730
2731        let (http_result, _) = tokio::join!(send_fut, async {
2732            if let Some(mut envelope) = rx.recv().await {
2733                // Body must be Body::Stream, not Body::Text or Body::Bytes
2734                assert!(
2735                    matches!(envelope.exchange.input.body, Body::Stream(_)),
2736                    "expected Body::Stream, got discriminant {:?}",
2737                    std::mem::discriminant(&envelope.exchange.input.body)
2738                );
2739                // Materialize to verify content
2740                let bytes = envelope
2741                    .exchange
2742                    .input
2743                    .body
2744                    .into_bytes(1024 * 1024)
2745                    .await
2746                    .unwrap();
2747                assert_eq!(&bytes[..], b"hello streaming world");
2748
2749                envelope.exchange.input.body = camel_component_api::Body::Empty;
2750                if let Some(reply_tx) = envelope.reply_tx {
2751                    let _ = reply_tx.send(Ok(envelope.exchange));
2752                }
2753            }
2754        });
2755
2756        let resp = http_result.unwrap();
2757        assert_eq!(resp.status().as_u16(), 200);
2758
2759        token.cancel();
2760    }
2761
2762    // -----------------------------------------------------------------------
2763    // Response streaming tests (Eje A - Task 2)
2764    // -----------------------------------------------------------------------
2765
2766    #[tokio::test]
2767    async fn test_streaming_response_chunked() {
2768        use bytes::Bytes;
2769        use camel_component_api::Body;
2770        use camel_component_api::CamelError;
2771        use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2772        use camel_component_api::{StreamBody, StreamMetadata};
2773        use futures::stream;
2774        use std::sync::Arc;
2775        use tokio::sync::Mutex;
2776
2777        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2778        let port = listener.local_addr().unwrap().port();
2779        drop(listener);
2780
2781        let component = HttpComponent::new();
2782        let endpoint_ctx = NoOpComponentContext;
2783        let endpoint = component
2784            .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
2785            .unwrap();
2786        let mut consumer = endpoint.create_consumer().unwrap();
2787
2788        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2789        let token = tokio_util::sync::CancellationToken::new();
2790        let ctx = ConsumerContext::new(tx, token.clone());
2791
2792        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2793        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2794
2795        let client = reqwest::Client::new();
2796        let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2797
2798        let (http_result, _) = tokio::join!(send_fut, async {
2799            if let Some(mut envelope) = rx.recv().await {
2800                // Respond with Body::Stream
2801                let chunks: Vec<Result<Bytes, CamelError>> =
2802                    vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2803                let stream = Box::pin(stream::iter(chunks));
2804                envelope.exchange.input.body = Body::Stream(StreamBody {
2805                    stream: Arc::new(Mutex::new(Some(stream))),
2806                    metadata: StreamMetadata::default(),
2807                });
2808                if let Some(reply_tx) = envelope.reply_tx {
2809                    let _ = reply_tx.send(Ok(envelope.exchange));
2810                }
2811            }
2812        });
2813
2814        let resp = http_result.unwrap();
2815        assert_eq!(resp.status().as_u16(), 200);
2816        let body = resp.text().await.unwrap();
2817        assert_eq!(body, "chunk1chunk2");
2818
2819        token.cancel();
2820    }
2821
2822    // -----------------------------------------------------------------------
2823    // 413 Content-Length limit test (Task 4)
2824    // -----------------------------------------------------------------------
2825
2826    #[tokio::test]
2827    async fn test_413_when_content_length_exceeds_limit() {
2828        use camel_component_api::ConsumerContext;
2829
2830        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2831        let port = listener.local_addr().unwrap().port();
2832        drop(listener);
2833
2834        // maxRequestBody=100 — any request declaring more than 100 bytes must get 413
2835        let component = HttpComponent::new();
2836        let endpoint_ctx = NoOpComponentContext;
2837        let endpoint = component
2838            .create_endpoint(
2839                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
2840                &endpoint_ctx,
2841            )
2842            .unwrap();
2843        let mut consumer = endpoint.create_consumer().unwrap();
2844
2845        let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2846        let token = tokio_util::sync::CancellationToken::new();
2847        let ctx = ConsumerContext::new(tx, token.clone());
2848
2849        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2850        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2851
2852        let client = reqwest::Client::new();
2853        let resp = client
2854            .post(format!("http://127.0.0.1:{port}/upload"))
2855            .header("Content-Length", "1000") // declares 1000 bytes, limit is 100
2856            .body("x".repeat(1000))
2857            .send()
2858            .await
2859            .unwrap();
2860
2861        assert_eq!(resp.status().as_u16(), 413);
2862
2863        token.cancel();
2864    }
2865
2866    /// Chunked upload without Content-Length header must NOT be rejected by maxRequestBody.
2867    /// The spec says: "If there is no Content-Length, the limit does not apply at the
2868    /// consumer level — the route is responsible."
2869    #[tokio::test]
2870    async fn test_chunked_upload_without_content_length_bypasses_limit() {
2871        use bytes::Bytes;
2872        use camel_component_api::Body;
2873        use camel_component_api::ConsumerContext;
2874        use futures::stream;
2875
2876        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2877        let port = listener.local_addr().unwrap().port();
2878        drop(listener);
2879
2880        // maxRequestBody=10 — very small limit; chunked uploads have no Content-Length
2881        let component = HttpComponent::new();
2882        let endpoint_ctx = NoOpComponentContext;
2883        let endpoint = component
2884            .create_endpoint(
2885                &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
2886                &endpoint_ctx,
2887            )
2888            .unwrap();
2889        let mut consumer = endpoint.create_consumer().unwrap();
2890
2891        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2892        let token = tokio_util::sync::CancellationToken::new();
2893        let ctx = ConsumerContext::new(tx, token.clone());
2894
2895        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2896        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2897
2898        let client = reqwest::Client::new();
2899
2900        // Use wrap_stream so reqwest sends chunked transfer encoding WITHOUT a
2901        // Content-Length header. 100 bytes exceeds the 10-byte maxRequestBody limit,
2902        // but since there's no Content-Length the 413 check must NOT fire.
2903        let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2904            Ok(Bytes::from("y".repeat(50))),
2905            Ok(Bytes::from("y".repeat(50))),
2906        ];
2907        let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2908        let send_fut = client
2909            .post(format!("http://127.0.0.1:{port}/upload"))
2910            .body(stream_body)
2911            .send();
2912
2913        let consumer_fut = async {
2914            // Use timeout to avoid deadlock if the handler rejects before enqueueing
2915            match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2916                Ok(Some(mut envelope)) => {
2917                    assert!(
2918                        matches!(envelope.exchange.input.body, Body::Stream(_)),
2919                        "expected Body::Stream"
2920                    );
2921                    envelope.exchange.input.body = camel_component_api::Body::Empty;
2922                    if let Some(reply_tx) = envelope.reply_tx {
2923                        let _ = reply_tx.send(Ok(envelope.exchange));
2924                    }
2925                }
2926                Ok(None) => panic!("consumer channel closed unexpectedly"),
2927                Err(_) => {
2928                    // Timeout: the request was rejected before reaching the consumer.
2929                    // The HTTP response will carry the real status code (we check below).
2930                }
2931            }
2932        };
2933
2934        let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2935
2936        let resp = http_result.unwrap();
2937        // Must NOT be 413; chunked uploads without Content-Length bypass the limit.
2938        assert_ne!(
2939            resp.status().as_u16(),
2940            413,
2941            "chunked upload must not be rejected by maxRequestBody"
2942        );
2943        assert_eq!(resp.status().as_u16(), 200);
2944
2945        token.cancel();
2946    }
2947}