1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
/// Producer-side settings for an HTTP endpoint, as parsed from an endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL assembled as `<scheme>:<path>` from the parsed URI components.
    pub base_url: String,
    /// Value of the `httpMethod` URI option, if given.
    pub http_method: Option<String>,
    /// When `true`, a response status outside `ok_status_code_range` is
    /// returned as an error by the producer.
    pub throw_exception_on_failure: bool,
    /// Inclusive `(low, high)` range of status codes treated as success.
    pub ok_status_code_range: (u16, u16),
    /// Request timeout taken from the `responseTimeout` option (milliseconds).
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
    pub query_params: HashMap<String, String>,
    /// When `false`, the producer rejects loopback/private targets before sending.
    pub allow_private_ips: bool,
    /// Hosts from the comma-separated `blockedHosts` option.
    pub blocked_hosts: Vec<String>,
    /// Limit passed to `Body::into_bytes` when a non-stream request body is materialised.
    pub max_body_size: usize,
}
96
/// URI parameter names treated as endpoint options.
///
/// Parameters with these names are excluded from
/// `HttpEndpointConfig::query_params`; every other parameter is kept there.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
109
110impl UriConfig for HttpEndpointConfig {
111 fn scheme() -> &'static str {
113 "http"
114 }
115
116 fn from_uri(uri: &str) -> Result<Self, CamelError> {
117 let parts = parse_uri(uri)?;
118 Self::from_components(parts)
119 }
120
121 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122 if parts.scheme != "http" && parts.scheme != "https" {
124 return Err(CamelError::InvalidUri(format!(
125 "expected scheme 'http' or 'https', got '{}'",
126 parts.scheme
127 )));
128 }
129
130 let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134 let http_method = parts.params.get("httpMethod").cloned();
135
136 let throw_exception_on_failure = parts
137 .params
138 .get("throwExceptionOnFailure")
139 .map(|v| v != "false")
140 .unwrap_or(true);
141
142 let ok_status_code_range = parts
144 .params
145 .get("okStatusCodeRange")
146 .and_then(|v| {
147 let (start, end) = v.split_once('-')?;
148 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149 })
150 .unwrap_or((200, 299));
151
152 let response_timeout = parts
153 .params
154 .get("responseTimeout")
155 .and_then(|v| v.parse::<u64>().ok())
156 .map(Duration::from_millis);
157
158 let allow_private_ips = parts
160 .params
161 .get("allowPrivateIps")
162 .map(|v| v == "true")
163 .unwrap_or(false); let blocked_hosts = parts
167 .params
168 .get("blockedHosts")
169 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170 .unwrap_or_default();
171
172 let max_body_size = parts
173 .params
174 .get("maxBodySize")
175 .and_then(|v| v.parse::<usize>().ok())
176 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
180 .params
181 .into_iter()
182 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183 .collect();
184
185 Ok(Self {
186 base_url,
187 http_method,
188 throw_exception_on_failure,
189 ok_status_code_range,
190 response_timeout,
191 query_params,
192 allow_private_ips,
193 blocked_hosts,
194 max_body_size,
195 })
196 }
197}
198
199impl HttpEndpointConfig {
200 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201 let parts = parse_uri(uri)?;
202 let mut endpoint = Self::from_components(parts.clone())?;
203 if endpoint.response_timeout.is_none() {
204 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205 }
206 if !parts.params.contains_key("allowPrivateIps") {
207 endpoint.allow_private_ips = config.allow_private_ips;
208 }
209 if !parts.params.contains_key("blockedHosts") {
210 endpoint.blocked_hosts = config.blocked_hosts.clone();
211 }
212 if !parts.params.contains_key("maxBodySize") {
213 endpoint.max_body_size = config.max_body_size;
214 }
215 Ok(endpoint)
216 }
217}
218
/// Consumer-side (server) settings derived from an endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host part of the URI authority; used to form the listener bind address.
    pub host: String,
    /// Port from the authority, or 80/443 depending on the scheme when omitted.
    pub port: u16,
    /// Request path this consumer is registered under (`/` when the URI has none).
    pub path: String,
    /// Limit for inbound request bodies, from the `maxRequestBody` option.
    pub max_request_body: usize,
    /// Value of the `maxResponseBody` option.
    pub max_response_body: usize,
}
237
238impl UriConfig for HttpServerConfig {
239 fn scheme() -> &'static str {
241 "http"
242 }
243
244 fn from_uri(uri: &str) -> Result<Self, CamelError> {
245 let parts = parse_uri(uri)?;
246 Self::from_components(parts)
247 }
248
249 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
250 if parts.scheme != "http" && parts.scheme != "https" {
252 return Err(CamelError::InvalidUri(format!(
253 "expected scheme 'http' or 'https', got '{}'",
254 parts.scheme
255 )));
256 }
257
258 let authority_and_path = parts.path.trim_start_matches('/');
261
262 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
264 (&authority_and_path[..idx], &authority_and_path[idx..])
265 } else {
266 (authority_and_path, "/")
267 };
268
269 let path = if path_suffix.is_empty() {
270 "/"
271 } else {
272 path_suffix
273 }
274 .to_string();
275
276 let (host, port) = if let Some(colon) = authority.rfind(':') {
278 let port_str = &authority[colon + 1..];
279 match port_str.parse::<u16>() {
280 Ok(p) => (authority[..colon].to_string(), p),
281 Err(_) => {
282 return Err(CamelError::InvalidUri(format!(
283 "invalid port '{}' in authority",
284 port_str
285 )));
286 }
287 }
288 } else {
289 let default_port = if parts.scheme == "https" { 443 } else { 80 };
291 (authority.to_string(), default_port)
292 };
293
294 let max_request_body = parts
295 .params
296 .get("maxRequestBody")
297 .and_then(|v| v.parse::<usize>().ok())
298 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
301 .params
302 .get("maxResponseBody")
303 .and_then(|v| v.parse::<usize>().ok())
304 .unwrap_or(10 * 1024 * 1024); Ok(Self {
307 host,
308 port,
309 path,
310 max_request_body,
311 max_response_body,
312 })
313 }
314}
315
316impl HttpServerConfig {
317 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
318 let parts = parse_uri(uri)?;
319 let mut server = Self::from_components(parts.clone())?;
320 if !parts.params.contains_key("maxRequestBody") {
321 server.max_request_body = config.max_request_body;
322 }
323 if !parts.params.contains_key("maxResponseBody") {
324 server.max_response_body = config.max_body_size;
325 }
326 Ok(server)
327 }
328}
329
/// Body of a reply sent from a consumer back to the HTTP handler.
pub(crate) enum HttpReplyBody {
    /// Fully materialised payload.
    Bytes(bytes::Bytes),
    /// Payload delivered chunk by chunk; each chunk may fail with a `CamelError`.
    Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
339
/// One inbound HTTP request handed from the HTTP handler to a consumer,
/// together with the channel on which the consumer must answer.
pub(crate) struct RequestEnvelope {
    /// HTTP method as text (e.g. `GET`).
    pub(crate) method: String,
    /// Request path, used as the dispatch key.
    pub(crate) path: String,
    /// Raw query string; empty when the request had none.
    pub(crate) query: String,
    /// Request headers as received.
    pub(crate) headers: http::HeaderMap,
    /// Request body as a stream.
    pub(crate) body: StreamBody,
    /// One-shot channel for the reply to this request.
    pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
350
/// Reply produced by a consumer for a single `RequestEnvelope`.
pub(crate) struct HttpReply {
    /// Numeric HTTP status code.
    pub(crate) status: u16,
    /// Response headers as `(name, value)` pairs.
    pub(crate) headers: Vec<(String, String)>,
    /// Response payload.
    pub(crate) body: HttpReplyBody,
}
357
/// Shared map from request path to the channel of the consumer registered for it.
pub(crate) type DispatchTable =
    Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
365
/// A running server: its dispatch table plus the task serving connections.
struct ServerHandle {
    /// Path-to-consumer routing table shared with the request handler.
    dispatch: DispatchTable,
    /// Join handle of the spawned server task; stored but not read here.
    _task: tokio::task::JoinHandle<()>,
}
372
/// Registry of HTTP servers, holding at most one lazily started server per port.
pub struct ServerRegistry {
    /// Port number mapped to the (possibly not yet initialised) server for that port.
    inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
}
377
378impl ServerRegistry {
379 pub fn global() -> &'static Self {
381 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
382 INSTANCE.get_or_init(|| ServerRegistry {
383 inner: Mutex::new(HashMap::new()),
384 })
385 }
386
387 pub(crate) async fn get_or_spawn(
390 &'static self,
391 host: &str,
392 port: u16,
393 max_request_body: usize,
394 ) -> Result<DispatchTable, CamelError> {
395 let host_owned = host.to_string();
396
397 let cell = {
398 let mut guard = self.inner.lock().map_err(|_| {
399 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
400 })?;
401 guard
402 .entry(port)
403 .or_insert_with(|| Arc::new(OnceCell::new()))
404 .clone()
405 };
406
407 let handle = cell
408 .get_or_try_init(|| async {
409 let addr = format!("{host_owned}:{port}");
410 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
411 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
412 })?;
413 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
414 let task = tokio::spawn(run_axum_server(
415 listener,
416 Arc::clone(&dispatch),
417 max_request_body,
418 ));
419 Ok::<ServerHandle, CamelError>(ServerHandle {
420 dispatch,
421 _task: task,
422 })
423 })
424 .await?;
425
426 Ok(Arc::clone(&handle.dispatch))
427 }
428}
429
430use axum::{
435 Router,
436 body::Body as AxumBody,
437 extract::{Request, State},
438 http::{Response, StatusCode},
439 response::IntoResponse,
440};
441
/// State shared with the axum fallback handler.
#[derive(Clone)]
struct AppState {
    /// Routing table used to find the consumer for a request path.
    dispatch: DispatchTable,
    /// Request-body limit the handler compares against `Content-Length`.
    max_request_body: usize,
}
447
448async fn run_axum_server(
449 listener: tokio::net::TcpListener,
450 dispatch: DispatchTable,
451 max_request_body: usize,
452) {
453 let state = AppState {
454 dispatch,
455 max_request_body,
456 };
457 let app = Router::new().fallback(dispatch_handler).with_state(state);
458
459 axum::serve(listener, app).await.unwrap_or_else(|e| {
460 tracing::error!(error = %e, "Axum server error");
461 });
462}
463
464async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
465 let method = req.method().to_string();
466 let path = req.uri().path().to_string();
467 let query = req.uri().query().unwrap_or("").to_string();
468 let headers = req.headers().clone();
469
470 let content_length: Option<u64> = headers
472 .get(http::header::CONTENT_LENGTH)
473 .and_then(|v| v.to_str().ok())
474 .and_then(|s| s.parse().ok());
475
476 if let Some(len) = content_length
477 && len > state.max_request_body as u64
478 {
479 return Response::builder()
480 .status(StatusCode::PAYLOAD_TOO_LARGE)
481 .body(AxumBody::from("Request body exceeds configured limit"))
482 .expect("infallible");
483 }
484
485 let content_type = headers
487 .get(http::header::CONTENT_TYPE)
488 .and_then(|v| v.to_str().ok())
489 .map(|s| s.to_string());
490
491 let data_stream: BodyDataStream = req.into_body().into_data_stream();
492 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
493 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
494
495 let stream_body = StreamBody {
496 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
497 metadata: StreamMetadata {
498 size_hint: content_length,
499 content_type,
500 origin: None,
501 },
502 };
503
504 let sender = {
506 let table = state.dispatch.read().await;
507 table.get(&path).cloned()
508 };
509 let Some(sender) = sender else {
510 return Response::builder()
511 .status(StatusCode::NOT_FOUND)
512 .body(AxumBody::from("No consumer registered for this path"))
513 .expect("infallible");
514 };
515
516 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
517 let envelope = RequestEnvelope {
518 method,
519 path,
520 query,
521 headers,
522 body: stream_body,
523 reply_tx,
524 };
525
526 if sender.send(envelope).await.is_err() {
527 return Response::builder()
528 .status(StatusCode::SERVICE_UNAVAILABLE)
529 .body(AxumBody::from("Consumer unavailable"))
530 .expect("infallible");
531 }
532
533 match reply_rx.await {
534 Ok(reply) => {
535 let status =
536 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
537 let mut builder = Response::builder().status(status);
538 for (k, v) in &reply.headers {
539 builder = builder.header(k.as_str(), v.as_str());
540 }
541 match reply.body {
542 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
543 Response::builder()
544 .status(StatusCode::INTERNAL_SERVER_ERROR)
545 .body(AxumBody::from("Invalid response headers from consumer"))
546 .expect("infallible")
547 }),
548 HttpReplyBody::Stream(stream) => builder
549 .body(AxumBody::from_stream(stream))
550 .unwrap_or_else(|_| {
551 Response::builder()
552 .status(StatusCode::INTERNAL_SERVER_ERROR)
553 .body(AxumBody::from("Invalid response headers from consumer"))
554 .expect("infallible")
555 }),
556 }
557 }
558 Err(_) => Response::builder()
559 .status(StatusCode::INTERNAL_SERVER_ERROR)
560 .body(AxumBody::from("Pipeline error"))
561 .expect("Response::builder() with a known-valid status code and body is infallible"),
562 }
563}
564
/// Consumer that receives HTTP requests for one path of a shared server.
pub struct HttpConsumer {
    /// Host, port, path and body limits this consumer was created with.
    config: HttpServerConfig,
}
572
impl HttpConsumer {
    /// Creates a consumer for the given server configuration. Nothing is
    /// bound or registered until `start` is called.
    pub fn new(config: HttpServerConfig) -> Self {
        Self { config }
    }
}
578
579#[async_trait::async_trait]
580impl Consumer for HttpConsumer {
581 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
582 use camel_component_api::{Body, Exchange, Message};
583
584 let dispatch = ServerRegistry::global()
585 .get_or_spawn(
586 &self.config.host,
587 self.config.port,
588 self.config.max_request_body,
589 )
590 .await?;
591
592 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
594 {
595 let mut table = dispatch.write().await;
596 table.insert(self.config.path.clone(), env_tx);
597 }
598
599 let path = self.config.path.clone();
600 let cancel_token = ctx.cancel_token();
601 let _max_response_body = self.config.max_response_body;
602
603 loop {
604 tokio::select! {
605 _ = ctx.cancelled() => {
606 break;
607 }
608 envelope = env_rx.recv() => {
609 let Some(envelope) = envelope else { break; };
610
611 let mut msg = Message::default();
613
614 msg.set_header("CamelHttpMethod",
616 serde_json::Value::String(envelope.method.clone()));
617 msg.set_header("CamelHttpPath",
618 serde_json::Value::String(envelope.path.clone()));
619 msg.set_header("CamelHttpQuery",
620 serde_json::Value::String(envelope.query.clone()));
621
622 for (k, v) in &envelope.headers {
624 if let Ok(val_str) = v.to_str() {
625 msg.set_header(
626 k.as_str(),
627 serde_json::Value::String(val_str.to_string()),
628 );
629 }
630 }
631
632 msg.body = Body::Stream(envelope.body);
635
636 #[allow(unused_mut)]
637 let mut exchange = Exchange::new(msg);
638
639 #[cfg(feature = "otel")]
641 {
642 let headers: HashMap<String, String> = envelope
643 .headers
644 .iter()
645 .filter_map(|(k, v)| {
646 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
647 })
648 .collect();
649 camel_otel::extract_into_exchange(&mut exchange, &headers);
650 }
651
652 let reply_tx = envelope.reply_tx;
653 let sender = ctx.sender().clone();
654 let path_clone = path.clone();
655 let cancel = cancel_token.clone();
656
657 tokio::spawn(async move {
677 if cancel.is_cancelled() {
685 let _ = reply_tx.send(HttpReply {
686 status: 503,
687 headers: vec![],
688 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
689 });
690 return;
691 }
692
693 let (tx, rx) = tokio::sync::oneshot::channel();
695 let envelope = camel_component_api::consumer::ExchangeEnvelope {
696 exchange,
697 reply_tx: Some(tx),
698 };
699
700 let result = match sender.send(envelope).await {
701 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
702 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
703 }
704 .and_then(|r| r);
705
706 let reply = match result {
707 Ok(out) => {
708 let status = out
709 .input
710 .header("CamelHttpResponseCode")
711 .and_then(|v| v.as_u64())
712 .map(|s| s as u16)
713 .unwrap_or(200);
714
715 let reply_body: HttpReplyBody = match out.input.body {
716 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
717 Body::Bytes(b) => HttpReplyBody::Bytes(b),
718 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
719 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
720 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
721 v.to_string().into_bytes(),
722 )),
723 Body::Stream(s) => {
724 match s.stream.lock().await.take() {
725 Some(stream) => HttpReplyBody::Stream(stream),
726 None => {
727 tracing::error!(
728 "Body::Stream already consumed before HTTP reply — returning 500"
729 );
730 let error_reply = HttpReply {
731 status: 500,
732 headers: vec![],
733 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
734 };
735 if reply_tx.send(error_reply).is_err() {
736 debug!("reply_tx dropped before error reply could be sent");
737 }
738 return;
739 }
740 }
741 }
742 };
743
744 let resp_headers: Vec<(String, String)> = out
745 .input
746 .headers
747 .iter()
748 .filter(|(k, _)| !k.starts_with("Camel"))
750 .filter(|(k, _)| {
753 !matches!(
754 k.to_lowercase().as_str(),
755 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
790 })
791 .filter_map(|(k, v)| {
792 v.as_str().map(|s| (k.clone(), s.to_string()))
793 })
794 .collect();
795
796 HttpReply {
797 status,
798 headers: resp_headers,
799 body: reply_body,
800 }
801 }
802 Err(e) => {
803 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
804 HttpReply {
805 status: 500,
806 headers: vec![],
807 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
808 }
809 }
810 };
811
812 let _ = reply_tx.send(reply);
814 });
815 }
816 }
817 }
818
819 {
821 let mut table = dispatch.write().await;
822 table.remove(&path);
823 }
824
825 Ok(())
826 }
827
828 async fn stop(&mut self) -> Result<(), CamelError> {
829 Ok(())
830 }
831
832 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
833 camel_component_api::ConcurrencyModel::Concurrent { max: None }
834 }
835}
836
/// Component registered for the `http` scheme.
pub struct HttpComponent {
    /// HTTP client built from `config` and cloned into every endpoint.
    client: reqwest::Client,
    /// Component-level defaults applied to endpoint URIs.
    config: HttpConfig,
}
845
846fn build_client(config: &HttpConfig) -> reqwest::Client {
847 let mut builder = reqwest::Client::builder()
848 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
849 .pool_max_idle_per_host(config.pool_max_idle_per_host)
850 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
851
852 if !config.follow_redirects {
853 builder = builder.redirect(reqwest::redirect::Policy::none());
854 }
855
856 builder
857 .build()
858 .expect("reqwest::Client::build() with valid config should not fail")
859}
860
861impl HttpComponent {
862 pub fn new() -> Self {
863 let config = HttpConfig::default();
864 let client = build_client(&config);
865 Self { client, config }
866 }
867
868 pub fn with_config(config: HttpConfig) -> Self {
869 let client = build_client(&config);
870 Self { client, config }
871 }
872
873 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
874 match config {
875 Some(cfg) => Self::with_config(cfg),
876 None => Self::new(),
877 }
878 }
879}
880
/// `Default` is equivalent to [`HttpComponent::new`].
impl Default for HttpComponent {
    fn default() -> Self {
        Self::new()
    }
}
886
887impl Component for HttpComponent {
888 fn scheme(&self) -> &str {
889 "http"
890 }
891
892 fn create_endpoint(
893 &self,
894 uri: &str,
895 _ctx: &dyn camel_component_api::ComponentContext,
896 ) -> Result<Box<dyn Endpoint>, CamelError> {
897 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
898 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
899 Ok(Box::new(HttpEndpoint {
900 uri: uri.to_string(),
901 config,
902 server_config,
903 client: self.client.clone(),
904 }))
905 }
906}
907
/// Component registered for the `https` scheme.
pub struct HttpsComponent {
    /// HTTP client built from `config` and cloned into every endpoint.
    client: reqwest::Client,
    /// Component-level defaults applied to endpoint URIs.
    config: HttpConfig,
}
912
913impl HttpsComponent {
914 pub fn new() -> Self {
915 let config = HttpConfig::default();
916 let client = build_client(&config);
917 Self { client, config }
918 }
919
920 pub fn with_config(config: HttpConfig) -> Self {
921 let client = build_client(&config);
922 Self { client, config }
923 }
924
925 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
926 match config {
927 Some(cfg) => Self::with_config(cfg),
928 None => Self::new(),
929 }
930 }
931}
932
/// `Default` is equivalent to [`HttpsComponent::new`].
impl Default for HttpsComponent {
    fn default() -> Self {
        Self::new()
    }
}
938
939impl Component for HttpsComponent {
940 fn scheme(&self) -> &str {
941 "https"
942 }
943
944 fn create_endpoint(
945 &self,
946 uri: &str,
947 _ctx: &dyn camel_component_api::ComponentContext,
948 ) -> Result<Box<dyn Endpoint>, CamelError> {
949 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
950 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
951 Ok(Box::new(HttpEndpoint {
952 uri: uri.to_string(),
953 config,
954 server_config,
955 client: self.client.clone(),
956 }))
957 }
958}
959
/// Endpoint created by `HttpComponent` / `HttpsComponent` for one URI.
struct HttpEndpoint {
    /// The endpoint URI exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Settings used when this endpoint creates a producer.
    config: HttpEndpointConfig,
    /// Settings used when this endpoint creates a consumer.
    server_config: HttpServerConfig,
    /// Client handle cloned from the owning component.
    client: reqwest::Client,
}
970
971impl Endpoint for HttpEndpoint {
972 fn uri(&self) -> &str {
973 &self.uri
974 }
975
976 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
977 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
978 }
979
980 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
981 Ok(BoxProcessor::new(HttpProducer {
982 config: Arc::new(self.config.clone()),
983 client: self.client.clone(),
984 }))
985 }
986}
987
988fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
993 let parsed = url::Url::parse(url)
994 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
995
996 if let Some(host) = parsed.host_str()
998 && config.blocked_hosts.iter().any(|blocked| host == blocked)
999 {
1000 return Err(CamelError::ProcessorError(format!(
1001 "Host '{}' is blocked",
1002 host
1003 )));
1004 }
1005
1006 if !config.allow_private_ips
1008 && let Some(host) = parsed.host()
1009 {
1010 match host {
1011 url::Host::Ipv4(ip) => {
1012 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1013 return Err(CamelError::ProcessorError(format!(
1014 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1015 ip
1016 )));
1017 }
1018 }
1019 url::Host::Ipv6(ip) => {
1020 if ip.is_loopback() {
1021 return Err(CamelError::ProcessorError(format!(
1022 "Loopback IP '{}' not allowed",
1023 ip
1024 )));
1025 }
1026 }
1027 url::Host::Domain(domain) => {
1028 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1030 if blocked_domains.contains(&domain) {
1031 return Err(CamelError::ProcessorError(format!(
1032 "Domain '{}' is not allowed",
1033 domain
1034 )));
1035 }
1036 }
1037 }
1038 }
1039
1040 Ok(())
1041}
1042
/// Producer that sends an exchange as an HTTP request and stores the
/// response back into the exchange.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint settings, shared between clones of the producer.
    config: Arc<HttpEndpointConfig>,
    /// HTTP client used to send requests.
    client: reqwest::Client,
}
1052
1053impl HttpProducer {
1054 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1055 if let Some(ref method) = config.http_method {
1056 return method.to_uppercase();
1057 }
1058 if let Some(method) = exchange
1059 .input
1060 .header("CamelHttpMethod")
1061 .and_then(|v| v.as_str())
1062 {
1063 return method.to_uppercase();
1064 }
1065 if !exchange.input.body.is_empty() {
1066 return "POST".to_string();
1067 }
1068 "GET".to_string()
1069 }
1070
1071 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1072 if let Some(uri) = exchange
1073 .input
1074 .header("CamelHttpUri")
1075 .and_then(|v| v.as_str())
1076 {
1077 let mut url = uri.to_string();
1078 if let Some(path) = exchange
1079 .input
1080 .header("CamelHttpPath")
1081 .and_then(|v| v.as_str())
1082 {
1083 if !url.ends_with('/') && !path.starts_with('/') {
1084 url.push('/');
1085 }
1086 url.push_str(path);
1087 }
1088 if let Some(query) = exchange
1089 .input
1090 .header("CamelHttpQuery")
1091 .and_then(|v| v.as_str())
1092 {
1093 url.push('?');
1094 url.push_str(query);
1095 }
1096 return url;
1097 }
1098
1099 let mut url = config.base_url.clone();
1100
1101 if let Some(path) = exchange
1102 .input
1103 .header("CamelHttpPath")
1104 .and_then(|v| v.as_str())
1105 {
1106 if !url.ends_with('/') && !path.starts_with('/') {
1107 url.push('/');
1108 }
1109 url.push_str(path);
1110 }
1111
1112 if let Some(query) = exchange
1113 .input
1114 .header("CamelHttpQuery")
1115 .and_then(|v| v.as_str())
1116 {
1117 url.push('?');
1118 url.push_str(query);
1119 } else if !config.query_params.is_empty() {
1120 url.push('?');
1122 let query_string: String = config
1123 .query_params
1124 .iter()
1125 .map(|(k, v)| format!("{k}={v}"))
1126 .collect::<Vec<_>>()
1127 .join("&");
1128 url.push_str(&query_string);
1129 }
1130
1131 url
1132 }
1133
1134 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1135 status >= range.0 && status <= range.1
1136 }
1137}
1138
1139impl Service<Exchange> for HttpProducer {
1140 type Response = Exchange;
1141 type Error = CamelError;
1142 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1143
1144 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1145 Poll::Ready(Ok(()))
1146 }
1147
1148 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1149 let config = self.config.clone();
1150 let client = self.client.clone();
1151
1152 Box::pin(async move {
1153 let method_str = HttpProducer::resolve_method(&exchange, &config);
1154 let url = HttpProducer::resolve_url(&exchange, &config);
1155
1156 validate_url_for_ssrf(&url, &config)?;
1158
1159 debug!(
1160 correlation_id = %exchange.correlation_id(),
1161 method = %method_str,
1162 url = %url,
1163 "HTTP request"
1164 );
1165
1166 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1167 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1168 })?;
1169
1170 let mut request = client.request(method, &url);
1171
1172 if let Some(timeout) = config.response_timeout {
1173 request = request.timeout(timeout);
1174 }
1175
1176 #[cfg(feature = "otel")]
1178 {
1179 let mut otel_headers = HashMap::new();
1180 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1181 for (k, v) in otel_headers {
1182 if let (Ok(name), Ok(val)) = (
1183 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1184 reqwest::header::HeaderValue::from_str(&v),
1185 ) {
1186 request = request.header(name, val);
1187 }
1188 }
1189 }
1190
1191 for (key, value) in &exchange.input.headers {
1192 if !key.starts_with("Camel")
1193 && let Some(val_str) = value.as_str()
1194 && let (Ok(name), Ok(val)) = (
1195 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1196 reqwest::header::HeaderValue::from_str(val_str),
1197 )
1198 {
1199 request = request.header(name, val);
1200 }
1201 }
1202
1203 match exchange.input.body {
1204 Body::Stream(ref s) => {
1205 let mut stream_lock = s.stream.lock().await;
1206 if let Some(stream) = stream_lock.take() {
1207 request = request.body(reqwest::Body::wrap_stream(stream));
1208 } else {
1209 return Err(CamelError::AlreadyConsumed);
1210 }
1211 }
1212 _ => {
1213 let body = std::mem::take(&mut exchange.input.body);
1215 let bytes = body.into_bytes(config.max_body_size).await?;
1216 if !bytes.is_empty() {
1217 request = request.body(bytes);
1218 }
1219 }
1220 }
1221
1222 let response = request
1223 .send()
1224 .await
1225 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1226
1227 let status_code = response.status().as_u16();
1228 let status_text = response
1229 .status()
1230 .canonical_reason()
1231 .unwrap_or("Unknown")
1232 .to_string();
1233
1234 for (key, value) in response.headers() {
1235 if let Ok(val_str) = value.to_str() {
1236 exchange
1237 .input
1238 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1239 }
1240 }
1241
1242 exchange.input.set_header(
1243 "CamelHttpResponseCode",
1244 serde_json::Value::Number(status_code.into()),
1245 );
1246 exchange.input.set_header(
1247 "CamelHttpResponseText",
1248 serde_json::Value::String(status_text.clone()),
1249 );
1250
1251 let response_body = response.bytes().await.map_err(|e| {
1252 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1253 })?;
1254
1255 if config.throw_exception_on_failure
1256 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1257 {
1258 return Err(CamelError::HttpOperationFailed {
1259 method: method_str,
1260 url,
1261 status_code,
1262 status_text,
1263 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1264 });
1265 }
1266
1267 if !response_body.is_empty() {
1268 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1269 }
1270
1271 debug!(
1272 correlation_id = %exchange.correlation_id(),
1273 status = status_code,
1274 url = %url,
1275 "HTTP response"
1276 );
1277 Ok(exchange)
1278 })
1279 }
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284 use super::*;
1285 use camel_component_api::{Message, NoOpComponentContext};
1286 use std::sync::Arc;
1287 use std::time::Duration;
1288
/// Test helper: a fresh `ProducerContext` for `create_producer` calls.
fn test_producer_ctx() -> ProducerContext {
    ProducerContext::new()
}
1292
/// A URI without options yields the documented defaults.
#[test]
fn test_http_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();

    assert_eq!(parsed.base_url, "http://localhost:8080/api");
    assert_eq!(parsed.http_method, None);
    assert!(parsed.throw_exception_on_failure);
    assert_eq!(parsed.ok_status_code_range, (200, 299));
    assert_eq!(parsed.response_timeout, None);
}
1302
/// The endpoint configuration type reports the `http` scheme.
#[test]
fn test_http_config_scheme() {
    assert_eq!(HttpEndpointConfig::scheme(), "http");
}
1308
/// Building from pre-parsed components keeps the scheme and path and reads
/// the `httpMethod` option.
#[test]
fn test_http_config_from_components() {
    let mut params = std::collections::HashMap::new();
    params.insert("httpMethod".to_string(), "POST".to_string());

    let parts = camel_component_api::UriComponents {
        scheme: "https".to_string(),
        path: "//api.example.com/v1".to_string(),
        params,
    };

    let parsed = HttpEndpointConfig::from_components(parts).unwrap();
    assert_eq!(parsed.base_url, "https://api.example.com/v1");
    assert_eq!(parsed.http_method.as_deref(), Some("POST"));
}
1324
/// Endpoint options in the URI are parsed and kept out of the base URL.
#[test]
fn test_http_config_with_options() {
    let uri = "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.base_url, "https://api.example.com/v1");
    assert_eq!(parsed.http_method.as_deref(), Some("PUT"));
    assert!(!parsed.throw_exception_on_failure);
    assert_eq!(parsed.response_timeout, Some(Duration::from_millis(10000)));
}
1335
/// Options missing from the URI are taken from the component configuration.
#[test]
fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
    let defaults = HttpConfig::default()
        .with_response_timeout_ms(999)
        .with_allow_private_ips(true)
        .with_blocked_hosts(vec!["evil.com".to_string()])
        .with_max_body_size(12345);

    let resolved =
        HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &defaults).unwrap();

    assert_eq!(resolved.response_timeout, Some(Duration::from_millis(999)));
    assert!(resolved.allow_private_ips);
    assert_eq!(resolved.blocked_hosts, vec!["evil.com".to_string()]);
    assert_eq!(resolved.max_body_size, 12345);
}
1350
/// Options given in the URI win over the component configuration.
#[test]
fn test_from_uri_with_defaults_uri_overrides_config() {
    let defaults = HttpConfig::default()
        .with_response_timeout_ms(999)
        .with_allow_private_ips(true)
        .with_blocked_hosts(vec!["evil.com".to_string()])
        .with_max_body_size(12345);
    let uri = "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99";

    let resolved = HttpEndpointConfig::from_uri_with_defaults(uri, &defaults).unwrap();

    assert_eq!(resolved.response_timeout, Some(Duration::from_millis(500)));
    assert!(!resolved.allow_private_ips);
    assert_eq!(resolved.blocked_hosts, vec!["bad.net".to_string()]);
    assert_eq!(resolved.max_body_size, 99);
}
1368
/// `okStatusCodeRange=200-204` should be parsed into the `(200, 204)` tuple.
#[test]
fn test_http_config_ok_status_range() {
    let uri = "http://localhost/api?okStatusCodeRange=200-204";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    let (lower, upper) = parsed.ok_status_code_range;
    assert_eq!((lower, upper), (200, 204));
}
1375
/// A URI whose scheme is neither `http` nor `https` must be rejected.
#[test]
fn test_http_config_wrong_scheme() {
    assert!(HttpEndpointConfig::from_uri("file:/tmp").is_err());
}
1381
/// The plain HTTP component reports the `http` scheme.
#[test]
fn test_http_component_scheme() {
    let http = HttpComponent::new();
    let reported = http.scheme();
    assert_eq!(reported, "http");
}
1387
/// The HTTPS component reports the `https` scheme.
#[test]
fn test_https_component_scheme() {
    let https = HttpsComponent::new();
    let reported = https.scheme();
    assert_eq!(reported, "https");
}
1393
/// An endpoint built from an `http://` URI can hand out a consumer.
#[test]
fn test_http_endpoint_creates_consumer() {
    let http = HttpComponent::new();
    let component_ctx = NoOpComponentContext;

    let created = http.create_endpoint("http://0.0.0.0:19100/test", &component_ctx);
    let endpoint = created.unwrap();

    let consumer = endpoint.create_consumer();
    assert!(consumer.is_ok());
}
1403
/// An endpoint built from an `https://` URI can hand out a consumer.
#[test]
fn test_https_endpoint_creates_consumer() {
    let https = HttpsComponent::new();
    let component_ctx = NoOpComponentContext;

    let created = https.create_endpoint("https://0.0.0.0:8443/test", &component_ctx);
    let endpoint = created.unwrap();

    let consumer = endpoint.create_consumer();
    assert!(consumer.is_ok());
}
1413
/// An endpoint built from an `http://` URI can hand out a producer.
#[test]
fn test_http_endpoint_creates_producer() {
    let producer_ctx = test_producer_ctx();
    let http = HttpComponent::new();
    let component_ctx = NoOpComponentContext;

    let created = http.create_endpoint("http://localhost/api", &component_ctx);
    let endpoint = created.unwrap();

    let producer = endpoint.create_producer(&producer_ctx);
    assert!(producer.is_ok());
}
1424
1425 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1430 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1431 let addr = listener.local_addr().unwrap();
1432 let url = format!("http://127.0.0.1:{}", addr.port());
1433
1434 let handle = tokio::spawn(async move {
1435 loop {
1436 if let Ok((mut stream, _)) = listener.accept().await {
1437 tokio::spawn(async move {
1438 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1439 let mut buf = vec![0u8; 4096];
1440 let n = stream.read(&mut buf).await.unwrap_or(0);
1441 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1442
1443 let method = request.split_whitespace().next().unwrap_or("GET");
1444
1445 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1446 let response = format!(
1447 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1448 body.len(),
1449 body
1450 );
1451 let _ = stream.write_all(response.as_bytes()).await;
1452 });
1453 }
1454 }
1455 });
1456
1457 (url, handle)
1458 }
1459
1460 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1461 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1462 let addr = listener.local_addr().unwrap();
1463 let url = format!("http://127.0.0.1:{}", addr.port());
1464
1465 let handle = tokio::spawn(async move {
1466 loop {
1467 if let Ok((mut stream, _)) = listener.accept().await {
1468 let status = status;
1469 tokio::spawn(async move {
1470 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1471 let mut buf = vec![0u8; 4096];
1472 let _ = stream.read(&mut buf).await;
1473
1474 let status_text = match status {
1475 404 => "Not Found",
1476 500 => "Internal Server Error",
1477 _ => "Error",
1478 };
1479 let body = "error body";
1480 let response = format!(
1481 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1482 status,
1483 status_text,
1484 body.len(),
1485 body
1486 );
1487 let _ = stream.write_all(response.as_bytes()).await;
1488 });
1489 }
1490 }
1491 });
1492
1493 (url, handle)
1494 }
1495
1496 #[tokio::test]
1497 async fn test_http_producer_get_request() {
1498 use tower::ServiceExt;
1499
1500 let (url, _handle) = start_test_server().await;
1501 let ctx = test_producer_ctx();
1502
1503 let component = HttpComponent::new();
1504 let endpoint_ctx = NoOpComponentContext;
1505 let endpoint = component
1506 .create_endpoint(
1507 &format!("{url}/api/test?allowPrivateIps=true"),
1508 &endpoint_ctx,
1509 )
1510 .unwrap();
1511 let producer = endpoint.create_producer(&ctx).unwrap();
1512
1513 let exchange = Exchange::new(Message::default());
1514 let result = producer.oneshot(exchange).await.unwrap();
1515
1516 let status = result
1517 .input
1518 .header("CamelHttpResponseCode")
1519 .and_then(|v| v.as_u64())
1520 .unwrap();
1521 assert_eq!(status, 200);
1522
1523 assert!(!result.input.body.is_empty());
1524 }
1525
1526 #[tokio::test]
1527 async fn test_http_producer_post_with_body() {
1528 use tower::ServiceExt;
1529
1530 let (url, _handle) = start_test_server().await;
1531 let ctx = test_producer_ctx();
1532
1533 let component = HttpComponent::new();
1534 let endpoint_ctx = NoOpComponentContext;
1535 let endpoint = component
1536 .create_endpoint(
1537 &format!("{url}/api/data?allowPrivateIps=true"),
1538 &endpoint_ctx,
1539 )
1540 .unwrap();
1541 let producer = endpoint.create_producer(&ctx).unwrap();
1542
1543 let exchange = Exchange::new(Message::new("request body"));
1544 let result = producer.oneshot(exchange).await.unwrap();
1545
1546 let status = result
1547 .input
1548 .header("CamelHttpResponseCode")
1549 .and_then(|v| v.as_u64())
1550 .unwrap();
1551 assert_eq!(status, 200);
1552 }
1553
1554 #[tokio::test]
1555 async fn test_http_producer_method_from_header() {
1556 use tower::ServiceExt;
1557
1558 let (url, _handle) = start_test_server().await;
1559 let ctx = test_producer_ctx();
1560
1561 let component = HttpComponent::new();
1562 let endpoint_ctx = NoOpComponentContext;
1563 let endpoint = component
1564 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1565 .unwrap();
1566 let producer = endpoint.create_producer(&ctx).unwrap();
1567
1568 let mut exchange = Exchange::new(Message::default());
1569 exchange.input.set_header(
1570 "CamelHttpMethod",
1571 serde_json::Value::String("DELETE".to_string()),
1572 );
1573
1574 let result = producer.oneshot(exchange).await.unwrap();
1575 let status = result
1576 .input
1577 .header("CamelHttpResponseCode")
1578 .and_then(|v| v.as_u64())
1579 .unwrap();
1580 assert_eq!(status, 200);
1581 }
1582
1583 #[tokio::test]
1584 async fn test_http_producer_forced_method() {
1585 use tower::ServiceExt;
1586
1587 let (url, _handle) = start_test_server().await;
1588 let ctx = test_producer_ctx();
1589
1590 let component = HttpComponent::new();
1591 let endpoint_ctx = NoOpComponentContext;
1592 let endpoint = component
1593 .create_endpoint(
1594 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1595 &endpoint_ctx,
1596 )
1597 .unwrap();
1598 let producer = endpoint.create_producer(&ctx).unwrap();
1599
1600 let exchange = Exchange::new(Message::default());
1601 let result = producer.oneshot(exchange).await.unwrap();
1602
1603 let status = result
1604 .input
1605 .header("CamelHttpResponseCode")
1606 .and_then(|v| v.as_u64())
1607 .unwrap();
1608 assert_eq!(status, 200);
1609 }
1610
1611 #[tokio::test]
1612 async fn test_http_producer_throw_exception_on_failure() {
1613 use tower::ServiceExt;
1614
1615 let (url, _handle) = start_status_server(404).await;
1616 let ctx = test_producer_ctx();
1617
1618 let component = HttpComponent::new();
1619 let endpoint_ctx = NoOpComponentContext;
1620 let endpoint = component
1621 .create_endpoint(
1622 &format!("{url}/not-found?allowPrivateIps=true"),
1623 &endpoint_ctx,
1624 )
1625 .unwrap();
1626 let producer = endpoint.create_producer(&ctx).unwrap();
1627
1628 let exchange = Exchange::new(Message::default());
1629 let result = producer.oneshot(exchange).await;
1630 assert!(result.is_err());
1631
1632 match result.unwrap_err() {
1633 CamelError::HttpOperationFailed { status_code, .. } => {
1634 assert_eq!(status_code, 404);
1635 }
1636 e => panic!("Expected HttpOperationFailed, got: {e}"),
1637 }
1638 }
1639
1640 #[tokio::test]
1641 async fn test_http_producer_no_throw_on_failure() {
1642 use tower::ServiceExt;
1643
1644 let (url, _handle) = start_status_server(500).await;
1645 let ctx = test_producer_ctx();
1646
1647 let component = HttpComponent::new();
1648 let endpoint_ctx = NoOpComponentContext;
1649 let endpoint = component
1650 .create_endpoint(
1651 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1652 &endpoint_ctx,
1653 )
1654 .unwrap();
1655 let producer = endpoint.create_producer(&ctx).unwrap();
1656
1657 let exchange = Exchange::new(Message::default());
1658 let result = producer.oneshot(exchange).await.unwrap();
1659
1660 let status = result
1661 .input
1662 .header("CamelHttpResponseCode")
1663 .and_then(|v| v.as_u64())
1664 .unwrap();
1665 assert_eq!(status, 500);
1666 }
1667
1668 #[tokio::test]
1669 async fn test_http_producer_uri_override() {
1670 use tower::ServiceExt;
1671
1672 let (url, _handle) = start_test_server().await;
1673 let ctx = test_producer_ctx();
1674
1675 let component = HttpComponent::new();
1676 let endpoint_ctx = NoOpComponentContext;
1677 let endpoint = component
1678 .create_endpoint(
1679 "http://localhost:1/does-not-exist?allowPrivateIps=true",
1680 &endpoint_ctx,
1681 )
1682 .unwrap();
1683 let producer = endpoint.create_producer(&ctx).unwrap();
1684
1685 let mut exchange = Exchange::new(Message::default());
1686 exchange.input.set_header(
1687 "CamelHttpUri",
1688 serde_json::Value::String(format!("{url}/api")),
1689 );
1690
1691 let result = producer.oneshot(exchange).await.unwrap();
1692 let status = result
1693 .input
1694 .header("CamelHttpResponseCode")
1695 .and_then(|v| v.as_u64())
1696 .unwrap();
1697 assert_eq!(status, 200);
1698 }
1699
1700 #[tokio::test]
1701 async fn test_http_producer_response_headers_mapped() {
1702 use tower::ServiceExt;
1703
1704 let (url, _handle) = start_test_server().await;
1705 let ctx = test_producer_ctx();
1706
1707 let component = HttpComponent::new();
1708 let endpoint_ctx = NoOpComponentContext;
1709 let endpoint = component
1710 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1711 .unwrap();
1712 let producer = endpoint.create_producer(&ctx).unwrap();
1713
1714 let exchange = Exchange::new(Message::default());
1715 let result = producer.oneshot(exchange).await.unwrap();
1716
1717 assert!(
1718 result.input.header("content-type").is_some()
1719 || result.input.header("Content-Type").is_some()
1720 );
1721 assert!(result.input.header("CamelHttpResponseText").is_some());
1722 }
1723
1724 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1729 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1730 let addr = listener.local_addr().unwrap();
1731 let url = format!("http://127.0.0.1:{}", addr.port());
1732
1733 let handle = tokio::spawn(async move {
1734 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1735 loop {
1736 if let Ok((mut stream, _)) = listener.accept().await {
1737 tokio::spawn(async move {
1738 let mut buf = vec![0u8; 4096];
1739 let n = stream.read(&mut buf).await.unwrap_or(0);
1740 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1741
1742 if request.contains("GET /final") {
1744 let body = r#"{"status":"final"}"#;
1745 let response = format!(
1746 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1747 body.len(),
1748 body
1749 );
1750 let _ = stream.write_all(response.as_bytes()).await;
1751 } else {
1752 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1754 let _ = stream.write_all(response.as_bytes()).await;
1755 }
1756 });
1757 }
1758 }
1759 });
1760
1761 (url, handle)
1762 }
1763
1764 #[tokio::test]
1765 async fn test_follow_redirects_false_does_not_follow() {
1766 use tower::ServiceExt;
1767
1768 let (url, _handle) = start_redirect_server().await;
1769 let ctx = test_producer_ctx();
1770
1771 let component =
1772 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1773 let endpoint_ctx = NoOpComponentContext;
1774 let endpoint = component
1775 .create_endpoint(
1776 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1777 &endpoint_ctx,
1778 )
1779 .unwrap();
1780 let producer = endpoint.create_producer(&ctx).unwrap();
1781
1782 let exchange = Exchange::new(Message::default());
1783 let result = producer.oneshot(exchange).await.unwrap();
1784
1785 let status = result
1787 .input
1788 .header("CamelHttpResponseCode")
1789 .and_then(|v| v.as_u64())
1790 .unwrap();
1791 assert_eq!(
1792 status, 302,
1793 "Should NOT follow redirect when followRedirects=false"
1794 );
1795 }
1796
1797 #[tokio::test]
1798 async fn test_follow_redirects_true_follows_redirect() {
1799 use tower::ServiceExt;
1800
1801 let (url, _handle) = start_redirect_server().await;
1802 let ctx = test_producer_ctx();
1803
1804 let component =
1805 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1806 let endpoint_ctx = NoOpComponentContext;
1807 let endpoint = component
1808 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1809 .unwrap();
1810 let producer = endpoint.create_producer(&ctx).unwrap();
1811
1812 let exchange = Exchange::new(Message::default());
1813 let result = producer.oneshot(exchange).await.unwrap();
1814
1815 let status = result
1817 .input
1818 .header("CamelHttpResponseCode")
1819 .and_then(|v| v.as_u64())
1820 .unwrap();
1821 assert_eq!(
1822 status, 200,
1823 "Should follow redirect when followRedirects=true"
1824 );
1825 }
1826
1827 #[tokio::test]
1828 async fn test_query_params_forwarded_to_http_request() {
1829 use tower::ServiceExt;
1830
1831 let (url, _handle) = start_test_server().await;
1832 let ctx = test_producer_ctx();
1833
1834 let component = HttpComponent::new();
1835 let endpoint_ctx = NoOpComponentContext;
1836 let endpoint = component
1838 .create_endpoint(
1839 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1840 &endpoint_ctx,
1841 )
1842 .unwrap();
1843 let producer = endpoint.create_producer(&ctx).unwrap();
1844
1845 let exchange = Exchange::new(Message::default());
1846 let result = producer.oneshot(exchange).await.unwrap();
1847
1848 let status = result
1851 .input
1852 .header("CamelHttpResponseCode")
1853 .and_then(|v| v.as_u64())
1854 .unwrap();
1855 assert_eq!(status, 200);
1856 }
1857
/// Query parameters that are not Camel options (`apiKey`, `token`) must be
/// kept in `query_params`, while Camel options such as `httpMethod` must not.
///
/// This is a plain synchronous test: nothing here awaits, so no async runtime
/// is needed.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpEndpointConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // Custom parameters survive parsing with their values intact.
    assert!(
        config.query_params.contains_key("apiKey"),
        "apiKey should be preserved"
    );
    assert!(
        config.query_params.contains_key("token"),
        "token should be preserved"
    );
    assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
    assert_eq!(config.query_params.get("token").unwrap(), "abc456");

    // Camel's own options are consumed by the config and not forwarded.
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
1885
1886 #[tokio::test]
1891 async fn test_http_producer_blocks_metadata_endpoint() {
1892 use tower::ServiceExt;
1893
1894 let ctx = test_producer_ctx();
1895 let component = HttpComponent::new();
1896 let endpoint_ctx = NoOpComponentContext;
1897 let endpoint = component
1898 .create_endpoint(
1899 "http://example.com/api?allowPrivateIps=false",
1900 &endpoint_ctx,
1901 )
1902 .unwrap();
1903 let producer = endpoint.create_producer(&ctx).unwrap();
1904
1905 let mut exchange = Exchange::new(Message::default());
1906 exchange.input.set_header(
1907 "CamelHttpUri",
1908 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1909 );
1910
1911 let result = producer.oneshot(exchange).await;
1912 assert!(result.is_err(), "Should block AWS metadata endpoint");
1913
1914 let err = result.unwrap_err();
1915 assert!(
1916 err.to_string().contains("Private IP"),
1917 "Error should mention private IP blocking, got: {}",
1918 err
1919 );
1920 }
1921
/// Without any SSRF-related options in the URI, private IPs are disallowed
/// and the blocked-host list is empty.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_blocked = !parsed.allow_private_ips;
    assert!(
        private_ips_blocked,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = parsed.blocked_hosts.is_empty();
    assert!(
        no_blocked_hosts,
        "Blocked hosts should be empty by default"
    );
}
1934
/// `allowPrivateIps=true` in the URI turns the `allow_private_ips` flag on.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    assert!(
        parsed.allow_private_ips,
        "Private IPs should be allowed when explicitly set"
    );
}
1944
/// A comma-separated `blockedHosts` value is split into individual hosts.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.blocked_hosts, ["evil.com", "malware.net"]);
}
1953
1954 #[tokio::test]
1955 async fn test_http_producer_blocks_localhost() {
1956 use tower::ServiceExt;
1957
1958 let ctx = test_producer_ctx();
1959 let component = HttpComponent::new();
1960 let endpoint_ctx = NoOpComponentContext;
1961 let endpoint = component
1962 .create_endpoint("http://example.com/api", &endpoint_ctx)
1963 .unwrap();
1964 let producer = endpoint.create_producer(&ctx).unwrap();
1965
1966 let mut exchange = Exchange::new(Message::default());
1967 exchange.input.set_header(
1968 "CamelHttpUri",
1969 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1970 );
1971
1972 let result = producer.oneshot(exchange).await;
1973 assert!(result.is_err(), "Should block localhost");
1974 }
1975
1976 #[tokio::test]
1977 async fn test_http_producer_blocks_loopback_ip() {
1978 use tower::ServiceExt;
1979
1980 let ctx = test_producer_ctx();
1981 let component = HttpComponent::new();
1982 let endpoint_ctx = NoOpComponentContext;
1983 let endpoint = component
1984 .create_endpoint("http://example.com/api", &endpoint_ctx)
1985 .unwrap();
1986 let producer = endpoint.create_producer(&ctx).unwrap();
1987
1988 let mut exchange = Exchange::new(Message::default());
1989 exchange.input.set_header(
1990 "CamelHttpUri",
1991 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1992 );
1993
1994 let result = producer.oneshot(exchange).await;
1995 assert!(result.is_err(), "Should block loopback IP");
1996 }
1997
1998 #[tokio::test]
1999 async fn test_http_producer_allows_private_ip_when_enabled() {
2000 use tower::ServiceExt;
2001
2002 let ctx = test_producer_ctx();
2003 let component = HttpComponent::new();
2004 let endpoint_ctx = NoOpComponentContext;
2005 let endpoint = component
2008 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2009 .unwrap();
2010 let producer = endpoint.create_producer(&ctx).unwrap();
2011
2012 let exchange = Exchange::new(Message::default());
2013
2014 let result = producer.oneshot(exchange).await;
2017 if let Err(ref e) = result {
2019 let err_str = e.to_string();
2020 assert!(
2021 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2022 "Should not be SSRF error, got: {}",
2023 err_str
2024 );
2025 }
2026 }
2027
/// A server URI is split into host, port and path.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8080);
    assert_eq!(parsed.path, "/orders");
}
2039
/// The server config reports the `http` scheme.
#[test]
fn test_http_server_config_scheme() {
    let reported = HttpServerConfig::scheme();
    assert_eq!(reported, "http");
}
2045
/// Builds a server config from pre-parsed URI components (https scheme plus a
/// `maxRequestBody` parameter) and checks every resulting field.
#[test]
fn test_http_server_config_from_components() {
    let mut params = std::collections::HashMap::new();
    params.insert("maxRequestBody".to_string(), "5242880".to_string());

    let parts = camel_component_api::UriComponents {
        scheme: "https".to_string(),
        path: "//0.0.0.0:8443/api".to_string(),
        params,
    };

    let parsed = HttpServerConfig::from_components(parts).unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8443);
    assert_eq!(parsed.path, "/api");
    assert_eq!(parsed.max_request_body, 5242880);
}
2063
/// A server URI without a path component yields the path `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    assert_eq!(parsed.path, "/");
}
2069
/// A non-HTTP scheme is rejected by the server config parser.
#[test]
fn test_http_server_config_wrong_scheme() {
    let outcome = HttpServerConfig::from_uri("file:/tmp");
    assert!(outcome.is_err());
}
2074
/// A non-numeric port is rejected by the server config parser.
#[test]
fn test_http_server_config_invalid_port() {
    let outcome = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(outcome.is_err());
}
2079
/// When the URI has no explicit port, `http` yields 80 and `https` yields 443.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let cases = [
        ("http://0.0.0.0/orders", 80),
        ("https://0.0.0.0/orders", 443),
    ];

    for (uri, expected_port) in cases {
        let parsed = HttpServerConfig::from_uri(uri).unwrap();
        assert_eq!(parsed.port, expected_port);
    }
}
2090
/// Compile-time check that `RequestEnvelope` and `HttpReply` are `Send`.
#[test]
fn test_request_envelope_and_reply_are_send() {
    // Only the trait bound matters; the body is intentionally empty.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
2097
/// Two calls to `ServerRegistry::global()` must return the same address.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    let same_instance = std::ptr::eq(first, second);
    assert!(same_instance);
}
2108
2109 #[tokio::test]
2110 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2111 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2112 let port = listener.local_addr().unwrap().port();
2113 drop(listener);
2114
2115 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2116 Arc::new(std::sync::Mutex::new(Vec::new()));
2117
2118 let mut handles = Vec::new();
2119 for _ in 0..4 {
2120 let results = results.clone();
2121 handles.push(tokio::spawn(async move {
2122 let dispatch = ServerRegistry::global()
2123 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2124 .await
2125 .unwrap();
2126 results.lock().unwrap().push(dispatch);
2127 }));
2128 }
2129
2130 for h in handles {
2131 h.await.unwrap();
2132 }
2133
2134 let dispatches = results.lock().unwrap();
2135 assert_eq!(dispatches.len(), 4);
2136 for i in 1..dispatches.len() {
2137 assert!(
2138 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2139 "all concurrent callers should get the same dispatch table"
2140 );
2141 }
2142 }
2143
2144 #[tokio::test]
2149 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2150 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2151 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2153 let port = listener.local_addr().unwrap().port();
2154 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2155
2156 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2158
2159 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2160 .await
2161 .unwrap();
2162 assert_eq!(resp.status().as_u16(), 404);
2163 }
2164
2165 #[tokio::test]
2170 async fn test_http_consumer_start_registers_path() {
2171 use camel_component_api::ConsumerContext;
2172
2173 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2175 let port = listener.local_addr().unwrap().port();
2176 drop(listener); let consumer_cfg = HttpServerConfig {
2179 host: "127.0.0.1".to_string(),
2180 port,
2181 path: "/ping".to_string(),
2182 max_request_body: 2 * 1024 * 1024,
2183 max_response_body: 10 * 1024 * 1024,
2184 };
2185 let mut consumer = HttpConsumer::new(consumer_cfg);
2186
2187 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2188 let token = tokio_util::sync::CancellationToken::new();
2189 let ctx = ConsumerContext::new(tx, token.clone());
2190
2191 tokio::spawn(async move {
2192 consumer.start(ctx).await.unwrap();
2193 });
2194
2195 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2196
2197 let client = reqwest::Client::new();
2198 let resp_future = client
2199 .post(format!("http://127.0.0.1:{port}/ping"))
2200 .body("hello world")
2201 .send();
2202
2203 let (http_result, _) = tokio::join!(resp_future, async {
2204 if let Some(mut envelope) = rx.recv().await {
2205 envelope.exchange.input.set_header(
2207 "CamelHttpResponseCode",
2208 serde_json::Value::Number(201.into()),
2209 );
2210 if let Some(reply_tx) = envelope.reply_tx {
2211 let _ = reply_tx.send(Ok(envelope.exchange));
2212 }
2213 }
2214 });
2215
2216 let resp = http_result.unwrap();
2217 assert_eq!(resp.status().as_u16(), 201);
2218
2219 token.cancel();
2220 }
2221
2222 #[tokio::test]
2227 async fn test_integration_single_consumer_round_trip() {
2228 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2229
2230 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2232 let port = listener.local_addr().unwrap().port();
2233 drop(listener); let component = HttpComponent::new();
2236 let endpoint_ctx = NoOpComponentContext;
2237 let endpoint = component
2238 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2239 .unwrap();
2240 let mut consumer = endpoint.create_consumer().unwrap();
2241
2242 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2243 let token = tokio_util::sync::CancellationToken::new();
2244 let ctx = ConsumerContext::new(tx, token.clone());
2245
2246 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2247 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2248
2249 let client = reqwest::Client::new();
2250 let send_fut = client
2251 .post(format!("http://127.0.0.1:{port}/echo"))
2252 .header("Content-Type", "text/plain")
2253 .body("ping")
2254 .send();
2255
2256 let (http_result, _) = tokio::join!(send_fut, async {
2257 if let Some(mut envelope) = rx.recv().await {
2258 assert_eq!(
2259 envelope.exchange.input.header("CamelHttpMethod"),
2260 Some(&serde_json::Value::String("POST".into()))
2261 );
2262 assert_eq!(
2263 envelope.exchange.input.header("CamelHttpPath"),
2264 Some(&serde_json::Value::String("/echo".into()))
2265 );
2266 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2267 if let Some(reply_tx) = envelope.reply_tx {
2268 let _ = reply_tx.send(Ok(envelope.exchange));
2269 }
2270 }
2271 });
2272
2273 let resp = http_result.unwrap();
2274 assert_eq!(resp.status().as_u16(), 200);
2275 let body = resp.text().await.unwrap();
2276 assert_eq!(body, "pong");
2277
2278 token.cancel();
2279 }
2280
2281 #[tokio::test]
2282 async fn test_integration_two_consumers_shared_port() {
2283 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2284
2285 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287 let port = listener.local_addr().unwrap().port();
2288 drop(listener);
2289
2290 let component = HttpComponent::new();
2291 let endpoint_ctx = NoOpComponentContext;
2292
2293 let endpoint_a = component
2295 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2296 .unwrap();
2297 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2298
2299 let endpoint_b = component
2301 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2302 .unwrap();
2303 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2304
2305 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2306 let token_a = tokio_util::sync::CancellationToken::new();
2307 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2308
2309 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2310 let token_b = tokio_util::sync::CancellationToken::new();
2311 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2312
2313 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2314 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2315 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2316
2317 let client = reqwest::Client::new();
2318
2319 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2321 let (resp_hello, _) = tokio::join!(fut_hello, async {
2322 if let Some(mut envelope) = rx_a.recv().await {
2323 envelope.exchange.input.body =
2324 camel_component_api::Body::Text("hello-response".to_string());
2325 if let Some(reply_tx) = envelope.reply_tx {
2326 let _ = reply_tx.send(Ok(envelope.exchange));
2327 }
2328 }
2329 });
2330
2331 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2333 let (resp_world, _) = tokio::join!(fut_world, async {
2334 if let Some(mut envelope) = rx_b.recv().await {
2335 envelope.exchange.input.body =
2336 camel_component_api::Body::Text("world-response".to_string());
2337 if let Some(reply_tx) = envelope.reply_tx {
2338 let _ = reply_tx.send(Ok(envelope.exchange));
2339 }
2340 }
2341 });
2342
2343 let body_a = resp_hello.unwrap().text().await.unwrap();
2344 let body_b = resp_world.unwrap().text().await.unwrap();
2345
2346 assert_eq!(body_a, "hello-response");
2347 assert_eq!(body_b, "world-response");
2348
2349 token_a.cancel();
2350 token_b.cancel();
2351 }
2352
2353 #[tokio::test]
2354 async fn test_integration_unregistered_path_returns_404() {
2355 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2356
2357 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2359 let port = listener.local_addr().unwrap().port();
2360 drop(listener);
2361
2362 let component = HttpComponent::new();
2363 let endpoint_ctx = NoOpComponentContext;
2364 let endpoint = component
2365 .create_endpoint(
2366 &format!("http://127.0.0.1:{port}/registered"),
2367 &endpoint_ctx,
2368 )
2369 .unwrap();
2370 let mut consumer = endpoint.create_consumer().unwrap();
2371
2372 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2373 let token = tokio_util::sync::CancellationToken::new();
2374 let ctx = ConsumerContext::new(tx, token.clone());
2375
2376 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2377 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2378
2379 let client = reqwest::Client::new();
2380 let resp = client
2381 .get(format!("http://127.0.0.1:{port}/not-there"))
2382 .send()
2383 .await
2384 .unwrap();
2385 assert_eq!(resp.status().as_u16(), 404);
2386
2387 token.cancel();
2388 }
2389
/// `HttpConsumer::concurrency_model()` must report
/// `ConcurrencyModel::Concurrent { max: None }`.
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component_api::ConcurrencyModel;

    let consumer = HttpConsumer::new(HttpServerConfig {
        host: "127.0.0.1".to_string(),
        port: 19999,
        path: "/test".to_string(),
        max_request_body: 2 * 1024 * 1024,
        max_response_body: 10 * 1024 * 1024,
    });

    let declared = consumer.concurrency_model();
    assert_eq!(declared, ConcurrencyModel::Concurrent { max: None });
}
2407
/// Constructs `HttpReplyBody::Stream` from an in-memory stream of byte chunks
/// and checks that the value matches the `Stream` variant.
///
/// This is a plain synchronous test: the stream is only constructed, never
/// polled, so no async runtime is needed.
#[test]
fn test_http_reply_body_stream_variant_exists() {
    use bytes::Bytes;
    use camel_component_api::CamelError;
    use futures::stream;

    let chunks: Vec<Result<Bytes, CamelError>> =
        vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
    let stream = Box::pin(stream::iter(chunks));
    let reply_body = HttpReplyBody::Stream(stream);

    // Exhaustive match on purpose: it also fails to compile if either variant
    // is renamed or removed.
    match reply_body {
        HttpReplyBody::Stream(_) => {}
        HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
    }
}
2428
2429 #[cfg(feature = "otel")]
2434 mod otel_tests {
2435 use super::*;
2436 use camel_component_api::Message;
2437 use tower::ServiceExt;
2438
2439 #[tokio::test]
2440 async fn test_producer_injects_traceparent_header() {
2441 let (url, _handle) = start_test_server_with_header_capture().await;
2442 let ctx = test_producer_ctx();
2443
2444 let component = HttpComponent::new();
2445 let endpoint_ctx = NoOpComponentContext;
2446 let endpoint = component
2447 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2448 .unwrap();
2449 let producer = endpoint.create_producer(&ctx).unwrap();
2450
2451 let mut exchange = Exchange::new(Message::default());
2453 let mut headers = std::collections::HashMap::new();
2454 headers.insert(
2455 "traceparent".to_string(),
2456 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2457 );
2458 camel_otel::extract_into_exchange(&mut exchange, &headers);
2459
2460 let result = producer.oneshot(exchange).await.unwrap();
2461
2462 let status = result
2464 .input
2465 .header("CamelHttpResponseCode")
2466 .and_then(|v| v.as_u64())
2467 .unwrap();
2468 assert_eq!(status, 200);
2469
2470 let traceparent = result.input.header("x-received-traceparent");
2472 assert!(
2473 traceparent.is_some(),
2474 "traceparent header should have been sent"
2475 );
2476
2477 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2478 let parts: Vec<&str> = traceparent_str.split('-').collect();
2480 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2481 assert_eq!(parts[0], "00", "version should be 00");
2482 assert_eq!(
2483 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2484 "trace-id should match"
2485 );
2486 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2487 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2488 }
2489
2490 #[tokio::test]
2491 async fn test_consumer_extracts_traceparent_header() {
2492 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2493
2494 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2496 let port = listener.local_addr().unwrap().port();
2497 drop(listener);
2498
2499 let component = HttpComponent::new();
2500 let endpoint_ctx = NoOpComponentContext;
2501 let endpoint = component
2502 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2503 .unwrap();
2504 let mut consumer = endpoint.create_consumer().unwrap();
2505
2506 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2507 let token = tokio_util::sync::CancellationToken::new();
2508 let ctx = ConsumerContext::new(tx, token.clone());
2509
2510 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2511 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2512
2513 let client = reqwest::Client::new();
2515 let send_fut = client
2516 .post(format!("http://127.0.0.1:{port}/trace"))
2517 .header(
2518 "traceparent",
2519 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2520 )
2521 .body("test")
2522 .send();
2523
2524 let (http_result, _) = tokio::join!(send_fut, async {
2525 if let Some(envelope) = rx.recv().await {
2526 let mut injected_headers = std::collections::HashMap::new();
2529 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2530
2531 assert!(
2532 injected_headers.contains_key("traceparent"),
2533 "Exchange should have traceparent after extraction"
2534 );
2535
2536 let traceparent = injected_headers.get("traceparent").unwrap();
2537 let parts: Vec<&str> = traceparent.split('-').collect();
2538 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2539 assert_eq!(
2540 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2541 "Trace ID should match the original traceparent header"
2542 );
2543
2544 if let Some(reply_tx) = envelope.reply_tx {
2545 let _ = reply_tx.send(Ok(envelope.exchange));
2546 }
2547 }
2548 });
2549
2550 let resp = http_result.unwrap();
2551 assert_eq!(resp.status().as_u16(), 200);
2552
2553 token.cancel();
2554 }
2555
2556 #[tokio::test]
2557 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2558 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2559
2560 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2562 let port = listener.local_addr().unwrap().port();
2563 drop(listener);
2564
2565 let component = HttpComponent::new();
2566 let endpoint_ctx = NoOpComponentContext;
2567 let endpoint = component
2568 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2569 .unwrap();
2570 let mut consumer = endpoint.create_consumer().unwrap();
2571
2572 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2573 let token = tokio_util::sync::CancellationToken::new();
2574 let ctx = ConsumerContext::new(tx, token.clone());
2575
2576 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2577 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2578
2579 let client = reqwest::Client::new();
2581 let send_fut = client
2582 .post(format!("http://127.0.0.1:{port}/trace"))
2583 .header(
2584 "TraceParent",
2585 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2586 )
2587 .body("test")
2588 .send();
2589
2590 let (http_result, _) = tokio::join!(send_fut, async {
2591 if let Some(envelope) = rx.recv().await {
2592 let mut injected_headers = HashMap::new();
2595 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2596
2597 assert!(
2598 injected_headers.contains_key("traceparent"),
2599 "Exchange should have traceparent after extraction from mixed-case header"
2600 );
2601
2602 let traceparent = injected_headers.get("traceparent").unwrap();
2603 let parts: Vec<&str> = traceparent.split('-').collect();
2604 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2605 assert_eq!(
2606 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2607 "Trace ID should match the original mixed-case TraceParent header"
2608 );
2609
2610 if let Some(reply_tx) = envelope.reply_tx {
2611 let _ = reply_tx.send(Ok(envelope.exchange));
2612 }
2613 }
2614 });
2615
2616 let resp = http_result.unwrap();
2617 assert_eq!(resp.status().as_u16(), 200);
2618
2619 token.cancel();
2620 }
2621
2622 #[tokio::test]
2623 async fn test_producer_no_trace_context_no_crash() {
2624 let (url, _handle) = start_test_server().await;
2625 let ctx = test_producer_ctx();
2626
2627 let component = HttpComponent::new();
2628 let endpoint_ctx = NoOpComponentContext;
2629 let endpoint = component
2630 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2631 .unwrap();
2632 let producer = endpoint.create_producer(&ctx).unwrap();
2633
2634 let exchange = Exchange::new(Message::default());
2636
2637 let result = producer.oneshot(exchange).await.unwrap();
2639
2640 let status = result
2642 .input
2643 .header("CamelHttpResponseCode")
2644 .and_then(|v| v.as_u64())
2645 .unwrap();
2646 assert_eq!(status, 200);
2647 }
2648
2649 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2651 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2652 let addr = listener.local_addr().unwrap();
2653 let url = format!("http://127.0.0.1:{}", addr.port());
2654
2655 let handle = tokio::spawn(async move {
2656 loop {
2657 if let Ok((mut stream, _)) = listener.accept().await {
2658 tokio::spawn(async move {
2659 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2660 let mut buf = vec![0u8; 8192];
2661 let n = stream.read(&mut buf).await.unwrap_or(0);
2662 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2663
2664 let traceparent = request
2666 .lines()
2667 .find(|line| line.to_lowercase().starts_with("traceparent:"))
2668 .map(|line| {
2669 line.split(':')
2670 .nth(1)
2671 .map(|s| s.trim().to_string())
2672 .unwrap_or_default()
2673 })
2674 .unwrap_or_default();
2675
2676 let body =
2677 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2678 let response = format!(
2679 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2680 body.len(),
2681 traceparent,
2682 body
2683 );
2684 let _ = stream.write_all(response.as_bytes()).await;
2685 });
2686 }
2687 }
2688 });
2689
2690 (url, handle)
2691 }
2692 }
2693
2694 #[tokio::test]
2703 async fn test_request_body_arrives_as_stream() {
2704 use camel_component_api::Body;
2705 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2706
2707 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2708 let port = listener.local_addr().unwrap().port();
2709 drop(listener);
2710
2711 let component = HttpComponent::new();
2712 let endpoint_ctx = NoOpComponentContext;
2713 let endpoint = component
2714 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
2715 .unwrap();
2716 let mut consumer = endpoint.create_consumer().unwrap();
2717
2718 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2719 let token = tokio_util::sync::CancellationToken::new();
2720 let ctx = ConsumerContext::new(tx, token.clone());
2721
2722 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2723 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2724
2725 let client = reqwest::Client::new();
2726 let send_fut = client
2727 .post(format!("http://127.0.0.1:{port}/upload"))
2728 .body("hello streaming world")
2729 .send();
2730
2731 let (http_result, _) = tokio::join!(send_fut, async {
2732 if let Some(mut envelope) = rx.recv().await {
2733 assert!(
2735 matches!(envelope.exchange.input.body, Body::Stream(_)),
2736 "expected Body::Stream, got discriminant {:?}",
2737 std::mem::discriminant(&envelope.exchange.input.body)
2738 );
2739 let bytes = envelope
2741 .exchange
2742 .input
2743 .body
2744 .into_bytes(1024 * 1024)
2745 .await
2746 .unwrap();
2747 assert_eq!(&bytes[..], b"hello streaming world");
2748
2749 envelope.exchange.input.body = camel_component_api::Body::Empty;
2750 if let Some(reply_tx) = envelope.reply_tx {
2751 let _ = reply_tx.send(Ok(envelope.exchange));
2752 }
2753 }
2754 });
2755
2756 let resp = http_result.unwrap();
2757 assert_eq!(resp.status().as_u16(), 200);
2758
2759 token.cancel();
2760 }
2761
2762 #[tokio::test]
2767 async fn test_streaming_response_chunked() {
2768 use bytes::Bytes;
2769 use camel_component_api::Body;
2770 use camel_component_api::CamelError;
2771 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2772 use camel_component_api::{StreamBody, StreamMetadata};
2773 use futures::stream;
2774 use std::sync::Arc;
2775 use tokio::sync::Mutex;
2776
2777 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2778 let port = listener.local_addr().unwrap().port();
2779 drop(listener);
2780
2781 let component = HttpComponent::new();
2782 let endpoint_ctx = NoOpComponentContext;
2783 let endpoint = component
2784 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
2785 .unwrap();
2786 let mut consumer = endpoint.create_consumer().unwrap();
2787
2788 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2789 let token = tokio_util::sync::CancellationToken::new();
2790 let ctx = ConsumerContext::new(tx, token.clone());
2791
2792 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2793 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2794
2795 let client = reqwest::Client::new();
2796 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2797
2798 let (http_result, _) = tokio::join!(send_fut, async {
2799 if let Some(mut envelope) = rx.recv().await {
2800 let chunks: Vec<Result<Bytes, CamelError>> =
2802 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2803 let stream = Box::pin(stream::iter(chunks));
2804 envelope.exchange.input.body = Body::Stream(StreamBody {
2805 stream: Arc::new(Mutex::new(Some(stream))),
2806 metadata: StreamMetadata::default(),
2807 });
2808 if let Some(reply_tx) = envelope.reply_tx {
2809 let _ = reply_tx.send(Ok(envelope.exchange));
2810 }
2811 }
2812 });
2813
2814 let resp = http_result.unwrap();
2815 assert_eq!(resp.status().as_u16(), 200);
2816 let body = resp.text().await.unwrap();
2817 assert_eq!(body, "chunk1chunk2");
2818
2819 token.cancel();
2820 }
2821
2822 #[tokio::test]
2827 async fn test_413_when_content_length_exceeds_limit() {
2828 use camel_component_api::ConsumerContext;
2829
2830 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2831 let port = listener.local_addr().unwrap().port();
2832 drop(listener);
2833
2834 let component = HttpComponent::new();
2836 let endpoint_ctx = NoOpComponentContext;
2837 let endpoint = component
2838 .create_endpoint(
2839 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
2840 &endpoint_ctx,
2841 )
2842 .unwrap();
2843 let mut consumer = endpoint.create_consumer().unwrap();
2844
2845 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2846 let token = tokio_util::sync::CancellationToken::new();
2847 let ctx = ConsumerContext::new(tx, token.clone());
2848
2849 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2850 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2851
2852 let client = reqwest::Client::new();
2853 let resp = client
2854 .post(format!("http://127.0.0.1:{port}/upload"))
2855 .header("Content-Length", "1000") .body("x".repeat(1000))
2857 .send()
2858 .await
2859 .unwrap();
2860
2861 assert_eq!(resp.status().as_u16(), 413);
2862
2863 token.cancel();
2864 }
2865
2866 #[tokio::test]
2870 async fn test_chunked_upload_without_content_length_bypasses_limit() {
2871 use bytes::Bytes;
2872 use camel_component_api::Body;
2873 use camel_component_api::ConsumerContext;
2874 use futures::stream;
2875
2876 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2877 let port = listener.local_addr().unwrap().port();
2878 drop(listener);
2879
2880 let component = HttpComponent::new();
2882 let endpoint_ctx = NoOpComponentContext;
2883 let endpoint = component
2884 .create_endpoint(
2885 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
2886 &endpoint_ctx,
2887 )
2888 .unwrap();
2889 let mut consumer = endpoint.create_consumer().unwrap();
2890
2891 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2892 let token = tokio_util::sync::CancellationToken::new();
2893 let ctx = ConsumerContext::new(tx, token.clone());
2894
2895 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2896 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2897
2898 let client = reqwest::Client::new();
2899
2900 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2904 Ok(Bytes::from("y".repeat(50))),
2905 Ok(Bytes::from("y".repeat(50))),
2906 ];
2907 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2908 let send_fut = client
2909 .post(format!("http://127.0.0.1:{port}/upload"))
2910 .body(stream_body)
2911 .send();
2912
2913 let consumer_fut = async {
2914 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2916 Ok(Some(mut envelope)) => {
2917 assert!(
2918 matches!(envelope.exchange.input.body, Body::Stream(_)),
2919 "expected Body::Stream"
2920 );
2921 envelope.exchange.input.body = camel_component_api::Body::Empty;
2922 if let Some(reply_tx) = envelope.reply_tx {
2923 let _ = reply_tx.send(Ok(envelope.exchange));
2924 }
2925 }
2926 Ok(None) => panic!("consumer channel closed unexpectedly"),
2927 Err(_) => {
2928 }
2931 }
2932 };
2933
2934 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2935
2936 let resp = http_result.unwrap();
2937 assert_ne!(
2939 resp.status().as_u16(),
2940 413,
2941 "chunked upload must not be rejected by maxRequestBody"
2942 );
2943 assert_eq!(resp.status().as_u16(), 200);
2944
2945 token.cancel();
2946 }
2947}