1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40#[derive(Debug, Clone)]
101pub struct HttpEndpointConfig {
102 pub base_url: String,
103 pub http_method: Option<String>,
104 pub throw_exception_on_failure: bool,
105 pub ok_status_code_range: (u16, u16),
106 pub response_timeout: Option<Duration>,
107 pub query_params: HashMap<String, String>,
108 pub allow_private_ips: bool,
109 pub blocked_hosts: Vec<String>,
110 pub max_body_size: usize,
111 pub read_timeout_ms: u64,
112 pub max_response_bytes: usize,
113 pub auth: HttpAuth,
114 pub token_provider: Option<Arc<dyn TokenProvider>>,
115 pub user_agent: Option<String>,
116 pub cookie_handling: CookieHandling,
117 pub bridge_endpoint: bool,
118 pub connection_close: bool,
119 pub skip_request_headers: Vec<String>,
120 pub skip_response_headers: Vec<String>,
121}
122
123#[derive(Clone, PartialEq)]
124pub enum HttpAuth {
125 None,
126 Basic { username: String, password: String },
127 Bearer { token: String },
128}
129
130impl std::fmt::Debug for HttpAuth {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 HttpAuth::None => f.write_str("None"),
134 HttpAuth::Basic { username, .. } => f
135 .debug_struct("Basic")
136 .field("username", username)
137 .field("password", &"***")
138 .finish(),
139 HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
140 }
141 }
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum CookieHandling {
146 Disabled,
147 InMemory,
148}
149
150const HTTP_CAMEL_OPTIONS: &[&str] = &[
152 "httpMethod",
153 "throwExceptionOnFailure",
154 "okStatusCodeRange",
155 "followRedirects",
156 "connectTimeout",
157 "responseTimeout",
158 "allowPrivateIps",
159 "blockedHosts",
160 "maxBodySize",
161 "readTimeout",
162 "maxResponseBytes",
163 "authMethod",
164 "authUsername",
165 "authPassword",
166 "authBearerToken",
167 "userAgent",
168 "cookieHandling",
169 "bridgeEndpoint",
170 "connectionClose",
171 "skipRequestHeaders",
172 "skipResponseHeaders",
173];
174
175impl UriConfig for HttpEndpointConfig {
176 fn scheme() -> &'static str {
178 "http"
179 }
180
181 fn from_uri(uri: &str) -> Result<Self, CamelError> {
182 let parts = parse_uri(uri)?;
183 Self::from_components(parts)
184 }
185
186 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187 if parts.scheme != "http" && parts.scheme != "https" {
189 return Err(CamelError::InvalidUri(format!(
190 "expected scheme 'http' or 'https', got '{}'",
191 parts.scheme
192 )));
193 }
194
195 let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199 let http_method = parts.params.get("httpMethod").cloned();
200
201 let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202 Some(v) => parse_bool_param_http(v).map_err(|e| {
203 CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204 })?,
205 None => true,
206 };
207
208 let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210 Some(v) => parse_ok_status_code_range(v)?,
211 None => (200, 299),
212 };
213
214 let response_timeout = match parts.params.get("responseTimeout") {
215 Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216 CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217 })?),
218 None => None,
219 };
220
221 let allow_private_ips = match parts.params.get("allowPrivateIps") {
223 Some(v) => parse_bool_param_http(v).map_err(|e| {
224 CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225 })?,
226 None => false, };
228
229 let blocked_hosts = parts
231 .params
232 .get("blockedHosts")
233 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234 .unwrap_or_default();
235
236 let max_body_size = match parts.params.get("maxBodySize") {
237 Some(v) => v.parse::<usize>().map_err(|e| {
238 CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239 })?,
240 None => 10 * 1024 * 1024, };
242
243 let read_timeout_ms = match parts.params.get("readTimeout") {
244 Some(v) => v.parse::<u64>().map_err(|e| {
245 CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246 })?,
247 None => 30_000, };
249
250 let max_response_bytes = match parts.params.get("maxResponseBytes") {
251 Some(v) => v.parse::<usize>().map_err(|e| {
252 CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253 })?,
254 None => 10 * 1024 * 1024, };
256
257 let auth = parse_auth_from_params(&parts.params)?;
258
259 let user_agent = parts.params.get("userAgent").cloned();
260
261 let cookie_handling = match parts.params.get("cookieHandling") {
262 Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263 Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264 Some(v) => {
265 return Err(CamelError::InvalidUri(format!(
266 "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267 )));
268 }
269 None => CookieHandling::Disabled,
270 };
271
272 let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273 Some(v) => parse_bool_param_http(v).map_err(|e| {
274 CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275 })?,
276 None => false,
277 };
278
279 let connection_close = match parts.params.get("connectionClose") {
280 Some(v) => parse_bool_param_http(v).map_err(|e| {
281 CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282 })?,
283 None => false,
284 };
285
286 let skip_request_headers = parts
287 .params
288 .get("skipRequestHeaders")
289 .map(|v| {
290 v.split(',')
291 .map(str::trim)
292 .filter(|s| !s.is_empty())
293 .map(|s| s.to_ascii_lowercase())
294 .collect::<Vec<_>>()
295 })
296 .unwrap_or_default();
297
298 let skip_response_headers = parts
299 .params
300 .get("skipResponseHeaders")
301 .map(|v| {
302 v.split(',')
303 .map(str::trim)
304 .filter(|s| !s.is_empty())
305 .map(|s| s.to_ascii_lowercase())
306 .collect::<Vec<_>>()
307 })
308 .unwrap_or_default();
309
310 let query_params: HashMap<String, String> = parts
312 .params
313 .into_iter()
314 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315 .collect();
316
317 Ok(Self {
318 base_url,
319 http_method,
320 throw_exception_on_failure,
321 ok_status_code_range,
322 response_timeout,
323 query_params,
324 allow_private_ips,
325 blocked_hosts,
326 max_body_size,
327 read_timeout_ms,
328 max_response_bytes,
329 auth,
330 token_provider: None,
331 user_agent,
332 cookie_handling,
333 bridge_endpoint,
334 connection_close,
335 skip_request_headers,
336 skip_response_headers,
337 })
338 }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342 let Some(method) = params.get("authMethod") else {
343 return Ok(HttpAuth::None);
344 };
345
346 if method.eq_ignore_ascii_case("none") {
347 return Ok(HttpAuth::None);
348 }
349
350 if method.eq_ignore_ascii_case("basic") {
351 let username = params.get("authUsername").cloned().ok_or_else(|| {
352 CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353 })?;
354 let password = params.get("authPassword").cloned().ok_or_else(|| {
355 CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356 })?;
357 return Ok(HttpAuth::Basic { username, password });
358 }
359
360 if method.eq_ignore_ascii_case("bearer") {
361 let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362 CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363 })?;
364 return Ok(HttpAuth::Bearer { token });
365 }
366
367 Err(CamelError::InvalidUri(format!(
368 "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369 )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373 match value.to_ascii_lowercase().as_str() {
374 "true" | "1" | "yes" => Ok(true),
375 "false" | "0" | "no" => Ok(false),
376 _ => Err(CamelError::InvalidUri(format!(
377 "invalid boolean value: '{value}'"
378 ))),
379 }
380}
381
382impl HttpEndpointConfig {
383 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384 let parts = parse_uri(uri)?;
385 let mut endpoint = Self::from_components(parts.clone())?;
386 if endpoint.response_timeout.is_none() {
387 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388 }
389 if !parts.params.contains_key("allowPrivateIps") {
390 endpoint.allow_private_ips = config.allow_private_ips;
391 }
392 if !parts.params.contains_key("blockedHosts") {
393 endpoint.blocked_hosts = config.blocked_hosts.clone();
394 }
395 if !parts.params.contains_key("maxBodySize") {
396 endpoint.max_body_size = config.max_body_size;
397 }
398 if !parts.params.contains_key("readTimeout") {
399 endpoint.read_timeout_ms = config.read_timeout_ms;
400 }
401 if !parts.params.contains_key("maxResponseBytes") {
402 endpoint.max_response_bytes = config.max_response_bytes;
403 }
404 if !parts.params.contains_key("okStatusCodeRange")
405 && let Some(range) = &config.ok_status_code_range
406 {
407 endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408 }
409
410 Ok(endpoint)
411 }
412}
413
414#[derive(Debug, Clone)]
420pub struct HttpServerConfig {
421 pub host: String,
423 pub port: u16,
425 pub path: String,
427 pub max_request_body: usize,
429 pub max_response_body: usize,
431 pub max_inflight_requests: usize,
433}
434
435impl UriConfig for HttpServerConfig {
436 fn scheme() -> &'static str {
438 "http"
439 }
440
441 fn from_uri(uri: &str) -> Result<Self, CamelError> {
442 let parts = parse_uri(uri)?;
443 Self::from_components(parts)
444 }
445
446 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447 if parts.scheme != "http" && parts.scheme != "https" {
449 return Err(CamelError::InvalidUri(format!(
450 "expected scheme 'http' or 'https', got '{}'",
451 parts.scheme
452 )));
453 }
454
455 let authority_and_path = parts.path.trim_start_matches('/');
458
459 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461 (&authority_and_path[..idx], &authority_and_path[idx..])
462 } else {
463 (authority_and_path, "/")
464 };
465
466 let path = if path_suffix.is_empty() {
467 "/"
468 } else {
469 path_suffix
470 }
471 .to_string();
472
473 let (host, port) = if let Some(colon) = authority.rfind(':') {
475 let port_str = &authority[colon + 1..];
476 match port_str.parse::<u16>() {
477 Ok(p) => (authority[..colon].to_string(), p),
478 Err(_) => {
479 return Err(CamelError::InvalidUri(format!(
480 "invalid port '{}' in authority",
481 port_str
482 )));
483 }
484 }
485 } else {
486 let default_port = if parts.scheme == "https" { 443 } else { 80 };
488 (authority.to_string(), default_port)
489 };
490
491 let max_request_body = parts
492 .params
493 .get("maxRequestBody")
494 .and_then(|v| v.parse::<usize>().ok())
495 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
498 .params
499 .get("maxResponseBody")
500 .and_then(|v| v.parse::<usize>().ok())
501 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
504 .params
505 .get("maxInflightRequests")
506 .and_then(|v| v.parse::<usize>().ok())
507 .unwrap_or(1024);
508
509 Ok(Self {
510 host,
511 port,
512 path,
513 max_request_body,
514 max_response_body,
515 max_inflight_requests,
516 })
517 }
518}
519
520impl HttpServerConfig {
521 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522 let parts = parse_uri(uri)?;
523 let mut server = Self::from_components(parts.clone())?;
524 if !parts.params.contains_key("maxRequestBody") {
525 server.max_request_body = config.max_request_body;
526 }
527 if !parts.params.contains_key("maxResponseBody") {
528 server.max_response_body = config.max_body_size;
530 }
531 Ok(server)
532 }
533}
534
535pub enum HttpReplyBody {
543 Bytes(bytes::Bytes),
544 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
545}
546
547pub struct RequestEnvelope {
552 pub method: String,
553 pub path: String,
554 pub query: String,
555 pub headers: http::HeaderMap,
556 pub body: StreamBody,
557 pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
558}
559
560pub struct HttpReply {
564 pub status: u16,
565 pub headers: Vec<(String, String)>,
566 pub body: HttpReplyBody,
567}
568
569type ServerKey = (String, u16);
574
575struct ServerHandle {
577 registry: HttpRouteRegistry,
578 max_request_body: usize,
579 max_response_body: usize,
580 max_inflight_requests: usize,
581}
582
583pub struct ServerRegistry {
585 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589 pub fn global() -> &'static Self {
591 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592 INSTANCE.get_or_init(|| ServerRegistry {
593 inner: Mutex::new(HashMap::new()),
594 })
595 }
596
597 pub async fn get_or_spawn(
600 &'static self,
601 host: &str,
602 port: u16,
603 max_request_body: usize,
604 max_response_body: usize,
605 max_inflight_requests: usize,
606 ) -> Result<HttpRouteRegistry, CamelError> {
607 let host_owned = host.to_string();
608
609 let cell = {
610 let mut guard = self.inner.lock().map_err(|_| {
611 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
612 })?;
613 let key = (host.to_string(), port);
614 guard
615 .entry(key)
616 .or_insert_with(|| Arc::new(OnceCell::new()))
617 .clone()
618 };
619
620 if let Some(existing) = cell.get()
621 && existing.max_request_body != max_request_body
622 {
623 return Err(CamelError::EndpointCreationFailed(format!(
624 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
625 existing.max_request_body, max_request_body
626 )));
627 }
628
629 if let Some(existing) = cell.get()
630 && existing.max_response_body != max_response_body
631 {
632 return Err(CamelError::EndpointCreationFailed(format!(
633 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
634 existing.max_response_body, max_response_body
635 )));
636 }
637
638 if let Some(existing) = cell.get()
639 && existing.max_inflight_requests != max_inflight_requests
640 {
641 return Err(CamelError::EndpointCreationFailed(format!(
642 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
643 existing.max_inflight_requests, max_inflight_requests
644 )));
645 }
646
647 let handle = cell
648 .get_or_try_init(|| async {
649 let addr = format!("{host_owned}:{port}");
650 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
651 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
652 })?;
653 let registry = HttpRouteRegistry::new();
654 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
655 let task = tokio::spawn(run_axum_server(
656 listener,
657 registry.clone(),
658 max_request_body,
659 max_response_body,
660 Arc::clone(&inflight),
661 ));
662 let addr_for_monitor = format!("{host_owned}:{port}");
663 tokio::spawn(monitor_axum_task(task, addr_for_monitor));
664 Ok::<ServerHandle, CamelError>(ServerHandle {
665 registry,
666 max_request_body,
667 max_response_body,
668 max_inflight_requests,
669 })
670 })
671 .await?;
672
673 Ok(handle.registry.clone())
674 }
675
676 #[cfg(test)]
683 pub fn reset() {
684 let instance = Self::global();
685 let mut guard = instance
686 .inner
687 .lock()
688 .expect("ServerRegistry lock poisoned during test reset");
689 guard.clear();
690 }
691}
692
693use axum::{
698 Router,
699 body::Body as AxumBody,
700 extract::{Request, State},
701 http::{Response, StatusCode},
702 response::IntoResponse,
703};
704
705#[derive(Clone)]
706pub(crate) struct AppState {
707 registry: HttpRouteRegistry,
708 max_request_body: usize,
709 max_response_body: usize,
710 inflight: Arc<tokio::sync::Semaphore>,
711}
712
713async fn run_axum_server(
714 listener: tokio::net::TcpListener,
715 registry: HttpRouteRegistry,
716 max_request_body: usize,
717 max_response_body: usize,
718 inflight: Arc<tokio::sync::Semaphore>,
719) {
720 let state = AppState {
721 registry,
722 max_request_body,
723 max_response_body,
724 inflight,
725 };
726 let app = Router::new().fallback(dispatch_handler).with_state(state);
727
728 axum::serve(listener, app).await.unwrap_or_else(|e| {
729 tracing::error!(error = %e, "Axum server error");
730 });
731}
732
733async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
741 match handle.await {
742 Ok(()) => {
743 }
745 Err(join_err) => {
746 tracing::error!(
747 addr = %addr,
748 error = %join_err,
749 "Axum server task exited unexpectedly — all routes on this port are now dead"
750 );
751 }
752 }
753}
754
755async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
756 let path = req.uri().path().to_owned();
757
758 let api_sender = {
760 let inner = state.registry.inner.read().await;
761 inner.api_routes.get(&path).cloned()
762 }; if let Some(sender) = api_sender {
765 let method = req.method().to_string();
766 let query = req.uri().query().unwrap_or("").to_string();
767 let headers = req.headers().clone();
768
769 let content_length: Option<u64> = headers
771 .get(http::header::CONTENT_LENGTH)
772 .and_then(|v| v.to_str().ok())
773 .and_then(|s| s.parse().ok());
774
775 if let Some(len) = content_length
776 && len > state.max_request_body as u64
777 {
778 return Response::builder()
779 .status(StatusCode::PAYLOAD_TOO_LARGE)
780 .body(AxumBody::from("Request body exceeds configured limit"))
781 .expect("infallible"); }
783
784 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
785 Ok(permit) => permit,
786 Err(_) => {
787 return Response::builder()
788 .status(StatusCode::SERVICE_UNAVAILABLE)
789 .body(AxumBody::from("Service Unavailable"))
790 .expect("infallible"); }
792 };
793
794 let content_type = headers
796 .get(http::header::CONTENT_TYPE)
797 .and_then(|v| v.to_str().ok())
798 .map(|s| s.to_string());
799
800 let data_stream: BodyDataStream = req.into_body().into_data_stream();
801 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
802 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
803
804 let stream_body = StreamBody {
805 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
806 metadata: StreamMetadata {
807 size_hint: content_length,
808 content_type,
809 origin: None,
810 },
811 };
812
813 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
814 let envelope = RequestEnvelope {
815 method,
816 path,
817 query,
818 headers,
819 body: stream_body,
820 reply_tx,
821 };
822
823 if sender.send(envelope).await.is_err() {
824 return Response::builder()
825 .status(StatusCode::SERVICE_UNAVAILABLE)
826 .body(AxumBody::from("Consumer unavailable"))
827 .expect("infallible"); }
829
830 match reply_rx.await {
831 Ok(reply) => {
832 let reply = match reply.body {
833 HttpReplyBody::Bytes(b)
834 if exceeds_max_response_body(b.len(), state.max_response_body) =>
835 {
836 HttpReply {
837 status: 500,
838 headers: vec![],
839 body: HttpReplyBody::Bytes(bytes::Bytes::from(
840 "Response body exceeds configured limit",
841 )),
842 }
843 }
844 _ => reply,
845 };
846
847 let status =
848 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
849 let mut builder = Response::builder().status(status);
850 for (k, v) in &reply.headers {
851 builder = builder.header(k.as_str(), v.as_str());
852 }
853 match reply.body {
854 HttpReplyBody::Bytes(b) => {
855 builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
856 Response::builder()
857 .status(StatusCode::INTERNAL_SERVER_ERROR)
858 .body(AxumBody::from("Invalid response headers from consumer"))
859 .expect("infallible") })
861 }
862 HttpReplyBody::Stream(stream) => builder
863 .body(AxumBody::from_stream(stream))
864 .unwrap_or_else(|_| {
865 Response::builder()
866 .status(StatusCode::INTERNAL_SERVER_ERROR)
867 .body(AxumBody::from("Invalid response headers from consumer"))
868 .expect("infallible") }),
870 }
871 }
872 Err(_) => Response::builder()
873 .status(StatusCode::INTERNAL_SERVER_ERROR)
874 .body(AxumBody::from("Pipeline error"))
875 .expect("infallible"), }
877 } else {
878 static_dispatch::dispatch_static(&state, req, &path).await
880 }
881}
882
883fn exceeds_max_response_body(len: usize, max: usize) -> bool {
884 len > max
885}
886
887fn title_case_header(name: &str) -> String {
888 name.split('-')
889 .map(|part| {
890 let mut chars = part.chars();
891 match chars.next() {
892 None => String::new(),
893 Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
894 }
895 })
896 .collect::<Vec<_>>()
897 .join("-")
898}
899
900pub struct HttpConsumer {
905 config: HttpServerConfig,
906}
907
908impl HttpConsumer {
909 pub fn new(config: HttpServerConfig) -> Self {
910 Self { config }
911 }
912}
913
914#[async_trait::async_trait]
915impl Consumer for HttpConsumer {
916 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
917 use camel_component_api::{Body, Exchange, Message};
918
919 let registry = ServerRegistry::global()
920 .get_or_spawn(
921 &self.config.host,
922 self.config.port,
923 self.config.max_request_body,
924 self.config.max_response_body,
925 self.config.max_inflight_requests,
926 )
927 .await?;
928
929 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
931 registry
932 .register_api_route(self.config.path.clone(), env_tx)
933 .await;
934
935 let path = self.config.path.clone();
936 let registry_for_cleanup = registry.clone();
937 let cancel_token = ctx.cancel_token();
938 loop {
939 tokio::select! {
940 _ = ctx.cancelled() => {
941 break;
942 }
943 envelope = env_rx.recv() => {
944 let Some(envelope) = envelope else { break; };
945
946 let mut msg = Message::default();
948
949 msg.set_header("CamelHttpMethod",
951 serde_json::Value::String(envelope.method.clone()));
952 msg.set_header("CamelHttpPath",
953 serde_json::Value::String(envelope.path.clone()));
954 msg.set_header("CamelHttpQuery",
955 serde_json::Value::String(envelope.query.clone()));
956
957 for (k, v) in &envelope.headers {
959 if let Ok(val_str) = v.to_str() {
960 msg.set_header(
961 title_case_header(k.as_str()),
962 serde_json::Value::String(val_str.to_string()),
963 );
964 }
965 }
966
967 msg.body = Body::Stream(envelope.body);
970
971 #[allow(unused_mut)]
972 let mut exchange = Exchange::new(msg);
973
974 #[cfg(feature = "otel")]
976 {
977 let headers: HashMap<String, String> = envelope
978 .headers
979 .iter()
980 .filter_map(|(k, v)| {
981 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
982 })
983 .collect();
984 camel_otel::extract_into_exchange(&mut exchange, &headers);
985 }
986
987 let reply_tx = envelope.reply_tx;
988 let sender = ctx.sender().clone();
989 let path_clone = path.clone();
990 let cancel = cancel_token.clone();
991
992 tokio::spawn(async move {
1012 if cancel.is_cancelled() {
1020 let _ = reply_tx.send(HttpReply {
1021 status: 503,
1022 headers: vec![],
1023 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1024 });
1025 return;
1026 }
1027
1028 let (tx, rx) = tokio::sync::oneshot::channel();
1030 let envelope = camel_component_api::consumer::ExchangeEnvelope {
1031 exchange,
1032 reply_tx: Some(tx),
1033 };
1034
1035 let result = match sender.send(envelope).await {
1036 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1037 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1038 }
1039 .and_then(|r| r);
1040
1041 let reply = match result {
1042 Ok(out) => {
1043 let status = out
1044 .input
1045 .header("CamelHttpResponseCode")
1046 .and_then(|v| {
1047 let raw = v.as_u64()
1048 .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1049 let code = raw as u16;
1050 (100..1000).contains(&code).then_some(code)
1051 })
1052 .unwrap_or(200);
1053
1054 let user_content_type = out
1055 .input
1056 .header("Content-Type")
1057 .and_then(|v| v.as_str().map(|s| s.to_string()));
1058
1059 let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1060 Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1061 Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1062 Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1063 Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1064 Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1065 v.to_string().into_bytes(),
1066 )), Some("application/json".to_string())),
1067 Body::Stream(s) => {
1068 let ct = s.metadata.content_type.clone();
1069 match s.stream.lock().await.take() {
1070 Some(stream) => (
1071 HttpReplyBody::Stream(stream),
1072 ct,
1073 ),
1074 None => {
1075 tracing::error!(
1076 "Body::Stream already consumed before HTTP reply — returning 500"
1077 );
1078 let error_reply = HttpReply {
1079 status: 500,
1080 headers: vec![],
1081 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1082 };
1083 if reply_tx.send(error_reply).is_err() {
1084 debug!("reply_tx dropped before error reply could be sent");
1085 }
1086 return;
1087 }
1088 }
1089 }
1090 };
1091
1092 let mut resp_headers: Vec<(String, String)> = out
1093 .input
1094 .headers
1095 .iter()
1096 .filter(|(k, _)| !k.starts_with("Camel"))
1097 .filter(|(k, _)| {
1098 !matches!(
1099 k.to_lowercase().as_str(),
1100 "content-length"
1101 | "content-type"
1102 | "transfer-encoding"
1103 | "connection"
1104 | "cache-control"
1105 | "date"
1106 | "pragma"
1107 | "trailer"
1108 | "upgrade"
1109 | "via"
1110 | "warning"
1111 | "host"
1112 | "user-agent"
1113 | "accept"
1114 | "accept-encoding"
1115 | "accept-language"
1116 | "accept-charset"
1117 | "authorization"
1118 | "proxy-authorization"
1119 | "cookie"
1120 | "expect"
1121 | "from"
1122 | "if-match"
1123 | "if-modified-since"
1124 | "if-none-match"
1125 | "if-range"
1126 | "if-unmodified-since"
1127 | "max-forwards"
1128 | "proxy-connection"
1129 | "range"
1130 | "referer"
1131 | "te"
1132 )
1133 })
1134 .filter_map(|(k, v)| {
1135 v.as_str().map(|s| (k.clone(), s.to_string()))
1136 })
1137 .collect();
1138
1139 let content_type = user_content_type
1140 .or(inferred_content_type);
1141 if let Some(ct) = content_type {
1142 resp_headers.push(("Content-Type".to_string(), ct));
1143 }
1144
1145 HttpReply {
1146 status,
1147 headers: resp_headers,
1148 body: reply_body,
1149 }
1150 }
1151 Err(CamelError::Stopped) => {
1152 tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1153 HttpReply {
1154 status: 204,
1155 headers: vec![],
1156 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1157 }
1158 }
1159 Err(CamelError::Unauthenticated(msg)) => {
1160 tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1161 HttpReply {
1162 status: 401,
1163 headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1164 body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1165 }
1166 }
1167 Err(CamelError::Unauthorized(msg)) => {
1168 tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1169 HttpReply {
1170 status: 403,
1171 headers: vec![],
1172 body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1173 }
1174 }
1175 Err(e) => {
1176 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1177 HttpReply {
1178 status: 500,
1179 headers: vec![],
1180 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1181 }
1182 }
1183 };
1184
1185 let _ = reply_tx.send(reply);
1187 });
1188 }
1189 }
1190 }
1191
1192 registry_for_cleanup.unregister_api_route(&path).await;
1194
1195 Ok(())
1196 }
1197
1198 async fn stop(&mut self) -> Result<(), CamelError> {
1199 Ok(())
1200 }
1201
1202 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1203 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1204 }
1205}
1206
1207pub struct HttpComponent {
1212 config: HttpConfig,
1213}
1214
1215fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1216 let mut builder = reqwest::Client::builder()
1217 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1218 .pool_max_idle_per_host(config.pool_max_idle_per_host)
1219 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1220
1221 if !config.follow_redirects {
1222 builder = builder.redirect(reqwest::redirect::Policy::none());
1223 } else if let Some(max_redirects) = config.max_redirects {
1224 builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1225 }
1226
1227 if let Some(proxy_url) = &config.proxy_url
1228 && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1229 {
1230 builder = builder.proxy(proxy);
1231 }
1232
1233 if matches!(cookie_handling, CookieHandling::InMemory) {
1234 }
1236
1237 if let Some(tls) = &config.tls
1238 && tls.enabled
1239 {
1240 if tls.insecure || !tls.verify_peer {
1241 builder = builder.danger_accept_invalid_certs(true);
1242 }
1243
1244 if let Some(ca_path) = &tls.ca_cert_path
1245 && let Ok(ca_bytes) = std::fs::read(ca_path)
1246 {
1247 let cert = reqwest::Certificate::from_pem(&ca_bytes)
1248 .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1249 if let Ok(ca_cert) = cert {
1250 builder = builder.add_root_certificate(ca_cert);
1251 }
1252 }
1253
1254 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1255 && let (Ok(cert_bytes), Ok(key_bytes)) =
1256 (std::fs::read(cert_path), std::fs::read(key_path))
1257 {
1258 let mut identity_pem = cert_bytes;
1259 identity_pem.extend_from_slice(&key_bytes);
1260 if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1261 builder = builder.identity(identity);
1262 }
1263 }
1264 }
1265
1266 builder
1267 .build()
1268 .expect("reqwest::Client::build() with valid config should not fail") }
1270
1271impl HttpComponent {
1272 pub fn new() -> Self {
1273 let config = HttpConfig::default();
1274 Self { config }
1275 }
1276
1277 pub fn with_config(config: HttpConfig) -> Self {
1278 Self { config }
1279 }
1280
1281 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1282 match config {
1283 Some(cfg) => Self::with_config(cfg),
1284 None => Self::new(),
1285 }
1286 }
1287}
1288
1289impl Default for HttpComponent {
1290 fn default() -> Self {
1291 Self::new()
1292 }
1293}
1294
1295impl Component for HttpComponent {
1296 fn scheme(&self) -> &str {
1297 "http"
1298 }
1299
1300 fn create_endpoint(
1301 &self,
1302 uri: &str,
1303 ctx: &dyn camel_component_api::ComponentContext,
1304 ) -> Result<Box<dyn Endpoint>, CamelError> {
1305 self.config.validate()?;
1306 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1307 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1308 let client = build_client(&self.config, config.cookie_handling);
1309 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1310 server_config.host.clone(),
1311 server_config.port,
1312 )));
1313 Ok(Box::new(HttpEndpoint {
1314 uri: uri.to_string(),
1315 config,
1316 server_config,
1317 client,
1318 }))
1319 }
1320}
1321
1322pub struct HttpsComponent {
1323 config: HttpConfig,
1324}
1325
1326impl HttpsComponent {
1327 pub fn new() -> Self {
1328 let config = HttpConfig::default();
1329 Self { config }
1330 }
1331
1332 pub fn with_config(config: HttpConfig) -> Self {
1333 Self { config }
1334 }
1335
1336 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1337 match config {
1338 Some(cfg) => Self::with_config(cfg),
1339 None => Self::new(),
1340 }
1341 }
1342}
1343
1344impl Default for HttpsComponent {
1345 fn default() -> Self {
1346 Self::new()
1347 }
1348}
1349
1350impl Component for HttpsComponent {
1351 fn scheme(&self) -> &str {
1352 "https"
1353 }
1354
1355 fn create_endpoint(
1356 &self,
1357 uri: &str,
1358 ctx: &dyn camel_component_api::ComponentContext,
1359 ) -> Result<Box<dyn Endpoint>, CamelError> {
1360 self.config.validate()?;
1361 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1362 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1363 let client = build_client(&self.config, config.cookie_handling);
1364 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1365 server_config.host.clone(),
1366 server_config.port,
1367 )));
1368 Ok(Box::new(HttpEndpoint {
1369 uri: uri.to_string(),
1370 config,
1371 server_config,
1372 client,
1373 }))
1374 }
1375}
1376
1377struct HttpEndpoint {
1382 uri: String,
1383 config: HttpEndpointConfig,
1384 server_config: HttpServerConfig,
1385 client: reqwest::Client,
1386}
1387
1388impl Endpoint for HttpEndpoint {
1389 fn uri(&self) -> &str {
1390 &self.uri
1391 }
1392
1393 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1394 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1395 }
1396
1397 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1398 let producer = HttpProducer {
1399 config: Arc::new(self.config.clone()),
1400 client: self.client.clone(),
1401 };
1402 if let Some(ref provider) = self.config.token_provider {
1403 let layer = BearerTokenLayer::new(Arc::clone(provider));
1404 Ok(BoxProcessor::new(layer.layer(producer)))
1405 } else {
1406 Ok(BoxProcessor::new(producer))
1407 }
1408 }
1409}
1410
1411fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1416 let parsed = url::Url::parse(url)
1417 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1418
1419 if let Some(host) = parsed.host_str()
1421 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1422 {
1423 return Err(CamelError::ProcessorError(format!(
1424 "Host '{}' is blocked",
1425 host
1426 )));
1427 }
1428
1429 if !config.allow_private_ips
1431 && let Some(host) = parsed.host()
1432 {
1433 match host {
1434 url::Host::Ipv4(ip) => {
1435 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1436 return Err(CamelError::ProcessorError(format!(
1437 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1438 ip
1439 )));
1440 }
1441 }
1442 url::Host::Ipv6(ip) => {
1443 if ip.is_loopback() {
1444 return Err(CamelError::ProcessorError(format!(
1445 "Loopback IP '{}' not allowed",
1446 ip
1447 )));
1448 }
1449 }
1450 url::Host::Domain(domain) => {
1451 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1453 if blocked_domains.contains(&domain) {
1454 return Err(CamelError::ProcessorError(format!(
1455 "Domain '{}' is not allowed",
1456 domain
1457 )));
1458 }
1459 }
1460 }
1461 }
1462
1463 Ok(())
1464}
1465
1466fn is_private_ip(ip: &IpAddr) -> bool {
1467 match ip {
1468 IpAddr::V4(ipv4) => {
1469 ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1470 }
1471 IpAddr::V6(ipv6) => {
1472 let seg0 = ipv6.segments()[0];
1473 ipv6.is_loopback()
1474 || (seg0 & 0xfe00) == 0xfc00
1476 || (seg0 & 0xffc0) == 0xfe80
1478 || ipv6
1480 .to_ipv4_mapped()
1481 .map(|v4| {
1482 v4.is_private()
1483 || v4.is_loopback()
1484 || v4.is_link_local()
1485 || v4.octets()[0] == 0
1486 })
1487 .unwrap_or(false)
1488 }
1489 }
1490}
1491
1492async fn validate_resolved_host_for_ssrf(
1493 url: &str,
1494 config: &HttpEndpointConfig,
1495) -> Result<(), CamelError> {
1496 if config.allow_private_ips {
1497 return Ok(());
1498 }
1499
1500 let parsed = url::Url::parse(url)
1501 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1502 let Some(host) = parsed.host_str() else {
1503 return Ok(());
1504 };
1505 let Some(port) = parsed.port_or_known_default() else {
1506 return Ok(());
1507 };
1508
1509 let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1510 CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1511 })?;
1512
1513 for addr in resolved {
1514 let ip = addr.ip();
1515 if is_private_ip(&ip) {
1516 return Err(CamelError::ProcessorError(format!(
1517 "Target resolved to private IP: {}",
1518 ip
1519 )));
1520 }
1521 }
1522
1523 Ok(())
1524}
1525
1526#[derive(Clone)]
1531struct HttpProducer {
1532 config: Arc<HttpEndpointConfig>,
1533 client: reqwest::Client,
1534}
1535
1536impl HttpProducer {
1537 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1538 if let Some(ref method) = config.http_method {
1539 return method.to_uppercase();
1540 }
1541 if let Some(method) = exchange
1542 .input
1543 .header("CamelHttpMethod")
1544 .and_then(|v| v.as_str())
1545 {
1546 return method.to_uppercase();
1547 }
1548 if !exchange.input.body.is_empty() {
1549 return "POST".to_string();
1550 }
1551 "GET".to_string()
1552 }
1553
1554 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1555 if let Some(uri) = exchange
1556 .input
1557 .header("CamelHttpUri")
1558 .and_then(|v| v.as_str())
1559 {
1560 let mut url = uri.to_string();
1561 if let Some(path) = exchange
1562 .input
1563 .header("CamelHttpPath")
1564 .and_then(|v| v.as_str())
1565 {
1566 if !url.ends_with('/') && !path.starts_with('/') {
1567 url.push('/');
1568 }
1569 url.push_str(path);
1570 }
1571 if let Some(query) = exchange
1572 .input
1573 .header("CamelHttpQuery")
1574 .and_then(|v| v.as_str())
1575 {
1576 url.push('?');
1577 url.push_str(query);
1578 }
1579 return url;
1580 }
1581
1582 let mut url = config.base_url.clone();
1583
1584 if let Some(path) = exchange
1585 .input
1586 .header("CamelHttpPath")
1587 .and_then(|v| v.as_str())
1588 {
1589 if !url.ends_with('/') && !path.starts_with('/') {
1590 url.push('/');
1591 }
1592 url.push_str(path);
1593 }
1594
1595 if let Some(query) = exchange
1596 .input
1597 .header("CamelHttpQuery")
1598 .and_then(|v| v.as_str())
1599 {
1600 url.push('?');
1601 url.push_str(query);
1602 } else if !config.query_params.is_empty() {
1603 let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); for (k, v) in &config.query_params {
1605 parsed.query_pairs_mut().append_pair(k, v);
1606 }
1607 url = parsed.to_string();
1608 }
1609
1610 url
1611 }
1612
1613 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1614 status >= range.0 && status <= range.1
1615 }
1616}
1617
1618impl Service<Exchange> for HttpProducer {
1619 type Response = Exchange;
1620 type Error = CamelError;
1621 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1622
1623 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1624 Poll::Ready(Ok(()))
1625 }
1626
1627 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1628 let config = self.config.clone();
1629 let client = self.client.clone();
1630
1631 Box::pin(async move {
1632 let method_str = HttpProducer::resolve_method(&exchange, &config);
1633 let url = HttpProducer::resolve_url(&exchange, &config);
1634
1635 validate_url_for_ssrf(&url, &config)?;
1637 validate_resolved_host_for_ssrf(&url, &config).await?;
1638
1639 debug!(
1640 correlation_id = %exchange.correlation_id(),
1641 method = %method_str,
1642 url = %url,
1643 "HTTP request"
1644 );
1645
1646 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1647 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1648 })?;
1649
1650 let mut request = client.request(method, &url);
1651
1652 if let Some(timeout) = config.response_timeout {
1653 request = request.timeout(timeout);
1654 }
1655
1656 if let Some(user_agent) = &config.user_agent
1657 && !config.bridge_endpoint
1658 {
1659 request = request.header("User-Agent", user_agent.clone());
1660 }
1661
1662 #[cfg(feature = "otel")]
1664 let should_inject_otel = !config.bridge_endpoint;
1665 #[cfg(feature = "otel")]
1666 if should_inject_otel {
1667 let mut otel_headers = HashMap::new();
1668 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1669 for (k, v) in otel_headers {
1670 if let (Ok(name), Ok(val)) = (
1671 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1672 reqwest::header::HeaderValue::from_str(&v),
1673 ) {
1674 request = request.header(name, val);
1675 }
1676 }
1677 }
1678
1679 for (key, value) in &exchange.input.headers {
1680 if !key.starts_with("Camel")
1681 && !config
1682 .skip_request_headers
1683 .iter()
1684 .any(|h| h.eq_ignore_ascii_case(key))
1685 && let Some(val_str) = value.as_str()
1686 && let (Ok(name), Ok(val)) = (
1687 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1688 reqwest::header::HeaderValue::from_str(val_str),
1689 )
1690 {
1691 request = request.header(name, val);
1692 }
1693 }
1694
1695 if !config.bridge_endpoint {
1696 match &config.auth {
1697 HttpAuth::None => {}
1698 HttpAuth::Basic { username, password } => {
1699 request = request.basic_auth(username, Some(password));
1700 }
1701 HttpAuth::Bearer { token } => {
1702 request = request.bearer_auth(token);
1703 }
1704 }
1705
1706 if config.connection_close {
1707 request = request.header("Connection", "close");
1708 }
1709 }
1710
1711 match exchange.input.body {
1712 Body::Stream(ref s) => {
1713 let mut stream_lock = s.stream.lock().await;
1714 if let Some(stream) = stream_lock.take() {
1715 request = request.body(reqwest::Body::wrap_stream(stream));
1716 } else {
1717 return Err(CamelError::AlreadyConsumed);
1718 }
1719 }
1720 _ => {
1721 let body = std::mem::take(&mut exchange.input.body);
1723 let bytes = body.into_bytes(config.max_body_size).await?;
1724 if !bytes.is_empty() {
1725 request = request.body(bytes);
1726 }
1727 }
1728 }
1729
1730 let response = request
1731 .send()
1732 .await
1733 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1734
1735 let status_code = response.status().as_u16();
1736 let status_text = response
1737 .status()
1738 .canonical_reason()
1739 .unwrap_or("Unknown")
1740 .to_string();
1741
1742 for (key, value) in response.headers() {
1743 if config
1744 .skip_response_headers
1745 .iter()
1746 .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1747 {
1748 continue;
1749 }
1750 if let Ok(val_str) = value.to_str() {
1751 exchange.input.set_header(
1752 title_case_header(key.as_str()),
1753 serde_json::Value::String(val_str.to_string()),
1754 );
1755 }
1756 }
1757
1758 exchange.input.set_header(
1759 "CamelHttpResponseCode",
1760 serde_json::Value::Number(status_code.into()),
1761 );
1762 exchange.input.set_header(
1763 "CamelHttpResponseText",
1764 serde_json::Value::String(status_text.clone()),
1765 );
1766
1767 let read_timeout = Duration::from_millis(config.read_timeout_ms);
1769 let response_body = tokio::time::timeout(read_timeout, async {
1770 if let Some(content_len) = response.content_length()
1772 && content_len > config.max_response_bytes as u64
1773 {
1774 return Err(CamelError::ProcessorError(format!(
1775 "Response body too large: {} bytes exceeds limit of {} bytes",
1776 content_len, config.max_response_bytes
1777 )));
1778 }
1779 use futures::TryStreamExt;
1781 let mut stream = response.bytes_stream();
1782 let mut total: usize = 0;
1783 let mut collected = Vec::new();
1784 while let Some(chunk) = stream.try_next().await.map_err(|e| {
1785 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1786 })? {
1787 total += chunk.len();
1788 if total > config.max_response_bytes {
1789 return Err(CamelError::ProcessorError(format!(
1790 "Response body too large: {} bytes exceeds limit of {} bytes",
1791 total, config.max_response_bytes
1792 )));
1793 }
1794 collected.push(chunk);
1795 }
1796 let mut result = bytes::BytesMut::with_capacity(total);
1797 for chunk in collected {
1798 result.extend_from_slice(&chunk);
1799 }
1800 Ok::<bytes::Bytes, CamelError>(result.freeze())
1801 })
1802 .await
1803 .map_err(|_| {
1804 CamelError::ProcessorError(format!(
1805 "Read timeout after {}ms",
1806 config.read_timeout_ms
1807 ))
1808 })??;
1809
1810 if config.throw_exception_on_failure
1811 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1812 {
1813 return Err(CamelError::HttpOperationFailed {
1814 method: method_str,
1815 url,
1816 status_code,
1817 status_text,
1818 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1819 });
1820 }
1821
1822 if !response_body.is_empty() {
1823 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1824 }
1825
1826 debug!(
1827 correlation_id = %exchange.correlation_id(),
1828 status = status_code,
1829 url = %url,
1830 "HTTP response"
1831 );
1832 Ok(exchange)
1833 })
1834 }
1835}
1836
1837#[cfg(test)]
1847pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1848
1849#[cfg(test)]
1850mod tests {
1851 use super::*;
1852 use camel_component_api::{Message, NoOpComponentContext};
1853 use std::sync::Arc;
1854 use std::time::Duration;
1855
1856 fn test_producer_ctx() -> ProducerContext {
1857 ProducerContext::new()
1858 }
1859
1860 #[test]
1861 fn test_http_config_defaults() {
1862 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1863 assert_eq!(config.base_url, "http://localhost:8080/api");
1864 assert!(config.http_method.is_none());
1865 assert!(config.throw_exception_on_failure);
1866 assert_eq!(config.ok_status_code_range, (200, 299));
1867 assert!(config.response_timeout.is_none());
1868 assert!(matches!(config.auth, HttpAuth::None));
1869 assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1870 assert!(!config.bridge_endpoint);
1871 assert!(!config.connection_close);
1872 }
1873
1874 #[test]
1875 fn test_http_config_scheme() {
1876 assert_eq!(HttpEndpointConfig::scheme(), "http");
1878 }
1879
1880 #[test]
1881 fn test_http_config_from_components() {
1882 let components = camel_component_api::UriComponents {
1884 scheme: "https".to_string(),
1885 path: "//api.example.com/v1".to_string(),
1886 params: std::collections::HashMap::from([(
1887 "httpMethod".to_string(),
1888 "POST".to_string(),
1889 )]),
1890 };
1891 let config = HttpEndpointConfig::from_components(components).unwrap();
1892 assert_eq!(config.base_url, "https://api.example.com/v1");
1893 assert_eq!(config.http_method, Some("POST".to_string()));
1894 }
1895
1896 #[test]
1897 fn test_http_config_with_options() {
1898 let config = HttpEndpointConfig::from_uri(
1899 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1900 ).unwrap();
1901 assert_eq!(config.base_url, "https://api.example.com/v1");
1902 assert_eq!(config.http_method, Some("PUT".to_string()));
1903 assert!(!config.throw_exception_on_failure);
1904 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1905 }
1906
1907 #[test]
1908 fn test_http_endpoint_config_auth_and_headers_options() {
1909 let config = HttpEndpointConfig::from_uri(
1910 "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1911 )
1912 .unwrap();
1913
1914 assert!(matches!(
1915 config.auth,
1916 HttpAuth::Basic { username, password } if username == "u" && password == "p"
1917 ));
1918 assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1919 assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1920 assert!(config.bridge_endpoint);
1921 assert!(config.connection_close);
1922 assert_eq!(
1923 config.skip_request_headers,
1924 vec!["authorization".to_string(), "x-secret".to_string()]
1925 );
1926 assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1927 }
1928
1929 #[test]
1930 fn test_http_endpoint_config_bearer_auth() {
1931 let config = HttpEndpointConfig::from_uri(
1932 "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1933 )
1934 .unwrap();
1935 assert!(matches!(
1936 config.auth,
1937 HttpAuth::Bearer { token } if token == "t"
1938 ));
1939 }
1940
1941 #[test]
1942 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1943 let config = HttpConfig::default()
1944 .with_response_timeout_ms(999)
1945 .with_allow_private_ips(true)
1946 .with_blocked_hosts(vec!["evil.com".to_string()])
1947 .with_max_body_size(12345);
1948 let endpoint =
1949 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1950 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1951 assert!(endpoint.allow_private_ips);
1952 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1953 assert_eq!(endpoint.max_body_size, 12345);
1954 }
1955
1956 #[test]
1957 fn test_from_uri_with_defaults_uri_overrides_config() {
1958 let config = HttpConfig::default()
1959 .with_response_timeout_ms(999)
1960 .with_allow_private_ips(true)
1961 .with_blocked_hosts(vec!["evil.com".to_string()])
1962 .with_max_body_size(12345);
1963 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1964 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1965 &config,
1966 )
1967 .unwrap();
1968 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1969 assert!(!endpoint.allow_private_ips);
1970 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1971 assert_eq!(endpoint.max_body_size, 99);
1972 }
1973
1974 #[test]
1975 fn test_http_config_ok_status_range() {
1976 let config =
1977 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1978 assert_eq!(config.ok_status_code_range, (200, 204));
1979 }
1980
1981 #[test]
1982 fn test_http_config_wrong_scheme() {
1983 let result = HttpEndpointConfig::from_uri("file:/tmp");
1984 assert!(result.is_err());
1985 }
1986
1987 #[test]
1988 fn test_http_component_scheme() {
1989 let component = HttpComponent::new();
1990 assert_eq!(component.scheme(), "http");
1991 }
1992
1993 #[test]
1994 fn test_https_component_scheme() {
1995 let component = HttpsComponent::new();
1996 assert_eq!(component.scheme(), "https");
1997 }
1998
1999 #[test]
2000 fn test_http_endpoint_creates_consumer() {
2001 let component = HttpComponent::new();
2002 let ctx = NoOpComponentContext;
2003 let endpoint = component
2004 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2005 .unwrap();
2006 assert!(endpoint.create_consumer().is_ok());
2007 }
2008
2009 #[test]
2010 fn test_https_endpoint_creates_consumer() {
2011 let component = HttpsComponent::new();
2012 let ctx = NoOpComponentContext;
2013 let endpoint = component
2014 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2015 .unwrap();
2016 assert!(endpoint.create_consumer().is_ok());
2017 }
2018
2019 #[test]
2020 fn test_http_endpoint_creates_producer() {
2021 let ctx = test_producer_ctx();
2022 let component = HttpComponent::new();
2023 let endpoint_ctx = NoOpComponentContext;
2024 let endpoint = component
2025 .create_endpoint("http://localhost/api", &endpoint_ctx)
2026 .unwrap();
2027 assert!(endpoint.create_producer(&ctx).is_ok());
2028 }
2029
2030 #[tokio::test]
2035 async fn test_producer_with_token_provider() {
2036 use camel_auth::oauth2::TokenProvider;
2037 use tower::ServiceExt;
2038
2039 let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2040 Arc::new(std::sync::Mutex::new(None));
2041 let captured_clone = Arc::clone(&captured_auth);
2042
2043 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2044 let port = listener.local_addr().unwrap().port();
2045
2046 let _handle = tokio::spawn(async move {
2047 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2048 if let Ok((mut stream, _)) = listener.accept().await {
2049 let mut buf = vec![0u8; 8192];
2050 let n = stream.read(&mut buf).await.unwrap_or(0);
2051 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2052 let auth = request
2053 .lines()
2054 .find(|l| l.to_lowercase().starts_with("authorization:"))
2055 .map(|l| {
2056 l.split(':')
2057 .nth(1)
2058 .map(|s| s.trim().to_string())
2059 .unwrap_or_default()
2060 });
2061 *captured_clone.lock().unwrap() = auth;
2062 let body = r#"{"echo":"ok"}"#;
2063 let resp = format!(
2064 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2065 body.len(),
2066 body
2067 );
2068 let _ = stream.write_all(resp.as_bytes()).await;
2069 }
2070 });
2071
2072 #[derive(Debug)]
2073 struct StaticProvider;
2074 #[async_trait::async_trait]
2075 impl TokenProvider for StaticProvider {
2076 async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2077 Ok("injected-token".into())
2078 }
2079 }
2080
2081 let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2082 let ctx = test_producer_ctx();
2083 let component = HttpComponent::new();
2084 let endpoint_ctx = NoOpComponentContext;
2085 let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2086 let producer = endpoint.create_producer(&ctx).unwrap();
2087
2088 let exchange = Exchange::new(Message::new("hello"));
2089
2090 let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2091 let mut layered = layer.layer(producer);
2092 let result = layered.ready().await.unwrap().call(exchange).await;
2093 assert!(result.is_ok(), "producer call failed: {:?}", result);
2094
2095 tokio::time::sleep(Duration::from_millis(100)).await;
2096 let auth = captured_auth.lock().unwrap().take();
2097 assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2098 }
2099
2100 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2101 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2102 let addr = listener.local_addr().unwrap();
2103 let url = format!("http://127.0.0.1:{}", addr.port());
2104
2105 let handle = tokio::spawn(async move {
2106 loop {
2107 if let Ok((mut stream, _)) = listener.accept().await {
2108 tokio::spawn(async move {
2109 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2110 let mut buf = vec![0u8; 4096];
2111 let n = stream.read(&mut buf).await.unwrap_or(0);
2112 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2113
2114 let method = request.split_whitespace().next().unwrap_or("GET");
2115
2116 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2117 let response = format!(
2118 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2119 body.len(),
2120 body
2121 );
2122 let _ = stream.write_all(response.as_bytes()).await;
2123 });
2124 }
2125 }
2126 });
2127
2128 (url, handle)
2129 }
2130
2131 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2132 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2133 let addr = listener.local_addr().unwrap();
2134 let url = format!("http://127.0.0.1:{}", addr.port());
2135
2136 let handle = tokio::spawn(async move {
2137 loop {
2138 if let Ok((mut stream, _)) = listener.accept().await {
2139 let status = status;
2140 tokio::spawn(async move {
2141 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2142 let mut buf = vec![0u8; 4096];
2143 let _ = stream.read(&mut buf).await;
2144
2145 let status_text = match status {
2146 404 => "Not Found",
2147 500 => "Internal Server Error",
2148 _ => "Error",
2149 };
2150 let body = "error body";
2151 let response = format!(
2152 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2153 status,
2154 status_text,
2155 body.len(),
2156 body
2157 );
2158 let _ = stream.write_all(response.as_bytes()).await;
2159 });
2160 }
2161 }
2162 });
2163
2164 (url, handle)
2165 }
2166
2167 #[tokio::test]
2168 async fn test_http_producer_get_request() {
2169 use tower::ServiceExt;
2170
2171 let (url, _handle) = start_test_server().await;
2172 let ctx = test_producer_ctx();
2173
2174 let component = HttpComponent::new();
2175 let endpoint_ctx = NoOpComponentContext;
2176 let endpoint = component
2177 .create_endpoint(
2178 &format!("{url}/api/test?allowPrivateIps=true"),
2179 &endpoint_ctx,
2180 )
2181 .unwrap();
2182 let producer = endpoint.create_producer(&ctx).unwrap();
2183
2184 let exchange = Exchange::new(Message::default());
2185 let result = producer.oneshot(exchange).await.unwrap();
2186
2187 let status = result
2188 .input
2189 .header("CamelHttpResponseCode")
2190 .and_then(|v| v.as_u64())
2191 .unwrap();
2192 assert_eq!(status, 200);
2193
2194 assert!(!result.input.body.is_empty());
2195 }
2196
2197 #[tokio::test]
2198 async fn test_http_producer_post_with_body() {
2199 use tower::ServiceExt;
2200
2201 let (url, _handle) = start_test_server().await;
2202 let ctx = test_producer_ctx();
2203
2204 let component = HttpComponent::new();
2205 let endpoint_ctx = NoOpComponentContext;
2206 let endpoint = component
2207 .create_endpoint(
2208 &format!("{url}/api/data?allowPrivateIps=true"),
2209 &endpoint_ctx,
2210 )
2211 .unwrap();
2212 let producer = endpoint.create_producer(&ctx).unwrap();
2213
2214 let exchange = Exchange::new(Message::new("request body"));
2215 let result = producer.oneshot(exchange).await.unwrap();
2216
2217 let status = result
2218 .input
2219 .header("CamelHttpResponseCode")
2220 .and_then(|v| v.as_u64())
2221 .unwrap();
2222 assert_eq!(status, 200);
2223 }
2224
2225 #[tokio::test]
2226 async fn test_http_producer_method_from_header() {
2227 use tower::ServiceExt;
2228
2229 let (url, _handle) = start_test_server().await;
2230 let ctx = test_producer_ctx();
2231
2232 let component = HttpComponent::new();
2233 let endpoint_ctx = NoOpComponentContext;
2234 let endpoint = component
2235 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2236 .unwrap();
2237 let producer = endpoint.create_producer(&ctx).unwrap();
2238
2239 let mut exchange = Exchange::new(Message::default());
2240 exchange.input.set_header(
2241 "CamelHttpMethod",
2242 serde_json::Value::String("DELETE".to_string()),
2243 );
2244
2245 let result = producer.oneshot(exchange).await.unwrap();
2246 let status = result
2247 .input
2248 .header("CamelHttpResponseCode")
2249 .and_then(|v| v.as_u64())
2250 .unwrap();
2251 assert_eq!(status, 200);
2252 }
2253
2254 #[tokio::test]
2255 async fn test_http_producer_forced_method() {
2256 use tower::ServiceExt;
2257
2258 let (url, _handle) = start_test_server().await;
2259 let ctx = test_producer_ctx();
2260
2261 let component = HttpComponent::new();
2262 let endpoint_ctx = NoOpComponentContext;
2263 let endpoint = component
2264 .create_endpoint(
2265 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2266 &endpoint_ctx,
2267 )
2268 .unwrap();
2269 let producer = endpoint.create_producer(&ctx).unwrap();
2270
2271 let exchange = Exchange::new(Message::default());
2272 let result = producer.oneshot(exchange).await.unwrap();
2273
2274 let status = result
2275 .input
2276 .header("CamelHttpResponseCode")
2277 .and_then(|v| v.as_u64())
2278 .unwrap();
2279 assert_eq!(status, 200);
2280 }
2281
2282 #[tokio::test]
2283 async fn test_http_producer_throw_exception_on_failure() {
2284 use tower::ServiceExt;
2285
2286 let (url, _handle) = start_status_server(404).await;
2287 let ctx = test_producer_ctx();
2288
2289 let component = HttpComponent::new();
2290 let endpoint_ctx = NoOpComponentContext;
2291 let endpoint = component
2292 .create_endpoint(
2293 &format!("{url}/not-found?allowPrivateIps=true"),
2294 &endpoint_ctx,
2295 )
2296 .unwrap();
2297 let producer = endpoint.create_producer(&ctx).unwrap();
2298
2299 let exchange = Exchange::new(Message::default());
2300 let result = producer.oneshot(exchange).await;
2301 assert!(result.is_err());
2302
2303 match result.unwrap_err() {
2304 CamelError::HttpOperationFailed { status_code, .. } => {
2305 assert_eq!(status_code, 404);
2306 }
2307 e => panic!("Expected HttpOperationFailed, got: {e}"),
2308 }
2309 }
2310
2311 #[tokio::test]
2312 async fn test_http_producer_no_throw_on_failure() {
2313 use tower::ServiceExt;
2314
2315 let (url, _handle) = start_status_server(500).await;
2316 let ctx = test_producer_ctx();
2317
2318 let component = HttpComponent::new();
2319 let endpoint_ctx = NoOpComponentContext;
2320 let endpoint = component
2321 .create_endpoint(
2322 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2323 &endpoint_ctx,
2324 )
2325 .unwrap();
2326 let producer = endpoint.create_producer(&ctx).unwrap();
2327
2328 let exchange = Exchange::new(Message::default());
2329 let result = producer.oneshot(exchange).await.unwrap();
2330
2331 let status = result
2332 .input
2333 .header("CamelHttpResponseCode")
2334 .and_then(|v| v.as_u64())
2335 .unwrap();
2336 assert_eq!(status, 500);
2337 }
2338
2339 #[tokio::test]
2340 async fn test_http_producer_uri_override() {
2341 use tower::ServiceExt;
2342
2343 let (url, _handle) = start_test_server().await;
2344 let ctx = test_producer_ctx();
2345
2346 let component = HttpComponent::new();
2347 let endpoint_ctx = NoOpComponentContext;
2348 let endpoint = component
2349 .create_endpoint(
2350 "http://localhost:1/does-not-exist?allowPrivateIps=true",
2351 &endpoint_ctx,
2352 )
2353 .unwrap();
2354 let producer = endpoint.create_producer(&ctx).unwrap();
2355
2356 let mut exchange = Exchange::new(Message::default());
2357 exchange.input.set_header(
2358 "CamelHttpUri",
2359 serde_json::Value::String(format!("{url}/api")),
2360 );
2361
2362 let result = producer.oneshot(exchange).await.unwrap();
2363 let status = result
2364 .input
2365 .header("CamelHttpResponseCode")
2366 .and_then(|v| v.as_u64())
2367 .unwrap();
2368 assert_eq!(status, 200);
2369 }
2370
2371 #[tokio::test]
2372 async fn test_http_producer_response_headers_mapped() {
2373 use tower::ServiceExt;
2374
2375 let (url, _handle) = start_test_server().await;
2376 let ctx = test_producer_ctx();
2377
2378 let component = HttpComponent::new();
2379 let endpoint_ctx = NoOpComponentContext;
2380 let endpoint = component
2381 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2382 .unwrap();
2383 let producer = endpoint.create_producer(&ctx).unwrap();
2384
2385 let exchange = Exchange::new(Message::default());
2386 let result = producer.oneshot(exchange).await.unwrap();
2387
2388 assert!(
2389 result.input.header("Content-Type").is_some(),
2390 "Response should have Content-Type header"
2391 );
2392 assert!(result.input.header("CamelHttpResponseText").is_some());
2393 }
2394
2395 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2400 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2401 let addr = listener.local_addr().unwrap();
2402 let url = format!("http://127.0.0.1:{}", addr.port());
2403
2404 let handle = tokio::spawn(async move {
2405 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2406 loop {
2407 if let Ok((mut stream, _)) = listener.accept().await {
2408 tokio::spawn(async move {
2409 let mut buf = vec![0u8; 4096];
2410 let n = stream.read(&mut buf).await.unwrap_or(0);
2411 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2412
2413 if request.contains("GET /final") {
2415 let body = r#"{"status":"final"}"#;
2416 let response = format!(
2417 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2418 body.len(),
2419 body
2420 );
2421 let _ = stream.write_all(response.as_bytes()).await;
2422 } else {
2423 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2425 let _ = stream.write_all(response.as_bytes()).await;
2426 }
2427 });
2428 }
2429 }
2430 });
2431
2432 (url, handle)
2433 }
2434
2435 #[tokio::test]
2436 async fn test_follow_redirects_false_does_not_follow() {
2437 use tower::ServiceExt;
2438
2439 let (url, _handle) = start_redirect_server().await;
2440 let ctx = test_producer_ctx();
2441
2442 let component =
2443 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2444 let endpoint_ctx = NoOpComponentContext;
2445 let endpoint = component
2446 .create_endpoint(
2447 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2448 &endpoint_ctx,
2449 )
2450 .unwrap();
2451 let producer = endpoint.create_producer(&ctx).unwrap();
2452
2453 let exchange = Exchange::new(Message::default());
2454 let result = producer.oneshot(exchange).await.unwrap();
2455
2456 let status = result
2458 .input
2459 .header("CamelHttpResponseCode")
2460 .and_then(|v| v.as_u64())
2461 .unwrap();
2462 assert_eq!(
2463 status, 302,
2464 "Should NOT follow redirect when followRedirects=false"
2465 );
2466 }
2467
2468 #[tokio::test]
2469 async fn test_follow_redirects_true_follows_redirect() {
2470 use tower::ServiceExt;
2471
2472 let (url, _handle) = start_redirect_server().await;
2473 let ctx = test_producer_ctx();
2474
2475 let component =
2476 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2477 let endpoint_ctx = NoOpComponentContext;
2478 let endpoint = component
2479 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2480 .unwrap();
2481 let producer = endpoint.create_producer(&ctx).unwrap();
2482
2483 let exchange = Exchange::new(Message::default());
2484 let result = producer.oneshot(exchange).await.unwrap();
2485
2486 let status = result
2488 .input
2489 .header("CamelHttpResponseCode")
2490 .and_then(|v| v.as_u64())
2491 .unwrap();
2492 assert_eq!(
2493 status, 200,
2494 "Should follow redirect when followRedirects=true"
2495 );
2496 }
2497
2498 #[tokio::test]
2499 async fn test_query_params_forwarded_to_http_request() {
2500 use tower::ServiceExt;
2501
2502 let (url, _handle) = start_test_server().await;
2503 let ctx = test_producer_ctx();
2504
2505 let component = HttpComponent::new();
2506 let endpoint_ctx = NoOpComponentContext;
2507 let endpoint = component
2509 .create_endpoint(
2510 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2511 &endpoint_ctx,
2512 )
2513 .unwrap();
2514 let producer = endpoint.create_producer(&ctx).unwrap();
2515
2516 let exchange = Exchange::new(Message::default());
2517 let result = producer.oneshot(exchange).await.unwrap();
2518
2519 let status = result
2522 .input
2523 .header("CamelHttpResponseCode")
2524 .and_then(|v| v.as_u64())
2525 .unwrap();
2526 assert_eq!(status, 200);
2527 }
2528
2529 #[tokio::test]
2530 async fn test_non_camel_query_params_are_forwarded() {
2531 let config = HttpEndpointConfig::from_uri(
2534 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2535 )
2536 .unwrap();
2537
2538 assert!(
2540 config.query_params.contains_key("apiKey"),
2541 "apiKey should be preserved"
2542 );
2543 assert!(
2544 config.query_params.contains_key("token"),
2545 "token should be preserved"
2546 );
2547 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2548 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2549
2550 assert!(
2552 !config.query_params.contains_key("httpMethod"),
2553 "httpMethod should not be forwarded"
2554 );
2555 }
2556
2557 #[test]
2558 fn test_query_params_are_url_encoded_when_resolving_url() {
2559 let config =
2560 HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2561 let exchange = Exchange::new(Message::default());
2562
2563 let url = HttpProducer::resolve_url(&exchange, &config);
2564
2565 assert!(url.contains("q=hello+world"), "url was: {url}");
2566 assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2567 }
2568
2569 async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2574 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2575 let addr = listener.local_addr().unwrap();
2576 let url = format!("http://127.0.0.1:{}", addr.port());
2577
2578 let handle = tokio::spawn(async move {
2579 loop {
2580 if let Ok((mut stream, _)) = listener.accept().await {
2581 let delay = delay_ms;
2582 tokio::spawn(async move {
2583 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2584 let mut buf = vec![0u8; 4096];
2585 let _ = stream.read(&mut buf).await;
2586 let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2588 let _ = stream.write_all(headers.as_bytes()).await;
2589 tokio::time::sleep(Duration::from_millis(delay)).await;
2591 let body = r#"{"status":"slow"}"#;
2592 let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2593 let _ = stream.write_all(chunk.as_bytes()).await;
2594 });
2595 }
2596 }
2597 });
2598
2599 (url, handle)
2600 }
2601
2602 #[tokio::test]
2603 async fn test_http_producer_timeout() {
2604 use tower::ServiceExt;
2605
2606 let (url, _handle) = start_slow_server(500).await;
2608 let ctx = test_producer_ctx();
2609
2610 let component = HttpComponent::with_config(
2611 HttpConfig::default()
2612 .with_read_timeout_ms(100)
2613 .with_response_timeout_ms(30_000), );
2615 let endpoint_ctx = NoOpComponentContext;
2616 let endpoint = component
2617 .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2618 .unwrap();
2619 let producer = endpoint.create_producer(&ctx).unwrap();
2620
2621 let exchange = Exchange::new(Message::default());
2622 let result = producer.oneshot(exchange).await;
2623
2624 assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2625 let err = result.unwrap_err().to_string();
2626 assert!(
2627 err.contains("Read timeout") || err.contains("timeout"),
2628 "Error should mention timeout, got: {}",
2629 err
2630 );
2631 }
2632
2633 #[tokio::test]
2634 async fn test_http_producer_no_timeout_when_fast() {
2635 use tower::ServiceExt;
2636
2637 let (url, _handle) = start_test_server().await;
2638 let ctx = test_producer_ctx();
2639
2640 let component =
2641 HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2642 let endpoint_ctx = NoOpComponentContext;
2643 let endpoint = component
2644 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2645 .unwrap();
2646 let producer = endpoint.create_producer(&ctx).unwrap();
2647
2648 let exchange = Exchange::new(Message::default());
2649 let result = producer.oneshot(exchange).await.unwrap();
2650
2651 let status = result
2652 .input
2653 .header("CamelHttpResponseCode")
2654 .and_then(|v| v.as_u64())
2655 .unwrap();
2656 assert_eq!(status, 200);
2657 }
2658
2659 #[tokio::test]
2664 async fn test_http_producer_blocks_metadata_endpoint() {
2665 use tower::ServiceExt;
2666
2667 let ctx = test_producer_ctx();
2668 let component = HttpComponent::new();
2669 let endpoint_ctx = NoOpComponentContext;
2670 let endpoint = component
2671 .create_endpoint(
2672 "http://example.com/api?allowPrivateIps=false",
2673 &endpoint_ctx,
2674 )
2675 .unwrap();
2676 let producer = endpoint.create_producer(&ctx).unwrap();
2677
2678 let mut exchange = Exchange::new(Message::default());
2679 exchange.input.set_header(
2680 "CamelHttpUri",
2681 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2682 );
2683
2684 let result = producer.oneshot(exchange).await;
2685 assert!(result.is_err(), "Should block AWS metadata endpoint");
2686
2687 let err = result.unwrap_err();
2688 assert!(
2689 err.to_string().contains("Private IP"),
2690 "Error should mention private IP blocking, got: {}",
2691 err
2692 );
2693 }
2694
2695 #[test]
2696 fn test_ssrf_config_defaults() {
2697 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2698 assert!(
2699 !config.allow_private_ips,
2700 "Private IPs should be blocked by default"
2701 );
2702 assert!(
2703 config.blocked_hosts.is_empty(),
2704 "Blocked hosts should be empty by default"
2705 );
2706 }
2707
2708 #[test]
2709 fn test_ssrf_config_allow_private_ips() {
2710 let config =
2711 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2712 assert!(
2713 config.allow_private_ips,
2714 "Private IPs should be allowed when explicitly set"
2715 );
2716 }
2717
2718 #[test]
2719 fn test_ssrf_config_blocked_hosts() {
2720 let config = HttpEndpointConfig::from_uri(
2721 "http://example.com/api?blockedHosts=evil.com,malware.net",
2722 )
2723 .unwrap();
2724 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2725 }
2726
2727 #[tokio::test]
2728 async fn test_http_producer_blocks_localhost() {
2729 use tower::ServiceExt;
2730
2731 let ctx = test_producer_ctx();
2732 let component = HttpComponent::new();
2733 let endpoint_ctx = NoOpComponentContext;
2734 let endpoint = component
2735 .create_endpoint("http://example.com/api", &endpoint_ctx)
2736 .unwrap();
2737 let producer = endpoint.create_producer(&ctx).unwrap();
2738
2739 let mut exchange = Exchange::new(Message::default());
2740 exchange.input.set_header(
2741 "CamelHttpUri",
2742 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2743 );
2744
2745 let result = producer.oneshot(exchange).await;
2746 assert!(result.is_err(), "Should block localhost");
2747 }
2748
2749 #[tokio::test]
2750 async fn test_http_producer_blocks_loopback_ip() {
2751 use tower::ServiceExt;
2752
2753 let ctx = test_producer_ctx();
2754 let component = HttpComponent::new();
2755 let endpoint_ctx = NoOpComponentContext;
2756 let endpoint = component
2757 .create_endpoint("http://example.com/api", &endpoint_ctx)
2758 .unwrap();
2759 let producer = endpoint.create_producer(&ctx).unwrap();
2760
2761 let mut exchange = Exchange::new(Message::default());
2762 exchange.input.set_header(
2763 "CamelHttpUri",
2764 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2765 );
2766
2767 let result = producer.oneshot(exchange).await;
2768 assert!(result.is_err(), "Should block loopback IP");
2769 }
2770
2771 #[tokio::test]
2772 async fn test_http_producer_allows_private_ip_when_enabled() {
2773 use tower::ServiceExt;
2774
2775 let ctx = test_producer_ctx();
2776 let component = HttpComponent::new();
2777 let endpoint_ctx = NoOpComponentContext;
2778 let endpoint = component
2781 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2782 .unwrap();
2783 let producer = endpoint.create_producer(&ctx).unwrap();
2784
2785 let exchange = Exchange::new(Message::default());
2786
2787 let result = producer.oneshot(exchange).await;
2790 if let Err(ref e) = result {
2792 let err_str = e.to_string();
2793 assert!(
2794 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2795 "Should not be SSRF error, got: {}",
2796 err_str
2797 );
2798 }
2799 }
2800
2801 #[test]
2806 fn test_http_server_config_parse() {
2807 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2808 assert_eq!(cfg.host, "0.0.0.0");
2809 assert_eq!(cfg.port, 8080);
2810 assert_eq!(cfg.path, "/orders");
2811 assert_eq!(cfg.max_inflight_requests, 1024);
2812 }
2813
2814 #[test]
2815 fn test_http_server_config_scheme() {
2816 assert_eq!(HttpServerConfig::scheme(), "http");
2818 }
2819
2820 #[test]
2821 fn test_http_server_config_from_components() {
2822 let components = camel_component_api::UriComponents {
2824 scheme: "https".to_string(),
2825 path: "//0.0.0.0:8443/api".to_string(),
2826 params: std::collections::HashMap::from([
2827 ("maxRequestBody".to_string(), "5242880".to_string()),
2828 ("maxInflightRequests".to_string(), "7".to_string()),
2829 ]),
2830 };
2831 let cfg = HttpServerConfig::from_components(components).unwrap();
2832 assert_eq!(cfg.host, "0.0.0.0");
2833 assert_eq!(cfg.port, 8443);
2834 assert_eq!(cfg.path, "/api");
2835 assert_eq!(cfg.max_request_body, 5242880);
2836 assert_eq!(cfg.max_inflight_requests, 7);
2837 }
2838
2839 #[test]
2840 fn test_http_server_config_default_path() {
2841 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2842 assert_eq!(cfg.path, "/");
2843 }
2844
2845 #[test]
2846 fn test_http_server_config_wrong_scheme() {
2847 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2848 }
2849
2850 #[test]
2851 fn test_http_server_config_invalid_port() {
2852 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2853 }
2854
2855 #[test]
2856 fn test_http_server_config_default_port_by_scheme() {
2857 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2859 assert_eq!(cfg_http.port, 80);
2860
2861 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2863 assert_eq!(cfg_https.port, 443);
2864 }
2865
2866 #[test]
2867 fn test_request_envelope_and_reply_are_send() {
2868 fn assert_send<T: Send>() {}
2869 assert_send::<RequestEnvelope>();
2870 assert_send::<HttpReply>();
2871 }
2872
2873 #[test]
2878 fn test_server_registry_global_is_singleton() {
2879 let r1 = ServerRegistry::global();
2880 let r2 = ServerRegistry::global();
2881 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2882 }
2883
2884 #[allow(clippy::await_holding_lock)]
2885 #[tokio::test]
2886 async fn test_concurrent_get_or_spawn_returns_same_registry() {
2887 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2888 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2889 let port = listener.local_addr().unwrap().port();
2890 drop(listener);
2891
2892 let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2893 Arc::new(std::sync::Mutex::new(Vec::new()));
2894
2895 let mut handles = Vec::new();
2896 for _ in 0..4 {
2897 let results = results.clone();
2898 handles.push(tokio::spawn(async move {
2899 let registry = ServerRegistry::global()
2900 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2901 .await
2902 .unwrap();
2903 results.lock().unwrap().push(registry);
2904 }));
2905 }
2906
2907 for h in handles {
2908 h.await.unwrap();
2909 }
2910
2911 let registries = results.lock().unwrap();
2912 assert_eq!(registries.len(), 4);
2913 for i in 1..registries.len() {
2914 assert!(
2915 Arc::ptr_eq(®istries[0].inner, ®istries[i].inner),
2916 "all concurrent callers should get same route registry"
2917 );
2918 }
2919 }
2920
2921 #[test]
2922 fn test_server_registry_distinguishes_host_and_port() {
2923 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2924 let rt = tokio::runtime::Runtime::new().expect("runtime");
2925 rt.block_on(async {
2926 let registry = ServerRegistry::global();
2927 let d1 = registry
2931 .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2932 .await;
2933 let d2 = registry
2934 .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2935 .await;
2936 assert!(d1.is_ok());
2937 assert!(d2.is_ok());
2938 assert!(!Arc::ptr_eq(&d1.unwrap().inner, &d2.unwrap().inner));
2939 });
2940 }
2941
2942 #[allow(clippy::await_holding_lock)]
2943 #[tokio::test]
2944 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2945 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2946 let registry = ServerRegistry::global();
2947 let d1 = registry
2949 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2950 .await;
2951 assert!(d1.is_ok());
2952
2953 let d2 = registry
2956 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2957 .await;
2958 assert!(d2.is_err());
2959 let err = d2.unwrap_err();
2960 assert!(
2961 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2962 "Expected incompatible maxRequestBody error, got: {}",
2963 err
2964 );
2965 }
2966
2967 #[test]
2968 fn test_server_registry_reset_clears_entries() {
2969 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2970 let rt = tokio::runtime::Runtime::new().expect("runtime");
2971 rt.block_on(async {
2972 let d1 = ServerRegistry::global()
2974 .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2975 .await;
2976 assert!(d1.is_ok());
2977
2978 let guard = ServerRegistry::global().inner.lock().expect("lock");
2980 assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2981 drop(guard);
2982
2983 ServerRegistry::reset();
2985
2986 let guard = ServerRegistry::global().inner.lock().expect("lock");
2988 assert!(
2989 guard.is_empty(),
2990 "registry should be empty after reset, has {} entries",
2991 guard.len()
2992 );
2993 });
2994 }
2995
2996 #[tokio::test]
3001 async fn test_dispatch_handler_returns_404_for_unknown_path() {
3002 let registry = HttpRouteRegistry::new();
3003 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3005 let port = listener.local_addr().unwrap().port();
3006 tokio::spawn(run_axum_server(
3007 listener,
3008 registry,
3009 2 * 1024 * 1024,
3010 10 * 1024 * 1024,
3011 Arc::new(tokio::sync::Semaphore::new(1024)),
3012 ));
3013
3014 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3016
3017 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3018 .await
3019 .unwrap();
3020 assert_eq!(resp.status().as_u16(), 404);
3021 }
3022
3023 #[tokio::test]
3028 async fn test_http_consumer_start_registers_path() {
3029 use camel_component_api::ConsumerContext;
3030
3031 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3033 let port = listener.local_addr().unwrap().port();
3034 drop(listener); let consumer_cfg = HttpServerConfig {
3037 host: "127.0.0.1".to_string(),
3038 port,
3039 path: "/ping".to_string(),
3040 max_request_body: 2 * 1024 * 1024,
3041 max_response_body: 10 * 1024 * 1024,
3042 max_inflight_requests: 1024,
3043 };
3044 let mut consumer = HttpConsumer::new(consumer_cfg);
3045
3046 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3047 let token = tokio_util::sync::CancellationToken::new();
3048 let ctx = ConsumerContext::new(tx, token.clone());
3049
3050 tokio::spawn(async move {
3051 consumer.start(ctx).await.unwrap();
3052 });
3053
3054 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3055
3056 let client = reqwest::Client::new();
3057 let resp_future = client
3058 .post(format!("http://127.0.0.1:{port}/ping"))
3059 .body("hello world")
3060 .send();
3061
3062 let (http_result, _) = tokio::join!(resp_future, async {
3063 if let Some(mut envelope) = rx.recv().await {
3064 envelope.exchange.input.set_header(
3066 "CamelHttpResponseCode",
3067 serde_json::Value::Number(201.into()),
3068 );
3069 if let Some(reply_tx) = envelope.reply_tx {
3070 let _ = reply_tx.send(Ok(envelope.exchange));
3071 }
3072 }
3073 });
3074
3075 let resp = http_result.unwrap();
3076 assert_eq!(resp.status().as_u16(), 201);
3077
3078 token.cancel();
3079 }
3080
3081 #[tokio::test]
3082 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3083 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3084
3085 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3086 let port = listener.local_addr().unwrap().port();
3087 drop(listener);
3088
3089 let consumer_cfg = HttpServerConfig {
3090 host: "127.0.0.1".to_string(),
3091 port,
3092 path: "/saturation".to_string(),
3093 max_request_body: 2 * 1024 * 1024,
3094 max_response_body: 10 * 1024 * 1024,
3095 max_inflight_requests: 1,
3096 };
3097 let mut consumer = HttpConsumer::new(consumer_cfg);
3098
3099 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3100 let token = tokio_util::sync::CancellationToken::new();
3101 let ctx = ConsumerContext::new(tx, token.clone());
3102 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3103 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3104
3105 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3106 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3107
3108 tokio::spawn(async move {
3109 let mut first_seen_tx = Some(first_seen_tx);
3110 let mut unblock_first_rx = Some(unblock_first_rx);
3111
3112 while let Some(envelope) = rx.recv().await {
3113 if let Some(tx) = first_seen_tx.take() {
3114 let _ = tx.send(());
3115 if let Some(rx_unblock) = unblock_first_rx.take() {
3116 let _ = rx_unblock.await;
3117 }
3118 }
3119
3120 if let Some(reply_tx) = envelope.reply_tx {
3121 let _ = reply_tx.send(Ok(envelope.exchange));
3122 }
3123 }
3124 });
3125
3126 let client = reqwest::Client::new();
3127 let first_req = {
3128 let client = client.clone();
3129 async move {
3130 client
3131 .get(format!("http://127.0.0.1:{port}/saturation"))
3132 .send()
3133 .await
3134 .unwrap()
3135 }
3136 };
3137
3138 let first_handle = tokio::spawn(first_req);
3139 first_seen_rx.await.unwrap();
3140
3141 let second_resp = client
3142 .get(format!("http://127.0.0.1:{port}/saturation"))
3143 .send()
3144 .await
3145 .unwrap();
3146
3147 assert_eq!(second_resp.status().as_u16(), 503);
3148
3149 let _ = unblock_first_tx.send(());
3150 let first_resp = first_handle.await.unwrap();
3151 assert_eq!(first_resp.status().as_u16(), 200);
3152
3153 token.cancel();
3154 }
3155
3156 #[tokio::test]
3157 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3158 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3159
3160 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3161
3162 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3163 let port = listener.local_addr().unwrap().port();
3164 drop(listener);
3165
3166 let consumer_cfg = HttpServerConfig {
3167 host: "127.0.0.1".to_string(),
3168 port,
3169 path: "/limit-bytes".to_string(),
3170 max_request_body: 2 * 1024 * 1024,
3171 max_response_body: 16,
3172 max_inflight_requests: 1024,
3173 };
3174 let mut consumer = HttpConsumer::new(consumer_cfg);
3175
3176 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3177 let token = tokio_util::sync::CancellationToken::new();
3178 let ctx = ConsumerContext::new(tx, token.clone());
3179 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3180 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3181
3182 let client = reqwest::Client::new();
3183 let send_fut = client
3184 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3185 .send();
3186
3187 let (http_result, _) = tokio::join!(send_fut, async {
3188 if let Some(mut envelope) = rx.recv().await {
3189 envelope.exchange.input.body =
3190 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3191 if let Some(reply_tx) = envelope.reply_tx {
3192 let _ = reply_tx.send(Ok(envelope.exchange));
3193 }
3194 }
3195 });
3196
3197 let resp = http_result.unwrap();
3198 assert_eq!(resp.status().as_u16(), 500);
3199 let body = resp.text().await.unwrap();
3200 assert_eq!(body, "Response body exceeds configured limit");
3201 token.cancel();
3202 }
3203
3204 #[tokio::test]
3205 async fn test_http_consumer_enforces_max_response_body_for_json() {
3206 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3207
3208 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3209
3210 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3211 let port = listener.local_addr().unwrap().port();
3212 drop(listener);
3213
3214 let consumer_cfg = HttpServerConfig {
3215 host: "127.0.0.1".to_string(),
3216 port,
3217 path: "/limit-json".to_string(),
3218 max_request_body: 2 * 1024 * 1024,
3219 max_response_body: 16,
3220 max_inflight_requests: 1024,
3221 };
3222 let mut consumer = HttpConsumer::new(consumer_cfg);
3223
3224 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3225 let token = tokio_util::sync::CancellationToken::new();
3226 let ctx = ConsumerContext::new(tx, token.clone());
3227 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3228 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3229
3230 let client = reqwest::Client::new();
3231 let send_fut = client
3232 .get(format!("http://127.0.0.1:{port}/limit-json"))
3233 .send();
3234
3235 let (http_result, _) = tokio::join!(send_fut, async {
3236 if let Some(mut envelope) = rx.recv().await {
3237 envelope.exchange.input.body = camel_component_api::Body::Json(
3238 serde_json::json!({"message":"this response is bigger than sixteen"}),
3239 );
3240 if let Some(reply_tx) = envelope.reply_tx {
3241 let _ = reply_tx.send(Ok(envelope.exchange));
3242 }
3243 }
3244 });
3245
3246 let resp = http_result.unwrap();
3247 assert_eq!(resp.status().as_u16(), 500);
3248 let body = resp.text().await.unwrap();
3249 assert_eq!(body, "Response body exceeds configured limit");
3250 token.cancel();
3251 }
3252
3253 #[tokio::test]
3254 async fn test_http_consumer_enforces_max_response_body_for_xml() {
3255 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3256
3257 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3258
3259 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3260 let port = listener.local_addr().unwrap().port();
3261 drop(listener);
3262
3263 let consumer_cfg = HttpServerConfig {
3264 host: "127.0.0.1".to_string(),
3265 port,
3266 path: "/limit-xml".to_string(),
3267 max_request_body: 2 * 1024 * 1024,
3268 max_response_body: 16,
3269 max_inflight_requests: 1024,
3270 };
3271 let mut consumer = HttpConsumer::new(consumer_cfg);
3272
3273 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3274 let token = tokio_util::sync::CancellationToken::new();
3275 let ctx = ConsumerContext::new(tx, token.clone());
3276 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3277 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3278
3279 let client = reqwest::Client::new();
3280 let send_fut = client
3281 .get(format!("http://127.0.0.1:{port}/limit-xml"))
3282 .send();
3283
3284 let (http_result, _) = tokio::join!(send_fut, async {
3285 if let Some(mut envelope) = rx.recv().await {
3286 envelope.exchange.input.body = camel_component_api::Body::Xml(
3287 "<root><value>way-too-large</value></root>".into(),
3288 );
3289 if let Some(reply_tx) = envelope.reply_tx {
3290 let _ = reply_tx.send(Ok(envelope.exchange));
3291 }
3292 }
3293 });
3294
3295 let resp = http_result.unwrap();
3296 assert_eq!(resp.status().as_u16(), 500);
3297 let body = resp.text().await.unwrap();
3298 assert_eq!(body, "Response body exceeds configured limit");
3299 token.cancel();
3300 }
3301
3302 #[tokio::test]
3303 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3304 use camel_component_api::{
3305 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3306 };
3307 use futures::stream;
3308
3309 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3310
3311 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3312 let port = listener.local_addr().unwrap().port();
3313 drop(listener);
3314
3315 let consumer_cfg = HttpServerConfig {
3316 host: "0.0.0.0".to_string(),
3317 port,
3318 path: "/limit-stream".to_string(),
3319 max_request_body: 2 * 1024 * 1024,
3320 max_response_body: 16,
3321 max_inflight_requests: 1024,
3322 };
3323 let mut consumer = HttpConsumer::new(consumer_cfg);
3324
3325 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3326 let token = tokio_util::sync::CancellationToken::new();
3327 let ctx = ConsumerContext::new(tx, token.clone());
3328 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3329 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3330
3331 let client = reqwest::Client::new();
3332 let send_fut = client
3333 .get(format!("http://127.0.0.1:{port}/limit-stream"))
3334 .send();
3335
3336 let (http_result, _) = tokio::join!(send_fut, async {
3337 if let Some(mut envelope) = rx.recv().await {
3338 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3339 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3340 let stream = Box::pin(stream::iter(chunks));
3341 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3342 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3343 metadata: StreamMetadata {
3344 size_hint: Some(32),
3345 content_type: Some("application/octet-stream".into()),
3346 origin: None,
3347 },
3348 });
3349 if let Some(reply_tx) = envelope.reply_tx {
3350 let _ = reply_tx.send(Ok(envelope.exchange));
3351 }
3352 }
3353 });
3354
3355 let resp = http_result.unwrap();
3356 assert_eq!(resp.status().as_u16(), 200);
3357 let body = resp.bytes().await.unwrap();
3358 assert_eq!(body.len(), 32);
3359 token.cancel();
3360 }
3361
3362 #[tokio::test]
3367 async fn test_integration_single_consumer_round_trip() {
3368 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3369
3370 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3372 let port = listener.local_addr().unwrap().port();
3373 drop(listener); let component = HttpComponent::new();
3376 let endpoint_ctx = NoOpComponentContext;
3377 let endpoint = component
3378 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3379 .unwrap();
3380 let mut consumer = endpoint.create_consumer().unwrap();
3381
3382 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3383 let token = tokio_util::sync::CancellationToken::new();
3384 let ctx = ConsumerContext::new(tx, token.clone());
3385
3386 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3387 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3388
3389 let client = reqwest::Client::new();
3390 let send_fut = client
3391 .post(format!("http://127.0.0.1:{port}/echo"))
3392 .header("Content-Type", "text/plain")
3393 .body("ping")
3394 .send();
3395
3396 let (http_result, _) = tokio::join!(send_fut, async {
3397 if let Some(mut envelope) = rx.recv().await {
3398 assert_eq!(
3399 envelope.exchange.input.header("CamelHttpMethod"),
3400 Some(&serde_json::Value::String("POST".into()))
3401 );
3402 assert_eq!(
3403 envelope.exchange.input.header("CamelHttpPath"),
3404 Some(&serde_json::Value::String("/echo".into()))
3405 );
3406 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3407 if let Some(reply_tx) = envelope.reply_tx {
3408 let _ = reply_tx.send(Ok(envelope.exchange));
3409 }
3410 }
3411 });
3412
3413 let resp = http_result.unwrap();
3414 assert_eq!(resp.status().as_u16(), 200);
3415 let body = resp.text().await.unwrap();
3416 assert_eq!(body, "pong");
3417
3418 token.cancel();
3419 }
3420
3421 #[tokio::test]
3422 async fn test_integration_two_consumers_shared_port() {
3423 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3424
3425 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3426
3427 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3429 let port = listener.local_addr().unwrap().port();
3430 drop(listener);
3431
3432 let component = HttpComponent::new();
3433 let endpoint_ctx = NoOpComponentContext;
3434
3435 let endpoint_a = component
3437 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3438 .unwrap();
3439 let mut consumer_a = endpoint_a.create_consumer().unwrap();
3440
3441 let endpoint_b = component
3443 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3444 .unwrap();
3445 let mut consumer_b = endpoint_b.create_consumer().unwrap();
3446
3447 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3448 let token_a = tokio_util::sync::CancellationToken::new();
3449 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3450
3451 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3452 let token_b = tokio_util::sync::CancellationToken::new();
3453 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3454
3455 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3456 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3457 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3458
3459 let client = reqwest::Client::new();
3460
3461 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3463 let (resp_hello, _) = tokio::join!(fut_hello, async {
3464 if let Some(mut envelope) = rx_a.recv().await {
3465 envelope.exchange.input.body =
3466 camel_component_api::Body::Text("hello-response".to_string());
3467 if let Some(reply_tx) = envelope.reply_tx {
3468 let _ = reply_tx.send(Ok(envelope.exchange));
3469 }
3470 }
3471 });
3472
3473 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3475 let (resp_world, _) = tokio::join!(fut_world, async {
3476 if let Some(mut envelope) = rx_b.recv().await {
3477 envelope.exchange.input.body =
3478 camel_component_api::Body::Text("world-response".to_string());
3479 if let Some(reply_tx) = envelope.reply_tx {
3480 let _ = reply_tx.send(Ok(envelope.exchange));
3481 }
3482 }
3483 });
3484
3485 let body_a = resp_hello.unwrap().text().await.unwrap();
3486 let body_b = resp_world.unwrap().text().await.unwrap();
3487
3488 assert_eq!(body_a, "hello-response");
3489 assert_eq!(body_b, "world-response");
3490
3491 token_a.cancel();
3492 token_b.cancel();
3493 }
3494
3495 #[tokio::test]
3496 async fn test_integration_unregistered_path_returns_404() {
3497 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3498
3499 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3501 let port = listener.local_addr().unwrap().port();
3502 drop(listener);
3503
3504 let component = HttpComponent::new();
3505 let endpoint_ctx = NoOpComponentContext;
3506 let endpoint = component
3507 .create_endpoint(
3508 &format!("http://127.0.0.1:{port}/registered"),
3509 &endpoint_ctx,
3510 )
3511 .unwrap();
3512 let mut consumer = endpoint.create_consumer().unwrap();
3513
3514 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3515 let token = tokio_util::sync::CancellationToken::new();
3516 let ctx = ConsumerContext::new(tx, token.clone());
3517
3518 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3519
3520 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3522 loop {
3523 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3524 .await
3525 .is_ok()
3526 {
3527 break;
3528 }
3529 if std::time::Instant::now() >= deadline {
3530 panic!("HTTP server did not start within 5s on port {port}");
3531 }
3532 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3533 }
3534
3535 let client = reqwest::Client::new();
3536 let resp = client
3537 .get(format!("http://127.0.0.1:{port}/not-there"))
3538 .send()
3539 .await
3540 .unwrap();
3541 assert_eq!(resp.status().as_u16(), 404);
3542
3543 token.cancel();
3544 }
3545
3546 #[test]
3547 fn test_http_consumer_declares_concurrent() {
3548 use camel_component_api::ConcurrencyModel;
3549
3550 let config = HttpServerConfig {
3551 host: "127.0.0.1".to_string(),
3552 port: 19999,
3553 path: "/test".to_string(),
3554 max_request_body: 2 * 1024 * 1024,
3555 max_response_body: 10 * 1024 * 1024,
3556 max_inflight_requests: 1024,
3557 };
3558 let consumer = HttpConsumer::new(config);
3559 assert_eq!(
3560 consumer.concurrency_model(),
3561 ConcurrencyModel::Concurrent { max: None }
3562 );
3563 }
3564
3565 #[tokio::test]
3570 async fn test_http_reply_body_stream_variant_exists() {
3571 use bytes::Bytes;
3572 use camel_component_api::CamelError;
3573 use futures::stream;
3574
3575 let chunks: Vec<Result<Bytes, CamelError>> =
3576 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3577 let stream = Box::pin(stream::iter(chunks));
3578 let reply_body = HttpReplyBody::Stream(stream);
3579 match reply_body {
3581 HttpReplyBody::Stream(_) => {}
3582 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3583 }
3584 }
3585
3586 #[cfg(feature = "otel")]
3591 mod otel_tests {
3592 use super::*;
3593 use camel_component_api::Message;
3594 use tower::ServiceExt;
3595
3596 #[tokio::test]
3597 async fn test_producer_injects_traceparent_header() {
3598 let (url, _handle) = start_test_server_with_header_capture().await;
3599 let ctx = test_producer_ctx();
3600
3601 let component = HttpComponent::new();
3602 let endpoint_ctx = NoOpComponentContext;
3603 let endpoint = component
3604 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3605 .unwrap();
3606 let producer = endpoint.create_producer(&ctx).unwrap();
3607
3608 let mut exchange = Exchange::new(Message::default());
3610 let mut headers = std::collections::HashMap::new();
3611 headers.insert(
3612 "traceparent".to_string(),
3613 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3614 );
3615 camel_otel::extract_into_exchange(&mut exchange, &headers);
3616
3617 let result = producer.oneshot(exchange).await.unwrap();
3618
3619 let status = result
3621 .input
3622 .header("CamelHttpResponseCode")
3623 .and_then(|v| v.as_u64())
3624 .unwrap();
3625 assert_eq!(status, 200);
3626
3627 let traceparent = result.input.header("X-Received-Traceparent");
3629 assert!(
3630 traceparent.is_some(),
3631 "traceparent header should have been sent"
3632 );
3633
3634 let traceparent_str = traceparent.unwrap().as_str().unwrap();
3635 let parts: Vec<&str> = traceparent_str.split('-').collect();
3637 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3638 assert_eq!(parts[0], "00", "version should be 00");
3639 assert_eq!(
3640 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3641 "trace-id should match"
3642 );
3643 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3644 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3645 }
3646
3647 #[tokio::test]
3648 async fn test_consumer_extracts_traceparent_header() {
3649 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3650
3651 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3653 let port = listener.local_addr().unwrap().port();
3654 drop(listener);
3655
3656 let component = HttpComponent::new();
3657 let endpoint_ctx = NoOpComponentContext;
3658 let endpoint = component
3659 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3660 .unwrap();
3661 let mut consumer = endpoint.create_consumer().unwrap();
3662
3663 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3664 let token = tokio_util::sync::CancellationToken::new();
3665 let ctx = ConsumerContext::new(tx, token.clone());
3666
3667 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3668 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3669
3670 let client = reqwest::Client::new();
3672 let send_fut = client
3673 .post(format!("http://127.0.0.1:{port}/trace"))
3674 .header(
3675 "traceparent",
3676 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3677 )
3678 .body("test")
3679 .send();
3680
3681 let (http_result, _) = tokio::join!(send_fut, async {
3682 if let Some(envelope) = rx.recv().await {
3683 let mut injected_headers = std::collections::HashMap::new();
3686 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3687
3688 assert!(
3689 injected_headers.contains_key("traceparent"),
3690 "Exchange should have traceparent after extraction"
3691 );
3692
3693 let traceparent = injected_headers.get("traceparent").unwrap();
3694 let parts: Vec<&str> = traceparent.split('-').collect();
3695 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3696 assert_eq!(
3697 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3698 "Trace ID should match the original traceparent header"
3699 );
3700
3701 if let Some(reply_tx) = envelope.reply_tx {
3702 let _ = reply_tx.send(Ok(envelope.exchange));
3703 }
3704 }
3705 });
3706
3707 let resp = http_result.unwrap();
3708 assert_eq!(resp.status().as_u16(), 200);
3709
3710 token.cancel();
3711 }
3712
3713 #[tokio::test]
3714 async fn test_consumer_extracts_mixed_case_traceparent_header() {
3715 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3716
3717 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3719 let port = listener.local_addr().unwrap().port();
3720 drop(listener);
3721
3722 let component = HttpComponent::new();
3723 let endpoint_ctx = NoOpComponentContext;
3724 let endpoint = component
3725 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3726 .unwrap();
3727 let mut consumer = endpoint.create_consumer().unwrap();
3728
3729 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3730 let token = tokio_util::sync::CancellationToken::new();
3731 let ctx = ConsumerContext::new(tx, token.clone());
3732
3733 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3734 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3735
3736 let client = reqwest::Client::new();
3738 let send_fut = client
3739 .post(format!("http://127.0.0.1:{port}/trace"))
3740 .header(
3741 "TraceParent",
3742 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3743 )
3744 .body("test")
3745 .send();
3746
3747 let (http_result, _) = tokio::join!(send_fut, async {
3748 if let Some(envelope) = rx.recv().await {
3749 let mut injected_headers = HashMap::new();
3752 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3753
3754 assert!(
3755 injected_headers.contains_key("traceparent"),
3756 "Exchange should have traceparent after extraction from mixed-case header"
3757 );
3758
3759 let traceparent = injected_headers.get("traceparent").unwrap();
3760 let parts: Vec<&str> = traceparent.split('-').collect();
3761 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3762 assert_eq!(
3763 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3764 "Trace ID should match the original mixed-case TraceParent header"
3765 );
3766
3767 if let Some(reply_tx) = envelope.reply_tx {
3768 let _ = reply_tx.send(Ok(envelope.exchange));
3769 }
3770 }
3771 });
3772
3773 let resp = http_result.unwrap();
3774 assert_eq!(resp.status().as_u16(), 200);
3775
3776 token.cancel();
3777 }
3778
3779 #[tokio::test]
3780 async fn test_producer_no_trace_context_no_crash() {
3781 let (url, _handle) = start_test_server().await;
3782 let ctx = test_producer_ctx();
3783
3784 let component = HttpComponent::new();
3785 let endpoint_ctx = NoOpComponentContext;
3786 let endpoint = component
3787 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3788 .unwrap();
3789 let producer = endpoint.create_producer(&ctx).unwrap();
3790
3791 let exchange = Exchange::new(Message::default());
3793
3794 let result = producer.oneshot(exchange).await.unwrap();
3796
3797 let status = result
3799 .input
3800 .header("CamelHttpResponseCode")
3801 .and_then(|v| v.as_u64())
3802 .unwrap();
3803 assert_eq!(status, 200);
3804 }
3805
3806 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3808 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3809 let addr = listener.local_addr().unwrap();
3810 let url = format!("http://127.0.0.1:{}", addr.port());
3811
3812 let handle = tokio::spawn(async move {
3813 loop {
3814 if let Ok((mut stream, _)) = listener.accept().await {
3815 tokio::spawn(async move {
3816 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3817 let mut buf = vec![0u8; 8192];
3818 let n = stream.read(&mut buf).await.unwrap_or(0);
3819 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3820
3821 let traceparent = request
3823 .lines()
3824 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3825 .map(|line| {
3826 line.split(':')
3827 .nth(1)
3828 .map(|s| s.trim().to_string())
3829 .unwrap_or_default()
3830 })
3831 .unwrap_or_default();
3832
3833 let body =
3834 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3835 let response = format!(
3836 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3837 body.len(),
3838 traceparent,
3839 body
3840 );
3841 let _ = stream.write_all(response.as_bytes()).await;
3842 });
3843 }
3844 }
3845 });
3846
3847 (url, handle)
3848 }
3849 }
3850
3851 #[tokio::test]
3860 async fn test_request_body_arrives_as_stream() {
3861 use camel_component_api::Body;
3862 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3863
3864 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3865 let port = listener.local_addr().unwrap().port();
3866 drop(listener);
3867
3868 let component = HttpComponent::new();
3869 let endpoint_ctx = NoOpComponentContext;
3870 let endpoint = component
3871 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3872 .unwrap();
3873 let mut consumer = endpoint.create_consumer().unwrap();
3874
3875 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3876 let token = tokio_util::sync::CancellationToken::new();
3877 let ctx = ConsumerContext::new(tx, token.clone());
3878
3879 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3880 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3881
3882 let client = reqwest::Client::new();
3883 let send_fut = client
3884 .post(format!("http://127.0.0.1:{port}/upload"))
3885 .body("hello streaming world")
3886 .send();
3887
3888 let (http_result, _) = tokio::join!(send_fut, async {
3889 if let Some(mut envelope) = rx.recv().await {
3890 assert!(
3892 matches!(envelope.exchange.input.body, Body::Stream(_)),
3893 "expected Body::Stream, got discriminant {:?}",
3894 std::mem::discriminant(&envelope.exchange.input.body)
3895 );
3896 let bytes = envelope
3898 .exchange
3899 .input
3900 .body
3901 .into_bytes(1024 * 1024)
3902 .await
3903 .unwrap();
3904 assert_eq!(&bytes[..], b"hello streaming world");
3905
3906 envelope.exchange.input.body = camel_component_api::Body::Empty;
3907 if let Some(reply_tx) = envelope.reply_tx {
3908 let _ = reply_tx.send(Ok(envelope.exchange));
3909 }
3910 }
3911 });
3912
3913 let resp = http_result.unwrap();
3914 assert_eq!(resp.status().as_u16(), 200);
3915
3916 token.cancel();
3917 }
3918
3919 #[tokio::test]
3924 async fn test_streaming_response_chunked() {
3925 use bytes::Bytes;
3926 use camel_component_api::Body;
3927 use camel_component_api::CamelError;
3928 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3929 use camel_component_api::{StreamBody, StreamMetadata};
3930 use futures::stream;
3931 use std::sync::Arc;
3932 use tokio::sync::Mutex;
3933
3934 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3935 let port = listener.local_addr().unwrap().port();
3936 drop(listener);
3937
3938 let component = HttpComponent::new();
3939 let endpoint_ctx = NoOpComponentContext;
3940 let endpoint = component
3941 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3942 .unwrap();
3943 let mut consumer = endpoint.create_consumer().unwrap();
3944
3945 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3946 let token = tokio_util::sync::CancellationToken::new();
3947 let ctx = ConsumerContext::new(tx, token.clone());
3948
3949 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3950 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3951
3952 let client = reqwest::Client::new();
3953 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3954
3955 let (http_result, _) = tokio::join!(send_fut, async {
3956 if let Some(mut envelope) = rx.recv().await {
3957 let chunks: Vec<Result<Bytes, CamelError>> =
3959 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3960 let stream = Box::pin(stream::iter(chunks));
3961 envelope.exchange.input.body = Body::Stream(StreamBody {
3962 stream: Arc::new(Mutex::new(Some(stream))),
3963 metadata: StreamMetadata::default(),
3964 });
3965 if let Some(reply_tx) = envelope.reply_tx {
3966 let _ = reply_tx.send(Ok(envelope.exchange));
3967 }
3968 }
3969 });
3970
3971 let resp = http_result.unwrap();
3972 assert_eq!(resp.status().as_u16(), 200);
3973 let body = resp.text().await.unwrap();
3974 assert_eq!(body, "chunk1chunk2");
3975
3976 token.cancel();
3977 }
3978
3979 #[tokio::test]
3984 async fn test_413_when_content_length_exceeds_limit() {
3985 use camel_component_api::ConsumerContext;
3986
3987 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3988 let port = listener.local_addr().unwrap().port();
3989 drop(listener);
3990
3991 let component = HttpComponent::new();
3993 let endpoint_ctx = NoOpComponentContext;
3994 let endpoint = component
3995 .create_endpoint(
3996 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3997 &endpoint_ctx,
3998 )
3999 .unwrap();
4000 let mut consumer = endpoint.create_consumer().unwrap();
4001
4002 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4003 let token = tokio_util::sync::CancellationToken::new();
4004 let ctx = ConsumerContext::new(tx, token.clone());
4005
4006 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4007 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4008
4009 let client = reqwest::Client::new();
4010 let resp = client
4011 .post(format!("http://127.0.0.1:{port}/upload"))
4012 .header("Content-Length", "1000") .body("x".repeat(1000))
4014 .send()
4015 .await
4016 .unwrap();
4017
4018 assert_eq!(resp.status().as_u16(), 413);
4019
4020 token.cancel();
4021 }
4022
4023 #[tokio::test]
4027 async fn test_chunked_upload_without_content_length_bypasses_limit() {
4028 use bytes::Bytes;
4029 use camel_component_api::Body;
4030 use camel_component_api::ConsumerContext;
4031 use futures::stream;
4032
4033 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4034 let port = listener.local_addr().unwrap().port();
4035 drop(listener);
4036
4037 let component = HttpComponent::new();
4039 let endpoint_ctx = NoOpComponentContext;
4040 let endpoint = component
4041 .create_endpoint(
4042 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4043 &endpoint_ctx,
4044 )
4045 .unwrap();
4046 let mut consumer = endpoint.create_consumer().unwrap();
4047
4048 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4049 let token = tokio_util::sync::CancellationToken::new();
4050 let ctx = ConsumerContext::new(tx, token.clone());
4051
4052 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4053 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4054
4055 let client = reqwest::Client::new();
4056
4057 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4061 Ok(Bytes::from("y".repeat(50))),
4062 Ok(Bytes::from("y".repeat(50))),
4063 ];
4064 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4065 let send_fut = client
4066 .post(format!("http://127.0.0.1:{port}/upload"))
4067 .body(stream_body)
4068 .send();
4069
4070 let consumer_fut = async {
4071 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4073 Ok(Some(mut envelope)) => {
4074 assert!(
4075 matches!(envelope.exchange.input.body, Body::Stream(_)),
4076 "expected Body::Stream"
4077 );
4078 envelope.exchange.input.body = camel_component_api::Body::Empty;
4079 if let Some(reply_tx) = envelope.reply_tx {
4080 let _ = reply_tx.send(Ok(envelope.exchange));
4081 }
4082 }
4083 Ok(None) => panic!("consumer channel closed unexpectedly"),
4084 Err(_) => {
4085 }
4088 }
4089 };
4090
4091 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4092
4093 let resp = http_result.unwrap();
4094 assert_ne!(
4096 resp.status().as_u16(),
4097 413,
4098 "chunked upload must not be rejected by maxRequestBody"
4099 );
4100 assert_eq!(resp.status().as_u16(), 200);
4101
4102 token.cancel();
4103 }
4104
4105 #[test]
4106 fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4107 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4108 cfg.blocked_hosts = vec!["blocked.local".to_string()];
4109 cfg.allow_private_ips = false;
4110
4111 let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4112 assert!(blocked.is_err());
4113
4114 let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4115 assert!(private_ip.is_err());
4116
4117 cfg.allow_private_ips = true;
4118 let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4119 assert!(allowed.is_ok());
4120 }
4121
4122 #[test]
4123 fn test_is_private_ip_ranges() {
4124 assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); assert!(is_private_ip(&"::1".parse().unwrap())); assert!(is_private_ip(&"fc00::1".parse().unwrap())); assert!(is_private_ip(&"fd12::1".parse().unwrap())); assert!(is_private_ip(&"fe80::1".parse().unwrap())); assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); }
4144
4145 #[tokio::test]
4146 async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4147 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); cfg.allow_private_ips = false;
4149
4150 let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4151 .await
4152 .expect_err("localhost must resolve to loopback and be blocked");
4153
4154 let msg = err.to_string();
4155 assert!(msg.contains("Target resolved to private IP"));
4156 }
4157
4158 #[test]
4159 fn test_title_case_header() {
4160 assert_eq!(title_case_header("content-type"), "Content-Type");
4161 assert_eq!(title_case_header("authorization"), "Authorization");
4162 assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4163 assert_eq!(title_case_header("host"), "Host");
4164 assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4165 assert_eq!(title_case_header("single"), "Single");
4166 assert_eq!(title_case_header(""), "");
4167 }
4168
4169 #[test]
4170 fn test_resolve_url_combines_path_and_query_sources() {
4171 let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4172 let mut exchange = Exchange::new(Message::default());
4173 exchange.input.set_header(
4174 "CamelHttpPath",
4175 serde_json::Value::String("next".to_string()),
4176 );
4177 let url = HttpProducer::resolve_url(&exchange, &cfg);
4178 assert!(url.starts_with("http://example.com/base/next?"));
4179 assert!(url.contains("foo=bar"));
4180
4181 exchange.input.set_header(
4182 "CamelHttpUri",
4183 serde_json::Value::String("http://other.test/root".to_string()),
4184 );
4185 exchange.input.set_header(
4186 "CamelHttpQuery",
4187 serde_json::Value::String("a=1&b=2".to_string()),
4188 );
4189
4190 let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4191 assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4192 }
4193
4194 #[test]
4195 fn test_http_producer_helpers_status_and_size_boundaries() {
4196 assert!(HttpProducer::is_ok_status(200, (200, 299)));
4197 assert!(HttpProducer::is_ok_status(299, (200, 299)));
4198 assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4199 assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4200
4201 assert!(!exceeds_max_response_body(10, 10));
4202 assert!(exceeds_max_response_body(11, 10));
4203 }
4204
4205 async fn setup_consumer_on_free_port(
4210 path: &str,
4211 ) -> (
4212 u16,
4213 tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4214 tokio_util::sync::CancellationToken,
4215 ) {
4216 use camel_component_api::ConsumerContext;
4217
4218 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4219 let port = listener.local_addr().unwrap().port();
4220 drop(listener);
4221
4222 let consumer_cfg = HttpServerConfig {
4223 host: "127.0.0.1".to_string(),
4224 port,
4225 path: path.to_string(),
4226 max_request_body: 2 * 1024 * 1024,
4227 max_response_body: 10 * 1024 * 1024,
4228 max_inflight_requests: 1024,
4229 };
4230 let mut consumer = HttpConsumer::new(consumer_cfg);
4231
4232 let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4233 let token = tokio_util::sync::CancellationToken::new();
4234 let ctx = ConsumerContext::new(tx, token.clone());
4235
4236 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4237 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4238
4239 (port, rx, token)
4240 }
4241
4242 #[tokio::test]
4243 async fn test_content_type_inferred_for_json_body() {
4244 let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4245
4246 let client = reqwest::Client::new();
4247 let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4248
4249 let (http_result, _) = tokio::join!(send_fut, async {
4250 if let Some(mut envelope) = rx.recv().await {
4251 envelope.exchange.input.body =
4252 camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4253 if let Some(reply_tx) = envelope.reply_tx {
4254 let _ = reply_tx.send(Ok(envelope.exchange));
4255 }
4256 }
4257 });
4258
4259 let resp = http_result.unwrap();
4260 assert_eq!(resp.status().as_u16(), 200);
4261 let ct = resp
4262 .headers()
4263 .get("content-type")
4264 .expect("Content-Type header should be present");
4265 assert_eq!(ct, "application/json");
4266 let body = resp.text().await.unwrap();
4267 assert_eq!(body, r#"{"message":"hello"}"#);
4268
4269 token.cancel();
4270 }
4271
4272 #[tokio::test]
4273 async fn test_content_type_inferred_for_text_body() {
4274 let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4275
4276 let client = reqwest::Client::new();
4277 let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4278
4279 let (http_result, _) = tokio::join!(send_fut, async {
4280 if let Some(mut envelope) = rx.recv().await {
4281 envelope.exchange.input.body =
4282 camel_component_api::Body::Text("plain text response".to_string());
4283 if let Some(reply_tx) = envelope.reply_tx {
4284 let _ = reply_tx.send(Ok(envelope.exchange));
4285 }
4286 }
4287 });
4288
4289 let resp = http_result.unwrap();
4290 assert_eq!(resp.status().as_u16(), 200);
4291 let ct = resp
4292 .headers()
4293 .get("content-type")
4294 .expect("Content-Type header should be present");
4295 assert_eq!(ct, "text/plain; charset=utf-8");
4296 let body = resp.text().await.unwrap();
4297 assert_eq!(body, "plain text response");
4298
4299 token.cancel();
4300 }
4301
4302 #[tokio::test]
4303 async fn test_content_type_inferred_for_xml_body() {
4304 let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4305
4306 let client = reqwest::Client::new();
4307 let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4308
4309 let (http_result, _) = tokio::join!(send_fut, async {
4310 if let Some(mut envelope) = rx.recv().await {
4311 envelope.exchange.input.body =
4312 camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4313 if let Some(reply_tx) = envelope.reply_tx {
4314 let _ = reply_tx.send(Ok(envelope.exchange));
4315 }
4316 }
4317 });
4318
4319 let resp = http_result.unwrap();
4320 assert_eq!(resp.status().as_u16(), 200);
4321 let ct = resp
4322 .headers()
4323 .get("content-type")
4324 .expect("Content-Type header should be present");
4325 assert_eq!(ct, "application/xml");
4326 let body = resp.text().await.unwrap();
4327 assert_eq!(body, "<root><item>value</item></root>");
4328
4329 token.cancel();
4330 }
4331
4332 #[tokio::test]
4333 async fn test_no_content_type_for_empty_body() {
4334 let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4335
4336 let client = reqwest::Client::new();
4337 let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4338
4339 let (http_result, _) = tokio::join!(send_fut, async {
4340 if let Some(mut envelope) = rx.recv().await {
4341 envelope.exchange.input.body = camel_component_api::Body::Empty;
4342 if let Some(reply_tx) = envelope.reply_tx {
4343 let _ = reply_tx.send(Ok(envelope.exchange));
4344 }
4345 }
4346 });
4347
4348 let resp = http_result.unwrap();
4349 assert_eq!(resp.status().as_u16(), 200);
4350 assert!(
4351 resp.headers().get("content-type").is_none(),
4352 "Empty body should not set Content-Type"
4353 );
4354
4355 token.cancel();
4356 }
4357
4358 #[tokio::test]
4359 async fn test_no_content_type_for_raw_bytes_body() {
4360 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4361
4362 let client = reqwest::Client::new();
4363 let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4364
4365 let (http_result, _) = tokio::join!(send_fut, async {
4366 if let Some(mut envelope) = rx.recv().await {
4367 envelope.exchange.input.body =
4368 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4369 if let Some(reply_tx) = envelope.reply_tx {
4370 let _ = reply_tx.send(Ok(envelope.exchange));
4371 }
4372 }
4373 });
4374
4375 let resp = http_result.unwrap();
4376 assert_eq!(resp.status().as_u16(), 200);
4377 assert!(
4378 resp.headers().get("content-type").is_none(),
4379 "Raw Bytes body should not set Content-Type"
4380 );
4381
4382 token.cancel();
4383 }
4384
4385 #[tokio::test]
4386 async fn test_content_type_from_stream_metadata() {
4387 use camel_component_api::{StreamBody, StreamMetadata};
4388 use futures::stream;
4389
4390 let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4391
4392 let client = reqwest::Client::new();
4393 let send_fut = client
4394 .get(format!("http://127.0.0.1:{port}/stream-ct"))
4395 .send();
4396
4397 let (http_result, _) = tokio::join!(send_fut, async {
4398 if let Some(mut envelope) = rx.recv().await {
4399 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4400 vec![Ok(bytes::Bytes::from("audio data"))];
4401 let stream = Box::pin(stream::iter(chunks));
4402 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4403 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4404 metadata: StreamMetadata {
4405 size_hint: None,
4406 content_type: Some("audio/mpeg".to_string()),
4407 origin: None,
4408 },
4409 });
4410 if let Some(reply_tx) = envelope.reply_tx {
4411 let _ = reply_tx.send(Ok(envelope.exchange));
4412 }
4413 }
4414 });
4415
4416 let resp = http_result.unwrap();
4417 assert_eq!(resp.status().as_u16(), 200);
4418 let ct = resp
4419 .headers()
4420 .get("content-type")
4421 .expect("Content-Type header should be present");
4422 assert_eq!(ct, "audio/mpeg");
4423 let body = resp.text().await.unwrap();
4424 assert_eq!(body, "audio data");
4425
4426 token.cancel();
4427 }
4428
4429 #[tokio::test]
4430 async fn test_user_content_type_overrides_inferred() {
4431 let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4432
4433 let client = reqwest::Client::new();
4434 let send_fut = client
4435 .get(format!("http://127.0.0.1:{port}/override-ct"))
4436 .send();
4437
4438 let (http_result, _) = tokio::join!(send_fut, async {
4439 if let Some(mut envelope) = rx.recv().await {
4440 envelope.exchange.input.body =
4441 camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4442 envelope.exchange.input.set_header(
4443 "Content-Type",
4444 serde_json::Value::String("text/html".to_string()),
4445 );
4446 if let Some(reply_tx) = envelope.reply_tx {
4447 let _ = reply_tx.send(Ok(envelope.exchange));
4448 }
4449 }
4450 });
4451
4452 let resp = http_result.unwrap();
4453 assert_eq!(resp.status().as_u16(), 200);
4454 let ct = resp
4455 .headers()
4456 .get("content-type")
4457 .expect("Content-Type header should be present");
4458 assert_eq!(
4459 ct, "text/html",
4460 "User-set Content-Type should take precedence over inferred type"
4461 );
4462
4463 token.cancel();
4464 }
4465
4466 #[tokio::test]
4467 async fn test_user_content_type_with_bytes_body() {
4468 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4469
4470 let client = reqwest::Client::new();
4471 let send_fut = client
4472 .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4473 .send();
4474
4475 let (http_result, _) = tokio::join!(send_fut, async {
4476 if let Some(mut envelope) = rx.recv().await {
4477 envelope.exchange.input.body =
4478 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4479 envelope.exchange.input.set_header(
4480 "Content-Type",
4481 serde_json::Value::String("application/json".to_string()),
4482 );
4483 if let Some(reply_tx) = envelope.reply_tx {
4484 let _ = reply_tx.send(Ok(envelope.exchange));
4485 }
4486 }
4487 });
4488
4489 let resp = http_result.unwrap();
4490 assert_eq!(resp.status().as_u16(), 200);
4491 let ct = resp
4492 .headers()
4493 .get("content-type")
4494 .expect("Content-Type header should be present for Bytes body with user header");
4495 assert_eq!(
4496 ct, "application/json",
4497 "User Content-Type should be sent for Bytes body"
4498 );
4499
4500 token.cancel();
4501 }
4502
4503 #[tokio::test]
4508 async fn monitor_task_silent_on_clean_exit() {
4509 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4510 monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4512 }
4513
4514 #[tokio::test]
4515 async fn monitor_task_handles_panicked_task() {
4516 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4517 panic!("simulated server crash");
4518 });
4519 monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4521 }
4522
4523 #[test]
4528 fn http_auth_basic_debug_redacts_password() {
4529 let auth = HttpAuth::Basic {
4530 username: "admin".to_string(),
4531 password: "hunter2".to_string(),
4532 };
4533 let debug = format!("{:?}", auth);
4534 assert!(
4535 !debug.contains("hunter2"),
4536 "password must be redacted: {debug}"
4537 );
4538 assert!(debug.contains("admin"), "username should appear: {debug}");
4539 }
4540
4541 #[test]
4542 fn http_auth_bearer_debug_redacts_token() {
4543 let auth = HttpAuth::Bearer {
4544 token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4545 };
4546 let debug = format!("{:?}", auth);
4547 assert!(
4548 !debug.contains("eyJhbGci"),
4549 "token must be redacted: {debug}"
4550 );
4551 }
4552
4553 #[test]
4554 fn http_auth_none_debug_shows_variant() {
4555 let debug = format!("{:?}", HttpAuth::None);
4556 assert!(
4557 debug.contains("None"),
4558 "None variant should appear: {debug}"
4559 );
4560 }
4561
4562 #[test]
4563 fn http_endpoint_config_debug_redacts_auth_credentials() {
4564 let config = HttpEndpointConfig::from_uri(
4565 "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4566 )
4567 .unwrap();
4568 let debug = format!("{:?}", config);
4569 assert!(
4570 !debug.contains("secret123"),
4571 "password must be redacted in HttpEndpointConfig debug: {debug}"
4572 );
4573 }
4574
4575 use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4580 use tower_http::services::ServeDir;
4581
4582 fn make_test_registry() -> HttpRouteRegistry {
4583 HttpRouteRegistry::new()
4584 }
4585
4586 fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4587 AppState {
4588 registry,
4589 max_request_body: 2 * 1024 * 1024,
4590 max_response_body: 10 * 1024 * 1024,
4591 inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4592 }
4593 }
4594
4595 #[allow(clippy::await_holding_lock)]
4596 #[tokio::test]
4597 async fn test_static_file_serving_serves_file_contents() {
4598 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4599 ServerRegistry::reset();
4600
4601 let temp_dir =
4603 std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4604 std::fs::create_dir_all(&temp_dir).unwrap();
4605 std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4606 std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4607
4608 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4609
4610 let registry = make_test_registry();
4611 let serve_dir = ServeDir::new(&canonical_dir)
4612 .precompressed_gzip()
4613 .precompressed_br()
4614 .append_index_html_on_directories(true);
4615
4616 let mount = StaticMount {
4617 mount_path: "/".to_string(),
4618 mode: MountMode::Static,
4619 dir: canonical_dir.clone(),
4620 cache_control: "public, max-age=3600".to_string(),
4621 error_pages: std::collections::HashMap::new(),
4622 serve_dir,
4623 };
4624 registry.register_static_mount(mount).await.unwrap();
4625
4626 let state = make_test_state(registry);
4627
4628 let req = Request::builder()
4630 .uri("/hello.txt")
4631 .body(AxumBody::empty())
4632 .unwrap();
4633 let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4634 assert_eq!(resp.status(), StatusCode::OK);
4635 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4636 .await
4637 .unwrap();
4638 assert_eq!(&body[..], b"Hello, static world!");
4639
4640 let req = Request::builder()
4642 .uri("/style.css")
4643 .body(AxumBody::empty())
4644 .unwrap();
4645 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4646 assert_eq!(resp.status(), StatusCode::OK);
4647 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4648 .await
4649 .unwrap();
4650 assert_eq!(&body[..], b"body { color: red; }");
4651
4652 let req = Request::builder()
4654 .uri("/missing.txt")
4655 .body(AxumBody::empty())
4656 .unwrap();
4657 let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4658 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4659
4660 std::fs::remove_dir_all(&temp_dir).ok();
4662 }
4663
4664 #[allow(clippy::await_holding_lock)]
4665 #[tokio::test]
4666 async fn test_spa_fallback_serves_index_for_unknown_paths() {
4667 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4668 ServerRegistry::reset();
4669
4670 let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4671 std::fs::create_dir_all(&temp_dir).unwrap();
4672 std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4673 std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4674
4675 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4676
4677 let registry = make_test_registry();
4678 let serve_dir = ServeDir::new(&canonical_dir)
4679 .precompressed_gzip()
4680 .precompressed_br()
4681 .append_index_html_on_directories(true);
4682
4683 let mount = StaticMount {
4684 mount_path: "/".to_string(),
4685 mode: MountMode::Spa,
4686 dir: canonical_dir.clone(),
4687 cache_control: "public, max-age=0".to_string(),
4688 error_pages: std::collections::HashMap::new(),
4689 serve_dir,
4690 };
4691 registry.register_static_mount(mount).await.unwrap();
4693
4694 let state = make_test_state(registry);
4695
4696 let req = Request::builder()
4698 .method("GET")
4699 .uri("/dashboard")
4700 .header("Accept", "text/html")
4701 .body(AxumBody::empty())
4702 .unwrap();
4703 let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4704 assert_eq!(resp.status(), StatusCode::OK);
4705 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4706 .await
4707 .unwrap();
4708 assert_eq!(&body[..], b"<h1>SPA App</h1>");
4709
4710 let req = Request::builder()
4712 .method("GET")
4713 .uri("/app.js")
4714 .body(AxumBody::empty())
4715 .unwrap();
4716 let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4717 assert_eq!(resp.status(), StatusCode::OK);
4718 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4719 .await
4720 .unwrap();
4721 assert_eq!(&body[..], b"console.log('app')");
4722
4723 let req = Request::builder()
4725 .method("GET")
4726 .uri("/api/data")
4727 .header("Accept", "application/json")
4728 .body(AxumBody::empty())
4729 .unwrap();
4730 let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4731 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4732
4733 let req = Request::builder()
4735 .method("GET")
4736 .uri("/style.css")
4737 .header("Accept", "text/html")
4738 .body(AxumBody::empty())
4739 .unwrap();
4740 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4741 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4742
4743 std::fs::remove_dir_all(&temp_dir).ok();
4745 }
4746
4747 #[allow(clippy::await_holding_lock)]
4748 #[tokio::test]
4749 async fn test_error_page_mapping_serves_custom_404() {
4750 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4751 ServerRegistry::reset();
4752
4753 let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4754 let errors_dir = temp_dir.join("errors");
4755 std::fs::create_dir_all(&errors_dir).unwrap();
4756 std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4757 std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4758
4759 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4760 let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4761
4762 let registry = make_test_registry();
4763 let serve_dir = ServeDir::new(&canonical_dir)
4764 .precompressed_gzip()
4765 .precompressed_br()
4766 .append_index_html_on_directories(true);
4767
4768 let mut error_pages = std::collections::HashMap::new();
4769 error_pages.insert(404, canonical_404);
4770
4771 let mount = StaticMount {
4772 mount_path: "/".to_string(),
4773 mode: MountMode::Static,
4774 dir: canonical_dir.clone(),
4775 cache_control: "public, max-age=0".to_string(),
4776 error_pages,
4777 serve_dir,
4778 };
4779 registry.register_static_mount(mount).await.unwrap();
4780
4781 let state = make_test_state(registry);
4782
4783 let req = Request::builder()
4785 .method("GET")
4786 .uri("/missing.html")
4787 .body(AxumBody::empty())
4788 .unwrap();
4789 let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4790 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4791 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4792 .await
4793 .unwrap();
4794 assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4795
4796 let req = Request::builder()
4798 .method("GET")
4799 .uri("/index.html")
4800 .body(AxumBody::empty())
4801 .unwrap();
4802 let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4803 assert_eq!(resp.status(), StatusCode::OK);
4804 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4805 .await
4806 .unwrap();
4807 assert_eq!(&body[..], b"<h1>Home</h1>");
4808
4809 std::fs::remove_dir_all(&temp_dir).ok();
4811 }
4812}