1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
16#[derive(Debug, Clone)]
21pub struct HttpConfig {
22 pub base_url: String,
23 pub http_method: Option<String>,
24 pub throw_exception_on_failure: bool,
25 pub ok_status_code_range: (u16, u16),
26 pub follow_redirects: bool,
27 pub connect_timeout: Duration,
28 pub response_timeout: Option<Duration>,
29 pub query_params: HashMap<String, String>,
30 pub allow_private_ips: bool,
32 pub blocked_hosts: Vec<String>,
33}
34
35impl HttpConfig {
36 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
37 let parts = parse_uri(uri)?;
38 if parts.scheme != "http" && parts.scheme != "https" {
39 return Err(CamelError::InvalidUri(format!(
40 "expected scheme 'http' or 'https', got '{}'",
41 parts.scheme
42 )));
43 }
44
45 let base_url = format!("{}:{}", parts.scheme, parts.path);
46
47 let http_method = parts.params.get("httpMethod").cloned();
48
49 let throw_exception_on_failure = parts
50 .params
51 .get("throwExceptionOnFailure")
52 .map(|v| v != "false")
53 .unwrap_or(true);
54
55 let ok_status_code_range = parts
56 .params
57 .get("okStatusCodeRange")
58 .and_then(|v| {
59 let (start, end) = v.split_once('-')?;
60 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
61 })
62 .unwrap_or((200, 299));
63
64 let follow_redirects = parts
65 .params
66 .get("followRedirects")
67 .map(|v| v == "true")
68 .unwrap_or(false);
69
70 let connect_timeout = parts
71 .params
72 .get("connectTimeout")
73 .and_then(|v| v.parse::<u64>().ok())
74 .map(Duration::from_millis)
75 .unwrap_or(Duration::from_millis(30000));
76
77 let response_timeout = parts
78 .params
79 .get("responseTimeout")
80 .and_then(|v| v.parse::<u64>().ok())
81 .map(Duration::from_millis);
82
83 let allow_private_ips = parts
85 .params
86 .get("allowPrivateIps")
87 .map(|v| v == "true")
88 .unwrap_or(false); let blocked_hosts = parts
91 .params
92 .get("blockedHosts")
93 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
94 .unwrap_or_default();
95
96 let camel_options = [
98 "httpMethod",
99 "throwExceptionOnFailure",
100 "okStatusCodeRange",
101 "followRedirects",
102 "connectTimeout",
103 "responseTimeout",
104 "allowPrivateIps",
105 "blockedHosts",
106 ];
107
108 let query_params: HashMap<String, String> = parts
109 .params
110 .into_iter()
111 .filter(|(k, _)| !camel_options.contains(&k.as_str()))
112 .map(|(k, v)| (k.clone(), v.clone()))
113 .collect();
114
115 Ok(Self {
116 base_url,
117 http_method,
118 throw_exception_on_failure,
119 ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
120 follow_redirects,
121 connect_timeout,
122 response_timeout,
123 query_params,
124 allow_private_ips,
125 blocked_hosts,
126 })
127 }
128}
129
130#[derive(Debug, Clone)]
136pub struct HttpServerConfig {
137 pub host: String,
139 pub port: u16,
141 pub path: String,
143}
144
145impl HttpServerConfig {
146 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
147 let parts = parse_uri(uri)?;
148 if parts.scheme != "http" && parts.scheme != "https" {
149 return Err(CamelError::InvalidUri(format!(
150 "expected scheme 'http' or 'https', got '{}'",
151 parts.scheme
152 )));
153 }
154
155 let authority_and_path = parts.path.trim_start_matches('/');
158
159 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
161 (&authority_and_path[..idx], &authority_and_path[idx..])
162 } else {
163 (authority_and_path, "/")
164 };
165
166 let path = if path_suffix.is_empty() {
167 "/"
168 } else {
169 path_suffix
170 }
171 .to_string();
172
173 let (host, port) = if let Some(colon) = authority.rfind(':') {
175 let port_str = &authority[colon + 1..];
176 match port_str.parse::<u16>() {
177 Ok(p) => (authority[..colon].to_string(), p),
178 Err(_) => {
179 return Err(CamelError::InvalidUri(format!(
180 "invalid port '{}' in URI '{}'",
181 port_str, uri
182 )));
183 }
184 }
185 } else {
186 (authority.to_string(), 80)
187 };
188
189 Ok(Self { host, port, path })
190 }
191}
192
193pub struct RequestEnvelope {
200 pub method: String,
201 pub path: String,
202 pub query: String,
203 pub headers: http::HeaderMap,
204 pub body: bytes::Bytes,
205 pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
206}
207
208#[derive(Debug, Clone)]
210pub struct HttpReply {
211 pub status: u16,
212 pub headers: Vec<(String, String)>,
213 pub body: bytes::Bytes,
214}
215
216pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
222
223struct ServerHandle {
225 dispatch: DispatchTable,
226 _task: tokio::task::JoinHandle<()>,
228}
229
230pub struct ServerRegistry {
232 inner: Mutex<HashMap<u16, ServerHandle>>,
233}
234
235impl ServerRegistry {
236 pub fn global() -> &'static Self {
238 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
239 INSTANCE.get_or_init(|| ServerRegistry {
240 inner: Mutex::new(HashMap::new()),
241 })
242 }
243
244 pub async fn get_or_spawn(
247 &'static self,
248 host: &str,
249 port: u16,
250 ) -> Result<DispatchTable, CamelError> {
251 {
253 let guard = self.inner.lock().map_err(|_| {
254 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
255 })?;
256 if let Some(handle) = guard.get(&port) {
257 return Ok(Arc::clone(&handle.dispatch));
258 }
259 }
260
261 let addr = format!("{}:{}", host, port);
263 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
264 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
265 })?;
266
267 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
268 let dispatch_for_server = Arc::clone(&dispatch);
269 let task = tokio::spawn(run_axum_server(listener, dispatch_for_server));
270
271 let mut guard = self.inner.lock().map_err(|_| {
273 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
274 })?;
275 if let Some(existing) = guard.get(&port) {
278 task.abort();
279 return Ok(Arc::clone(&existing.dispatch));
280 }
281 guard.insert(
282 port,
283 ServerHandle {
284 dispatch: Arc::clone(&dispatch),
285 _task: task,
286 },
287 );
288 Ok(dispatch)
289 }
290}
291
292use axum::{
297 Router,
298 body::Body as AxumBody,
299 extract::{Request, State},
300 http::{Response, StatusCode},
301 response::IntoResponse,
302};
303
304async fn run_axum_server(listener: tokio::net::TcpListener, dispatch: DispatchTable) {
305 let app = Router::new()
306 .fallback(dispatch_handler)
307 .with_state(dispatch);
308
309 axum::serve(listener, app).await.unwrap_or_else(|e| {
310 tracing::error!(error = %e, "Axum server error");
311 });
312}
313
314async fn dispatch_handler(
315 State(dispatch): State<DispatchTable>,
316 req: Request,
317) -> impl IntoResponse {
318 const MAX_REQUEST_BODY: usize = 2 * 1024 * 1024; let method = req.method().to_string();
321 let path = req.uri().path().to_string();
322 let query = req.uri().query().unwrap_or("").to_string();
323 let headers = req.headers().clone();
324
325 let body_bytes = match axum::body::to_bytes(req.into_body(), MAX_REQUEST_BODY).await {
326 Ok(b) => b,
327 Err(_) => {
328 return Response::builder()
329 .status(StatusCode::BAD_REQUEST)
330 .body(AxumBody::empty())
331 .expect(
332 "Response::builder() with a known-valid status code and body is infallible",
333 );
334 }
335 };
336
337 let sender = {
339 let table = dispatch.read().await;
340 table.get(&path).cloned()
341 };
342
343 let Some(sender) = sender else {
344 return Response::builder()
345 .status(StatusCode::NOT_FOUND)
346 .body(AxumBody::from("No consumer registered for this path"))
347 .expect("Response::builder() with a known-valid status code and body is infallible");
348 };
349
350 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
351 let envelope = RequestEnvelope {
352 method,
353 path,
354 query,
355 headers,
356 body: body_bytes,
357 reply_tx,
358 };
359
360 if sender.send(envelope).await.is_err() {
361 return Response::builder()
362 .status(StatusCode::SERVICE_UNAVAILABLE)
363 .body(AxumBody::from("Consumer unavailable"))
364 .expect("Response::builder() with a known-valid status code and body is infallible");
365 }
366
367 match reply_rx.await {
368 Ok(reply) => {
369 let status =
370 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
371 let mut builder = Response::builder().status(status);
372 for (k, v) in &reply.headers {
373 builder = builder.header(k.as_str(), v.as_str());
374 }
375 builder
376 .body(AxumBody::from(reply.body))
377 .unwrap_or_else(|_| {
378 Response::builder()
379 .status(StatusCode::INTERNAL_SERVER_ERROR)
380 .body(AxumBody::from("Invalid response headers from consumer"))
381 .expect("Response::builder() with a known-valid status code and body is infallible")
382 })
383 }
384 Err(_) => Response::builder()
385 .status(StatusCode::INTERNAL_SERVER_ERROR)
386 .body(AxumBody::from("Pipeline error"))
387 .expect("Response::builder() with a known-valid status code and body is infallible"),
388 }
389}
390
391pub struct HttpConsumer {
396 config: HttpServerConfig,
397}
398
399impl HttpConsumer {
400 pub fn new(config: HttpServerConfig) -> Self {
401 Self { config }
402 }
403}
404
405#[async_trait::async_trait]
406impl Consumer for HttpConsumer {
407 async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
408 use camel_api::{Exchange, Message, body::Body};
409
410 let dispatch = ServerRegistry::global()
411 .get_or_spawn(&self.config.host, self.config.port)
412 .await?;
413
414 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
416 {
417 let mut table = dispatch.write().await;
418 table.insert(self.config.path.clone(), env_tx);
419 }
420
421 let path = self.config.path.clone();
422 let cancel_token = ctx.cancel_token();
423
424 loop {
425 tokio::select! {
426 _ = ctx.cancelled() => {
427 break;
428 }
429 envelope = env_rx.recv() => {
430 let Some(envelope) = envelope else { break; };
431
432 let mut msg = Message::default();
434
435 msg.set_header("CamelHttpMethod",
437 serde_json::Value::String(envelope.method.clone()));
438 msg.set_header("CamelHttpPath",
439 serde_json::Value::String(envelope.path.clone()));
440 msg.set_header("CamelHttpQuery",
441 serde_json::Value::String(envelope.query.clone()));
442
443 for (k, v) in &envelope.headers {
445 if let Ok(val_str) = v.to_str() {
446 msg.set_header(
447 k.as_str(),
448 serde_json::Value::String(val_str.to_string()),
449 );
450 }
451 }
452
453 if !envelope.body.is_empty() {
455 match std::str::from_utf8(&envelope.body) {
456 Ok(text) => msg.body = Body::Text(text.to_string()),
457 Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
458 }
459 }
460
461 let exchange = Exchange::new(msg);
462 let reply_tx = envelope.reply_tx;
463 let sender = ctx.sender().clone();
464 let path_clone = path.clone();
465 let cancel = cancel_token.clone();
466
467 tokio::spawn(async move {
487 if cancel.is_cancelled() {
495 let _ = reply_tx.send(HttpReply {
496 status: 503,
497 headers: vec![],
498 body: bytes::Bytes::from("Service Unavailable"),
499 });
500 return;
501 }
502
503 let (tx, rx) = tokio::sync::oneshot::channel();
505 let envelope = camel_component::consumer::ExchangeEnvelope {
506 exchange,
507 reply_tx: Some(tx),
508 };
509
510 let result = match sender.send(envelope).await {
511 Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
512 Err(_) => Err(camel_api::CamelError::ChannelClosed),
513 }
514 .and_then(|r| r);
515
516 let reply = match result {
517 Ok(out) => {
518 let status = out
519 .input
520 .header("CamelHttpResponseCode")
521 .and_then(|v| v.as_u64())
522 .map(|s| s as u16)
523 .unwrap_or(200);
524
525 let body_bytes = match &out.input.body {
526 Body::Empty => bytes::Bytes::new(),
527 Body::Bytes(b) => b.clone(),
528 Body::Text(s) => bytes::Bytes::from(s.clone().into_bytes()),
529 Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
530 };
531
532 let resp_headers: Vec<(String, String)> = out
533 .input
534 .headers
535 .iter()
536 .filter(|(k, _)| !k.starts_with("Camel"))
538 .filter(|(k, _)| {
541 !matches!(
542 k.to_lowercase().as_str(),
543 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
578 })
579 .filter_map(|(k, v)| {
580 v.as_str().map(|s| (k.clone(), s.to_string()))
581 })
582 .collect();
583
584 HttpReply {
585 status,
586 headers: resp_headers,
587 body: body_bytes,
588 }
589 }
590 Err(e) => {
591 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
592 HttpReply {
593 status: 500,
594 headers: vec![],
595 body: bytes::Bytes::from("Internal Server Error"),
596 }
597 }
598 };
599
600 let _ = reply_tx.send(reply);
602 });
603 }
604 }
605 }
606
607 {
609 let mut table = dispatch.write().await;
610 table.remove(&path);
611 }
612
613 Ok(())
614 }
615
616 async fn stop(&mut self) -> Result<(), CamelError> {
617 Ok(())
618 }
619
620 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
621 camel_component::ConcurrencyModel::Concurrent { max: None }
622 }
623}
624
625pub struct HttpComponent;
630
631impl HttpComponent {
632 pub fn new() -> Self {
633 Self
634 }
635}
636
637impl Default for HttpComponent {
638 fn default() -> Self {
639 Self::new()
640 }
641}
642
643impl Component for HttpComponent {
644 fn scheme(&self) -> &str {
645 "http"
646 }
647
648 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
649 let config = HttpConfig::from_uri(uri)?;
650 let server_config = HttpServerConfig::from_uri(uri)?;
651 let client = build_client(&config)?;
652 Ok(Box::new(HttpEndpoint {
653 uri: uri.to_string(),
654 config,
655 server_config,
656 client,
657 }))
658 }
659}
660
661pub struct HttpsComponent;
662
663impl HttpsComponent {
664 pub fn new() -> Self {
665 Self
666 }
667}
668
669impl Default for HttpsComponent {
670 fn default() -> Self {
671 Self::new()
672 }
673}
674
675impl Component for HttpsComponent {
676 fn scheme(&self) -> &str {
677 "https"
678 }
679
680 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
681 let config = HttpConfig::from_uri(uri)?;
682 let server_config = HttpServerConfig::from_uri(uri)?;
683 let client = build_client(&config)?;
684 Ok(Box::new(HttpEndpoint {
685 uri: uri.to_string(),
686 config,
687 server_config,
688 client,
689 }))
690 }
691}
692
693fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
694 let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
695
696 if !config.follow_redirects {
697 builder = builder.redirect(reqwest::redirect::Policy::none());
698 }
699
700 builder.build().map_err(|e| {
701 CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
702 })
703}
704
705struct HttpEndpoint {
710 uri: String,
711 config: HttpConfig,
712 server_config: HttpServerConfig,
713 client: reqwest::Client,
714}
715
716impl Endpoint for HttpEndpoint {
717 fn uri(&self) -> &str {
718 &self.uri
719 }
720
721 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
722 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
723 }
724
725 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
726 Ok(BoxProcessor::new(HttpProducer {
727 config: Arc::new(self.config.clone()),
728 client: self.client.clone(),
729 }))
730 }
731}
732
733fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
738 let parsed = url::Url::parse(url)
739 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
740
741 if let Some(host) = parsed.host_str()
743 && config.blocked_hosts.iter().any(|blocked| host == blocked)
744 {
745 return Err(CamelError::ProcessorError(format!(
746 "Host '{}' is blocked",
747 host
748 )));
749 }
750
751 if !config.allow_private_ips
753 && let Some(host) = parsed.host()
754 {
755 match host {
756 url::Host::Ipv4(ip) => {
757 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
758 return Err(CamelError::ProcessorError(format!(
759 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
760 ip
761 )));
762 }
763 }
764 url::Host::Ipv6(ip) => {
765 if ip.is_loopback() {
766 return Err(CamelError::ProcessorError(format!(
767 "Loopback IP '{}' not allowed",
768 ip
769 )));
770 }
771 }
772 url::Host::Domain(domain) => {
773 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
775 if blocked_domains.contains(&domain) {
776 return Err(CamelError::ProcessorError(format!(
777 "Domain '{}' is not allowed",
778 domain
779 )));
780 }
781 }
782 }
783 }
784
785 Ok(())
786}
787
788#[derive(Clone)]
793struct HttpProducer {
794 config: Arc<HttpConfig>,
795 client: reqwest::Client,
796}
797
798impl HttpProducer {
799 fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
800 if let Some(ref method) = config.http_method {
801 return method.to_uppercase();
802 }
803 if let Some(method) = exchange
804 .input
805 .header("CamelHttpMethod")
806 .and_then(|v| v.as_str())
807 {
808 return method.to_uppercase();
809 }
810 if !exchange.input.body.is_empty() {
811 return "POST".to_string();
812 }
813 "GET".to_string()
814 }
815
816 fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
817 if let Some(uri) = exchange
818 .input
819 .header("CamelHttpUri")
820 .and_then(|v| v.as_str())
821 {
822 let mut url = uri.to_string();
823 if let Some(path) = exchange
824 .input
825 .header("CamelHttpPath")
826 .and_then(|v| v.as_str())
827 {
828 if !url.ends_with('/') && !path.starts_with('/') {
829 url.push('/');
830 }
831 url.push_str(path);
832 }
833 if let Some(query) = exchange
834 .input
835 .header("CamelHttpQuery")
836 .and_then(|v| v.as_str())
837 {
838 url.push('?');
839 url.push_str(query);
840 }
841 return url;
842 }
843
844 let mut url = config.base_url.clone();
845
846 if let Some(path) = exchange
847 .input
848 .header("CamelHttpPath")
849 .and_then(|v| v.as_str())
850 {
851 if !url.ends_with('/') && !path.starts_with('/') {
852 url.push('/');
853 }
854 url.push_str(path);
855 }
856
857 if let Some(query) = exchange
858 .input
859 .header("CamelHttpQuery")
860 .and_then(|v| v.as_str())
861 {
862 url.push('?');
863 url.push_str(query);
864 } else if !config.query_params.is_empty() {
865 url.push('?');
867 let query_string: String = config
868 .query_params
869 .iter()
870 .map(|(k, v)| format!("{k}={v}"))
871 .collect::<Vec<_>>()
872 .join("&");
873 url.push_str(&query_string);
874 }
875
876 url
877 }
878
879 fn body_to_bytes(body: &Body) -> Option<Vec<u8>> {
880 match body {
881 Body::Empty => None,
882 Body::Bytes(b) => Some(b.to_vec()),
883 Body::Text(s) => Some(s.as_bytes().to_vec()),
884 Body::Json(v) => Some(v.to_string().into_bytes()),
885 }
886 }
887
888 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
889 status >= range.0 && status <= range.1
890 }
891}
892
893impl Service<Exchange> for HttpProducer {
894 type Response = Exchange;
895 type Error = CamelError;
896 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
897
898 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
899 Poll::Ready(Ok(()))
900 }
901
902 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
903 let config = self.config.clone();
904 let client = self.client.clone();
905
906 Box::pin(async move {
907 let method_str = HttpProducer::resolve_method(&exchange, &config);
908 let url = HttpProducer::resolve_url(&exchange, &config);
909
910 validate_url_for_ssrf(&url, &config)?;
912
913 debug!(
914 correlation_id = %exchange.correlation_id(),
915 method = %method_str,
916 url = %url,
917 "HTTP request"
918 );
919
920 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
921 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
922 })?;
923
924 let mut request = client.request(method, &url);
925
926 if let Some(timeout) = config.response_timeout {
927 request = request.timeout(timeout);
928 }
929
930 for (key, value) in &exchange.input.headers {
931 if !key.starts_with("Camel")
932 && let Some(val_str) = value.as_str()
933 && let (Ok(name), Ok(val)) = (
934 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
935 reqwest::header::HeaderValue::from_str(val_str),
936 )
937 {
938 request = request.header(name, val);
939 }
940 }
941
942 if let Some(body_bytes) = HttpProducer::body_to_bytes(&exchange.input.body) {
943 request = request.body(body_bytes);
944 }
945
946 let response = request
947 .send()
948 .await
949 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
950
951 let status_code = response.status().as_u16();
952 let status_text = response
953 .status()
954 .canonical_reason()
955 .unwrap_or("Unknown")
956 .to_string();
957
958 for (key, value) in response.headers() {
959 if let Ok(val_str) = value.to_str() {
960 exchange
961 .input
962 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
963 }
964 }
965
966 exchange.input.set_header(
967 "CamelHttpResponseCode",
968 serde_json::Value::Number(status_code.into()),
969 );
970 exchange.input.set_header(
971 "CamelHttpResponseText",
972 serde_json::Value::String(status_text.clone()),
973 );
974
975 let response_body = response.bytes().await.map_err(|e| {
976 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
977 })?;
978
979 if config.throw_exception_on_failure
980 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
981 {
982 return Err(CamelError::HttpOperationFailed {
983 status_code,
984 status_text,
985 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
986 });
987 }
988
989 if !response_body.is_empty() {
990 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
991 }
992
993 debug!(
994 correlation_id = %exchange.correlation_id(),
995 status = status_code,
996 url = %url,
997 "HTTP response"
998 );
999 Ok(exchange)
1000 })
1001 }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007 use camel_api::Message;
1008 use std::sync::Arc;
1009 use std::time::Duration;
1010 use tokio::sync::Mutex;
1011
1012 struct NullRouteController;
1014 #[async_trait::async_trait]
1015 impl camel_api::RouteController for NullRouteController {
1016 async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1017 Ok(())
1018 }
1019 async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1020 Ok(())
1021 }
1022 async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1023 Ok(())
1024 }
1025 async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1026 Ok(())
1027 }
1028 async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1029 Ok(())
1030 }
1031 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1032 None
1033 }
1034 async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1035 Ok(())
1036 }
1037 async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1038 Ok(())
1039 }
1040 }
1041
1042 fn test_producer_ctx() -> ProducerContext {
1043 ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1044 }
1045
1046 #[test]
1047 fn test_http_config_defaults() {
1048 let config = HttpConfig::from_uri("http://localhost:8080/api").unwrap();
1049 assert_eq!(config.base_url, "http://localhost:8080/api");
1050 assert!(config.http_method.is_none());
1051 assert!(config.throw_exception_on_failure);
1052 assert_eq!(config.ok_status_code_range, (200, 299));
1053 assert!(!config.follow_redirects);
1054 assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1055 assert!(config.response_timeout.is_none());
1056 }
1057
1058 #[test]
1059 fn test_http_config_with_options() {
1060 let config = HttpConfig::from_uri(
1061 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1062 ).unwrap();
1063 assert_eq!(config.base_url, "https://api.example.com/v1");
1064 assert_eq!(config.http_method, Some("PUT".to_string()));
1065 assert!(!config.throw_exception_on_failure);
1066 assert!(config.follow_redirects);
1067 assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1068 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1069 }
1070
1071 #[test]
1072 fn test_http_config_ok_status_range() {
1073 let config =
1074 HttpConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1075 assert_eq!(config.ok_status_code_range, (200, 204));
1076 }
1077
1078 #[test]
1079 fn test_http_config_wrong_scheme() {
1080 let result = HttpConfig::from_uri("file:/tmp");
1081 assert!(result.is_err());
1082 }
1083
1084 #[test]
1085 fn test_http_component_scheme() {
1086 let component = HttpComponent::new();
1087 assert_eq!(component.scheme(), "http");
1088 }
1089
1090 #[test]
1091 fn test_https_component_scheme() {
1092 let component = HttpsComponent::new();
1093 assert_eq!(component.scheme(), "https");
1094 }
1095
1096 #[test]
1097 fn test_http_endpoint_creates_consumer() {
1098 let component = HttpComponent::new();
1099 let endpoint = component
1100 .create_endpoint("http://0.0.0.0:19100/test")
1101 .unwrap();
1102 assert!(endpoint.create_consumer().is_ok());
1103 }
1104
1105 #[test]
1106 fn test_https_endpoint_creates_consumer() {
1107 let component = HttpsComponent::new();
1108 let endpoint = component
1109 .create_endpoint("https://0.0.0.0:8443/test")
1110 .unwrap();
1111 assert!(endpoint.create_consumer().is_ok());
1112 }
1113
1114 #[test]
1115 fn test_http_endpoint_creates_producer() {
1116 let ctx = test_producer_ctx();
1117 let component = HttpComponent::new();
1118 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1119 assert!(endpoint.create_producer(&ctx).is_ok());
1120 }
1121
1122 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1127 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1128 let addr = listener.local_addr().unwrap();
1129 let url = format!("http://127.0.0.1:{}", addr.port());
1130
1131 let handle = tokio::spawn(async move {
1132 loop {
1133 if let Ok((mut stream, _)) = listener.accept().await {
1134 tokio::spawn(async move {
1135 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1136 let mut buf = vec![0u8; 4096];
1137 let n = stream.read(&mut buf).await.unwrap_or(0);
1138 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1139
1140 let method = request.split_whitespace().next().unwrap_or("GET");
1141
1142 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1143 let response = format!(
1144 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1145 body.len(),
1146 body
1147 );
1148 let _ = stream.write_all(response.as_bytes()).await;
1149 });
1150 }
1151 }
1152 });
1153
1154 (url, handle)
1155 }
1156
1157 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1158 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1159 let addr = listener.local_addr().unwrap();
1160 let url = format!("http://127.0.0.1:{}", addr.port());
1161
1162 let handle = tokio::spawn(async move {
1163 loop {
1164 if let Ok((mut stream, _)) = listener.accept().await {
1165 let status = status;
1166 tokio::spawn(async move {
1167 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1168 let mut buf = vec![0u8; 4096];
1169 let _ = stream.read(&mut buf).await;
1170
1171 let status_text = match status {
1172 404 => "Not Found",
1173 500 => "Internal Server Error",
1174 _ => "Error",
1175 };
1176 let body = "error body";
1177 let response = format!(
1178 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1179 status,
1180 status_text,
1181 body.len(),
1182 body
1183 );
1184 let _ = stream.write_all(response.as_bytes()).await;
1185 });
1186 }
1187 }
1188 });
1189
1190 (url, handle)
1191 }
1192
1193 #[tokio::test]
1194 async fn test_http_producer_get_request() {
1195 use tower::ServiceExt;
1196
1197 let (url, _handle) = start_test_server().await;
1198 let ctx = test_producer_ctx();
1199
1200 let component = HttpComponent::new();
1201 let endpoint = component
1202 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1203 .unwrap();
1204 let producer = endpoint.create_producer(&ctx).unwrap();
1205
1206 let exchange = Exchange::new(Message::default());
1207 let result = producer.oneshot(exchange).await.unwrap();
1208
1209 let status = result
1210 .input
1211 .header("CamelHttpResponseCode")
1212 .and_then(|v| v.as_u64())
1213 .unwrap();
1214 assert_eq!(status, 200);
1215
1216 assert!(!result.input.body.is_empty());
1217 }
1218
1219 #[tokio::test]
1220 async fn test_http_producer_post_with_body() {
1221 use tower::ServiceExt;
1222
1223 let (url, _handle) = start_test_server().await;
1224 let ctx = test_producer_ctx();
1225
1226 let component = HttpComponent::new();
1227 let endpoint = component
1228 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1229 .unwrap();
1230 let producer = endpoint.create_producer(&ctx).unwrap();
1231
1232 let exchange = Exchange::new(Message::new("request body"));
1233 let result = producer.oneshot(exchange).await.unwrap();
1234
1235 let status = result
1236 .input
1237 .header("CamelHttpResponseCode")
1238 .and_then(|v| v.as_u64())
1239 .unwrap();
1240 assert_eq!(status, 200);
1241 }
1242
1243 #[tokio::test]
1244 async fn test_http_producer_method_from_header() {
1245 use tower::ServiceExt;
1246
1247 let (url, _handle) = start_test_server().await;
1248 let ctx = test_producer_ctx();
1249
1250 let component = HttpComponent::new();
1251 let endpoint = component
1252 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1253 .unwrap();
1254 let producer = endpoint.create_producer(&ctx).unwrap();
1255
1256 let mut exchange = Exchange::new(Message::default());
1257 exchange.input.set_header(
1258 "CamelHttpMethod",
1259 serde_json::Value::String("DELETE".to_string()),
1260 );
1261
1262 let result = producer.oneshot(exchange).await.unwrap();
1263 let status = result
1264 .input
1265 .header("CamelHttpResponseCode")
1266 .and_then(|v| v.as_u64())
1267 .unwrap();
1268 assert_eq!(status, 200);
1269 }
1270
1271 #[tokio::test]
1272 async fn test_http_producer_forced_method() {
1273 use tower::ServiceExt;
1274
1275 let (url, _handle) = start_test_server().await;
1276 let ctx = test_producer_ctx();
1277
1278 let component = HttpComponent::new();
1279 let endpoint = component
1280 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1281 .unwrap();
1282 let producer = endpoint.create_producer(&ctx).unwrap();
1283
1284 let exchange = Exchange::new(Message::default());
1285 let result = producer.oneshot(exchange).await.unwrap();
1286
1287 let status = result
1288 .input
1289 .header("CamelHttpResponseCode")
1290 .and_then(|v| v.as_u64())
1291 .unwrap();
1292 assert_eq!(status, 200);
1293 }
1294
1295 #[tokio::test]
1296 async fn test_http_producer_throw_exception_on_failure() {
1297 use tower::ServiceExt;
1298
1299 let (url, _handle) = start_status_server(404).await;
1300 let ctx = test_producer_ctx();
1301
1302 let component = HttpComponent::new();
1303 let endpoint = component
1304 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1305 .unwrap();
1306 let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308 let exchange = Exchange::new(Message::default());
1309 let result = producer.oneshot(exchange).await;
1310 assert!(result.is_err());
1311
1312 match result.unwrap_err() {
1313 CamelError::HttpOperationFailed { status_code, .. } => {
1314 assert_eq!(status_code, 404);
1315 }
1316 e => panic!("Expected HttpOperationFailed, got: {e}"),
1317 }
1318 }
1319
1320 #[tokio::test]
1321 async fn test_http_producer_no_throw_on_failure() {
1322 use tower::ServiceExt;
1323
1324 let (url, _handle) = start_status_server(500).await;
1325 let ctx = test_producer_ctx();
1326
1327 let component = HttpComponent::new();
1328 let endpoint = component
1329 .create_endpoint(&format!(
1330 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1331 ))
1332 .unwrap();
1333 let producer = endpoint.create_producer(&ctx).unwrap();
1334
1335 let exchange = Exchange::new(Message::default());
1336 let result = producer.oneshot(exchange).await.unwrap();
1337
1338 let status = result
1339 .input
1340 .header("CamelHttpResponseCode")
1341 .and_then(|v| v.as_u64())
1342 .unwrap();
1343 assert_eq!(status, 500);
1344 }
1345
1346 #[tokio::test]
1347 async fn test_http_producer_uri_override() {
1348 use tower::ServiceExt;
1349
1350 let (url, _handle) = start_test_server().await;
1351 let ctx = test_producer_ctx();
1352
1353 let component = HttpComponent::new();
1354 let endpoint = component
1355 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1356 .unwrap();
1357 let producer = endpoint.create_producer(&ctx).unwrap();
1358
1359 let mut exchange = Exchange::new(Message::default());
1360 exchange.input.set_header(
1361 "CamelHttpUri",
1362 serde_json::Value::String(format!("{url}/api")),
1363 );
1364
1365 let result = producer.oneshot(exchange).await.unwrap();
1366 let status = result
1367 .input
1368 .header("CamelHttpResponseCode")
1369 .and_then(|v| v.as_u64())
1370 .unwrap();
1371 assert_eq!(status, 200);
1372 }
1373
1374 #[tokio::test]
1375 async fn test_http_producer_response_headers_mapped() {
1376 use tower::ServiceExt;
1377
1378 let (url, _handle) = start_test_server().await;
1379 let ctx = test_producer_ctx();
1380
1381 let component = HttpComponent::new();
1382 let endpoint = component
1383 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1384 .unwrap();
1385 let producer = endpoint.create_producer(&ctx).unwrap();
1386
1387 let exchange = Exchange::new(Message::default());
1388 let result = producer.oneshot(exchange).await.unwrap();
1389
1390 assert!(
1391 result.input.header("content-type").is_some()
1392 || result.input.header("Content-Type").is_some()
1393 );
1394 assert!(result.input.header("CamelHttpResponseText").is_some());
1395 }
1396
1397 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1402 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1403 let addr = listener.local_addr().unwrap();
1404 let url = format!("http://127.0.0.1:{}", addr.port());
1405
1406 let handle = tokio::spawn(async move {
1407 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1408 loop {
1409 if let Ok((mut stream, _)) = listener.accept().await {
1410 tokio::spawn(async move {
1411 let mut buf = vec![0u8; 4096];
1412 let n = stream.read(&mut buf).await.unwrap_or(0);
1413 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1414
1415 if request.contains("GET /final") {
1417 let body = r#"{"status":"final"}"#;
1418 let response = format!(
1419 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1420 body.len(),
1421 body
1422 );
1423 let _ = stream.write_all(response.as_bytes()).await;
1424 } else {
1425 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1427 let _ = stream.write_all(response.as_bytes()).await;
1428 }
1429 });
1430 }
1431 }
1432 });
1433
1434 (url, handle)
1435 }
1436
1437 #[tokio::test]
1438 async fn test_follow_redirects_false_does_not_follow() {
1439 use tower::ServiceExt;
1440
1441 let (url, _handle) = start_redirect_server().await;
1442 let ctx = test_producer_ctx();
1443
1444 let component = HttpComponent::new();
1445 let endpoint = component
1446 .create_endpoint(&format!(
1447 "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1448 ))
1449 .unwrap();
1450 let producer = endpoint.create_producer(&ctx).unwrap();
1451
1452 let exchange = Exchange::new(Message::default());
1453 let result = producer.oneshot(exchange).await.unwrap();
1454
1455 let status = result
1457 .input
1458 .header("CamelHttpResponseCode")
1459 .and_then(|v| v.as_u64())
1460 .unwrap();
1461 assert_eq!(
1462 status, 302,
1463 "Should NOT follow redirect when followRedirects=false"
1464 );
1465 }
1466
1467 #[tokio::test]
1468 async fn test_follow_redirects_true_follows_redirect() {
1469 use tower::ServiceExt;
1470
1471 let (url, _handle) = start_redirect_server().await;
1472 let ctx = test_producer_ctx();
1473
1474 let component = HttpComponent::new();
1475 let endpoint = component
1476 .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1477 .unwrap();
1478 let producer = endpoint.create_producer(&ctx).unwrap();
1479
1480 let exchange = Exchange::new(Message::default());
1481 let result = producer.oneshot(exchange).await.unwrap();
1482
1483 let status = result
1485 .input
1486 .header("CamelHttpResponseCode")
1487 .and_then(|v| v.as_u64())
1488 .unwrap();
1489 assert_eq!(
1490 status, 200,
1491 "Should follow redirect when followRedirects=true"
1492 );
1493 }
1494
1495 #[tokio::test]
1496 async fn test_query_params_forwarded_to_http_request() {
1497 use tower::ServiceExt;
1498
1499 let (url, _handle) = start_test_server().await;
1500 let ctx = test_producer_ctx();
1501
1502 let component = HttpComponent::new();
1503 let endpoint = component
1505 .create_endpoint(&format!(
1506 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1507 ))
1508 .unwrap();
1509 let producer = endpoint.create_producer(&ctx).unwrap();
1510
1511 let exchange = Exchange::new(Message::default());
1512 let result = producer.oneshot(exchange).await.unwrap();
1513
1514 let status = result
1517 .input
1518 .header("CamelHttpResponseCode")
1519 .and_then(|v| v.as_u64())
1520 .unwrap();
1521 assert_eq!(status, 200);
1522 }
1523
1524 #[tokio::test]
1525 async fn test_non_camel_query_params_are_forwarded() {
1526 let config = HttpConfig::from_uri(
1529 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1530 )
1531 .unwrap();
1532
1533 assert!(
1535 config.query_params.contains_key("apiKey"),
1536 "apiKey should be preserved"
1537 );
1538 assert!(
1539 config.query_params.contains_key("token"),
1540 "token should be preserved"
1541 );
1542 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1543 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1544
1545 assert!(
1547 !config.query_params.contains_key("httpMethod"),
1548 "httpMethod should not be forwarded"
1549 );
1550 }
1551
1552 #[tokio::test]
1557 async fn test_http_producer_blocks_metadata_endpoint() {
1558 use tower::ServiceExt;
1559
1560 let ctx = test_producer_ctx();
1561 let component = HttpComponent::new();
1562 let endpoint = component
1563 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1564 .unwrap();
1565 let producer = endpoint.create_producer(&ctx).unwrap();
1566
1567 let mut exchange = Exchange::new(Message::default());
1568 exchange.input.set_header(
1569 "CamelHttpUri",
1570 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1571 );
1572
1573 let result = producer.oneshot(exchange).await;
1574 assert!(result.is_err(), "Should block AWS metadata endpoint");
1575
1576 let err = result.unwrap_err();
1577 assert!(
1578 err.to_string().contains("Private IP"),
1579 "Error should mention private IP blocking, got: {}",
1580 err
1581 );
1582 }
1583
1584 #[test]
1585 fn test_ssrf_config_defaults() {
1586 let config = HttpConfig::from_uri("http://example.com/api").unwrap();
1587 assert!(
1588 !config.allow_private_ips,
1589 "Private IPs should be blocked by default"
1590 );
1591 assert!(
1592 config.blocked_hosts.is_empty(),
1593 "Blocked hosts should be empty by default"
1594 );
1595 }
1596
1597 #[test]
1598 fn test_ssrf_config_allow_private_ips() {
1599 let config = HttpConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1600 assert!(
1601 config.allow_private_ips,
1602 "Private IPs should be allowed when explicitly set"
1603 );
1604 }
1605
1606 #[test]
1607 fn test_ssrf_config_blocked_hosts() {
1608 let config =
1609 HttpConfig::from_uri("http://example.com/api?blockedHosts=evil.com,malware.net")
1610 .unwrap();
1611 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1612 }
1613
1614 #[tokio::test]
1615 async fn test_http_producer_blocks_localhost() {
1616 use tower::ServiceExt;
1617
1618 let ctx = test_producer_ctx();
1619 let component = HttpComponent::new();
1620 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1621 let producer = endpoint.create_producer(&ctx).unwrap();
1622
1623 let mut exchange = Exchange::new(Message::default());
1624 exchange.input.set_header(
1625 "CamelHttpUri",
1626 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1627 );
1628
1629 let result = producer.oneshot(exchange).await;
1630 assert!(result.is_err(), "Should block localhost");
1631 }
1632
1633 #[tokio::test]
1634 async fn test_http_producer_blocks_loopback_ip() {
1635 use tower::ServiceExt;
1636
1637 let ctx = test_producer_ctx();
1638 let component = HttpComponent::new();
1639 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1640 let producer = endpoint.create_producer(&ctx).unwrap();
1641
1642 let mut exchange = Exchange::new(Message::default());
1643 exchange.input.set_header(
1644 "CamelHttpUri",
1645 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1646 );
1647
1648 let result = producer.oneshot(exchange).await;
1649 assert!(result.is_err(), "Should block loopback IP");
1650 }
1651
1652 #[tokio::test]
1653 async fn test_http_producer_allows_private_ip_when_enabled() {
1654 use tower::ServiceExt;
1655
1656 let ctx = test_producer_ctx();
1657 let component = HttpComponent::new();
1658 let endpoint = component
1661 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1662 .unwrap();
1663 let producer = endpoint.create_producer(&ctx).unwrap();
1664
1665 let exchange = Exchange::new(Message::default());
1666
1667 let result = producer.oneshot(exchange).await;
1670 if let Err(ref e) = result {
1672 let err_str = e.to_string();
1673 assert!(
1674 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1675 "Should not be SSRF error, got: {}",
1676 err_str
1677 );
1678 }
1679 }
1680
1681 #[test]
1686 fn test_http_server_config_parse() {
1687 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1688 assert_eq!(cfg.host, "0.0.0.0");
1689 assert_eq!(cfg.port, 8080);
1690 assert_eq!(cfg.path, "/orders");
1691 }
1692
1693 #[test]
1694 fn test_http_server_config_default_path() {
1695 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1696 assert_eq!(cfg.path, "/");
1697 }
1698
1699 #[test]
1700 fn test_http_server_config_wrong_scheme() {
1701 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1702 }
1703
1704 #[test]
1705 fn test_http_server_config_invalid_port() {
1706 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1707 }
1708
1709 #[test]
1710 fn test_request_envelope_and_reply_are_send() {
1711 fn assert_send<T: Send>() {}
1712 assert_send::<RequestEnvelope>();
1713 assert_send::<HttpReply>();
1714 }
1715
1716 #[test]
1721 fn test_server_registry_global_is_singleton() {
1722 let r1 = ServerRegistry::global();
1723 let r2 = ServerRegistry::global();
1724 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1725 }
1726
1727 #[tokio::test]
1732 async fn test_dispatch_handler_returns_404_for_unknown_path() {
1733 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1734 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1736 let port = listener.local_addr().unwrap().port();
1737 tokio::spawn(run_axum_server(listener, dispatch));
1738
1739 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1741
1742 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1743 .await
1744 .unwrap();
1745 assert_eq!(resp.status().as_u16(), 404);
1746 }
1747
1748 #[tokio::test]
1753 async fn test_http_consumer_start_registers_path() {
1754 use camel_component::ConsumerContext;
1755
1756 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1758 let port = listener.local_addr().unwrap().port();
1759 drop(listener); let consumer_cfg = HttpServerConfig {
1762 host: "127.0.0.1".to_string(),
1763 port,
1764 path: "/ping".to_string(),
1765 };
1766 let mut consumer = HttpConsumer::new(consumer_cfg);
1767
1768 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1769 let token = tokio_util::sync::CancellationToken::new();
1770 let ctx = ConsumerContext::new(tx, token.clone());
1771
1772 tokio::spawn(async move {
1773 consumer.start(ctx).await.unwrap();
1774 });
1775
1776 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1777
1778 let client = reqwest::Client::new();
1779 let resp_future = client
1780 .post(format!("http://127.0.0.1:{port}/ping"))
1781 .body("hello world")
1782 .send();
1783
1784 let (http_result, _) = tokio::join!(resp_future, async {
1785 if let Some(mut envelope) = rx.recv().await {
1786 envelope.exchange.input.set_header(
1788 "CamelHttpResponseCode",
1789 serde_json::Value::Number(201.into()),
1790 );
1791 if let Some(reply_tx) = envelope.reply_tx {
1792 let _ = reply_tx.send(Ok(envelope.exchange));
1793 }
1794 }
1795 });
1796
1797 let resp = http_result.unwrap();
1798 assert_eq!(resp.status().as_u16(), 201);
1799
1800 token.cancel();
1801 }
1802
1803 #[tokio::test]
1808 async fn test_integration_single_consumer_round_trip() {
1809 use camel_component::{ConsumerContext, ExchangeEnvelope};
1810
1811 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1813 let port = listener.local_addr().unwrap().port();
1814 drop(listener); let component = HttpComponent::new();
1817 let endpoint = component
1818 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1819 .unwrap();
1820 let mut consumer = endpoint.create_consumer().unwrap();
1821
1822 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1823 let token = tokio_util::sync::CancellationToken::new();
1824 let ctx = ConsumerContext::new(tx, token.clone());
1825
1826 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1827 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1828
1829 let client = reqwest::Client::new();
1830 let send_fut = client
1831 .post(format!("http://127.0.0.1:{port}/echo"))
1832 .header("Content-Type", "text/plain")
1833 .body("ping")
1834 .send();
1835
1836 let (http_result, _) = tokio::join!(send_fut, async {
1837 if let Some(mut envelope) = rx.recv().await {
1838 assert_eq!(
1839 envelope.exchange.input.header("CamelHttpMethod"),
1840 Some(&serde_json::Value::String("POST".into()))
1841 );
1842 assert_eq!(
1843 envelope.exchange.input.header("CamelHttpPath"),
1844 Some(&serde_json::Value::String("/echo".into()))
1845 );
1846 envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
1847 if let Some(reply_tx) = envelope.reply_tx {
1848 let _ = reply_tx.send(Ok(envelope.exchange));
1849 }
1850 }
1851 });
1852
1853 let resp = http_result.unwrap();
1854 assert_eq!(resp.status().as_u16(), 200);
1855 let body = resp.text().await.unwrap();
1856 assert_eq!(body, "pong");
1857
1858 token.cancel();
1859 }
1860
1861 #[tokio::test]
1862 async fn test_integration_two_consumers_shared_port() {
1863 use camel_component::{ConsumerContext, ExchangeEnvelope};
1864
1865 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1867 let port = listener.local_addr().unwrap().port();
1868 drop(listener);
1869
1870 let component = HttpComponent::new();
1871
1872 let endpoint_a = component
1874 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
1875 .unwrap();
1876 let mut consumer_a = endpoint_a.create_consumer().unwrap();
1877
1878 let endpoint_b = component
1880 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
1881 .unwrap();
1882 let mut consumer_b = endpoint_b.create_consumer().unwrap();
1883
1884 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1885 let token_a = tokio_util::sync::CancellationToken::new();
1886 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
1887
1888 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1889 let token_b = tokio_util::sync::CancellationToken::new();
1890 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
1891
1892 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
1893 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
1894 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1895
1896 let client = reqwest::Client::new();
1897
1898 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
1900 let (resp_hello, _) = tokio::join!(fut_hello, async {
1901 if let Some(mut envelope) = rx_a.recv().await {
1902 envelope.exchange.input.body =
1903 camel_api::body::Body::Text("hello-response".to_string());
1904 if let Some(reply_tx) = envelope.reply_tx {
1905 let _ = reply_tx.send(Ok(envelope.exchange));
1906 }
1907 }
1908 });
1909
1910 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
1912 let (resp_world, _) = tokio::join!(fut_world, async {
1913 if let Some(mut envelope) = rx_b.recv().await {
1914 envelope.exchange.input.body =
1915 camel_api::body::Body::Text("world-response".to_string());
1916 if let Some(reply_tx) = envelope.reply_tx {
1917 let _ = reply_tx.send(Ok(envelope.exchange));
1918 }
1919 }
1920 });
1921
1922 let body_a = resp_hello.unwrap().text().await.unwrap();
1923 let body_b = resp_world.unwrap().text().await.unwrap();
1924
1925 assert_eq!(body_a, "hello-response");
1926 assert_eq!(body_b, "world-response");
1927
1928 token_a.cancel();
1929 token_b.cancel();
1930 }
1931
1932 #[tokio::test]
1933 async fn test_integration_unregistered_path_returns_404() {
1934 use camel_component::{ConsumerContext, ExchangeEnvelope};
1935
1936 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1938 let port = listener.local_addr().unwrap().port();
1939 drop(listener);
1940
1941 let component = HttpComponent::new();
1942 let endpoint = component
1943 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
1944 .unwrap();
1945 let mut consumer = endpoint.create_consumer().unwrap();
1946
1947 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1948 let token = tokio_util::sync::CancellationToken::new();
1949 let ctx = ConsumerContext::new(tx, token.clone());
1950
1951 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1952 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1953
1954 let client = reqwest::Client::new();
1955 let resp = client
1956 .get(format!("http://127.0.0.1:{port}/not-there"))
1957 .send()
1958 .await
1959 .unwrap();
1960 assert_eq!(resp.status().as_u16(), 404);
1961
1962 token.cancel();
1963 }
1964
1965 #[test]
1966 fn test_http_consumer_declares_concurrent() {
1967 use camel_component::ConcurrencyModel;
1968
1969 let config = HttpServerConfig {
1970 host: "127.0.0.1".to_string(),
1971 port: 19999,
1972 path: "/test".to_string(),
1973 };
1974 let consumer = HttpConsumer::new(config);
1975 assert_eq!(
1976 consumer.concurrency_model(),
1977 ConcurrencyModel::Concurrent { max: None }
1978 );
1979 }
1980}