1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex, OnceLock};
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use tokio::sync::RwLock;
9use tower::Service;
10use tracing::debug;
11
12use camel_api::{BoxProcessor, CamelError, Exchange, body::Body};
13use camel_component::{Component, Consumer, Endpoint, ProducerContext};
14use camel_endpoint::parse_uri;
15
/// Producer-side settings for an `http:` / `https:` endpoint.
///
/// Values are normally produced by [`HttpConfig::from_uri`], which reads them
/// from the endpoint URI's parameters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpConfig {
    /// Target URL: the URI scheme, a `:` and the URI path, without any parameters.
    pub base_url: String,
    /// Value of the `httpMethod` option; when set it takes precedence over the
    /// `CamelHttpMethod` header.
    pub http_method: Option<String>,
    /// Value of `throwExceptionOnFailure`; when `true`, a response status outside
    /// `ok_status_code_range` is turned into an error.
    pub throw_exception_on_failure: bool,
    /// Inclusive `(low, high)` range of status codes treated as success
    /// (`okStatusCodeRange`, written as `low-high`).
    pub ok_status_code_range: (u16, u16),
    /// Value of `followRedirects`; when `false` the client is built with redirects disabled.
    pub follow_redirects: bool,
    /// Connection timeout (`connectTimeout`, given in milliseconds).
    pub connect_timeout: Duration,
    /// Optional per-request timeout (`responseTimeout`, given in milliseconds).
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not recognised endpoint options; sent as the
    /// query string when no `CamelHttpQuery` header is present.
    pub query_params: HashMap<String, String>,
    /// Value of `allowPrivateIps`; when `false`, loopback/private targets are rejected.
    pub allow_private_ips: bool,
    /// Host names that must never be contacted (`blockedHosts`, comma separated).
    pub blocked_hosts: Vec<String>,
    /// Byte limit used when materializing a non-stream request body (`maxBodySize`).
    pub max_body_size: usize,
}
92
93impl HttpConfig {
94 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
95 let parts = parse_uri(uri)?;
96 if parts.scheme != "http" && parts.scheme != "https" {
97 return Err(CamelError::InvalidUri(format!(
98 "expected scheme 'http' or 'https', got '{}'",
99 parts.scheme
100 )));
101 }
102
103 let base_url = format!("{}:{}", parts.scheme, parts.path);
104
105 let http_method = parts.params.get("httpMethod").cloned();
106
107 let throw_exception_on_failure = parts
108 .params
109 .get("throwExceptionOnFailure")
110 .map(|v| v != "false")
111 .unwrap_or(true);
112
113 let ok_status_code_range = parts
114 .params
115 .get("okStatusCodeRange")
116 .and_then(|v| {
117 let (start, end) = v.split_once('-')?;
118 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
119 })
120 .unwrap_or((200, 299));
121
122 let follow_redirects = parts
123 .params
124 .get("followRedirects")
125 .map(|v| v == "true")
126 .unwrap_or(false);
127
128 let connect_timeout = parts
129 .params
130 .get("connectTimeout")
131 .and_then(|v| v.parse::<u64>().ok())
132 .map(Duration::from_millis)
133 .unwrap_or(Duration::from_millis(30000));
134
135 let response_timeout = parts
136 .params
137 .get("responseTimeout")
138 .and_then(|v| v.parse::<u64>().ok())
139 .map(Duration::from_millis);
140
141 let allow_private_ips = parts
143 .params
144 .get("allowPrivateIps")
145 .map(|v| v == "true")
146 .unwrap_or(false); let blocked_hosts = parts
149 .params
150 .get("blockedHosts")
151 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
152 .unwrap_or_default();
153
154 let max_body_size = parts
155 .params
156 .get("maxBodySize")
157 .and_then(|v| v.parse::<usize>().ok())
158 .unwrap_or(10 * 1024 * 1024); let camel_options = [
162 "httpMethod",
163 "throwExceptionOnFailure",
164 "okStatusCodeRange",
165 "followRedirects",
166 "connectTimeout",
167 "responseTimeout",
168 "allowPrivateIps",
169 "blockedHosts",
170 "maxBodySize",
171 ];
172
173 let query_params: HashMap<String, String> = parts
174 .params
175 .into_iter()
176 .filter(|(k, _)| !camel_options.contains(&k.as_str()))
177 .map(|(k, v)| (k.clone(), v.clone()))
178 .collect();
179
180 Ok(Self {
181 base_url,
182 http_method,
183 throw_exception_on_failure,
184 ok_status_code_range: (ok_status_code_range.0, ok_status_code_range.1),
185 follow_redirects,
186 connect_timeout,
187 response_timeout,
188 query_params,
189 allow_private_ips,
190 blocked_hosts,
191 max_body_size,
192 })
193 }
194}
195
/// Consumer-side (server) settings for an `http:` / `https:` endpoint.
///
/// Values are normally produced by [`HttpServerConfig::from_uri`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpServerConfig {
    /// Host part of the URI authority; used as the bind address.
    pub host: String,
    /// Port part of the URI authority; used as the bind port.
    pub port: u16,
    /// Request path this consumer is registered under (always starts with `/`).
    pub path: String,
    /// Maximum number of request-body bytes read per request (`maxRequestBody`).
    pub max_request_body: usize,
    /// Byte limit applied when a streamed reply body is materialized (`maxResponseBody`).
    pub max_response_body: usize,
}
214
215impl HttpServerConfig {
216 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
217 let parts = parse_uri(uri)?;
218 if parts.scheme != "http" && parts.scheme != "https" {
219 return Err(CamelError::InvalidUri(format!(
220 "expected scheme 'http' or 'https', got '{}'",
221 parts.scheme
222 )));
223 }
224
225 let authority_and_path = parts.path.trim_start_matches('/');
228
229 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
231 (&authority_and_path[..idx], &authority_and_path[idx..])
232 } else {
233 (authority_and_path, "/")
234 };
235
236 let path = if path_suffix.is_empty() {
237 "/"
238 } else {
239 path_suffix
240 }
241 .to_string();
242
243 let (host, port) = if let Some(colon) = authority.rfind(':') {
245 let port_str = &authority[colon + 1..];
246 match port_str.parse::<u16>() {
247 Ok(p) => (authority[..colon].to_string(), p),
248 Err(_) => {
249 return Err(CamelError::InvalidUri(format!(
250 "invalid port '{}' in URI '{}'",
251 port_str, uri
252 )));
253 }
254 }
255 } else {
256 (authority.to_string(), 80)
257 };
258
259 let max_request_body = parts
260 .params
261 .get("maxRequestBody")
262 .and_then(|v| v.parse::<usize>().ok())
263 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
266 .params
267 .get("maxResponseBody")
268 .and_then(|v| v.parse::<usize>().ok())
269 .unwrap_or(10 * 1024 * 1024); Ok(Self {
272 host,
273 port,
274 path,
275 max_request_body,
276 max_response_body,
277 })
278 }
279}
280
/// One incoming HTTP request, handed from the shared server to the consumer
/// registered for its path, together with the channel on which the consumer
/// must send the reply.
pub struct RequestEnvelope {
    /// HTTP method as text (e.g. `GET`).
    pub method: String,
    /// Path component of the request URI.
    pub path: String,
    /// Raw query string; empty when the request has none.
    pub query: String,
    /// Request headers as received.
    pub headers: http::HeaderMap,
    /// Fully buffered request body.
    pub body: bytes::Bytes,
    /// One-shot channel for the reply; dropping it without sending makes the
    /// server answer with an internal-server-error response.
    pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
295
296#[derive(Debug, Clone)]
298pub struct HttpReply {
299 pub status: u16,
300 pub headers: Vec<(String, String)>,
301 pub body: bytes::Bytes,
302}
303
/// Shared routing table of one HTTP server: maps a request path to the channel
/// of the consumer registered for that path.
pub type DispatchTable = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
310
/// Registry entry for one running server.
struct ServerHandle {
    /// Routing table shared with the server task.
    dispatch: DispatchTable,
    /// Join handle of the spawned server task; stored but never read in this file.
    _task: tokio::task::JoinHandle<()>,
}
317
/// Process-wide registry of HTTP servers, keyed by port only (the host is not
/// part of the key).
pub struct ServerRegistry {
    /// Running servers by port, guarded by a synchronous mutex.
    inner: Mutex<HashMap<u16, ServerHandle>>,
}
322
323impl ServerRegistry {
324 pub fn global() -> &'static Self {
326 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
327 INSTANCE.get_or_init(|| ServerRegistry {
328 inner: Mutex::new(HashMap::new()),
329 })
330 }
331
332 pub async fn get_or_spawn(
335 &'static self,
336 host: &str,
337 port: u16,
338 max_request_body: usize,
339 ) -> Result<DispatchTable, CamelError> {
340 {
342 let guard = self.inner.lock().map_err(|_| {
343 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
344 })?;
345 if let Some(handle) = guard.get(&port) {
346 return Ok(Arc::clone(&handle.dispatch));
347 }
348 }
349
350 let addr = format!("{}:{}", host, port);
352 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
353 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
354 })?;
355
356 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
357 let dispatch_for_server = Arc::clone(&dispatch);
358 let task = tokio::spawn(run_axum_server(
359 listener,
360 dispatch_for_server,
361 max_request_body,
362 ));
363
364 let mut guard = self.inner.lock().map_err(|_| {
366 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
367 })?;
368 if let Some(existing) = guard.get(&port) {
371 task.abort();
372 return Ok(Arc::clone(&existing.dispatch));
373 }
374 guard.insert(
375 port,
376 ServerHandle {
377 dispatch: Arc::clone(&dispatch),
378 _task: task,
379 },
380 );
381 Ok(dispatch)
382 }
383}
384
385use axum::{
390 Router,
391 body::Body as AxumBody,
392 extract::{Request, State},
393 http::{Response, StatusCode},
394 response::IntoResponse,
395};
396
/// State shared with every invocation of the request handler.
#[derive(Clone)]
struct AppState {
    /// Routing table consulted for each request path.
    dispatch: DispatchTable,
    /// Upper bound, in bytes, when buffering a request body.
    max_request_body: usize,
}
402
403async fn run_axum_server(
404 listener: tokio::net::TcpListener,
405 dispatch: DispatchTable,
406 max_request_body: usize,
407) {
408 let state = AppState {
409 dispatch,
410 max_request_body,
411 };
412 let app = Router::new().fallback(dispatch_handler).with_state(state);
413
414 axum::serve(listener, app).await.unwrap_or_else(|e| {
415 tracing::error!(error = %e, "Axum server error");
416 });
417}
418
419async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
420 let method = req.method().to_string();
421 let path = req.uri().path().to_string();
422 let query = req.uri().query().unwrap_or("").to_string();
423 let headers = req.headers().clone();
424
425 let body_bytes = match axum::body::to_bytes(req.into_body(), state.max_request_body).await {
426 Ok(b) => b,
427 Err(_) => {
428 return Response::builder()
429 .status(StatusCode::BAD_REQUEST)
430 .body(AxumBody::empty())
431 .expect(
432 "Response::builder() with a known-valid status code and body is infallible",
433 );
434 }
435 };
436
437 let sender = {
439 let table = state.dispatch.read().await;
440 table.get(&path).cloned()
441 };
442
443 let Some(sender) = sender else {
444 return Response::builder()
445 .status(StatusCode::NOT_FOUND)
446 .body(AxumBody::from("No consumer registered for this path"))
447 .expect("Response::builder() with a known-valid status code and body is infallible");
448 };
449
450 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
451 let envelope = RequestEnvelope {
452 method,
453 path,
454 query,
455 headers,
456 body: body_bytes,
457 reply_tx,
458 };
459
460 if sender.send(envelope).await.is_err() {
461 return Response::builder()
462 .status(StatusCode::SERVICE_UNAVAILABLE)
463 .body(AxumBody::from("Consumer unavailable"))
464 .expect("Response::builder() with a known-valid status code and body is infallible");
465 }
466
467 match reply_rx.await {
468 Ok(reply) => {
469 let status =
470 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
471 let mut builder = Response::builder().status(status);
472 for (k, v) in &reply.headers {
473 builder = builder.header(k.as_str(), v.as_str());
474 }
475 builder
476 .body(AxumBody::from(reply.body))
477 .unwrap_or_else(|_| {
478 Response::builder()
479 .status(StatusCode::INTERNAL_SERVER_ERROR)
480 .body(AxumBody::from("Invalid response headers from consumer"))
481 .expect("Response::builder() with a known-valid status code and body is infallible")
482 })
483 }
484 Err(_) => Response::builder()
485 .status(StatusCode::INTERNAL_SERVER_ERROR)
486 .body(AxumBody::from("Pipeline error"))
487 .expect("Response::builder() with a known-valid status code and body is infallible"),
488 }
489}
490
491pub struct HttpConsumer {
496 config: HttpServerConfig,
497}
498
499impl HttpConsumer {
500 pub fn new(config: HttpServerConfig) -> Self {
501 Self { config }
502 }
503}
504
505#[async_trait::async_trait]
506impl Consumer for HttpConsumer {
507 async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
508 use camel_api::{Exchange, Message, body::Body};
509
510 let dispatch = ServerRegistry::global()
511 .get_or_spawn(
512 &self.config.host,
513 self.config.port,
514 self.config.max_request_body,
515 )
516 .await?;
517
518 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
520 {
521 let mut table = dispatch.write().await;
522 table.insert(self.config.path.clone(), env_tx);
523 }
524
525 let path = self.config.path.clone();
526 let cancel_token = ctx.cancel_token();
527 let max_response_body = self.config.max_response_body;
528
529 loop {
530 tokio::select! {
531 _ = ctx.cancelled() => {
532 break;
533 }
534 envelope = env_rx.recv() => {
535 let Some(envelope) = envelope else { break; };
536
537 let mut msg = Message::default();
539
540 msg.set_header("CamelHttpMethod",
542 serde_json::Value::String(envelope.method.clone()));
543 msg.set_header("CamelHttpPath",
544 serde_json::Value::String(envelope.path.clone()));
545 msg.set_header("CamelHttpQuery",
546 serde_json::Value::String(envelope.query.clone()));
547
548 for (k, v) in &envelope.headers {
550 if let Ok(val_str) = v.to_str() {
551 msg.set_header(
552 k.as_str(),
553 serde_json::Value::String(val_str.to_string()),
554 );
555 }
556 }
557
558 if !envelope.body.is_empty() {
560 match std::str::from_utf8(&envelope.body) {
561 Ok(text) => msg.body = Body::Text(text.to_string()),
562 Err(_) => msg.body = Body::Bytes(envelope.body.clone()),
563 }
564 }
565
566 let exchange = Exchange::new(msg);
567 let reply_tx = envelope.reply_tx;
568 let sender = ctx.sender().clone();
569 let path_clone = path.clone();
570 let cancel = cancel_token.clone();
571
572 tokio::spawn(async move {
592 if cancel.is_cancelled() {
600 let _ = reply_tx.send(HttpReply {
601 status: 503,
602 headers: vec![],
603 body: bytes::Bytes::from("Service Unavailable"),
604 });
605 return;
606 }
607
608 let (tx, rx) = tokio::sync::oneshot::channel();
610 let envelope = camel_component::consumer::ExchangeEnvelope {
611 exchange,
612 reply_tx: Some(tx),
613 };
614
615 let result = match sender.send(envelope).await {
616 Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
617 Err(_) => Err(camel_api::CamelError::ChannelClosed),
618 }
619 .and_then(|r| r);
620
621 let reply = match result {
622 Ok(out) => {
623 let status = out
624 .input
625 .header("CamelHttpResponseCode")
626 .and_then(|v| v.as_u64())
627 .map(|s| s as u16)
628 .unwrap_or(200);
629
630 let body_bytes = match out.input.body {
631 Body::Empty => bytes::Bytes::new(),
632 Body::Bytes(b) => b,
633 Body::Text(s) => bytes::Bytes::from(s.into_bytes()),
634 Body::Json(v) => bytes::Bytes::from(v.to_string().into_bytes()),
635 Body::Stream(_) => {
636 match out.input.body.into_bytes(max_response_body).await {
638 Ok(b) => b,
639 Err(e) => {
640 debug!(error = %e, "Failed to materialize stream body for HTTP reply");
641 return;
642 }
643 }
644 }
645 };
646
647 let resp_headers: Vec<(String, String)> = out
648 .input
649 .headers
650 .iter()
651 .filter(|(k, _)| !k.starts_with("Camel"))
653 .filter(|(k, _)| {
656 !matches!(
657 k.to_lowercase().as_str(),
658 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
693 })
694 .filter_map(|(k, v)| {
695 v.as_str().map(|s| (k.clone(), s.to_string()))
696 })
697 .collect();
698
699 HttpReply {
700 status,
701 headers: resp_headers,
702 body: body_bytes,
703 }
704 }
705 Err(e) => {
706 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
707 HttpReply {
708 status: 500,
709 headers: vec![],
710 body: bytes::Bytes::from("Internal Server Error"),
711 }
712 }
713 };
714
715 let _ = reply_tx.send(reply);
717 });
718 }
719 }
720 }
721
722 {
724 let mut table = dispatch.write().await;
725 table.remove(&path);
726 }
727
728 Ok(())
729 }
730
731 async fn stop(&mut self) -> Result<(), CamelError> {
732 Ok(())
733 }
734
735 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
736 camel_component::ConcurrencyModel::Concurrent { max: None }
737 }
738}
739
/// Component registered under the `http` scheme.
pub struct HttpComponent;

impl HttpComponent {
    /// Creates the (stateless) component.
    pub fn new() -> Self {
        HttpComponent
    }
}

impl Default for HttpComponent {
    /// Same as [`HttpComponent::new`].
    fn default() -> Self {
        HttpComponent::new()
    }
}
757
758impl Component for HttpComponent {
759 fn scheme(&self) -> &str {
760 "http"
761 }
762
763 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
764 let config = HttpConfig::from_uri(uri)?;
765 let server_config = HttpServerConfig::from_uri(uri)?;
766 let client = build_client(&config)?;
767 Ok(Box::new(HttpEndpoint {
768 uri: uri.to_string(),
769 config,
770 server_config,
771 client,
772 }))
773 }
774}
775
/// Component registered under the `https` scheme.
pub struct HttpsComponent;

impl HttpsComponent {
    /// Creates the (stateless) component.
    pub fn new() -> Self {
        HttpsComponent
    }
}

impl Default for HttpsComponent {
    /// Same as [`HttpsComponent::new`].
    fn default() -> Self {
        HttpsComponent::new()
    }
}
789
790impl Component for HttpsComponent {
791 fn scheme(&self) -> &str {
792 "https"
793 }
794
795 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
796 let config = HttpConfig::from_uri(uri)?;
797 let server_config = HttpServerConfig::from_uri(uri)?;
798 let client = build_client(&config)?;
799 Ok(Box::new(HttpEndpoint {
800 uri: uri.to_string(),
801 config,
802 server_config,
803 client,
804 }))
805 }
806}
807
808fn build_client(config: &HttpConfig) -> Result<reqwest::Client, CamelError> {
809 let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
810
811 if !config.follow_redirects {
812 builder = builder.redirect(reqwest::redirect::Policy::none());
813 }
814
815 builder.build().map_err(|e| {
816 CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
817 })
818}
819
/// Endpoint created for one `http:` / `https:` URI; it can create both a
/// producer (client) and a consumer (server).
struct HttpEndpoint {
    /// The URI the endpoint was created from.
    uri: String,
    /// Producer-side configuration.
    config: HttpConfig,
    /// Consumer-side configuration.
    server_config: HttpServerConfig,
    /// HTTP client cloned into every producer created from this endpoint.
    client: reqwest::Client,
}
830
831impl Endpoint for HttpEndpoint {
832 fn uri(&self) -> &str {
833 &self.uri
834 }
835
836 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
837 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
838 }
839
840 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
841 Ok(BoxProcessor::new(HttpProducer {
842 config: Arc::new(self.config.clone()),
843 client: self.client.clone(),
844 }))
845 }
846}
847
848fn validate_url_for_ssrf(url: &str, config: &HttpConfig) -> Result<(), CamelError> {
853 let parsed = url::Url::parse(url)
854 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
855
856 if let Some(host) = parsed.host_str()
858 && config.blocked_hosts.iter().any(|blocked| host == blocked)
859 {
860 return Err(CamelError::ProcessorError(format!(
861 "Host '{}' is blocked",
862 host
863 )));
864 }
865
866 if !config.allow_private_ips
868 && let Some(host) = parsed.host()
869 {
870 match host {
871 url::Host::Ipv4(ip) => {
872 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
873 return Err(CamelError::ProcessorError(format!(
874 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
875 ip
876 )));
877 }
878 }
879 url::Host::Ipv6(ip) => {
880 if ip.is_loopback() {
881 return Err(CamelError::ProcessorError(format!(
882 "Loopback IP '{}' not allowed",
883 ip
884 )));
885 }
886 }
887 url::Host::Domain(domain) => {
888 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
890 if blocked_domains.contains(&domain) {
891 return Err(CamelError::ProcessorError(format!(
892 "Domain '{}' is not allowed",
893 domain
894 )));
895 }
896 }
897 }
898 }
899
900 Ok(())
901}
902
/// Producer that sends each exchange as an HTTP request.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint configuration, shared between clones of the producer.
    config: Arc<HttpConfig>,
    /// HTTP client used for all requests.
    client: reqwest::Client,
}
912
913impl HttpProducer {
914 fn resolve_method(exchange: &Exchange, config: &HttpConfig) -> String {
915 if let Some(ref method) = config.http_method {
916 return method.to_uppercase();
917 }
918 if let Some(method) = exchange
919 .input
920 .header("CamelHttpMethod")
921 .and_then(|v| v.as_str())
922 {
923 return method.to_uppercase();
924 }
925 if !exchange.input.body.is_empty() {
926 return "POST".to_string();
927 }
928 "GET".to_string()
929 }
930
931 fn resolve_url(exchange: &Exchange, config: &HttpConfig) -> String {
932 if let Some(uri) = exchange
933 .input
934 .header("CamelHttpUri")
935 .and_then(|v| v.as_str())
936 {
937 let mut url = uri.to_string();
938 if let Some(path) = exchange
939 .input
940 .header("CamelHttpPath")
941 .and_then(|v| v.as_str())
942 {
943 if !url.ends_with('/') && !path.starts_with('/') {
944 url.push('/');
945 }
946 url.push_str(path);
947 }
948 if let Some(query) = exchange
949 .input
950 .header("CamelHttpQuery")
951 .and_then(|v| v.as_str())
952 {
953 url.push('?');
954 url.push_str(query);
955 }
956 return url;
957 }
958
959 let mut url = config.base_url.clone();
960
961 if let Some(path) = exchange
962 .input
963 .header("CamelHttpPath")
964 .and_then(|v| v.as_str())
965 {
966 if !url.ends_with('/') && !path.starts_with('/') {
967 url.push('/');
968 }
969 url.push_str(path);
970 }
971
972 if let Some(query) = exchange
973 .input
974 .header("CamelHttpQuery")
975 .and_then(|v| v.as_str())
976 {
977 url.push('?');
978 url.push_str(query);
979 } else if !config.query_params.is_empty() {
980 url.push('?');
982 let query_string: String = config
983 .query_params
984 .iter()
985 .map(|(k, v)| format!("{k}={v}"))
986 .collect::<Vec<_>>()
987 .join("&");
988 url.push_str(&query_string);
989 }
990
991 url
992 }
993
994 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
995 status >= range.0 && status <= range.1
996 }
997}
998
999impl Service<Exchange> for HttpProducer {
1000 type Response = Exchange;
1001 type Error = CamelError;
1002 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1003
1004 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1005 Poll::Ready(Ok(()))
1006 }
1007
1008 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1009 let config = self.config.clone();
1010 let client = self.client.clone();
1011
1012 Box::pin(async move {
1013 let method_str = HttpProducer::resolve_method(&exchange, &config);
1014 let url = HttpProducer::resolve_url(&exchange, &config);
1015
1016 validate_url_for_ssrf(&url, &config)?;
1018
1019 debug!(
1020 correlation_id = %exchange.correlation_id(),
1021 method = %method_str,
1022 url = %url,
1023 "HTTP request"
1024 );
1025
1026 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1027 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1028 })?;
1029
1030 let mut request = client.request(method, &url);
1031
1032 if let Some(timeout) = config.response_timeout {
1033 request = request.timeout(timeout);
1034 }
1035
1036 for (key, value) in &exchange.input.headers {
1037 if !key.starts_with("Camel")
1038 && let Some(val_str) = value.as_str()
1039 && let (Ok(name), Ok(val)) = (
1040 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1041 reqwest::header::HeaderValue::from_str(val_str),
1042 )
1043 {
1044 request = request.header(name, val);
1045 }
1046 }
1047
1048 match exchange.input.body {
1049 Body::Stream(ref s) => {
1050 let mut stream_lock = s.stream.lock().await;
1051 if let Some(stream) = stream_lock.take() {
1052 request = request.body(reqwest::Body::wrap_stream(stream));
1053 } else {
1054 return Err(CamelError::AlreadyConsumed);
1055 }
1056 }
1057 _ => {
1058 let body = std::mem::take(&mut exchange.input.body);
1060 let bytes = body.into_bytes(config.max_body_size).await?;
1061 if !bytes.is_empty() {
1062 request = request.body(bytes);
1063 }
1064 }
1065 }
1066
1067 let response = request
1068 .send()
1069 .await
1070 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1071
1072 let status_code = response.status().as_u16();
1073 let status_text = response
1074 .status()
1075 .canonical_reason()
1076 .unwrap_or("Unknown")
1077 .to_string();
1078
1079 for (key, value) in response.headers() {
1080 if let Ok(val_str) = value.to_str() {
1081 exchange
1082 .input
1083 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1084 }
1085 }
1086
1087 exchange.input.set_header(
1088 "CamelHttpResponseCode",
1089 serde_json::Value::Number(status_code.into()),
1090 );
1091 exchange.input.set_header(
1092 "CamelHttpResponseText",
1093 serde_json::Value::String(status_text.clone()),
1094 );
1095
1096 let response_body = response.bytes().await.map_err(|e| {
1097 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1098 })?;
1099
1100 if config.throw_exception_on_failure
1101 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1102 {
1103 return Err(CamelError::HttpOperationFailed {
1104 status_code,
1105 status_text,
1106 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1107 });
1108 }
1109
1110 if !response_body.is_empty() {
1111 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1112 }
1113
1114 debug!(
1115 correlation_id = %exchange.correlation_id(),
1116 status = status_code,
1117 url = %url,
1118 "HTTP response"
1119 );
1120 Ok(exchange)
1121 })
1122 }
1123}
1124
1125#[cfg(test)]
1126mod tests {
1127 use super::*;
1128 use camel_api::Message;
1129 use std::sync::Arc;
1130 use std::time::Duration;
1131 use tokio::sync::Mutex;
1132
1133 struct NullRouteController;
1135 #[async_trait::async_trait]
1136 impl camel_api::RouteController for NullRouteController {
1137 async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1138 Ok(())
1139 }
1140 async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1141 Ok(())
1142 }
1143 async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1144 Ok(())
1145 }
1146 async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1147 Ok(())
1148 }
1149 async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
1150 Ok(())
1151 }
1152 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1153 None
1154 }
1155 async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1156 Ok(())
1157 }
1158 async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
1159 Ok(())
1160 }
1161 }
1162
1163 fn test_producer_ctx() -> ProducerContext {
1164 ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
1165 }
1166
1167 #[test]
1168 fn test_http_config_defaults() {
1169 let config = HttpConfig::from_uri("http://localhost:8080/api").unwrap();
1170 assert_eq!(config.base_url, "http://localhost:8080/api");
1171 assert!(config.http_method.is_none());
1172 assert!(config.throw_exception_on_failure);
1173 assert_eq!(config.ok_status_code_range, (200, 299));
1174 assert!(!config.follow_redirects);
1175 assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1176 assert!(config.response_timeout.is_none());
1177 }
1178
1179 #[test]
1180 fn test_http_config_with_options() {
1181 let config = HttpConfig::from_uri(
1182 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1183 ).unwrap();
1184 assert_eq!(config.base_url, "https://api.example.com/v1");
1185 assert_eq!(config.http_method, Some("PUT".to_string()));
1186 assert!(!config.throw_exception_on_failure);
1187 assert!(config.follow_redirects);
1188 assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1189 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1190 }
1191
1192 #[test]
1193 fn test_http_config_ok_status_range() {
1194 let config =
1195 HttpConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1196 assert_eq!(config.ok_status_code_range, (200, 204));
1197 }
1198
1199 #[test]
1200 fn test_http_config_wrong_scheme() {
1201 let result = HttpConfig::from_uri("file:/tmp");
1202 assert!(result.is_err());
1203 }
1204
1205 #[test]
1206 fn test_http_component_scheme() {
1207 let component = HttpComponent::new();
1208 assert_eq!(component.scheme(), "http");
1209 }
1210
1211 #[test]
1212 fn test_https_component_scheme() {
1213 let component = HttpsComponent::new();
1214 assert_eq!(component.scheme(), "https");
1215 }
1216
1217 #[test]
1218 fn test_http_endpoint_creates_consumer() {
1219 let component = HttpComponent::new();
1220 let endpoint = component
1221 .create_endpoint("http://0.0.0.0:19100/test")
1222 .unwrap();
1223 assert!(endpoint.create_consumer().is_ok());
1224 }
1225
1226 #[test]
1227 fn test_https_endpoint_creates_consumer() {
1228 let component = HttpsComponent::new();
1229 let endpoint = component
1230 .create_endpoint("https://0.0.0.0:8443/test")
1231 .unwrap();
1232 assert!(endpoint.create_consumer().is_ok());
1233 }
1234
1235 #[test]
1236 fn test_http_endpoint_creates_producer() {
1237 let ctx = test_producer_ctx();
1238 let component = HttpComponent::new();
1239 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1240 assert!(endpoint.create_producer(&ctx).is_ok());
1241 }
1242
1243 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1248 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1249 let addr = listener.local_addr().unwrap();
1250 let url = format!("http://127.0.0.1:{}", addr.port());
1251
1252 let handle = tokio::spawn(async move {
1253 loop {
1254 if let Ok((mut stream, _)) = listener.accept().await {
1255 tokio::spawn(async move {
1256 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1257 let mut buf = vec![0u8; 4096];
1258 let n = stream.read(&mut buf).await.unwrap_or(0);
1259 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1260
1261 let method = request.split_whitespace().next().unwrap_or("GET");
1262
1263 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1264 let response = format!(
1265 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1266 body.len(),
1267 body
1268 );
1269 let _ = stream.write_all(response.as_bytes()).await;
1270 });
1271 }
1272 }
1273 });
1274
1275 (url, handle)
1276 }
1277
1278 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1279 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1280 let addr = listener.local_addr().unwrap();
1281 let url = format!("http://127.0.0.1:{}", addr.port());
1282
1283 let handle = tokio::spawn(async move {
1284 loop {
1285 if let Ok((mut stream, _)) = listener.accept().await {
1286 let status = status;
1287 tokio::spawn(async move {
1288 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1289 let mut buf = vec![0u8; 4096];
1290 let _ = stream.read(&mut buf).await;
1291
1292 let status_text = match status {
1293 404 => "Not Found",
1294 500 => "Internal Server Error",
1295 _ => "Error",
1296 };
1297 let body = "error body";
1298 let response = format!(
1299 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1300 status,
1301 status_text,
1302 body.len(),
1303 body
1304 );
1305 let _ = stream.write_all(response.as_bytes()).await;
1306 });
1307 }
1308 }
1309 });
1310
1311 (url, handle)
1312 }
1313
1314 #[tokio::test]
1315 async fn test_http_producer_get_request() {
1316 use tower::ServiceExt;
1317
1318 let (url, _handle) = start_test_server().await;
1319 let ctx = test_producer_ctx();
1320
1321 let component = HttpComponent::new();
1322 let endpoint = component
1323 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1324 .unwrap();
1325 let producer = endpoint.create_producer(&ctx).unwrap();
1326
1327 let exchange = Exchange::new(Message::default());
1328 let result = producer.oneshot(exchange).await.unwrap();
1329
1330 let status = result
1331 .input
1332 .header("CamelHttpResponseCode")
1333 .and_then(|v| v.as_u64())
1334 .unwrap();
1335 assert_eq!(status, 200);
1336
1337 assert!(!result.input.body.is_empty());
1338 }
1339
1340 #[tokio::test]
1341 async fn test_http_producer_post_with_body() {
1342 use tower::ServiceExt;
1343
1344 let (url, _handle) = start_test_server().await;
1345 let ctx = test_producer_ctx();
1346
1347 let component = HttpComponent::new();
1348 let endpoint = component
1349 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1350 .unwrap();
1351 let producer = endpoint.create_producer(&ctx).unwrap();
1352
1353 let exchange = Exchange::new(Message::new("request body"));
1354 let result = producer.oneshot(exchange).await.unwrap();
1355
1356 let status = result
1357 .input
1358 .header("CamelHttpResponseCode")
1359 .and_then(|v| v.as_u64())
1360 .unwrap();
1361 assert_eq!(status, 200);
1362 }
1363
1364 #[tokio::test]
1365 async fn test_http_producer_method_from_header() {
1366 use tower::ServiceExt;
1367
1368 let (url, _handle) = start_test_server().await;
1369 let ctx = test_producer_ctx();
1370
1371 let component = HttpComponent::new();
1372 let endpoint = component
1373 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1374 .unwrap();
1375 let producer = endpoint.create_producer(&ctx).unwrap();
1376
1377 let mut exchange = Exchange::new(Message::default());
1378 exchange.input.set_header(
1379 "CamelHttpMethod",
1380 serde_json::Value::String("DELETE".to_string()),
1381 );
1382
1383 let result = producer.oneshot(exchange).await.unwrap();
1384 let status = result
1385 .input
1386 .header("CamelHttpResponseCode")
1387 .and_then(|v| v.as_u64())
1388 .unwrap();
1389 assert_eq!(status, 200);
1390 }
1391
1392 #[tokio::test]
1393 async fn test_http_producer_forced_method() {
1394 use tower::ServiceExt;
1395
1396 let (url, _handle) = start_test_server().await;
1397 let ctx = test_producer_ctx();
1398
1399 let component = HttpComponent::new();
1400 let endpoint = component
1401 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1402 .unwrap();
1403 let producer = endpoint.create_producer(&ctx).unwrap();
1404
1405 let exchange = Exchange::new(Message::default());
1406 let result = producer.oneshot(exchange).await.unwrap();
1407
1408 let status = result
1409 .input
1410 .header("CamelHttpResponseCode")
1411 .and_then(|v| v.as_u64())
1412 .unwrap();
1413 assert_eq!(status, 200);
1414 }
1415
1416 #[tokio::test]
1417 async fn test_http_producer_throw_exception_on_failure() {
1418 use tower::ServiceExt;
1419
1420 let (url, _handle) = start_status_server(404).await;
1421 let ctx = test_producer_ctx();
1422
1423 let component = HttpComponent::new();
1424 let endpoint = component
1425 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1426 .unwrap();
1427 let producer = endpoint.create_producer(&ctx).unwrap();
1428
1429 let exchange = Exchange::new(Message::default());
1430 let result = producer.oneshot(exchange).await;
1431 assert!(result.is_err());
1432
1433 match result.unwrap_err() {
1434 CamelError::HttpOperationFailed { status_code, .. } => {
1435 assert_eq!(status_code, 404);
1436 }
1437 e => panic!("Expected HttpOperationFailed, got: {e}"),
1438 }
1439 }
1440
1441 #[tokio::test]
1442 async fn test_http_producer_no_throw_on_failure() {
1443 use tower::ServiceExt;
1444
1445 let (url, _handle) = start_status_server(500).await;
1446 let ctx = test_producer_ctx();
1447
1448 let component = HttpComponent::new();
1449 let endpoint = component
1450 .create_endpoint(&format!(
1451 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1452 ))
1453 .unwrap();
1454 let producer = endpoint.create_producer(&ctx).unwrap();
1455
1456 let exchange = Exchange::new(Message::default());
1457 let result = producer.oneshot(exchange).await.unwrap();
1458
1459 let status = result
1460 .input
1461 .header("CamelHttpResponseCode")
1462 .and_then(|v| v.as_u64())
1463 .unwrap();
1464 assert_eq!(status, 500);
1465 }
1466
1467 #[tokio::test]
1468 async fn test_http_producer_uri_override() {
1469 use tower::ServiceExt;
1470
1471 let (url, _handle) = start_test_server().await;
1472 let ctx = test_producer_ctx();
1473
1474 let component = HttpComponent::new();
1475 let endpoint = component
1476 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1477 .unwrap();
1478 let producer = endpoint.create_producer(&ctx).unwrap();
1479
1480 let mut exchange = Exchange::new(Message::default());
1481 exchange.input.set_header(
1482 "CamelHttpUri",
1483 serde_json::Value::String(format!("{url}/api")),
1484 );
1485
1486 let result = producer.oneshot(exchange).await.unwrap();
1487 let status = result
1488 .input
1489 .header("CamelHttpResponseCode")
1490 .and_then(|v| v.as_u64())
1491 .unwrap();
1492 assert_eq!(status, 200);
1493 }
1494
1495 #[tokio::test]
1496 async fn test_http_producer_response_headers_mapped() {
1497 use tower::ServiceExt;
1498
1499 let (url, _handle) = start_test_server().await;
1500 let ctx = test_producer_ctx();
1501
1502 let component = HttpComponent::new();
1503 let endpoint = component
1504 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1505 .unwrap();
1506 let producer = endpoint.create_producer(&ctx).unwrap();
1507
1508 let exchange = Exchange::new(Message::default());
1509 let result = producer.oneshot(exchange).await.unwrap();
1510
1511 assert!(
1512 result.input.header("content-type").is_some()
1513 || result.input.header("Content-Type").is_some()
1514 );
1515 assert!(result.input.header("CamelHttpResponseText").is_some());
1516 }
1517
1518 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1523 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1524 let addr = listener.local_addr().unwrap();
1525 let url = format!("http://127.0.0.1:{}", addr.port());
1526
1527 let handle = tokio::spawn(async move {
1528 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1529 loop {
1530 if let Ok((mut stream, _)) = listener.accept().await {
1531 tokio::spawn(async move {
1532 let mut buf = vec![0u8; 4096];
1533 let n = stream.read(&mut buf).await.unwrap_or(0);
1534 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1535
1536 if request.contains("GET /final") {
1538 let body = r#"{"status":"final"}"#;
1539 let response = format!(
1540 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1541 body.len(),
1542 body
1543 );
1544 let _ = stream.write_all(response.as_bytes()).await;
1545 } else {
1546 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1548 let _ = stream.write_all(response.as_bytes()).await;
1549 }
1550 });
1551 }
1552 }
1553 });
1554
1555 (url, handle)
1556 }
1557
1558 #[tokio::test]
1559 async fn test_follow_redirects_false_does_not_follow() {
1560 use tower::ServiceExt;
1561
1562 let (url, _handle) = start_redirect_server().await;
1563 let ctx = test_producer_ctx();
1564
1565 let component = HttpComponent::new();
1566 let endpoint = component
1567 .create_endpoint(&format!(
1568 "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1569 ))
1570 .unwrap();
1571 let producer = endpoint.create_producer(&ctx).unwrap();
1572
1573 let exchange = Exchange::new(Message::default());
1574 let result = producer.oneshot(exchange).await.unwrap();
1575
1576 let status = result
1578 .input
1579 .header("CamelHttpResponseCode")
1580 .and_then(|v| v.as_u64())
1581 .unwrap();
1582 assert_eq!(
1583 status, 302,
1584 "Should NOT follow redirect when followRedirects=false"
1585 );
1586 }
1587
1588 #[tokio::test]
1589 async fn test_follow_redirects_true_follows_redirect() {
1590 use tower::ServiceExt;
1591
1592 let (url, _handle) = start_redirect_server().await;
1593 let ctx = test_producer_ctx();
1594
1595 let component = HttpComponent::new();
1596 let endpoint = component
1597 .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1598 .unwrap();
1599 let producer = endpoint.create_producer(&ctx).unwrap();
1600
1601 let exchange = Exchange::new(Message::default());
1602 let result = producer.oneshot(exchange).await.unwrap();
1603
1604 let status = result
1606 .input
1607 .header("CamelHttpResponseCode")
1608 .and_then(|v| v.as_u64())
1609 .unwrap();
1610 assert_eq!(
1611 status, 200,
1612 "Should follow redirect when followRedirects=true"
1613 );
1614 }
1615
1616 #[tokio::test]
1617 async fn test_query_params_forwarded_to_http_request() {
1618 use tower::ServiceExt;
1619
1620 let (url, _handle) = start_test_server().await;
1621 let ctx = test_producer_ctx();
1622
1623 let component = HttpComponent::new();
1624 let endpoint = component
1626 .create_endpoint(&format!(
1627 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1628 ))
1629 .unwrap();
1630 let producer = endpoint.create_producer(&ctx).unwrap();
1631
1632 let exchange = Exchange::new(Message::default());
1633 let result = producer.oneshot(exchange).await.unwrap();
1634
1635 let status = result
1638 .input
1639 .header("CamelHttpResponseCode")
1640 .and_then(|v| v.as_u64())
1641 .unwrap();
1642 assert_eq!(status, 200);
1643 }
1644
/// Parses a URI that mixes application parameters (`apiKey`, `token`) with the component
/// option `httpMethod`, and checks that `HttpConfig::query_params` keeps the former with their
/// values while dropping the latter.
///
/// This is a plain synchronous test: `HttpConfig::from_uri` is not async, so no Tokio runtime
/// is needed.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // Application-level parameters survive parsing…
    assert!(
        config.query_params.contains_key("apiKey"),
        "apiKey should be preserved"
    );
    assert!(
        config.query_params.contains_key("token"),
        "token should be preserved"
    );
    assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
    assert_eq!(config.query_params.get("token").unwrap(), "abc456");

    // …while component options are consumed and not forwarded.
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
1672
1673 #[tokio::test]
1678 async fn test_http_producer_blocks_metadata_endpoint() {
1679 use tower::ServiceExt;
1680
1681 let ctx = test_producer_ctx();
1682 let component = HttpComponent::new();
1683 let endpoint = component
1684 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1685 .unwrap();
1686 let producer = endpoint.create_producer(&ctx).unwrap();
1687
1688 let mut exchange = Exchange::new(Message::default());
1689 exchange.input.set_header(
1690 "CamelHttpUri",
1691 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1692 );
1693
1694 let result = producer.oneshot(exchange).await;
1695 assert!(result.is_err(), "Should block AWS metadata endpoint");
1696
1697 let err = result.unwrap_err();
1698 assert!(
1699 err.to_string().contains("Private IP"),
1700 "Error should mention private IP blocking, got: {}",
1701 err
1702 );
1703 }
1704
/// A URI without SSRF-related options must parse to `allow_private_ips == false` and an empty
/// `blocked_hosts` list.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        !private_ips_allowed,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = parsed.blocked_hosts.is_empty();
    assert!(no_blocked_hosts, "Blocked hosts should be empty by default");
}
1717
/// `allowPrivateIps=true` in the URI must set `allow_private_ips` on the parsed config.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpConfig::from_uri(uri).unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        private_ips_allowed,
        "Private IPs should be allowed when explicitly set"
    );
}
1726
/// A comma-separated `blockedHosts` value must be split into individual `blocked_hosts`
/// entries, in order.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.blocked_hosts, ["evil.com", "malware.net"]);
}
1734
1735 #[tokio::test]
1736 async fn test_http_producer_blocks_localhost() {
1737 use tower::ServiceExt;
1738
1739 let ctx = test_producer_ctx();
1740 let component = HttpComponent::new();
1741 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1742 let producer = endpoint.create_producer(&ctx).unwrap();
1743
1744 let mut exchange = Exchange::new(Message::default());
1745 exchange.input.set_header(
1746 "CamelHttpUri",
1747 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1748 );
1749
1750 let result = producer.oneshot(exchange).await;
1751 assert!(result.is_err(), "Should block localhost");
1752 }
1753
1754 #[tokio::test]
1755 async fn test_http_producer_blocks_loopback_ip() {
1756 use tower::ServiceExt;
1757
1758 let ctx = test_producer_ctx();
1759 let component = HttpComponent::new();
1760 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1761 let producer = endpoint.create_producer(&ctx).unwrap();
1762
1763 let mut exchange = Exchange::new(Message::default());
1764 exchange.input.set_header(
1765 "CamelHttpUri",
1766 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1767 );
1768
1769 let result = producer.oneshot(exchange).await;
1770 assert!(result.is_err(), "Should block loopback IP");
1771 }
1772
1773 #[tokio::test]
1774 async fn test_http_producer_allows_private_ip_when_enabled() {
1775 use tower::ServiceExt;
1776
1777 let ctx = test_producer_ctx();
1778 let component = HttpComponent::new();
1779 let endpoint = component
1782 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1783 .unwrap();
1784 let producer = endpoint.create_producer(&ctx).unwrap();
1785
1786 let exchange = Exchange::new(Message::default());
1787
1788 let result = producer.oneshot(exchange).await;
1791 if let Err(ref e) = result {
1793 let err_str = e.to_string();
1794 assert!(
1795 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1796 "Should not be SSRF error, got: {}",
1797 err_str
1798 );
1799 }
1800 }
1801
/// `HttpServerConfig::from_uri` must split `http://0.0.0.0:8080/orders` into host, port and
/// path.
#[test]
fn test_http_server_config_parse() {
    let server_cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    assert_eq!(server_cfg.host, "0.0.0.0");
    assert_eq!(server_cfg.port, 8080);
    assert_eq!(server_cfg.path, "/orders");
}
1813
/// A server URI without a path component must yield `"/"` as the path.
#[test]
fn test_http_server_config_default_path() {
    let server_cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();

    assert_eq!(server_cfg.path, "/");
}
1819
/// `HttpServerConfig::from_uri` must reject the `file:/tmp` URI.
#[test]
fn test_http_server_config_wrong_scheme() {
    let parsed = HttpServerConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
1824
/// `HttpServerConfig::from_uri` must reject a URI whose port is not numeric.
#[test]
fn test_http_server_config_invalid_port() {
    let parsed = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(parsed.is_err());
}
1829
/// Compile-time check that `RequestEnvelope` and `HttpReply` are `Send`; the test body does
/// nothing at runtime.
#[test]
fn test_request_envelope_and_reply_are_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
1836
/// Two calls to `ServerRegistry::global()` must return the same address.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    // Pointer identity, not value equality.
    assert!(std::ptr::eq(first, second));
}
1847
1848 #[tokio::test]
1853 async fn test_dispatch_handler_returns_404_for_unknown_path() {
1854 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1855 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1857 let port = listener.local_addr().unwrap().port();
1858 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1859
1860 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1862
1863 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
1864 .await
1865 .unwrap();
1866 assert_eq!(resp.status().as_u16(), 404);
1867 }
1868
1869 #[tokio::test]
1874 async fn test_http_consumer_start_registers_path() {
1875 use camel_component::ConsumerContext;
1876
1877 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1879 let port = listener.local_addr().unwrap().port();
1880 drop(listener); let consumer_cfg = HttpServerConfig {
1883 host: "127.0.0.1".to_string(),
1884 port,
1885 path: "/ping".to_string(),
1886 max_request_body: 2 * 1024 * 1024,
1887 max_response_body: 10 * 1024 * 1024,
1888 };
1889 let mut consumer = HttpConsumer::new(consumer_cfg);
1890
1891 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
1892 let token = tokio_util::sync::CancellationToken::new();
1893 let ctx = ConsumerContext::new(tx, token.clone());
1894
1895 tokio::spawn(async move {
1896 consumer.start(ctx).await.unwrap();
1897 });
1898
1899 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1900
1901 let client = reqwest::Client::new();
1902 let resp_future = client
1903 .post(format!("http://127.0.0.1:{port}/ping"))
1904 .body("hello world")
1905 .send();
1906
1907 let (http_result, _) = tokio::join!(resp_future, async {
1908 if let Some(mut envelope) = rx.recv().await {
1909 envelope.exchange.input.set_header(
1911 "CamelHttpResponseCode",
1912 serde_json::Value::Number(201.into()),
1913 );
1914 if let Some(reply_tx) = envelope.reply_tx {
1915 let _ = reply_tx.send(Ok(envelope.exchange));
1916 }
1917 }
1918 });
1919
1920 let resp = http_result.unwrap();
1921 assert_eq!(resp.status().as_u16(), 201);
1922
1923 token.cancel();
1924 }
1925
1926 #[tokio::test]
1931 async fn test_integration_single_consumer_round_trip() {
1932 use camel_component::{ConsumerContext, ExchangeEnvelope};
1933
1934 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1936 let port = listener.local_addr().unwrap().port();
1937 drop(listener); let component = HttpComponent::new();
1940 let endpoint = component
1941 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
1942 .unwrap();
1943 let mut consumer = endpoint.create_consumer().unwrap();
1944
1945 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
1946 let token = tokio_util::sync::CancellationToken::new();
1947 let ctx = ConsumerContext::new(tx, token.clone());
1948
1949 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
1950 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1951
1952 let client = reqwest::Client::new();
1953 let send_fut = client
1954 .post(format!("http://127.0.0.1:{port}/echo"))
1955 .header("Content-Type", "text/plain")
1956 .body("ping")
1957 .send();
1958
1959 let (http_result, _) = tokio::join!(send_fut, async {
1960 if let Some(mut envelope) = rx.recv().await {
1961 assert_eq!(
1962 envelope.exchange.input.header("CamelHttpMethod"),
1963 Some(&serde_json::Value::String("POST".into()))
1964 );
1965 assert_eq!(
1966 envelope.exchange.input.header("CamelHttpPath"),
1967 Some(&serde_json::Value::String("/echo".into()))
1968 );
1969 envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
1970 if let Some(reply_tx) = envelope.reply_tx {
1971 let _ = reply_tx.send(Ok(envelope.exchange));
1972 }
1973 }
1974 });
1975
1976 let resp = http_result.unwrap();
1977 assert_eq!(resp.status().as_u16(), 200);
1978 let body = resp.text().await.unwrap();
1979 assert_eq!(body, "pong");
1980
1981 token.cancel();
1982 }
1983
1984 #[tokio::test]
1985 async fn test_integration_two_consumers_shared_port() {
1986 use camel_component::{ConsumerContext, ExchangeEnvelope};
1987
1988 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1990 let port = listener.local_addr().unwrap().port();
1991 drop(listener);
1992
1993 let component = HttpComponent::new();
1994
1995 let endpoint_a = component
1997 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
1998 .unwrap();
1999 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2000
2001 let endpoint_b = component
2003 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2004 .unwrap();
2005 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2006
2007 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2008 let token_a = tokio_util::sync::CancellationToken::new();
2009 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2010
2011 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2012 let token_b = tokio_util::sync::CancellationToken::new();
2013 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2014
2015 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2016 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2017 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2018
2019 let client = reqwest::Client::new();
2020
2021 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2023 let (resp_hello, _) = tokio::join!(fut_hello, async {
2024 if let Some(mut envelope) = rx_a.recv().await {
2025 envelope.exchange.input.body =
2026 camel_api::body::Body::Text("hello-response".to_string());
2027 if let Some(reply_tx) = envelope.reply_tx {
2028 let _ = reply_tx.send(Ok(envelope.exchange));
2029 }
2030 }
2031 });
2032
2033 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2035 let (resp_world, _) = tokio::join!(fut_world, async {
2036 if let Some(mut envelope) = rx_b.recv().await {
2037 envelope.exchange.input.body =
2038 camel_api::body::Body::Text("world-response".to_string());
2039 if let Some(reply_tx) = envelope.reply_tx {
2040 let _ = reply_tx.send(Ok(envelope.exchange));
2041 }
2042 }
2043 });
2044
2045 let body_a = resp_hello.unwrap().text().await.unwrap();
2046 let body_b = resp_world.unwrap().text().await.unwrap();
2047
2048 assert_eq!(body_a, "hello-response");
2049 assert_eq!(body_b, "world-response");
2050
2051 token_a.cancel();
2052 token_b.cancel();
2053 }
2054
2055 #[tokio::test]
2056 async fn test_integration_unregistered_path_returns_404() {
2057 use camel_component::{ConsumerContext, ExchangeEnvelope};
2058
2059 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2061 let port = listener.local_addr().unwrap().port();
2062 drop(listener);
2063
2064 let component = HttpComponent::new();
2065 let endpoint = component
2066 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2067 .unwrap();
2068 let mut consumer = endpoint.create_consumer().unwrap();
2069
2070 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2071 let token = tokio_util::sync::CancellationToken::new();
2072 let ctx = ConsumerContext::new(tx, token.clone());
2073
2074 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2075 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2076
2077 let client = reqwest::Client::new();
2078 let resp = client
2079 .get(format!("http://127.0.0.1:{port}/not-there"))
2080 .send()
2081 .await
2082 .unwrap();
2083 assert_eq!(resp.status().as_u16(), 404);
2084
2085 token.cancel();
2086 }
2087
/// An `HttpConsumer` must report `ConcurrencyModel::Concurrent { max: None }` from
/// `concurrency_model()`. The consumer is only constructed, never started.
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component::ConcurrencyModel;

    let consumer = HttpConsumer::new(HttpServerConfig {
        host: "127.0.0.1".to_string(),
        port: 19999,
        path: "/test".to_string(),
        max_request_body: 2 * 1024 * 1024,
        max_response_body: 10 * 1024 * 1024,
    });

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
2105}