Skip to main content

camel_component_http/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
16// ---------------------------------------------------------------------------
17// HttpConfig
18// ---------------------------------------------------------------------------
19
20/// Configuration for an HTTP client (producer) endpoint.
21///
22/// # Memory Limits
23///
24/// HTTP operations enforce conservative memory limits to prevent denial-of-service
25/// attacks from untrusted network sources. These limits are significantly lower than
26/// file component limits (100MB) because HTTP typically handles API responses rather
27/// than large file transfers, and clients may be untrusted.
28///
29/// ## Default Limits
30///
31/// - **HTTP client body**: 10MB (typical API responses)
32/// - **HTTP server request**: 2MB (untrusted network input - see `HttpServerConfig`)
33/// - **HTTP server response**: 10MB (same as client - see `HttpServerConfig`)
34///
35/// ## Rationale
36///
37/// The 10MB limit for HTTP client responses is appropriate for most API interactions
38/// while providing protection against:
39/// - Malicious servers sending oversized responses
40/// - Runaway processes generating unexpectedly large payloads
41/// - Memory exhaustion attacks
42///
43/// The 2MB server request limit is even more conservative because it handles input
44/// from potentially untrusted clients on the public internet.
45///
46/// ## Overriding Limits
47///
48/// Override the default client body limit using the `maxBodySize` URI parameter:
49///
50/// ```text
51/// http://api.example.com/large-data?maxBodySize=52428800
52/// ```
53///
54/// For server endpoints, use `maxRequestBody` and `maxResponseBody` parameters:
55///
56/// ```text
57/// http://0.0.0.0:8080/upload?maxRequestBody=52428800
58/// ```
59///
60/// ## Behavior When Exceeded
61///
62/// When a body exceeds the configured limit:
63/// - An error is returned immediately
64/// - No memory is exhausted - the limit is checked before allocation
65/// - The HTTP connection is terminated cleanly
66///
67/// ## Security Considerations
68///
69/// HTTP endpoints should be treated with more caution than file endpoints because:
70/// - Clients may be unknown and untrusted
71/// - Network traffic can be spoofed or malicious
72/// - DoS attacks often exploit unbounded resource consumption
73///
74/// Only increase limits when you control both ends of the connection or when
75/// business requirements demand larger payloads.
76#[derive(Debug, Clone)]
77pub struct HttpConfig {
78    pub base_url: String,
79    pub http_method: Option<String>,
80    pub throw_exception_on_failure: bool,
81    pub ok_status_code_range: (u16, u16),
82    pub follow_redirects: bool,
83    pub connect_timeout: Duration,
84    pub response_timeout: Option<Duration>,
85    pub query_params: HashMap<String, String>,
86    // Security settings
87    pub allow_private_ips: bool,
88    pub blocked_hosts: Vec<String>,
89    // Memory limit for body materialization (in bytes)
90    pub max_body_size: usize,
91}
92
93impl HttpConfig {
94    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
95        let parts = parse_uri(uri)?;
96        if parts.scheme != "http" && parts.scheme != "https" {
97            return Err(CamelError::InvalidUri(format!(
98                "expected scheme 'http' or 'https', got '{}'",
99                parts.scheme
100            )));
101        }
102
103        let base_url = format!("{}:{}", parts.scheme, parts.path);
104
105        let http_method = parts.params.get("httpMethod").cloned();
106
107        let throw_exception_on_failure = parts
108            .params
109            .get("throwExceptionOnFailure")
110            .map(|v| v != "false")
111            .unwrap_or(true);
112
113        let ok_status_code_range = parts
114            .params
115            .get("okStatusCodeRange")
116            .and_then(|v| {
117                let (start, end) = v.split_once('-')?;
118                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
119            })
120            .unwrap_or((200, 299));
121
122        let follow_redirects = parts
123            .params
124            .get("followRedirects")
125            .map(|v| v == "true")
126            .unwrap_or(false);
127
128        let connect_timeout = parts
129            .params
130            .get("connectTimeout")
131            .and_then(|v| v.parse::<u64>().ok())
132            .map(Duration::from_millis)
133            .unwrap_or(Duration::from_millis(30000));
134
135        let response_timeout = parts
136            .params
137            .get("responseTimeout")
138            .and_then(|v| v.parse::<u64>().ok())
139            .map(Duration::from_millis);
140
141        // SSRF protection settings
142        let allow_private_ips = parts
143            .params
144            .get("allowPrivateIps")
145            .map(|v| v == "true")
146            .unwrap_or(false); // Default: block private IPs
147
148        let blocked_hosts = parts
149            .params
150            .get("blockedHosts")
151            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
152            .unwrap_or_default();
153
154        let max_body_size = parts
155            .params
156            .get("maxBodySize")
157            .and_then(|v| v.parse::<usize>().ok())
158            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
159
160        // CAMEL_OPTIONS: params that are consumed by Camel,        // Any remaining params should be forwarded as HTTP query params
161        let camel_options = [
162            "httpMethod",
163            "throwExceptionOnFailure",
164            "okStatusCodeRange",
165            "followRedirects",
166            "connectTimeout",
167            "responseTimeout",
168            "allowPrivateIps",
169            "blockedHosts",
170            "maxBodySize",
171        ];
172
173        let query_params: HashMap<String, String> = parts
174            .params
175            .into_iter()
176            .filter(|(k, _)| !camel_options.contains(&k.as_str()))
177            .map(|(k, v)| (k.clone(), v.clone()))
178            .collect();
179
180        Ok(Self {
181            base_url,
182            http_method,
183            throw_exception_on_failure,
184            ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
185            follow_redirects,
186            connect_timeout,
187            response_timeout,
188            query_params,
189            allow_private_ips,
190            blocked_hosts,
191            max_body_size,
192        })
193    }
194}
195
196// ---------------------------------------------------------------------------
197// HttpServerConfig
198// ---------------------------------------------------------------------------
199
200/// Configuration for an HTTP server (consumer) endpoint.
201#[derive(Debug, Clone)]
202pub struct HttpServerConfig {
203    /// Bind address, e.g. "0.0.0.0" or "127.0.0.1".
204    pub host: String,
205    /// TCP port to listen on.
206    pub port: u16,
207    /// URL path this consumer handles, e.g. "/orders".
208    pub path: String,
209    /// Maximum request body size in bytes.
210    pub max_request_body: usize,
211    /// Maximum response body size for materializing streams in bytes.
212    pub max_response_body: usize,
213}
214
215impl HttpServerConfig {
216    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
217        let parts = parse_uri(uri)?;
218        if parts.scheme != "http" && parts.scheme != "https" {
219            return Err(CamelError::InvalidUri(format!(
220                "expected scheme 'http' or 'https', got '{}'",
221                parts.scheme
222            )));
223        }
224
225        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
226        // Strip leading "//"
227        let authority_and_path = parts.path.trim_start_matches('/');
228
229        // Split on the first "/" to separate "host:port" from "/path"
230        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
231            (&authority_and_path[..idx], &authority_and_path[idx..])
232        } else {
233            (authority_and_path, "/")
234        };
235
236        let path = if path_suffix.is_empty() {
237            "/"
238        } else {
239            path_suffix
240        }
241        .to_string();
242
243        // Parse host:port
244        let (host, port) = if let Some(colon) = authority.rfind(':') {
245            let port_str = &authority[colon + 1..];
246            match port_str.parse::<u16>() {
247                Ok(p) => (authority[..colon].to_string(), p),
248                Err(_) => {
249                    return Err(CamelError::InvalidUri(format!(
250                        "invalid port '{}' in URI '{}'",
251                        port_str, uri
252                    )));
253                }
254            }
255        } else {
256            (authority.to_string(), 80)
257        };
258
259        let max_request_body = parts
260            .params
261            .get("maxRequestBody")
262            .and_then(|v| v.parse::<usize>().ok())
263            .unwrap_or(2 * 1024 * 1024); // Default: 2MB
264
265        let max_response_body = parts
266            .params
267            .get("maxResponseBody")
268            .and_then(|v| v.parse::<usize>().ok())
269            .unwrap_or(10 * 1024 * 1024); // Default: 10MB
270
271        Ok(Self {
272            host,
273            port,
274            path,
275            max_request_body,
276            max_response_body,
277        })
278    }
279}
280
281// ---------------------------------------------------------------------------
282// RequestEnvelope / HttpReply
283// ---------------------------------------------------------------------------
284
285/// An inbound HTTP request sent from the Axum dispatch handler to an
286/// `HttpConsumer` receive loop.
287pub struct RequestEnvelope {
288    pub method: String,
289    pub path: String,
290    pub query: String,
291    pub headers: http::HeaderMap,
292    pub body: bytes::Bytes,
293    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
294}
295
296/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
297#[derive(Debug, Clone)]
298pub struct HttpReply {
299    pub status: u16,
300    pub headers: Vec<(String, String)>,
301    pub body: bytes::Bytes,
302}
303
304// ---------------------------------------------------------------------------
305// DispatchTable / ServerRegistry
306// ---------------------------------------------------------------------------
307
308/// Maps URL path → channel sender for the consumer that owns that path.
309pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
310
311/// Handle to a running Axum server on one port.
312struct ServerHandle {
313    dispatch: DispatchTable,
314    /// Kept alive so the task isn't dropped; not used directly.
315    _task: tokio::task::JoinHandle<()>,
316}
317
318/// Process-global registry mapping port → running Axum server handle.
319pub struct ServerRegistry {
320    inner: Mutex<HashMap<u16, ServerHandle>>,
321}
322
323impl ServerRegistry {
324    /// Returns the global singleton.
325    pub fn global() -> &'static Self {
326        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
327        INSTANCE.get_or_init(|| ServerRegistry {
328            inner: Mutex::new(HashMap::new()),
329        })
330    }
331
332    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
333    /// none is running on that port yet.
334    pub async fn get_or_spawn(
335        &'static self,
336        host: &str,
337        port: u16,
338        max_request_body: usize,
339    ) -> Result<DispatchTable, CamelError> {
340        // Fast path: check without spawning.
341        {
342            let guard = self.inner.lock().map_err(|_| {
343                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
344            })?;
345            if let Some(handle) = guard.get(&port) {
346                return Ok(Arc::clone(&handle.dispatch));
347            }
348        }
349
350        // Slow path: need to bind and spawn.
351        let addr = format!("{}:{}", host, port);
352        let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
353            CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
354        })?;
355
356        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
357        let dispatch_for_server = Arc::clone(&dispatch);
358        let task = tokio::spawn(run_axum_server(
359            listener,
360            dispatch_for_server,
361            max_request_body,
362        ));
363
364        // Re-acquire lock to insert — handle the race where another task won.
365        let mut guard = self.inner.lock().map_err(|_| {
366            CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
367        })?;
368        // If another task already registered this port while we were binding,
369        // abort our task (it has no routes) and use the winner's dispatch table.
370        if let Some(existing) = guard.get(&port) {
371            task.abort();
372            return Ok(Arc::clone(&existing.dispatch));
373        }
374        guard.insert(
375            port,
376            ServerHandle {
377                dispatch: Arc::clone(&dispatch),
378                _task: task,
379            },
380        );
381        Ok(dispatch)
382    }
383}
384
385// ---------------------------------------------------------------------------
386// Axum server
387// ---------------------------------------------------------------------------
388
389use axum::{
390    Router,
391    body::Body as AxumBody,
392    extract::{Request, State},
393    http::{Response, StatusCode},
394    response::IntoResponse,
395};
396
397#[derive(Clone)]
398struct AppState {
399    dispatch: DispatchTable,
400    max_request_body: usize,
401}
402
403async fn run_axum_server(
404    listener: tokio::net::TcpListener,
405    dispatch: DispatchTable,
406    max_request_body: usize,
407) {
408    let state = AppState {
409        dispatch,
410        max_request_body,
411    };
412    let app = Router::new().fallback(dispatch_handler).with_state(state);
413
414    axum::serve(listener, app).await.unwrap_or_else(|e| {
415        tracing::error!(error = %e, "Axum server error");
416    });
417}
418
419async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
420    let method = req.method().to_string();
421    let path = req.uri().path().to_string();
422    let query = req.uri().query().unwrap_or("").to_string();
423    let headers = req.headers().clone();
424
425    let body_bytes = match axum::body::to_bytes(req.into_body(), state.max_request_body).await {
426        Ok(b) => b,
427        Err(_) => {
428            return Response::builder()
429                .status(StatusCode::BAD_REQUEST)
430                .body(AxumBody::empty())
431                .expect(
432                    "Response::builder() with a known-valid status code and body is infallible",
433                );
434        }
435    };
436
437    // Look up handler for this path
438    let sender = {
439        let table = state.dispatch.read().await;
440        table.get(&path).cloned()
441    };
442
443    let Some(sender) = sender else {
444        return Response::builder()
445            .status(StatusCode::NOT_FOUND)
446            .body(AxumBody::from("No consumer registered for this path"))
447            .expect("Response::builder() with a known-valid status code and body is infallible");
448    };
449
450    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
451    let envelope = RequestEnvelope {
452        method,
453        path,
454        query,
455        headers,
456        body: body_bytes,
457        reply_tx,
458    };
459
460    if sender.send(envelope).await.is_err() {
461        return Response::builder()
462            .status(StatusCode::SERVICE_UNAVAILABLE)
463            .body(AxumBody::from("Consumer unavailable"))
464            .expect("Response::builder() with a known-valid status code and body is infallible");
465    }
466
467    match reply_rx.await {
468        Ok(reply) => {
469            let status =
470                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
471            let mut builder = Response::builder().status(status);
472            for (k, v) in &reply.headers {
473                builder = builder.header(k.as_str(), v.as_str());
474            }
475            builder
476                .body(AxumBody::from(reply.body))
477                .unwrap_or_else(|_| {
478                    Response::builder()
479                        .status(StatusCode::INTERNAL_SERVER_ERROR)
480                        .body(AxumBody::from("Invalid response headers from consumer"))
481                        .expect("Response::builder() with a known-valid status code and body is infallible")
482                })
483        }
484        Err(_) => Response::builder()
485            .status(StatusCode::INTERNAL_SERVER_ERROR)
486            .body(AxumBody::from("Pipeline error"))
487            .expect("Response::builder() with a known-valid status code and body is infallible"),
488    }
489}
490
491// ---------------------------------------------------------------------------
492// HttpConsumer
493// ---------------------------------------------------------------------------
494
495pub struct HttpConsumer {
496    config: HttpServerConfig,
497}
498
499impl HttpConsumer {
500    pub fn new(config: HttpServerConfig) -> Self {
501        Self { config }
502    }
503}
504
505#[async_trait::async_trait]
506impl Consumer for HttpConsumer {
507    async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
508        use camel_api::{Exchange, Message, body::Body};
509
510        let dispatch = ServerRegistry::global()
511            .get_or_spawn(
512                &self.config.host,
513                self.config.port,
514                self.config.max_request_body,
515            )
516            .await?;
517
518        // Create a channel for this path and register it
519        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
520        {
521            let mut table = dispatch.write().await;
522            table.insert(self.config.path.clone(), env_tx);
523        }
524
525        let path = self.config.path.clone();
526        let cancel_token = ctx.cancel_token();
527        let max_response_body = self.config.max_response_body;
528
529        loop {
530            tokio::select! {
531                _ = ctx.cancelled() => {
532                    break;
533                }
534                envelope = env_rx.recv() => {
535                    let Some(envelope) = envelope else { break; };
536
537                    // Build Exchange from HTTP request
538                    let mut msg = Message::default();
539
540                    // Set standard Camel HTTP headers
541                    msg.set_header("CamelHttpMethod",
542                        serde_json::Value::String(envelope.method.clone()));
543                    msg.set_header("CamelHttpPath",
544                        serde_json::Value::String(envelope.path.clone()));
545                    msg.set_header("CamelHttpQuery",
546                        serde_json::Value::String(envelope.query.clone()));
547
548                    // Forward HTTP headers (skip pseudo-headers)
549                    for (k, v) in &envelope.headers {
550                        if let Ok(val_str) = v.to_str() {
551                            msg.set_header(
552                                k.as_str(),
553                                serde_json::Value::String(val_str.to_string()),
554                            );
555                        }
556                    }
557
558                    // Body: Try to convert to text if UTF-8, otherwise keep as bytes
559                    if !envelope.body.is_empty() {
560                        match std::str::from_utf8(&envelope.body) {
561                            Ok(text) => msg.body = Body::Text(text.to_string()),
562                            Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
563                        }
564                    }
565
566                    let exchange = Exchange::new(msg);
567                    let reply_tx = envelope.reply_tx;
568                    let sender = ctx.sender().clone();
569                    let path_clone = path.clone();
570                    let cancel = cancel_token.clone();
571
572                    // Spawn a task to handle this request concurrently
573                    //
574                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
575                    // true concurrent request processing. This change was introduced as part of the
576                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
577                    //
578                    // Rationale:
579                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
580                    //    the consumer's main loop until the pipeline processing completes
581                    // 2. This blocking would prevent multiple HTTP requests from being processed
582                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
583                    // 3. The channel would never have multiple exchanges buffered simultaneously,
584                    //    defeating the purpose of pipeline-side concurrency
585                    // 4. By spawning a task per request, we allow the consumer loop to continue
586                    //    accepting new requests while existing ones are processed in the pipeline
587                    //
588                    // This approach effectively decouples request acceptance from pipeline processing,
589                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
590                    // by the pipeline when ConcurrencyModel::Concurrent is active.
591                    tokio::spawn(async move {
592                        // Check for cancellation before sending to pipeline.
593                        // Returns 503 (Service Unavailable) instead of letting the request
594                        // enter a shutting-down pipeline. This is a behavioral change from
595                        // the pre-concurrency implementation where cancellation during
596                        // processing would result in a 500 (Internal Server Error).
597                        // 503 is more semantically correct: the server is temporarily
598                        // unable to handle the request due to shutdown.
599                        if cancel.is_cancelled() {
600                            let _ = reply_tx.send(HttpReply {
601                                status: 503,
602                                headers: vec![],
603                                body: bytes::Bytes::from("Service Unavailable"),
604                            });
605                            return;
606                        }
607
608                        // Send through pipeline and await result
609                        let (tx, rx) = tokio::sync::oneshot::channel();
610                        let envelope = camel_component::consumer::ExchangeEnvelope {
611                            exchange,
612                            reply_tx: Some(tx),
613                        };
614
615                        let result = match sender.send(envelope).await {
616                            Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
617                            Err(_) => Err(camel_api::CamelError::ChannelClosed),
618                        }
619                        .and_then(|r| r);
620
621                        let reply = match result {
622                            Ok(out) => {
623                                let status = out
624                                    .input
625                                    .header("CamelHttpResponseCode")
626                                    .and_then(|v| v.as_u64())
627                                    .map(|s| s as u16)
628                                    .unwrap_or(200);
629
630                                let body_bytes = match out.input.body {
631                                    Body::Empty => bytes::Bytes::new(),
632                                    Body::Bytes(b) => b,
633                                    Body::Text(s) => bytes::Bytes::from(s.into_bytes()),
634                                    Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
635                                    Body::Stream(_) => {
636                                        // Materialize stream for HTTP response
637                                        match out.input.body.into_bytes(max_response_body).await {
638                                            Ok(b) => b,
639                                            Err(e) => {
640                                                debug!(error = %e, "Failed to materialize stream body for HTTP reply");
641                                                return;
642                                            }
643                                        }
644                                    }
645                                };
646
647                                let resp_headers: Vec<(String, String)> = out
648                                    .input
649                                    .headers
650                                    .iter()
651                                    // Filter Camel internal headers
652                                    .filter(|(k, _)| !k.starts_with("Camel"))
653                                    // Filter hop-by-hop and request-only headers
654                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
655                                    .filter(|(k, _)| {
656                                        !matches!(
657                                            k.to_lowercase().as_str(),
658                                            // RFC 2616 Section 4.5 - General headers
659                                            "content-length" |      // Auto-calculated by framework
660                                            "content-type" |        // Auto-calculated from body
661                                            "transfer-encoding" |   // Hop-by-hop
662                                            "connection" |          // Hop-by-hop
663                                            "cache-control" |       // Hop-by-hop
664                                            "date" |                // Auto-generated
665                                            "pragma" |              // Hop-by-hop
666                                            "trailer" |             // Hop-by-hop
667                                            "upgrade" |             // Hop-by-hop
668                                            "via" |                 // Hop-by-hop
669                                            "warning" |             // Hop-by-hop
670                                            // Request-only headers
671                                            "host" |                // Request-only
672                                            "user-agent" |          // Request-only
673                                            "accept" |              // Request-only
674                                            "accept-encoding" |     // Request-only
675                                            "accept-language" |     // Request-only
676                                            "accept-charset" |      // Request-only
677                                            "authorization" |       // Request-only (security)
678                                            "proxy-authorization" | // Request-only (security)
679                                            "cookie" |              // Request-only
680                                            "expect" |              // Request-only
681                                            "from" |                // Request-only
682                                            "if-match" |            // Request-only
683                                            "if-modified-since" |   // Request-only
684                                            "if-none-match" |       // Request-only
685                                            "if-range" |            // Request-only
686                                            "if-unmodified-since" | // Request-only
687                                            "max-forwards" |        // Request-only
688                                            "proxy-connection" |    // Request-only
689                                            "range" |               // Request-only
690                                            "referer" |             // Request-only
691                                            "te"                    // Request-only
692                                        )
693                                    })
694                                    .filter_map(|(k, v)| {
695                                        v.as_str().map(|s| (k.clone(), s.to_string()))
696                                    })
697                                    .collect();
698
699                                HttpReply {
700                                    status,
701                                    headers: resp_headers,
702                                    body: body_bytes,
703                                }
704                            }
705                            Err(e) => {
706                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
707                                HttpReply {
708                                    status: 500,
709                                    headers: vec![],
710                                    body: bytes::Bytes::from("Internal Server Error"),
711                                }
712                            }
713                        };
714
715                        // Reply to Axum handler (ignore error if client disconnected)
716                        let _ = reply_tx.send(reply);
717                    });
718                }
719            }
720        }
721
722        // Deregister this path
723        {
724            let mut table = dispatch.write().await;
725            table.remove(&path);
726        }
727
728        Ok(())
729    }
730
731    async fn stop(&mut self) -> Result<(), CamelError> {
732        Ok(())
733    }
734
735    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
736        camel_component::ConcurrencyModel::Concurrent { max: None }
737    }
738}
739
740// ---------------------------------------------------------------------------
741// HttpComponent / HttpsComponent
742// ---------------------------------------------------------------------------
743
744pub struct HttpComponent;
745
746impl HttpComponent {
747    pub fn new() -> Self {
748        Self
749    }
750}
751
752impl Default for HttpComponent {
753    fn default() -> Self {
754        Self::new()
755    }
756}
757
758impl Component for HttpComponent {
759    fn scheme(&self) -> &str {
760        "http"
761    }
762
763    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
764        let config = HttpConfig::from_uri(uri)?;
765        let server_config = HttpServerConfig::from_uri(uri)?;
766        let client = build_client(&config)?;
767        Ok(Box::new(HttpEndpoint {
768            uri: uri.to_string(),
769            config,
770            server_config,
771            client,
772        }))
773    }
774}
775
776pub struct HttpsComponent;
777
778impl HttpsComponent {
779    pub fn new() -> Self {
780        Self
781    }
782}
783
784impl Default for HttpsComponent {
785    fn default() -> Self {
786        Self::new()
787    }
788}
789
790impl Component for HttpsComponent {
791    fn scheme(&self) -> &str {
792        "https"
793    }
794
795    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
796        let config = HttpConfig::from_uri(uri)?;
797        let server_config = HttpServerConfig::from_uri(uri)?;
798        let client = build_client(&config)?;
799        Ok(Box::new(HttpEndpoint {
800            uri: uri.to_string(),
801            config,
802            server_config,
803            client,
804        }))
805    }
806}
807
808fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
809    let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
810
811    if !config.follow_redirects {
812        builder = builder.redirect(reqwest::redirect::Policy::none());
813    }
814
815    builder.build().map_err(|e| {
816        CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
817    })
818}
819
820// ---------------------------------------------------------------------------
821// HttpEndpoint
822// ---------------------------------------------------------------------------
823
824struct HttpEndpoint {
825    uri: String,
826    config: HttpConfig,
827    server_config: HttpServerConfig,
828    client: reqwest::Client,
829}
830
831impl Endpoint for HttpEndpoint {
832    fn uri(&self) -> &str {
833        &self.uri
834    }
835
836    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
837        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
838    }
839
840    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
841        Ok(BoxProcessor::new(HttpProducer {
842            config: Arc::new(self.config.clone()),
843            client: self.client.clone(),
844        }))
845    }
846}
847
848// ---------------------------------------------------------------------------
849// SSRF Protection
850// ---------------------------------------------------------------------------
851
852fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
853    let parsed = url::Url::parse(url)
854        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
855
856    // Check blocked hosts
857    if let Some(host) = parsed.host_str()
858        && config.blocked_hosts.iter().any(|blocked| host == blocked)
859    {
860        return Err(CamelError::ProcessorError(format!(
861            "Host '{}' is blocked",
862            host
863        )));
864    }
865
866    // Check private IPs if not allowed
867    if !config.allow_private_ips
868        && let Some(host) = parsed.host()
869    {
870        match host {
871            url::Host::Ipv4(ip) => {
872                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
873                    return Err(CamelError::ProcessorError(format!(
874                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
875                        ip
876                    )));
877                }
878            }
879            url::Host::Ipv6(ip) => {
880                if ip.is_loopback() {
881                    return Err(CamelError::ProcessorError(format!(
882                        "Loopback IP '{}' not allowed",
883                        ip
884                    )));
885                }
886            }
887            url::Host::Domain(domain) => {
888                // Block common internal domains
889                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
890                if blocked_domains.contains(&domain) {
891                    return Err(CamelError::ProcessorError(format!(
892                        "Domain '{}' is not allowed",
893                        domain
894                    )));
895                }
896            }
897        }
898    }
899
900    Ok(())
901}
902
903// ---------------------------------------------------------------------------
904// HttpProducer
905// ---------------------------------------------------------------------------
906
907#[derive(Clone)]
908struct HttpProducer {
909    config: Arc<HttpConfig>,
910    client: reqwest::Client,
911}
912
913impl HttpProducer {
914    fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
915        if let Some(ref method) = config.http_method {
916            return method.to_uppercase();
917        }
918        if let Some(method) = exchange
919            .input
920            .header("CamelHttpMethod")
921            .and_then(|v| v.as_str())
922        {
923            return method.to_uppercase();
924        }
925        if !exchange.input.body.is_empty() {
926            return "POST".to_string();
927        }
928        "GET".to_string()
929    }
930
931    fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
932        if let Some(uri) = exchange
933            .input
934            .header("CamelHttpUri")
935            .and_then(|v| v.as_str())
936        {
937            let mut url = uri.to_string();
938            if let Some(path) = exchange
939                .input
940                .header("CamelHttpPath")
941                .and_then(|v| v.as_str())
942            {
943                if !url.ends_with('/') && !path.starts_with('/') {
944                    url.push('/');
945                }
946                url.push_str(path);
947            }
948            if let Some(query) = exchange
949                .input
950                .header("CamelHttpQuery")
951                .and_then(|v| v.as_str())
952            {
953                url.push('?');
954                url.push_str(query);
955            }
956            return url;
957        }
958
959        let mut url = config.base_url.clone();
960
961        if let Some(path) = exchange
962            .input
963            .header("CamelHttpPath")
964            .and_then(|v| v.as_str())
965        {
966            if !url.ends_with('/') && !path.starts_with('/') {
967                url.push('/');
968            }
969            url.push_str(path);
970        }
971
972        if let Some(query) = exchange
973            .input
974            .header("CamelHttpQuery")
975            .and_then(|v| v.as_str())
976        {
977            url.push('?');
978            url.push_str(query);
979        } else if !config.query_params.is_empty() {
980            // Forward non-Camel query params from config
981            url.push('?');
982            let query_string: String = config
983                .query_params
984                .iter()
985                .map(|(k, v)| format!("{k}={v}"))
986                .collect::<Vec<_>>()
987                .join("&");
988            url.push_str(&query_string);
989        }
990
991        url
992    }
993
994    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
995        status >= range.0 && status <= range.1
996    }
997}
998
999impl Service<Exchange> for HttpProducer {
1000    type Response = Exchange;
1001    type Error = CamelError;
1002    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1003
1004    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1005        Poll::Ready(Ok(()))
1006    }
1007
1008    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1009        let config = self.config.clone();
1010        let client = self.client.clone();
1011
1012        Box::pin(async move {
1013            let method_str = HttpProducer::resolve_method(&exchange, &config);
1014            let url = HttpProducer::resolve_url(&exchange, &config);
1015
1016            // SECURITY: Validate URL for SSRF
1017            validate_url_for_ssrf(&url, &config)?;
1018
1019            debug!(
1020                correlation_id = %exchange.correlation_id(),
1021                method = %method_str,
1022                url = %url,
1023                "HTTP request"
1024            );
1025
1026            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1027                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1028            })?;
1029
1030            let mut request = client.request(method, &url);
1031
1032            if let Some(timeout) = config.response_timeout {
1033                request = request.timeout(timeout);
1034            }
1035
1036            for (key, value) in &exchange.input.headers {
1037                if !key.starts_with("Camel")
1038                    && let Some(val_str) = value.as_str()
1039                    && let (Ok(name), Ok(val)) = (
1040                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1041                        reqwest::header::HeaderValue::from_str(val_str),
1042                    )
1043                {
1044                    request = request.header(name, val);
1045                }
1046            }
1047
1048            match exchange.input.body {
1049                Body::Stream(ref s) => {
1050                    let mut stream_lock = s.stream.lock().await;
1051                    if let Some(stream) = stream_lock.take() {
1052                        request = request.body(reqwest::Body::wrap_stream(stream));
1053                    } else {
1054                        return Err(CamelError::AlreadyConsumed);
1055                    }
1056                }
1057                _ => {
1058                    // For other types, materialize with configured limit
1059                    let body = std::mem::take(&mut exchange.input.body);
1060                    let bytes = body.into_bytes(config.max_body_size).await?;
1061                    if !bytes.is_empty() {
1062                        request = request.body(bytes);
1063                    }
1064                }
1065            }
1066
1067            let response = request
1068                .send()
1069                .await
1070                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1071
1072            let status_code = response.status().as_u16();
1073            let status_text = response
1074                .status()
1075                .canonical_reason()
1076                .unwrap_or("Unknown")
1077                .to_string();
1078
1079            for (key, value) in response.headers() {
1080                if let Ok(val_str) = value.to_str() {
1081                    exchange
1082                        .input
1083                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1084                }
1085            }
1086
1087            exchange.input.set_header(
1088                "CamelHttpResponseCode",
1089                serde_json::Value::Number(status_code.into()),
1090            );
1091            exchange.input.set_header(
1092                "CamelHttpResponseText",
1093                serde_json::Value::String(status_text.clone()),
1094            );
1095
1096            let response_body = response.bytes().await.map_err(|e| {
1097                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1098            })?;
1099
1100            if config.throw_exception_on_failure
1101                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1102            {
1103                return Err(CamelError::HttpOperationFailed {
1104                    status_code,
1105                    status_text,
1106                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1107                });
1108            }
1109
1110            if !response_body.is_empty() {
1111                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1112            }
1113
1114            debug!(
1115                correlation_id = %exchange.correlation_id(),
1116                status = status_code,
1117                url = %url,
1118                "HTTP response"
1119            );
1120            Ok(exchange)
1121        })
1122    }
1123}
1124
1125#[cfg(test)]
1126mod tests {
1127    use super::*;
1128    use camel_api::Message;
1129    use std::sync::Arc;
1130    use std::time::Duration;
1131    use tokio::sync::Mutex;
1132
1133    // NullRouteController for testing
1134    struct NullRouteController;
1135    #[async_trait::async_trait]
1136    impl camel_api::RouteController for NullRouteController {
1137        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1138            Ok(())
1139        }
1140        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1141            Ok(())
1142        }
1143        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1144            Ok(())
1145        }
1146        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1147            Ok(())
1148        }
1149        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1150            Ok(())
1151        }
1152        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1153            None
1154        }
1155        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1156            Ok(())
1157        }
1158        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1159            Ok(())
1160        }
1161    }
1162
1163    fn test_producer_ctx() -> ProducerContext {
1164        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1165    }
1166
1167    #[test]
1168    fn test_http_config_defaults() {
1169        let config = HttpConfig::from_uri("http://localhost:8080/api").unwrap();
1170        assert_eq!(config.base_url, "http://localhost:8080/api");
1171        assert!(config.http_method.is_none());
1172        assert!(config.throw_exception_on_failure);
1173        assert_eq!(config.ok_status_code_range, (200, 299));
1174        assert!(!config.follow_redirects);
1175        assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1176        assert!(config.response_timeout.is_none());
1177    }
1178
1179    #[test]
1180    fn test_http_config_with_options() {
1181        let config = HttpConfig::from_uri(
1182            "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1183        ).unwrap();
1184        assert_eq!(config.base_url, "https://api.example.com/v1");
1185        assert_eq!(config.http_method, Some("PUT".to_string()));
1186        assert!(!config.throw_exception_on_failure);
1187        assert!(config.follow_redirects);
1188        assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1189        assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1190    }
1191
1192    #[test]
1193    fn test_http_config_ok_status_range() {
1194        let config =
1195            HttpConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1196        assert_eq!(config.ok_status_code_range, (200, 204));
1197    }
1198
1199    #[test]
1200    fn test_http_config_wrong_scheme() {
1201        let result = HttpConfig::from_uri("file:/tmp");
1202        assert!(result.is_err());
1203    }
1204
1205    #[test]
1206    fn test_http_component_scheme() {
1207        let component = HttpComponent::new();
1208        assert_eq!(component.scheme(), "http");
1209    }
1210
1211    #[test]
1212    fn test_https_component_scheme() {
1213        let component = HttpsComponent::new();
1214        assert_eq!(component.scheme(), "https");
1215    }
1216
1217    #[test]
1218    fn test_http_endpoint_creates_consumer() {
1219        let component = HttpComponent::new();
1220        let endpoint = component
1221            .create_endpoint("http://0.0.0.0:19100/test")
1222            .unwrap();
1223        assert!(endpoint.create_consumer().is_ok());
1224    }
1225
1226    #[test]
1227    fn test_https_endpoint_creates_consumer() {
1228        let component = HttpsComponent::new();
1229        let endpoint = component
1230            .create_endpoint("https://0.0.0.0:8443/test")
1231            .unwrap();
1232        assert!(endpoint.create_consumer().is_ok());
1233    }
1234
1235    #[test]
1236    fn test_http_endpoint_creates_producer() {
1237        let ctx = test_producer_ctx();
1238        let component = HttpComponent::new();
1239        let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1240        assert!(endpoint.create_producer(&ctx).is_ok());
1241    }
1242
1243    // -----------------------------------------------------------------------
1244    // Producer tests
1245    // -----------------------------------------------------------------------
1246
1247    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1248        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1249        let addr = listener.local_addr().unwrap();
1250        let url = format!("http://127.0.0.1:{}", addr.port());
1251
1252        let handle = tokio::spawn(async move {
1253            loop {
1254                if let Ok((mut stream, _)) = listener.accept().await {
1255                    tokio::spawn(async move {
1256                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1257                        let mut buf = vec![0u8; 4096];
1258                        let n = stream.read(&mut buf).await.unwrap_or(0);
1259                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1260
1261                        let method = request.split_whitespace().next().unwrap_or("GET");
1262
1263                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1264                        let response = format!(
1265                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1266                            body.len(),
1267                            body
1268                        );
1269                        let _ = stream.write_all(response.as_bytes()).await;
1270                    });
1271                }
1272            }
1273        });
1274
1275        (url, handle)
1276    }
1277
1278    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1279        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1280        let addr = listener.local_addr().unwrap();
1281        let url = format!("http://127.0.0.1:{}", addr.port());
1282
1283        let handle = tokio::spawn(async move {
1284            loop {
1285                if let Ok((mut stream, _)) = listener.accept().await {
1286                    let status = status;
1287                    tokio::spawn(async move {
1288                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1289                        let mut buf = vec![0u8; 4096];
1290                        let _ = stream.read(&mut buf).await;
1291
1292                        let status_text = match status {
1293                            404 => "Not Found",
1294                            500 => "Internal Server Error",
1295                            _ => "Error",
1296                        };
1297                        let body = "error body";
1298                        let response = format!(
1299                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1300                            status,
1301                            status_text,
1302                            body.len(),
1303                            body
1304                        );
1305                        let _ = stream.write_all(response.as_bytes()).await;
1306                    });
1307                }
1308            }
1309        });
1310
1311        (url, handle)
1312    }
1313
1314    #[tokio::test]
1315    async fn test_http_producer_get_request() {
1316        use tower::ServiceExt;
1317
1318        let (url, _handle) = start_test_server().await;
1319        let ctx = test_producer_ctx();
1320
1321        let component = HttpComponent::new();
1322        let endpoint = component
1323            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1324            .unwrap();
1325        let producer = endpoint.create_producer(&ctx).unwrap();
1326
1327        let exchange = Exchange::new(Message::default());
1328        let result = producer.oneshot(exchange).await.unwrap();
1329
1330        let status = result
1331            .input
1332            .header("CamelHttpResponseCode")
1333            .and_then(|v| v.as_u64())
1334            .unwrap();
1335        assert_eq!(status, 200);
1336
1337        assert!(!result.input.body.is_empty());
1338    }
1339
1340    #[tokio::test]
1341    async fn test_http_producer_post_with_body() {
1342        use tower::ServiceExt;
1343
1344        let (url, _handle) = start_test_server().await;
1345        let ctx = test_producer_ctx();
1346
1347        let component = HttpComponent::new();
1348        let endpoint = component
1349            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1350            .unwrap();
1351        let producer = endpoint.create_producer(&ctx).unwrap();
1352
1353        let exchange = Exchange::new(Message::new("request body"));
1354        let result = producer.oneshot(exchange).await.unwrap();
1355
1356        let status = result
1357            .input
1358            .header("CamelHttpResponseCode")
1359            .and_then(|v| v.as_u64())
1360            .unwrap();
1361        assert_eq!(status, 200);
1362    }
1363
1364    #[tokio::test]
1365    async fn test_http_producer_method_from_header() {
1366        use tower::ServiceExt;
1367
1368        let (url, _handle) = start_test_server().await;
1369        let ctx = test_producer_ctx();
1370
1371        let component = HttpComponent::new();
1372        let endpoint = component
1373            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1374            .unwrap();
1375        let producer = endpoint.create_producer(&ctx).unwrap();
1376
1377        let mut exchange = Exchange::new(Message::default());
1378        exchange.input.set_header(
1379            "CamelHttpMethod",
1380            serde_json::Value::String("DELETE".to_string()),
1381        );
1382
1383        let result = producer.oneshot(exchange).await.unwrap();
1384        let status = result
1385            .input
1386            .header("CamelHttpResponseCode")
1387            .and_then(|v| v.as_u64())
1388            .unwrap();
1389        assert_eq!(status, 200);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_http_producer_forced_method() {
1394        use tower::ServiceExt;
1395
1396        let (url, _handle) = start_test_server().await;
1397        let ctx = test_producer_ctx();
1398
1399        let component = HttpComponent::new();
1400        let endpoint = component
1401            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1402            .unwrap();
1403        let producer = endpoint.create_producer(&ctx).unwrap();
1404
1405        let exchange = Exchange::new(Message::default());
1406        let result = producer.oneshot(exchange).await.unwrap();
1407
1408        let status = result
1409            .input
1410            .header("CamelHttpResponseCode")
1411            .and_then(|v| v.as_u64())
1412            .unwrap();
1413        assert_eq!(status, 200);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_http_producer_throw_exception_on_failure() {
1418        use tower::ServiceExt;
1419
1420        let (url, _handle) = start_status_server(404).await;
1421        let ctx = test_producer_ctx();
1422
1423        let component = HttpComponent::new();
1424        let endpoint = component
1425            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1426            .unwrap();
1427        let producer = endpoint.create_producer(&ctx).unwrap();
1428
1429        let exchange = Exchange::new(Message::default());
1430        let result = producer.oneshot(exchange).await;
1431        assert!(result.is_err());
1432
1433        match result.unwrap_err() {
1434            CamelError::HttpOperationFailed { status_code, .. } => {
1435                assert_eq!(status_code, 404);
1436            }
1437            e => panic!("Expected HttpOperationFailed, got: {e}"),
1438        }
1439    }
1440
1441    #[tokio::test]
1442    async fn test_http_producer_no_throw_on_failure() {
1443        use tower::ServiceExt;
1444
1445        let (url, _handle) = start_status_server(500).await;
1446        let ctx = test_producer_ctx();
1447
1448        let component = HttpComponent::new();
1449        let endpoint = component
1450            .create_endpoint(&format!(
1451                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1452            ))
1453            .unwrap();
1454        let producer = endpoint.create_producer(&ctx).unwrap();
1455
1456        let exchange = Exchange::new(Message::default());
1457        let result = producer.oneshot(exchange).await.unwrap();
1458
1459        let status = result
1460            .input
1461            .header("CamelHttpResponseCode")
1462            .and_then(|v| v.as_u64())
1463            .unwrap();
1464        assert_eq!(status, 500);
1465    }
1466
1467    #[tokio::test]
1468    async fn test_http_producer_uri_override() {
1469        use tower::ServiceExt;
1470
1471        let (url, _handle) = start_test_server().await;
1472        let ctx = test_producer_ctx();
1473
1474        let component = HttpComponent::new();
1475        let endpoint = component
1476            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1477            .unwrap();
1478        let producer = endpoint.create_producer(&ctx).unwrap();
1479
1480        let mut exchange = Exchange::new(Message::default());
1481        exchange.input.set_header(
1482            "CamelHttpUri",
1483            serde_json::Value::String(format!("{url}/api")),
1484        );
1485
1486        let result = producer.oneshot(exchange).await.unwrap();
1487        let status = result
1488            .input
1489            .header("CamelHttpResponseCode")
1490            .and_then(|v| v.as_u64())
1491            .unwrap();
1492        assert_eq!(status, 200);
1493    }
1494
1495    #[tokio::test]
1496    async fn test_http_producer_response_headers_mapped() {
1497        use tower::ServiceExt;
1498
1499        let (url, _handle) = start_test_server().await;
1500        let ctx = test_producer_ctx();
1501
1502        let component = HttpComponent::new();
1503        let endpoint = component
1504            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1505            .unwrap();
1506        let producer = endpoint.create_producer(&ctx).unwrap();
1507
1508        let exchange = Exchange::new(Message::default());
1509        let result = producer.oneshot(exchange).await.unwrap();
1510
1511        assert!(
1512            result.input.header("content-type").is_some()
1513                || result.input.header("Content-Type").is_some()
1514        );
1515        assert!(result.input.header("CamelHttpResponseText").is_some());
1516    }
1517
1518    // -----------------------------------------------------------------------
1519    // Bug fix tests: Client configuration per-endpoint
1520    // -----------------------------------------------------------------------
1521
1522    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1523        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1524        let addr = listener.local_addr().unwrap();
1525        let url = format!("http://127.0.0.1:{}", addr.port());
1526
1527        let handle = tokio::spawn(async move {
1528            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1529            loop {
1530                if let Ok((mut stream, _)) = listener.accept().await {
1531                    tokio::spawn(async move {
1532                        let mut buf = vec![0u8; 4096];
1533                        let n = stream.read(&mut buf).await.unwrap_or(0);
1534                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1535
1536                        // Check if this is a request to /final
1537                        if request.contains("GET /final") {
1538                            let body = r#"{"status":"final"}"#;
1539                            let response = format!(
1540                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1541                                body.len(),
1542                                body
1543                            );
1544                            let _ = stream.write_all(response.as_bytes()).await;
1545                        } else {
1546                            // Redirect to /final
1547                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1548                            let _ = stream.write_all(response.as_bytes()).await;
1549                        }
1550                    });
1551                }
1552            }
1553        });
1554
1555        (url, handle)
1556    }
1557
1558    #[tokio::test]
1559    async fn test_follow_redirects_false_does_not_follow() {
1560        use tower::ServiceExt;
1561
1562        let (url, _handle) = start_redirect_server().await;
1563        let ctx = test_producer_ctx();
1564
1565        let component = HttpComponent::new();
1566        let endpoint = component
1567            .create_endpoint(&format!(
1568                "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1569            ))
1570            .unwrap();
1571        let producer = endpoint.create_producer(&ctx).unwrap();
1572
1573        let exchange = Exchange::new(Message::default());
1574        let result = producer.oneshot(exchange).await.unwrap();
1575
1576        // Should get 302, NOT follow redirect to 200
1577        let status = result
1578            .input
1579            .header("CamelHttpResponseCode")
1580            .and_then(|v| v.as_u64())
1581            .unwrap();
1582        assert_eq!(
1583            status, 302,
1584            "Should NOT follow redirect when followRedirects=false"
1585        );
1586    }
1587
1588    #[tokio::test]
1589    async fn test_follow_redirects_true_follows_redirect() {
1590        use tower::ServiceExt;
1591
1592        let (url, _handle) = start_redirect_server().await;
1593        let ctx = test_producer_ctx();
1594
1595        let component = HttpComponent::new();
1596        let endpoint = component
1597            .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1598            .unwrap();
1599        let producer = endpoint.create_producer(&ctx).unwrap();
1600
1601        let exchange = Exchange::new(Message::default());
1602        let result = producer.oneshot(exchange).await.unwrap();
1603
1604        // Should follow redirect and get 200
1605        let status = result
1606            .input
1607            .header("CamelHttpResponseCode")
1608            .and_then(|v| v.as_u64())
1609            .unwrap();
1610        assert_eq!(
1611            status, 200,
1612            "Should follow redirect when followRedirects=true"
1613        );
1614    }
1615
1616    #[tokio::test]
1617    async fn test_query_params_forwarded_to_http_request() {
1618        use tower::ServiceExt;
1619
1620        let (url, _handle) = start_test_server().await;
1621        let ctx = test_producer_ctx();
1622
1623        let component = HttpComponent::new();
1624        // apiKey is NOT a Camel option, should be forwarded as query param
1625        let endpoint = component
1626            .create_endpoint(&format!(
1627                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1628            ))
1629            .unwrap();
1630        let producer = endpoint.create_producer(&ctx).unwrap();
1631
1632        let exchange = Exchange::new(Message::default());
1633        let result = producer.oneshot(exchange).await.unwrap();
1634
1635        // The test server returns the request info in response
1636        // We just verify it succeeds (the query param was sent)
1637        let status = result
1638            .input
1639            .header("CamelHttpResponseCode")
1640            .and_then(|v| v.as_u64())
1641            .unwrap();
1642        assert_eq!(status, 200);
1643    }
1644
1645    #[tokio::test]
1646    async fn test_non_camel_query_params_are_forwarded() {
1647        // This test verifies Bug #3 fix: non-Camel options should be forwarded
1648        // We'll test the config parsing, not the actual HTTP call
1649        let config = HttpConfig::from_uri(
1650            "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1651        )
1652        .unwrap();
1653
1654        // apiKey and token are NOT Camel options, should be forwarded
1655        assert!(
1656            config.query_params.contains_key("apiKey"),
1657            "apiKey should be preserved"
1658        );
1659        assert!(
1660            config.query_params.contains_key("token"),
1661            "token should be preserved"
1662        );
1663        assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1664        assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1665
1666        // httpMethod IS a Camel option, should NOT be in query_params
1667        assert!(
1668            !config.query_params.contains_key("httpMethod"),
1669            "httpMethod should not be forwarded"
1670        );
1671    }
1672
1673    // -----------------------------------------------------------------------
1674    // SSRF Protection tests
1675    // -----------------------------------------------------------------------
1676
1677    #[tokio::test]
1678    async fn test_http_producer_blocks_metadata_endpoint() {
1679        use tower::ServiceExt;
1680
1681        let ctx = test_producer_ctx();
1682        let component = HttpComponent::new();
1683        let endpoint = component
1684            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1685            .unwrap();
1686        let producer = endpoint.create_producer(&ctx).unwrap();
1687
1688        let mut exchange = Exchange::new(Message::default());
1689        exchange.input.set_header(
1690            "CamelHttpUri",
1691            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1692        );
1693
1694        let result = producer.oneshot(exchange).await;
1695        assert!(result.is_err(), "Should block AWS metadata endpoint");
1696
1697        let err = result.unwrap_err();
1698        assert!(
1699            err.to_string().contains("Private IP"),
1700            "Error should mention private IP blocking, got: {}",
1701            err
1702        );
1703    }
1704
1705    #[test]
1706    fn test_ssrf_config_defaults() {
1707        let config = HttpConfig::from_uri("http://example.com/api").unwrap();
1708        assert!(
1709            !config.allow_private_ips,
1710            "Private IPs should be blocked by default"
1711        );
1712        assert!(
1713            config.blocked_hosts.is_empty(),
1714            "Blocked hosts should be empty by default"
1715        );
1716    }
1717
1718    #[test]
1719    fn test_ssrf_config_allow_private_ips() {
1720        let config = HttpConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1721        assert!(
1722            config.allow_private_ips,
1723            "Private IPs should be allowed when explicitly set"
1724        );
1725    }
1726
1727    #[test]
1728    fn test_ssrf_config_blocked_hosts() {
1729        let config =
1730            HttpConfig::from_uri("http://example.com/api?blockedHosts=evil.com,malware.net")
1731                .unwrap();
1732        assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1733    }
1734
1735    #[tokio::test]
1736    async fn test_http_producer_blocks_localhost() {
1737        use tower::ServiceExt;
1738
1739        let ctx = test_producer_ctx();
1740        let component = HttpComponent::new();
1741        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1742        let producer = endpoint.create_producer(&ctx).unwrap();
1743
1744        let mut exchange = Exchange::new(Message::default());
1745        exchange.input.set_header(
1746            "CamelHttpUri",
1747            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1748        );
1749
1750        let result = producer.oneshot(exchange).await;
1751        assert!(result.is_err(), "Should block localhost");
1752    }
1753
1754    #[tokio::test]
1755    async fn test_http_producer_blocks_loopback_ip() {
1756        use tower::ServiceExt;
1757
1758        let ctx = test_producer_ctx();
1759        let component = HttpComponent::new();
1760        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1761        let producer = endpoint.create_producer(&ctx).unwrap();
1762
1763        let mut exchange = Exchange::new(Message::default());
1764        exchange.input.set_header(
1765            "CamelHttpUri",
1766            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1767        );
1768
1769        let result = producer.oneshot(exchange).await;
1770        assert!(result.is_err(), "Should block loopback IP");
1771    }
1772
1773    #[tokio::test]
1774    async fn test_http_producer_allows_private_ip_when_enabled() {
1775        use tower::ServiceExt;
1776
1777        let ctx = test_producer_ctx();
1778        let component = HttpComponent::new();
1779        // With allowPrivateIps=true, the validation should pass
1780        // (actual connection will fail, but that's expected)
1781        let endpoint = component
1782            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1783            .unwrap();
1784        let producer = endpoint.create_producer(&ctx).unwrap();
1785
1786        let exchange = Exchange::new(Message::default());
1787
1788        // The request will fail because we can't connect, but it should NOT fail
1789        // due to SSRF protection
1790        let result = producer.oneshot(exchange).await;
1791        // We expect connection error, not SSRF error
1792        if let Err(ref e) = result {
1793            let err_str = e.to_string();
1794            assert!(
1795                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1796                "Should not be SSRF error, got: {}",
1797                err_str
1798            );
1799        }
1800    }
1801
1802    // -----------------------------------------------------------------------
1803    // HttpServerConfig tests
1804    // -----------------------------------------------------------------------
1805
1806    #[test]
1807    fn test_http_server_config_parse() {
1808        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1809        assert_eq!(cfg.host, "0.0.0.0");
1810        assert_eq!(cfg.port, 8080);
1811        assert_eq!(cfg.path, "/orders");
1812    }
1813
1814    #[test]
1815    fn test_http_server_config_default_path() {
1816        let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1817        assert_eq!(cfg.path, "/");
1818    }
1819
1820    #[test]
1821    fn test_http_server_config_wrong_scheme() {
1822        assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1823    }
1824
1825    #[test]
1826    fn test_http_server_config_invalid_port() {
1827        assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1828    }
1829
1830    #[test]
1831    fn test_request_envelope_and_reply_are_send() {
1832        fn assert_send<T: Send>() {}
1833        assert_send::<RequestEnvelope>();
1834        assert_send::<HttpReply>();
1835    }
1836
1837    // -----------------------------------------------------------------------
1838    // ServerRegistry tests
1839    // -----------------------------------------------------------------------
1840
1841    #[test]
1842    fn test_server_registry_global_is_singleton() {
1843        let r1 = ServerRegistry::global();
1844        let r2 = ServerRegistry::global();
1845        assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1846    }
1847
1848    // -----------------------------------------------------------------------
1849    // Axum dispatch handler tests
1850    // -----------------------------------------------------------------------
1851
1852    #[tokio::test]
1853    async fn test_dispatch_handler_returns_404_for_unknown_path() {
1854        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1855        // Nothing registered in the dispatch table
1856        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1857        let port = listener.local_addr().unwrap().port();
1858        tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1859
1860        // Wait for server to start
1861        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1862
1863        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1864            .await
1865            .unwrap();
1866        assert_eq!(resp.status().as_u16(), 404);
1867    }
1868
1869    // -----------------------------------------------------------------------
1870    // HttpConsumer tests
1871    // -----------------------------------------------------------------------
1872
1873    #[tokio::test]
1874    async fn test_http_consumer_start_registers_path() {
1875        use camel_component::ConsumerContext;
1876
1877        // Get an OS-assigned free port
1878        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1879        let port = listener.local_addr().unwrap().port();
1880        drop(listener); // Release port — ServerRegistry will rebind it
1881
1882        let consumer_cfg = HttpServerConfig {
1883            host: "127.0.0.1".to_string(),
1884            port,
1885            path: "/ping".to_string(),
1886            max_request_body: 2 * 1024 * 1024,
1887            max_response_body: 10 * 1024 * 1024,
1888        };
1889        let mut consumer = HttpConsumer::new(consumer_cfg);
1890
1891        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1892        let token = tokio_util::sync::CancellationToken::new();
1893        let ctx = ConsumerContext::new(tx, token.clone());
1894
1895        tokio::spawn(async move {
1896            consumer.start(ctx).await.unwrap();
1897        });
1898
1899        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1900
1901        let client = reqwest::Client::new();
1902        let resp_future = client
1903            .post(format!("http://127.0.0.1:{port}/ping"))
1904            .body("hello world")
1905            .send();
1906
1907        let (http_result, _) = tokio::join!(resp_future, async {
1908            if let Some(mut envelope) = rx.recv().await {
1909                // Set a custom status code
1910                envelope.exchange.input.set_header(
1911                    "CamelHttpResponseCode",
1912                    serde_json::Value::Number(201.into()),
1913                );
1914                if let Some(reply_tx) = envelope.reply_tx {
1915                    let _ = reply_tx.send(Ok(envelope.exchange));
1916                }
1917            }
1918        });
1919
1920        let resp = http_result.unwrap();
1921        assert_eq!(resp.status().as_u16(), 201);
1922
1923        token.cancel();
1924    }
1925
1926    // -----------------------------------------------------------------------
1927    // Integration tests
1928    // -----------------------------------------------------------------------
1929
1930    #[tokio::test]
1931    async fn test_integration_single_consumer_round_trip() {
1932        use camel_component::{ConsumerContext, ExchangeEnvelope};
1933
1934        // Get an OS-assigned free port (ephemeral)
1935        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1936        let port = listener.local_addr().unwrap().port();
1937        drop(listener); // Release — ServerRegistry will rebind
1938
1939        let component = HttpComponent::new();
1940        let endpoint = component
1941            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1942            .unwrap();
1943        let mut consumer = endpoint.create_consumer().unwrap();
1944
1945        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1946        let token = tokio_util::sync::CancellationToken::new();
1947        let ctx = ConsumerContext::new(tx, token.clone());
1948
1949        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1950        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1951
1952        let client = reqwest::Client::new();
1953        let send_fut = client
1954            .post(format!("http://127.0.0.1:{port}/echo"))
1955            .header("Content-Type", "text/plain")
1956            .body("ping")
1957            .send();
1958
1959        let (http_result, _) = tokio::join!(send_fut, async {
1960            if let Some(mut envelope) = rx.recv().await {
1961                assert_eq!(
1962                    envelope.exchange.input.header("CamelHttpMethod"),
1963                    Some(&serde_json::Value::String("POST".into()))
1964                );
1965                assert_eq!(
1966                    envelope.exchange.input.header("CamelHttpPath"),
1967                    Some(&serde_json::Value::String("/echo".into()))
1968                );
1969                envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
1970                if let Some(reply_tx) = envelope.reply_tx {
1971                    let _ = reply_tx.send(Ok(envelope.exchange));
1972                }
1973            }
1974        });
1975
1976        let resp = http_result.unwrap();
1977        assert_eq!(resp.status().as_u16(), 200);
1978        let body = resp.text().await.unwrap();
1979        assert_eq!(body, "pong");
1980
1981        token.cancel();
1982    }
1983
1984    #[tokio::test]
1985    async fn test_integration_two_consumers_shared_port() {
1986        use camel_component::{ConsumerContext, ExchangeEnvelope};
1987
1988        // Get an OS-assigned free port (ephemeral)
1989        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1990        let port = listener.local_addr().unwrap().port();
1991        drop(listener);
1992
1993        let component = HttpComponent::new();
1994
1995        // Consumer A: /hello
1996        let endpoint_a = component
1997            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
1998            .unwrap();
1999        let mut consumer_a = endpoint_a.create_consumer().unwrap();
2000
2001        // Consumer B: /world
2002        let endpoint_b = component
2003            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2004            .unwrap();
2005        let mut consumer_b = endpoint_b.create_consumer().unwrap();
2006
2007        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2008        let token_a = tokio_util::sync::CancellationToken::new();
2009        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2010
2011        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2012        let token_b = tokio_util::sync::CancellationToken::new();
2013        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2014
2015        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2016        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2017        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2018
2019        let client = reqwest::Client::new();
2020
2021        // Request to /hello
2022        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2023        let (resp_hello, _) = tokio::join!(fut_hello, async {
2024            if let Some(mut envelope) = rx_a.recv().await {
2025                envelope.exchange.input.body =
2026                    camel_api::body::Body::Text("hello-response".to_string());
2027                if let Some(reply_tx) = envelope.reply_tx {
2028                    let _ = reply_tx.send(Ok(envelope.exchange));
2029                }
2030            }
2031        });
2032
2033        // Request to /world
2034        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2035        let (resp_world, _) = tokio::join!(fut_world, async {
2036            if let Some(mut envelope) = rx_b.recv().await {
2037                envelope.exchange.input.body =
2038                    camel_api::body::Body::Text("world-response".to_string());
2039                if let Some(reply_tx) = envelope.reply_tx {
2040                    let _ = reply_tx.send(Ok(envelope.exchange));
2041                }
2042            }
2043        });
2044
2045        let body_a = resp_hello.unwrap().text().await.unwrap();
2046        let body_b = resp_world.unwrap().text().await.unwrap();
2047
2048        assert_eq!(body_a, "hello-response");
2049        assert_eq!(body_b, "world-response");
2050
2051        token_a.cancel();
2052        token_b.cancel();
2053    }
2054
2055    #[tokio::test]
2056    async fn test_integration_unregistered_path_returns_404() {
2057        use camel_component::{ConsumerContext, ExchangeEnvelope};
2058
2059        // Get an OS-assigned free port (ephemeral)
2060        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2061        let port = listener.local_addr().unwrap().port();
2062        drop(listener);
2063
2064        let component = HttpComponent::new();
2065        let endpoint = component
2066            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2067            .unwrap();
2068        let mut consumer = endpoint.create_consumer().unwrap();
2069
2070        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2071        let token = tokio_util::sync::CancellationToken::new();
2072        let ctx = ConsumerContext::new(tx, token.clone());
2073
2074        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2075        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2076
2077        let client = reqwest::Client::new();
2078        let resp = client
2079            .get(format!("http://127.0.0.1:{port}/not-there"))
2080            .send()
2081            .await
2082            .unwrap();
2083        assert_eq!(resp.status().as_u16(), 404);
2084
2085        token.cancel();
2086    }
2087
2088    #[test]
2089    fn test_http_consumer_declares_concurrent() {
2090        use camel_component::ConcurrencyModel;
2091
2092        let config = HttpServerConfig {
2093            host: "127.0.0.1".to_string(),
2094            port: 19999,
2095            path: "/test".to_string(),
2096            max_request_body: 2 * 1024 * 1024,
2097            max_response_body: 10 * 1024 * 1024,
2098        };
2099        let consumer = HttpConsumer::new(config);
2100        assert_eq!(
2101            consumer.concurrency_model(),
2102            ConcurrencyModel::Concurrent { max: None }
2103        );
2104    }
2105}