1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
16#[derive(Debug, Clone)]
77pub struct HttpConfig {
78 pub base_url: String,
79 pub http_method: Option<String>,
80 pub throw_exception_on_failure: bool,
81 pub ok_status_code_range: (u16, u16),
82 pub follow_redirects: bool,
83 pub connect_timeout: Duration,
84 pub response_timeout: Option<Duration>,
85 pub query_params: HashMap<String, String>,
86 pub allow_private_ips: bool,
88 pub blocked_hosts: Vec<String>,
89 pub max_body_size: usize,
91}
92
93impl HttpConfig {
94 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
95 let parts = parse_uri(uri)?;
96 if parts.scheme != "http" && parts.scheme != "https" {
97 return Err(CamelError::InvalidUri(format!(
98 "expected scheme 'http' or 'https', got '{}'",
99 parts.scheme
100 )));
101 }
102
103 let base_url = format!("{}:{}", parts.scheme, parts.path);
104
105 let http_method = parts.params.get("httpMethod").cloned();
106
107 let throw_exception_on_failure = parts
108 .params
109 .get("throwExceptionOnFailure")
110 .map(|v| v != "false")
111 .unwrap_or(true);
112
113 let ok_status_code_range = parts
114 .params
115 .get("okStatusCodeRange")
116 .and_then(|v| {
117 let (start, end) = v.split_once('-')?;
118 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
119 })
120 .unwrap_or((200, 299));
121
122 let follow_redirects = parts
123 .params
124 .get("followRedirects")
125 .map(|v| v == "true")
126 .unwrap_or(false);
127
128 let connect_timeout = parts
129 .params
130 .get("connectTimeout")
131 .and_then(|v| v.parse::<u64>().ok())
132 .map(Duration::from_millis)
133 .unwrap_or(Duration::from_millis(30000));
134
135 let response_timeout = parts
136 .params
137 .get("responseTimeout")
138 .and_then(|v| v.parse::<u64>().ok())
139 .map(Duration::from_millis);
140
141 let allow_private_ips = parts
143 .params
144 .get("allowPrivateIps")
145 .map(|v| v == "true")
146 .unwrap_or(false); let blocked_hosts = parts
149 .params
150 .get("blockedHosts")
151 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
152 .unwrap_or_default();
153
154 let max_body_size = parts
155 .params
156 .get("maxBodySize")
157 .and_then(|v| v.parse::<usize>().ok())
158 .unwrap_or(10 * 1024 * 1024); let camel_options = [
162 "httpMethod",
163 "throwExceptionOnFailure",
164 "okStatusCodeRange",
165 "followRedirects",
166 "connectTimeout",
167 "responseTimeout",
168 "allowPrivateIps",
169 "blockedHosts",
170 "maxBodySize",
171 ];
172
173 let query_params: HashMap<String, String> = parts
174 .params
175 .into_iter()
176 .filter(|(k, _)| !camel_options.contains(&k.as_str()))
177 .map(|(k, v)| (k.clone(), v.clone()))
178 .collect();
179
180 Ok(Self {
181 base_url,
182 http_method,
183 throw_exception_on_failure,
184 ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
185 follow_redirects,
186 connect_timeout,
187 response_timeout,
188 query_params,
189 allow_private_ips,
190 blocked_hosts,
191 max_body_size,
192 })
193 }
194}
195
196#[derive(Debug, Clone)]
202pub struct HttpServerConfig {
203 pub host: String,
205 pub port: u16,
207 pub path: String,
209 pub max_request_body: usize,
211 pub max_response_body: usize,
213}
214
215impl HttpServerConfig {
216 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
217 let parts = parse_uri(uri)?;
218 if parts.scheme != "http" && parts.scheme != "https" {
219 return Err(CamelError::InvalidUri(format!(
220 "expected scheme 'http' or 'https', got '{}'",
221 parts.scheme
222 )));
223 }
224
225 let authority_and_path = parts.path.trim_start_matches('/');
228
229 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
231 (&authority_and_path[..idx], &authority_and_path[idx..])
232 } else {
233 (authority_and_path, "/")
234 };
235
236 let path = if path_suffix.is_empty() {
237 "/"
238 } else {
239 path_suffix
240 }
241 .to_string();
242
243 let (host, port) = if let Some(colon) = authority.rfind(':') {
245 let port_str = &authority[colon + 1..];
246 match port_str.parse::<u16>() {
247 Ok(p) => (authority[..colon].to_string(), p),
248 Err(_) => {
249 return Err(CamelError::InvalidUri(format!(
250 "invalid port '{}' in URI '{}'",
251 port_str, uri
252 )));
253 }
254 }
255 } else {
256 (authority.to_string(), 80)
257 };
258
259 let max_request_body = parts
260 .params
261 .get("maxRequestBody")
262 .and_then(|v| v.parse::<usize>().ok())
263 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
266 .params
267 .get("maxResponseBody")
268 .and_then(|v| v.parse::<usize>().ok())
269 .unwrap_or(10 * 1024 * 1024); Ok(Self {
272 host,
273 port,
274 path,
275 max_request_body,
276 max_response_body,
277 })
278 }
279}
280
281pub struct RequestEnvelope {
288 pub method: String,
289 pub path: String,
290 pub query: String,
291 pub headers: http::HeaderMap,
292 pub body: bytes::Bytes,
293 pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
294}
295
296#[derive(Debug, Clone)]
298pub struct HttpReply {
299 pub status: u16,
300 pub headers: Vec<(String, String)>,
301 pub body: bytes::Bytes,
302}
303
304pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
310
311struct ServerHandle {
313 dispatch: DispatchTable,
314 _task: tokio::task::JoinHandle<()>,
316}
317
318pub struct ServerRegistry {
320 inner: Mutex<HashMap<u16, ServerHandle>>,
321}
322
323impl ServerRegistry {
324 pub fn global() -> &'static Self {
326 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
327 INSTANCE.get_or_init(|| ServerRegistry {
328 inner: Mutex::new(HashMap::new()),
329 })
330 }
331
332 pub async fn get_or_spawn(
335 &'static self,
336 host: &str,
337 port: u16,
338 max_request_body: usize,
339 ) -> Result<DispatchTable, CamelError> {
340 {
342 let guard = self.inner.lock().map_err(|_| {
343 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
344 })?;
345 if let Some(handle) = guard.get(&port) {
346 return Ok(Arc::clone(&handle.dispatch));
347 }
348 }
349
350 let addr = format!("{}:{}", host, port);
352 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
353 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
354 })?;
355
356 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
357 let dispatch_for_server = Arc::clone(&dispatch);
358 let task = tokio::spawn(run_axum_server(
359 listener,
360 dispatch_for_server,
361 max_request_body,
362 ));
363
364 let mut guard = self.inner.lock().map_err(|_| {
366 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
367 })?;
368 if let Some(existing) = guard.get(&port) {
371 task.abort();
372 return Ok(Arc::clone(&existing.dispatch));
373 }
374 guard.insert(
375 port,
376 ServerHandle {
377 dispatch: Arc::clone(&dispatch),
378 _task: task,
379 },
380 );
381 Ok(dispatch)
382 }
383}
384
385use axum::{
390 Router,
391 body::Body as AxumBody,
392 extract::{Request, State},
393 http::{Response, StatusCode},
394 response::IntoResponse,
395};
396
397#[derive(Clone)]
398struct AppState {
399 dispatch: DispatchTable,
400 max_request_body: usize,
401}
402
403async fn run_axum_server(
404 listener: tokio::net::TcpListener,
405 dispatch: DispatchTable,
406 max_request_body: usize,
407) {
408 let state = AppState {
409 dispatch,
410 max_request_body,
411 };
412 let app = Router::new().fallback(dispatch_handler).with_state(state);
413
414 axum::serve(listener, app).await.unwrap_or_else(|e| {
415 tracing::error!(error = %e, "Axum server error");
416 });
417}
418
419async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
420 let method = req.method().to_string();
421 let path = req.uri().path().to_string();
422 let query = req.uri().query().unwrap_or("").to_string();
423 let headers = req.headers().clone();
424
425 let body_bytes = match axum::body::to_bytes(req.into_body(), state.max_request_body).await {
426 Ok(b) => b,
427 Err(_) => {
428 return Response::builder()
429 .status(StatusCode::BAD_REQUEST)
430 .body(AxumBody::empty())
431 .expect(
432 "Response::builder() with a known-valid status code and body is infallible",
433 );
434 }
435 };
436
437 let sender = {
439 let table = state.dispatch.read().await;
440 table.get(&path).cloned()
441 };
442
443 let Some(sender) = sender else {
444 return Response::builder()
445 .status(StatusCode::NOT_FOUND)
446 .body(AxumBody::from("No consumer registered for this path"))
447 .expect("Response::builder() with a known-valid status code and body is infallible");
448 };
449
450 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
451 let envelope = RequestEnvelope {
452 method,
453 path,
454 query,
455 headers,
456 body: body_bytes,
457 reply_tx,
458 };
459
460 if sender.send(envelope).await.is_err() {
461 return Response::builder()
462 .status(StatusCode::SERVICE_UNAVAILABLE)
463 .body(AxumBody::from("Consumer unavailable"))
464 .expect("Response::builder() with a known-valid status code and body is infallible");
465 }
466
467 match reply_rx.await {
468 Ok(reply) => {
469 let status =
470 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
471 let mut builder = Response::builder().status(status);
472 for (k, v) in &reply.headers {
473 builder = builder.header(k.as_str(), v.as_str());
474 }
475 builder
476 .body(AxumBody::from(reply.body))
477 .unwrap_or_else(|_| {
478 Response::builder()
479 .status(StatusCode::INTERNAL_SERVER_ERROR)
480 .body(AxumBody::from("Invalid response headers from consumer"))
481 .expect("Response::builder() with a known-valid status code and body is infallible")
482 })
483 }
484 Err(_) => Response::builder()
485 .status(StatusCode::INTERNAL_SERVER_ERROR)
486 .body(AxumBody::from("Pipeline error"))
487 .expect("Response::builder() with a known-valid status code and body is infallible"),
488 }
489}
490
491pub struct HttpConsumer {
496 config: HttpServerConfig,
497}
498
499impl HttpConsumer {
500 pub fn new(config: HttpServerConfig) -> Self {
501 Self { config }
502 }
503}
504
505#[async_trait::async_trait]
506impl Consumer for HttpConsumer {
507 async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
508 use camel_api::{Exchange, Message, body::Body};
509
510 let dispatch = ServerRegistry::global()
511 .get_or_spawn(
512 &self.config.host,
513 self.config.port,
514 self.config.max_request_body,
515 )
516 .await?;
517
518 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
520 {
521 let mut table = dispatch.write().await;
522 table.insert(self.config.path.clone(), env_tx);
523 }
524
525 let path = self.config.path.clone();
526 let cancel_token = ctx.cancel_token();
527 let max_response_body = self.config.max_response_body;
528
529 loop {
530 tokio::select! {
531 _ = ctx.cancelled() => {
532 break;
533 }
534 envelope = env_rx.recv() => {
535 let Some(envelope) = envelope else { break; };
536
537 let mut msg = Message::default();
539
540 msg.set_header("CamelHttpMethod",
542 serde_json::Value::String(envelope.method.clone()));
543 msg.set_header("CamelHttpPath",
544 serde_json::Value::String(envelope.path.clone()));
545 msg.set_header("CamelHttpQuery",
546 serde_json::Value::String(envelope.query.clone()));
547
548 for (k, v) in &envelope.headers {
550 if let Ok(val_str) = v.to_str() {
551 msg.set_header(
552 k.as_str(),
553 serde_json::Value::String(val_str.to_string()),
554 );
555 }
556 }
557
558 if !envelope.body.is_empty() {
560 match std::str::from_utf8(&envelope.body) {
561 Ok(text) => msg.body = Body::Text(text.to_string()),
562 Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
563 }
564 }
565
566 #[allow(unused_mut)]
567 let mut exchange = Exchange::new(msg);
568
569 #[cfg(feature = "otel")]
571 {
572 let headers: HashMap<String, String> = envelope
573 .headers
574 .iter()
575 .filter_map(|(k, v)| {
576 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
577 })
578 .collect();
579 camel_otel::extract_into_exchange(&mut exchange, &headers);
580 }
581
582 let reply_tx = envelope.reply_tx;
583 let sender = ctx.sender().clone();
584 let path_clone = path.clone();
585 let cancel = cancel_token.clone();
586
587 tokio::spawn(async move {
607 if cancel.is_cancelled() {
615 let _ = reply_tx.send(HttpReply {
616 status: 503,
617 headers: vec![],
618 body: bytes::Bytes::from("Service Unavailable"),
619 });
620 return;
621 }
622
623 let (tx, rx) = tokio::sync::oneshot::channel();
625 let envelope = camel_component::consumer::ExchangeEnvelope {
626 exchange,
627 reply_tx: Some(tx),
628 };
629
630 let result = match sender.send(envelope).await {
631 Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
632 Err(_) => Err(camel_api::CamelError::ChannelClosed),
633 }
634 .and_then(|r| r);
635
636 let reply = match result {
637 Ok(out) => {
638 let status = out
639 .input
640 .header("CamelHttpResponseCode")
641 .and_then(|v| v.as_u64())
642 .map(|s| s as u16)
643 .unwrap_or(200);
644
645 let body_bytes = match out.input.body {
646 Body::Empty => bytes::Bytes::new(),
647 Body::Bytes(b) => b,
648 Body::Text(s) => bytes::Bytes::from(s.into_bytes()),
649 Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
650 Body::Stream(_) => {
651 match out.input.body.into_bytes(max_response_body).await {
653 Ok(b) => b,
654 Err(e) => {
655 debug!(error = %e, "Failed to materialize stream body for HTTP reply");
656 return;
657 }
658 }
659 }
660 };
661
662 let resp_headers: Vec<(String, String)> = out
663 .input
664 .headers
665 .iter()
666 .filter(|(k, _)| !k.starts_with("Camel"))
668 .filter(|(k, _)| {
671 !matches!(
672 k.to_lowercase().as_str(),
673 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
708 })
709 .filter_map(|(k, v)| {
710 v.as_str().map(|s| (k.clone(), s.to_string()))
711 })
712 .collect();
713
714 HttpReply {
715 status,
716 headers: resp_headers,
717 body: body_bytes,
718 }
719 }
720 Err(e) => {
721 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
722 HttpReply {
723 status: 500,
724 headers: vec![],
725 body: bytes::Bytes::from("Internal Server Error"),
726 }
727 }
728 };
729
730 let _ = reply_tx.send(reply);
732 });
733 }
734 }
735 }
736
737 {
739 let mut table = dispatch.write().await;
740 table.remove(&path);
741 }
742
743 Ok(())
744 }
745
746 async fn stop(&mut self) -> Result<(), CamelError> {
747 Ok(())
748 }
749
750 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
751 camel_component::ConcurrencyModel::Concurrent { max: None }
752 }
753}
754
755pub struct HttpComponent;
760
761impl HttpComponent {
762 pub fn new() -> Self {
763 Self
764 }
765}
766
767impl Default for HttpComponent {
768 fn default() -> Self {
769 Self::new()
770 }
771}
772
773impl Component for HttpComponent {
774 fn scheme(&self) -> &str {
775 "http"
776 }
777
778 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
779 let config = HttpConfig::from_uri(uri)?;
780 let server_config = HttpServerConfig::from_uri(uri)?;
781 let client = build_client(&config)?;
782 Ok(Box::new(HttpEndpoint {
783 uri: uri.to_string(),
784 config,
785 server_config,
786 client,
787 }))
788 }
789}
790
791pub struct HttpsComponent;
792
793impl HttpsComponent {
794 pub fn new() -> Self {
795 Self
796 }
797}
798
799impl Default for HttpsComponent {
800 fn default() -> Self {
801 Self::new()
802 }
803}
804
805impl Component for HttpsComponent {
806 fn scheme(&self) -> &str {
807 "https"
808 }
809
810 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
811 let config = HttpConfig::from_uri(uri)?;
812 let server_config = HttpServerConfig::from_uri(uri)?;
813 let client = build_client(&config)?;
814 Ok(Box::new(HttpEndpoint {
815 uri: uri.to_string(),
816 config,
817 server_config,
818 client,
819 }))
820 }
821}
822
823fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
824 let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
825
826 if !config.follow_redirects {
827 builder = builder.redirect(reqwest::redirect::Policy::none());
828 }
829
830 builder.build().map_err(|e| {
831 CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
832 })
833}
834
835struct HttpEndpoint {
840 uri: String,
841 config: HttpConfig,
842 server_config: HttpServerConfig,
843 client: reqwest::Client,
844}
845
846impl Endpoint for HttpEndpoint {
847 fn uri(&self) -> &str {
848 &self.uri
849 }
850
851 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
852 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
853 }
854
855 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
856 Ok(BoxProcessor::new(HttpProducer {
857 config: Arc::new(self.config.clone()),
858 client: self.client.clone(),
859 }))
860 }
861}
862
863fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
868 let parsed = url::Url::parse(url)
869 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
870
871 if let Some(host) = parsed.host_str()
873 && config.blocked_hosts.iter().any(|blocked| host == blocked)
874 {
875 return Err(CamelError::ProcessorError(format!(
876 "Host '{}' is blocked",
877 host
878 )));
879 }
880
881 if !config.allow_private_ips
883 && let Some(host) = parsed.host()
884 {
885 match host {
886 url::Host::Ipv4(ip) => {
887 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
888 return Err(CamelError::ProcessorError(format!(
889 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
890 ip
891 )));
892 }
893 }
894 url::Host::Ipv6(ip) => {
895 if ip.is_loopback() {
896 return Err(CamelError::ProcessorError(format!(
897 "Loopback IP '{}' not allowed",
898 ip
899 )));
900 }
901 }
902 url::Host::Domain(domain) => {
903 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
905 if blocked_domains.contains(&domain) {
906 return Err(CamelError::ProcessorError(format!(
907 "Domain '{}' is not allowed",
908 domain
909 )));
910 }
911 }
912 }
913 }
914
915 Ok(())
916}
917
918#[derive(Clone)]
923struct HttpProducer {
924 config: Arc<HttpConfig>,
925 client: reqwest::Client,
926}
927
928impl HttpProducer {
929 fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
930 if let Some(ref method) = config.http_method {
931 return method.to_uppercase();
932 }
933 if let Some(method) = exchange
934 .input
935 .header("CamelHttpMethod")
936 .and_then(|v| v.as_str())
937 {
938 return method.to_uppercase();
939 }
940 if !exchange.input.body.is_empty() {
941 return "POST".to_string();
942 }
943 "GET".to_string()
944 }
945
946 fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
947 if let Some(uri) = exchange
948 .input
949 .header("CamelHttpUri")
950 .and_then(|v| v.as_str())
951 {
952 let mut url = uri.to_string();
953 if let Some(path) = exchange
954 .input
955 .header("CamelHttpPath")
956 .and_then(|v| v.as_str())
957 {
958 if !url.ends_with('/') && !path.starts_with('/') {
959 url.push('/');
960 }
961 url.push_str(path);
962 }
963 if let Some(query) = exchange
964 .input
965 .header("CamelHttpQuery")
966 .and_then(|v| v.as_str())
967 {
968 url.push('?');
969 url.push_str(query);
970 }
971 return url;
972 }
973
974 let mut url = config.base_url.clone();
975
976 if let Some(path) = exchange
977 .input
978 .header("CamelHttpPath")
979 .and_then(|v| v.as_str())
980 {
981 if !url.ends_with('/') && !path.starts_with('/') {
982 url.push('/');
983 }
984 url.push_str(path);
985 }
986
987 if let Some(query) = exchange
988 .input
989 .header("CamelHttpQuery")
990 .and_then(|v| v.as_str())
991 {
992 url.push('?');
993 url.push_str(query);
994 } else if !config.query_params.is_empty() {
995 url.push('?');
997 let query_string: String = config
998 .query_params
999 .iter()
1000 .map(|(k, v)| format!("{k}={v}"))
1001 .collect::<Vec<_>>()
1002 .join("&");
1003 url.push_str(&query_string);
1004 }
1005
1006 url
1007 }
1008
1009 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1010 status >= range.0 && status <= range.1
1011 }
1012}
1013
1014impl Service<Exchange> for HttpProducer {
1015 type Response = Exchange;
1016 type Error = CamelError;
1017 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1018
1019 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1020 Poll::Ready(Ok(()))
1021 }
1022
1023 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1024 let config = self.config.clone();
1025 let client = self.client.clone();
1026
1027 Box::pin(async move {
1028 let method_str = HttpProducer::resolve_method(&exchange, &config);
1029 let url = HttpProducer::resolve_url(&exchange, &config);
1030
1031 validate_url_for_ssrf(&url, &config)?;
1033
1034 debug!(
1035 correlation_id = %exchange.correlation_id(),
1036 method = %method_str,
1037 url = %url,
1038 "HTTP request"
1039 );
1040
1041 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1042 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1043 })?;
1044
1045 let mut request = client.request(method, &url);
1046
1047 if let Some(timeout) = config.response_timeout {
1048 request = request.timeout(timeout);
1049 }
1050
1051 #[cfg(feature = "otel")]
1053 {
1054 let mut otel_headers = HashMap::new();
1055 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1056 for (k, v) in otel_headers {
1057 if let (Ok(name), Ok(val)) = (
1058 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1059 reqwest::header::HeaderValue::from_str(&v),
1060 ) {
1061 request = request.header(name, val);
1062 }
1063 }
1064 }
1065
1066 for (key, value) in &exchange.input.headers {
1067 if !key.starts_with("Camel")
1068 && let Some(val_str) = value.as_str()
1069 && let (Ok(name), Ok(val)) = (
1070 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1071 reqwest::header::HeaderValue::from_str(val_str),
1072 )
1073 {
1074 request = request.header(name, val);
1075 }
1076 }
1077
1078 match exchange.input.body {
1079 Body::Stream(ref s) => {
1080 let mut stream_lock = s.stream.lock().await;
1081 if let Some(stream) = stream_lock.take() {
1082 request = request.body(reqwest::Body::wrap_stream(stream));
1083 } else {
1084 return Err(CamelError::AlreadyConsumed);
1085 }
1086 }
1087 _ => {
1088 let body = std::mem::take(&mut exchange.input.body);
1090 let bytes = body.into_bytes(config.max_body_size).await?;
1091 if !bytes.is_empty() {
1092 request = request.body(bytes);
1093 }
1094 }
1095 }
1096
1097 let response = request
1098 .send()
1099 .await
1100 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1101
1102 let status_code = response.status().as_u16();
1103 let status_text = response
1104 .status()
1105 .canonical_reason()
1106 .unwrap_or("Unknown")
1107 .to_string();
1108
1109 for (key, value) in response.headers() {
1110 if let Ok(val_str) = value.to_str() {
1111 exchange
1112 .input
1113 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1114 }
1115 }
1116
1117 exchange.input.set_header(
1118 "CamelHttpResponseCode",
1119 serde_json::Value::Number(status_code.into()),
1120 );
1121 exchange.input.set_header(
1122 "CamelHttpResponseText",
1123 serde_json::Value::String(status_text.clone()),
1124 );
1125
1126 let response_body = response.bytes().await.map_err(|e| {
1127 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1128 })?;
1129
1130 if config.throw_exception_on_failure
1131 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1132 {
1133 return Err(CamelError::HttpOperationFailed {
1134 method: method_str,
1135 url,
1136 status_code,
1137 status_text,
1138 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1139 });
1140 }
1141
1142 if !response_body.is_empty() {
1143 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1144 }
1145
1146 debug!(
1147 correlation_id = %exchange.correlation_id(),
1148 status = status_code,
1149 url = %url,
1150 "HTTP response"
1151 );
1152 Ok(exchange)
1153 })
1154 }
1155}
1156
1157#[cfg(test)]
1158mod tests {
1159 use super::*;
1160 use camel_api::Message;
1161 use std::sync::Arc;
1162 use std::time::Duration;
1163 use tokio::sync::Mutex;
1164
1165 struct NullRouteController;
1167 #[async_trait::async_trait]
1168 impl camel_api::RouteController for NullRouteController {
1169 async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1170 Ok(())
1171 }
1172 async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1173 Ok(())
1174 }
1175 async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1176 Ok(())
1177 }
1178 async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1179 Ok(())
1180 }
1181 async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1182 Ok(())
1183 }
1184 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1185 None
1186 }
1187 async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1188 Ok(())
1189 }
1190 async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1191 Ok(())
1192 }
1193 }
1194
1195 fn test_producer_ctx() -> ProducerContext {
1196 ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1197 }
1198
1199 #[test]
1200 fn test_http_config_defaults() {
1201 let config = HttpConfig::from_uri("http://localhost:8080/api").unwrap();
1202 assert_eq!(config.base_url, "http://localhost:8080/api");
1203 assert!(config.http_method.is_none());
1204 assert!(config.throw_exception_on_failure);
1205 assert_eq!(config.ok_status_code_range, (200, 299));
1206 assert!(!config.follow_redirects);
1207 assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1208 assert!(config.response_timeout.is_none());
1209 }
1210
1211 #[test]
1212 fn test_http_config_with_options() {
1213 let config = HttpConfig::from_uri(
1214 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1215 ).unwrap();
1216 assert_eq!(config.base_url, "https://api.example.com/v1");
1217 assert_eq!(config.http_method, Some("PUT".to_string()));
1218 assert!(!config.throw_exception_on_failure);
1219 assert!(config.follow_redirects);
1220 assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1221 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1222 }
1223
1224 #[test]
1225 fn test_http_config_ok_status_range() {
1226 let config =
1227 HttpConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1228 assert_eq!(config.ok_status_code_range, (200, 204));
1229 }
1230
1231 #[test]
1232 fn test_http_config_wrong_scheme() {
1233 let result = HttpConfig::from_uri("file:/tmp");
1234 assert!(result.is_err());
1235 }
1236
1237 #[test]
1238 fn test_http_component_scheme() {
1239 let component = HttpComponent::new();
1240 assert_eq!(component.scheme(), "http");
1241 }
1242
1243 #[test]
1244 fn test_https_component_scheme() {
1245 let component = HttpsComponent::new();
1246 assert_eq!(component.scheme(), "https");
1247 }
1248
1249 #[test]
1250 fn test_http_endpoint_creates_consumer() {
1251 let component = HttpComponent::new();
1252 let endpoint = component
1253 .create_endpoint("http://0.0.0.0:19100/test")
1254 .unwrap();
1255 assert!(endpoint.create_consumer().is_ok());
1256 }
1257
1258 #[test]
1259 fn test_https_endpoint_creates_consumer() {
1260 let component = HttpsComponent::new();
1261 let endpoint = component
1262 .create_endpoint("https://0.0.0.0:8443/test")
1263 .unwrap();
1264 assert!(endpoint.create_consumer().is_ok());
1265 }
1266
1267 #[test]
1268 fn test_http_endpoint_creates_producer() {
1269 let ctx = test_producer_ctx();
1270 let component = HttpComponent::new();
1271 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1272 assert!(endpoint.create_producer(&ctx).is_ok());
1273 }
1274
1275 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1280 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1281 let addr = listener.local_addr().unwrap();
1282 let url = format!("http://127.0.0.1:{}", addr.port());
1283
1284 let handle = tokio::spawn(async move {
1285 loop {
1286 if let Ok((mut stream, _)) = listener.accept().await {
1287 tokio::spawn(async move {
1288 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1289 let mut buf = vec![0u8; 4096];
1290 let n = stream.read(&mut buf).await.unwrap_or(0);
1291 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1292
1293 let method = request.split_whitespace().next().unwrap_or("GET");
1294
1295 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1296 let response = format!(
1297 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1298 body.len(),
1299 body
1300 );
1301 let _ = stream.write_all(response.as_bytes()).await;
1302 });
1303 }
1304 }
1305 });
1306
1307 (url, handle)
1308 }
1309
1310 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1311 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1312 let addr = listener.local_addr().unwrap();
1313 let url = format!("http://127.0.0.1:{}", addr.port());
1314
1315 let handle = tokio::spawn(async move {
1316 loop {
1317 if let Ok((mut stream, _)) = listener.accept().await {
1318 let status = status;
1319 tokio::spawn(async move {
1320 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1321 let mut buf = vec![0u8; 4096];
1322 let _ = stream.read(&mut buf).await;
1323
1324 let status_text = match status {
1325 404 => "Not Found",
1326 500 => "Internal Server Error",
1327 _ => "Error",
1328 };
1329 let body = "error body";
1330 let response = format!(
1331 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1332 status,
1333 status_text,
1334 body.len(),
1335 body
1336 );
1337 let _ = stream.write_all(response.as_bytes()).await;
1338 });
1339 }
1340 }
1341 });
1342
1343 (url, handle)
1344 }
1345
1346 #[tokio::test]
1347 async fn test_http_producer_get_request() {
1348 use tower::ServiceExt;
1349
1350 let (url, _handle) = start_test_server().await;
1351 let ctx = test_producer_ctx();
1352
1353 let component = HttpComponent::new();
1354 let endpoint = component
1355 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1356 .unwrap();
1357 let producer = endpoint.create_producer(&ctx).unwrap();
1358
1359 let exchange = Exchange::new(Message::default());
1360 let result = producer.oneshot(exchange).await.unwrap();
1361
1362 let status = result
1363 .input
1364 .header("CamelHttpResponseCode")
1365 .and_then(|v| v.as_u64())
1366 .unwrap();
1367 assert_eq!(status, 200);
1368
1369 assert!(!result.input.body.is_empty());
1370 }
1371
1372 #[tokio::test]
1373 async fn test_http_producer_post_with_body() {
1374 use tower::ServiceExt;
1375
1376 let (url, _handle) = start_test_server().await;
1377 let ctx = test_producer_ctx();
1378
1379 let component = HttpComponent::new();
1380 let endpoint = component
1381 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1382 .unwrap();
1383 let producer = endpoint.create_producer(&ctx).unwrap();
1384
1385 let exchange = Exchange::new(Message::new("request body"));
1386 let result = producer.oneshot(exchange).await.unwrap();
1387
1388 let status = result
1389 .input
1390 .header("CamelHttpResponseCode")
1391 .and_then(|v| v.as_u64())
1392 .unwrap();
1393 assert_eq!(status, 200);
1394 }
1395
1396 #[tokio::test]
1397 async fn test_http_producer_method_from_header() {
1398 use tower::ServiceExt;
1399
1400 let (url, _handle) = start_test_server().await;
1401 let ctx = test_producer_ctx();
1402
1403 let component = HttpComponent::new();
1404 let endpoint = component
1405 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1406 .unwrap();
1407 let producer = endpoint.create_producer(&ctx).unwrap();
1408
1409 let mut exchange = Exchange::new(Message::default());
1410 exchange.input.set_header(
1411 "CamelHttpMethod",
1412 serde_json::Value::String("DELETE".to_string()),
1413 );
1414
1415 let result = producer.oneshot(exchange).await.unwrap();
1416 let status = result
1417 .input
1418 .header("CamelHttpResponseCode")
1419 .and_then(|v| v.as_u64())
1420 .unwrap();
1421 assert_eq!(status, 200);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_http_producer_forced_method() {
1426 use tower::ServiceExt;
1427
1428 let (url, _handle) = start_test_server().await;
1429 let ctx = test_producer_ctx();
1430
1431 let component = HttpComponent::new();
1432 let endpoint = component
1433 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1434 .unwrap();
1435 let producer = endpoint.create_producer(&ctx).unwrap();
1436
1437 let exchange = Exchange::new(Message::default());
1438 let result = producer.oneshot(exchange).await.unwrap();
1439
1440 let status = result
1441 .input
1442 .header("CamelHttpResponseCode")
1443 .and_then(|v| v.as_u64())
1444 .unwrap();
1445 assert_eq!(status, 200);
1446 }
1447
1448 #[tokio::test]
1449 async fn test_http_producer_throw_exception_on_failure() {
1450 use tower::ServiceExt;
1451
1452 let (url, _handle) = start_status_server(404).await;
1453 let ctx = test_producer_ctx();
1454
1455 let component = HttpComponent::new();
1456 let endpoint = component
1457 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1458 .unwrap();
1459 let producer = endpoint.create_producer(&ctx).unwrap();
1460
1461 let exchange = Exchange::new(Message::default());
1462 let result = producer.oneshot(exchange).await;
1463 assert!(result.is_err());
1464
1465 match result.unwrap_err() {
1466 CamelError::HttpOperationFailed { status_code, .. } => {
1467 assert_eq!(status_code, 404);
1468 }
1469 e => panic!("Expected HttpOperationFailed, got: {e}"),
1470 }
1471 }
1472
1473 #[tokio::test]
1474 async fn test_http_producer_no_throw_on_failure() {
1475 use tower::ServiceExt;
1476
1477 let (url, _handle) = start_status_server(500).await;
1478 let ctx = test_producer_ctx();
1479
1480 let component = HttpComponent::new();
1481 let endpoint = component
1482 .create_endpoint(&format!(
1483 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1484 ))
1485 .unwrap();
1486 let producer = endpoint.create_producer(&ctx).unwrap();
1487
1488 let exchange = Exchange::new(Message::default());
1489 let result = producer.oneshot(exchange).await.unwrap();
1490
1491 let status = result
1492 .input
1493 .header("CamelHttpResponseCode")
1494 .and_then(|v| v.as_u64())
1495 .unwrap();
1496 assert_eq!(status, 500);
1497 }
1498
1499 #[tokio::test]
1500 async fn test_http_producer_uri_override() {
1501 use tower::ServiceExt;
1502
1503 let (url, _handle) = start_test_server().await;
1504 let ctx = test_producer_ctx();
1505
1506 let component = HttpComponent::new();
1507 let endpoint = component
1508 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1509 .unwrap();
1510 let producer = endpoint.create_producer(&ctx).unwrap();
1511
1512 let mut exchange = Exchange::new(Message::default());
1513 exchange.input.set_header(
1514 "CamelHttpUri",
1515 serde_json::Value::String(format!("{url}/api")),
1516 );
1517
1518 let result = producer.oneshot(exchange).await.unwrap();
1519 let status = result
1520 .input
1521 .header("CamelHttpResponseCode")
1522 .and_then(|v| v.as_u64())
1523 .unwrap();
1524 assert_eq!(status, 200);
1525 }
1526
1527 #[tokio::test]
1528 async fn test_http_producer_response_headers_mapped() {
1529 use tower::ServiceExt;
1530
1531 let (url, _handle) = start_test_server().await;
1532 let ctx = test_producer_ctx();
1533
1534 let component = HttpComponent::new();
1535 let endpoint = component
1536 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1537 .unwrap();
1538 let producer = endpoint.create_producer(&ctx).unwrap();
1539
1540 let exchange = Exchange::new(Message::default());
1541 let result = producer.oneshot(exchange).await.unwrap();
1542
1543 assert!(
1544 result.input.header("content-type").is_some()
1545 || result.input.header("Content-Type").is_some()
1546 );
1547 assert!(result.input.header("CamelHttpResponseText").is_some());
1548 }
1549
1550 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1555 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1556 let addr = listener.local_addr().unwrap();
1557 let url = format!("http://127.0.0.1:{}", addr.port());
1558
1559 let handle = tokio::spawn(async move {
1560 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561 loop {
1562 if let Ok((mut stream, _)) = listener.accept().await {
1563 tokio::spawn(async move {
1564 let mut buf = vec![0u8; 4096];
1565 let n = stream.read(&mut buf).await.unwrap_or(0);
1566 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1567
1568 if request.contains("GET /final") {
1570 let body = r#"{"status":"final"}"#;
1571 let response = format!(
1572 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1573 body.len(),
1574 body
1575 );
1576 let _ = stream.write_all(response.as_bytes()).await;
1577 } else {
1578 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1580 let _ = stream.write_all(response.as_bytes()).await;
1581 }
1582 });
1583 }
1584 }
1585 });
1586
1587 (url, handle)
1588 }
1589
1590 #[tokio::test]
1591 async fn test_follow_redirects_false_does_not_follow() {
1592 use tower::ServiceExt;
1593
1594 let (url, _handle) = start_redirect_server().await;
1595 let ctx = test_producer_ctx();
1596
1597 let component = HttpComponent::new();
1598 let endpoint = component
1599 .create_endpoint(&format!(
1600 "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1601 ))
1602 .unwrap();
1603 let producer = endpoint.create_producer(&ctx).unwrap();
1604
1605 let exchange = Exchange::new(Message::default());
1606 let result = producer.oneshot(exchange).await.unwrap();
1607
1608 let status = result
1610 .input
1611 .header("CamelHttpResponseCode")
1612 .and_then(|v| v.as_u64())
1613 .unwrap();
1614 assert_eq!(
1615 status, 302,
1616 "Should NOT follow redirect when followRedirects=false"
1617 );
1618 }
1619
1620 #[tokio::test]
1621 async fn test_follow_redirects_true_follows_redirect() {
1622 use tower::ServiceExt;
1623
1624 let (url, _handle) = start_redirect_server().await;
1625 let ctx = test_producer_ctx();
1626
1627 let component = HttpComponent::new();
1628 let endpoint = component
1629 .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1630 .unwrap();
1631 let producer = endpoint.create_producer(&ctx).unwrap();
1632
1633 let exchange = Exchange::new(Message::default());
1634 let result = producer.oneshot(exchange).await.unwrap();
1635
1636 let status = result
1638 .input
1639 .header("CamelHttpResponseCode")
1640 .and_then(|v| v.as_u64())
1641 .unwrap();
1642 assert_eq!(
1643 status, 200,
1644 "Should follow redirect when followRedirects=true"
1645 );
1646 }
1647
1648 #[tokio::test]
1649 async fn test_query_params_forwarded_to_http_request() {
1650 use tower::ServiceExt;
1651
1652 let (url, _handle) = start_test_server().await;
1653 let ctx = test_producer_ctx();
1654
1655 let component = HttpComponent::new();
1656 let endpoint = component
1658 .create_endpoint(&format!(
1659 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1660 ))
1661 .unwrap();
1662 let producer = endpoint.create_producer(&ctx).unwrap();
1663
1664 let exchange = Exchange::new(Message::default());
1665 let result = producer.oneshot(exchange).await.unwrap();
1666
1667 let status = result
1670 .input
1671 .header("CamelHttpResponseCode")
1672 .and_then(|v| v.as_u64())
1673 .unwrap();
1674 assert_eq!(status, 200);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_non_camel_query_params_are_forwarded() {
1679 let config = HttpConfig::from_uri(
1682 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1683 )
1684 .unwrap();
1685
1686 assert!(
1688 config.query_params.contains_key("apiKey"),
1689 "apiKey should be preserved"
1690 );
1691 assert!(
1692 config.query_params.contains_key("token"),
1693 "token should be preserved"
1694 );
1695 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1696 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1697
1698 assert!(
1700 !config.query_params.contains_key("httpMethod"),
1701 "httpMethod should not be forwarded"
1702 );
1703 }
1704
1705 #[tokio::test]
1710 async fn test_http_producer_blocks_metadata_endpoint() {
1711 use tower::ServiceExt;
1712
1713 let ctx = test_producer_ctx();
1714 let component = HttpComponent::new();
1715 let endpoint = component
1716 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1717 .unwrap();
1718 let producer = endpoint.create_producer(&ctx).unwrap();
1719
1720 let mut exchange = Exchange::new(Message::default());
1721 exchange.input.set_header(
1722 "CamelHttpUri",
1723 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1724 );
1725
1726 let result = producer.oneshot(exchange).await;
1727 assert!(result.is_err(), "Should block AWS metadata endpoint");
1728
1729 let err = result.unwrap_err();
1730 assert!(
1731 err.to_string().contains("Private IP"),
1732 "Error should mention private IP blocking, got: {}",
1733 err
1734 );
1735 }
1736
1737 #[test]
1738 fn test_ssrf_config_defaults() {
1739 let config = HttpConfig::from_uri("http://example.com/api").unwrap();
1740 assert!(
1741 !config.allow_private_ips,
1742 "Private IPs should be blocked by default"
1743 );
1744 assert!(
1745 config.blocked_hosts.is_empty(),
1746 "Blocked hosts should be empty by default"
1747 );
1748 }
1749
1750 #[test]
1751 fn test_ssrf_config_allow_private_ips() {
1752 let config = HttpConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1753 assert!(
1754 config.allow_private_ips,
1755 "Private IPs should be allowed when explicitly set"
1756 );
1757 }
1758
1759 #[test]
1760 fn test_ssrf_config_blocked_hosts() {
1761 let config =
1762 HttpConfig::from_uri("http://example.com/api?blockedHosts=evil.com,malware.net")
1763 .unwrap();
1764 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1765 }
1766
1767 #[tokio::test]
1768 async fn test_http_producer_blocks_localhost() {
1769 use tower::ServiceExt;
1770
1771 let ctx = test_producer_ctx();
1772 let component = HttpComponent::new();
1773 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1774 let producer = endpoint.create_producer(&ctx).unwrap();
1775
1776 let mut exchange = Exchange::new(Message::default());
1777 exchange.input.set_header(
1778 "CamelHttpUri",
1779 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1780 );
1781
1782 let result = producer.oneshot(exchange).await;
1783 assert!(result.is_err(), "Should block localhost");
1784 }
1785
1786 #[tokio::test]
1787 async fn test_http_producer_blocks_loopback_ip() {
1788 use tower::ServiceExt;
1789
1790 let ctx = test_producer_ctx();
1791 let component = HttpComponent::new();
1792 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1793 let producer = endpoint.create_producer(&ctx).unwrap();
1794
1795 let mut exchange = Exchange::new(Message::default());
1796 exchange.input.set_header(
1797 "CamelHttpUri",
1798 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1799 );
1800
1801 let result = producer.oneshot(exchange).await;
1802 assert!(result.is_err(), "Should block loopback IP");
1803 }
1804
1805 #[tokio::test]
1806 async fn test_http_producer_allows_private_ip_when_enabled() {
1807 use tower::ServiceExt;
1808
1809 let ctx = test_producer_ctx();
1810 let component = HttpComponent::new();
1811 let endpoint = component
1814 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1815 .unwrap();
1816 let producer = endpoint.create_producer(&ctx).unwrap();
1817
1818 let exchange = Exchange::new(Message::default());
1819
1820 let result = producer.oneshot(exchange).await;
1823 if let Err(ref e) = result {
1825 let err_str = e.to_string();
1826 assert!(
1827 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1828 "Should not be SSRF error, got: {}",
1829 err_str
1830 );
1831 }
1832 }
1833
1834 #[test]
1839 fn test_http_server_config_parse() {
1840 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1841 assert_eq!(cfg.host, "0.0.0.0");
1842 assert_eq!(cfg.port, 8080);
1843 assert_eq!(cfg.path, "/orders");
1844 }
1845
1846 #[test]
1847 fn test_http_server_config_default_path() {
1848 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1849 assert_eq!(cfg.path, "/");
1850 }
1851
1852 #[test]
1853 fn test_http_server_config_wrong_scheme() {
1854 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1855 }
1856
1857 #[test]
1858 fn test_http_server_config_invalid_port() {
1859 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1860 }
1861
1862 #[test]
1863 fn test_request_envelope_and_reply_are_send() {
1864 fn assert_send<T: Send>() {}
1865 assert_send::<RequestEnvelope>();
1866 assert_send::<HttpReply>();
1867 }
1868
1869 #[test]
1874 fn test_server_registry_global_is_singleton() {
1875 let r1 = ServerRegistry::global();
1876 let r2 = ServerRegistry::global();
1877 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1878 }
1879
1880 #[tokio::test]
1885 async fn test_dispatch_handler_returns_404_for_unknown_path() {
1886 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1887 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1889 let port = listener.local_addr().unwrap().port();
1890 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1891
1892 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1894
1895 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1896 .await
1897 .unwrap();
1898 assert_eq!(resp.status().as_u16(), 404);
1899 }
1900
1901 #[tokio::test]
1906 async fn test_http_consumer_start_registers_path() {
1907 use camel_component::ConsumerContext;
1908
1909 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1911 let port = listener.local_addr().unwrap().port();
1912 drop(listener); let consumer_cfg = HttpServerConfig {
1915 host: "127.0.0.1".to_string(),
1916 port,
1917 path: "/ping".to_string(),
1918 max_request_body: 2 * 1024 * 1024,
1919 max_response_body: 10 * 1024 * 1024,
1920 };
1921 let mut consumer = HttpConsumer::new(consumer_cfg);
1922
1923 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1924 let token = tokio_util::sync::CancellationToken::new();
1925 let ctx = ConsumerContext::new(tx, token.clone());
1926
1927 tokio::spawn(async move {
1928 consumer.start(ctx).await.unwrap();
1929 });
1930
1931 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1932
1933 let client = reqwest::Client::new();
1934 let resp_future = client
1935 .post(format!("http://127.0.0.1:{port}/ping"))
1936 .body("hello world")
1937 .send();
1938
1939 let (http_result, _) = tokio::join!(resp_future, async {
1940 if let Some(mut envelope) = rx.recv().await {
1941 envelope.exchange.input.set_header(
1943 "CamelHttpResponseCode",
1944 serde_json::Value::Number(201.into()),
1945 );
1946 if let Some(reply_tx) = envelope.reply_tx {
1947 let _ = reply_tx.send(Ok(envelope.exchange));
1948 }
1949 }
1950 });
1951
1952 let resp = http_result.unwrap();
1953 assert_eq!(resp.status().as_u16(), 201);
1954
1955 token.cancel();
1956 }
1957
1958 #[tokio::test]
1963 async fn test_integration_single_consumer_round_trip() {
1964 use camel_component::{ConsumerContext, ExchangeEnvelope};
1965
1966 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1968 let port = listener.local_addr().unwrap().port();
1969 drop(listener); let component = HttpComponent::new();
1972 let endpoint = component
1973 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1974 .unwrap();
1975 let mut consumer = endpoint.create_consumer().unwrap();
1976
1977 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1978 let token = tokio_util::sync::CancellationToken::new();
1979 let ctx = ConsumerContext::new(tx, token.clone());
1980
1981 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1982 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1983
1984 let client = reqwest::Client::new();
1985 let send_fut = client
1986 .post(format!("http://127.0.0.1:{port}/echo"))
1987 .header("Content-Type", "text/plain")
1988 .body("ping")
1989 .send();
1990
1991 let (http_result, _) = tokio::join!(send_fut, async {
1992 if let Some(mut envelope) = rx.recv().await {
1993 assert_eq!(
1994 envelope.exchange.input.header("CamelHttpMethod"),
1995 Some(&serde_json::Value::String("POST".into()))
1996 );
1997 assert_eq!(
1998 envelope.exchange.input.header("CamelHttpPath"),
1999 Some(&serde_json::Value::String("/echo".into()))
2000 );
2001 envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2002 if let Some(reply_tx) = envelope.reply_tx {
2003 let _ = reply_tx.send(Ok(envelope.exchange));
2004 }
2005 }
2006 });
2007
2008 let resp = http_result.unwrap();
2009 assert_eq!(resp.status().as_u16(), 200);
2010 let body = resp.text().await.unwrap();
2011 assert_eq!(body, "pong");
2012
2013 token.cancel();
2014 }
2015
2016 #[tokio::test]
2017 async fn test_integration_two_consumers_shared_port() {
2018 use camel_component::{ConsumerContext, ExchangeEnvelope};
2019
2020 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2022 let port = listener.local_addr().unwrap().port();
2023 drop(listener);
2024
2025 let component = HttpComponent::new();
2026
2027 let endpoint_a = component
2029 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2030 .unwrap();
2031 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2032
2033 let endpoint_b = component
2035 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2036 .unwrap();
2037 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2038
2039 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2040 let token_a = tokio_util::sync::CancellationToken::new();
2041 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2042
2043 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2044 let token_b = tokio_util::sync::CancellationToken::new();
2045 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2046
2047 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2048 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2049 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2050
2051 let client = reqwest::Client::new();
2052
2053 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2055 let (resp_hello, _) = tokio::join!(fut_hello, async {
2056 if let Some(mut envelope) = rx_a.recv().await {
2057 envelope.exchange.input.body =
2058 camel_api::body::Body::Text("hello-response".to_string());
2059 if let Some(reply_tx) = envelope.reply_tx {
2060 let _ = reply_tx.send(Ok(envelope.exchange));
2061 }
2062 }
2063 });
2064
2065 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2067 let (resp_world, _) = tokio::join!(fut_world, async {
2068 if let Some(mut envelope) = rx_b.recv().await {
2069 envelope.exchange.input.body =
2070 camel_api::body::Body::Text("world-response".to_string());
2071 if let Some(reply_tx) = envelope.reply_tx {
2072 let _ = reply_tx.send(Ok(envelope.exchange));
2073 }
2074 }
2075 });
2076
2077 let body_a = resp_hello.unwrap().text().await.unwrap();
2078 let body_b = resp_world.unwrap().text().await.unwrap();
2079
2080 assert_eq!(body_a, "hello-response");
2081 assert_eq!(body_b, "world-response");
2082
2083 token_a.cancel();
2084 token_b.cancel();
2085 }
2086
2087 #[tokio::test]
2088 async fn test_integration_unregistered_path_returns_404() {
2089 use camel_component::{ConsumerContext, ExchangeEnvelope};
2090
2091 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2093 let port = listener.local_addr().unwrap().port();
2094 drop(listener);
2095
2096 let component = HttpComponent::new();
2097 let endpoint = component
2098 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2099 .unwrap();
2100 let mut consumer = endpoint.create_consumer().unwrap();
2101
2102 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2103 let token = tokio_util::sync::CancellationToken::new();
2104 let ctx = ConsumerContext::new(tx, token.clone());
2105
2106 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2107 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2108
2109 let client = reqwest::Client::new();
2110 let resp = client
2111 .get(format!("http://127.0.0.1:{port}/not-there"))
2112 .send()
2113 .await
2114 .unwrap();
2115 assert_eq!(resp.status().as_u16(), 404);
2116
2117 token.cancel();
2118 }
2119
2120 #[test]
2121 fn test_http_consumer_declares_concurrent() {
2122 use camel_component::ConcurrencyModel;
2123
2124 let config = HttpServerConfig {
2125 host: "127.0.0.1".to_string(),
2126 port: 19999,
2127 path: "/test".to_string(),
2128 max_request_body: 2 * 1024 * 1024,
2129 max_response_body: 10 * 1024 * 1024,
2130 };
2131 let consumer = HttpConsumer::new(config);
2132 assert_eq!(
2133 consumer.concurrency_model(),
2134 ConcurrencyModel::Concurrent { max: None }
2135 );
2136 }
2137
2138 #[cfg(feature = "otel")]
2143 mod otel_tests {
2144 use super::*;
2145 use camel_api::Message;
2146 use tower::ServiceExt;
2147
2148 #[tokio::test]
2149 async fn test_producer_injects_traceparent_header() {
2150 let (url, _handle) = start_test_server_with_header_capture().await;
2151 let ctx = test_producer_ctx();
2152
2153 let component = HttpComponent::new();
2154 let endpoint = component
2155 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2156 .unwrap();
2157 let producer = endpoint.create_producer(&ctx).unwrap();
2158
2159 let mut exchange = Exchange::new(Message::default());
2161 let mut headers = std::collections::HashMap::new();
2162 headers.insert(
2163 "traceparent".to_string(),
2164 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2165 );
2166 camel_otel::extract_into_exchange(&mut exchange, &headers);
2167
2168 let result = producer.oneshot(exchange).await.unwrap();
2169
2170 let status = result
2172 .input
2173 .header("CamelHttpResponseCode")
2174 .and_then(|v| v.as_u64())
2175 .unwrap();
2176 assert_eq!(status, 200);
2177
2178 let traceparent = result.input.header("x-received-traceparent");
2180 assert!(
2181 traceparent.is_some(),
2182 "traceparent header should have been sent"
2183 );
2184
2185 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2186 let parts: Vec<&str> = traceparent_str.split('-').collect();
2188 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2189 assert_eq!(parts[0], "00", "version should be 00");
2190 assert_eq!(
2191 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2192 "trace-id should match"
2193 );
2194 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2195 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2196 }
2197
2198 #[tokio::test]
2199 async fn test_consumer_extracts_traceparent_header() {
2200 use camel_component::{ConsumerContext, ExchangeEnvelope};
2201
2202 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2204 let port = listener.local_addr().unwrap().port();
2205 drop(listener);
2206
2207 let component = HttpComponent::new();
2208 let endpoint = component
2209 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2210 .unwrap();
2211 let mut consumer = endpoint.create_consumer().unwrap();
2212
2213 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2214 let token = tokio_util::sync::CancellationToken::new();
2215 let ctx = ConsumerContext::new(tx, token.clone());
2216
2217 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2218 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2219
2220 let client = reqwest::Client::new();
2222 let send_fut = client
2223 .post(format!("http://127.0.0.1:{port}/trace"))
2224 .header(
2225 "traceparent",
2226 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2227 )
2228 .body("test")
2229 .send();
2230
2231 let (http_result, _) = tokio::join!(send_fut, async {
2232 if let Some(envelope) = rx.recv().await {
2233 let mut injected_headers = std::collections::HashMap::new();
2236 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2237
2238 assert!(
2239 injected_headers.contains_key("traceparent"),
2240 "Exchange should have traceparent after extraction"
2241 );
2242
2243 let traceparent = injected_headers.get("traceparent").unwrap();
2244 let parts: Vec<&str> = traceparent.split('-').collect();
2245 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2246 assert_eq!(
2247 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2248 "Trace ID should match the original traceparent header"
2249 );
2250
2251 if let Some(reply_tx) = envelope.reply_tx {
2252 let _ = reply_tx.send(Ok(envelope.exchange));
2253 }
2254 }
2255 });
2256
2257 let resp = http_result.unwrap();
2258 assert_eq!(resp.status().as_u16(), 200);
2259
2260 token.cancel();
2261 }
2262
2263 #[tokio::test]
2264 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2265 use camel_component::{ConsumerContext, ExchangeEnvelope};
2266
2267 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2269 let port = listener.local_addr().unwrap().port();
2270 drop(listener);
2271
2272 let component = HttpComponent::new();
2273 let endpoint = component
2274 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2275 .unwrap();
2276 let mut consumer = endpoint.create_consumer().unwrap();
2277
2278 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2279 let token = tokio_util::sync::CancellationToken::new();
2280 let ctx = ConsumerContext::new(tx, token.clone());
2281
2282 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2283 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2284
2285 let client = reqwest::Client::new();
2287 let send_fut = client
2288 .post(format!("http://127.0.0.1:{port}/trace"))
2289 .header(
2290 "TraceParent",
2291 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2292 )
2293 .body("test")
2294 .send();
2295
2296 let (http_result, _) = tokio::join!(send_fut, async {
2297 if let Some(envelope) = rx.recv().await {
2298 let mut injected_headers = HashMap::new();
2301 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2302
2303 assert!(
2304 injected_headers.contains_key("traceparent"),
2305 "Exchange should have traceparent after extraction from mixed-case header"
2306 );
2307
2308 let traceparent = injected_headers.get("traceparent").unwrap();
2309 let parts: Vec<&str> = traceparent.split('-').collect();
2310 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2311 assert_eq!(
2312 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2313 "Trace ID should match the original mixed-case TraceParent header"
2314 );
2315
2316 if let Some(reply_tx) = envelope.reply_tx {
2317 let _ = reply_tx.send(Ok(envelope.exchange));
2318 }
2319 }
2320 });
2321
2322 let resp = http_result.unwrap();
2323 assert_eq!(resp.status().as_u16(), 200);
2324
2325 token.cancel();
2326 }
2327
2328 #[tokio::test]
2329 async fn test_producer_no_trace_context_no_crash() {
2330 let (url, _handle) = start_test_server().await;
2331 let ctx = test_producer_ctx();
2332
2333 let component = HttpComponent::new();
2334 let endpoint = component
2335 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2336 .unwrap();
2337 let producer = endpoint.create_producer(&ctx).unwrap();
2338
2339 let exchange = Exchange::new(Message::default());
2341
2342 let result = producer.oneshot(exchange).await.unwrap();
2344
2345 let status = result
2347 .input
2348 .header("CamelHttpResponseCode")
2349 .and_then(|v| v.as_u64())
2350 .unwrap();
2351 assert_eq!(status, 200);
2352 }
2353
2354 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2356 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2357 let addr = listener.local_addr().unwrap();
2358 let url = format!("http://127.0.0.1:{}", addr.port());
2359
2360 let handle = tokio::spawn(async move {
2361 loop {
2362 if let Ok((mut stream, _)) = listener.accept().await {
2363 tokio::spawn(async move {
2364 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2365 let mut buf = vec![0u8; 8192];
2366 let n = stream.read(&mut buf).await.unwrap_or(0);
2367 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2368
2369 let traceparent = request
2371 .lines()
2372 .find(|line| line.to_lowercase().starts_with("traceparent:"))
2373 .map(|line| {
2374 line.split(':')
2375 .nth(1)
2376 .map(|s| s.trim().to_string())
2377 .unwrap_or_default()
2378 })
2379 .unwrap_or_default();
2380
2381 let body =
2382 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2383 let response = format!(
2384 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2385 body.len(),
2386 traceparent,
2387 body
2388 );
2389 let _ = stream.write_all(response.as_bytes()).await;
2390 });
2391 }
2392 }
2393 });
2394
2395 (url, handle)
2396 }
2397 }
2398}