1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::RwLock;
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_api::{
17 BoxProcessor, CamelError, Exchange,
18 body::{Body, StreamBody, StreamMetadata},
19};
20use camel_component::{Component, Consumer, Endpoint, ProducerContext};
21use camel_endpoint::{UriComponents, UriConfig, parse_uri};
22use futures::TryStreamExt;
23use futures::stream::BoxStream;
24
/// Producer-side settings parsed from an `http:` / `https:` endpoint URI.
///
/// Values are filled in by `from_components`; options that are absent or fail
/// to parse fall back to the defaults noted on each field.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL rebuilt as `<scheme>:<path>` from the parsed URI components.
    pub base_url: String,
    /// `httpMethod` option; `None` when the URI does not set it.
    pub http_method: Option<String>,
    /// `throwExceptionOnFailure` option; `true` unless the value is exactly `false`.
    pub throw_exception_on_failure: bool,
    /// `okStatusCodeRange` option as inclusive `(start, end)`; defaults to `(200, 299)`.
    pub ok_status_code_range: (u16, u16),
    /// `followRedirects` option; `true` only when the value is exactly `true`.
    pub follow_redirects: bool,
    /// `connectTimeout` option in milliseconds; defaults to 30 000 ms.
    pub connect_timeout: Duration,
    /// `responseTimeout` option in milliseconds; `None` when unset.
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
    pub query_params: HashMap<String, String>,
    /// `allowPrivateIps` option; `true` only when the value is exactly `true`.
    pub allow_private_ips: bool,
    /// `blockedHosts` option: comma-separated host names, each trimmed.
    pub blocked_hosts: Vec<String>,
    /// `maxBodySize` option in bytes; defaults to 10 MiB.
    pub max_body_size: usize,
}
101
/// URI parameter names that `HttpEndpointConfig::from_components` filters out
/// of `query_params`.
///
/// Parameters whose name is not listed here are kept in `query_params`.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
114
115impl UriConfig for HttpEndpointConfig {
116 fn scheme() -> &'static str {
118 "http"
119 }
120
121 fn from_uri(uri: &str) -> Result<Self, CamelError> {
122 let parts = parse_uri(uri)?;
123 Self::from_components(parts)
124 }
125
126 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
127 if parts.scheme != "http" && parts.scheme != "https" {
129 return Err(CamelError::InvalidUri(format!(
130 "expected scheme 'http' or 'https', got '{}'",
131 parts.scheme
132 )));
133 }
134
135 let base_url = format!("{}:{}", parts.scheme, parts.path);
138
139 let http_method = parts.params.get("httpMethod").cloned();
140
141 let throw_exception_on_failure = parts
142 .params
143 .get("throwExceptionOnFailure")
144 .map(|v| v != "false")
145 .unwrap_or(true);
146
147 let ok_status_code_range = parts
149 .params
150 .get("okStatusCodeRange")
151 .and_then(|v| {
152 let (start, end) = v.split_once('-')?;
153 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
154 })
155 .unwrap_or((200, 299));
156
157 let follow_redirects = parts
158 .params
159 .get("followRedirects")
160 .map(|v| v == "true")
161 .unwrap_or(false);
162
163 let connect_timeout = parts
164 .params
165 .get("connectTimeout")
166 .and_then(|v| v.parse::<u64>().ok())
167 .map(Duration::from_millis)
168 .unwrap_or(Duration::from_millis(30000));
169
170 let response_timeout = parts
171 .params
172 .get("responseTimeout")
173 .and_then(|v| v.parse::<u64>().ok())
174 .map(Duration::from_millis);
175
176 let allow_private_ips = parts
178 .params
179 .get("allowPrivateIps")
180 .map(|v| v == "true")
181 .unwrap_or(false); let blocked_hosts = parts
185 .params
186 .get("blockedHosts")
187 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
188 .unwrap_or_default();
189
190 let max_body_size = parts
191 .params
192 .get("maxBodySize")
193 .and_then(|v| v.parse::<usize>().ok())
194 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
198 .params
199 .into_iter()
200 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
201 .collect();
202
203 Ok(Self {
204 base_url,
205 http_method,
206 throw_exception_on_failure,
207 ok_status_code_range,
208 follow_redirects,
209 connect_timeout,
210 response_timeout,
211 query_params,
212 allow_private_ips,
213 blocked_hosts,
214 max_body_size,
215 })
216 }
217}
218
/// Consumer-side (server) settings parsed from an `http:` / `https:` endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host part of the URI authority; combined with `port` to form the bind address.
    pub host: String,
    /// Port from the authority, or 443 / 80 for `https` / `http` when absent.
    pub port: u16,
    /// Request path this consumer is registered under; `/` when the URI has none.
    pub path: String,
    /// `maxRequestBody` option in bytes; defaults to 2 MiB.
    pub max_request_body: usize,
    /// `maxResponseBody` option in bytes; defaults to 10 MiB.
    pub max_response_body: usize,
}
237
238impl UriConfig for HttpServerConfig {
239 fn scheme() -> &'static str {
241 "http"
242 }
243
244 fn from_uri(uri: &str) -> Result<Self, CamelError> {
245 let parts = parse_uri(uri)?;
246 Self::from_components(parts)
247 }
248
249 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
250 if parts.scheme != "http" && parts.scheme != "https" {
252 return Err(CamelError::InvalidUri(format!(
253 "expected scheme 'http' or 'https', got '{}'",
254 parts.scheme
255 )));
256 }
257
258 let authority_and_path = parts.path.trim_start_matches('/');
261
262 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
264 (&authority_and_path[..idx], &authority_and_path[idx..])
265 } else {
266 (authority_and_path, "/")
267 };
268
269 let path = if path_suffix.is_empty() {
270 "/"
271 } else {
272 path_suffix
273 }
274 .to_string();
275
276 let (host, port) = if let Some(colon) = authority.rfind(':') {
278 let port_str = &authority[colon + 1..];
279 match port_str.parse::<u16>() {
280 Ok(p) => (authority[..colon].to_string(), p),
281 Err(_) => {
282 return Err(CamelError::InvalidUri(format!(
283 "invalid port '{}' in authority",
284 port_str
285 )));
286 }
287 }
288 } else {
289 let default_port = if parts.scheme == "https" { 443 } else { 80 };
291 (authority.to_string(), default_port)
292 };
293
294 let max_request_body = parts
295 .params
296 .get("maxRequestBody")
297 .and_then(|v| v.parse::<usize>().ok())
298 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
301 .params
302 .get("maxResponseBody")
303 .and_then(|v| v.parse::<usize>().ok())
304 .unwrap_or(10 * 1024 * 1024); Ok(Self {
307 host,
308 port,
309 path,
310 max_request_body,
311 max_response_body,
312 })
313 }
314}
315
/// Body of an HTTP reply handed from a consumer back to the axum handler.
pub(crate) enum HttpReplyBody {
    /// Fully materialised body.
    Bytes(bytes::Bytes),
    /// Body delivered as a stream of chunks; the handler forwards it with
    /// `AxumBody::from_stream`.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
325
/// One incoming HTTP request as passed from the axum handler to a consumer.
pub(crate) struct RequestEnvelope {
    /// Request method as text (e.g. `GET`).
    pub(crate) method: String,
    /// Request path, also used as the dispatch-table key.
    pub(crate) path: String,
    /// Raw query string; empty when the request has none.
    pub(crate) query: String,
    /// Copy of the request headers.
    pub(crate) headers: http::HeaderMap,
    /// Request body wrapped as a stream.
    pub(crate) body: StreamBody,
    /// Channel on which the consumer sends the reply for this request.
    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
336
/// Reply produced by a consumer for a single [`RequestEnvelope`].
pub(crate) struct HttpReply {
    /// Numeric HTTP status; the handler maps values `StatusCode::from_u16`
    /// rejects to 500.
    pub(crate) status: u16,
    /// Response headers as `(name, value)` pairs.
    pub(crate) headers: Vec<(String, String)>,
    /// Response body.
    pub(crate) body: HttpReplyBody,
}
343
/// Shared routing table of one server: request path -> sender feeding the
/// consumer registered for that path.
pub(crate) type DispatchTable =
    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
351
/// Registry entry for one running server.
struct ServerHandle {
    /// Routing table shared with the server task.
    dispatch: DispatchTable,
    /// Join handle of the spawned `run_axum_server` task; stored but never read.
    _task: tokio::task::JoinHandle<()>,
}
358
/// Registry of running HTTP servers, keyed by port only.
///
/// NOTE(review): the host is not part of the key, so two endpoints on the same
/// port but different hosts share the first server that was started.
pub struct ServerRegistry {
    /// Port -> handle of the server listening on it.
    inner: Mutex<HashMap<u16, ServerHandle>>,
}
363
364impl ServerRegistry {
365 pub fn global() -> &'static Self {
367 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
368 INSTANCE.get_or_init(|| ServerRegistry {
369 inner: Mutex::new(HashMap::new()),
370 })
371 }
372
373 pub(crate) async fn get_or_spawn(
376 &'static self,
377 host: &str,
378 port: u16,
379 max_request_body: usize,
380 ) -> Result<DispatchTable, CamelError> {
381 {
383 let guard = self.inner.lock().map_err(|_| {
384 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
385 })?;
386 if let Some(handle) = guard.get(&port) {
387 return Ok(Arc::clone(&handle.dispatch));
388 }
389 }
390
391 let addr = format!("{}:{}", host, port);
393 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
394 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
395 })?;
396
397 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
398 let dispatch_for_server = Arc::clone(&dispatch);
399 let task = tokio::spawn(run_axum_server(
400 listener,
401 dispatch_for_server,
402 max_request_body,
403 ));
404
405 let mut guard = self.inner.lock().map_err(|_| {
407 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
408 })?;
409 if let Some(existing) = guard.get(&port) {
412 task.abort();
413 return Ok(Arc::clone(&existing.dispatch));
414 }
415 guard.insert(
416 port,
417 ServerHandle {
418 dispatch: Arc::clone(&dispatch),
419 _task: task,
420 },
421 );
422 Ok(dispatch)
423 }
424}
425
426use axum::{
431 Router,
432 body::Body as AxumBody,
433 extract::{Request, State},
434 http::{Response, StatusCode},
435 response::IntoResponse,
436};
437
/// State shared with the axum fallback handler.
#[derive(Clone)]
struct AppState {
    /// Routing table used to find the consumer for a request path.
    dispatch: DispatchTable,
    /// Largest declared `Content-Length` (in bytes) the handler accepts.
    max_request_body: usize,
}
443
444async fn run_axum_server(
445 listener: tokio::net::TcpListener,
446 dispatch: DispatchTable,
447 max_request_body: usize,
448) {
449 let state = AppState {
450 dispatch,
451 max_request_body,
452 };
453 let app = Router::new().fallback(dispatch_handler).with_state(state);
454
455 axum::serve(listener, app).await.unwrap_or_else(|e| {
456 tracing::error!(error = %e, "Axum server error");
457 });
458}
459
460async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
461 let method = req.method().to_string();
462 let path = req.uri().path().to_string();
463 let query = req.uri().query().unwrap_or("").to_string();
464 let headers = req.headers().clone();
465
466 let content_length: Option<u64> = headers
468 .get(http::header::CONTENT_LENGTH)
469 .and_then(|v| v.to_str().ok())
470 .and_then(|s| s.parse().ok());
471
472 if let Some(len) = content_length
473 && len > state.max_request_body as u64
474 {
475 return Response::builder()
476 .status(StatusCode::PAYLOAD_TOO_LARGE)
477 .body(AxumBody::from("Request body exceeds configured limit"))
478 .expect("infallible");
479 }
480
481 let content_type = headers
483 .get(http::header::CONTENT_TYPE)
484 .and_then(|v| v.to_str().ok())
485 .map(|s| s.to_string());
486
487 let data_stream: BodyDataStream = req.into_body().into_data_stream();
488 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
489 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
490
491 let stream_body = StreamBody {
492 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
493 metadata: StreamMetadata {
494 size_hint: content_length,
495 content_type,
496 origin: None,
497 },
498 };
499
500 let sender = {
502 let table = state.dispatch.read().await;
503 table.get(&path).cloned()
504 };
505 let Some(sender) = sender else {
506 return Response::builder()
507 .status(StatusCode::NOT_FOUND)
508 .body(AxumBody::from("No consumer registered for this path"))
509 .expect("infallible");
510 };
511
512 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
513 let envelope = RequestEnvelope {
514 method,
515 path,
516 query,
517 headers,
518 body: stream_body,
519 reply_tx,
520 };
521
522 if sender.send(envelope).await.is_err() {
523 return Response::builder()
524 .status(StatusCode::SERVICE_UNAVAILABLE)
525 .body(AxumBody::from("Consumer unavailable"))
526 .expect("infallible");
527 }
528
529 match reply_rx.await {
530 Ok(reply) => {
531 let status =
532 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
533 let mut builder = Response::builder().status(status);
534 for (k, v) in &reply.headers {
535 builder = builder.header(k.as_str(), v.as_str());
536 }
537 match reply.body {
538 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
539 Response::builder()
540 .status(StatusCode::INTERNAL_SERVER_ERROR)
541 .body(AxumBody::from("Invalid response headers from consumer"))
542 .expect("infallible")
543 }),
544 HttpReplyBody::Stream(stream) => builder
545 .body(AxumBody::from_stream(stream))
546 .unwrap_or_else(|_| {
547 Response::builder()
548 .status(StatusCode::INTERNAL_SERVER_ERROR)
549 .body(AxumBody::from("Invalid response headers from consumer"))
550 .expect("infallible")
551 }),
552 }
553 }
554 Err(_) => Response::builder()
555 .status(StatusCode::INTERNAL_SERVER_ERROR)
556 .body(AxumBody::from("Pipeline error"))
557 .expect("Response::builder() with a known-valid status code and body is infallible"),
558 }
559}
560
/// HTTP consumer: registers its path on a shared server and turns incoming
/// requests into exchanges.
pub struct HttpConsumer {
    /// Bind address, path and body limits for this consumer.
    config: HttpServerConfig,
}

impl HttpConsumer {
    /// Creates a consumer for the given server configuration; nothing is
    /// bound until `start` is called.
    pub fn new(config: HttpServerConfig) -> Self {
        Self { config }
    }
}
574
575#[async_trait::async_trait]
576impl Consumer for HttpConsumer {
577 async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
578 use camel_api::{Exchange, Message, body::Body};
579
580 let dispatch = ServerRegistry::global()
581 .get_or_spawn(
582 &self.config.host,
583 self.config.port,
584 self.config.max_request_body,
585 )
586 .await?;
587
588 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
590 {
591 let mut table = dispatch.write().await;
592 table.insert(self.config.path.clone(), env_tx);
593 }
594
595 let path = self.config.path.clone();
596 let cancel_token = ctx.cancel_token();
597 let _max_response_body = self.config.max_response_body;
598
599 loop {
600 tokio::select! {
601 _ = ctx.cancelled() => {
602 break;
603 }
604 envelope = env_rx.recv() => {
605 let Some(envelope) = envelope else { break; };
606
607 let mut msg = Message::default();
609
610 msg.set_header("CamelHttpMethod",
612 serde_json::Value::String(envelope.method.clone()));
613 msg.set_header("CamelHttpPath",
614 serde_json::Value::String(envelope.path.clone()));
615 msg.set_header("CamelHttpQuery",
616 serde_json::Value::String(envelope.query.clone()));
617
618 for (k, v) in &envelope.headers {
620 if let Ok(val_str) = v.to_str() {
621 msg.set_header(
622 k.as_str(),
623 serde_json::Value::String(val_str.to_string()),
624 );
625 }
626 }
627
628 msg.body = Body::Stream(envelope.body);
631
632 #[allow(unused_mut)]
633 let mut exchange = Exchange::new(msg);
634
635 #[cfg(feature = "otel")]
637 {
638 let headers: HashMap<String, String> = envelope
639 .headers
640 .iter()
641 .filter_map(|(k, v)| {
642 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
643 })
644 .collect();
645 camel_otel::extract_into_exchange(&mut exchange, &headers);
646 }
647
648 let reply_tx = envelope.reply_tx;
649 let sender = ctx.sender().clone();
650 let path_clone = path.clone();
651 let cancel = cancel_token.clone();
652
653 tokio::spawn(async move {
673 if cancel.is_cancelled() {
681 let _ = reply_tx.send(HttpReply {
682 status: 503,
683 headers: vec![],
684 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
685 });
686 return;
687 }
688
689 let (tx, rx) = tokio::sync::oneshot::channel();
691 let envelope = camel_component::consumer::ExchangeEnvelope {
692 exchange,
693 reply_tx: Some(tx),
694 };
695
696 let result = match sender.send(envelope).await {
697 Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
698 Err(_) => Err(camel_api::CamelError::ChannelClosed),
699 }
700 .and_then(|r| r);
701
702 let reply = match result {
703 Ok(out) => {
704 let status = out
705 .input
706 .header("CamelHttpResponseCode")
707 .and_then(|v| v.as_u64())
708 .map(|s| s as u16)
709 .unwrap_or(200);
710
711 let reply_body: HttpReplyBody = match out.input.body {
712 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
713 Body::Bytes(b) => HttpReplyBody::Bytes(b),
714 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
715 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
716 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
717 v.to_string().into_bytes(),
718 )),
719 Body::Stream(s) => {
720 match s.stream.lock().await.take() {
721 Some(stream) => HttpReplyBody::Stream(stream),
722 None => {
723 tracing::error!(
724 "Body::Stream already consumed before HTTP reply — returning 500"
725 );
726 let error_reply = HttpReply {
727 status: 500,
728 headers: vec![],
729 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
730 };
731 if reply_tx.send(error_reply).is_err() {
732 debug!("reply_tx dropped before error reply could be sent");
733 }
734 return;
735 }
736 }
737 }
738 };
739
740 let resp_headers: Vec<(String, String)> = out
741 .input
742 .headers
743 .iter()
744 .filter(|(k, _)| !k.starts_with("Camel"))
746 .filter(|(k, _)| {
749 !matches!(
750 k.to_lowercase().as_str(),
751 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
786 })
787 .filter_map(|(k, v)| {
788 v.as_str().map(|s| (k.clone(), s.to_string()))
789 })
790 .collect();
791
792 HttpReply {
793 status,
794 headers: resp_headers,
795 body: reply_body,
796 }
797 }
798 Err(e) => {
799 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
800 HttpReply {
801 status: 500,
802 headers: vec![],
803 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
804 }
805 }
806 };
807
808 let _ = reply_tx.send(reply);
810 });
811 }
812 }
813 }
814
815 {
817 let mut table = dispatch.write().await;
818 table.remove(&path);
819 }
820
821 Ok(())
822 }
823
824 async fn stop(&mut self) -> Result<(), CamelError> {
825 Ok(())
826 }
827
828 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
829 camel_component::ConcurrencyModel::Concurrent { max: None }
830 }
831}
832
/// Stateless component type for the `http` scheme.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpComponent;

impl HttpComponent {
    /// Creates the component; equivalent to `HttpComponent::default()`.
    pub const fn new() -> Self {
        Self
    }
}
850
851impl Component for HttpComponent {
852 fn scheme(&self) -> &str {
853 "http"
854 }
855
856 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
857 let config = HttpEndpointConfig::from_uri(uri)?;
858 let server_config = HttpServerConfig::from_uri(uri)?;
859 let client = build_client(&config)?;
860 Ok(Box::new(HttpEndpoint {
861 uri: uri.to_string(),
862 config,
863 server_config,
864 client,
865 }))
866 }
867}
868
/// Stateless component type for the `https` scheme.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpsComponent;

impl HttpsComponent {
    /// Creates the component; equivalent to `HttpsComponent::default()`.
    pub const fn new() -> Self {
        Self
    }
}
882
883impl Component for HttpsComponent {
884 fn scheme(&self) -> &str {
885 "https"
886 }
887
888 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
889 let config = HttpEndpointConfig::from_uri(uri)?;
890 let server_config = HttpServerConfig::from_uri(uri)?;
891 let client = build_client(&config)?;
892 Ok(Box::new(HttpEndpoint {
893 uri: uri.to_string(),
894 config,
895 server_config,
896 client,
897 }))
898 }
899}
900
901fn build_client(config: &HttpEndpointConfig) -> Result<reqwest::Client, CamelError> {
902 let mut builder = reqwest::Client::builder().connect_timeout(config.connect_timeout);
903
904 if !config.follow_redirects {
905 builder = builder.redirect(reqwest::redirect::Policy::none());
906 }
907
908 builder.build().map_err(|e| {
909 CamelError::EndpointCreationFailed(format!("Failed to build HTTP client: {e}"))
910 })
911}
912
/// Endpoint created by `HttpComponent` / `HttpsComponent`; can act as both a
/// consumer (server) and a producer (client).
struct HttpEndpoint {
    /// Original endpoint URI as passed to `create_endpoint`.
    uri: String,
    /// Producer-side configuration.
    config: HttpEndpointConfig,
    /// Consumer-side configuration.
    server_config: HttpServerConfig,
    /// HTTP client that is cloned into each producer.
    client: reqwest::Client,
}
923
924impl Endpoint for HttpEndpoint {
925 fn uri(&self) -> &str {
926 &self.uri
927 }
928
929 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
930 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
931 }
932
933 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
934 Ok(BoxProcessor::new(HttpProducer {
935 config: Arc::new(self.config.clone()),
936 client: self.client.clone(),
937 }))
938 }
939}
940
941fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
946 let parsed = url::Url::parse(url)
947 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
948
949 if let Some(host) = parsed.host_str()
951 && config.blocked_hosts.iter().any(|blocked| host == blocked)
952 {
953 return Err(CamelError::ProcessorError(format!(
954 "Host '{}' is blocked",
955 host
956 )));
957 }
958
959 if !config.allow_private_ips
961 && let Some(host) = parsed.host()
962 {
963 match host {
964 url::Host::Ipv4(ip) => {
965 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
966 return Err(CamelError::ProcessorError(format!(
967 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
968 ip
969 )));
970 }
971 }
972 url::Host::Ipv6(ip) => {
973 if ip.is_loopback() {
974 return Err(CamelError::ProcessorError(format!(
975 "Loopback IP '{}' not allowed",
976 ip
977 )));
978 }
979 }
980 url::Host::Domain(domain) => {
981 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
983 if blocked_domains.contains(&domain) {
984 return Err(CamelError::ProcessorError(format!(
985 "Domain '{}' is not allowed",
986 domain
987 )));
988 }
989 }
990 }
991 }
992
993 Ok(())
994}
995
/// Producer that sends each exchange as one HTTP request through a `reqwest`
/// client.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint configuration, shared between clones.
    config: Arc<HttpEndpointConfig>,
    /// HTTP client used for every request.
    client: reqwest::Client,
}
1005
1006impl HttpProducer {
1007 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1008 if let Some(ref method) = config.http_method {
1009 return method.to_uppercase();
1010 }
1011 if let Some(method) = exchange
1012 .input
1013 .header("CamelHttpMethod")
1014 .and_then(|v| v.as_str())
1015 {
1016 return method.to_uppercase();
1017 }
1018 if !exchange.input.body.is_empty() {
1019 return "POST".to_string();
1020 }
1021 "GET".to_string()
1022 }
1023
1024 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1025 if let Some(uri) = exchange
1026 .input
1027 .header("CamelHttpUri")
1028 .and_then(|v| v.as_str())
1029 {
1030 let mut url = uri.to_string();
1031 if let Some(path) = exchange
1032 .input
1033 .header("CamelHttpPath")
1034 .and_then(|v| v.as_str())
1035 {
1036 if !url.ends_with('/') && !path.starts_with('/') {
1037 url.push('/');
1038 }
1039 url.push_str(path);
1040 }
1041 if let Some(query) = exchange
1042 .input
1043 .header("CamelHttpQuery")
1044 .and_then(|v| v.as_str())
1045 {
1046 url.push('?');
1047 url.push_str(query);
1048 }
1049 return url;
1050 }
1051
1052 let mut url = config.base_url.clone();
1053
1054 if let Some(path) = exchange
1055 .input
1056 .header("CamelHttpPath")
1057 .and_then(|v| v.as_str())
1058 {
1059 if !url.ends_with('/') && !path.starts_with('/') {
1060 url.push('/');
1061 }
1062 url.push_str(path);
1063 }
1064
1065 if let Some(query) = exchange
1066 .input
1067 .header("CamelHttpQuery")
1068 .and_then(|v| v.as_str())
1069 {
1070 url.push('?');
1071 url.push_str(query);
1072 } else if !config.query_params.is_empty() {
1073 url.push('?');
1075 let query_string: String = config
1076 .query_params
1077 .iter()
1078 .map(|(k, v)| format!("{k}={v}"))
1079 .collect::<Vec<_>>()
1080 .join("&");
1081 url.push_str(&query_string);
1082 }
1083
1084 url
1085 }
1086
1087 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1088 status >= range.0 && status <= range.1
1089 }
1090}
1091
1092impl Service<Exchange> for HttpProducer {
1093 type Response = Exchange;
1094 type Error = CamelError;
1095 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1096
1097 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1098 Poll::Ready(Ok(()))
1099 }
1100
1101 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1102 let config = self.config.clone();
1103 let client = self.client.clone();
1104
1105 Box::pin(async move {
1106 let method_str = HttpProducer::resolve_method(&exchange, &config);
1107 let url = HttpProducer::resolve_url(&exchange, &config);
1108
1109 validate_url_for_ssrf(&url, &config)?;
1111
1112 debug!(
1113 correlation_id = %exchange.correlation_id(),
1114 method = %method_str,
1115 url = %url,
1116 "HTTP request"
1117 );
1118
1119 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1120 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1121 })?;
1122
1123 let mut request = client.request(method, &url);
1124
1125 if let Some(timeout) = config.response_timeout {
1126 request = request.timeout(timeout);
1127 }
1128
1129 #[cfg(feature = "otel")]
1131 {
1132 let mut otel_headers = HashMap::new();
1133 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1134 for (k, v) in otel_headers {
1135 if let (Ok(name), Ok(val)) = (
1136 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1137 reqwest::header::HeaderValue::from_str(&v),
1138 ) {
1139 request = request.header(name, val);
1140 }
1141 }
1142 }
1143
1144 for (key, value) in &exchange.input.headers {
1145 if !key.starts_with("Camel")
1146 && let Some(val_str) = value.as_str()
1147 && let (Ok(name), Ok(val)) = (
1148 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1149 reqwest::header::HeaderValue::from_str(val_str),
1150 )
1151 {
1152 request = request.header(name, val);
1153 }
1154 }
1155
1156 match exchange.input.body {
1157 Body::Stream(ref s) => {
1158 let mut stream_lock = s.stream.lock().await;
1159 if let Some(stream) = stream_lock.take() {
1160 request = request.body(reqwest::Body::wrap_stream(stream));
1161 } else {
1162 return Err(CamelError::AlreadyConsumed);
1163 }
1164 }
1165 _ => {
1166 let body = std::mem::take(&mut exchange.input.body);
1168 let bytes = body.into_bytes(config.max_body_size).await?;
1169 if !bytes.is_empty() {
1170 request = request.body(bytes);
1171 }
1172 }
1173 }
1174
1175 let response = request
1176 .send()
1177 .await
1178 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1179
1180 let status_code = response.status().as_u16();
1181 let status_text = response
1182 .status()
1183 .canonical_reason()
1184 .unwrap_or("Unknown")
1185 .to_string();
1186
1187 for (key, value) in response.headers() {
1188 if let Ok(val_str) = value.to_str() {
1189 exchange
1190 .input
1191 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1192 }
1193 }
1194
1195 exchange.input.set_header(
1196 "CamelHttpResponseCode",
1197 serde_json::Value::Number(status_code.into()),
1198 );
1199 exchange.input.set_header(
1200 "CamelHttpResponseText",
1201 serde_json::Value::String(status_text.clone()),
1202 );
1203
1204 let response_body = response.bytes().await.map_err(|e| {
1205 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1206 })?;
1207
1208 if config.throw_exception_on_failure
1209 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1210 {
1211 return Err(CamelError::HttpOperationFailed {
1212 method: method_str,
1213 url,
1214 status_code,
1215 status_text,
1216 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1217 });
1218 }
1219
1220 if !response_body.is_empty() {
1221 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1222 }
1223
1224 debug!(
1225 correlation_id = %exchange.correlation_id(),
1226 status = status_code,
1227 url = %url,
1228 "HTTP response"
1229 );
1230 Ok(exchange)
1231 })
1232 }
1233}
1234
1235#[cfg(test)]
1236mod tests {
1237 use super::*;
1238 use camel_api::Message;
1239 use std::sync::Arc;
1240 use std::time::Duration;
1241
    /// Producer context used by the tests that create a producer.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
1245
1246 #[test]
1247 fn test_http_config_defaults() {
1248 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1249 assert_eq!(config.base_url, "http://localhost:8080/api");
1250 assert!(config.http_method.is_none());
1251 assert!(config.throw_exception_on_failure);
1252 assert_eq!(config.ok_status_code_range, (200, 299));
1253 assert!(!config.follow_redirects);
1254 assert_eq!(config.connect_timeout, Duration::from_millis(30000));
1255 assert!(config.response_timeout.is_none());
1256 }
1257
1258 #[test]
1259 fn test_http_config_scheme() {
1260 assert_eq!(HttpEndpointConfig::scheme(), "http");
1262 }
1263
1264 #[test]
1265 fn test_http_config_from_components() {
1266 let components = camel_endpoint::UriComponents {
1268 scheme: "https".to_string(),
1269 path: "//api.example.com/v1".to_string(),
1270 params: std::collections::HashMap::from([(
1271 "httpMethod".to_string(),
1272 "POST".to_string(),
1273 )]),
1274 };
1275 let config = HttpEndpointConfig::from_components(components).unwrap();
1276 assert_eq!(config.base_url, "https://api.example.com/v1");
1277 assert_eq!(config.http_method, Some("POST".to_string()));
1278 }
1279
1280 #[test]
1281 fn test_http_config_with_options() {
1282 let config = HttpEndpointConfig::from_uri(
1283 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1284 ).unwrap();
1285 assert_eq!(config.base_url, "https://api.example.com/v1");
1286 assert_eq!(config.http_method, Some("PUT".to_string()));
1287 assert!(!config.throw_exception_on_failure);
1288 assert!(config.follow_redirects);
1289 assert_eq!(config.connect_timeout, Duration::from_millis(5000));
1290 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1291 }
1292
1293 #[test]
1294 fn test_http_config_ok_status_range() {
1295 let config =
1296 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1297 assert_eq!(config.ok_status_code_range, (200, 204));
1298 }
1299
1300 #[test]
1301 fn test_http_config_wrong_scheme() {
1302 let result = HttpEndpointConfig::from_uri("file:/tmp");
1303 assert!(result.is_err());
1304 }
1305
1306 #[test]
1307 fn test_http_component_scheme() {
1308 let component = HttpComponent::new();
1309 assert_eq!(component.scheme(), "http");
1310 }
1311
1312 #[test]
1313 fn test_https_component_scheme() {
1314 let component = HttpsComponent::new();
1315 assert_eq!(component.scheme(), "https");
1316 }
1317
1318 #[test]
1319 fn test_http_endpoint_creates_consumer() {
1320 let component = HttpComponent::new();
1321 let endpoint = component
1322 .create_endpoint("http://0.0.0.0:19100/test")
1323 .unwrap();
1324 assert!(endpoint.create_consumer().is_ok());
1325 }
1326
1327 #[test]
1328 fn test_https_endpoint_creates_consumer() {
1329 let component = HttpsComponent::new();
1330 let endpoint = component
1331 .create_endpoint("https://0.0.0.0:8443/test")
1332 .unwrap();
1333 assert!(endpoint.create_consumer().is_ok());
1334 }
1335
1336 #[test]
1337 fn test_http_endpoint_creates_producer() {
1338 let ctx = test_producer_ctx();
1339 let component = HttpComponent::new();
1340 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1341 assert!(endpoint.create_producer(&ctx).is_ok());
1342 }
1343
1344 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1349 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1350 let addr = listener.local_addr().unwrap();
1351 let url = format!("http://127.0.0.1:{}", addr.port());
1352
1353 let handle = tokio::spawn(async move {
1354 loop {
1355 if let Ok((mut stream, _)) = listener.accept().await {
1356 tokio::spawn(async move {
1357 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1358 let mut buf = vec![0u8; 4096];
1359 let n = stream.read(&mut buf).await.unwrap_or(0);
1360 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1361
1362 let method = request.split_whitespace().next().unwrap_or("GET");
1363
1364 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1365 let response = format!(
1366 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1367 body.len(),
1368 body
1369 );
1370 let _ = stream.write_all(response.as_bytes()).await;
1371 });
1372 }
1373 }
1374 });
1375
1376 (url, handle)
1377 }
1378
1379 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1380 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1381 let addr = listener.local_addr().unwrap();
1382 let url = format!("http://127.0.0.1:{}", addr.port());
1383
1384 let handle = tokio::spawn(async move {
1385 loop {
1386 if let Ok((mut stream, _)) = listener.accept().await {
1387 let status = status;
1388 tokio::spawn(async move {
1389 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1390 let mut buf = vec![0u8; 4096];
1391 let _ = stream.read(&mut buf).await;
1392
1393 let status_text = match status {
1394 404 => "Not Found",
1395 500 => "Internal Server Error",
1396 _ => "Error",
1397 };
1398 let body = "error body";
1399 let response = format!(
1400 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1401 status,
1402 status_text,
1403 body.len(),
1404 body
1405 );
1406 let _ = stream.write_all(response.as_bytes()).await;
1407 });
1408 }
1409 }
1410 });
1411
1412 (url, handle)
1413 }
1414
1415 #[tokio::test]
1416 async fn test_http_producer_get_request() {
1417 use tower::ServiceExt;
1418
1419 let (url, _handle) = start_test_server().await;
1420 let ctx = test_producer_ctx();
1421
1422 let component = HttpComponent::new();
1423 let endpoint = component
1424 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1425 .unwrap();
1426 let producer = endpoint.create_producer(&ctx).unwrap();
1427
1428 let exchange = Exchange::new(Message::default());
1429 let result = producer.oneshot(exchange).await.unwrap();
1430
1431 let status = result
1432 .input
1433 .header("CamelHttpResponseCode")
1434 .and_then(|v| v.as_u64())
1435 .unwrap();
1436 assert_eq!(status, 200);
1437
1438 assert!(!result.input.body.is_empty());
1439 }
1440
1441 #[tokio::test]
1442 async fn test_http_producer_post_with_body() {
1443 use tower::ServiceExt;
1444
1445 let (url, _handle) = start_test_server().await;
1446 let ctx = test_producer_ctx();
1447
1448 let component = HttpComponent::new();
1449 let endpoint = component
1450 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1451 .unwrap();
1452 let producer = endpoint.create_producer(&ctx).unwrap();
1453
1454 let exchange = Exchange::new(Message::new("request body"));
1455 let result = producer.oneshot(exchange).await.unwrap();
1456
1457 let status = result
1458 .input
1459 .header("CamelHttpResponseCode")
1460 .and_then(|v| v.as_u64())
1461 .unwrap();
1462 assert_eq!(status, 200);
1463 }
1464
1465 #[tokio::test]
1466 async fn test_http_producer_method_from_header() {
1467 use tower::ServiceExt;
1468
1469 let (url, _handle) = start_test_server().await;
1470 let ctx = test_producer_ctx();
1471
1472 let component = HttpComponent::new();
1473 let endpoint = component
1474 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1475 .unwrap();
1476 let producer = endpoint.create_producer(&ctx).unwrap();
1477
1478 let mut exchange = Exchange::new(Message::default());
1479 exchange.input.set_header(
1480 "CamelHttpMethod",
1481 serde_json::Value::String("DELETE".to_string()),
1482 );
1483
1484 let result = producer.oneshot(exchange).await.unwrap();
1485 let status = result
1486 .input
1487 .header("CamelHttpResponseCode")
1488 .and_then(|v| v.as_u64())
1489 .unwrap();
1490 assert_eq!(status, 200);
1491 }
1492
1493 #[tokio::test]
1494 async fn test_http_producer_forced_method() {
1495 use tower::ServiceExt;
1496
1497 let (url, _handle) = start_test_server().await;
1498 let ctx = test_producer_ctx();
1499
1500 let component = HttpComponent::new();
1501 let endpoint = component
1502 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1503 .unwrap();
1504 let producer = endpoint.create_producer(&ctx).unwrap();
1505
1506 let exchange = Exchange::new(Message::default());
1507 let result = producer.oneshot(exchange).await.unwrap();
1508
1509 let status = result
1510 .input
1511 .header("CamelHttpResponseCode")
1512 .and_then(|v| v.as_u64())
1513 .unwrap();
1514 assert_eq!(status, 200);
1515 }
1516
1517 #[tokio::test]
1518 async fn test_http_producer_throw_exception_on_failure() {
1519 use tower::ServiceExt;
1520
1521 let (url, _handle) = start_status_server(404).await;
1522 let ctx = test_producer_ctx();
1523
1524 let component = HttpComponent::new();
1525 let endpoint = component
1526 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1527 .unwrap();
1528 let producer = endpoint.create_producer(&ctx).unwrap();
1529
1530 let exchange = Exchange::new(Message::default());
1531 let result = producer.oneshot(exchange).await;
1532 assert!(result.is_err());
1533
1534 match result.unwrap_err() {
1535 CamelError::HttpOperationFailed { status_code, .. } => {
1536 assert_eq!(status_code, 404);
1537 }
1538 e => panic!("Expected HttpOperationFailed, got: {e}"),
1539 }
1540 }
1541
1542 #[tokio::test]
1543 async fn test_http_producer_no_throw_on_failure() {
1544 use tower::ServiceExt;
1545
1546 let (url, _handle) = start_status_server(500).await;
1547 let ctx = test_producer_ctx();
1548
1549 let component = HttpComponent::new();
1550 let endpoint = component
1551 .create_endpoint(&format!(
1552 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1553 ))
1554 .unwrap();
1555 let producer = endpoint.create_producer(&ctx).unwrap();
1556
1557 let exchange = Exchange::new(Message::default());
1558 let result = producer.oneshot(exchange).await.unwrap();
1559
1560 let status = result
1561 .input
1562 .header("CamelHttpResponseCode")
1563 .and_then(|v| v.as_u64())
1564 .unwrap();
1565 assert_eq!(status, 500);
1566 }
1567
1568 #[tokio::test]
1569 async fn test_http_producer_uri_override() {
1570 use tower::ServiceExt;
1571
1572 let (url, _handle) = start_test_server().await;
1573 let ctx = test_producer_ctx();
1574
1575 let component = HttpComponent::new();
1576 let endpoint = component
1577 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1578 .unwrap();
1579 let producer = endpoint.create_producer(&ctx).unwrap();
1580
1581 let mut exchange = Exchange::new(Message::default());
1582 exchange.input.set_header(
1583 "CamelHttpUri",
1584 serde_json::Value::String(format!("{url}/api")),
1585 );
1586
1587 let result = producer.oneshot(exchange).await.unwrap();
1588 let status = result
1589 .input
1590 .header("CamelHttpResponseCode")
1591 .and_then(|v| v.as_u64())
1592 .unwrap();
1593 assert_eq!(status, 200);
1594 }
1595
1596 #[tokio::test]
1597 async fn test_http_producer_response_headers_mapped() {
1598 use tower::ServiceExt;
1599
1600 let (url, _handle) = start_test_server().await;
1601 let ctx = test_producer_ctx();
1602
1603 let component = HttpComponent::new();
1604 let endpoint = component
1605 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1606 .unwrap();
1607 let producer = endpoint.create_producer(&ctx).unwrap();
1608
1609 let exchange = Exchange::new(Message::default());
1610 let result = producer.oneshot(exchange).await.unwrap();
1611
1612 assert!(
1613 result.input.header("content-type").is_some()
1614 || result.input.header("Content-Type").is_some()
1615 );
1616 assert!(result.input.header("CamelHttpResponseText").is_some());
1617 }
1618
1619 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1624 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1625 let addr = listener.local_addr().unwrap();
1626 let url = format!("http://127.0.0.1:{}", addr.port());
1627
1628 let handle = tokio::spawn(async move {
1629 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1630 loop {
1631 if let Ok((mut stream, _)) = listener.accept().await {
1632 tokio::spawn(async move {
1633 let mut buf = vec![0u8; 4096];
1634 let n = stream.read(&mut buf).await.unwrap_or(0);
1635 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1636
1637 if request.contains("GET /final") {
1639 let body = r#"{"status":"final"}"#;
1640 let response = format!(
1641 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1642 body.len(),
1643 body
1644 );
1645 let _ = stream.write_all(response.as_bytes()).await;
1646 } else {
1647 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1649 let _ = stream.write_all(response.as_bytes()).await;
1650 }
1651 });
1652 }
1653 }
1654 });
1655
1656 (url, handle)
1657 }
1658
1659 #[tokio::test]
1660 async fn test_follow_redirects_false_does_not_follow() {
1661 use tower::ServiceExt;
1662
1663 let (url, _handle) = start_redirect_server().await;
1664 let ctx = test_producer_ctx();
1665
1666 let component = HttpComponent::new();
1667 let endpoint = component
1668 .create_endpoint(&format!(
1669 "{url}?followRedirects=false&throwExceptionOnFailure=false&allowPrivateIps=true"
1670 ))
1671 .unwrap();
1672 let producer = endpoint.create_producer(&ctx).unwrap();
1673
1674 let exchange = Exchange::new(Message::default());
1675 let result = producer.oneshot(exchange).await.unwrap();
1676
1677 let status = result
1679 .input
1680 .header("CamelHttpResponseCode")
1681 .and_then(|v| v.as_u64())
1682 .unwrap();
1683 assert_eq!(
1684 status, 302,
1685 "Should NOT follow redirect when followRedirects=false"
1686 );
1687 }
1688
1689 #[tokio::test]
1690 async fn test_follow_redirects_true_follows_redirect() {
1691 use tower::ServiceExt;
1692
1693 let (url, _handle) = start_redirect_server().await;
1694 let ctx = test_producer_ctx();
1695
1696 let component = HttpComponent::new();
1697 let endpoint = component
1698 .create_endpoint(&format!("{url}?followRedirects=true&allowPrivateIps=true"))
1699 .unwrap();
1700 let producer = endpoint.create_producer(&ctx).unwrap();
1701
1702 let exchange = Exchange::new(Message::default());
1703 let result = producer.oneshot(exchange).await.unwrap();
1704
1705 let status = result
1707 .input
1708 .header("CamelHttpResponseCode")
1709 .and_then(|v| v.as_u64())
1710 .unwrap();
1711 assert_eq!(
1712 status, 200,
1713 "Should follow redirect when followRedirects=true"
1714 );
1715 }
1716
1717 #[tokio::test]
1718 async fn test_query_params_forwarded_to_http_request() {
1719 use tower::ServiceExt;
1720
1721 let (url, _handle) = start_test_server().await;
1722 let ctx = test_producer_ctx();
1723
1724 let component = HttpComponent::new();
1725 let endpoint = component
1727 .create_endpoint(&format!(
1728 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1729 ))
1730 .unwrap();
1731 let producer = endpoint.create_producer(&ctx).unwrap();
1732
1733 let exchange = Exchange::new(Message::default());
1734 let result = producer.oneshot(exchange).await.unwrap();
1735
1736 let status = result
1739 .input
1740 .header("CamelHttpResponseCode")
1741 .and_then(|v| v.as_u64())
1742 .unwrap();
1743 assert_eq!(status, 200);
1744 }
1745
1746 #[tokio::test]
1747 async fn test_non_camel_query_params_are_forwarded() {
1748 let config = HttpEndpointConfig::from_uri(
1751 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1752 )
1753 .unwrap();
1754
1755 assert!(
1757 config.query_params.contains_key("apiKey"),
1758 "apiKey should be preserved"
1759 );
1760 assert!(
1761 config.query_params.contains_key("token"),
1762 "token should be preserved"
1763 );
1764 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1765 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1766
1767 assert!(
1769 !config.query_params.contains_key("httpMethod"),
1770 "httpMethod should not be forwarded"
1771 );
1772 }
1773
1774 #[tokio::test]
1779 async fn test_http_producer_blocks_metadata_endpoint() {
1780 use tower::ServiceExt;
1781
1782 let ctx = test_producer_ctx();
1783 let component = HttpComponent::new();
1784 let endpoint = component
1785 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1786 .unwrap();
1787 let producer = endpoint.create_producer(&ctx).unwrap();
1788
1789 let mut exchange = Exchange::new(Message::default());
1790 exchange.input.set_header(
1791 "CamelHttpUri",
1792 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1793 );
1794
1795 let result = producer.oneshot(exchange).await;
1796 assert!(result.is_err(), "Should block AWS metadata endpoint");
1797
1798 let err = result.unwrap_err();
1799 assert!(
1800 err.to_string().contains("Private IP"),
1801 "Error should mention private IP blocking, got: {}",
1802 err
1803 );
1804 }
1805
1806 #[test]
1807 fn test_ssrf_config_defaults() {
1808 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
1809 assert!(
1810 !config.allow_private_ips,
1811 "Private IPs should be blocked by default"
1812 );
1813 assert!(
1814 config.blocked_hosts.is_empty(),
1815 "Blocked hosts should be empty by default"
1816 );
1817 }
1818
1819 #[test]
1820 fn test_ssrf_config_allow_private_ips() {
1821 let config =
1822 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1823 assert!(
1824 config.allow_private_ips,
1825 "Private IPs should be allowed when explicitly set"
1826 );
1827 }
1828
1829 #[test]
1830 fn test_ssrf_config_blocked_hosts() {
1831 let config = HttpEndpointConfig::from_uri(
1832 "http://example.com/api?blockedHosts=evil.com,malware.net",
1833 )
1834 .unwrap();
1835 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1836 }
1837
1838 #[tokio::test]
1839 async fn test_http_producer_blocks_localhost() {
1840 use tower::ServiceExt;
1841
1842 let ctx = test_producer_ctx();
1843 let component = HttpComponent::new();
1844 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1845 let producer = endpoint.create_producer(&ctx).unwrap();
1846
1847 let mut exchange = Exchange::new(Message::default());
1848 exchange.input.set_header(
1849 "CamelHttpUri",
1850 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1851 );
1852
1853 let result = producer.oneshot(exchange).await;
1854 assert!(result.is_err(), "Should block localhost");
1855 }
1856
1857 #[tokio::test]
1858 async fn test_http_producer_blocks_loopback_ip() {
1859 use tower::ServiceExt;
1860
1861 let ctx = test_producer_ctx();
1862 let component = HttpComponent::new();
1863 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1864 let producer = endpoint.create_producer(&ctx).unwrap();
1865
1866 let mut exchange = Exchange::new(Message::default());
1867 exchange.input.set_header(
1868 "CamelHttpUri",
1869 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1870 );
1871
1872 let result = producer.oneshot(exchange).await;
1873 assert!(result.is_err(), "Should block loopback IP");
1874 }
1875
1876 #[tokio::test]
1877 async fn test_http_producer_allows_private_ip_when_enabled() {
1878 use tower::ServiceExt;
1879
1880 let ctx = test_producer_ctx();
1881 let component = HttpComponent::new();
1882 let endpoint = component
1885 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1886 .unwrap();
1887 let producer = endpoint.create_producer(&ctx).unwrap();
1888
1889 let exchange = Exchange::new(Message::default());
1890
1891 let result = producer.oneshot(exchange).await;
1894 if let Err(ref e) = result {
1896 let err_str = e.to_string();
1897 assert!(
1898 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1899 "Should not be SSRF error, got: {}",
1900 err_str
1901 );
1902 }
1903 }
1904
1905 #[test]
1910 fn test_http_server_config_parse() {
1911 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1912 assert_eq!(cfg.host, "0.0.0.0");
1913 assert_eq!(cfg.port, 8080);
1914 assert_eq!(cfg.path, "/orders");
1915 }
1916
1917 #[test]
1918 fn test_http_server_config_scheme() {
1919 assert_eq!(HttpServerConfig::scheme(), "http");
1921 }
1922
1923 #[test]
1924 fn test_http_server_config_from_components() {
1925 let components = camel_endpoint::UriComponents {
1927 scheme: "https".to_string(),
1928 path: "//0.0.0.0:8443/api".to_string(),
1929 params: std::collections::HashMap::from([(
1930 "maxRequestBody".to_string(),
1931 "5242880".to_string(),
1932 )]),
1933 };
1934 let cfg = HttpServerConfig::from_components(components).unwrap();
1935 assert_eq!(cfg.host, "0.0.0.0");
1936 assert_eq!(cfg.port, 8443);
1937 assert_eq!(cfg.path, "/api");
1938 assert_eq!(cfg.max_request_body, 5242880);
1939 }
1940
1941 #[test]
1942 fn test_http_server_config_default_path() {
1943 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
1944 assert_eq!(cfg.path, "/");
1945 }
1946
1947 #[test]
1948 fn test_http_server_config_wrong_scheme() {
1949 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
1950 }
1951
1952 #[test]
1953 fn test_http_server_config_invalid_port() {
1954 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
1955 }
1956
1957 #[test]
1958 fn test_http_server_config_default_port_by_scheme() {
1959 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
1961 assert_eq!(cfg_http.port, 80);
1962
1963 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
1965 assert_eq!(cfg_https.port, 443);
1966 }
1967
1968 #[test]
1969 fn test_request_envelope_and_reply_are_send() {
1970 fn assert_send<T: Send>() {}
1971 assert_send::<RequestEnvelope>();
1972 assert_send::<HttpReply>();
1973 }
1974
1975 #[test]
1980 fn test_server_registry_global_is_singleton() {
1981 let r1 = ServerRegistry::global();
1982 let r2 = ServerRegistry::global();
1983 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
1984 }
1985
1986 #[tokio::test]
1991 async fn test_dispatch_handler_returns_404_for_unknown_path() {
1992 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
1993 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1995 let port = listener.local_addr().unwrap().port();
1996 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
1997
1998 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2000
2001 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2002 .await
2003 .unwrap();
2004 assert_eq!(resp.status().as_u16(), 404);
2005 }
2006
2007 #[tokio::test]
2012 async fn test_http_consumer_start_registers_path() {
2013 use camel_component::ConsumerContext;
2014
2015 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2017 let port = listener.local_addr().unwrap().port();
2018 drop(listener); let consumer_cfg = HttpServerConfig {
2021 host: "127.0.0.1".to_string(),
2022 port,
2023 path: "/ping".to_string(),
2024 max_request_body: 2 * 1024 * 1024,
2025 max_response_body: 10 * 1024 * 1024,
2026 };
2027 let mut consumer = HttpConsumer::new(consumer_cfg);
2028
2029 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2030 let token = tokio_util::sync::CancellationToken::new();
2031 let ctx = ConsumerContext::new(tx, token.clone());
2032
2033 tokio::spawn(async move {
2034 consumer.start(ctx).await.unwrap();
2035 });
2036
2037 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2038
2039 let client = reqwest::Client::new();
2040 let resp_future = client
2041 .post(format!("http://127.0.0.1:{port}/ping"))
2042 .body("hello world")
2043 .send();
2044
2045 let (http_result, _) = tokio::join!(resp_future, async {
2046 if let Some(mut envelope) = rx.recv().await {
2047 envelope.exchange.input.set_header(
2049 "CamelHttpResponseCode",
2050 serde_json::Value::Number(201.into()),
2051 );
2052 if let Some(reply_tx) = envelope.reply_tx {
2053 let _ = reply_tx.send(Ok(envelope.exchange));
2054 }
2055 }
2056 });
2057
2058 let resp = http_result.unwrap();
2059 assert_eq!(resp.status().as_u16(), 201);
2060
2061 token.cancel();
2062 }
2063
2064 #[tokio::test]
2069 async fn test_integration_single_consumer_round_trip() {
2070 use camel_component::{ConsumerContext, ExchangeEnvelope};
2071
2072 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2074 let port = listener.local_addr().unwrap().port();
2075 drop(listener); let component = HttpComponent::new();
2078 let endpoint = component
2079 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2080 .unwrap();
2081 let mut consumer = endpoint.create_consumer().unwrap();
2082
2083 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2084 let token = tokio_util::sync::CancellationToken::new();
2085 let ctx = ConsumerContext::new(tx, token.clone());
2086
2087 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2088 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2089
2090 let client = reqwest::Client::new();
2091 let send_fut = client
2092 .post(format!("http://127.0.0.1:{port}/echo"))
2093 .header("Content-Type", "text/plain")
2094 .body("ping")
2095 .send();
2096
2097 let (http_result, _) = tokio::join!(send_fut, async {
2098 if let Some(mut envelope) = rx.recv().await {
2099 assert_eq!(
2100 envelope.exchange.input.header("CamelHttpMethod"),
2101 Some(&serde_json::Value::String("POST".into()))
2102 );
2103 assert_eq!(
2104 envelope.exchange.input.header("CamelHttpPath"),
2105 Some(&serde_json::Value::String("/echo".into()))
2106 );
2107 envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2108 if let Some(reply_tx) = envelope.reply_tx {
2109 let _ = reply_tx.send(Ok(envelope.exchange));
2110 }
2111 }
2112 });
2113
2114 let resp = http_result.unwrap();
2115 assert_eq!(resp.status().as_u16(), 200);
2116 let body = resp.text().await.unwrap();
2117 assert_eq!(body, "pong");
2118
2119 token.cancel();
2120 }
2121
2122 #[tokio::test]
2123 async fn test_integration_two_consumers_shared_port() {
2124 use camel_component::{ConsumerContext, ExchangeEnvelope};
2125
2126 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2128 let port = listener.local_addr().unwrap().port();
2129 drop(listener);
2130
2131 let component = HttpComponent::new();
2132
2133 let endpoint_a = component
2135 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2136 .unwrap();
2137 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2138
2139 let endpoint_b = component
2141 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2142 .unwrap();
2143 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2144
2145 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2146 let token_a = tokio_util::sync::CancellationToken::new();
2147 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2148
2149 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2150 let token_b = tokio_util::sync::CancellationToken::new();
2151 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2152
2153 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2154 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2155 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2156
2157 let client = reqwest::Client::new();
2158
2159 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2161 let (resp_hello, _) = tokio::join!(fut_hello, async {
2162 if let Some(mut envelope) = rx_a.recv().await {
2163 envelope.exchange.input.body =
2164 camel_api::body::Body::Text("hello-response".to_string());
2165 if let Some(reply_tx) = envelope.reply_tx {
2166 let _ = reply_tx.send(Ok(envelope.exchange));
2167 }
2168 }
2169 });
2170
2171 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2173 let (resp_world, _) = tokio::join!(fut_world, async {
2174 if let Some(mut envelope) = rx_b.recv().await {
2175 envelope.exchange.input.body =
2176 camel_api::body::Body::Text("world-response".to_string());
2177 if let Some(reply_tx) = envelope.reply_tx {
2178 let _ = reply_tx.send(Ok(envelope.exchange));
2179 }
2180 }
2181 });
2182
2183 let body_a = resp_hello.unwrap().text().await.unwrap();
2184 let body_b = resp_world.unwrap().text().await.unwrap();
2185
2186 assert_eq!(body_a, "hello-response");
2187 assert_eq!(body_b, "world-response");
2188
2189 token_a.cancel();
2190 token_b.cancel();
2191 }
2192
2193 #[tokio::test]
2194 async fn test_integration_unregistered_path_returns_404() {
2195 use camel_component::{ConsumerContext, ExchangeEnvelope};
2196
2197 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2199 let port = listener.local_addr().unwrap().port();
2200 drop(listener);
2201
2202 let component = HttpComponent::new();
2203 let endpoint = component
2204 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2205 .unwrap();
2206 let mut consumer = endpoint.create_consumer().unwrap();
2207
2208 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2209 let token = tokio_util::sync::CancellationToken::new();
2210 let ctx = ConsumerContext::new(tx, token.clone());
2211
2212 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2213 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2214
2215 let client = reqwest::Client::new();
2216 let resp = client
2217 .get(format!("http://127.0.0.1:{port}/not-there"))
2218 .send()
2219 .await
2220 .unwrap();
2221 assert_eq!(resp.status().as_u16(), 404);
2222
2223 token.cancel();
2224 }
2225
2226 #[test]
2227 fn test_http_consumer_declares_concurrent() {
2228 use camel_component::ConcurrencyModel;
2229
2230 let config = HttpServerConfig {
2231 host: "127.0.0.1".to_string(),
2232 port: 19999,
2233 path: "/test".to_string(),
2234 max_request_body: 2 * 1024 * 1024,
2235 max_response_body: 10 * 1024 * 1024,
2236 };
2237 let consumer = HttpConsumer::new(config);
2238 assert_eq!(
2239 consumer.concurrency_model(),
2240 ConcurrencyModel::Concurrent { max: None }
2241 );
2242 }
2243
2244 #[tokio::test]
2249 async fn test_http_reply_body_stream_variant_exists() {
2250 use bytes::Bytes;
2251 use camel_api::CamelError;
2252 use futures::stream;
2253
2254 let chunks: Vec<Result<Bytes, CamelError>> =
2255 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2256 let stream = Box::pin(stream::iter(chunks));
2257 let reply_body = HttpReplyBody::Stream(stream);
2258 match reply_body {
2260 HttpReplyBody::Stream(_) => {}
2261 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2262 }
2263 }
2264
2265 #[cfg(feature = "otel")]
2270 mod otel_tests {
2271 use super::*;
2272 use camel_api::Message;
2273 use tower::ServiceExt;
2274
2275 #[tokio::test]
2276 async fn test_producer_injects_traceparent_header() {
2277 let (url, _handle) = start_test_server_with_header_capture().await;
2278 let ctx = test_producer_ctx();
2279
2280 let component = HttpComponent::new();
2281 let endpoint = component
2282 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2283 .unwrap();
2284 let producer = endpoint.create_producer(&ctx).unwrap();
2285
2286 let mut exchange = Exchange::new(Message::default());
2288 let mut headers = std::collections::HashMap::new();
2289 headers.insert(
2290 "traceparent".to_string(),
2291 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2292 );
2293 camel_otel::extract_into_exchange(&mut exchange, &headers);
2294
2295 let result = producer.oneshot(exchange).await.unwrap();
2296
2297 let status = result
2299 .input
2300 .header("CamelHttpResponseCode")
2301 .and_then(|v| v.as_u64())
2302 .unwrap();
2303 assert_eq!(status, 200);
2304
2305 let traceparent = result.input.header("x-received-traceparent");
2307 assert!(
2308 traceparent.is_some(),
2309 "traceparent header should have been sent"
2310 );
2311
2312 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2313 let parts: Vec<&str> = traceparent_str.split('-').collect();
2315 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2316 assert_eq!(parts[0], "00", "version should be 00");
2317 assert_eq!(
2318 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2319 "trace-id should match"
2320 );
2321 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2322 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2323 }
2324
2325 #[tokio::test]
2326 async fn test_consumer_extracts_traceparent_header() {
2327 use camel_component::{ConsumerContext, ExchangeEnvelope};
2328
2329 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2331 let port = listener.local_addr().unwrap().port();
2332 drop(listener);
2333
2334 let component = HttpComponent::new();
2335 let endpoint = component
2336 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2337 .unwrap();
2338 let mut consumer = endpoint.create_consumer().unwrap();
2339
2340 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2341 let token = tokio_util::sync::CancellationToken::new();
2342 let ctx = ConsumerContext::new(tx, token.clone());
2343
2344 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2345 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2346
2347 let client = reqwest::Client::new();
2349 let send_fut = client
2350 .post(format!("http://127.0.0.1:{port}/trace"))
2351 .header(
2352 "traceparent",
2353 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2354 )
2355 .body("test")
2356 .send();
2357
2358 let (http_result, _) = tokio::join!(send_fut, async {
2359 if let Some(envelope) = rx.recv().await {
2360 let mut injected_headers = std::collections::HashMap::new();
2363 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2364
2365 assert!(
2366 injected_headers.contains_key("traceparent"),
2367 "Exchange should have traceparent after extraction"
2368 );
2369
2370 let traceparent = injected_headers.get("traceparent").unwrap();
2371 let parts: Vec<&str> = traceparent.split('-').collect();
2372 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2373 assert_eq!(
2374 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2375 "Trace ID should match the original traceparent header"
2376 );
2377
2378 if let Some(reply_tx) = envelope.reply_tx {
2379 let _ = reply_tx.send(Ok(envelope.exchange));
2380 }
2381 }
2382 });
2383
2384 let resp = http_result.unwrap();
2385 assert_eq!(resp.status().as_u16(), 200);
2386
2387 token.cancel();
2388 }
2389
2390 #[tokio::test]
2391 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2392 use camel_component::{ConsumerContext, ExchangeEnvelope};
2393
2394 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2396 let port = listener.local_addr().unwrap().port();
2397 drop(listener);
2398
2399 let component = HttpComponent::new();
2400 let endpoint = component
2401 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2402 .unwrap();
2403 let mut consumer = endpoint.create_consumer().unwrap();
2404
2405 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2406 let token = tokio_util::sync::CancellationToken::new();
2407 let ctx = ConsumerContext::new(tx, token.clone());
2408
2409 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2410 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2411
2412 let client = reqwest::Client::new();
2414 let send_fut = client
2415 .post(format!("http://127.0.0.1:{port}/trace"))
2416 .header(
2417 "TraceParent",
2418 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2419 )
2420 .body("test")
2421 .send();
2422
2423 let (http_result, _) = tokio::join!(send_fut, async {
2424 if let Some(envelope) = rx.recv().await {
2425 let mut injected_headers = HashMap::new();
2428 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2429
2430 assert!(
2431 injected_headers.contains_key("traceparent"),
2432 "Exchange should have traceparent after extraction from mixed-case header"
2433 );
2434
2435 let traceparent = injected_headers.get("traceparent").unwrap();
2436 let parts: Vec<&str> = traceparent.split('-').collect();
2437 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2438 assert_eq!(
2439 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2440 "Trace ID should match the original mixed-case TraceParent header"
2441 );
2442
2443 if let Some(reply_tx) = envelope.reply_tx {
2444 let _ = reply_tx.send(Ok(envelope.exchange));
2445 }
2446 }
2447 });
2448
2449 let resp = http_result.unwrap();
2450 assert_eq!(resp.status().as_u16(), 200);
2451
2452 token.cancel();
2453 }
2454
2455 #[tokio::test]
2456 async fn test_producer_no_trace_context_no_crash() {
2457 let (url, _handle) = start_test_server().await;
2458 let ctx = test_producer_ctx();
2459
2460 let component = HttpComponent::new();
2461 let endpoint = component
2462 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2463 .unwrap();
2464 let producer = endpoint.create_producer(&ctx).unwrap();
2465
2466 let exchange = Exchange::new(Message::default());
2468
2469 let result = producer.oneshot(exchange).await.unwrap();
2471
2472 let status = result
2474 .input
2475 .header("CamelHttpResponseCode")
2476 .and_then(|v| v.as_u64())
2477 .unwrap();
2478 assert_eq!(status, 200);
2479 }
2480
2481 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2483 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2484 let addr = listener.local_addr().unwrap();
2485 let url = format!("http://127.0.0.1:{}", addr.port());
2486
2487 let handle = tokio::spawn(async move {
2488 loop {
2489 if let Ok((mut stream, _)) = listener.accept().await {
2490 tokio::spawn(async move {
2491 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2492 let mut buf = vec![0u8; 8192];
2493 let n = stream.read(&mut buf).await.unwrap_or(0);
2494 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2495
2496 let traceparent = request
2498 .lines()
2499 .find(|line| line.to_lowercase().starts_with("traceparent:"))
2500 .map(|line| {
2501 line.split(':')
2502 .nth(1)
2503 .map(|s| s.trim().to_string())
2504 .unwrap_or_default()
2505 })
2506 .unwrap_or_default();
2507
2508 let body =
2509 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2510 let response = format!(
2511 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2512 body.len(),
2513 traceparent,
2514 body
2515 );
2516 let _ = stream.write_all(response.as_bytes()).await;
2517 });
2518 }
2519 }
2520 });
2521
2522 (url, handle)
2523 }
2524 }
2525
2526 #[tokio::test]
2535 async fn test_request_body_arrives_as_stream() {
2536 use camel_api::body::Body;
2537 use camel_component::{ConsumerContext, ExchangeEnvelope};
2538
2539 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2540 let port = listener.local_addr().unwrap().port();
2541 drop(listener);
2542
2543 let component = HttpComponent::new();
2544 let endpoint = component
2545 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2546 .unwrap();
2547 let mut consumer = endpoint.create_consumer().unwrap();
2548
2549 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2550 let token = tokio_util::sync::CancellationToken::new();
2551 let ctx = ConsumerContext::new(tx, token.clone());
2552
2553 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2554 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2555
2556 let client = reqwest::Client::new();
2557 let send_fut = client
2558 .post(format!("http://127.0.0.1:{port}/upload"))
2559 .body("hello streaming world")
2560 .send();
2561
2562 let (http_result, _) = tokio::join!(send_fut, async {
2563 if let Some(mut envelope) = rx.recv().await {
2564 assert!(
2566 matches!(envelope.exchange.input.body, Body::Stream(_)),
2567 "expected Body::Stream, got discriminant {:?}",
2568 std::mem::discriminant(&envelope.exchange.input.body)
2569 );
2570 let bytes = envelope
2572 .exchange
2573 .input
2574 .body
2575 .into_bytes(1024 * 1024)
2576 .await
2577 .unwrap();
2578 assert_eq!(&bytes[..], b"hello streaming world");
2579
2580 envelope.exchange.input.body = camel_api::body::Body::Empty;
2581 if let Some(reply_tx) = envelope.reply_tx {
2582 let _ = reply_tx.send(Ok(envelope.exchange));
2583 }
2584 }
2585 });
2586
2587 let resp = http_result.unwrap();
2588 assert_eq!(resp.status().as_u16(), 200);
2589
2590 token.cancel();
2591 }
2592
2593 #[tokio::test]
2598 async fn test_streaming_response_chunked() {
2599 use bytes::Bytes;
2600 use camel_api::CamelError;
2601 use camel_api::body::{Body, StreamBody, StreamMetadata};
2602 use camel_component::{ConsumerContext, ExchangeEnvelope};
2603 use futures::stream;
2604 use std::sync::Arc;
2605 use tokio::sync::Mutex;
2606
2607 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2608 let port = listener.local_addr().unwrap().port();
2609 drop(listener);
2610
2611 let component = HttpComponent::new();
2612 let endpoint = component
2613 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2614 .unwrap();
2615 let mut consumer = endpoint.create_consumer().unwrap();
2616
2617 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2618 let token = tokio_util::sync::CancellationToken::new();
2619 let ctx = ConsumerContext::new(tx, token.clone());
2620
2621 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2622 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2623
2624 let client = reqwest::Client::new();
2625 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2626
2627 let (http_result, _) = tokio::join!(send_fut, async {
2628 if let Some(mut envelope) = rx.recv().await {
2629 let chunks: Vec<Result<Bytes, CamelError>> =
2631 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2632 let stream = Box::pin(stream::iter(chunks));
2633 envelope.exchange.input.body = Body::Stream(StreamBody {
2634 stream: Arc::new(Mutex::new(Some(stream))),
2635 metadata: StreamMetadata::default(),
2636 });
2637 if let Some(reply_tx) = envelope.reply_tx {
2638 let _ = reply_tx.send(Ok(envelope.exchange));
2639 }
2640 }
2641 });
2642
2643 let resp = http_result.unwrap();
2644 assert_eq!(resp.status().as_u16(), 200);
2645 let body = resp.text().await.unwrap();
2646 assert_eq!(body, "chunk1chunk2");
2647
2648 token.cancel();
2649 }
2650
2651 #[tokio::test]
2656 async fn test_413_when_content_length_exceeds_limit() {
2657 use camel_component::ConsumerContext;
2658
2659 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2660 let port = listener.local_addr().unwrap().port();
2661 drop(listener);
2662
2663 let component = HttpComponent::new();
2665 let endpoint = component
2666 .create_endpoint(&format!(
2667 "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2668 ))
2669 .unwrap();
2670 let mut consumer = endpoint.create_consumer().unwrap();
2671
2672 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2673 let token = tokio_util::sync::CancellationToken::new();
2674 let ctx = ConsumerContext::new(tx, token.clone());
2675
2676 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2677 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2678
2679 let client = reqwest::Client::new();
2680 let resp = client
2681 .post(format!("http://127.0.0.1:{port}/upload"))
2682 .header("Content-Length", "1000") .body("x".repeat(1000))
2684 .send()
2685 .await
2686 .unwrap();
2687
2688 assert_eq!(resp.status().as_u16(), 413);
2689
2690 token.cancel();
2691 }
2692
2693 #[tokio::test]
2697 async fn test_chunked_upload_without_content_length_bypasses_limit() {
2698 use bytes::Bytes;
2699 use camel_api::body::Body;
2700 use camel_component::ConsumerContext;
2701 use futures::stream;
2702
2703 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2704 let port = listener.local_addr().unwrap().port();
2705 drop(listener);
2706
2707 let component = HttpComponent::new();
2709 let endpoint = component
2710 .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2711 .unwrap();
2712 let mut consumer = endpoint.create_consumer().unwrap();
2713
2714 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2715 let token = tokio_util::sync::CancellationToken::new();
2716 let ctx = ConsumerContext::new(tx, token.clone());
2717
2718 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2719 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2720
2721 let client = reqwest::Client::new();
2722
2723 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2727 Ok(Bytes::from("y".repeat(50))),
2728 Ok(Bytes::from("y".repeat(50))),
2729 ];
2730 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2731 let send_fut = client
2732 .post(format!("http://127.0.0.1:{port}/upload"))
2733 .body(stream_body)
2734 .send();
2735
2736 let consumer_fut = async {
2737 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2739 Ok(Some(mut envelope)) => {
2740 assert!(
2741 matches!(envelope.exchange.input.body, Body::Stream(_)),
2742 "expected Body::Stream"
2743 );
2744 envelope.exchange.input.body = camel_api::body::Body::Empty;
2745 if let Some(reply_tx) = envelope.reply_tx {
2746 let _ = reply_tx.send(Ok(envelope.exchange));
2747 }
2748 }
2749 Ok(None) => panic!("consumer channel closed unexpectedly"),
2750 Err(_) => {
2751 }
2754 }
2755 };
2756
2757 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2758
2759 let resp = http_result.unwrap();
2760 assert_ne!(
2762 resp.status().as_u16(),
2763 413,
2764 "chunked upload must not be rejected by maxRequestBody"
2765 );
2766 assert_eq!(resp.status().as_u16(), 200);
2767
2768 token.cancel();
2769 }
2770}