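//! `camel_component_http/lib.rs`
//!
//! HTTP component: client (producer) configuration, server (consumer)
//! configuration, and a process-wide registry of Axum servers keyed by port.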

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
16// ---------------------------------------------------------------------------
17// HttpConfig
18// ---------------------------------------------------------------------------
19
20/// Configuration for an HTTP client (producer) endpoint.
21///
22/// # Memory Limits
23///
24/// HTTP operations enforce conservative memory limits to prevent denial-of-service
25/// attacks from untrusted network sources. These limits are significantly lower than
26/// file component limits (100MB) because HTTP typically handles API responses rather
27/// than large file transfers, and clients may be untrusted.
28///
29/// ## Default Limits
30///
31/// - **HTTP client body**: 10MB (typical API responses)
32/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
33/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
34///
35/// ## Rationale
36///
37/// The 10MB limit for HTTP client responses is appropriate for most API interactions
38/// while providing protection against:
39/// - Malicious servers sending oversized responses
40/// - Runaway processes generating unexpectedly large payloads
41/// - Memory exhaustion attacks
42///
43/// The 2MB server request limit is even more conservative because it handles input
44/// from potentially untrusted clients on the public internet.
45///
46/// ## Overriding Limits
47///
48/// Override the default client body limit using the `maxBodySize` URI parameter:
49///
50/// ```text
51/// http://api.example.com/large-data?maxBodySize=52428800
52/// ```
53///
54/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
55///
56/// ```text
57/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
58/// ```
59///
60/// ## Behavior When Exceeded
61///
62/// When a body exceeds the configured limit:
63/// - An error is returned immediately
64/// - No memory is exhausted - the limit is checked before allocation
65/// - The HTTP connection is terminated cleanly
66///
67/// ## Security Considerations
68///
69/// HTTP endpoints should be treated with more caution than file endpoints because:
70/// - Clients may be unknown and untrusted
71/// - Network traffic can be spoofed or malicious
72/// - DoS attacks often exploit unbounded resource consumption
73///
74/// Only increase limits when you control both ends of the connection or when
75/// business requirements demand larger payloads.
#[derive(Debug, Clone)]
pub struct HttpConfig {
    /// Target URL rebuilt by `from_uri` as `<scheme>:<path>`, without URI parameters.
    pub base_url: String,
    /// Value of the `httpMethod` URI parameter, if one was given.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` parameter; `from_uri` treats anything other than
    /// `"false"` as `true` and defaults to `true`.
    pub throw_exception_on_failure: bool,
    /// `okStatusCodeRange` parameter parsed from `start-end`; `from_uri` defaults
    /// to `(200, 299)`.
    pub ok_status_code_range: (u16, u16),
    /// `followRedirects` parameter; only `"true"` enables it, default `false`.
    pub follow_redirects: bool,
    /// `connectTimeout` parameter in milliseconds; `from_uri` defaults to 30000 ms.
    pub connect_timeout: Duration,
    /// `responseTimeout` parameter in milliseconds; `None` when absent or unparsable.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not Camel options, kept by `from_uri` for forwarding
    /// as HTTP query parameters.
    pub query_params: HashMap<String, String>,
    // Security settings
    /// `allowPrivateIps` parameter; only `"true"` enables it, default `false`.
    pub allow_private_ips: bool,
    /// `blockedHosts` parameter: comma-separated host names, each trimmed.
    pub blocked_hosts: Vec<String>,
    // Memory limit for body materialization (in bytes)
    /// `maxBodySize` parameter; `from_uri` defaults to 10 MiB.
    pub max_body_size: usize,
}
92
93impl HttpConfig {
94    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
95        let parts = parse_uri(uri)?;
96        if parts.scheme != "http" && parts.scheme != "https" {
97            return Err(CamelError::InvalidUri(format!(
98                "expected scheme 'http' or 'https', got '{}'",
99                parts.scheme
100            )));
101        }
102
103        let base_url = format!("{}:{}", parts.scheme, parts.path);
104
105        let http_method = parts.params.get("httpMethod").cloned();
106
107        let throw_exception_on_failure = parts
108            .params
109            .get("throwExceptionOnFailure")
110            .map(|v| v != "false")
111            .unwrap_or(true);
112
113        let ok_status_code_range = parts
114            .params
115            .get("okStatusCodeRange")
116            .and_then(|v| {
117                let (start, end) = v.split_once('-')?;
118                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
119            })
120            .unwrap_or((200, 299));
121
122        let follow_redirects = parts
123            .params
124            .get("followRedirects")
125            .map(|v| v == "true")
126            .unwrap_or(false);
127
128        let connect_timeout = parts
129            .params
130            .get("connectTimeout")
131            .and_then(|v| v.parse::<u64>().ok())
132            .map(Duration::from_millis)
133            .unwrap_or(Duration::from_millis(30000));
134
135        let response_timeout = parts
136            .params
137            .get("responseTimeout")
138            .and_then(|v| v.parse::<u64>().ok())
139            .map(Duration::from_millis);
140
141        // SSRF protection settings
142        let allow_private_ips = parts
143            .params
144            .get("allowPrivateIps")
145            .map(|v| v == "true")
146            .unwrap_or(false); // Default: block private IPs
147
148        let blocked_hosts = parts
149            .params
150            .get("blockedHosts")
151            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
152            .unwrap_or_default();
153
154        let max_body_size = parts
155            .params
156            .get("maxBodySize")
157            .and_then(|v| v.parse::<usize>().ok())
158            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
159
160        // CAMEL_OPTIONS: params that are consumed by Camel,        // Any remaining params should be forwarded as HTTP query params
161        let camel_options = [
162            "httpMethod",
163            "throwExceptionOnFailure",
164            "okStatusCodeRange",
165            "followRedirects",
166            "connectTimeout",
167            "responseTimeout",
168            "allowPrivateIps",
169            "blockedHosts",
170            "maxBodySize",
171        ];
172
173        let query_params: HashMap<String, String> = parts
174            .params
175            .into_iter()
176            .filter(|(k, _)| !camel_options.contains(&k.as_str()))
177            .map(|(k, v)| (k.clone(), v.clone()))
178            .collect();
179
180        Ok(Self {
181            base_url,
182            http_method,
183            throw_exception_on_failure,
184            ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
185            follow_redirects,
186            connect_timeout,
187            response_timeout,
188            query_params,
189            allow_private_ips,
190            blocked_hosts,
191            max_body_size,
192        })
193    }
194}
195
196// ---------------------------------------------------------------------------
197// HttpServerConfig
198// ---------------------------------------------------------------------------
199
/// Configuration for an HTTP server (consumer) endpoint.
///
/// Built by `HttpServerConfig::from_uri` from a URI of the form
/// `http://<host>[:<port>][/<path>]`.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port to listen on. `from_uri` uses 80 when the URI has no port.
    pub port: u16,
    /// URL path this consumer handles, e.g. "/orders". `from_uri` uses "/" when
    /// the URI has no path.
    pub path: String,
    /// Maximum request body size in bytes (`maxRequestBody`; `from_uri` defaults
    /// to 2 MiB).
    pub max_request_body: usize,
    /// Maximum response body size for materializing streams in bytes
    /// (`maxResponseBody`; `from_uri` defaults to 10 MiB).
    pub max_response_body: usize,
}
214
215impl HttpServerConfig {
216    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
217        let parts = parse_uri(uri)?;
218        if parts.scheme != "http" && parts.scheme != "https" {
219            return Err(CamelError::InvalidUri(format!(
220                "expected scheme 'http' or 'https', got '{}'",
221                parts.scheme
222            )));
223        }
224
225        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
226        // Strip leading "//"
227        let authority_and_path = parts.path.trim_start_matches('/');
228
229        // Split on the first "/" to separate "host:port" from "/path"
230        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
231            (&authority_and_path[..idx], &authority_and_path[idx..])
232        } else {
233            (authority_and_path, "/")
234        };
235
236        let path = if path_suffix.is_empty() {
237            "/"
238        } else {
239            path_suffix
240        }
241        .to_string();
242
243        // Parse host:port
244        let (host, port) = if let Some(colon) = authority.rfind(':') {
245            let port_str = &authority[colon + 1..];
246            match port_str.parse::<u16>() {
247                Ok(p) => (authority[..colon].to_string(), p),
248                Err(_) => {
249                    return Err(CamelError::InvalidUri(format!(
250                        "invalid port '{}' in URI '{}'",
251                        port_str, uri
252                    )));
253                }
254            }
255        } else {
256            (authority.to_string(), 80)
257        };
258
259        let max_request_body = parts
260            .params
261            .get("maxRequestBody")
262            .and_then(|v| v.parse::<usize>().ok())
263            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
264
265        let max_response_body = parts
266            .params
267            .get("maxResponseBody")
268            .and_then(|v| v.parse::<usize>().ok())
269            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
270
271        Ok(Self {
272            host,
273            port,
274            path,
275            max_request_body,
276            max_response_body,
277        })
278    }
279}
280
281// ---------------------------------------------------------------------------
282// RequestEnvelope / HttpReply
283// ---------------------------------------------------------------------------
284
/// An inbound HTTP request sent from the Axum dispatch handler to an
/// `HttpConsumer` receive loop.
pub struct RequestEnvelope {
    /// HTTP method rendered as a string.
    pub method: String,
    /// Path component of the request URI; also the dispatch-table lookup key.
    pub path: String,
    /// Raw query string, or the empty string when the request had none.
    pub query: String,
    /// Request headers as received.
    pub headers: http::HeaderMap,
    /// Fully buffered request body.
    pub body: bytes::Bytes,
    /// One-shot channel on which the consumer sends the `HttpReply` back.
    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
295
/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
#[derive(Debug, Clone)]
pub struct HttpReply {
    /// Numeric HTTP status code.
    pub status: u16,
    /// Response headers as `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
    /// Response body bytes.
    pub body: bytes::Bytes,
}
303
304// ---------------------------------------------------------------------------
305// DispatchTable / ServerRegistry
306// ---------------------------------------------------------------------------
307
/// Maps URL path → channel sender for the consumer that owns that path.
///
/// The Axum dispatch handler takes the read lock to look up a request path;
/// consumers take the write lock to register and later remove their path.
pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
310
/// Handle to a running Axum server on one port.
struct ServerHandle {
    /// Path → consumer routing table shared with the server task.
    dispatch: DispatchTable,
    /// Kept alive so the task isn't dropped; not used directly.
    _task: tokio::task::JoinHandle<()>,
}
317
/// Process-global registry mapping port → running Axum server handle.
///
/// Entries are keyed by port only; the bind host is not part of the key.
pub struct ServerRegistry {
    /// Running servers by port, guarded by a blocking `std::sync::Mutex`.
    inner: Mutex<HashMap<u16, ServerHandle>>,
}
322
323impl ServerRegistry {
324    /// Returns the global singleton.
325    pub fn global() -> &'static Self {
326        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
327        INSTANCE.get_or_init(|| ServerRegistry {
328            inner: Mutex::new(HashMap::new()),
329        })
330    }
331
332    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
333    /// none is running on that port yet.
334    pub async fn get_or_spawn(
335        &'static self,
336        host: &str,
337        port: u16,
338        max_request_body: usize,
339    ) -> Result<DispatchTable, CamelError> {
340        // Fast path: check without spawning.
341        {
342            let guard = self.inner.lock().map_err(|_| {
343                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
344            })?;
345            if let Some(handle) = guard.get(&port) {
346                return Ok(Arc::clone(&handle.dispatch));
347            }
348        }
349
350        // Slow path: need to bind and spawn.
351        let addr = format!("{}:{}", host, port);
352        let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
353            CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
354        })?;
355
356        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
357        let dispatch_for_server = Arc::clone(&dispatch);
358        let task = tokio::spawn(run_axum_server(
359            listener,
360            dispatch_for_server,
361            max_request_body,
362        ));
363
364        // Re-acquire lock to insert — handle the race where another task won.
365        let mut guard = self.inner.lock().map_err(|_| {
366            CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
367        })?;
368        // If another task already registered this port while we were binding,
369        // abort our task (it has no routes) and use the winner's dispatch table.
370        if let Some(existing) = guard.get(&port) {
371            task.abort();
372            return Ok(Arc::clone(&existing.dispatch));
373        }
374        guard.insert(
375            port,
376            ServerHandle {
377                dispatch: Arc::clone(&dispatch),
378                _task: task,
379            },
380        );
381        Ok(dispatch)
382    }
383}
384
385// ---------------------------------------------------------------------------
386// Axum server
387// ---------------------------------------------------------------------------
388
389use axum::{
390    Router,
391    body::Body as AxumBody,
392    extract::{Request, State},
393    http::{Response, StatusCode},
394    response::IntoResponse,
395};
396
/// Shared state handed to the Axum dispatch handler.
#[derive(Clone)]
struct AppState {
    /// Path → consumer routing table consulted for every request.
    dispatch: DispatchTable,
    /// Upper bound, in bytes, used when buffering a request body.
    max_request_body: usize,
}
402
403async fn run_axum_server(
404    listener: tokio::net::TcpListener,
405    dispatch: DispatchTable,
406    max_request_body: usize,
407) {
408    let state = AppState {
409        dispatch,
410        max_request_body,
411    };
412    let app = Router::new().fallback(dispatch_handler).with_state(state);
413
414    axum::serve(listener, app).await.unwrap_or_else(|e| {
415        tracing::error!(error = %e, "Axum server error");
416    });
417}
418
419async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
420    let method = req.method().to_string();
421    let path = req.uri().path().to_string();
422    let query = req.uri().query().unwrap_or("").to_string();
423    let headers = req.headers().clone();
424
425    let body_bytes = match axum::body::to_bytes(req.into_body(), state.max_request_body).await {
426        Ok(b) => b,
427        Err(_) => {
428            return Response::builder()
429                .status(StatusCode::BAD_REQUEST)
430                .body(AxumBody::empty())
431                .expect(
432                    "Response::builder() with a known-valid status code and body is infallible",
433                );
434        }
435    };
436
437    // Look up handler for this path
438    let sender = {
439        let table = state.dispatch.read().await;
440        table.get(&path).cloned()
441    };
442
443    let Some(sender) = sender else {
444        return Response::builder()
445            .status(StatusCode::NOT_FOUND)
446            .body(AxumBody::from("No consumer registered for this path"))
447            .expect("Response::builder() with a known-valid status code and body is infallible");
448    };
449
450    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
451    let envelope = RequestEnvelope {
452        method,
453        path,
454        query,
455        headers,
456        body: body_bytes,
457        reply_tx,
458    };
459
460    if sender.send(envelope).await.is_err() {
461        return Response::builder()
462            .status(StatusCode::SERVICE_UNAVAILABLE)
463            .body(AxumBody::from("Consumer unavailable"))
464            .expect("Response::builder() with a known-valid status code and body is infallible");
465    }
466
467    match reply_rx.await {
468        Ok(reply) => {
469            let status =
470                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
471            let mut builder = Response::builder().status(status);
472            for (k, v) in &reply.headers {
473                builder = builder.header(k.as_str(), v.as_str());
474            }
475            builder
476                .body(AxumBody::from(reply.body))
477                .unwrap_or_else(|_| {
478                    Response::builder()
479                        .status(StatusCode::INTERNAL_SERVER_ERROR)
480                        .body(AxumBody::from("Invalid response headers from consumer"))
481                        .expect("Response::builder() with a known-valid status code and body is infallible")
482                })
483        }
484        Err(_) => Response::builder()
485            .status(StatusCode::INTERNAL_SERVER_ERROR)
486            .body(AxumBody::from("Pipeline error"))
487            .expect("Response::builder() with a known-valid status code and body is infallible"),
488    }
489}
490
491// ---------------------------------------------------------------------------
492// HttpConsumer
493// ---------------------------------------------------------------------------
494
/// Consumer that receives HTTP requests for one path on a shared Axum server.
pub struct HttpConsumer {
    /// Bind address, port, path and body-size limits for this consumer.
    config: HttpServerConfig,
}

impl HttpConsumer {
    /// Creates a consumer for `config`; nothing is bound until `start` is called.
    pub fn new(config: HttpServerConfig) -> Self {
        Self { config }
    }
}
504
505#[async_trait::async_trait]
506impl Consumer for HttpConsumer {
507    async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
508        use camel_api::{Exchange, Message, body::Body};
509
510        let dispatch = ServerRegistry::global()
511            .get_or_spawn(
512                &self.config.host,
513                self.config.port,
514                self.config.max_request_body,
515            )
516            .await?;
517
518        // Create a channel for this path and register it
519        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
520        {
521            let mut table = dispatch.write().await;
522            table.insert(self.config.path.clone(), env_tx);
523        }
524
525        let path = self.config.path.clone();
526        let cancel_token = ctx.cancel_token();
527        let max_response_body = self.config.max_response_body;
528
529        loop {
530            tokio::select! {
531                _ = ctx.cancelled() => {
532                    break;
533                }
534                envelope = env_rx.recv() => {
535                    let Some(envelope) = envelope else { break; };
536
537                    // Build Exchange from HTTP request
538                    let mut msg = Message::default();
539
540                    // Set standard Camel HTTP headers
541                    msg.set_header("CamelHttpMethod",
542                        serde_json::Value::String(envelope.method.clone()));
543                    msg.set_header("CamelHttpPath",
544                        serde_json::Value::String(envelope.path.clone()));
545                    msg.set_header("CamelHttpQuery",
546                        serde_json::Value::String(envelope.query.clone()));
547
548                    // Forward HTTP headers (skip pseudo-headers)
549                    for (k, v) in &envelope.headers {
550                        if let Ok(val_str) = v.to_str() {
551                            msg.set_header(
552                                k.as_str(),
553                                serde_json::Value::String(val_str.to_string()),
554                            );
555                        }
556                    }
557
558                    // Body: Try to convert to text if UTF-8, otherwise keep as bytes
559                    if !envelope.body.is_empty() {
560                        match std::str::from_utf8(&envelope.body) {
561                            Ok(text) => msg.body = Body::Text(text.to_string()),
562                            Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
563                        }
564                    }
565
566                    #[allow(unused_mut)]
567                    let mut exchange = Exchange::new(msg);
568
569                    // Extract W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
570                    #[cfg(feature = "otel")]
571                    {
572                        let headers: HashMap<String, String> = envelope
573                            .headers
574                            .iter()
575                            .filter_map(|(k, v)| {
576                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
577                            })
578                            .collect();
579                        camel_otel::extract_into_exchange(&mut exchange, &headers);
580                    }
581
582                    let reply_tx = envelope.reply_tx;
583                    let sender = ctx.sender().clone();
584                    let path_clone = path.clone();
585                    let cancel = cancel_token.clone();
586
587                    // Spawn a task to handle this request concurrently
588                    //
589                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
590                    // true concurrent request processing. This change was introduced as part of the
591                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
592                    //
593                    // Rationale:
594                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
595                    //    the consumer's main loop until the pipeline processing completes
596                    // 2. This blocking would prevent multiple HTTP requests from being processed
597                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
598                    // 3. The channel would never have multiple exchanges buffered simultaneously,
599                    //    defeating the purpose of pipeline-side concurrency
600                    // 4. By spawning a task per request, we allow the consumer loop to continue
601                    //    accepting new requests while existing ones are processed in the pipeline
602                    //
603                    // This approach effectively decouples request acceptance from pipeline processing,
604                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
605                    // by the pipeline when ConcurrencyModel::Concurrent is active.
606                    tokio::spawn(async move {
607                        // Check for cancellation before sending to pipeline.
608                        // Returns 503 (Service Unavailable) instead of letting the request
609                        // enter a shutting-down pipeline. This is a behavioral change from
610                        // the pre-concurrency implementation where cancellation during
611                        // processing would result in a 500 (Internal Server Error).
612                        // 503 is more semantically correct: the server is temporarily
613                        // unable to handle the request due to shutdown.
614                        if cancel.is_cancelled() {
615                            let _ = reply_tx.send(HttpReply {
616                                status: 503,
617                                headers: vec![],
618                                body: bytes::Bytes::from("Service Unavailable"),
619                            });
620                            return;
621                        }
622
623                        // Send through pipeline and await result
624                        let (tx, rx) = tokio::sync::oneshot::channel();
625                        let envelope = camel_component::consumer::ExchangeEnvelope {
626                            exchange,
627                            reply_tx: Some(tx),
628                        };
629
630                        let result = match sender.send(envelope).await {
631                            Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
632                            Err(_) => Err(camel_api::CamelError::ChannelClosed),
633                        }
634                        .and_then(|r| r);
635
636                        let reply = match result {
637                            Ok(out) => {
638                                let status = out
639                                    .input
640                                    .header("CamelHttpResponseCode")
641                                    .and_then(|v| v.as_u64())
642                                    .map(|s| s as u16)
643                                    .unwrap_or(200);
644
645                                let body_bytes = match out.input.body {
646                                    Body::Empty => bytes::Bytes::new(),
647                                    Body::Bytes(b) => b,
648                                    Body::Text(s) => bytes::Bytes::from(s.into_bytes()),
649                                    Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
650                                    Body::Stream(_) => {
651                                        // Materialize stream for HTTP response
652                                        match out.input.body.into_bytes(max_response_body).await {
653                                            Ok(b) => b,
654                                            Err(e) => {
655                                                debug!(error = %e, "Failed to materialize stream body for HTTP reply");
656                                                return;
657                                            }
658                                        }
659                                    }
660                                };
661
662                                let resp_headers: Vec<(String, String)> = out
663                                    .input
664                                    .headers
665                                    .iter()
666                                    // Filter Camel internal headers
667                                    .filter(|(k, _)| !k.starts_with("Camel"))
668                                    // Filter hop-by-hop and request-only headers
669                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
670                                    .filter(|(k, _)| {
671                                        !matches!(
672                                            k.to_lowercase().as_str(),
673                                            // RFC 2616 Section 4.5 - General headers
674                                            "content-length" |      // Auto-calculated by framework
675                                            "content-type" |        // Auto-calculated from body
676                                            "transfer-encoding" |   // Hop-by-hop
677                                            "connection" |          // Hop-by-hop
678                                            "cache-control" |       // Hop-by-hop
679                                            "date" |                // Auto-generated
680                                            "pragma" |              // Hop-by-hop
681                                            "trailer" |             // Hop-by-hop
682                                            "upgrade" |             // Hop-by-hop
683                                            "via" |                 // Hop-by-hop
684                                            "warning" |             // Hop-by-hop
685                                            // Request-only headers
686                                            "host" |                // Request-only
687                                            "user-agent" |          // Request-only
688                                            "accept" |              // Request-only
689                                            "accept-encoding" |     // Request-only
690                                            "accept-language" |     // Request-only
691                                            "accept-charset" |      // Request-only
692                                            "authorization" |       // Request-only (security)
693                                            "proxy-authorization" | // Request-only (security)
694                                            "cookie" |              // Request-only
695                                            "expect" |              // Request-only
696                                            "from" |                // Request-only
697                                            "if-match" |            // Request-only
698                                            "if-modified-since" |   // Request-only
699                                            "if-none-match" |       // Request-only
700                                            "if-range" |            // Request-only
701                                            "if-unmodified-since" | // Request-only
702                                            "max-forwards" |        // Request-only
703                                            "proxy-connection" |    // Request-only
704                                            "range" |               // Request-only
705                                            "referer" |             // Request-only
706                                            "te"                    // Request-only
707                                        )
708                                    })
709                                    .filter_map(|(k, v)| {
710                                        v.as_str().map(|s| (k.clone(), s.to_string()))
711                                    })
712                                    .collect();
713
714                                HttpReply {
715                                    status,
716                                    headers: resp_headers,
717                                    body: body_bytes,
718                                }
719                            }
720                            Err(e) => {
721                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
722                                HttpReply {
723                                    status: 500,
724                                    headers: vec![],
725                                    body: bytes::Bytes::from("Internal Server Error"),
726                                }
727                            }
728                        };
729
730                        // Reply to Axum handler (ignore error if client disconnected)
731                        let _ = reply_tx.send(reply);
732                    });
733                }
734            }
735        }
736
737        // Deregister this path
738        {
739            let mut table = dispatch.write().await;
740            table.remove(&path);
741        }
742
743        Ok(())
744    }
745
746    async fn stop(&mut self) -> Result<(), CamelError> {
747        Ok(())
748    }
749
750    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
751        camel_component::ConcurrencyModel::Concurrent { max: None }
752    }
753}
754
755// ---------------------------------------------------------------------------
756// HttpComponent / HttpsComponent
757// ---------------------------------------------------------------------------
758
/// Component that serves the `http` URI scheme.
///
/// The type carries no state; all configuration is parsed from the endpoint
/// URI when an endpoint is created.
pub struct HttpComponent;

impl HttpComponent {
    /// Creates the (stateless) component.
    pub fn new() -> Self {
        HttpComponent
    }
}

impl Default for HttpComponent {
    /// Equivalent to [`HttpComponent::new`].
    fn default() -> Self {
        HttpComponent
    }
}
772
773impl Component for HttpComponent {
774    fn scheme(&self) -> &str {
775        "http"
776    }
777
778    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
779        let config = HttpConfig::from_uri(uri)?;
780        let server_config = HttpServerConfig::from_uri(uri)?;
781        let client = build_client(&config)?;
782        Ok(Box::new(HttpEndpoint {
783            uri: uri.to_string(),
784            config,
785            server_config,
786            client,
787        }))
788    }
789}
790
/// Component that serves the `https` URI scheme.
///
/// The type carries no state; all configuration is parsed from the endpoint
/// URI when an endpoint is created.
pub struct HttpsComponent;

impl HttpsComponent {
    /// Creates the (stateless) component.
    pub fn new() -> Self {
        HttpsComponent
    }
}

impl Default for HttpsComponent {
    /// Equivalent to [`HttpsComponent::new`].
    fn default() -> Self {
        HttpsComponent
    }
}
804
805impl Component for HttpsComponent {
806    fn scheme(&self) -> &str {
807        "https"
808    }
809
810    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
811        let config = HttpConfig::from_uri(uri)?;
812        let server_config = HttpServerConfig::from_uri(uri)?;
813        let client = build_client(&config)?;
814        Ok(Box::new(HttpEndpoint {
815            uri: uri.to_string(),
816            config,
817            server_config,
818            client,
819        }))
820    }
821}
822
823fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
824    let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
825
826    if !config.follow_redirects {
827        builder = builder.redirect(reqwest::redirect::Policy::none());
828    }
829
830    builder.build().map_err(|e| {
831        CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
832    })
833}
834
835// ---------------------------------------------------------------------------
836// HttpEndpoint
837// ---------------------------------------------------------------------------
838
839struct HttpEndpoint {
840    uri: String,
841    config: HttpConfig,
842    server_config: HttpServerConfig,
843    client: reqwest::Client,
844}
845
846impl Endpoint for HttpEndpoint {
847    fn uri(&self) -> &str {
848        &self.uri
849    }
850
851    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
852        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
853    }
854
855    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
856        Ok(BoxProcessor::new(HttpProducer {
857            config: Arc::new(self.config.clone()),
858            client: self.client.clone(),
859        }))
860    }
861}
862
863// ---------------------------------------------------------------------------
864// SSRF Protection
865// ---------------------------------------------------------------------------
866
867fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
868    let parsed = url::Url::parse(url)
869        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
870
871    // Check blocked hosts
872    if let Some(host) = parsed.host_str()
873        && config.blocked_hosts.iter().any(|blocked| host == blocked)
874    {
875        return Err(CamelError::ProcessorError(format!(
876            "Host '{}' is blocked",
877            host
878        )));
879    }
880
881    // Check private IPs if not allowed
882    if !config.allow_private_ips
883        && let Some(host) = parsed.host()
884    {
885        match host {
886            url::Host::Ipv4(ip) => {
887                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
888                    return Err(CamelError::ProcessorError(format!(
889                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
890                        ip
891                    )));
892                }
893            }
894            url::Host::Ipv6(ip) => {
895                if ip.is_loopback() {
896                    return Err(CamelError::ProcessorError(format!(
897                        "Loopback IP '{}' not allowed",
898                        ip
899                    )));
900                }
901            }
902            url::Host::Domain(domain) => {
903                // Block common internal domains
904                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
905                if blocked_domains.contains(&domain) {
906                    return Err(CamelError::ProcessorError(format!(
907                        "Domain '{}' is not allowed",
908                        domain
909                    )));
910                }
911            }
912        }
913    }
914
915    Ok(())
916}
917
918// ---------------------------------------------------------------------------
919// HttpProducer
920// ---------------------------------------------------------------------------
921
922#[derive(Clone)]
923struct HttpProducer {
924    config: Arc<HttpConfig>,
925    client: reqwest::Client,
926}
927
928impl HttpProducer {
929    fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
930        if let Some(ref method) = config.http_method {
931            return method.to_uppercase();
932        }
933        if let Some(method) = exchange
934            .input
935            .header("CamelHttpMethod")
936            .and_then(|v| v.as_str())
937        {
938            return method.to_uppercase();
939        }
940        if !exchange.input.body.is_empty() {
941            return "POST".to_string();
942        }
943        "GET".to_string()
944    }
945
946    fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
947        if let Some(uri) = exchange
948            .input
949            .header("CamelHttpUri")
950            .and_then(|v| v.as_str())
951        {
952            let mut url = uri.to_string();
953            if let Some(path) = exchange
954                .input
955                .header("CamelHttpPath")
956                .and_then(|v| v.as_str())
957            {
958                if !url.ends_with('/') && !path.starts_with('/') {
959                    url.push('/');
960                }
961                url.push_str(path);
962            }
963            if let Some(query) = exchange
964                .input
965                .header("CamelHttpQuery")
966                .and_then(|v| v.as_str())
967            {
968                url.push('?');
969                url.push_str(query);
970            }
971            return url;
972        }
973
974        let mut url = config.base_url.clone();
975
976        if let Some(path) = exchange
977            .input
978            .header("CamelHttpPath")
979            .and_then(|v| v.as_str())
980        {
981            if !url.ends_with('/') && !path.starts_with('/') {
982                url.push('/');
983            }
984            url.push_str(path);
985        }
986
987        if let Some(query) = exchange
988            .input
989            .header("CamelHttpQuery")
990            .and_then(|v| v.as_str())
991        {
992            url.push('?');
993            url.push_str(query);
994        } else if !config.query_params.is_empty() {
995            // Forward non-Camel query params from config
996            url.push('?');
997            let query_string: String = config
998                .query_params
999                .iter()
1000                .map(|(k, v)| format!("{k}={v}"))
1001                .collect::<Vec<_>>()
1002                .join("&");
1003            url.push_str(&query_string);
1004        }
1005
1006        url
1007    }
1008
1009    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1010        status >= range.0 && status <= range.1
1011    }
1012}
1013
1014impl Service<Exchange> for HttpProducer {
1015    type Response = Exchange;
1016    type Error = CamelError;
1017    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1018
1019    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1020        Poll::Ready(Ok(()))
1021    }
1022
1023    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1024        let config = self.config.clone();
1025        let client = self.client.clone();
1026
1027        Box::pin(async move {
1028            let method_str = HttpProducer::resolve_method(&exchange, &config);
1029            let url = HttpProducer::resolve_url(&exchange, &config);
1030
1031            // SECURITY: Validate URL for SSRF
1032            validate_url_for_ssrf(&url, &config)?;
1033
1034            debug!(
1035                correlation_id = %exchange.correlation_id(),
1036                method = %method_str,
1037                url = %url,
1038                "HTTP request"
1039            );
1040
1041            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1042                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1043            })?;
1044
1045            let mut request = client.request(method, &url);
1046
1047            if let Some(timeout) = config.response_timeout {
1048                request = request.timeout(timeout);
1049            }
1050
1051            // Inject W3C TraceContext headers for distributed tracing (opt-in via "otel" feature)
1052            #[cfg(feature = "otel")]
1053            {
1054                let mut otel_headers = HashMap::new();
1055                camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1056                for (k, v) in otel_headers {
1057                    if let (Ok(name), Ok(val)) = (
1058                        reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1059                        reqwest::header::HeaderValue::from_str(&v),
1060                    ) {
1061                        request = request.header(name, val);
1062                    }
1063                }
1064            }
1065
1066            for (key, value) in &exchange.input.headers {
1067                if !key.starts_with("Camel")
1068                    && let Some(val_str) = value.as_str()
1069                    && let (Ok(name), Ok(val)) = (
1070                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1071                        reqwest::header::HeaderValue::from_str(val_str),
1072                    )
1073                {
1074                    request = request.header(name, val);
1075                }
1076            }
1077
1078            match exchange.input.body {
1079                Body::Stream(ref s) => {
1080                    let mut stream_lock = s.stream.lock().await;
1081                    if let Some(stream) = stream_lock.take() {
1082                        request = request.body(reqwest::Body::wrap_stream(stream));
1083                    } else {
1084                        return Err(CamelError::AlreadyConsumed);
1085                    }
1086                }
1087                _ => {
1088                    // For other types, materialize with configured limit
1089                    let body = std::mem::take(&mut exchange.input.body);
1090                    let bytes = body.into_bytes(config.max_body_size).await?;
1091                    if !bytes.is_empty() {
1092                        request = request.body(bytes);
1093                    }
1094                }
1095            }
1096
1097            let response = request
1098                .send()
1099                .await
1100                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1101
1102            let status_code = response.status().as_u16();
1103            let status_text = response
1104                .status()
1105                .canonical_reason()
1106                .unwrap_or("Unknown")
1107                .to_string();
1108
1109            for (key, value) in response.headers() {
1110                if let Ok(val_str) = value.to_str() {
1111                    exchange
1112                        .input
1113                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1114                }
1115            }
1116
1117            exchange.input.set_header(
1118                "CamelHttpResponseCode",
1119                serde_json::Value::Number(status_code.into()),
1120            );
1121            exchange.input.set_header(
1122                "CamelHttpResponseText",
1123                serde_json::Value::String(status_text.clone()),
1124            );
1125
1126            let response_body = response.bytes().await.map_err(|e| {
1127                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1128            })?;
1129
1130            if config.throw_exception_on_failure
1131                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1132            {
1133                return Err(CamelError::HttpOperationFailed {
1134                    method: method_str,
1135                    url,
1136                    status_code,
1137                    status_text,
1138                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1139                });
1140            }
1141
1142            if !response_body.is_empty() {
1143                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1144            }
1145
1146            debug!(
1147                correlation_id = %exchange.correlation_id(),
1148                status = status_code,
1149                url = %url,
1150                "HTTP response"
1151            );
1152            Ok(exchange)
1153        })
1154    }
1155}
1156
1157#[cfg(test)]
1158mod tests {
1159    use super::*;
1160    use camel_api::Message;
1161    use std::sync::Arc;
1162    use std::time::Duration;
1163    use tokio::sync::Mutex;
1164
1165    // NullRouteController for testing
1166    struct NullRouteController;
1167    #[async_trait::async_trait]
1168    impl camel_api::RouteController for NullRouteController {
1169        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1170            Ok(())
1171        }
1172        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1173            Ok(())
1174        }
1175        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1176            Ok(())
1177        }
1178        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1179            Ok(())
1180        }
1181        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1182            Ok(())
1183        }
1184        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1185            None
1186        }
1187        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1188            Ok(())
1189        }
1190        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1191            Ok(())
1192        }
1193    }
1194
1195    fn test_producer_ctx() -> ProducerContext {
1196        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1197    }
1198
1199    #[test]
1200    fn test_http_config_defaults() {
1201        let config = HttpConfig::from_uri("http://localhost:8080/api").unwrap();
1202        assert_eq!(config.base_url, "http://localhost:8080/api");
1203        assert!(config.http_method.is_none());
1204        assert!(config.throw_exception_on_failure);
1205        assert_eq!(config.ok_status_code_range, (200, 299));
1206        assert!(!config.follow_redirects);
1207        assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1208        assert!(config.response_timeout.is_none());
1209    }
1210
1211    #[test]
1212    fn test_http_config_with_options() {
1213        let config = HttpConfig::from_uri(
1214            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1215        ).unwrap();
1216        assert_eq!(config.base_url, "https://api.example.com/v1");
1217        assert_eq!(config.http_method, Some("PUT".to_string()));
1218        assert!(!config.throw_exception_on_failure);
1219        assert!(config.follow_redirects);
1220        assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1221        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1222    }
1223
1224    #[test]
1225    fn test_http_config_ok_status_range() {
1226        let config =
1227            HttpConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1228        assert_eq!(config.ok_status_code_range, (200, 204));
1229    }
1230
1231    #[test]
1232    fn test_http_config_wrong_scheme() {
1233        let result = HttpConfig::from_uri("file:/tmp");
1234        assert!(result.is_err());
1235    }
1236
1237    #[test]
1238    fn test_http_component_scheme() {
1239        let component = HttpComponent::new();
1240        assert_eq!(component.scheme(), "http");
1241    }
1242
1243    #[test]
1244    fn test_https_component_scheme() {
1245        let component = HttpsComponent::new();
1246        assert_eq!(component.scheme(), "https");
1247    }
1248
1249    #[test]
1250    fn test_http_endpoint_creates_consumer() {
1251        let component = HttpComponent::new();
1252        let endpoint = component
1253            .create_endpoint("http://0.0.0.0:19100/test")
1254            .unwrap();
1255        assert!(endpoint.create_consumer().is_ok());
1256    }
1257
1258    #[test]
1259    fn test_https_endpoint_creates_consumer() {
1260        let component = HttpsComponent::new();
1261        let endpoint = component
1262            .create_endpoint("https://0.0.0.0:8443/test")
1263            .unwrap();
1264        assert!(endpoint.create_consumer().is_ok());
1265    }
1266
1267    #[test]
1268    fn test_http_endpoint_creates_producer() {
1269        let ctx = test_producer_ctx();
1270        let component = HttpComponent::new();
1271        let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1272        assert!(endpoint.create_producer(&ctx).is_ok());
1273    }
1274
1275    // -----------------------------------------------------------------------
1276    // Producer tests
1277    // -----------------------------------------------------------------------
1278
1279    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1280        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1281        let addr = listener.local_addr().unwrap();
1282        let url = format!("http://127.0.0.1:{}", addr.port());
1283
1284        let handle = tokio::spawn(async move {
1285            loop {
1286                if let Ok((mut stream, _)) = listener.accept().await {
1287                    tokio::spawn(async move {
1288                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1289                        let mut buf = vec![0u8; 4096];
1290                        let n = stream.read(&mut buf).await.unwrap_or(0);
1291                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1292
1293                        let method = request.split_whitespace().next().unwrap_or("GET");
1294
1295                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1296                        let response = format!(
1297                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1298                            body.len(),
1299                            body
1300                        );
1301                        let _ = stream.write_all(response.as_bytes()).await;
1302                    });
1303                }
1304            }
1305        });
1306
1307        (url, handle)
1308    }
1309
1310    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1311        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1312        let addr = listener.local_addr().unwrap();
1313        let url = format!("http://127.0.0.1:{}", addr.port());
1314
1315        let handle = tokio::spawn(async move {
1316            loop {
1317                if let Ok((mut stream, _)) = listener.accept().await {
1318                    let status = status;
1319                    tokio::spawn(async move {
1320                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1321                        let mut buf = vec![0u8; 4096];
1322                        let _ = stream.read(&mut buf).await;
1323
1324                        let status_text = match status {
1325                            404 => "Not Found",
1326                            500 => "Internal Server Error",
1327                            _ => "Error",
1328                        };
1329                        let body = "error body";
1330                        let response = format!(
1331                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1332                            status,
1333                            status_text,
1334                            body.len(),
1335                            body
1336                        );
1337                        let _ = stream.write_all(response.as_bytes()).await;
1338                    });
1339                }
1340            }
1341        });
1342
1343        (url, handle)
1344    }
1345
1346    #[tokio::test]
1347    async fn test_http_producer_get_request() {
1348        use tower::ServiceExt;
1349
1350        let (url, _handle) = start_test_server().await;
1351        let ctx = test_producer_ctx();
1352
1353        let component = HttpComponent::new();
1354        let endpoint = component
1355            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1356            .unwrap();
1357        let producer = endpoint.create_producer(&ctx).unwrap();
1358
1359        let exchange = Exchange::new(Message::default());
1360        let result = producer.oneshot(exchange).await.unwrap();
1361
1362        let status = result
1363            .input
1364            .header("CamelHttpResponseCode")
1365            .and_then(|v| v.as_u64())
1366            .unwrap();
1367        assert_eq!(status, 200);
1368
1369        assert!(!result.input.body.is_empty());
1370    }
1371
1372    #[tokio::test]
1373    async fn test_http_producer_post_with_body() {
1374        use tower::ServiceExt;
1375
1376        let (url, _handle) = start_test_server().await;
1377        let ctx = test_producer_ctx();
1378
1379        let component = HttpComponent::new();
1380        let endpoint = component
1381            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1382            .unwrap();
1383        let producer = endpoint.create_producer(&ctx).unwrap();
1384
1385        let exchange = Exchange::new(Message::new("request body"));
1386        let result = producer.oneshot(exchange).await.unwrap();
1387
1388        let status = result
1389            .input
1390            .header("CamelHttpResponseCode")
1391            .and_then(|v| v.as_u64())
1392            .unwrap();
1393        assert_eq!(status, 200);
1394    }
1395
1396    #[tokio::test]
1397    async fn test_http_producer_method_from_header() {
1398        use tower::ServiceExt;
1399
1400        let (url, _handle) = start_test_server().await;
1401        let ctx = test_producer_ctx();
1402
1403        let component = HttpComponent::new();
1404        let endpoint = component
1405            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1406            .unwrap();
1407        let producer = endpoint.create_producer(&ctx).unwrap();
1408
1409        let mut exchange = Exchange::new(Message::default());
1410        exchange.input.set_header(
1411            "CamelHttpMethod",
1412            serde_json::Value::String("DELETE".to_string()),
1413        );
1414
1415        let result = producer.oneshot(exchange).await.unwrap();
1416        let status = result
1417            .input
1418            .header("CamelHttpResponseCode")
1419            .and_then(|v| v.as_u64())
1420            .unwrap();
1421        assert_eq!(status, 200);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_http_producer_forced_method() {
1426        use tower::ServiceExt;
1427
1428        let (url, _handle) = start_test_server().await;
1429        let ctx = test_producer_ctx();
1430
1431        let component = HttpComponent::new();
1432        let endpoint = component
1433            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1434            .unwrap();
1435        let producer = endpoint.create_producer(&ctx).unwrap();
1436
1437        let exchange = Exchange::new(Message::default());
1438        let result = producer.oneshot(exchange).await.unwrap();
1439
1440        let status = result
1441            .input
1442            .header("CamelHttpResponseCode")
1443            .and_then(|v| v.as_u64())
1444            .unwrap();
1445        assert_eq!(status, 200);
1446    }
1447
1448    #[tokio::test]
1449    async fn test_http_producer_throw_exception_on_failure() {
1450        use tower::ServiceExt;
1451
1452        let (url, _handle) = start_status_server(404).await;
1453        let ctx = test_producer_ctx();
1454
1455        let component = HttpComponent::new();
1456        let endpoint = component
1457            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1458            .unwrap();
1459        let producer = endpoint.create_producer(&ctx).unwrap();
1460
1461        let exchange = Exchange::new(Message::default());
1462        let result = producer.oneshot(exchange).await;
1463        assert!(result.is_err());
1464
1465        match result.unwrap_err() {
1466            CamelError::HttpOperationFailed { status_code, .. } => {
1467                assert_eq!(status_code, 404);
1468            }
1469            e => panic!("Expected HttpOperationFailed, got: {e}"),
1470        }
1471    }
1472
1473    #[tokio::test]
1474    async fn test_http_producer_no_throw_on_failure() {
1475        use tower::ServiceExt;
1476
1477        let (url, _handle) = start_status_server(500).await;
1478        let ctx = test_producer_ctx();
1479
1480        let component = HttpComponent::new();
1481        let endpoint = component
1482            .create_endpoint(&format!(
1483                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1484            ))
1485            .unwrap();
1486        let producer = endpoint.create_producer(&ctx).unwrap();
1487
1488        let exchange = Exchange::new(Message::default());
1489        let result = producer.oneshot(exchange).await.unwrap();
1490
1491        let status = result
1492            .input
1493            .header("CamelHttpResponseCode")
1494            .and_then(|v| v.as_u64())
1495            .unwrap();
1496        assert_eq!(status, 500);
1497    }
1498
1499    #[tokio::test]
1500    async fn test_http_producer_uri_override() {
1501        use tower::ServiceExt;
1502
1503        let (url, _handle) = start_test_server().await;
1504        let ctx = test_producer_ctx();
1505
1506        let component = HttpComponent::new();
1507        let endpoint = component
1508            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1509            .unwrap();
1510        let producer = endpoint.create_producer(&ctx).unwrap();
1511
1512        let mut exchange = Exchange::new(Message::default());
1513        exchange.input.set_header(
1514            "CamelHttpUri",
1515            serde_json::Value::String(format!("{url}/api")),
1516        );
1517
1518        let result = producer.oneshot(exchange).await.unwrap();
1519        let status = result
1520            .input
1521            .header("CamelHttpResponseCode")
1522            .and_then(|v| v.as_u64())
1523            .unwrap();
1524        assert_eq!(status, 200);
1525    }
1526
1527    #[tokio::test]
1528    async fn test_http_producer_response_headers_mapped() {
1529        use tower::ServiceExt;
1530
1531        let (url, _handle) = start_test_server().await;
1532        let ctx = test_producer_ctx();
1533
1534        let component = HttpComponent::new();
1535        let endpoint = component
1536            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1537            .unwrap();
1538        let producer = endpoint.create_producer(&ctx).unwrap();
1539
1540        let exchange = Exchange::new(Message::default());
1541        let result = producer.oneshot(exchange).await.unwrap();
1542
1543        assert!(
1544            result.input.header("content-type").is_some()
1545                || result.input.header("Content-Type").is_some()
1546        );
1547        assert!(result.input.header("CamelHttpResponseText").is_some());
1548    }
1549
1550    // -----------------------------------------------------------------------
1551    // Bug fix tests: Client configuration per-endpoint
1552    // -----------------------------------------------------------------------
1553
1554    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1555        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1556        let addr = listener.local_addr().unwrap();
1557        let url = format!("http://127.0.0.1:{}", addr.port());
1558
1559        let handle = tokio::spawn(async move {
1560            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561            loop {
1562                if let Ok((mut stream, _)) = listener.accept().await {
1563                    tokio::spawn(async move {
1564                        let mut buf = vec![0u8; 4096];
1565                        let n = stream.read(&mut buf).await.unwrap_or(0);
1566                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1567
1568                        // Check if this is a request to /final
1569                        if request.contains("GET /final") {
1570                            let body = r#"{"status":"final"}"#;
1571                            let response = format!(
1572                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1573                                body.len(),
1574                                body
1575                            );
1576                            let _ = stream.write_all(response.as_bytes()).await;
1577                        } else {
1578                            // Redirect to /final
1579                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1580                            let _ = stream.write_all(response.as_bytes()).await;
1581                        }
1582                    });
1583                }
1584            }
1585        });
1586
1587        (url, handle)
1588    }
1589
1590    #[tokio::test]
1591    async fn test_follow_redirects_false_does_not_follow() {
1592        use tower::ServiceExt;
1593
1594        let (url, _handle) = start_redirect_server().await;
1595        let ctx = test_producer_ctx();
1596
1597        let component = HttpComponent::new();
1598        let endpoint = component
1599            .create_endpoint(&format!(
1600                "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1601            ))
1602            .unwrap();
1603        let producer = endpoint.create_producer(&ctx).unwrap();
1604
1605        let exchange = Exchange::new(Message::default());
1606        let result = producer.oneshot(exchange).await.unwrap();
1607
1608        // Should get 302, NOT follow redirect to 200
1609        let status = result
1610            .input
1611            .header("CamelHttpResponseCode")
1612            .and_then(|v| v.as_u64())
1613            .unwrap();
1614        assert_eq!(
1615            status, 302,
1616            "Should NOT follow redirect when followRedirects=false"
1617        );
1618    }
1619
1620    #[tokio::test]
1621    async fn test_follow_redirects_true_follows_redirect() {
1622        use tower::ServiceExt;
1623
1624        let (url, _handle) = start_redirect_server().await;
1625        let ctx = test_producer_ctx();
1626
1627        let component = HttpComponent::new();
1628        let endpoint = component
1629            .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1630            .unwrap();
1631        let producer = endpoint.create_producer(&ctx).unwrap();
1632
1633        let exchange = Exchange::new(Message::default());
1634        let result = producer.oneshot(exchange).await.unwrap();
1635
1636        // Should follow redirect and get 200
1637        let status = result
1638            .input
1639            .header("CamelHttpResponseCode")
1640            .and_then(|v| v.as_u64())
1641            .unwrap();
1642        assert_eq!(
1643            status, 200,
1644            "Should follow redirect when followRedirects=true"
1645        );
1646    }
1647
1648    #[tokio::test]
1649    async fn test_query_params_forwarded_to_http_request() {
1650        use tower::ServiceExt;
1651
1652        let (url, _handle) = start_test_server().await;
1653        let ctx = test_producer_ctx();
1654
1655        let component = HttpComponent::new();
1656        // apiKey is NOT a Camel option, should be forwarded as query param
1657        let endpoint = component
1658            .create_endpoint(&format!(
1659                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1660            ))
1661            .unwrap();
1662        let producer = endpoint.create_producer(&ctx).unwrap();
1663
1664        let exchange = Exchange::new(Message::default());
1665        let result = producer.oneshot(exchange).await.unwrap();
1666
1667        // The test server returns the request info in response
1668        // We just verify it succeeds (the query param was sent)
1669        let status = result
1670            .input
1671            .header("CamelHttpResponseCode")
1672            .and_then(|v| v.as_u64())
1673            .unwrap();
1674        assert_eq!(status, 200);
1675    }
1676
1677    #[tokio::test]
1678    async fn test_non_camel_query_params_are_forwarded() {
1679        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1680        // We'll test the config parsing, not the actual HTTP call
1681        let config = HttpConfig::from_uri(
1682            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1683        )
1684        .unwrap();
1685
1686        // apiKey and token are NOT Camel options, should be forwarded
1687        assert!(
1688            config.query_params.contains_key("apiKey"),
1689            "apiKey should be preserved"
1690        );
1691        assert!(
1692            config.query_params.contains_key("token"),
1693            "token should be preserved"
1694        );
1695        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1696        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1697
1698        // httpMethod IS a Camel option, should NOT be in query_params
1699        assert!(
1700            !config.query_params.contains_key("httpMethod"),
1701            "httpMethod should not be forwarded"
1702        );
1703    }
1704
1705    // -----------------------------------------------------------------------
1706    // SSRF Protection tests
1707    // -----------------------------------------------------------------------
1708
1709    #[tokio::test]
1710    async fn test_http_producer_blocks_metadata_endpoint() {
1711        use tower::ServiceExt;
1712
1713        let ctx = test_producer_ctx();
1714        let component = HttpComponent::new();
1715        let endpoint = component
1716            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1717            .unwrap();
1718        let producer = endpoint.create_producer(&ctx).unwrap();
1719
1720        let mut exchange = Exchange::new(Message::default());
1721        exchange.input.set_header(
1722            "CamelHttpUri",
1723            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1724        );
1725
1726        let result = producer.oneshot(exchange).await;
1727        assert!(result.is_err(), "Should block AWS metadata endpoint");
1728
1729        let err = result.unwrap_err();
1730        assert!(
1731            err.to_string().contains("Private IP"),
1732            "Error should mention private IP blocking, got: {}",
1733            err
1734        );
1735    }
1736
1737    #[test]
1738    fn test_ssrf_config_defaults() {
1739        let config = HttpConfig::from_uri("http://example.com/api").unwrap();
1740        assert!(
1741            !config.allow_private_ips,
1742            "Private IPs should be blocked by default"
1743        );
1744        assert!(
1745            config.blocked_hosts.is_empty(),
1746            "Blocked hosts should be empty by default"
1747        );
1748    }
1749
1750    #[test]
1751    fn test_ssrf_config_allow_private_ips() {
1752        let config = HttpConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1753        assert!(
1754            config.allow_private_ips,
1755            "Private IPs should be allowed when explicitly set"
1756        );
1757    }
1758
1759    #[test]
1760    fn test_ssrf_config_blocked_hosts() {
1761        let config =
1762            HttpConfig::from_uri("http://example.com/api?blockedHosts=evil.com,malware.net")
1763                .unwrap();
1764        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1765    }
1766
1767    #[tokio::test]
1768    async fn test_http_producer_blocks_localhost() {
1769        use tower::ServiceExt;
1770
1771        let ctx = test_producer_ctx();
1772        let component = HttpComponent::new();
1773        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1774        let producer = endpoint.create_producer(&ctx).unwrap();
1775
1776        let mut exchange = Exchange::new(Message::default());
1777        exchange.input.set_header(
1778            "CamelHttpUri",
1779            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1780        );
1781
1782        let result = producer.oneshot(exchange).await;
1783        assert!(result.is_err(), "Should block localhost");
1784    }
1785
1786    #[tokio::test]
1787    async fn test_http_producer_blocks_loopback_ip() {
1788        use tower::ServiceExt;
1789
1790        let ctx = test_producer_ctx();
1791        let component = HttpComponent::new();
1792        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1793        let producer = endpoint.create_producer(&ctx).unwrap();
1794
1795        let mut exchange = Exchange::new(Message::default());
1796        exchange.input.set_header(
1797            "CamelHttpUri",
1798            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1799        );
1800
1801        let result = producer.oneshot(exchange).await;
1802        assert!(result.is_err(), "Should block loopback IP");
1803    }
1804
1805    #[tokio::test]
1806    async fn test_http_producer_allows_private_ip_when_enabled() {
1807        use tower::ServiceExt;
1808
1809        let ctx = test_producer_ctx();
1810        let component = HttpComponent::new();
1811        // With allowPrivateIps=true, the validation should pass
1812        // (actual connection will fail, but that's expected)
1813        let endpoint = component
1814            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1815            .unwrap();
1816        let producer = endpoint.create_producer(&ctx).unwrap();
1817
1818        let exchange = Exchange::new(Message::default());
1819
1820        // The request will fail because we can't connect, but it should NOT fail
1821        // due to SSRF protection
1822        let result = producer.oneshot(exchange).await;
1823        // We expect connection error, not SSRF error
1824        if let Err(ref e) = result {
1825            let err_str = e.to_string();
1826            assert!(
1827                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1828                "Should not be SSRF error, got: {}",
1829                err_str
1830            );
1831        }
1832    }
1833
1834    // -----------------------------------------------------------------------
1835    // HttpServerConfig tests
1836    // -----------------------------------------------------------------------
1837
1838    #[test]
1839    fn test_http_server_config_parse() {
1840        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1841        assert_eq!(cfg.host, "0.0.0.0");
1842        assert_eq!(cfg.port, 8080);
1843        assert_eq!(cfg.path, "/orders");
1844    }
1845
1846    #[test]
1847    fn test_http_server_config_default_path() {
1848        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1849        assert_eq!(cfg.path, "/");
1850    }
1851
1852    #[test]
1853    fn test_http_server_config_wrong_scheme() {
1854        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1855    }
1856
1857    #[test]
1858    fn test_http_server_config_invalid_port() {
1859        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1860    }
1861
1862    #[test]
1863    fn test_request_envelope_and_reply_are_send() {
1864        fn assert_send<T: Send>() {}
1865        assert_send::<RequestEnvelope>();
1866        assert_send::<HttpReply>();
1867    }
1868
1869    // -----------------------------------------------------------------------
1870    // ServerRegistry tests
1871    // -----------------------------------------------------------------------
1872
1873    #[test]
1874    fn test_server_registry_global_is_singleton() {
1875        let r1 = ServerRegistry::global();
1876        let r2 = ServerRegistry::global();
1877        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1878    }
1879
1880    // -----------------------------------------------------------------------
1881    // Axum dispatch handler tests
1882    // -----------------------------------------------------------------------
1883
1884    #[tokio::test]
1885    async fn test_dispatch_handler_returns_404_for_unknown_path() {
1886        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1887        // Nothing registered in the dispatch table
1888        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1889        let port = listener.local_addr().unwrap().port();
1890        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1891
1892        // Wait for server to start
1893        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1894
1895        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1896            .await
1897            .unwrap();
1898        assert_eq!(resp.status().as_u16(), 404);
1899    }
1900
1901    // -----------------------------------------------------------------------
1902    // HttpConsumer tests
1903    // -----------------------------------------------------------------------
1904
1905    #[tokio::test]
1906    async fn test_http_consumer_start_registers_path() {
1907        use camel_component::ConsumerContext;
1908
1909        // Get an OS-assigned free port
1910        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1911        let port = listener.local_addr().unwrap().port();
1912        drop(listener); // Release port — ServerRegistry will rebind it
1913
1914        let consumer_cfg = HttpServerConfig {
1915            host: "127.0.0.1".to_string(),
1916            port,
1917            path: "/ping".to_string(),
1918            max_request_body: 2 * 1024 * 1024,
1919            max_response_body: 10 * 1024 * 1024,
1920        };
1921        let mut consumer = HttpConsumer::new(consumer_cfg);
1922
1923        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1924        let token = tokio_util::sync::CancellationToken::new();
1925        let ctx = ConsumerContext::new(tx, token.clone());
1926
1927        tokio::spawn(async move {
1928            consumer.start(ctx).await.unwrap();
1929        });
1930
1931        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1932
1933        let client = reqwest::Client::new();
1934        let resp_future = client
1935            .post(format!("http://127.0.0.1:{port}/ping"))
1936            .body("hello world")
1937            .send();
1938
1939        let (http_result, _) = tokio::join!(resp_future, async {
1940            if let Some(mut envelope) = rx.recv().await {
1941                // Set a custom status code
1942                envelope.exchange.input.set_header(
1943                    "CamelHttpResponseCode",
1944                    serde_json::Value::Number(201.into()),
1945                );
1946                if let Some(reply_tx) = envelope.reply_tx {
1947                    let _ = reply_tx.send(Ok(envelope.exchange));
1948                }
1949            }
1950        });
1951
1952        let resp = http_result.unwrap();
1953        assert_eq!(resp.status().as_u16(), 201);
1954
1955        token.cancel();
1956    }
1957
1958    // -----------------------------------------------------------------------
1959    // Integration tests
1960    // -----------------------------------------------------------------------
1961
1962    #[tokio::test]
1963    async fn test_integration_single_consumer_round_trip() {
1964        use camel_component::{ConsumerContext, ExchangeEnvelope};
1965
1966        // Get an OS-assigned free port (ephemeral)
1967        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1968        let port = listener.local_addr().unwrap().port();
1969        drop(listener); // Release — ServerRegistry will rebind
1970
1971        let component = HttpComponent::new();
1972        let endpoint = component
1973            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1974            .unwrap();
1975        let mut consumer = endpoint.create_consumer().unwrap();
1976
1977        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1978        let token = tokio_util::sync::CancellationToken::new();
1979        let ctx = ConsumerContext::new(tx, token.clone());
1980
1981        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1982        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1983
1984        let client = reqwest::Client::new();
1985        let send_fut = client
1986            .post(format!("http://127.0.0.1:{port}/echo"))
1987            .header("Content-Type", "text/plain")
1988            .body("ping")
1989            .send();
1990
1991        let (http_result, _) = tokio::join!(send_fut, async {
1992            if let Some(mut envelope) = rx.recv().await {
1993                assert_eq!(
1994                    envelope.exchange.input.header("CamelHttpMethod"),
1995                    Some(&serde_json::Value::String("POST".into()))
1996                );
1997                assert_eq!(
1998                    envelope.exchange.input.header("CamelHttpPath"),
1999                    Some(&serde_json::Value::String("/echo".into()))
2000                );
2001                envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2002                if let Some(reply_tx) = envelope.reply_tx {
2003                    let _ = reply_tx.send(Ok(envelope.exchange));
2004                }
2005            }
2006        });
2007
2008        let resp = http_result.unwrap();
2009        assert_eq!(resp.status().as_u16(), 200);
2010        let body = resp.text().await.unwrap();
2011        assert_eq!(body, "pong");
2012
2013        token.cancel();
2014    }
2015
2016    #[tokio::test]
2017    async fn test_integration_two_consumers_shared_port() {
2018        use camel_component::{ConsumerContext, ExchangeEnvelope};
2019
2020        // Get an OS-assigned free port (ephemeral)
2021        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2022        let port = listener.local_addr().unwrap().port();
2023        drop(listener);
2024
2025        let component = HttpComponent::new();
2026
2027        // Consumer A: /hello
2028        let endpoint_a = component
2029            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2030            .unwrap();
2031        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2032
2033        // Consumer B: /world
2034        let endpoint_b = component
2035            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2036            .unwrap();
2037        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2038
2039        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2040        let token_a = tokio_util::sync::CancellationToken::new();
2041        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2042
2043        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2044        let token_b = tokio_util::sync::CancellationToken::new();
2045        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2046
2047        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2048        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2049        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2050
2051        let client = reqwest::Client::new();
2052
2053        // Request to /hello
2054        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2055        let (resp_hello, _) = tokio::join!(fut_hello, async {
2056            if let Some(mut envelope) = rx_a.recv().await {
2057                envelope.exchange.input.body =
2058                    camel_api::body::Body::Text("hello-response".to_string());
2059                if let Some(reply_tx) = envelope.reply_tx {
2060                    let _ = reply_tx.send(Ok(envelope.exchange));
2061                }
2062            }
2063        });
2064
2065        // Request to /world
2066        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2067        let (resp_world, _) = tokio::join!(fut_world, async {
2068            if let Some(mut envelope) = rx_b.recv().await {
2069                envelope.exchange.input.body =
2070                    camel_api::body::Body::Text("world-response".to_string());
2071                if let Some(reply_tx) = envelope.reply_tx {
2072                    let _ = reply_tx.send(Ok(envelope.exchange));
2073                }
2074            }
2075        });
2076
2077        let body_a = resp_hello.unwrap().text().await.unwrap();
2078        let body_b = resp_world.unwrap().text().await.unwrap();
2079
2080        assert_eq!(body_a, "hello-response");
2081        assert_eq!(body_b, "world-response");
2082
2083        token_a.cancel();
2084        token_b.cancel();
2085    }
2086
2087    #[tokio::test]
2088    async fn test_integration_unregistered_path_returns_404() {
2089        use camel_component::{ConsumerContext, ExchangeEnvelope};
2090
2091        // Get an OS-assigned free port (ephemeral)
2092        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2093        let port = listener.local_addr().unwrap().port();
2094        drop(listener);
2095
2096        let component = HttpComponent::new();
2097        let endpoint = component
2098            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2099            .unwrap();
2100        let mut consumer = endpoint.create_consumer().unwrap();
2101
2102        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2103        let token = tokio_util::sync::CancellationToken::new();
2104        let ctx = ConsumerContext::new(tx, token.clone());
2105
2106        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2107        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2108
2109        let client = reqwest::Client::new();
2110        let resp = client
2111            .get(format!("http://127.0.0.1:{port}/not-there"))
2112            .send()
2113            .await
2114            .unwrap();
2115        assert_eq!(resp.status().as_u16(), 404);
2116
2117        token.cancel();
2118    }
2119
2120    #[test]
2121    fn test_http_consumer_declares_concurrent() {
2122        use camel_component::ConcurrencyModel;
2123
2124        let config = HttpServerConfig {
2125            host: "127.0.0.1".to_string(),
2126            port: 19999,
2127            path: "/test".to_string(),
2128            max_request_body: 2 * 1024 * 1024,
2129            max_response_body: 10 * 1024 * 1024,
2130        };
2131        let consumer = HttpConsumer::new(config);
2132        assert_eq!(
2133            consumer.concurrency_model(),
2134            ConcurrencyModel::Concurrent { max: None }
2135        );
2136    }
2137
2138    // -----------------------------------------------------------------------
2139    // OpenTelemetry propagation tests (only compiled with "otel" feature)
2140    // -----------------------------------------------------------------------
2141
2142    #[cfg(feature = "otel")]
2143    mod otel_tests {
2144        use super::*;
2145        use camel_api::Message;
2146        use tower::ServiceExt;
2147
2148        #[tokio::test]
2149        async fn test_producer_injects_traceparent_header() {
2150            let (url, _handle) = start_test_server_with_header_capture().await;
2151            let ctx = test_producer_ctx();
2152
2153            let component = HttpComponent::new();
2154            let endpoint = component
2155                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2156                .unwrap();
2157            let producer = endpoint.create_producer(&ctx).unwrap();
2158
2159            // Create exchange with an OTel context by extracting from a traceparent header
2160            let mut exchange = Exchange::new(Message::default());
2161            let mut headers = std::collections::HashMap::new();
2162            headers.insert(
2163                "traceparent".to_string(),
2164                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2165            );
2166            camel_otel::extract_into_exchange(&mut exchange, &headers);
2167
2168            let result = producer.oneshot(exchange).await.unwrap();
2169
2170            // Verify request succeeded
2171            let status = result
2172                .input
2173                .header("CamelHttpResponseCode")
2174                .and_then(|v| v.as_u64())
2175                .unwrap();
2176            assert_eq!(status, 200);
2177
2178            // The test server echoes back the received traceparent header
2179            let traceparent = result.input.header("x-received-traceparent");
2180            assert!(
2181                traceparent.is_some(),
2182                "traceparent header should have been sent"
2183            );
2184
2185            let traceparent_str = traceparent.unwrap().as_str().unwrap();
2186            // Verify format: version-traceid-spanid-flags
2187            let parts: Vec<&str> = traceparent_str.split('-').collect();
2188            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2189            assert_eq!(parts[0], "00", "version should be 00");
2190            assert_eq!(
2191                parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2192                "trace-id should match"
2193            );
2194            assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2195            assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2196        }
2197
2198        #[tokio::test]
2199        async fn test_consumer_extracts_traceparent_header() {
2200            use camel_component::{ConsumerContext, ExchangeEnvelope};
2201
2202            // Get an OS-assigned free port
2203            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2204            let port = listener.local_addr().unwrap().port();
2205            drop(listener);
2206
2207            let component = HttpComponent::new();
2208            let endpoint = component
2209                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2210                .unwrap();
2211            let mut consumer = endpoint.create_consumer().unwrap();
2212
2213            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2214            let token = tokio_util::sync::CancellationToken::new();
2215            let ctx = ConsumerContext::new(tx, token.clone());
2216
2217            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2218            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2219
2220            // Send request with traceparent header
2221            let client = reqwest::Client::new();
2222            let send_fut = client
2223                .post(format!("http://127.0.0.1:{port}/trace"))
2224                .header(
2225                    "traceparent",
2226                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2227                )
2228                .body("test")
2229                .send();
2230
2231            let (http_result, _) = tokio::join!(send_fut, async {
2232                if let Some(envelope) = rx.recv().await {
2233                    // Verify the exchange has a valid OTel context by re-injecting it
2234                    // and checking the traceparent matches
2235                    let mut injected_headers = std::collections::HashMap::new();
2236                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2237
2238                    assert!(
2239                        injected_headers.contains_key("traceparent"),
2240                        "Exchange should have traceparent after extraction"
2241                    );
2242
2243                    let traceparent = injected_headers.get("traceparent").unwrap();
2244                    let parts: Vec<&str> = traceparent.split('-').collect();
2245                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2246                    assert_eq!(
2247                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2248                        "Trace ID should match the original traceparent header"
2249                    );
2250
2251                    if let Some(reply_tx) = envelope.reply_tx {
2252                        let _ = reply_tx.send(Ok(envelope.exchange));
2253                    }
2254                }
2255            });
2256
2257            let resp = http_result.unwrap();
2258            assert_eq!(resp.status().as_u16(), 200);
2259
2260            token.cancel();
2261        }
2262
2263        #[tokio::test]
2264        async fn test_consumer_extracts_mixed_case_traceparent_header() {
2265            use camel_component::{ConsumerContext, ExchangeEnvelope};
2266
2267            // Get an OS-assigned free port
2268            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2269            let port = listener.local_addr().unwrap().port();
2270            drop(listener);
2271
2272            let component = HttpComponent::new();
2273            let endpoint = component
2274                .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2275                .unwrap();
2276            let mut consumer = endpoint.create_consumer().unwrap();
2277
2278            let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2279            let token = tokio_util::sync::CancellationToken::new();
2280            let ctx = ConsumerContext::new(tx, token.clone());
2281
2282            tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2283            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2284
2285            // Send request with MIXED-CASE TraceParent header (not lowercase)
2286            let client = reqwest::Client::new();
2287            let send_fut = client
2288                .post(format!("http://127.0.0.1:{port}/trace"))
2289                .header(
2290                    "TraceParent",
2291                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2292                )
2293                .body("test")
2294                .send();
2295
2296            let (http_result, _) = tokio::join!(send_fut, async {
2297                if let Some(envelope) = rx.recv().await {
2298                    // Verify the exchange has a valid OTel context by re-injecting it
2299                    // and checking the traceparent matches
2300                    let mut injected_headers = HashMap::new();
2301                    camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2302
2303                    assert!(
2304                        injected_headers.contains_key("traceparent"),
2305                        "Exchange should have traceparent after extraction from mixed-case header"
2306                    );
2307
2308                    let traceparent = injected_headers.get("traceparent").unwrap();
2309                    let parts: Vec<&str> = traceparent.split('-').collect();
2310                    assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2311                    assert_eq!(
2312                        parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2313                        "Trace ID should match the original mixed-case TraceParent header"
2314                    );
2315
2316                    if let Some(reply_tx) = envelope.reply_tx {
2317                        let _ = reply_tx.send(Ok(envelope.exchange));
2318                    }
2319                }
2320            });
2321
2322            let resp = http_result.unwrap();
2323            assert_eq!(resp.status().as_u16(), 200);
2324
2325            token.cancel();
2326        }
2327
2328        #[tokio::test]
2329        async fn test_producer_no_trace_context_no_crash() {
2330            let (url, _handle) = start_test_server().await;
2331            let ctx = test_producer_ctx();
2332
2333            let component = HttpComponent::new();
2334            let endpoint = component
2335                .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2336                .unwrap();
2337            let producer = endpoint.create_producer(&ctx).unwrap();
2338
2339            // Create exchange with default (empty) otel_context - no trace context
2340            let exchange = Exchange::new(Message::default());
2341
2342            // Should succeed without panic
2343            let result = producer.oneshot(exchange).await.unwrap();
2344
2345            // Verify request succeeded
2346            let status = result
2347                .input
2348                .header("CamelHttpResponseCode")
2349                .and_then(|v| v.as_u64())
2350                .unwrap();
2351            assert_eq!(status, 200);
2352        }
2353
2354        /// Test server that captures and echoes back the traceparent header
2355        async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2356            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2357            let addr = listener.local_addr().unwrap();
2358            let url = format!("http://127.0.0.1:{}", addr.port());
2359
2360            let handle = tokio::spawn(async move {
2361                loop {
2362                    if let Ok((mut stream, _)) = listener.accept().await {
2363                        tokio::spawn(async move {
2364                            use tokio::io::{AsyncReadExt, AsyncWriteExt};
2365                            let mut buf = vec![0u8; 8192];
2366                            let n = stream.read(&mut buf).await.unwrap_or(0);
2367                            let request = String::from_utf8_lossy(&buf[..n]).to_string();
2368
2369                            // Extract traceparent header from request
2370                            let traceparent = request
2371                                .lines()
2372                                .find(|line| line.to_lowercase().starts_with("traceparent:"))
2373                                .map(|line| {
2374                                    line.split(':')
2375                                        .nth(1)
2376                                        .map(|s| s.trim().to_string())
2377                                        .unwrap_or_default()
2378                                })
2379                                .unwrap_or_default();
2380
2381                            let body =
2382                                format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2383                            let response = format!(
2384                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2385                                body.len(),
2386                                traceparent,
2387                                body
2388                            );
2389                            let _ = stream.write_all(response.as_bytes()).await;
2390                        });
2391                    }
2392                }
2393            });
2394
2395            (url, handle)
2396        }
2397    }
2398}