Skip to main content

camel_component_http/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
16// ---------------------------------------------------------------------------
17// HttpConfig
18// ---------------------------------------------------------------------------
19
/// Client-side (producer) settings derived from an `http:`/`https:` endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpConfig {
    /// Target URL rebuilt from the URI scheme and path, without Camel options.
    pub base_url: String,
    /// HTTP method taken from the `httpMethod` option, if one was given.
    pub http_method: Option<String>,
    /// Value of the `throwExceptionOnFailure` option.
    pub throw_exception_on_failure: bool,
    /// `(start, end)` status codes parsed from the `okStatusCodeRange` option.
    pub ok_status_code_range: (u16, u16),
    /// Value of the `followRedirects` option.
    pub follow_redirects: bool,
    /// Connection timeout parsed from `connectTimeout` (milliseconds).
    pub connect_timeout: Duration,
    /// Optional response timeout parsed from `responseTimeout` (milliseconds).
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not Camel options.
    pub query_params: HashMap<String, String>,
    /// Security setting: value of the `allowPrivateIps` option.
    pub allow_private_ips: bool,
    /// Security setting: host names listed in the `blockedHosts` option.
    pub blocked_hosts: Vec<String>,
}
34
35impl HttpConfig {
36    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
37        let parts = parse_uri(uri)?;
38        if parts.scheme != "http" && parts.scheme != "https" {
39            return Err(CamelError::InvalidUri(format!(
40                "expected scheme 'http' or 'https', got '{}'",
41                parts.scheme
42            )));
43        }
44
45        let base_url = format!("{}:{}", parts.scheme, parts.path);
46
47        let http_method = parts.params.get("httpMethod").cloned();
48
49        let throw_exception_on_failure = parts
50            .params
51            .get("throwExceptionOnFailure")
52            .map(|v| v != "false")
53            .unwrap_or(true);
54
55        let ok_status_code_range = parts
56            .params
57            .get("okStatusCodeRange")
58            .and_then(|v| {
59                let (start, end) = v.split_once('-')?;
60                Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
61            })
62            .unwrap_or((200, 299));
63
64        let follow_redirects = parts
65            .params
66            .get("followRedirects")
67            .map(|v| v == "true")
68            .unwrap_or(false);
69
70        let connect_timeout = parts
71            .params
72            .get("connectTimeout")
73            .and_then(|v| v.parse::<u64>().ok())
74            .map(Duration::from_millis)
75            .unwrap_or(Duration::from_millis(30000));
76
77        let response_timeout = parts
78            .params
79            .get("responseTimeout")
80            .and_then(|v| v.parse::<u64>().ok())
81            .map(Duration::from_millis);
82
83        // SSRF protection settings
84        let allow_private_ips = parts
85            .params
86            .get("allowPrivateIps")
87            .map(|v| v == "true")
88            .unwrap_or(false); // Default: block private IPs
89
90        let blocked_hosts = parts
91            .params
92            .get("blockedHosts")
93            .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
94            .unwrap_or_default();
95
96        // CAMEL_OPTIONS: params that are consumed by Camel,        // Any remaining params should be forwarded as HTTP query params
97        let camel_options = [
98            "httpMethod",
99            "throwExceptionOnFailure",
100            "okStatusCodeRange",
101            "followRedirects",
102            "connectTimeout",
103            "responseTimeout",
104            "allowPrivateIps",
105            "blockedHosts",
106        ];
107
108        let query_params: HashMap<String, String> = parts
109            .params
110            .into_iter()
111            .filter(|(k, _)| !camel_options.contains(&k.as_str()))
112            .map(|(k, v)| (k.clone(), v.clone()))
113            .collect();
114
115        Ok(Self {
116            base_url,
117            http_method,
118            throw_exception_on_failure,
119            ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
120            follow_redirects,
121            connect_timeout,
122            response_timeout,
123            query_params,
124            allow_private_ips,
125            blocked_hosts,
126        })
127    }
128}
129
130// ---------------------------------------------------------------------------
131// HttpServerConfig
132// ---------------------------------------------------------------------------
133
/// Settings for the server (consumer) side of an HTTP endpoint.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Address to bind to, for example "0.0.0.0" or "127.0.0.1".
    pub host: String,
    /// TCP port the server listens on.
    pub port: u16,
    /// Request path served by this consumer, for example "/orders".
    pub path: String,
}
144
145impl HttpServerConfig {
146    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
147        let parts = parse_uri(uri)?;
148        if parts.scheme != "http" && parts.scheme != "https" {
149            return Err(CamelError::InvalidUri(format!(
150                "expected scheme 'http' or 'https', got '{}'",
151                parts.scheme
152            )));
153        }
154
155        // parts.path is everything after the scheme colon, e.g. "//0.0.0.0:8080/orders"
156        // Strip leading "//"
157        let authority_and_path = parts.path.trim_start_matches('/');
158
159        // Split on the first "/" to separate "host:port" from "/path"
160        let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
161            (&authority_and_path[..idx], &authority_and_path[idx..])
162        } else {
163            (authority_and_path, "/")
164        };
165
166        let path = if path_suffix.is_empty() {
167            "/"
168        } else {
169            path_suffix
170        }
171        .to_string();
172
173        // Parse host:port
174        let (host, port) = if let Some(colon) = authority.rfind(':') {
175            let port_str = &authority[colon + 1..];
176            match port_str.parse::<u16>() {
177                Ok(p) => (authority[..colon].to_string(), p),
178                Err(_) => {
179                    return Err(CamelError::InvalidUri(format!(
180                        "invalid port '{}' in URI '{}'",
181                        port_str, uri
182                    )));
183                }
184            }
185        } else {
186            (authority.to_string(), 80)
187        };
188
189        Ok(Self { host, port, path })
190    }
191}
192
193// ---------------------------------------------------------------------------
194// RequestEnvelope / HttpReply
195// ---------------------------------------------------------------------------
196
197/// An inbound HTTP request sent from the Axum dispatch handler to an
198/// `HttpConsumer` receive loop.
199pub struct RequestEnvelope {
200    pub method: String,
201    pub path: String,
202    pub query: String,
203    pub headers: http::HeaderMap,
204    pub body: bytes::Bytes,
205    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
206}
207
208/// The HTTP response that `HttpConsumer` sends back to the Axum handler.
209#[derive(Debug, Clone)]
210pub struct HttpReply {
211    pub status: u16,
212    pub headers: Vec<(String, String)>,
213    pub body: bytes::Bytes,
214}
215
216// ---------------------------------------------------------------------------
217// DispatchTable / ServerRegistry
218// ---------------------------------------------------------------------------
219
220/// Maps URL path → channel sender for the consumer that owns that path.
221pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
222
223/// Handle to a running Axum server on one port.
224struct ServerHandle {
225    dispatch: DispatchTable,
226    /// Kept alive so the task isn't dropped; not used directly.
227    _task: tokio::task::JoinHandle<()>,
228}
229
230/// Process-global registry mapping port → running Axum server handle.
231pub struct ServerRegistry {
232    inner: Mutex<HashMap<u16, ServerHandle>>,
233}
234
235impl ServerRegistry {
236    /// Returns the global singleton.
237    pub fn global() -> &'static Self {
238        static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
239        INSTANCE.get_or_init(|| ServerRegistry {
240            inner: Mutex::new(HashMap::new()),
241        })
242    }
243
244    /// Returns the `DispatchTable` for `port`, spawning a new Axum server if
245    /// none is running on that port yet.
246    pub async fn get_or_spawn(
247        &'static self,
248        host: &str,
249        port: u16,
250    ) -> Result<DispatchTable, CamelError> {
251        // Fast path: check without spawning.
252        {
253            let guard = self.inner.lock().map_err(|_| {
254                CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
255            })?;
256            if let Some(handle) = guard.get(&port) {
257                return Ok(Arc::clone(&handle.dispatch));
258            }
259        }
260
261        // Slow path: need to bind and spawn.
262        let addr = format!("{}:{}", host, port);
263        let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
264            CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
265        })?;
266
267        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
268        let dispatch_for_server = Arc::clone(&dispatch);
269        let task = tokio::spawn(run_axum_server(listener, dispatch_for_server));
270
271        // Re-acquire lock to insert — handle the race where another task won.
272        let mut guard = self.inner.lock().map_err(|_| {
273            CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
274        })?;
275        // If another task already registered this port while we were binding,
276        // abort our task (it has no routes) and use the winner's dispatch table.
277        if let Some(existing) = guard.get(&port) {
278            task.abort();
279            return Ok(Arc::clone(&existing.dispatch));
280        }
281        guard.insert(
282            port,
283            ServerHandle {
284                dispatch: Arc::clone(&dispatch),
285                _task: task,
286            },
287        );
288        Ok(dispatch)
289    }
290}
291
292// ---------------------------------------------------------------------------
293// Axum server
294// ---------------------------------------------------------------------------
295
296use axum::{
297    Router,
298    body::Body as AxumBody,
299    extract::{Request, State},
300    http::{Response, StatusCode},
301    response::IntoResponse,
302};
303
304async fn run_axum_server(listener: tokio::net::TcpListener, dispatch: DispatchTable) {
305    let app = Router::new()
306        .fallback(dispatch_handler)
307        .with_state(dispatch);
308
309    axum::serve(listener, app).await.unwrap_or_else(|e| {
310        tracing::error!(error = %e, "Axum server error");
311    });
312}
313
314async fn dispatch_handler(
315    State(dispatch): State<DispatchTable>,
316    req: Request,
317) -> impl IntoResponse {
318    const MAX_REQUEST_BODY: usize = 2 * 1024 * 1024; // 2 MB
319
320    let method = req.method().to_string();
321    let path = req.uri().path().to_string();
322    let query = req.uri().query().unwrap_or("").to_string();
323    let headers = req.headers().clone();
324
325    let body_bytes = match axum::body::to_bytes(req.into_body(), MAX_REQUEST_BODY).await {
326        Ok(b) => b,
327        Err(_) => {
328            return Response::builder()
329                .status(StatusCode::BAD_REQUEST)
330                .body(AxumBody::empty())
331                .expect(
332                    "Response::builder() with a known-valid status code and body is infallible",
333                );
334        }
335    };
336
337    // Look up handler for this path
338    let sender = {
339        let table = dispatch.read().await;
340        table.get(&path).cloned()
341    };
342
343    let Some(sender) = sender else {
344        return Response::builder()
345            .status(StatusCode::NOT_FOUND)
346            .body(AxumBody::from("No consumer registered for this path"))
347            .expect("Response::builder() with a known-valid status code and body is infallible");
348    };
349
350    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
351    let envelope = RequestEnvelope {
352        method,
353        path,
354        query,
355        headers,
356        body: body_bytes,
357        reply_tx,
358    };
359
360    if sender.send(envelope).await.is_err() {
361        return Response::builder()
362            .status(StatusCode::SERVICE_UNAVAILABLE)
363            .body(AxumBody::from("Consumer unavailable"))
364            .expect("Response::builder() with a known-valid status code and body is infallible");
365    }
366
367    match reply_rx.await {
368        Ok(reply) => {
369            let status =
370                StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
371            let mut builder = Response::builder().status(status);
372            for (k, v) in &reply.headers {
373                builder = builder.header(k.as_str(), v.as_str());
374            }
375            builder
376                .body(AxumBody::from(reply.body))
377                .unwrap_or_else(|_| {
378                    Response::builder()
379                        .status(StatusCode::INTERNAL_SERVER_ERROR)
380                        .body(AxumBody::from("Invalid response headers from consumer"))
381                        .expect("Response::builder() with a known-valid status code and body is infallible")
382                })
383        }
384        Err(_) => Response::builder()
385            .status(StatusCode::INTERNAL_SERVER_ERROR)
386            .body(AxumBody::from("Pipeline error"))
387            .expect("Response::builder() with a known-valid status code and body is infallible"),
388    }
389}
390
391// ---------------------------------------------------------------------------
392// HttpConsumer
393// ---------------------------------------------------------------------------
394
395pub struct HttpConsumer {
396    config: HttpServerConfig,
397}
398
399impl HttpConsumer {
400    pub fn new(config: HttpServerConfig) -> Self {
401        Self { config }
402    }
403}
404
405#[async_trait::async_trait]
406impl Consumer for HttpConsumer {
407    async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
408        use camel_api::{Exchange, Message, body::Body};
409
410        let dispatch = ServerRegistry::global()
411            .get_or_spawn(&self.config.host, self.config.port)
412            .await?;
413
414        // Create a channel for this path and register it
415        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
416        {
417            let mut table = dispatch.write().await;
418            table.insert(self.config.path.clone(), env_tx);
419        }
420
421        let path = self.config.path.clone();
422        let cancel_token = ctx.cancel_token();
423
424        loop {
425            tokio::select! {
426                _ = ctx.cancelled() => {
427                    break;
428                }
429                envelope = env_rx.recv() => {
430                    let Some(envelope) = envelope else { break; };
431
432                    // Build Exchange from HTTP request
433                    let mut msg = Message::default();
434
435                    // Set standard Camel HTTP headers
436                    msg.set_header("CamelHttpMethod",
437                        serde_json::Value::String(envelope.method.clone()));
438                    msg.set_header("CamelHttpPath",
439                        serde_json::Value::String(envelope.path.clone()));
440                    msg.set_header("CamelHttpQuery",
441                        serde_json::Value::String(envelope.query.clone()));
442
443                    // Forward HTTP headers (skip pseudo-headers)
444                    for (k, v) in &envelope.headers {
445                        if let Ok(val_str) = v.to_str() {
446                            msg.set_header(
447                                k.as_str(),
448                                serde_json::Value::String(val_str.to_string()),
449                            );
450                        }
451                    }
452
453                    // Body: Try to convert to text if UTF-8, otherwise keep as bytes
454                    if !envelope.body.is_empty() {
455                        match std::str::from_utf8(&envelope.body) {
456                            Ok(text) => msg.body = Body::Text(text.to_string()),
457                            Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
458                        }
459                    }
460
461                    let exchange = Exchange::new(msg);
462                    let reply_tx = envelope.reply_tx;
463                    let sender = ctx.sender().clone();
464                    let path_clone = path.clone();
465                    let cancel = cancel_token.clone();
466
467                    // Spawn a task to handle this request concurrently
468                    //
469                    // NOTE: This spawns a separate tokio task for each incoming HTTP request to enable
470                    // true concurrent request processing. This change was introduced as part of the
471                    // pipeline concurrency feature and was NOT part of the original HttpConsumer design.
472                    //
473                    // Rationale:
474                    // 1. Without spawning per-request tasks, the send_and_wait() operation would block
475                    //    the consumer's main loop until the pipeline processing completes
476                    // 2. This blocking would prevent multiple HTTP requests from being processed
477                    //    concurrently, even when ConcurrencyModel::Concurrent is enabled on the pipeline
478                    // 3. The channel would never have multiple exchanges buffered simultaneously,
479                    //    defeating the purpose of pipeline-side concurrency
480                    // 4. By spawning a task per request, we allow the consumer loop to continue
481                    //    accepting new requests while existing ones are processed in the pipeline
482                    //
483                    // This approach effectively decouples request acceptance from pipeline processing,
484                    // allowing the channel to buffer multiple exchanges that can be processed concurrently
485                    // by the pipeline when ConcurrencyModel::Concurrent is active.
486                    tokio::spawn(async move {
487                        // Check for cancellation before sending to pipeline.
488                        // Returns 503 (Service Unavailable) instead of letting the request
489                        // enter a shutting-down pipeline. This is a behavioral change from
490                        // the pre-concurrency implementation where cancellation during
491                        // processing would result in a 500 (Internal Server Error).
492                        // 503 is more semantically correct: the server is temporarily
493                        // unable to handle the request due to shutdown.
494                        if cancel.is_cancelled() {
495                            let _ = reply_tx.send(HttpReply {
496                                status: 503,
497                                headers: vec![],
498                                body: bytes::Bytes::from("Service Unavailable"),
499                            });
500                            return;
501                        }
502
503                        // Send through pipeline and await result
504                        let (tx, rx) = tokio::sync::oneshot::channel();
505                        let envelope = camel_component::consumer::ExchangeEnvelope {
506                            exchange,
507                            reply_tx: Some(tx),
508                        };
509
510                        let result = match sender.send(envelope).await {
511                            Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
512                            Err(_) => Err(camel_api::CamelError::ChannelClosed),
513                        }
514                        .and_then(|r| r);
515
516                        let reply = match result {
517                            Ok(out) => {
518                                let status = out
519                                    .input
520                                    .header("CamelHttpResponseCode")
521                                    .and_then(|v| v.as_u64())
522                                    .map(|s| s as u16)
523                                    .unwrap_or(200);
524
525                                let body_bytes = match &out.input.body {
526                                    Body::Empty => bytes::Bytes::new(),
527                                    Body::Bytes(b) => b.clone(),
528                                    Body::Text(s) => bytes::Bytes::from(s.clone().into_bytes()),
529                                    Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
530                                };
531
532                                let resp_headers: Vec<(String, String)> = out
533                                    .input
534                                    .headers
535                                    .iter()
536                                    // Filter Camel internal headers
537                                    .filter(|(k, _)| !k.starts_with("Camel"))
538                                    // Filter hop-by-hop and request-only headers
539                                    // Based on Apache Camel's HttpUtil.addCommonFilters()
540                                    .filter(|(k, _)| {
541                                        !matches!(
542                                            k.to_lowercase().as_str(),
543                                            // RFC 2616 Section 4.5 - General headers
544                                            "content-length" |      // Auto-calculated by framework
545                                            "content-type" |        // Auto-calculated from body
546                                            "transfer-encoding" |   // Hop-by-hop
547                                            "connection" |          // Hop-by-hop
548                                            "cache-control" |       // Hop-by-hop
549                                            "date" |                // Auto-generated
550                                            "pragma" |              // Hop-by-hop
551                                            "trailer" |             // Hop-by-hop
552                                            "upgrade" |             // Hop-by-hop
553                                            "via" |                 // Hop-by-hop
554                                            "warning" |             // Hop-by-hop
555                                            // Request-only headers
556                                            "host" |                // Request-only
557                                            "user-agent" |          // Request-only
558                                            "accept" |              // Request-only
559                                            "accept-encoding" |     // Request-only
560                                            "accept-language" |     // Request-only
561                                            "accept-charset" |      // Request-only
562                                            "authorization" |       // Request-only (security)
563                                            "proxy-authorization" | // Request-only (security)
564                                            "cookie" |              // Request-only
565                                            "expect" |              // Request-only
566                                            "from" |                // Request-only
567                                            "if-match" |            // Request-only
568                                            "if-modified-since" |   // Request-only
569                                            "if-none-match" |       // Request-only
570                                            "if-range" |            // Request-only
571                                            "if-unmodified-since" | // Request-only
572                                            "max-forwards" |        // Request-only
573                                            "proxy-connection" |    // Request-only
574                                            "range" |               // Request-only
575                                            "referer" |             // Request-only
576                                            "te"                    // Request-only
577                                        )
578                                    })
579                                    .filter_map(|(k, v)| {
580                                        v.as_str().map(|s| (k.clone(), s.to_string()))
581                                    })
582                                    .collect();
583
584                                HttpReply {
585                                    status,
586                                    headers: resp_headers,
587                                    body: body_bytes,
588                                }
589                            }
590                            Err(e) => {
591                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
592                                HttpReply {
593                                    status: 500,
594                                    headers: vec![],
595                                    body: bytes::Bytes::from("Internal Server Error"),
596                                }
597                            }
598                        };
599
600                        // Reply to Axum handler (ignore error if client disconnected)
601                        let _ = reply_tx.send(reply);
602                    });
603                }
604            }
605        }
606
607        // Deregister this path
608        {
609            let mut table = dispatch.write().await;
610            table.remove(&path);
611        }
612
613        Ok(())
614    }
615
616    async fn stop(&mut self) -> Result<(), CamelError> {
617        Ok(())
618    }
619
620    fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
621        camel_component::ConcurrencyModel::Concurrent { max: None }
622    }
623}
624
625// ---------------------------------------------------------------------------
626// HttpComponent / HttpsComponent
627// ---------------------------------------------------------------------------
628
/// Component registered for the `http` scheme.
pub struct HttpComponent;

impl HttpComponent {
    /// Creates the component; it carries no state.
    pub fn new() -> Self {
        HttpComponent
    }
}

impl Default for HttpComponent {
    /// Same as [`HttpComponent::new`].
    fn default() -> Self {
        HttpComponent::new()
    }
}
642
643impl Component for HttpComponent {
644    fn scheme(&self) -> &str {
645        "http"
646    }
647
648    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
649        let config = HttpConfig::from_uri(uri)?;
650        let server_config = HttpServerConfig::from_uri(uri)?;
651        let client = build_client(&config)?;
652        Ok(Box::new(HttpEndpoint {
653            uri: uri.to_string(),
654            config,
655            server_config,
656            client,
657        }))
658    }
659}
660
/// Component registered for the `https` scheme.
pub struct HttpsComponent;

impl HttpsComponent {
    /// Creates the component; it carries no state.
    pub fn new() -> Self {
        HttpsComponent
    }
}

impl Default for HttpsComponent {
    /// Same as [`HttpsComponent::new`].
    fn default() -> Self {
        HttpsComponent::new()
    }
}
674
675impl Component for HttpsComponent {
676    fn scheme(&self) -> &str {
677        "https"
678    }
679
680    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
681        let config = HttpConfig::from_uri(uri)?;
682        let server_config = HttpServerConfig::from_uri(uri)?;
683        let client = build_client(&config)?;
684        Ok(Box::new(HttpEndpoint {
685            uri: uri.to_string(),
686            config,
687            server_config,
688            client,
689        }))
690    }
691}
692
693fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
694    let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
695
696    if !config.follow_redirects {
697        builder = builder.redirect(reqwest::redirect::Policy::none());
698    }
699
700    builder.build().map_err(|e| {
701        CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
702    })
703}
704
705// ---------------------------------------------------------------------------
706// HttpEndpoint
707// ---------------------------------------------------------------------------
708
709struct HttpEndpoint {
710    uri: String,
711    config: HttpConfig,
712    server_config: HttpServerConfig,
713    client: reqwest::Client,
714}
715
716impl Endpoint for HttpEndpoint {
717    fn uri(&self) -> &str {
718        &self.uri
719    }
720
721    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
722        Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
723    }
724
725    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
726        Ok(BoxProcessor::new(HttpProducer {
727            config: Arc::new(self.config.clone()),
728            client: self.client.clone(),
729        }))
730    }
731}
732
733// ---------------------------------------------------------------------------
734// SSRF Protection
735// ---------------------------------------------------------------------------
736
737fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
738    let parsed = url::Url::parse(url)
739        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
740
741    // Check blocked hosts
742    if let Some(host) = parsed.host_str()
743        && config.blocked_hosts.iter().any(|blocked| host == blocked)
744    {
745        return Err(CamelError::ProcessorError(format!(
746            "Host '{}' is blocked",
747            host
748        )));
749    }
750
751    // Check private IPs if not allowed
752    if !config.allow_private_ips
753        && let Some(host) = parsed.host()
754    {
755        match host {
756            url::Host::Ipv4(ip) => {
757                if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
758                    return Err(CamelError::ProcessorError(format!(
759                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
760                        ip
761                    )));
762                }
763            }
764            url::Host::Ipv6(ip) => {
765                if ip.is_loopback() {
766                    return Err(CamelError::ProcessorError(format!(
767                        "Loopback IP '{}' not allowed",
768                        ip
769                    )));
770                }
771            }
772            url::Host::Domain(domain) => {
773                // Block common internal domains
774                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
775                if blocked_domains.contains(&domain) {
776                    return Err(CamelError::ProcessorError(format!(
777                        "Domain '{}' is not allowed",
778                        domain
779                    )));
780                }
781            }
782        }
783    }
784
785    Ok(())
786}
787
788// ---------------------------------------------------------------------------
789// HttpProducer
790// ---------------------------------------------------------------------------
791
792#[derive(Clone)]
793struct HttpProducer {
794    config: Arc<HttpConfig>,
795    client: reqwest::Client,
796}
797
798impl HttpProducer {
799    fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
800        if let Some(ref method) = config.http_method {
801            return method.to_uppercase();
802        }
803        if let Some(method) = exchange
804            .input
805            .header("CamelHttpMethod")
806            .and_then(|v| v.as_str())
807        {
808            return method.to_uppercase();
809        }
810        if !exchange.input.body.is_empty() {
811            return "POST".to_string();
812        }
813        "GET".to_string()
814    }
815
816    fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
817        if let Some(uri) = exchange
818            .input
819            .header("CamelHttpUri")
820            .and_then(|v| v.as_str())
821        {
822            let mut url = uri.to_string();
823            if let Some(path) = exchange
824                .input
825                .header("CamelHttpPath")
826                .and_then(|v| v.as_str())
827            {
828                if !url.ends_with('/') && !path.starts_with('/') {
829                    url.push('/');
830                }
831                url.push_str(path);
832            }
833            if let Some(query) = exchange
834                .input
835                .header("CamelHttpQuery")
836                .and_then(|v| v.as_str())
837            {
838                url.push('?');
839                url.push_str(query);
840            }
841            return url;
842        }
843
844        let mut url = config.base_url.clone();
845
846        if let Some(path) = exchange
847            .input
848            .header("CamelHttpPath")
849            .and_then(|v| v.as_str())
850        {
851            if !url.ends_with('/') && !path.starts_with('/') {
852                url.push('/');
853            }
854            url.push_str(path);
855        }
856
857        if let Some(query) = exchange
858            .input
859            .header("CamelHttpQuery")
860            .and_then(|v| v.as_str())
861        {
862            url.push('?');
863            url.push_str(query);
864        } else if !config.query_params.is_empty() {
865            // Forward non-Camel query params from config
866            url.push('?');
867            let query_string: String = config
868                .query_params
869                .iter()
870                .map(|(k, v)| format!("{k}={v}"))
871                .collect::<Vec<_>>()
872                .join("&");
873            url.push_str(&query_string);
874        }
875
876        url
877    }
878
879    fn body_to_bytes(body: &Body) -> Option<Vec<u8>> {
880        match body {
881            Body::Empty => None,
882            Body::Bytes(b) => Some(b.to_vec()),
883            Body::Text(s) => Some(s.as_bytes().to_vec()),
884            Body::Json(v) => Some(v.to_string().into_bytes()),
885        }
886    }
887
888    fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
889        status >= range.0 && status <= range.1
890    }
891}
892
893impl Service<Exchange> for HttpProducer {
894    type Response = Exchange;
895    type Error = CamelError;
896    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
897
898    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
899        Poll::Ready(Ok(()))
900    }
901
902    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
903        let config = self.config.clone();
904        let client = self.client.clone();
905
906        Box::pin(async move {
907            let method_str = HttpProducer::resolve_method(&exchange, &config);
908            let url = HttpProducer::resolve_url(&exchange, &config);
909
910            // SECURITY: Validate URL for SSRF
911            validate_url_for_ssrf(&url, &config)?;
912
913            debug!(
914                correlation_id = %exchange.correlation_id(),
915                method = %method_str,
916                url = %url,
917                "HTTP request"
918            );
919
920            let method = method_str.parse::<reqwest::Method>().map_err(|e| {
921                CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
922            })?;
923
924            let mut request = client.request(method, &url);
925
926            if let Some(timeout) = config.response_timeout {
927                request = request.timeout(timeout);
928            }
929
930            for (key, value) in &exchange.input.headers {
931                if !key.starts_with("Camel")
932                    && let Some(val_str) = value.as_str()
933                    && let (Ok(name), Ok(val)) = (
934                        reqwest::header::HeaderName::from_bytes(key.as_bytes()),
935                        reqwest::header::HeaderValue::from_str(val_str),
936                    )
937                {
938                    request = request.header(name, val);
939                }
940            }
941
942            if let Some(body_bytes) = HttpProducer::body_to_bytes(&exchange.input.body) {
943                request = request.body(body_bytes);
944            }
945
946            let response = request
947                .send()
948                .await
949                .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
950
951            let status_code = response.status().as_u16();
952            let status_text = response
953                .status()
954                .canonical_reason()
955                .unwrap_or("Unknown")
956                .to_string();
957
958            for (key, value) in response.headers() {
959                if let Ok(val_str) = value.to_str() {
960                    exchange
961                        .input
962                        .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
963                }
964            }
965
966            exchange.input.set_header(
967                "CamelHttpResponseCode",
968                serde_json::Value::Number(status_code.into()),
969            );
970            exchange.input.set_header(
971                "CamelHttpResponseText",
972                serde_json::Value::String(status_text.clone()),
973            );
974
975            let response_body = response.bytes().await.map_err(|e| {
976                CamelError::ProcessorError(format!("Failed to read response body: {e}"))
977            })?;
978
979            if config.throw_exception_on_failure
980                && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
981            {
982                return Err(CamelError::HttpOperationFailed {
983                    status_code,
984                    status_text,
985                    response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
986                });
987            }
988
989            if !response_body.is_empty() {
990                exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
991            }
992
993            debug!(
994                correlation_id = %exchange.correlation_id(),
995                status = status_code,
996                url = %url,
997                "HTTP response"
998            );
999            Ok(exchange)
1000        })
1001    }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006    use super::*;
1007    use camel_api::Message;
1008    use std::sync::Arc;
1009    use std::time::Duration;
1010    use tokio::sync::Mutex;
1011
1012    // NullRouteController for testing
1013    struct NullRouteController;
1014    #[async_trait::async_trait]
1015    impl camel_api::RouteController for NullRouteController {
1016        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1017            Ok(())
1018        }
1019        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1020            Ok(())
1021        }
1022        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1023            Ok(())
1024        }
1025        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1026            Ok(())
1027        }
1028        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1029            Ok(())
1030        }
1031        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1032            None
1033        }
1034        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1035            Ok(())
1036        }
1037        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1038            Ok(())
1039        }
1040    }
1041
1042    fn test_producer_ctx() -> ProducerContext {
1043        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1044    }
1045
/// A URI without options yields the default configuration asserted below.
#[test]
fn test_http_config_defaults() {
    let cfg = HttpConfig::from_uri("http://localhost:8080/api").unwrap();

    assert_eq!(cfg.base_url, "http://localhost:8080/api");
    assert!(cfg.http_method.is_none());
    assert!(cfg.throw_exception_on_failure);
    assert_eq!(cfg.ok_status_code_range, (200, 299));
    assert!(!cfg.follow_redirects);
    assert_eq!(cfg.connect_timeout, Duration::from_secs(30));
    assert!(cfg.response_timeout.is_none());
}
1057
/// Every Camel option given in the URI is reflected in the parsed config.
#[test]
fn test_http_config_with_options() {
    let uri = concat!(
        "https://api.example.com/v1",
        "?httpMethod=PUT",
        "&throwExceptionOnFailure=false",
        "&followRedirects=true",
        "&connectTimeout=5000",
        "&responseTimeout=10000",
    );
    let cfg = HttpConfig::from_uri(uri).unwrap();

    assert_eq!(cfg.base_url, "https://api.example.com/v1");
    assert_eq!(cfg.http_method.as_deref(), Some("PUT"));
    assert!(!cfg.throw_exception_on_failure);
    assert!(cfg.follow_redirects);
    assert_eq!(cfg.connect_timeout, Duration::from_secs(5));
    assert_eq!(cfg.response_timeout, Some(Duration::from_secs(10)));
}
1070
/// `okStatusCodeRange=200-204` is parsed into the tuple `(200, 204)`.
#[test]
fn test_http_config_ok_status_range() {
    let uri = "http://localhost/api?okStatusCodeRange=200-204";
    let cfg = HttpConfig::from_uri(uri).unwrap();
    assert_eq!(cfg.ok_status_code_range, (200, 204));
}
1077
/// A URI whose scheme is neither `http` nor `https` is rejected.
#[test]
fn test_http_config_wrong_scheme() {
    let parsed = HttpConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
1083
/// The plain HTTP component registers under the `http` scheme.
#[test]
fn test_http_component_scheme() {
    let http = HttpComponent::new();
    let scheme = http.scheme();
    assert_eq!(scheme, "http");
}
1089
/// The TLS component registers under the `https` scheme.
#[test]
fn test_https_component_scheme() {
    let https = HttpsComponent::new();
    let scheme = https.scheme();
    assert_eq!(scheme, "https");
}
1095
/// An `http` endpoint can hand out a consumer.
#[test]
fn test_http_endpoint_creates_consumer() {
    let http = HttpComponent::new();
    let ep = http.create_endpoint("http://0.0.0.0:19100/test").unwrap();

    let consumer = ep.create_consumer();
    assert!(consumer.is_ok());
}
1104
/// An `https` endpoint can hand out a consumer.
#[test]
fn test_https_endpoint_creates_consumer() {
    let https = HttpsComponent::new();
    let ep = https.create_endpoint("https://0.0.0.0:8443/test").unwrap();

    let consumer = ep.create_consumer();
    assert!(consumer.is_ok());
}
1113
/// An `http` endpoint can hand out a producer for a test context.
#[test]
fn test_http_endpoint_creates_producer() {
    let producer_ctx = test_producer_ctx();
    let http = HttpComponent::new();
    let ep = http.create_endpoint("http://localhost/api").unwrap();

    let producer = ep.create_producer(&producer_ctx);
    assert!(producer.is_ok());
}
1121
1122    // -----------------------------------------------------------------------
1123    // Producer tests
1124    // -----------------------------------------------------------------------
1125
1126    async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1127        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1128        let addr = listener.local_addr().unwrap();
1129        let url = format!("http://127.0.0.1:{}", addr.port());
1130
1131        let handle = tokio::spawn(async move {
1132            loop {
1133                if let Ok((mut stream, _)) = listener.accept().await {
1134                    tokio::spawn(async move {
1135                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1136                        let mut buf = vec![0u8; 4096];
1137                        let n = stream.read(&mut buf).await.unwrap_or(0);
1138                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1139
1140                        let method = request.split_whitespace().next().unwrap_or("GET");
1141
1142                        let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1143                        let response = format!(
1144                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1145                            body.len(),
1146                            body
1147                        );
1148                        let _ = stream.write_all(response.as_bytes()).await;
1149                    });
1150                }
1151            }
1152        });
1153
1154        (url, handle)
1155    }
1156
1157    async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1158        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1159        let addr = listener.local_addr().unwrap();
1160        let url = format!("http://127.0.0.1:{}", addr.port());
1161
1162        let handle = tokio::spawn(async move {
1163            loop {
1164                if let Ok((mut stream, _)) = listener.accept().await {
1165                    let status = status;
1166                    tokio::spawn(async move {
1167                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1168                        let mut buf = vec![0u8; 4096];
1169                        let _ = stream.read(&mut buf).await;
1170
1171                        let status_text = match status {
1172                            404 => "Not Found",
1173                            500 => "Internal Server Error",
1174                            _ => "Error",
1175                        };
1176                        let body = "error body";
1177                        let response = format!(
1178                            "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1179                            status,
1180                            status_text,
1181                            body.len(),
1182                            body
1183                        );
1184                        let _ = stream.write_all(response.as_bytes()).await;
1185                    });
1186                }
1187            }
1188        });
1189
1190        (url, handle)
1191    }
1192
1193    #[tokio::test]
1194    async fn test_http_producer_get_request() {
1195        use tower::ServiceExt;
1196
1197        let (url, _handle) = start_test_server().await;
1198        let ctx = test_producer_ctx();
1199
1200        let component = HttpComponent::new();
1201        let endpoint = component
1202            .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1203            .unwrap();
1204        let producer = endpoint.create_producer(&ctx).unwrap();
1205
1206        let exchange = Exchange::new(Message::default());
1207        let result = producer.oneshot(exchange).await.unwrap();
1208
1209        let status = result
1210            .input
1211            .header("CamelHttpResponseCode")
1212            .and_then(|v| v.as_u64())
1213            .unwrap();
1214        assert_eq!(status, 200);
1215
1216        assert!(!result.input.body.is_empty());
1217    }
1218
1219    #[tokio::test]
1220    async fn test_http_producer_post_with_body() {
1221        use tower::ServiceExt;
1222
1223        let (url, _handle) = start_test_server().await;
1224        let ctx = test_producer_ctx();
1225
1226        let component = HttpComponent::new();
1227        let endpoint = component
1228            .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1229            .unwrap();
1230        let producer = endpoint.create_producer(&ctx).unwrap();
1231
1232        let exchange = Exchange::new(Message::new("request body"));
1233        let result = producer.oneshot(exchange).await.unwrap();
1234
1235        let status = result
1236            .input
1237            .header("CamelHttpResponseCode")
1238            .and_then(|v| v.as_u64())
1239            .unwrap();
1240        assert_eq!(status, 200);
1241    }
1242
1243    #[tokio::test]
1244    async fn test_http_producer_method_from_header() {
1245        use tower::ServiceExt;
1246
1247        let (url, _handle) = start_test_server().await;
1248        let ctx = test_producer_ctx();
1249
1250        let component = HttpComponent::new();
1251        let endpoint = component
1252            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1253            .unwrap();
1254        let producer = endpoint.create_producer(&ctx).unwrap();
1255
1256        let mut exchange = Exchange::new(Message::default());
1257        exchange.input.set_header(
1258            "CamelHttpMethod",
1259            serde_json::Value::String("DELETE".to_string()),
1260        );
1261
1262        let result = producer.oneshot(exchange).await.unwrap();
1263        let status = result
1264            .input
1265            .header("CamelHttpResponseCode")
1266            .and_then(|v| v.as_u64())
1267            .unwrap();
1268        assert_eq!(status, 200);
1269    }
1270
1271    #[tokio::test]
1272    async fn test_http_producer_forced_method() {
1273        use tower::ServiceExt;
1274
1275        let (url, _handle) = start_test_server().await;
1276        let ctx = test_producer_ctx();
1277
1278        let component = HttpComponent::new();
1279        let endpoint = component
1280            .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1281            .unwrap();
1282        let producer = endpoint.create_producer(&ctx).unwrap();
1283
1284        let exchange = Exchange::new(Message::default());
1285        let result = producer.oneshot(exchange).await.unwrap();
1286
1287        let status = result
1288            .input
1289            .header("CamelHttpResponseCode")
1290            .and_then(|v| v.as_u64())
1291            .unwrap();
1292        assert_eq!(status, 200);
1293    }
1294
1295    #[tokio::test]
1296    async fn test_http_producer_throw_exception_on_failure() {
1297        use tower::ServiceExt;
1298
1299        let (url, _handle) = start_status_server(404).await;
1300        let ctx = test_producer_ctx();
1301
1302        let component = HttpComponent::new();
1303        let endpoint = component
1304            .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1305            .unwrap();
1306        let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308        let exchange = Exchange::new(Message::default());
1309        let result = producer.oneshot(exchange).await;
1310        assert!(result.is_err());
1311
1312        match result.unwrap_err() {
1313            CamelError::HttpOperationFailed { status_code, .. } => {
1314                assert_eq!(status_code, 404);
1315            }
1316            e => panic!("Expected HttpOperationFailed, got: {e}"),
1317        }
1318    }
1319
1320    #[tokio::test]
1321    async fn test_http_producer_no_throw_on_failure() {
1322        use tower::ServiceExt;
1323
1324        let (url, _handle) = start_status_server(500).await;
1325        let ctx = test_producer_ctx();
1326
1327        let component = HttpComponent::new();
1328        let endpoint = component
1329            .create_endpoint(&format!(
1330                "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1331            ))
1332            .unwrap();
1333        let producer = endpoint.create_producer(&ctx).unwrap();
1334
1335        let exchange = Exchange::new(Message::default());
1336        let result = producer.oneshot(exchange).await.unwrap();
1337
1338        let status = result
1339            .input
1340            .header("CamelHttpResponseCode")
1341            .and_then(|v| v.as_u64())
1342            .unwrap();
1343        assert_eq!(status, 500);
1344    }
1345
1346    #[tokio::test]
1347    async fn test_http_producer_uri_override() {
1348        use tower::ServiceExt;
1349
1350        let (url, _handle) = start_test_server().await;
1351        let ctx = test_producer_ctx();
1352
1353        let component = HttpComponent::new();
1354        let endpoint = component
1355            .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1356            .unwrap();
1357        let producer = endpoint.create_producer(&ctx).unwrap();
1358
1359        let mut exchange = Exchange::new(Message::default());
1360        exchange.input.set_header(
1361            "CamelHttpUri",
1362            serde_json::Value::String(format!("{url}/api")),
1363        );
1364
1365        let result = producer.oneshot(exchange).await.unwrap();
1366        let status = result
1367            .input
1368            .header("CamelHttpResponseCode")
1369            .and_then(|v| v.as_u64())
1370            .unwrap();
1371        assert_eq!(status, 200);
1372    }
1373
1374    #[tokio::test]
1375    async fn test_http_producer_response_headers_mapped() {
1376        use tower::ServiceExt;
1377
1378        let (url, _handle) = start_test_server().await;
1379        let ctx = test_producer_ctx();
1380
1381        let component = HttpComponent::new();
1382        let endpoint = component
1383            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1384            .unwrap();
1385        let producer = endpoint.create_producer(&ctx).unwrap();
1386
1387        let exchange = Exchange::new(Message::default());
1388        let result = producer.oneshot(exchange).await.unwrap();
1389
1390        assert!(
1391            result.input.header("content-type").is_some()
1392                || result.input.header("Content-Type").is_some()
1393        );
1394        assert!(result.input.header("CamelHttpResponseText").is_some());
1395    }
1396
1397    // -----------------------------------------------------------------------
1398    // Bug fix tests: Client configuration per-endpoint
1399    // -----------------------------------------------------------------------
1400
1401    async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1402        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1403        let addr = listener.local_addr().unwrap();
1404        let url = format!("http://127.0.0.1:{}", addr.port());
1405
1406        let handle = tokio::spawn(async move {
1407            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1408            loop {
1409                if let Ok((mut stream, _)) = listener.accept().await {
1410                    tokio::spawn(async move {
1411                        let mut buf = vec![0u8; 4096];
1412                        let n = stream.read(&mut buf).await.unwrap_or(0);
1413                        let request = String::from_utf8_lossy(&buf[..n]).to_string();
1414
1415                        // Check if this is a request to /final
1416                        if request.contains("GET /final") {
1417                            let body = r#"{"status":"final"}"#;
1418                            let response = format!(
1419                                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1420                                body.len(),
1421                                body
1422                            );
1423                            let _ = stream.write_all(response.as_bytes()).await;
1424                        } else {
1425                            // Redirect to /final
1426                            let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1427                            let _ = stream.write_all(response.as_bytes()).await;
1428                        }
1429                    });
1430                }
1431            }
1432        });
1433
1434        (url, handle)
1435    }
1436
1437    #[tokio::test]
1438    async fn test_follow_redirects_false_does_not_follow() {
1439        use tower::ServiceExt;
1440
1441        let (url, _handle) = start_redirect_server().await;
1442        let ctx = test_producer_ctx();
1443
1444        let component = HttpComponent::new();
1445        let endpoint = component
1446            .create_endpoint(&format!(
1447                "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1448            ))
1449            .unwrap();
1450        let producer = endpoint.create_producer(&ctx).unwrap();
1451
1452        let exchange = Exchange::new(Message::default());
1453        let result = producer.oneshot(exchange).await.unwrap();
1454
1455        // Should get 302, NOT follow redirect to 200
1456        let status = result
1457            .input
1458            .header("CamelHttpResponseCode")
1459            .and_then(|v| v.as_u64())
1460            .unwrap();
1461        assert_eq!(
1462            status, 302,
1463            "Should NOT follow redirect when followRedirects=false"
1464        );
1465    }
1466
1467    #[tokio::test]
1468    async fn test_follow_redirects_true_follows_redirect() {
1469        use tower::ServiceExt;
1470
1471        let (url, _handle) = start_redirect_server().await;
1472        let ctx = test_producer_ctx();
1473
1474        let component = HttpComponent::new();
1475        let endpoint = component
1476            .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1477            .unwrap();
1478        let producer = endpoint.create_producer(&ctx).unwrap();
1479
1480        let exchange = Exchange::new(Message::default());
1481        let result = producer.oneshot(exchange).await.unwrap();
1482
1483        // Should follow redirect and get 200
1484        let status = result
1485            .input
1486            .header("CamelHttpResponseCode")
1487            .and_then(|v| v.as_u64())
1488            .unwrap();
1489        assert_eq!(
1490            status, 200,
1491            "Should follow redirect when followRedirects=true"
1492        );
1493    }
1494
1495    #[tokio::test]
1496    async fn test_query_params_forwarded_to_http_request() {
1497        use tower::ServiceExt;
1498
1499        let (url, _handle) = start_test_server().await;
1500        let ctx = test_producer_ctx();
1501
1502        let component = HttpComponent::new();
1503        // apiKey is NOT a Camel option, should be forwarded as query param
1504        let endpoint = component
1505            .create_endpoint(&format!(
1506                "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1507            ))
1508            .unwrap();
1509        let producer = endpoint.create_producer(&ctx).unwrap();
1510
1511        let exchange = Exchange::new(Message::default());
1512        let result = producer.oneshot(exchange).await.unwrap();
1513
1514        // The test server returns the request info in response
1515        // We just verify it succeeds (the query param was sent)
1516        let status = result
1517            .input
1518            .header("CamelHttpResponseCode")
1519            .and_then(|v| v.as_u64())
1520            .unwrap();
1521        assert_eq!(status, 200);
1522    }
1523
/// Config parsing keeps non-Camel query parameters for forwarding and strips
/// the Camel options (Bug #3 regression check).
///
/// Only `HttpConfig::from_uri` is exercised; nothing is awaited, so this is a
/// plain synchronous test and needs no async runtime.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // apiKey and token are NOT Camel options, should be forwarded
    assert!(
        config.query_params.contains_key("apiKey"),
        "apiKey should be preserved"
    );
    assert!(
        config.query_params.contains_key("token"),
        "token should be preserved"
    );
    assert_eq!(
        config.query_params.get("apiKey").map(String::as_str),
        Some("secret123")
    );
    assert_eq!(
        config.query_params.get("token").map(String::as_str),
        Some("abc456")
    );

    // httpMethod IS a Camel option, should NOT be in query_params
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
1551
1552    // -----------------------------------------------------------------------
1553    // SSRF Protection tests
1554    // -----------------------------------------------------------------------
1555
1556    #[tokio::test]
1557    async fn test_http_producer_blocks_metadata_endpoint() {
1558        use tower::ServiceExt;
1559
1560        let ctx = test_producer_ctx();
1561        let component = HttpComponent::new();
1562        let endpoint = component
1563            .create_endpoint("http://example.com/api?allowPrivateIps=false")
1564            .unwrap();
1565        let producer = endpoint.create_producer(&ctx).unwrap();
1566
1567        let mut exchange = Exchange::new(Message::default());
1568        exchange.input.set_header(
1569            "CamelHttpUri",
1570            serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1571        );
1572
1573        let result = producer.oneshot(exchange).await;
1574        assert!(result.is_err(), "Should block AWS metadata endpoint");
1575
1576        let err = result.unwrap_err();
1577        assert!(
1578            err.to_string().contains("Private IP"),
1579            "Error should mention private IP blocking, got: {}",
1580            err
1581        );
1582    }
1583
/// Without SSRF options, private IPs are disallowed and no host is blocked.
#[test]
fn test_ssrf_config_defaults() {
    let cfg = HttpConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_allowed = cfg.allow_private_ips;
    assert!(
        !private_ips_allowed,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = cfg.blocked_hosts.is_empty();
    assert!(
        no_blocked_hosts,
        "Blocked hosts should be empty by default"
    );
}
1596
/// `allowPrivateIps=true` switches the flag on.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let cfg = HttpConfig::from_uri(uri).unwrap();

    let private_ips_allowed = cfg.allow_private_ips;
    assert!(
        private_ips_allowed,
        "Private IPs should be allowed when explicitly set"
    );
}
1605
/// `blockedHosts` is split on commas into the list of blocked host names.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let cfg = HttpConfig::from_uri(uri).unwrap();

    let expected = vec!["evil.com", "malware.net"];
    assert_eq!(cfg.blocked_hosts, expected);
}
1613
1614    #[tokio::test]
1615    async fn test_http_producer_blocks_localhost() {
1616        use tower::ServiceExt;
1617
1618        let ctx = test_producer_ctx();
1619        let component = HttpComponent::new();
1620        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1621        let producer = endpoint.create_producer(&ctx).unwrap();
1622
1623        let mut exchange = Exchange::new(Message::default());
1624        exchange.input.set_header(
1625            "CamelHttpUri",
1626            serde_json::Value::String("http://localhost:8080/internal".to_string()),
1627        );
1628
1629        let result = producer.oneshot(exchange).await;
1630        assert!(result.is_err(), "Should block localhost");
1631    }
1632
1633    #[tokio::test]
1634    async fn test_http_producer_blocks_loopback_ip() {
1635        use tower::ServiceExt;
1636
1637        let ctx = test_producer_ctx();
1638        let component = HttpComponent::new();
1639        let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1640        let producer = endpoint.create_producer(&ctx).unwrap();
1641
1642        let mut exchange = Exchange::new(Message::default());
1643        exchange.input.set_header(
1644            "CamelHttpUri",
1645            serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1646        );
1647
1648        let result = producer.oneshot(exchange).await;
1649        assert!(result.is_err(), "Should block loopback IP");
1650    }
1651
1652    #[tokio::test]
1653    async fn test_http_producer_allows_private_ip_when_enabled() {
1654        use tower::ServiceExt;
1655
1656        let ctx = test_producer_ctx();
1657        let component = HttpComponent::new();
1658        // With allowPrivateIps=true, the validation should pass
1659        // (actual connection will fail, but that's expected)
1660        let endpoint = component
1661            .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1662            .unwrap();
1663        let producer = endpoint.create_producer(&ctx).unwrap();
1664
1665        let exchange = Exchange::new(Message::default());
1666
1667        // The request will fail because we can't connect, but it should NOT fail
1668        // due to SSRF protection
1669        let result = producer.oneshot(exchange).await;
1670        // We expect connection error, not SSRF error
1671        if let Err(ref e) = result {
1672            let err_str = e.to_string();
1673            assert!(
1674                !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1675                "Should not be SSRF error, got: {}",
1676                err_str
1677            );
1678        }
1679    }
1680
1681    // -----------------------------------------------------------------------
1682    // HttpServerConfig tests
1683    // -----------------------------------------------------------------------
1684
/// A full server URI must be split into its host, port and path fields.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    let host = &parsed.host;
    let port = parsed.port;
    let path = &parsed.path;

    assert_eq!(host, "0.0.0.0");
    assert_eq!(port, 8080);
    assert_eq!(path, "/orders");
}
1692
/// A server URI with no path component must yield the path "/".
#[test]
fn test_http_server_config_default_path() {
    let uri = "http://0.0.0.0:3000";
    let parsed = HttpServerConfig::from_uri(uri).unwrap();
    let path = &parsed.path;
    assert_eq!(path, "/");
}
1698
/// A `file:` URI must be rejected by the server config parser.
#[test]
fn test_http_server_config_wrong_scheme() {
    let parsed = HttpServerConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
1703
/// A non-numeric port must be rejected by the server config parser.
#[test]
fn test_http_server_config_invalid_port() {
    let parsed = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(parsed.is_err());
}
1708
/// Compile-time check: this test only builds if both `RequestEnvelope` and
/// `HttpReply` implement `Send`.
#[test]
fn test_request_envelope_and_reply_are_send() {
    // Instantiating this with a non-`Send` type is a compile error.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
1715
1716    // -----------------------------------------------------------------------
1717    // ServerRegistry tests
1718    // -----------------------------------------------------------------------
1719
/// Two calls to `ServerRegistry::global()` must hand back the same address.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    // Compare addresses, not values.
    let same_instance = std::ptr::eq(first, second);
    assert!(same_instance);
}
1726
1727    // -----------------------------------------------------------------------
1728    // Axum dispatch handler tests
1729    // -----------------------------------------------------------------------
1730
1731    #[tokio::test]
1732    async fn test_dispatch_handler_returns_404_for_unknown_path() {
1733        let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1734        // Nothing registered in the dispatch table
1735        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1736        let port = listener.local_addr().unwrap().port();
1737        tokio::spawn(run_axum_server(listener, dispatch));
1738
1739        // Wait for server to start
1740        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1741
1742        let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1743            .await
1744            .unwrap();
1745        assert_eq!(resp.status().as_u16(), 404);
1746    }
1747
1748    // -----------------------------------------------------------------------
1749    // HttpConsumer tests
1750    // -----------------------------------------------------------------------
1751
1752    #[tokio::test]
1753    async fn test_http_consumer_start_registers_path() {
1754        use camel_component::ConsumerContext;
1755
1756        // Get an OS-assigned free port
1757        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1758        let port = listener.local_addr().unwrap().port();
1759        drop(listener); // Release port — ServerRegistry will rebind it
1760
1761        let consumer_cfg = HttpServerConfig {
1762            host: "127.0.0.1".to_string(),
1763            port,
1764            path: "/ping".to_string(),
1765        };
1766        let mut consumer = HttpConsumer::new(consumer_cfg);
1767
1768        let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1769        let token = tokio_util::sync::CancellationToken::new();
1770        let ctx = ConsumerContext::new(tx, token.clone());
1771
1772        tokio::spawn(async move {
1773            consumer.start(ctx).await.unwrap();
1774        });
1775
1776        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1777
1778        let client = reqwest::Client::new();
1779        let resp_future = client
1780            .post(format!("http://127.0.0.1:{port}/ping"))
1781            .body("hello world")
1782            .send();
1783
1784        let (http_result, _) = tokio::join!(resp_future, async {
1785            if let Some(mut envelope) = rx.recv().await {
1786                // Set a custom status code
1787                envelope.exchange.input.set_header(
1788                    "CamelHttpResponseCode",
1789                    serde_json::Value::Number(201.into()),
1790                );
1791                if let Some(reply_tx) = envelope.reply_tx {
1792                    let _ = reply_tx.send(Ok(envelope.exchange));
1793                }
1794            }
1795        });
1796
1797        let resp = http_result.unwrap();
1798        assert_eq!(resp.status().as_u16(), 201);
1799
1800        token.cancel();
1801    }
1802
1803    // -----------------------------------------------------------------------
1804    // Integration tests
1805    // -----------------------------------------------------------------------
1806
1807    #[tokio::test]
1808    async fn test_integration_single_consumer_round_trip() {
1809        use camel_component::{ConsumerContext, ExchangeEnvelope};
1810
1811        // Get an OS-assigned free port (ephemeral)
1812        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1813        let port = listener.local_addr().unwrap().port();
1814        drop(listener); // Release — ServerRegistry will rebind
1815
1816        let component = HttpComponent::new();
1817        let endpoint = component
1818            .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1819            .unwrap();
1820        let mut consumer = endpoint.create_consumer().unwrap();
1821
1822        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1823        let token = tokio_util::sync::CancellationToken::new();
1824        let ctx = ConsumerContext::new(tx, token.clone());
1825
1826        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1827        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1828
1829        let client = reqwest::Client::new();
1830        let send_fut = client
1831            .post(format!("http://127.0.0.1:{port}/echo"))
1832            .header("Content-Type", "text/plain")
1833            .body("ping")
1834            .send();
1835
1836        let (http_result, _) = tokio::join!(send_fut, async {
1837            if let Some(mut envelope) = rx.recv().await {
1838                assert_eq!(
1839                    envelope.exchange.input.header("CamelHttpMethod"),
1840                    Some(&serde_json::Value::String("POST".into()))
1841                );
1842                assert_eq!(
1843                    envelope.exchange.input.header("CamelHttpPath"),
1844                    Some(&serde_json::Value::String("/echo".into()))
1845                );
1846                envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
1847                if let Some(reply_tx) = envelope.reply_tx {
1848                    let _ = reply_tx.send(Ok(envelope.exchange));
1849                }
1850            }
1851        });
1852
1853        let resp = http_result.unwrap();
1854        assert_eq!(resp.status().as_u16(), 200);
1855        let body = resp.text().await.unwrap();
1856        assert_eq!(body, "pong");
1857
1858        token.cancel();
1859    }
1860
1861    #[tokio::test]
1862    async fn test_integration_two_consumers_shared_port() {
1863        use camel_component::{ConsumerContext, ExchangeEnvelope};
1864
1865        // Get an OS-assigned free port (ephemeral)
1866        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1867        let port = listener.local_addr().unwrap().port();
1868        drop(listener);
1869
1870        let component = HttpComponent::new();
1871
1872        // Consumer A: /hello
1873        let endpoint_a = component
1874            .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
1875            .unwrap();
1876        let mut consumer_a = endpoint_a.create_consumer().unwrap();
1877
1878        // Consumer B: /world
1879        let endpoint_b = component
1880            .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
1881            .unwrap();
1882        let mut consumer_b = endpoint_b.create_consumer().unwrap();
1883
1884        let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1885        let token_a = tokio_util::sync::CancellationToken::new();
1886        let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
1887
1888        let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1889        let token_b = tokio_util::sync::CancellationToken::new();
1890        let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
1891
1892        tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
1893        tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
1894        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1895
1896        let client = reqwest::Client::new();
1897
1898        // Request to /hello
1899        let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
1900        let (resp_hello, _) = tokio::join!(fut_hello, async {
1901            if let Some(mut envelope) = rx_a.recv().await {
1902                envelope.exchange.input.body =
1903                    camel_api::body::Body::Text("hello-response".to_string());
1904                if let Some(reply_tx) = envelope.reply_tx {
1905                    let _ = reply_tx.send(Ok(envelope.exchange));
1906                }
1907            }
1908        });
1909
1910        // Request to /world
1911        let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
1912        let (resp_world, _) = tokio::join!(fut_world, async {
1913            if let Some(mut envelope) = rx_b.recv().await {
1914                envelope.exchange.input.body =
1915                    camel_api::body::Body::Text("world-response".to_string());
1916                if let Some(reply_tx) = envelope.reply_tx {
1917                    let _ = reply_tx.send(Ok(envelope.exchange));
1918                }
1919            }
1920        });
1921
1922        let body_a = resp_hello.unwrap().text().await.unwrap();
1923        let body_b = resp_world.unwrap().text().await.unwrap();
1924
1925        assert_eq!(body_a, "hello-response");
1926        assert_eq!(body_b, "world-response");
1927
1928        token_a.cancel();
1929        token_b.cancel();
1930    }
1931
1932    #[tokio::test]
1933    async fn test_integration_unregistered_path_returns_404() {
1934        use camel_component::{ConsumerContext, ExchangeEnvelope};
1935
1936        // Get an OS-assigned free port (ephemeral)
1937        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1938        let port = listener.local_addr().unwrap().port();
1939        drop(listener);
1940
1941        let component = HttpComponent::new();
1942        let endpoint = component
1943            .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
1944            .unwrap();
1945        let mut consumer = endpoint.create_consumer().unwrap();
1946
1947        let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1948        let token = tokio_util::sync::CancellationToken::new();
1949        let ctx = ConsumerContext::new(tx, token.clone());
1950
1951        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1952        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1953
1954        let client = reqwest::Client::new();
1955        let resp = client
1956            .get(format!("http://127.0.0.1:{port}/not-there"))
1957            .send()
1958            .await
1959            .unwrap();
1960        assert_eq!(resp.status().as_u16(), 404);
1961
1962        token.cancel();
1963    }
1964
/// `HttpConsumer::concurrency_model()` must report
/// `ConcurrencyModel::Concurrent { max: None }`.
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component::ConcurrencyModel;

    let consumer = HttpConsumer::new(HttpServerConfig {
        host: "127.0.0.1".to_string(),
        port: 19999,
        path: "/test".to_string(),
    });

    let declared = consumer.concurrency_model();
    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(declared, expected);
}
1980}