1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40#[derive(Debug, Clone)]
101pub struct HttpEndpointConfig {
102 pub base_url: String,
103 pub http_method: Option<String>,
104 pub throw_exception_on_failure: bool,
105 pub ok_status_code_range: (u16, u16),
106 pub response_timeout: Option<Duration>,
107 pub query_params: HashMap<String, String>,
108 pub allow_private_ips: bool,
109 pub blocked_hosts: Vec<String>,
110 pub max_body_size: usize,
111 pub read_timeout_ms: u64,
112 pub max_response_bytes: usize,
113 pub auth: HttpAuth,
114 pub token_provider: Option<Arc<dyn TokenProvider>>,
115 pub user_agent: Option<String>,
116 pub cookie_handling: CookieHandling,
117 pub bridge_endpoint: bool,
118 pub connection_close: bool,
119 pub skip_request_headers: Vec<String>,
120 pub skip_response_headers: Vec<String>,
121}
122
/// Credentials attached to outgoing requests.
///
/// `Debug` is implemented by hand elsewhere in this file so that secrets are
/// never printed; do not add `Debug` to the derive list.
#[derive(Clone, PartialEq)]
pub enum HttpAuth {
    /// No credentials are sent.
    None,
    /// Username / password pair for HTTP Basic authentication.
    Basic {
        username: String,
        password: String,
    },
    /// Static bearer token.
    Bearer {
        token: String,
    },
}
129
130impl std::fmt::Debug for HttpAuth {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 HttpAuth::None => f.write_str("None"),
134 HttpAuth::Basic { username, .. } => f
135 .debug_struct("Basic")
136 .field("username", username)
137 .field("password", &"***")
138 .finish(),
139 HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
140 }
141 }
142}
143
/// Cookie policy selected through the `cookieHandling` URI option.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CookieHandling {
    /// Default when the option is omitted.
    Disabled,
    /// Selected by `cookieHandling=InMemory` (matched case-insensitively).
    InMemory,
}
149
/// URI parameter names treated as endpoint options.
///
/// Any URI parameter whose name appears here is left out of
/// `HttpEndpointConfig::query_params`; every other parameter is kept there.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    // Request / response handling
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    // Target restrictions and size / time limits
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
    "readTimeout",
    "maxResponseBytes",
    // Authentication
    "authMethod",
    "authUsername",
    "authPassword",
    "authBearerToken",
    // Miscellaneous client behaviour
    "userAgent",
    "cookieHandling",
    "bridgeEndpoint",
    "connectionClose",
    "skipRequestHeaders",
    "skipResponseHeaders",
];
174
175impl UriConfig for HttpEndpointConfig {
176 fn scheme() -> &'static str {
178 "http"
179 }
180
181 fn from_uri(uri: &str) -> Result<Self, CamelError> {
182 let parts = parse_uri(uri)?;
183 Self::from_components(parts)
184 }
185
186 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187 if parts.scheme != "http" && parts.scheme != "https" {
189 return Err(CamelError::InvalidUri(format!(
190 "expected scheme 'http' or 'https', got '{}'",
191 parts.scheme
192 )));
193 }
194
195 let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199 let http_method = parts.params.get("httpMethod").cloned();
200
201 let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202 Some(v) => parse_bool_param_http(v).map_err(|e| {
203 CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204 })?,
205 None => true,
206 };
207
208 let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210 Some(v) => parse_ok_status_code_range(v)?,
211 None => (200, 299),
212 };
213
214 let response_timeout = match parts.params.get("responseTimeout") {
215 Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216 CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217 })?),
218 None => None,
219 };
220
221 let allow_private_ips = match parts.params.get("allowPrivateIps") {
223 Some(v) => parse_bool_param_http(v).map_err(|e| {
224 CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225 })?,
226 None => false, };
228
229 let blocked_hosts = parts
231 .params
232 .get("blockedHosts")
233 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234 .unwrap_or_default();
235
236 let max_body_size = match parts.params.get("maxBodySize") {
237 Some(v) => v.parse::<usize>().map_err(|e| {
238 CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239 })?,
240 None => 10 * 1024 * 1024, };
242
243 let read_timeout_ms = match parts.params.get("readTimeout") {
244 Some(v) => v.parse::<u64>().map_err(|e| {
245 CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246 })?,
247 None => 30_000, };
249
250 let max_response_bytes = match parts.params.get("maxResponseBytes") {
251 Some(v) => v.parse::<usize>().map_err(|e| {
252 CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253 })?,
254 None => 10 * 1024 * 1024, };
256
257 let auth = parse_auth_from_params(&parts.params)?;
258
259 let user_agent = parts.params.get("userAgent").cloned();
260
261 let cookie_handling = match parts.params.get("cookieHandling") {
262 Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263 Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264 Some(v) => {
265 return Err(CamelError::InvalidUri(format!(
266 "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267 )));
268 }
269 None => CookieHandling::Disabled,
270 };
271
272 let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273 Some(v) => parse_bool_param_http(v).map_err(|e| {
274 CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275 })?,
276 None => false,
277 };
278
279 let connection_close = match parts.params.get("connectionClose") {
280 Some(v) => parse_bool_param_http(v).map_err(|e| {
281 CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282 })?,
283 None => false,
284 };
285
286 let skip_request_headers = parts
287 .params
288 .get("skipRequestHeaders")
289 .map(|v| {
290 v.split(',')
291 .map(str::trim)
292 .filter(|s| !s.is_empty())
293 .map(|s| s.to_ascii_lowercase())
294 .collect::<Vec<_>>()
295 })
296 .unwrap_or_default();
297
298 let skip_response_headers = parts
299 .params
300 .get("skipResponseHeaders")
301 .map(|v| {
302 v.split(',')
303 .map(str::trim)
304 .filter(|s| !s.is_empty())
305 .map(|s| s.to_ascii_lowercase())
306 .collect::<Vec<_>>()
307 })
308 .unwrap_or_default();
309
310 let query_params: HashMap<String, String> = parts
312 .params
313 .into_iter()
314 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315 .collect();
316
317 Ok(Self {
318 base_url,
319 http_method,
320 throw_exception_on_failure,
321 ok_status_code_range,
322 response_timeout,
323 query_params,
324 allow_private_ips,
325 blocked_hosts,
326 max_body_size,
327 read_timeout_ms,
328 max_response_bytes,
329 auth,
330 token_provider: None,
331 user_agent,
332 cookie_handling,
333 bridge_endpoint,
334 connection_close,
335 skip_request_headers,
336 skip_response_headers,
337 })
338 }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342 let Some(method) = params.get("authMethod") else {
343 return Ok(HttpAuth::None);
344 };
345
346 if method.eq_ignore_ascii_case("none") {
347 return Ok(HttpAuth::None);
348 }
349
350 if method.eq_ignore_ascii_case("basic") {
351 let username = params.get("authUsername").cloned().ok_or_else(|| {
352 CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353 })?;
354 let password = params.get("authPassword").cloned().ok_or_else(|| {
355 CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356 })?;
357 return Ok(HttpAuth::Basic { username, password });
358 }
359
360 if method.eq_ignore_ascii_case("bearer") {
361 let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362 CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363 })?;
364 return Ok(HttpAuth::Bearer { token });
365 }
366
367 Err(CamelError::InvalidUri(format!(
368 "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369 )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373 match value.to_ascii_lowercase().as_str() {
374 "true" | "1" | "yes" => Ok(true),
375 "false" | "0" | "no" => Ok(false),
376 _ => Err(CamelError::InvalidUri(format!(
377 "invalid boolean value: '{value}'"
378 ))),
379 }
380}
381
382impl HttpEndpointConfig {
383 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384 let parts = parse_uri(uri)?;
385 let mut endpoint = Self::from_components(parts.clone())?;
386 if endpoint.response_timeout.is_none() {
387 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388 }
389 if !parts.params.contains_key("allowPrivateIps") {
390 endpoint.allow_private_ips = config.allow_private_ips;
391 }
392 if !parts.params.contains_key("blockedHosts") {
393 endpoint.blocked_hosts = config.blocked_hosts.clone();
394 }
395 if !parts.params.contains_key("maxBodySize") {
396 endpoint.max_body_size = config.max_body_size;
397 }
398 if !parts.params.contains_key("readTimeout") {
399 endpoint.read_timeout_ms = config.read_timeout_ms;
400 }
401 if !parts.params.contains_key("maxResponseBytes") {
402 endpoint.max_response_bytes = config.max_response_bytes;
403 }
404 if !parts.params.contains_key("okStatusCodeRange")
405 && let Some(range) = &config.ok_status_code_range
406 {
407 endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408 }
409
410 Ok(endpoint)
411 }
412}
413
/// Consumer-side (server) settings parsed from an endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host part of the URI authority.
    pub host: String,
    /// Port from the authority, or 80 / 443 depending on the scheme.
    pub port: u16,
    /// Request path the consumer is registered under; `/` when the URI has none.
    pub path: String,
    /// Value of `maxRequestBody`; 2 MiB when omitted.
    pub max_request_body: usize,
    /// Value of `maxResponseBody`; 10 MiB when omitted.
    pub max_response_body: usize,
    /// Value of `maxInflightRequests`; 1024 when omitted.
    pub max_inflight_requests: usize,
}
434
435impl UriConfig for HttpServerConfig {
436 fn scheme() -> &'static str {
438 "http"
439 }
440
441 fn from_uri(uri: &str) -> Result<Self, CamelError> {
442 let parts = parse_uri(uri)?;
443 Self::from_components(parts)
444 }
445
446 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447 if parts.scheme != "http" && parts.scheme != "https" {
449 return Err(CamelError::InvalidUri(format!(
450 "expected scheme 'http' or 'https', got '{}'",
451 parts.scheme
452 )));
453 }
454
455 let authority_and_path = parts.path.trim_start_matches('/');
458
459 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461 (&authority_and_path[..idx], &authority_and_path[idx..])
462 } else {
463 (authority_and_path, "/")
464 };
465
466 let path = if path_suffix.is_empty() {
467 "/"
468 } else {
469 path_suffix
470 }
471 .to_string();
472
473 let (host, port) = if let Some(colon) = authority.rfind(':') {
475 let port_str = &authority[colon + 1..];
476 match port_str.parse::<u16>() {
477 Ok(p) => (authority[..colon].to_string(), p),
478 Err(_) => {
479 return Err(CamelError::InvalidUri(format!(
480 "invalid port '{}' in authority",
481 port_str
482 )));
483 }
484 }
485 } else {
486 let default_port = if parts.scheme == "https" { 443 } else { 80 };
488 (authority.to_string(), default_port)
489 };
490
491 let max_request_body = parts
492 .params
493 .get("maxRequestBody")
494 .and_then(|v| v.parse::<usize>().ok())
495 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
498 .params
499 .get("maxResponseBody")
500 .and_then(|v| v.parse::<usize>().ok())
501 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
504 .params
505 .get("maxInflightRequests")
506 .and_then(|v| v.parse::<usize>().ok())
507 .unwrap_or(1024);
508
509 Ok(Self {
510 host,
511 port,
512 path,
513 max_request_body,
514 max_response_body,
515 max_inflight_requests,
516 })
517 }
518}
519
520impl HttpServerConfig {
521 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522 let parts = parse_uri(uri)?;
523 let mut server = Self::from_components(parts.clone())?;
524 if !parts.params.contains_key("maxRequestBody") {
525 server.max_request_body = config.max_request_body;
526 }
527 if !parts.params.contains_key("maxResponseBody") {
528 server.max_response_body = config.max_body_size;
530 }
531 Ok(server)
532 }
533}
534
535pub enum HttpReplyBody {
543 Bytes(bytes::Bytes),
544 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
545}
546
547pub struct RequestEnvelope {
552 pub method: String,
553 pub path: String,
554 pub query: String,
555 pub headers: http::HeaderMap,
556 pub body: StreamBody,
557 pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
558}
559
560pub struct HttpReply {
564 pub status: u16,
565 pub headers: Vec<(String, String)>,
566 pub body: HttpReplyBody,
567}
568
/// `(host, port)` pair identifying one shared server in [`ServerRegistry`].
type ServerKey = (String, u16);
574
575struct ServerHandle {
577 registry: HttpRouteRegistry,
578 max_request_body: usize,
579 max_response_body: usize,
580 max_inflight_requests: usize,
581}
582
583pub struct ServerRegistry {
585 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589 pub fn global() -> &'static Self {
591 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592 INSTANCE.get_or_init(|| ServerRegistry {
593 inner: Mutex::new(HashMap::new()),
594 })
595 }
596
597 #[allow(clippy::too_many_arguments)]
600 pub async fn get_or_spawn(
601 &'static self,
602 host: &str,
603 port: u16,
604 max_request_body: usize,
605 max_response_body: usize,
606 max_inflight_requests: usize,
607 runtime: Arc<dyn RuntimeObservability>,
608 route_id: String,
609 ) -> Result<HttpRouteRegistry, CamelError> {
610 let host_owned = host.to_string();
611
612 let cell = {
613 let mut guard = self.inner.lock().map_err(|_| {
614 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
615 })?;
616 let key = (host.to_string(), port);
617 guard
618 .entry(key)
619 .or_insert_with(|| Arc::new(OnceCell::new()))
620 .clone()
621 };
622
623 if let Some(existing) = cell.get()
624 && existing.max_request_body != max_request_body
625 {
626 return Err(CamelError::EndpointCreationFailed(format!(
627 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
628 existing.max_request_body, max_request_body
629 )));
630 }
631
632 if let Some(existing) = cell.get()
633 && existing.max_response_body != max_response_body
634 {
635 return Err(CamelError::EndpointCreationFailed(format!(
636 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
637 existing.max_response_body, max_response_body
638 )));
639 }
640
641 if let Some(existing) = cell.get()
642 && existing.max_inflight_requests != max_inflight_requests
643 {
644 return Err(CamelError::EndpointCreationFailed(format!(
645 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
646 existing.max_inflight_requests, max_inflight_requests
647 )));
648 }
649
650 let handle = cell
651 .get_or_try_init(|| {
652 let rt = Arc::clone(&runtime);
653 let rid = route_id.clone();
654 async move {
655 let addr = format!("{host_owned}:{port}");
656 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
657 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
658 })?;
659 let registry = HttpRouteRegistry::new();
660 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
661 let task = tokio::spawn(run_axum_server(
662 listener,
663 registry.clone(),
664 max_request_body,
665 max_response_body,
666 Arc::clone(&inflight),
667 Arc::clone(&rt),
668 rid.clone(),
669 ));
670 let addr_for_monitor = format!("{host_owned}:{port}");
671 tokio::spawn(monitor_axum_task(
672 task,
673 addr_for_monitor,
674 Arc::clone(&rt),
675 rid,
676 ));
677 Ok::<ServerHandle, CamelError>(ServerHandle {
678 registry,
679 max_request_body,
680 max_response_body,
681 max_inflight_requests,
682 })
683 }
684 })
685 .await?;
686
687 Ok(handle.registry.clone())
688 }
689
690 #[cfg(test)]
697 pub fn reset() {
698 let instance = Self::global();
699 let mut guard = instance
700 .inner
701 .lock()
702 .expect("ServerRegistry lock poisoned during test reset");
703 guard.clear();
704 }
705}
706
707use axum::{
712 Router,
713 body::Body as AxumBody,
714 extract::{Request, State},
715 http::{Response, StatusCode},
716 response::IntoResponse,
717};
718
719#[derive(Clone)]
720pub(crate) struct AppState {
721 registry: HttpRouteRegistry,
722 max_request_body: usize,
723 max_response_body: usize,
724 inflight: Arc<tokio::sync::Semaphore>,
725}
726
727async fn run_axum_server(
728 listener: tokio::net::TcpListener,
729 registry: HttpRouteRegistry,
730 max_request_body: usize,
731 max_response_body: usize,
732 inflight: Arc<tokio::sync::Semaphore>,
733 runtime: Arc<dyn RuntimeObservability>,
734 route_id: String,
735) {
736 let state = AppState {
737 registry,
738 max_request_body,
739 max_response_body,
740 inflight,
741 };
742 let app = Router::new().fallback(dispatch_handler).with_state(state);
743
744 axum::serve(listener, app).await.unwrap_or_else(|e| {
745 runtime
746 .metrics()
747 .increment_errors(&route_id, "e:http:accept");
748 tracing::error!(error = %e, "Axum server error");
750 });
751}
752
753async fn monitor_axum_task(
761 handle: tokio::task::JoinHandle<()>,
762 addr: String,
763 runtime: Arc<dyn RuntimeObservability>,
764 route_id: String,
765) {
766 match handle.await {
767 Ok(()) => {
768 }
770 Err(join_err) => {
771 runtime
772 .metrics()
773 .increment_errors(&route_id, "e:http:server-task-exited");
774 tracing::error!(
776 addr = %addr,
777 error = %join_err,
778 "Axum server task exited unexpectedly — all routes on this port are now dead"
779 );
780 }
781 }
782}
783
784async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
785 let path = req.uri().path().to_owned();
786
787 let api_sender = {
789 let inner = state.registry.inner.read().await;
790 inner.api_routes.get(&path).cloned()
791 }; if let Some(sender) = api_sender {
794 let method = req.method().to_string();
795 let query = req.uri().query().unwrap_or("").to_string();
796 let headers = req.headers().clone();
797
798 let content_length: Option<u64> = headers
800 .get(http::header::CONTENT_LENGTH)
801 .and_then(|v| v.to_str().ok())
802 .and_then(|s| s.parse().ok());
803
804 if let Some(len) = content_length
805 && len > state.max_request_body as u64
806 {
807 return Response::builder()
808 .status(StatusCode::PAYLOAD_TOO_LARGE)
809 .body(AxumBody::from("Request body exceeds configured limit"))
810 .expect("infallible"); }
812
813 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
814 Ok(permit) => permit,
815 Err(_) => {
816 return Response::builder()
817 .status(StatusCode::SERVICE_UNAVAILABLE)
818 .body(AxumBody::from("Service Unavailable"))
819 .expect("infallible"); }
821 };
822
823 let content_type = headers
825 .get(http::header::CONTENT_TYPE)
826 .and_then(|v| v.to_str().ok())
827 .map(|s| s.to_string());
828
829 let data_stream: BodyDataStream = req.into_body().into_data_stream();
830 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
831 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
832
833 let stream_body = StreamBody {
834 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
835 metadata: StreamMetadata {
836 size_hint: content_length,
837 content_type,
838 origin: None,
839 },
840 };
841
842 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
843 let envelope = RequestEnvelope {
844 method,
845 path,
846 query,
847 headers,
848 body: stream_body,
849 reply_tx,
850 };
851
852 if sender.send(envelope).await.is_err() {
853 return Response::builder()
854 .status(StatusCode::SERVICE_UNAVAILABLE)
855 .body(AxumBody::from("Consumer unavailable"))
856 .expect("infallible"); }
858
859 match reply_rx.await {
860 Ok(reply) => {
861 let reply = match reply.body {
862 HttpReplyBody::Bytes(b)
863 if exceeds_max_response_body(b.len(), state.max_response_body) =>
864 {
865 HttpReply {
866 status: 500,
867 headers: vec![],
868 body: HttpReplyBody::Bytes(bytes::Bytes::from(
869 "Response body exceeds configured limit",
870 )),
871 }
872 }
873 _ => reply,
874 };
875
876 let status =
877 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
878 let mut builder = Response::builder().status(status);
879 for (k, v) in &reply.headers {
880 builder = builder.header(k.as_str(), v.as_str());
881 }
882 match reply.body {
883 HttpReplyBody::Bytes(b) => {
884 builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
885 Response::builder()
886 .status(StatusCode::INTERNAL_SERVER_ERROR)
887 .body(AxumBody::from("Invalid response headers from consumer"))
888 .expect("infallible") })
890 }
891 HttpReplyBody::Stream(stream) => builder
892 .body(AxumBody::from_stream(stream))
893 .unwrap_or_else(|_| {
894 Response::builder()
895 .status(StatusCode::INTERNAL_SERVER_ERROR)
896 .body(AxumBody::from("Invalid response headers from consumer"))
897 .expect("infallible") }),
899 }
900 }
901 Err(_) => Response::builder()
902 .status(StatusCode::INTERNAL_SERVER_ERROR)
903 .body(AxumBody::from("Pipeline error"))
904 .expect("infallible"), }
906 } else {
907 static_dispatch::dispatch_static(&state, req, &path).await
909 }
910}
911
/// Returns `true` when a body of `len` bytes is strictly larger than `max`.
fn exceeds_max_response_body(len: usize, max: usize) -> bool {
    max < len
}
915
/// Upper-cases the first character of every `-`-separated segment of `name`,
/// leaving all other characters untouched (`content-type` -> `Content-Type`).
fn title_case_header(name: &str) -> String {
    let mut out = String::with_capacity(name.len());

    for (index, segment) in name.split('-').enumerate() {
        if index > 0 {
            out.push('-');
        }
        let mut rest = segment.chars();
        if let Some(first) = rest.next() {
            out.extend(first.to_uppercase());
            out.push_str(rest.as_str());
        }
    }

    out
}
928
929pub struct HttpConsumer {
934 config: HttpServerConfig,
935 runtime: Arc<dyn RuntimeObservability>,
937}
938
939impl HttpConsumer {
940 pub fn new(config: HttpServerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
941 Self { config, runtime }
942 }
943}
944
945#[async_trait::async_trait]
946impl Consumer for HttpConsumer {
947 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
948 use camel_component_api::{Body, Exchange, Message};
949
950 let registry = ServerRegistry::global()
951 .get_or_spawn(
952 &self.config.host,
953 self.config.port,
954 self.config.max_request_body,
955 self.config.max_response_body,
956 self.config.max_inflight_requests,
957 self.runtime.clone(),
958 ctx.route_id().to_string(),
959 )
960 .await?;
961
962 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
964 registry
965 .register_api_route(self.config.path.clone(), env_tx)
966 .await;
967
968 let path = self.config.path.clone();
969 let registry_for_cleanup = registry.clone();
970 let cancel_token = ctx.cancel_token();
971 loop {
972 tokio::select! {
973 _ = ctx.cancelled() => {
974 break;
975 }
976 envelope = env_rx.recv() => {
977 let Some(envelope) = envelope else { break; };
978
979 let mut msg = Message::default();
981
982 msg.set_header("CamelHttpMethod",
984 serde_json::Value::String(envelope.method.clone()));
985 msg.set_header("CamelHttpPath",
986 serde_json::Value::String(envelope.path.clone()));
987 msg.set_header("CamelHttpQuery",
988 serde_json::Value::String(envelope.query.clone()));
989
990 for (k, v) in &envelope.headers {
992 if let Ok(val_str) = v.to_str() {
993 msg.set_header(
994 title_case_header(k.as_str()),
995 serde_json::Value::String(val_str.to_string()),
996 );
997 }
998 }
999
1000 msg.body = Body::Stream(envelope.body);
1003
1004 #[allow(unused_mut)]
1005 let mut exchange = Exchange::new(msg);
1006
1007 #[cfg(feature = "otel")]
1009 {
1010 let headers: HashMap<String, String> = envelope
1011 .headers
1012 .iter()
1013 .filter_map(|(k, v)| {
1014 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
1015 })
1016 .collect();
1017 camel_otel::extract_into_exchange(&mut exchange, &headers);
1018 }
1019
1020 let reply_tx = envelope.reply_tx;
1021 let sender = ctx.sender().clone();
1022 let path_clone = path.clone();
1023 let cancel = cancel_token.clone();
1024
1025 tokio::spawn(async move {
1045 if cancel.is_cancelled() {
1053 let _ = reply_tx.send(HttpReply {
1054 status: 503,
1055 headers: vec![],
1056 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1057 });
1058 return;
1059 }
1060
1061 let (tx, rx) = tokio::sync::oneshot::channel();
1063 let envelope = camel_component_api::consumer::ExchangeEnvelope {
1064 exchange,
1065 reply_tx: Some(tx),
1066 };
1067
1068 let result = match sender.send(envelope).await {
1069 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1070 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1071 }
1072 .and_then(|r| r);
1073
1074 let reply = match result {
1075 Ok(out) => {
1076 let status = out
1077 .input
1078 .header("CamelHttpResponseCode")
1079 .and_then(|v| {
1080 let raw = v.as_u64()
1081 .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1082 let code = raw as u16;
1083 (100..1000).contains(&code).then_some(code)
1084 })
1085 .unwrap_or(200);
1086
1087 let user_content_type = out
1088 .input
1089 .header("Content-Type")
1090 .and_then(|v| v.as_str().map(|s| s.to_string()));
1091
1092 let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1093 Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1094 Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1095 Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1096 Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1097 Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1098 v.to_string().into_bytes(),
1099 )), Some("application/json".to_string())),
1100 Body::Stream(s) => {
1101 let ct = s.metadata.content_type.clone();
1102 match s.stream.lock().await.take() {
1103 Some(stream) => (
1104 HttpReplyBody::Stream(stream),
1105 ct,
1106 ),
1107 None => {
1108 tracing::error!(
1110 "Body::Stream already consumed before HTTP reply — returning 500"
1111 );
1112 let error_reply = HttpReply {
1113 status: 500,
1114 headers: vec![],
1115 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1116 };
1117 if reply_tx.send(error_reply).is_err() {
1118 debug!("reply_tx dropped before error reply could be sent");
1119 }
1120 return;
1121 }
1122 }
1123 }
1124 };
1125
1126 let mut resp_headers: Vec<(String, String)> = out
1127 .input
1128 .headers
1129 .iter()
1130 .filter(|(k, _)| !k.starts_with("Camel"))
1131 .filter(|(k, _)| {
1132 !matches!(
1133 k.to_lowercase().as_str(),
1134 "content-length"
1135 | "content-type"
1136 | "transfer-encoding"
1137 | "connection"
1138 | "cache-control"
1139 | "date"
1140 | "pragma"
1141 | "trailer"
1142 | "upgrade"
1143 | "via"
1144 | "warning"
1145 | "host"
1146 | "user-agent"
1147 | "accept"
1148 | "accept-encoding"
1149 | "accept-language"
1150 | "accept-charset"
1151 | "authorization"
1152 | "proxy-authorization"
1153 | "cookie"
1154 | "expect"
1155 | "from"
1156 | "if-match"
1157 | "if-modified-since"
1158 | "if-none-match"
1159 | "if-range"
1160 | "if-unmodified-since"
1161 | "max-forwards"
1162 | "proxy-connection"
1163 | "range"
1164 | "referer"
1165 | "te"
1166 )
1167 })
1168 .filter_map(|(k, v)| {
1169 v.as_str().map(|s| (k.clone(), s.to_string()))
1170 })
1171 .collect();
1172
1173 let content_type = user_content_type
1174 .or(inferred_content_type);
1175 if let Some(ct) = content_type {
1176 resp_headers.push(("Content-Type".to_string(), ct));
1177 }
1178
1179 HttpReply {
1180 status,
1181 headers: resp_headers,
1182 body: reply_body,
1183 }
1184 }
1185 Err(CamelError::Stopped) => {
1186 tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1187 HttpReply {
1188 status: 204,
1189 headers: vec![],
1190 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1191 }
1192 }
1193 Err(CamelError::Unauthenticated(msg)) => {
1194 tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1195 HttpReply {
1196 status: 401,
1197 headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1198 body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1199 }
1200 }
1201 Err(CamelError::Unauthorized(msg)) => {
1202 tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1203 HttpReply {
1204 status: 403,
1205 headers: vec![],
1206 body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1207 }
1208 }
1209 Err(e) => {
1210 tracing::warn!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1212 HttpReply {
1213 status: 500,
1214 headers: vec![],
1215 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1216 }
1217 }
1218 };
1219
1220 let _ = reply_tx.send(reply);
1222 });
1223 }
1224 }
1225 }
1226
1227 registry_for_cleanup.unregister_api_route(&path).await;
1229
1230 Ok(())
1231 }
1232
1233 async fn stop(&mut self) -> Result<(), CamelError> {
1234 Ok(())
1235 }
1236
1237 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1238 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1239 }
1240}
1241
1242pub struct HttpComponent {
1247 config: HttpConfig,
1248}
1249
1250fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1251 let mut builder = reqwest::Client::builder()
1252 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1253 .pool_max_idle_per_host(config.pool_max_idle_per_host)
1254 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1255
1256 if !config.follow_redirects {
1257 builder = builder.redirect(reqwest::redirect::Policy::none());
1258 } else if let Some(max_redirects) = config.max_redirects {
1259 builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1260 }
1261
1262 if let Some(proxy_url) = &config.proxy_url
1263 && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1264 {
1265 builder = builder.proxy(proxy);
1266 }
1267
1268 if matches!(cookie_handling, CookieHandling::InMemory) {
1269 }
1271
1272 if let Some(tls) = &config.tls
1273 && tls.enabled
1274 {
1275 if tls.insecure || !tls.verify_peer {
1276 builder = builder.danger_accept_invalid_certs(true);
1277 }
1278
1279 if let Some(ca_path) = &tls.ca_cert_path
1280 && let Ok(ca_bytes) = std::fs::read(ca_path)
1281 {
1282 let cert = reqwest::Certificate::from_pem(&ca_bytes)
1283 .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1284 if let Ok(ca_cert) = cert {
1285 builder = builder.add_root_certificate(ca_cert);
1286 }
1287 }
1288
1289 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1290 && let (Ok(cert_bytes), Ok(key_bytes)) =
1291 (std::fs::read(cert_path), std::fs::read(key_path))
1292 {
1293 let mut identity_pem = cert_bytes;
1294 identity_pem.extend_from_slice(&key_bytes);
1295 if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1296 builder = builder.identity(identity);
1297 }
1298 }
1299 }
1300
1301 builder
1302 .build()
1303 .expect("reqwest::Client::build() with valid config should not fail") }
1305
1306impl HttpComponent {
1307 pub fn new() -> Self {
1308 let config = HttpConfig::default();
1309 Self { config }
1310 }
1311
1312 pub fn with_config(config: HttpConfig) -> Self {
1313 Self { config }
1314 }
1315
1316 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1317 match config {
1318 Some(cfg) => Self::with_config(cfg),
1319 None => Self::new(),
1320 }
1321 }
1322}
1323
1324impl Default for HttpComponent {
1325 fn default() -> Self {
1326 Self::new()
1327 }
1328}
1329
1330impl Component for HttpComponent {
1331 fn scheme(&self) -> &str {
1332 "http"
1333 }
1334
1335 fn create_endpoint(
1336 &self,
1337 uri: &str,
1338 ctx: &dyn camel_component_api::ComponentContext,
1339 ) -> Result<Box<dyn Endpoint>, CamelError> {
1340 self.config.validate()?;
1341 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1342 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1343 let client = build_client(&self.config, config.cookie_handling);
1344 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1345 server_config.host.clone(),
1346 server_config.port,
1347 )));
1348 Ok(Box::new(HttpEndpoint {
1349 uri: uri.to_string(),
1350 config,
1351 server_config,
1352 client,
1353 }))
1354 }
1355}
1356
/// Component registered under the `https` scheme.
///
/// Carries the component-wide [`HttpConfig`] used as defaults when endpoints
/// are created from a URI.
pub struct HttpsComponent {
    /// Component-level configuration, validated on every `create_endpoint` call.
    config: HttpConfig,
}
1360
1361impl HttpsComponent {
1362 pub fn new() -> Self {
1363 let config = HttpConfig::default();
1364 Self { config }
1365 }
1366
1367 pub fn with_config(config: HttpConfig) -> Self {
1368 Self { config }
1369 }
1370
1371 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1372 match config {
1373 Some(cfg) => Self::with_config(cfg),
1374 None => Self::new(),
1375 }
1376 }
1377}
1378
1379impl Default for HttpsComponent {
1380 fn default() -> Self {
1381 Self::new()
1382 }
1383}
1384
1385impl Component for HttpsComponent {
1386 fn scheme(&self) -> &str {
1387 "https"
1388 }
1389
1390 fn create_endpoint(
1391 &self,
1392 uri: &str,
1393 ctx: &dyn camel_component_api::ComponentContext,
1394 ) -> Result<Box<dyn Endpoint>, CamelError> {
1395 self.config.validate()?;
1396 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1397 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1398 let client = build_client(&self.config, config.cookie_handling);
1399 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1400 server_config.host.clone(),
1401 server_config.port,
1402 )));
1403 Ok(Box::new(HttpEndpoint {
1404 uri: uri.to_string(),
1405 config,
1406 server_config,
1407 client,
1408 }))
1409 }
1410}
1411
/// Endpoint created by the HTTP/HTTPS components for a single URI.
///
/// It can hand out both a consumer (built from `server_config`) and a
/// producer (built from `config` and the shared `client`).
struct HttpEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Producer-side settings; cloned into each producer.
    config: HttpEndpointConfig,
    /// Consumer-side settings; cloned into each consumer.
    server_config: HttpServerConfig,
    /// HTTP client cloned into every producer created from this endpoint.
    client: reqwest::Client,
}
1422
1423impl Endpoint for HttpEndpoint {
1424 fn uri(&self) -> &str {
1425 &self.uri
1426 }
1427
1428 fn create_consumer(
1429 &self,
1430 rt: Arc<dyn camel_component_api::RuntimeObservability>,
1431 ) -> Result<Box<dyn Consumer>, CamelError> {
1432 Ok(Box::new(HttpConsumer::new(self.server_config.clone(), rt)))
1433 }
1434
1435 fn create_producer(
1436 &self,
1437 _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1438 _ctx: &ProducerContext,
1439 ) -> Result<BoxProcessor, CamelError> {
1440 let producer = HttpProducer {
1441 config: Arc::new(self.config.clone()),
1442 client: self.client.clone(),
1443 };
1444 if let Some(ref provider) = self.config.token_provider {
1445 let layer = BearerTokenLayer::new(Arc::clone(provider));
1446 Ok(BoxProcessor::new(layer.layer(producer)))
1447 } else {
1448 Ok(BoxProcessor::new(producer))
1449 }
1450 }
1451}
1452
1453fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1458 let parsed = url::Url::parse(url)
1459 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1460
1461 if let Some(host) = parsed.host_str()
1463 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1464 {
1465 return Err(CamelError::ProcessorError(format!(
1466 "Host '{}' is blocked",
1467 host
1468 )));
1469 }
1470
1471 if !config.allow_private_ips
1473 && let Some(host) = parsed.host()
1474 {
1475 match host {
1476 url::Host::Ipv4(ip) => {
1477 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1478 return Err(CamelError::ProcessorError(format!(
1479 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1480 ip
1481 )));
1482 }
1483 }
1484 url::Host::Ipv6(ip) => {
1485 if ip.is_loopback() {
1486 return Err(CamelError::ProcessorError(format!(
1487 "Loopback IP '{}' not allowed",
1488 ip
1489 )));
1490 }
1491 }
1492 url::Host::Domain(domain) => {
1493 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1495 if blocked_domains.contains(&domain) {
1496 return Err(CamelError::ProcessorError(format!(
1497 "Domain '{}' is not allowed",
1498 domain
1499 )));
1500 }
1501 }
1502 }
1503 }
1504
1505 Ok(())
1506}
1507
/// Returns `true` when `ip` must not be reached by outbound requests unless
/// private targets are explicitly allowed.
///
/// IPv4: RFC 1918 private ranges, loopback, link-local and `0.0.0.0/8`.
/// IPv6: loopback (`::1`), the unspecified address (`::`), unique-local
/// (`fc00::/7`), link-local (`fe80::/10`) and IPv4-mapped addresses whose
/// embedded IPv4 address is itself private.
fn is_private_ip(ip: &IpAddr) -> bool {
    /// Shared IPv4 predicate, also applied to IPv4-mapped IPv6 addresses.
    fn is_private_v4(v4: &std::net::Ipv4Addr) -> bool {
        v4.is_private() || v4.is_loopback() || v4.is_link_local() || v4.octets()[0] == 0
    }

    match ip {
        IpAddr::V4(v4) => is_private_v4(v4),
        IpAddr::V6(v6) => {
            let seg0 = v6.segments()[0];
            v6.is_loopback()
                // `::` is the IPv6 counterpart of 0.0.0.0, which is already rejected above.
                || v6.is_unspecified()
                // fc00::/7 — unique local addresses
                || (seg0 & 0xfe00) == 0xfc00
                // fe80::/10 — link-local addresses
                || (seg0 & 0xffc0) == 0xfe80
                || v6.to_ipv4_mapped().is_some_and(|v4| is_private_v4(&v4))
        }
    }
}
1533
1534async fn validate_resolved_host_for_ssrf(
1535 url: &str,
1536 config: &HttpEndpointConfig,
1537) -> Result<(), CamelError> {
1538 if config.allow_private_ips {
1539 return Ok(());
1540 }
1541
1542 let parsed = url::Url::parse(url)
1543 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1544 let Some(host) = parsed.host_str() else {
1545 return Ok(());
1546 };
1547 let Some(port) = parsed.port_or_known_default() else {
1548 return Ok(());
1549 };
1550
1551 let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1552 CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1553 })?;
1554
1555 for addr in resolved {
1556 let ip = addr.ip();
1557 if is_private_ip(&ip) {
1558 return Err(CamelError::ProcessorError(format!(
1559 "Target resolved to private IP: {}",
1560 ip
1561 )));
1562 }
1563 }
1564
1565 Ok(())
1566}
1567
/// Producer that turns an exchange into an outbound HTTP request and writes
/// the response back into the same exchange.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint settings, shared by reference count with the request future.
    config: Arc<HttpEndpointConfig>,
    /// HTTP client used to send requests.
    client: reqwest::Client,
}
1577
1578impl HttpProducer {
1579 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1580 if let Some(ref method) = config.http_method {
1581 return method.to_uppercase();
1582 }
1583 if let Some(method) = exchange
1584 .input
1585 .header("CamelHttpMethod")
1586 .and_then(|v| v.as_str())
1587 {
1588 return method.to_uppercase();
1589 }
1590 if !exchange.input.body.is_empty() {
1591 return "POST".to_string();
1592 }
1593 "GET".to_string()
1594 }
1595
1596 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1597 if let Some(uri) = exchange
1598 .input
1599 .header("CamelHttpUri")
1600 .and_then(|v| v.as_str())
1601 {
1602 let mut url = uri.to_string();
1603 if let Some(path) = exchange
1604 .input
1605 .header("CamelHttpPath")
1606 .and_then(|v| v.as_str())
1607 {
1608 if !url.ends_with('/') && !path.starts_with('/') {
1609 url.push('/');
1610 }
1611 url.push_str(path);
1612 }
1613 if let Some(query) = exchange
1614 .input
1615 .header("CamelHttpQuery")
1616 .and_then(|v| v.as_str())
1617 {
1618 url.push('?');
1619 url.push_str(query);
1620 }
1621 return url;
1622 }
1623
1624 let mut url = config.base_url.clone();
1625
1626 if let Some(path) = exchange
1627 .input
1628 .header("CamelHttpPath")
1629 .and_then(|v| v.as_str())
1630 {
1631 if !url.ends_with('/') && !path.starts_with('/') {
1632 url.push('/');
1633 }
1634 url.push_str(path);
1635 }
1636
1637 if let Some(query) = exchange
1638 .input
1639 .header("CamelHttpQuery")
1640 .and_then(|v| v.as_str())
1641 {
1642 url.push('?');
1643 url.push_str(query);
1644 } else if !config.query_params.is_empty() {
1645 let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); for (k, v) in &config.query_params {
1647 parsed.query_pairs_mut().append_pair(k, v);
1648 }
1649 url = parsed.to_string();
1650 }
1651
1652 url
1653 }
1654
1655 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1656 status >= range.0 && status <= range.1
1657 }
1658}
1659
1660impl Service<Exchange> for HttpProducer {
1661 type Response = Exchange;
1662 type Error = CamelError;
1663 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1664
1665 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1666 Poll::Ready(Ok(()))
1667 }
1668
1669 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1670 let config = self.config.clone();
1671 let client = self.client.clone();
1672
1673 Box::pin(async move {
1674 let method_str = HttpProducer::resolve_method(&exchange, &config);
1675 let url = HttpProducer::resolve_url(&exchange, &config);
1676
1677 validate_url_for_ssrf(&url, &config)?;
1679 validate_resolved_host_for_ssrf(&url, &config).await?;
1680
1681 debug!(
1682 correlation_id = %exchange.correlation_id(),
1683 method = %method_str,
1684 url = %url,
1685 "HTTP request"
1686 );
1687
1688 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1689 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1690 })?;
1691
1692 let mut request = client.request(method, &url);
1693
1694 if let Some(timeout) = config.response_timeout {
1695 request = request.timeout(timeout);
1696 }
1697
1698 if let Some(user_agent) = &config.user_agent
1699 && !config.bridge_endpoint
1700 {
1701 request = request.header("User-Agent", user_agent.clone());
1702 }
1703
1704 #[cfg(feature = "otel")]
1706 let should_inject_otel = !config.bridge_endpoint;
1707 #[cfg(feature = "otel")]
1708 if should_inject_otel {
1709 let mut otel_headers = HashMap::new();
1710 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1711 for (k, v) in otel_headers {
1712 if let (Ok(name), Ok(val)) = (
1713 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1714 reqwest::header::HeaderValue::from_str(&v),
1715 ) {
1716 request = request.header(name, val);
1717 }
1718 }
1719 }
1720
1721 for (key, value) in &exchange.input.headers {
1722 if !key.starts_with("Camel")
1723 && !config
1724 .skip_request_headers
1725 .iter()
1726 .any(|h| h.eq_ignore_ascii_case(key))
1727 && let Some(val_str) = value.as_str()
1728 && let (Ok(name), Ok(val)) = (
1729 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1730 reqwest::header::HeaderValue::from_str(val_str),
1731 )
1732 {
1733 request = request.header(name, val);
1734 }
1735 }
1736
1737 if !config.bridge_endpoint {
1738 match &config.auth {
1739 HttpAuth::None => {}
1740 HttpAuth::Basic { username, password } => {
1741 request = request.basic_auth(username, Some(password));
1742 }
1743 HttpAuth::Bearer { token } => {
1744 request = request.bearer_auth(token);
1745 }
1746 }
1747
1748 if config.connection_close {
1749 request = request.header("Connection", "close");
1750 }
1751 }
1752
1753 match exchange.input.body {
1754 Body::Stream(ref s) => {
1755 let mut stream_lock = s.stream.lock().await;
1756 if let Some(stream) = stream_lock.take() {
1757 request = request.body(reqwest::Body::wrap_stream(stream));
1758 } else {
1759 return Err(CamelError::AlreadyConsumed);
1760 }
1761 }
1762 _ => {
1763 let body = std::mem::take(&mut exchange.input.body);
1765 let bytes = body.into_bytes(config.max_body_size).await?;
1766 if !bytes.is_empty() {
1767 request = request.body(bytes);
1768 }
1769 }
1770 }
1771
1772 let response = request
1773 .send()
1774 .await
1775 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1776
1777 let status_code = response.status().as_u16();
1778 let status_text = response
1779 .status()
1780 .canonical_reason()
1781 .unwrap_or("Unknown")
1782 .to_string();
1783
1784 for (key, value) in response.headers() {
1785 if config
1786 .skip_response_headers
1787 .iter()
1788 .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1789 {
1790 continue;
1791 }
1792 if let Ok(val_str) = value.to_str() {
1793 exchange.input.set_header(
1794 title_case_header(key.as_str()),
1795 serde_json::Value::String(val_str.to_string()),
1796 );
1797 }
1798 }
1799
1800 exchange.input.set_header(
1801 "CamelHttpResponseCode",
1802 serde_json::Value::Number(status_code.into()),
1803 );
1804 exchange.input.set_header(
1805 "CamelHttpResponseText",
1806 serde_json::Value::String(status_text.clone()),
1807 );
1808
1809 let read_timeout = Duration::from_millis(config.read_timeout_ms);
1811 let response_body = tokio::time::timeout(read_timeout, async {
1812 if let Some(content_len) = response.content_length()
1814 && content_len > config.max_response_bytes as u64
1815 {
1816 return Err(CamelError::ProcessorError(format!(
1817 "Response body too large: {} bytes exceeds limit of {} bytes",
1818 content_len, config.max_response_bytes
1819 )));
1820 }
1821 use futures::TryStreamExt;
1823 let mut stream = response.bytes_stream();
1824 let mut total: usize = 0;
1825 let mut collected = Vec::new();
1826 while let Some(chunk) = stream.try_next().await.map_err(|e| {
1827 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1828 })? {
1829 total += chunk.len();
1830 if total > config.max_response_bytes {
1831 return Err(CamelError::ProcessorError(format!(
1832 "Response body too large: {} bytes exceeds limit of {} bytes",
1833 total, config.max_response_bytes
1834 )));
1835 }
1836 collected.push(chunk);
1837 }
1838 let mut result = bytes::BytesMut::with_capacity(total);
1839 for chunk in collected {
1840 result.extend_from_slice(&chunk);
1841 }
1842 Ok::<bytes::Bytes, CamelError>(result.freeze())
1843 })
1844 .await
1845 .map_err(|_| {
1846 CamelError::ProcessorError(format!(
1847 "Read timeout after {}ms",
1848 config.read_timeout_ms
1849 ))
1850 })??;
1851
1852 if config.throw_exception_on_failure
1853 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1854 {
1855 return Err(CamelError::HttpOperationFailed {
1856 method: method_str,
1857 url,
1858 status_code,
1859 status_text,
1860 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1861 });
1862 }
1863
1864 if !response_body.is_empty() {
1865 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1866 }
1867
1868 debug!(
1869 correlation_id = %exchange.correlation_id(),
1870 status = status_code,
1871 url = %url,
1872 "HTTP response"
1873 );
1874 Ok(exchange)
1875 })
1876 }
1877}
1878
/// Crate-wide lock available to test code only.
///
/// NOTE(review): presumably held by tests that touch shared route-registry
/// state so they do not run concurrently — confirm against the tests that use it.
#[cfg(test)]
pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1890
1891#[cfg(test)]
1892mod tests {
1893 use camel_component_api::test_support::{NoopRuntimeObservability, PanicRuntimeObservability};
1894 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1895 std::sync::Arc::new(PanicRuntimeObservability)
1896 }
1897 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1898 std::sync::Arc::new(PanicRuntimeObservability)
1899 }
1900 fn noop_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1901 std::sync::Arc::new(NoopRuntimeObservability)
1902 }
1903
1904 use super::*;
1905 use camel_component_api::{Message, NoOpComponentContext};
1906 use std::sync::Arc;
1907 use std::time::Duration;
1908
1909 fn test_producer_ctx() -> ProducerContext {
1910 ProducerContext::new()
1911 }
1912
1913 #[test]
1914 fn test_http_config_defaults() {
1915 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1916 assert_eq!(config.base_url, "http://localhost:8080/api");
1917 assert!(config.http_method.is_none());
1918 assert!(config.throw_exception_on_failure);
1919 assert_eq!(config.ok_status_code_range, (200, 299));
1920 assert!(config.response_timeout.is_none());
1921 assert!(matches!(config.auth, HttpAuth::None));
1922 assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1923 assert!(!config.bridge_endpoint);
1924 assert!(!config.connection_close);
1925 }
1926
1927 #[test]
1928 fn test_http_config_scheme() {
1929 assert_eq!(HttpEndpointConfig::scheme(), "http");
1931 }
1932
1933 #[test]
1934 fn test_http_config_from_components() {
1935 let components = camel_component_api::UriComponents {
1937 scheme: "https".to_string(),
1938 path: "//api.example.com/v1".to_string(),
1939 params: std::collections::HashMap::from([(
1940 "httpMethod".to_string(),
1941 "POST".to_string(),
1942 )]),
1943 };
1944 let config = HttpEndpointConfig::from_components(components).unwrap();
1945 assert_eq!(config.base_url, "https://api.example.com/v1");
1946 assert_eq!(config.http_method, Some("POST".to_string()));
1947 }
1948
1949 #[test]
1950 fn test_http_config_with_options() {
1951 let config = HttpEndpointConfig::from_uri(
1952 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1953 ).unwrap();
1954 assert_eq!(config.base_url, "https://api.example.com/v1");
1955 assert_eq!(config.http_method, Some("PUT".to_string()));
1956 assert!(!config.throw_exception_on_failure);
1957 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1958 }
1959
1960 #[test]
1961 fn test_http_endpoint_config_auth_and_headers_options() {
1962 let config = HttpEndpointConfig::from_uri(
1963 "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1964 )
1965 .unwrap();
1966
1967 assert!(matches!(
1968 config.auth,
1969 HttpAuth::Basic { username, password } if username == "u" && password == "p"
1970 ));
1971 assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1972 assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1973 assert!(config.bridge_endpoint);
1974 assert!(config.connection_close);
1975 assert_eq!(
1976 config.skip_request_headers,
1977 vec!["authorization".to_string(), "x-secret".to_string()]
1978 );
1979 assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1980 }
1981
1982 #[test]
1983 fn test_http_endpoint_config_bearer_auth() {
1984 let config = HttpEndpointConfig::from_uri(
1985 "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1986 )
1987 .unwrap();
1988 assert!(matches!(
1989 config.auth,
1990 HttpAuth::Bearer { token } if token == "t"
1991 ));
1992 }
1993
1994 #[test]
1995 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1996 let config = HttpConfig::default()
1997 .with_response_timeout_ms(999)
1998 .with_allow_private_ips(true)
1999 .with_blocked_hosts(vec!["evil.com".to_string()])
2000 .with_max_body_size(12345);
2001 let endpoint =
2002 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
2003 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
2004 assert!(endpoint.allow_private_ips);
2005 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
2006 assert_eq!(endpoint.max_body_size, 12345);
2007 }
2008
2009 #[test]
2010 fn test_from_uri_with_defaults_uri_overrides_config() {
2011 let config = HttpConfig::default()
2012 .with_response_timeout_ms(999)
2013 .with_allow_private_ips(true)
2014 .with_blocked_hosts(vec!["evil.com".to_string()])
2015 .with_max_body_size(12345);
2016 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
2017 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
2018 &config,
2019 )
2020 .unwrap();
2021 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
2022 assert!(!endpoint.allow_private_ips);
2023 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
2024 assert_eq!(endpoint.max_body_size, 99);
2025 }
2026
2027 #[test]
2028 fn test_http_config_ok_status_range() {
2029 let config =
2030 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
2031 assert_eq!(config.ok_status_code_range, (200, 204));
2032 }
2033
2034 #[test]
2035 fn test_http_config_wrong_scheme() {
2036 let result = HttpEndpointConfig::from_uri("file:/tmp");
2037 assert!(result.is_err());
2038 }
2039
2040 #[test]
2041 fn test_http_component_scheme() {
2042 let component = HttpComponent::new();
2043 assert_eq!(component.scheme(), "http");
2044 }
2045
2046 #[test]
2047 fn test_https_component_scheme() {
2048 let component = HttpsComponent::new();
2049 assert_eq!(component.scheme(), "https");
2050 }
2051
2052 #[test]
2053 fn test_http_endpoint_creates_consumer() {
2054 let component = HttpComponent::new();
2055 let ctx = NoOpComponentContext;
2056 let endpoint = component
2057 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2058 .unwrap();
2059 assert!(endpoint.create_consumer(rt()).is_ok());
2060 }
2061
2062 #[test]
2063 fn test_https_endpoint_creates_consumer() {
2064 let component = HttpsComponent::new();
2065 let ctx = NoOpComponentContext;
2066 let endpoint = component
2067 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2068 .unwrap();
2069 assert!(endpoint.create_consumer(rt()).is_ok());
2070 }
2071
2072 #[test]
2073 fn test_http_endpoint_creates_producer() {
2074 let ctx = test_producer_ctx();
2075 let component = HttpComponent::new();
2076 let endpoint_ctx = NoOpComponentContext;
2077 let endpoint = component
2078 .create_endpoint("http://localhost/api", &endpoint_ctx)
2079 .unwrap();
2080 assert!(endpoint.create_producer(rt(), &ctx).is_ok());
2081 }
2082
2083 #[tokio::test]
2088 async fn test_producer_with_token_provider() {
2089 use camel_auth::oauth2::TokenProvider;
2090 use tower::ServiceExt;
2091
2092 let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2093 Arc::new(std::sync::Mutex::new(None));
2094 let captured_clone = Arc::clone(&captured_auth);
2095
2096 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2097 let port = listener.local_addr().unwrap().port();
2098
2099 let _handle = tokio::spawn(async move {
2100 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2101 if let Ok((mut stream, _)) = listener.accept().await {
2102 let mut buf = vec![0u8; 8192];
2103 let n = stream.read(&mut buf).await.unwrap_or(0);
2104 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2105 let auth = request
2106 .lines()
2107 .find(|l| l.to_lowercase().starts_with("authorization:"))
2108 .map(|l| {
2109 l.split(':')
2110 .nth(1)
2111 .map(|s| s.trim().to_string())
2112 .unwrap_or_default()
2113 });
2114 *captured_clone.lock().unwrap() = auth;
2115 let body = r#"{"echo":"ok"}"#;
2116 let resp = format!(
2117 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2118 body.len(),
2119 body
2120 );
2121 let _ = stream.write_all(resp.as_bytes()).await;
2122 }
2123 });
2124
2125 #[derive(Debug)]
2126 struct StaticProvider;
2127 #[async_trait::async_trait]
2128 impl TokenProvider for StaticProvider {
2129 async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2130 Ok("injected-token".into())
2131 }
2132 }
2133
2134 let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2135 let ctx = test_producer_ctx();
2136 let component = HttpComponent::new();
2137 let endpoint_ctx = NoOpComponentContext;
2138 let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2139 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2140
2141 let exchange = Exchange::new(Message::new("hello"));
2142
2143 let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2144 let mut layered = layer.layer(producer);
2145 let result = layered.ready().await.unwrap().call(exchange).await;
2146 assert!(result.is_ok(), "producer call failed: {:?}", result);
2147
2148 tokio::time::sleep(Duration::from_millis(100)).await;
2149 let auth = captured_auth.lock().unwrap().take();
2150 assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2151 }
2152
2153 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2154 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2155 let addr = listener.local_addr().unwrap();
2156 let url = format!("http://127.0.0.1:{}", addr.port());
2157
2158 let handle = tokio::spawn(async move {
2159 loop {
2160 if let Ok((mut stream, _)) = listener.accept().await {
2161 tokio::spawn(async move {
2162 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2163 let mut buf = vec![0u8; 4096];
2164 let n = stream.read(&mut buf).await.unwrap_or(0);
2165 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2166
2167 let method = request.split_whitespace().next().unwrap_or("GET");
2168
2169 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2170 let response = format!(
2171 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2172 body.len(),
2173 body
2174 );
2175 let _ = stream.write_all(response.as_bytes()).await;
2176 });
2177 }
2178 }
2179 });
2180
2181 (url, handle)
2182 }
2183
2184 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2185 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2186 let addr = listener.local_addr().unwrap();
2187 let url = format!("http://127.0.0.1:{}", addr.port());
2188
2189 let handle = tokio::spawn(async move {
2190 loop {
2191 if let Ok((mut stream, _)) = listener.accept().await {
2192 let status = status;
2193 tokio::spawn(async move {
2194 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2195 let mut buf = vec![0u8; 4096];
2196 let _ = stream.read(&mut buf).await;
2197
2198 let status_text = match status {
2199 404 => "Not Found",
2200 500 => "Internal Server Error",
2201 _ => "Error",
2202 };
2203 let body = "error body";
2204 let response = format!(
2205 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2206 status,
2207 status_text,
2208 body.len(),
2209 body
2210 );
2211 let _ = stream.write_all(response.as_bytes()).await;
2212 });
2213 }
2214 }
2215 });
2216
2217 (url, handle)
2218 }
2219
2220 #[tokio::test]
2221 async fn test_http_producer_get_request() {
2222 use tower::ServiceExt;
2223
2224 let (url, _handle) = start_test_server().await;
2225 let ctx = test_producer_ctx();
2226
2227 let component = HttpComponent::new();
2228 let endpoint_ctx = NoOpComponentContext;
2229 let endpoint = component
2230 .create_endpoint(
2231 &format!("{url}/api/test?allowPrivateIps=true"),
2232 &endpoint_ctx,
2233 )
2234 .unwrap();
2235 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2236
2237 let exchange = Exchange::new(Message::default());
2238 let result = producer.oneshot(exchange).await.unwrap();
2239
2240 let status = result
2241 .input
2242 .header("CamelHttpResponseCode")
2243 .and_then(|v| v.as_u64())
2244 .unwrap();
2245 assert_eq!(status, 200);
2246
2247 assert!(!result.input.body.is_empty());
2248 }
2249
2250 #[tokio::test]
2251 async fn test_http_producer_post_with_body() {
2252 use tower::ServiceExt;
2253
2254 let (url, _handle) = start_test_server().await;
2255 let ctx = test_producer_ctx();
2256
2257 let component = HttpComponent::new();
2258 let endpoint_ctx = NoOpComponentContext;
2259 let endpoint = component
2260 .create_endpoint(
2261 &format!("{url}/api/data?allowPrivateIps=true"),
2262 &endpoint_ctx,
2263 )
2264 .unwrap();
2265 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2266
2267 let exchange = Exchange::new(Message::new("request body"));
2268 let result = producer.oneshot(exchange).await.unwrap();
2269
2270 let status = result
2271 .input
2272 .header("CamelHttpResponseCode")
2273 .and_then(|v| v.as_u64())
2274 .unwrap();
2275 assert_eq!(status, 200);
2276 }
2277
2278 #[tokio::test]
2279 async fn test_http_producer_method_from_header() {
2280 use tower::ServiceExt;
2281
2282 let (url, _handle) = start_test_server().await;
2283 let ctx = test_producer_ctx();
2284
2285 let component = HttpComponent::new();
2286 let endpoint_ctx = NoOpComponentContext;
2287 let endpoint = component
2288 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2289 .unwrap();
2290 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2291
2292 let mut exchange = Exchange::new(Message::default());
2293 exchange.input.set_header(
2294 "CamelHttpMethod",
2295 serde_json::Value::String("DELETE".to_string()),
2296 );
2297
2298 let result = producer.oneshot(exchange).await.unwrap();
2299 let status = result
2300 .input
2301 .header("CamelHttpResponseCode")
2302 .and_then(|v| v.as_u64())
2303 .unwrap();
2304 assert_eq!(status, 200);
2305 }
2306
2307 #[tokio::test]
2308 async fn test_http_producer_forced_method() {
2309 use tower::ServiceExt;
2310
2311 let (url, _handle) = start_test_server().await;
2312 let ctx = test_producer_ctx();
2313
2314 let component = HttpComponent::new();
2315 let endpoint_ctx = NoOpComponentContext;
2316 let endpoint = component
2317 .create_endpoint(
2318 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2319 &endpoint_ctx,
2320 )
2321 .unwrap();
2322 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2323
2324 let exchange = Exchange::new(Message::default());
2325 let result = producer.oneshot(exchange).await.unwrap();
2326
2327 let status = result
2328 .input
2329 .header("CamelHttpResponseCode")
2330 .and_then(|v| v.as_u64())
2331 .unwrap();
2332 assert_eq!(status, 200);
2333 }
2334
2335 #[tokio::test]
2336 async fn test_http_producer_throw_exception_on_failure() {
2337 use tower::ServiceExt;
2338
2339 let (url, _handle) = start_status_server(404).await;
2340 let ctx = test_producer_ctx();
2341
2342 let component = HttpComponent::new();
2343 let endpoint_ctx = NoOpComponentContext;
2344 let endpoint = component
2345 .create_endpoint(
2346 &format!("{url}/not-found?allowPrivateIps=true"),
2347 &endpoint_ctx,
2348 )
2349 .unwrap();
2350 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2351
2352 let exchange = Exchange::new(Message::default());
2353 let result = producer.oneshot(exchange).await;
2354 assert!(result.is_err());
2355
2356 match result.unwrap_err() {
2357 CamelError::HttpOperationFailed { status_code, .. } => {
2358 assert_eq!(status_code, 404);
2359 }
2360 e => panic!("Expected HttpOperationFailed, got: {e}"),
2361 }
2362 }
2363
2364 #[tokio::test]
2365 async fn test_http_producer_no_throw_on_failure() {
2366 use tower::ServiceExt;
2367
2368 let (url, _handle) = start_status_server(500).await;
2369 let ctx = test_producer_ctx();
2370
2371 let component = HttpComponent::new();
2372 let endpoint_ctx = NoOpComponentContext;
2373 let endpoint = component
2374 .create_endpoint(
2375 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2376 &endpoint_ctx,
2377 )
2378 .unwrap();
2379 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2380
2381 let exchange = Exchange::new(Message::default());
2382 let result = producer.oneshot(exchange).await.unwrap();
2383
2384 let status = result
2385 .input
2386 .header("CamelHttpResponseCode")
2387 .and_then(|v| v.as_u64())
2388 .unwrap();
2389 assert_eq!(status, 500);
2390 }
2391
2392 #[tokio::test]
2393 async fn test_http_producer_uri_override() {
2394 use tower::ServiceExt;
2395
2396 let (url, _handle) = start_test_server().await;
2397 let ctx = test_producer_ctx();
2398
2399 let component = HttpComponent::new();
2400 let endpoint_ctx = NoOpComponentContext;
2401 let endpoint = component
2402 .create_endpoint(
2403 "http://localhost:1/does-not-exist?allowPrivateIps=true",
2404 &endpoint_ctx,
2405 )
2406 .unwrap();
2407 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2408
2409 let mut exchange = Exchange::new(Message::default());
2410 exchange.input.set_header(
2411 "CamelHttpUri",
2412 serde_json::Value::String(format!("{url}/api")),
2413 );
2414
2415 let result = producer.oneshot(exchange).await.unwrap();
2416 let status = result
2417 .input
2418 .header("CamelHttpResponseCode")
2419 .and_then(|v| v.as_u64())
2420 .unwrap();
2421 assert_eq!(status, 200);
2422 }
2423
2424 #[tokio::test]
2425 async fn test_http_producer_response_headers_mapped() {
2426 use tower::ServiceExt;
2427
2428 let (url, _handle) = start_test_server().await;
2429 let ctx = test_producer_ctx();
2430
2431 let component = HttpComponent::new();
2432 let endpoint_ctx = NoOpComponentContext;
2433 let endpoint = component
2434 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2435 .unwrap();
2436 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2437
2438 let exchange = Exchange::new(Message::default());
2439 let result = producer.oneshot(exchange).await.unwrap();
2440
2441 assert!(
2442 result.input.header("Content-Type").is_some(),
2443 "Response should have Content-Type header"
2444 );
2445 assert!(result.input.header("CamelHttpResponseText").is_some());
2446 }
2447
2448 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2453 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2454 let addr = listener.local_addr().unwrap();
2455 let url = format!("http://127.0.0.1:{}", addr.port());
2456
2457 let handle = tokio::spawn(async move {
2458 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2459 loop {
2460 if let Ok((mut stream, _)) = listener.accept().await {
2461 tokio::spawn(async move {
2462 let mut buf = vec![0u8; 4096];
2463 let n = stream.read(&mut buf).await.unwrap_or(0);
2464 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2465
2466 if request.contains("GET /final") {
2468 let body = r#"{"status":"final"}"#;
2469 let response = format!(
2470 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2471 body.len(),
2472 body
2473 );
2474 let _ = stream.write_all(response.as_bytes()).await;
2475 } else {
2476 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2478 let _ = stream.write_all(response.as_bytes()).await;
2479 }
2480 });
2481 }
2482 }
2483 });
2484
2485 (url, handle)
2486 }
2487
2488 #[tokio::test]
2489 async fn test_follow_redirects_false_does_not_follow() {
2490 use tower::ServiceExt;
2491
2492 let (url, _handle) = start_redirect_server().await;
2493 let ctx = test_producer_ctx();
2494
2495 let component =
2496 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2497 let endpoint_ctx = NoOpComponentContext;
2498 let endpoint = component
2499 .create_endpoint(
2500 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2501 &endpoint_ctx,
2502 )
2503 .unwrap();
2504 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2505
2506 let exchange = Exchange::new(Message::default());
2507 let result = producer.oneshot(exchange).await.unwrap();
2508
2509 let status = result
2511 .input
2512 .header("CamelHttpResponseCode")
2513 .and_then(|v| v.as_u64())
2514 .unwrap();
2515 assert_eq!(
2516 status, 302,
2517 "Should NOT follow redirect when followRedirects=false"
2518 );
2519 }
2520
2521 #[tokio::test]
2522 async fn test_follow_redirects_true_follows_redirect() {
2523 use tower::ServiceExt;
2524
2525 let (url, _handle) = start_redirect_server().await;
2526 let ctx = test_producer_ctx();
2527
2528 let component =
2529 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2530 let endpoint_ctx = NoOpComponentContext;
2531 let endpoint = component
2532 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2533 .unwrap();
2534 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2535
2536 let exchange = Exchange::new(Message::default());
2537 let result = producer.oneshot(exchange).await.unwrap();
2538
2539 let status = result
2541 .input
2542 .header("CamelHttpResponseCode")
2543 .and_then(|v| v.as_u64())
2544 .unwrap();
2545 assert_eq!(
2546 status, 200,
2547 "Should follow redirect when followRedirects=true"
2548 );
2549 }
2550
2551 #[tokio::test]
2552 async fn test_query_params_forwarded_to_http_request() {
2553 use tower::ServiceExt;
2554
2555 let (url, _handle) = start_test_server().await;
2556 let ctx = test_producer_ctx();
2557
2558 let component = HttpComponent::new();
2559 let endpoint_ctx = NoOpComponentContext;
2560 let endpoint = component
2562 .create_endpoint(
2563 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2564 &endpoint_ctx,
2565 )
2566 .unwrap();
2567 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2568
2569 let exchange = Exchange::new(Message::default());
2570 let result = producer.oneshot(exchange).await.unwrap();
2571
2572 let status = result
2575 .input
2576 .header("CamelHttpResponseCode")
2577 .and_then(|v| v.as_u64())
2578 .unwrap();
2579 assert_eq!(status, 200);
2580 }
2581
2582 #[tokio::test]
2583 async fn test_non_camel_query_params_are_forwarded() {
2584 let config = HttpEndpointConfig::from_uri(
2587 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2588 )
2589 .unwrap();
2590
2591 assert!(
2593 config.query_params.contains_key("apiKey"),
2594 "apiKey should be preserved"
2595 );
2596 assert!(
2597 config.query_params.contains_key("token"),
2598 "token should be preserved"
2599 );
2600 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2601 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2602
2603 assert!(
2605 !config.query_params.contains_key("httpMethod"),
2606 "httpMethod should not be forwarded"
2607 );
2608 }
2609
2610 #[test]
2611 fn test_query_params_are_url_encoded_when_resolving_url() {
2612 let config =
2613 HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2614 let exchange = Exchange::new(Message::default());
2615
2616 let url = HttpProducer::resolve_url(&exchange, &config);
2617
2618 assert!(url.contains("q=hello+world"), "url was: {url}");
2619 assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2620 }
2621
2622 async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2627 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2628 let addr = listener.local_addr().unwrap();
2629 let url = format!("http://127.0.0.1:{}", addr.port());
2630
2631 let handle = tokio::spawn(async move {
2632 loop {
2633 if let Ok((mut stream, _)) = listener.accept().await {
2634 let delay = delay_ms;
2635 tokio::spawn(async move {
2636 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2637 let mut buf = vec![0u8; 4096];
2638 let _ = stream.read(&mut buf).await;
2639 let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2641 let _ = stream.write_all(headers.as_bytes()).await;
2642 tokio::time::sleep(Duration::from_millis(delay)).await;
2644 let body = r#"{"status":"slow"}"#;
2645 let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2646 let _ = stream.write_all(chunk.as_bytes()).await;
2647 });
2648 }
2649 }
2650 });
2651
2652 (url, handle)
2653 }
2654
2655 #[tokio::test]
2656 async fn test_http_producer_timeout() {
2657 use tower::ServiceExt;
2658
2659 let (url, _handle) = start_slow_server(500).await;
2661 let ctx = test_producer_ctx();
2662
2663 let component = HttpComponent::with_config(
2664 HttpConfig::default()
2665 .with_read_timeout_ms(100)
2666 .with_response_timeout_ms(30_000), );
2668 let endpoint_ctx = NoOpComponentContext;
2669 let endpoint = component
2670 .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2671 .unwrap();
2672 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2673
2674 let exchange = Exchange::new(Message::default());
2675 let result = producer.oneshot(exchange).await;
2676
2677 assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2678 let err = result.unwrap_err().to_string();
2679 assert!(
2680 err.contains("Read timeout") || err.contains("timeout"),
2681 "Error should mention timeout, got: {}",
2682 err
2683 );
2684 }
2685
2686 #[tokio::test]
2687 async fn test_http_producer_no_timeout_when_fast() {
2688 use tower::ServiceExt;
2689
2690 let (url, _handle) = start_test_server().await;
2691 let ctx = test_producer_ctx();
2692
2693 let component =
2694 HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2695 let endpoint_ctx = NoOpComponentContext;
2696 let endpoint = component
2697 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2698 .unwrap();
2699 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2700
2701 let exchange = Exchange::new(Message::default());
2702 let result = producer.oneshot(exchange).await.unwrap();
2703
2704 let status = result
2705 .input
2706 .header("CamelHttpResponseCode")
2707 .and_then(|v| v.as_u64())
2708 .unwrap();
2709 assert_eq!(status, 200);
2710 }
2711
2712 #[tokio::test]
2717 async fn test_http_producer_blocks_metadata_endpoint() {
2718 use tower::ServiceExt;
2719
2720 let ctx = test_producer_ctx();
2721 let component = HttpComponent::new();
2722 let endpoint_ctx = NoOpComponentContext;
2723 let endpoint = component
2724 .create_endpoint(
2725 "http://example.com/api?allowPrivateIps=false",
2726 &endpoint_ctx,
2727 )
2728 .unwrap();
2729 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2730
2731 let mut exchange = Exchange::new(Message::default());
2732 exchange.input.set_header(
2733 "CamelHttpUri",
2734 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2735 );
2736
2737 let result = producer.oneshot(exchange).await;
2738 assert!(result.is_err(), "Should block AWS metadata endpoint");
2739
2740 let err = result.unwrap_err();
2741 assert!(
2742 err.to_string().contains("Private IP"),
2743 "Error should mention private IP blocking, got: {}",
2744 err
2745 );
2746 }
2747
2748 #[test]
2749 fn test_ssrf_config_defaults() {
2750 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2751 assert!(
2752 !config.allow_private_ips,
2753 "Private IPs should be blocked by default"
2754 );
2755 assert!(
2756 config.blocked_hosts.is_empty(),
2757 "Blocked hosts should be empty by default"
2758 );
2759 }
2760
2761 #[test]
2762 fn test_ssrf_config_allow_private_ips() {
2763 let config =
2764 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2765 assert!(
2766 config.allow_private_ips,
2767 "Private IPs should be allowed when explicitly set"
2768 );
2769 }
2770
2771 #[test]
2772 fn test_ssrf_config_blocked_hosts() {
2773 let config = HttpEndpointConfig::from_uri(
2774 "http://example.com/api?blockedHosts=evil.com,malware.net",
2775 )
2776 .unwrap();
2777 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2778 }
2779
2780 #[tokio::test]
2781 async fn test_http_producer_blocks_localhost() {
2782 use tower::ServiceExt;
2783
2784 let ctx = test_producer_ctx();
2785 let component = HttpComponent::new();
2786 let endpoint_ctx = NoOpComponentContext;
2787 let endpoint = component
2788 .create_endpoint("http://example.com/api", &endpoint_ctx)
2789 .unwrap();
2790 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2791
2792 let mut exchange = Exchange::new(Message::default());
2793 exchange.input.set_header(
2794 "CamelHttpUri",
2795 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2796 );
2797
2798 let result = producer.oneshot(exchange).await;
2799 assert!(result.is_err(), "Should block localhost");
2800 }
2801
2802 #[tokio::test]
2803 async fn test_http_producer_blocks_loopback_ip() {
2804 use tower::ServiceExt;
2805
2806 let ctx = test_producer_ctx();
2807 let component = HttpComponent::new();
2808 let endpoint_ctx = NoOpComponentContext;
2809 let endpoint = component
2810 .create_endpoint("http://example.com/api", &endpoint_ctx)
2811 .unwrap();
2812 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2813
2814 let mut exchange = Exchange::new(Message::default());
2815 exchange.input.set_header(
2816 "CamelHttpUri",
2817 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2818 );
2819
2820 let result = producer.oneshot(exchange).await;
2821 assert!(result.is_err(), "Should block loopback IP");
2822 }
2823
2824 #[tokio::test]
2825 async fn test_http_producer_allows_private_ip_when_enabled() {
2826 use tower::ServiceExt;
2827
2828 let ctx = test_producer_ctx();
2829 let component = HttpComponent::new();
2830 let endpoint_ctx = NoOpComponentContext;
2831 let endpoint = component
2834 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2835 .unwrap();
2836 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2837
2838 let exchange = Exchange::new(Message::default());
2839
2840 let result = producer.oneshot(exchange).await;
2843 if let Err(ref e) = result {
2845 let err_str = e.to_string();
2846 assert!(
2847 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2848 "Should not be SSRF error, got: {}",
2849 err_str
2850 );
2851 }
2852 }
2853
2854 #[test]
2859 fn test_http_server_config_parse() {
2860 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2861 assert_eq!(cfg.host, "0.0.0.0");
2862 assert_eq!(cfg.port, 8080);
2863 assert_eq!(cfg.path, "/orders");
2864 assert_eq!(cfg.max_inflight_requests, 1024);
2865 }
2866
2867 #[test]
2868 fn test_http_server_config_scheme() {
2869 assert_eq!(HttpServerConfig::scheme(), "http");
2871 }
2872
2873 #[test]
2874 fn test_http_server_config_from_components() {
2875 let components = camel_component_api::UriComponents {
2877 scheme: "https".to_string(),
2878 path: "//0.0.0.0:8443/api".to_string(),
2879 params: std::collections::HashMap::from([
2880 ("maxRequestBody".to_string(), "5242880".to_string()),
2881 ("maxInflightRequests".to_string(), "7".to_string()),
2882 ]),
2883 };
2884 let cfg = HttpServerConfig::from_components(components).unwrap();
2885 assert_eq!(cfg.host, "0.0.0.0");
2886 assert_eq!(cfg.port, 8443);
2887 assert_eq!(cfg.path, "/api");
2888 assert_eq!(cfg.max_request_body, 5242880);
2889 assert_eq!(cfg.max_inflight_requests, 7);
2890 }
2891
2892 #[test]
2893 fn test_http_server_config_default_path() {
2894 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2895 assert_eq!(cfg.path, "/");
2896 }
2897
2898 #[test]
2899 fn test_http_server_config_wrong_scheme() {
2900 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2901 }
2902
2903 #[test]
2904 fn test_http_server_config_invalid_port() {
2905 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2906 }
2907
2908 #[test]
2909 fn test_http_server_config_default_port_by_scheme() {
2910 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2912 assert_eq!(cfg_http.port, 80);
2913
2914 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2916 assert_eq!(cfg_https.port, 443);
2917 }
2918
2919 #[test]
2920 fn test_request_envelope_and_reply_are_send() {
2921 fn assert_send<T: Send>() {}
2922 assert_send::<RequestEnvelope>();
2923 assert_send::<HttpReply>();
2924 }
2925
2926 #[test]
2931 fn test_server_registry_global_is_singleton() {
2932 let r1 = ServerRegistry::global();
2933 let r2 = ServerRegistry::global();
2934 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2935 }
2936
2937 #[allow(clippy::await_holding_lock)]
2938 #[tokio::test]
2939 async fn test_concurrent_get_or_spawn_returns_same_registry() {
2940 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2941 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2942 let port = listener.local_addr().unwrap().port();
2943 drop(listener);
2944
2945 let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2946 Arc::new(std::sync::Mutex::new(Vec::new()));
2947
2948 let mut handles = Vec::new();
2949 for _ in 0..4 {
2950 let results = results.clone();
2951 handles.push(tokio::spawn(async move {
2952 let registry = ServerRegistry::global()
2953 .get_or_spawn(
2954 "127.0.0.1",
2955 port,
2956 2 * 1024 * 1024,
2957 10 * 1024 * 1024,
2958 1024,
2959 test_rt(),
2960 "test-route".into(),
2961 )
2962 .await
2963 .unwrap();
2964 results.lock().unwrap().push(registry);
2965 }));
2966 }
2967
2968 for h in handles {
2969 h.await.unwrap();
2970 }
2971
2972 let registries = results.lock().unwrap();
2973 assert_eq!(registries.len(), 4);
2974 for i in 1..registries.len() {
2975 assert!(
2976 Arc::ptr_eq(®istries[0].inner, ®istries[i].inner),
2977 "all concurrent callers should get same route registry"
2978 );
2979 }
2980 }
2981
2982 #[test]
2983 fn test_server_registry_distinguishes_host_and_port() {
2984 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2985 let rt = tokio::runtime::Runtime::new().expect("runtime");
2986 rt.block_on(async {
2987 let registry = ServerRegistry::global();
2988 let d1 = registry
2992 .get_or_spawn(
2993 "127.0.0.1",
2994 0,
2995 1024 * 1024,
2996 10 * 1024 * 1024,
2997 1024,
2998 test_rt(),
2999 "test-route-1".into(),
3000 )
3001 .await;
3002 let d2 = registry
3003 .get_or_spawn(
3004 "0.0.0.0",
3005 0,
3006 1024 * 1024,
3007 10 * 1024 * 1024,
3008 1024,
3009 test_rt(),
3010 "test-route-2".into(),
3011 )
3012 .await;
3013 assert!(d1.is_ok());
3014 assert!(d2.is_ok());
3015 assert!(!Arc::ptr_eq(&d1.unwrap().inner, &d2.unwrap().inner));
3016 });
3017 }
3018
3019 #[allow(clippy::await_holding_lock)]
3020 #[tokio::test]
3021 async fn test_shared_server_max_request_body_policy_is_deterministic() {
3022 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3023 let registry = ServerRegistry::global();
3024 let d1 = registry
3026 .get_or_spawn(
3027 "127.0.0.1",
3028 9991,
3029 1024 * 1024,
3030 10 * 1024 * 1024,
3031 1024,
3032 test_rt(),
3033 "test-route".into(),
3034 )
3035 .await;
3036 assert!(d1.is_ok());
3037
3038 let d2 = registry
3041 .get_or_spawn(
3042 "127.0.0.1",
3043 9991,
3044 2 * 1024 * 1024,
3045 10 * 1024 * 1024,
3046 1024,
3047 test_rt(),
3048 "test-route-2".into(),
3049 )
3050 .await;
3051 assert!(d2.is_err());
3052 let err = d2.unwrap_err();
3053 assert!(
3054 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
3055 "Expected incompatible maxRequestBody error, got: {}",
3056 err
3057 );
3058 }
3059
3060 #[test]
3061 fn test_server_registry_reset_clears_entries() {
3062 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3063 let rt = tokio::runtime::Runtime::new().expect("runtime");
3064 rt.block_on(async {
3065 let d1 = ServerRegistry::global()
3067 .get_or_spawn(
3068 "127.0.0.1",
3069 9992,
3070 1024 * 1024,
3071 10 * 1024 * 1024,
3072 1024,
3073 test_rt(),
3074 "test-route".into(),
3075 )
3076 .await;
3077 assert!(d1.is_ok());
3078
3079 let guard = ServerRegistry::global().inner.lock().expect("lock");
3081 assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
3082 drop(guard);
3083
3084 ServerRegistry::reset();
3086
3087 let guard = ServerRegistry::global().inner.lock().expect("lock");
3089 assert!(
3090 guard.is_empty(),
3091 "registry should be empty after reset, has {} entries",
3092 guard.len()
3093 );
3094 });
3095 }
3096
3097 #[tokio::test]
3102 async fn test_dispatch_handler_returns_404_for_unknown_path() {
3103 let registry = HttpRouteRegistry::new();
3104 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3106 let port = listener.local_addr().unwrap().port();
3107 tokio::spawn(run_axum_server(
3108 listener,
3109 registry,
3110 2 * 1024 * 1024,
3111 10 * 1024 * 1024,
3112 Arc::new(tokio::sync::Semaphore::new(1024)),
3113 test_rt(),
3114 "test-route".into(),
3115 ));
3116
3117 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3119
3120 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3121 .await
3122 .unwrap();
3123 assert_eq!(resp.status().as_u16(), 404);
3124 }
3125
3126 #[tokio::test]
3131 async fn test_http_consumer_start_registers_path() {
3132 use camel_component_api::ConsumerContext;
3133
3134 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3136 let port = listener.local_addr().unwrap().port();
3137 drop(listener); let consumer_cfg = HttpServerConfig {
3140 host: "127.0.0.1".to_string(),
3141 port,
3142 path: "/ping".to_string(),
3143 max_request_body: 2 * 1024 * 1024,
3144 max_response_body: 10 * 1024 * 1024,
3145 max_inflight_requests: 1024,
3146 };
3147 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3148
3149 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3150 let token = tokio_util::sync::CancellationToken::new();
3151 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3152
3153 tokio::spawn(async move {
3154 consumer.start(ctx).await.unwrap();
3155 });
3156
3157 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3158
3159 let client = reqwest::Client::new();
3160 let resp_future = client
3161 .post(format!("http://127.0.0.1:{port}/ping"))
3162 .body("hello world")
3163 .send();
3164
3165 let (http_result, _) = tokio::join!(resp_future, async {
3166 if let Some(mut envelope) = rx.recv().await {
3167 envelope.exchange.input.set_header(
3169 "CamelHttpResponseCode",
3170 serde_json::Value::Number(201.into()),
3171 );
3172 if let Some(reply_tx) = envelope.reply_tx {
3173 let _ = reply_tx.send(Ok(envelope.exchange));
3174 }
3175 }
3176 });
3177
3178 let resp = http_result.unwrap();
3179 assert_eq!(resp.status().as_u16(), 201);
3180
3181 token.cancel();
3182 }
3183
3184 #[tokio::test]
3185 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3186 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3187
3188 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3189 let port = listener.local_addr().unwrap().port();
3190 drop(listener);
3191
3192 let consumer_cfg = HttpServerConfig {
3193 host: "127.0.0.1".to_string(),
3194 port,
3195 path: "/saturation".to_string(),
3196 max_request_body: 2 * 1024 * 1024,
3197 max_response_body: 10 * 1024 * 1024,
3198 max_inflight_requests: 1,
3199 };
3200 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3201
3202 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3203 let token = tokio_util::sync::CancellationToken::new();
3204 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3205 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3206 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3207
3208 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3209 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3210
3211 tokio::spawn(async move {
3212 let mut first_seen_tx = Some(first_seen_tx);
3213 let mut unblock_first_rx = Some(unblock_first_rx);
3214
3215 while let Some(envelope) = rx.recv().await {
3216 if let Some(tx) = first_seen_tx.take() {
3217 let _ = tx.send(());
3218 if let Some(rx_unblock) = unblock_first_rx.take() {
3219 let _ = rx_unblock.await;
3220 }
3221 }
3222
3223 if let Some(reply_tx) = envelope.reply_tx {
3224 let _ = reply_tx.send(Ok(envelope.exchange));
3225 }
3226 }
3227 });
3228
3229 let client = reqwest::Client::new();
3230 let first_req = {
3231 let client = client.clone();
3232 async move {
3233 client
3234 .get(format!("http://127.0.0.1:{port}/saturation"))
3235 .send()
3236 .await
3237 .unwrap()
3238 }
3239 };
3240
3241 let first_handle = tokio::spawn(first_req);
3242 first_seen_rx.await.unwrap();
3243
3244 let second_resp = client
3245 .get(format!("http://127.0.0.1:{port}/saturation"))
3246 .send()
3247 .await
3248 .unwrap();
3249
3250 assert_eq!(second_resp.status().as_u16(), 503);
3251
3252 let _ = unblock_first_tx.send(());
3253 let first_resp = first_handle.await.unwrap();
3254 assert_eq!(first_resp.status().as_u16(), 200);
3255
3256 token.cancel();
3257 }
3258
3259 #[tokio::test]
3260 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3261 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3262
3263 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3264
3265 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3266 let port = listener.local_addr().unwrap().port();
3267 drop(listener);
3268
3269 let consumer_cfg = HttpServerConfig {
3270 host: "127.0.0.1".to_string(),
3271 port,
3272 path: "/limit-bytes".to_string(),
3273 max_request_body: 2 * 1024 * 1024,
3274 max_response_body: 16,
3275 max_inflight_requests: 1024,
3276 };
3277 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3278
3279 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3280 let token = tokio_util::sync::CancellationToken::new();
3281 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3282 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3283 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3284
3285 let client = reqwest::Client::new();
3286 let send_fut = client
3287 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3288 .send();
3289
3290 let (http_result, _) = tokio::join!(send_fut, async {
3291 if let Some(mut envelope) = rx.recv().await {
3292 envelope.exchange.input.body =
3293 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3294 if let Some(reply_tx) = envelope.reply_tx {
3295 let _ = reply_tx.send(Ok(envelope.exchange));
3296 }
3297 }
3298 });
3299
3300 let resp = http_result.unwrap();
3301 assert_eq!(resp.status().as_u16(), 500);
3302 let body = resp.text().await.unwrap();
3303 assert_eq!(body, "Response body exceeds configured limit");
3304 token.cancel();
3305 }
3306
3307 #[tokio::test]
3308 async fn test_http_consumer_enforces_max_response_body_for_json() {
3309 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3310
3311 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3312
3313 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3314 let port = listener.local_addr().unwrap().port();
3315 drop(listener);
3316
3317 let consumer_cfg = HttpServerConfig {
3318 host: "127.0.0.1".to_string(),
3319 port,
3320 path: "/limit-json".to_string(),
3321 max_request_body: 2 * 1024 * 1024,
3322 max_response_body: 16,
3323 max_inflight_requests: 1024,
3324 };
3325 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3326
3327 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3328 let token = tokio_util::sync::CancellationToken::new();
3329 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3330 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3331 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3332
3333 let client = reqwest::Client::new();
3334 let send_fut = client
3335 .get(format!("http://127.0.0.1:{port}/limit-json"))
3336 .send();
3337
3338 let (http_result, _) = tokio::join!(send_fut, async {
3339 if let Some(mut envelope) = rx.recv().await {
3340 envelope.exchange.input.body = camel_component_api::Body::Json(
3341 serde_json::json!({"message":"this response is bigger than sixteen"}),
3342 );
3343 if let Some(reply_tx) = envelope.reply_tx {
3344 let _ = reply_tx.send(Ok(envelope.exchange));
3345 }
3346 }
3347 });
3348
3349 let resp = http_result.unwrap();
3350 assert_eq!(resp.status().as_u16(), 500);
3351 let body = resp.text().await.unwrap();
3352 assert_eq!(body, "Response body exceeds configured limit");
3353 token.cancel();
3354 }
3355
3356 #[tokio::test]
3357 async fn test_http_consumer_enforces_max_response_body_for_xml() {
3358 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3359
3360 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3361
3362 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3363 let port = listener.local_addr().unwrap().port();
3364 drop(listener);
3365
3366 let consumer_cfg = HttpServerConfig {
3367 host: "127.0.0.1".to_string(),
3368 port,
3369 path: "/limit-xml".to_string(),
3370 max_request_body: 2 * 1024 * 1024,
3371 max_response_body: 16,
3372 max_inflight_requests: 1024,
3373 };
3374 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3375
3376 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3377 let token = tokio_util::sync::CancellationToken::new();
3378 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3379 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3380 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3381
3382 let client = reqwest::Client::new();
3383 let send_fut = client
3384 .get(format!("http://127.0.0.1:{port}/limit-xml"))
3385 .send();
3386
3387 let (http_result, _) = tokio::join!(send_fut, async {
3388 if let Some(mut envelope) = rx.recv().await {
3389 envelope.exchange.input.body = camel_component_api::Body::Xml(
3390 "<root><value>way-too-large</value></root>".into(),
3391 );
3392 if let Some(reply_tx) = envelope.reply_tx {
3393 let _ = reply_tx.send(Ok(envelope.exchange));
3394 }
3395 }
3396 });
3397
3398 let resp = http_result.unwrap();
3399 assert_eq!(resp.status().as_u16(), 500);
3400 let body = resp.text().await.unwrap();
3401 assert_eq!(body, "Response body exceeds configured limit");
3402 token.cancel();
3403 }
3404
3405 #[tokio::test]
3406 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3407 use camel_component_api::{
3408 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3409 };
3410 use futures::stream;
3411
3412 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3413
3414 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3415 let port = listener.local_addr().unwrap().port();
3416 drop(listener);
3417
3418 let consumer_cfg = HttpServerConfig {
3419 host: "0.0.0.0".to_string(),
3420 port,
3421 path: "/limit-stream".to_string(),
3422 max_request_body: 2 * 1024 * 1024,
3423 max_response_body: 16,
3424 max_inflight_requests: 1024,
3425 };
3426 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3427
3428 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3429 let token = tokio_util::sync::CancellationToken::new();
3430 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3431 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3432 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3433
3434 let client = reqwest::Client::new();
3435 let send_fut = client
3436 .get(format!("http://127.0.0.1:{port}/limit-stream"))
3437 .send();
3438
3439 let (http_result, _) = tokio::join!(send_fut, async {
3440 if let Some(mut envelope) = rx.recv().await {
3441 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3442 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3443 let stream = Box::pin(stream::iter(chunks));
3444 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3445 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3446 metadata: StreamMetadata {
3447 size_hint: Some(32),
3448 content_type: Some("application/octet-stream".into()),
3449 origin: None,
3450 },
3451 });
3452 if let Some(reply_tx) = envelope.reply_tx {
3453 let _ = reply_tx.send(Ok(envelope.exchange));
3454 }
3455 }
3456 });
3457
3458 let resp = http_result.unwrap();
3459 assert_eq!(resp.status().as_u16(), 200);
3460 let body = resp.bytes().await.unwrap();
3461 assert_eq!(body.len(), 32);
3462 token.cancel();
3463 }
3464
3465 #[tokio::test]
3470 async fn test_integration_single_consumer_round_trip() {
3471 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3472
3473 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3475 let port = listener.local_addr().unwrap().port();
3476 drop(listener); let component = HttpComponent::new();
3479 let endpoint_ctx = NoOpComponentContext;
3480 let endpoint = component
3481 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3482 .unwrap();
3483 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3484
3485 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3486 let token = tokio_util::sync::CancellationToken::new();
3487 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3488
3489 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3490 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3491
3492 let client = reqwest::Client::new();
3493 let send_fut = client
3494 .post(format!("http://127.0.0.1:{port}/echo"))
3495 .header("Content-Type", "text/plain")
3496 .body("ping")
3497 .send();
3498
3499 let (http_result, _) = tokio::join!(send_fut, async {
3500 if let Some(mut envelope) = rx.recv().await {
3501 assert_eq!(
3502 envelope.exchange.input.header("CamelHttpMethod"),
3503 Some(&serde_json::Value::String("POST".into()))
3504 );
3505 assert_eq!(
3506 envelope.exchange.input.header("CamelHttpPath"),
3507 Some(&serde_json::Value::String("/echo".into()))
3508 );
3509 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3510 if let Some(reply_tx) = envelope.reply_tx {
3511 let _ = reply_tx.send(Ok(envelope.exchange));
3512 }
3513 }
3514 });
3515
3516 let resp = http_result.unwrap();
3517 assert_eq!(resp.status().as_u16(), 200);
3518 let body = resp.text().await.unwrap();
3519 assert_eq!(body, "pong");
3520
3521 token.cancel();
3522 }
3523
3524 #[tokio::test]
3525 async fn test_integration_two_consumers_shared_port() {
3526 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3527
3528 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3529
3530 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3532 let port = listener.local_addr().unwrap().port();
3533 drop(listener);
3534
3535 let component = HttpComponent::new();
3536 let endpoint_ctx = NoOpComponentContext;
3537
3538 let endpoint_a = component
3540 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3541 .unwrap();
3542 let mut consumer_a = endpoint_a.create_consumer(rt()).unwrap();
3543
3544 let endpoint_b = component
3546 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3547 .unwrap();
3548 let mut consumer_b = endpoint_b.create_consumer(rt()).unwrap();
3549
3550 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3551 let token_a = tokio_util::sync::CancellationToken::new();
3552 let ctx_a = ConsumerContext::new(tx_a, token_a.clone(), "http-test-route-a".to_string());
3553
3554 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3555 let token_b = tokio_util::sync::CancellationToken::new();
3556 let ctx_b = ConsumerContext::new(tx_b, token_b.clone(), "http-test-route-b".to_string());
3557
3558 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3559 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3560 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3561
3562 let client = reqwest::Client::new();
3563
3564 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3566 let (resp_hello, _) = tokio::join!(fut_hello, async {
3567 if let Some(mut envelope) = rx_a.recv().await {
3568 envelope.exchange.input.body =
3569 camel_component_api::Body::Text("hello-response".to_string());
3570 if let Some(reply_tx) = envelope.reply_tx {
3571 let _ = reply_tx.send(Ok(envelope.exchange));
3572 }
3573 }
3574 });
3575
3576 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3578 let (resp_world, _) = tokio::join!(fut_world, async {
3579 if let Some(mut envelope) = rx_b.recv().await {
3580 envelope.exchange.input.body =
3581 camel_component_api::Body::Text("world-response".to_string());
3582 if let Some(reply_tx) = envelope.reply_tx {
3583 let _ = reply_tx.send(Ok(envelope.exchange));
3584 }
3585 }
3586 });
3587
3588 let body_a = resp_hello.unwrap().text().await.unwrap();
3589 let body_b = resp_world.unwrap().text().await.unwrap();
3590
3591 assert_eq!(body_a, "hello-response");
3592 assert_eq!(body_b, "world-response");
3593
3594 token_a.cancel();
3595 token_b.cancel();
3596 }
3597
3598 #[tokio::test]
3599 async fn test_integration_unregistered_path_returns_404() {
3600 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3601
3602 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3604 let port = listener.local_addr().unwrap().port();
3605 drop(listener);
3606
3607 let component = HttpComponent::new();
3608 let endpoint_ctx = NoOpComponentContext;
3609 let endpoint = component
3610 .create_endpoint(
3611 &format!("http://127.0.0.1:{port}/registered"),
3612 &endpoint_ctx,
3613 )
3614 .unwrap();
3615 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3616
3617 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3618 let token = tokio_util::sync::CancellationToken::new();
3619 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3620
3621 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3622
3623 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3625 loop {
3626 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3627 .await
3628 .is_ok()
3629 {
3630 break;
3631 }
3632 if std::time::Instant::now() >= deadline {
3633 panic!("HTTP server did not start within 5s on port {port}");
3634 }
3635 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3636 }
3637
3638 let client = reqwest::Client::new();
3639 let resp = client
3640 .get(format!("http://127.0.0.1:{port}/not-there"))
3641 .send()
3642 .await
3643 .unwrap();
3644 assert_eq!(resp.status().as_u16(), 404);
3645
3646 token.cancel();
3647 }
3648
#[test]
/// A freshly constructed `HttpConsumer` must report
/// `ConcurrencyModel::Concurrent { max: None }` from `concurrency_model()`.
fn test_http_consumer_declares_concurrent() {
    use camel_component_api::ConcurrencyModel;

    let consumer = HttpConsumer::new(
        HttpServerConfig {
            host: String::from("127.0.0.1"),
            port: 19999,
            path: String::from("/test"),
            max_request_body: 2 * 1024 * 1024,
            max_response_body: 10 * 1024 * 1024,
            max_inflight_requests: 1024,
        },
        test_rt(),
    );

    let reported = consumer.concurrency_model();
    assert_eq!(reported, ConcurrencyModel::Concurrent { max: None });
}
3667
3668 #[tokio::test]
3673 async fn test_http_reply_body_stream_variant_exists() {
3674 use bytes::Bytes;
3675 use camel_component_api::CamelError;
3676 use futures::stream;
3677
3678 let chunks: Vec<Result<Bytes, CamelError>> =
3679 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3680 let stream = Box::pin(stream::iter(chunks));
3681 let reply_body = HttpReplyBody::Stream(stream);
3682 match reply_body {
3684 HttpReplyBody::Stream(_) => {}
3685 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3686 }
3687 }
3688
3689 #[cfg(feature = "otel")]
3694 mod otel_tests {
3695 use super::*;
3696 use camel_component_api::Message;
3697 use tower::ServiceExt;
3698
3699 #[tokio::test]
3700 async fn test_producer_injects_traceparent_header() {
3701 let (url, _handle) = start_test_server_with_header_capture().await;
3702 let ctx = test_producer_ctx();
3703
3704 let component = HttpComponent::new();
3705 let endpoint_ctx = NoOpComponentContext;
3706 let endpoint = component
3707 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3708 .unwrap();
3709 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3710
3711 let mut exchange = Exchange::new(Message::default());
3713 let mut headers = std::collections::HashMap::new();
3714 headers.insert(
3715 "traceparent".to_string(),
3716 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3717 );
3718 camel_otel::extract_into_exchange(&mut exchange, &headers);
3719
3720 let result = producer.oneshot(exchange).await.unwrap();
3721
3722 let status = result
3724 .input
3725 .header("CamelHttpResponseCode")
3726 .and_then(|v| v.as_u64())
3727 .unwrap();
3728 assert_eq!(status, 200);
3729
3730 let traceparent = result.input.header("X-Received-Traceparent");
3732 assert!(
3733 traceparent.is_some(),
3734 "traceparent header should have been sent"
3735 );
3736
3737 let traceparent_str = traceparent.unwrap().as_str().unwrap();
3738 let parts: Vec<&str> = traceparent_str.split('-').collect();
3740 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3741 assert_eq!(parts[0], "00", "version should be 00");
3742 assert_eq!(
3743 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3744 "trace-id should match"
3745 );
3746 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3747 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3748 }
3749
3750 #[tokio::test]
3751 async fn test_consumer_extracts_traceparent_header() {
3752 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3753
3754 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3756 let port = listener.local_addr().unwrap().port();
3757 drop(listener);
3758
3759 let component = HttpComponent::new();
3760 let endpoint_ctx = NoOpComponentContext;
3761 let endpoint = component
3762 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3763 .unwrap();
3764 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3765
3766 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3767 let token = tokio_util::sync::CancellationToken::new();
3768 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3769
3770 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3771 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3772
3773 let client = reqwest::Client::new();
3775 let send_fut = client
3776 .post(format!("http://127.0.0.1:{port}/trace"))
3777 .header(
3778 "traceparent",
3779 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3780 )
3781 .body("test")
3782 .send();
3783
3784 let (http_result, _) = tokio::join!(send_fut, async {
3785 if let Some(envelope) = rx.recv().await {
3786 let mut injected_headers = std::collections::HashMap::new();
3789 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3790
3791 assert!(
3792 injected_headers.contains_key("traceparent"),
3793 "Exchange should have traceparent after extraction"
3794 );
3795
3796 let traceparent = injected_headers.get("traceparent").unwrap();
3797 let parts: Vec<&str> = traceparent.split('-').collect();
3798 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3799 assert_eq!(
3800 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3801 "Trace ID should match the original traceparent header"
3802 );
3803
3804 if let Some(reply_tx) = envelope.reply_tx {
3805 let _ = reply_tx.send(Ok(envelope.exchange));
3806 }
3807 }
3808 });
3809
3810 let resp = http_result.unwrap();
3811 assert_eq!(resp.status().as_u16(), 200);
3812
3813 token.cancel();
3814 }
3815
3816 #[tokio::test]
3817 async fn test_consumer_extracts_mixed_case_traceparent_header() {
3818 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3819
3820 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3822 let port = listener.local_addr().unwrap().port();
3823 drop(listener);
3824
3825 let component = HttpComponent::new();
3826 let endpoint_ctx = NoOpComponentContext;
3827 let endpoint = component
3828 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3829 .unwrap();
3830 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3831
3832 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3833 let token = tokio_util::sync::CancellationToken::new();
3834 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3835
3836 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3837 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3838
3839 let client = reqwest::Client::new();
3841 let send_fut = client
3842 .post(format!("http://127.0.0.1:{port}/trace"))
3843 .header(
3844 "TraceParent",
3845 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3846 )
3847 .body("test")
3848 .send();
3849
3850 let (http_result, _) = tokio::join!(send_fut, async {
3851 if let Some(envelope) = rx.recv().await {
3852 let mut injected_headers = HashMap::new();
3855 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3856
3857 assert!(
3858 injected_headers.contains_key("traceparent"),
3859 "Exchange should have traceparent after extraction from mixed-case header"
3860 );
3861
3862 let traceparent = injected_headers.get("traceparent").unwrap();
3863 let parts: Vec<&str> = traceparent.split('-').collect();
3864 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3865 assert_eq!(
3866 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3867 "Trace ID should match the original mixed-case TraceParent header"
3868 );
3869
3870 if let Some(reply_tx) = envelope.reply_tx {
3871 let _ = reply_tx.send(Ok(envelope.exchange));
3872 }
3873 }
3874 });
3875
3876 let resp = http_result.unwrap();
3877 assert_eq!(resp.status().as_u16(), 200);
3878
3879 token.cancel();
3880 }
3881
3882 #[tokio::test]
3883 async fn test_producer_no_trace_context_no_crash() {
3884 let (url, _handle) = start_test_server().await;
3885 let ctx = test_producer_ctx();
3886
3887 let component = HttpComponent::new();
3888 let endpoint_ctx = NoOpComponentContext;
3889 let endpoint = component
3890 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3891 .unwrap();
3892 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
3893
3894 let exchange = Exchange::new(Message::default());
3896
3897 let result = producer.oneshot(exchange).await.unwrap();
3899
3900 let status = result
3902 .input
3903 .header("CamelHttpResponseCode")
3904 .and_then(|v| v.as_u64())
3905 .unwrap();
3906 assert_eq!(status, 200);
3907 }
3908
3909 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3911 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3912 let addr = listener.local_addr().unwrap();
3913 let url = format!("http://127.0.0.1:{}", addr.port());
3914
3915 let handle = tokio::spawn(async move {
3916 loop {
3917 if let Ok((mut stream, _)) = listener.accept().await {
3918 tokio::spawn(async move {
3919 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3920 let mut buf = vec![0u8; 8192];
3921 let n = stream.read(&mut buf).await.unwrap_or(0);
3922 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3923
3924 let traceparent = request
3926 .lines()
3927 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3928 .map(|line| {
3929 line.split(':')
3930 .nth(1)
3931 .map(|s| s.trim().to_string())
3932 .unwrap_or_default()
3933 })
3934 .unwrap_or_default();
3935
3936 let body =
3937 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3938 let response = format!(
3939 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3940 body.len(),
3941 traceparent,
3942 body
3943 );
3944 let _ = stream.write_all(response.as_bytes()).await;
3945 });
3946 }
3947 }
3948 });
3949
3950 (url, handle)
3951 }
3952 }
3953
3954 #[tokio::test]
3963 async fn test_request_body_arrives_as_stream() {
3964 use camel_component_api::Body;
3965 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3966
3967 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3968 let port = listener.local_addr().unwrap().port();
3969 drop(listener);
3970
3971 let component = HttpComponent::new();
3972 let endpoint_ctx = NoOpComponentContext;
3973 let endpoint = component
3974 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3975 .unwrap();
3976 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3977
3978 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3979 let token = tokio_util::sync::CancellationToken::new();
3980 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
3981
3982 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3983 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3984
3985 let client = reqwest::Client::new();
3986 let send_fut = client
3987 .post(format!("http://127.0.0.1:{port}/upload"))
3988 .body("hello streaming world")
3989 .send();
3990
3991 let (http_result, _) = tokio::join!(send_fut, async {
3992 if let Some(mut envelope) = rx.recv().await {
3993 assert!(
3995 matches!(envelope.exchange.input.body, Body::Stream(_)),
3996 "expected Body::Stream, got discriminant {:?}",
3997 std::mem::discriminant(&envelope.exchange.input.body)
3998 );
3999 let bytes = envelope
4001 .exchange
4002 .input
4003 .body
4004 .into_bytes(1024 * 1024)
4005 .await
4006 .unwrap();
4007 assert_eq!(&bytes[..], b"hello streaming world");
4008
4009 envelope.exchange.input.body = camel_component_api::Body::Empty;
4010 if let Some(reply_tx) = envelope.reply_tx {
4011 let _ = reply_tx.send(Ok(envelope.exchange));
4012 }
4013 }
4014 });
4015
4016 let resp = http_result.unwrap();
4017 assert_eq!(resp.status().as_u16(), 200);
4018
4019 token.cancel();
4020 }
4021
4022 #[tokio::test]
4027 async fn test_streaming_response_chunked() {
4028 use bytes::Bytes;
4029 use camel_component_api::Body;
4030 use camel_component_api::CamelError;
4031 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
4032 use camel_component_api::{StreamBody, StreamMetadata};
4033 use futures::stream;
4034 use std::sync::Arc;
4035 use tokio::sync::Mutex;
4036
4037 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4038 let port = listener.local_addr().unwrap().port();
4039 drop(listener);
4040
4041 let component = HttpComponent::new();
4042 let endpoint_ctx = NoOpComponentContext;
4043 let endpoint = component
4044 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
4045 .unwrap();
4046 let mut consumer = endpoint.create_consumer(rt()).unwrap();
4047
4048 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
4049 let token = tokio_util::sync::CancellationToken::new();
4050 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4051
4052 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4053 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4054
4055 let client = reqwest::Client::new();
4056 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
4057
4058 let (http_result, _) = tokio::join!(send_fut, async {
4059 if let Some(mut envelope) = rx.recv().await {
4060 let chunks: Vec<Result<Bytes, CamelError>> =
4062 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
4063 let stream = Box::pin(stream::iter(chunks));
4064 envelope.exchange.input.body = Body::Stream(StreamBody {
4065 stream: Arc::new(Mutex::new(Some(stream))),
4066 metadata: StreamMetadata::default(),
4067 });
4068 if let Some(reply_tx) = envelope.reply_tx {
4069 let _ = reply_tx.send(Ok(envelope.exchange));
4070 }
4071 }
4072 });
4073
4074 let resp = http_result.unwrap();
4075 assert_eq!(resp.status().as_u16(), 200);
4076 let body = resp.text().await.unwrap();
4077 assert_eq!(body, "chunk1chunk2");
4078
4079 token.cancel();
4080 }
4081
4082 #[tokio::test]
4087 async fn test_413_when_content_length_exceeds_limit() {
4088 use camel_component_api::ConsumerContext;
4089
4090 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4091 let port = listener.local_addr().unwrap().port();
4092 drop(listener);
4093
4094 let component = HttpComponent::new();
4096 let endpoint_ctx = NoOpComponentContext;
4097 let endpoint = component
4098 .create_endpoint(
4099 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
4100 &endpoint_ctx,
4101 )
4102 .unwrap();
4103 let mut consumer = endpoint.create_consumer(rt()).unwrap();
4104
4105 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4106 let token = tokio_util::sync::CancellationToken::new();
4107 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4108
4109 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4110 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4111
4112 let client = reqwest::Client::new();
4113 let resp = client
4114 .post(format!("http://127.0.0.1:{port}/upload"))
4115 .header("Content-Length", "1000") .body("x".repeat(1000))
4117 .send()
4118 .await
4119 .unwrap();
4120
4121 assert_eq!(resp.status().as_u16(), 413);
4122
4123 token.cancel();
4124 }
4125
4126 #[tokio::test]
4130 async fn test_chunked_upload_without_content_length_bypasses_limit() {
4131 use bytes::Bytes;
4132 use camel_component_api::Body;
4133 use camel_component_api::ConsumerContext;
4134 use futures::stream;
4135
4136 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4137 let port = listener.local_addr().unwrap().port();
4138 drop(listener);
4139
4140 let component = HttpComponent::new();
4142 let endpoint_ctx = NoOpComponentContext;
4143 let endpoint = component
4144 .create_endpoint(
4145 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4146 &endpoint_ctx,
4147 )
4148 .unwrap();
4149 let mut consumer = endpoint.create_consumer(rt()).unwrap();
4150
4151 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4152 let token = tokio_util::sync::CancellationToken::new();
4153 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4154
4155 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4156 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4157
4158 let client = reqwest::Client::new();
4159
4160 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4164 Ok(Bytes::from("y".repeat(50))),
4165 Ok(Bytes::from("y".repeat(50))),
4166 ];
4167 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4168 let send_fut = client
4169 .post(format!("http://127.0.0.1:{port}/upload"))
4170 .body(stream_body)
4171 .send();
4172
4173 let consumer_fut = async {
4174 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4176 Ok(Some(mut envelope)) => {
4177 assert!(
4178 matches!(envelope.exchange.input.body, Body::Stream(_)),
4179 "expected Body::Stream"
4180 );
4181 envelope.exchange.input.body = camel_component_api::Body::Empty;
4182 if let Some(reply_tx) = envelope.reply_tx {
4183 let _ = reply_tx.send(Ok(envelope.exchange));
4184 }
4185 }
4186 Ok(None) => panic!("consumer channel closed unexpectedly"),
4187 Err(_) => {
4188 }
4191 }
4192 };
4193
4194 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4195
4196 let resp = http_result.unwrap();
4197 assert_ne!(
4199 resp.status().as_u16(),
4200 413,
4201 "chunked upload must not be rejected by maxRequestBody"
4202 );
4203 assert_eq!(resp.status().as_u16(), 200);
4204
4205 token.cancel();
4206 }
4207
#[test]
/// Exercises `validate_url_for_ssrf` against three cases: a host listed in
/// `blocked_hosts` is rejected, a loopback IP is rejected while
/// `allow_private_ips` is false, and the same IP is accepted once it is true.
fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
    let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
    cfg.blocked_hosts = vec![String::from("blocked.local")];
    cfg.allow_private_ips = false;

    // Explicitly blocked host name.
    assert!(validate_url_for_ssrf("http://blocked.local/api", &cfg).is_err());

    // Loopback address with private IPs disallowed.
    assert!(validate_url_for_ssrf("http://127.0.0.1/api", &cfg).is_err());

    // Same address once private IPs are allowed.
    cfg.allow_private_ips = true;
    assert!(validate_url_for_ssrf("http://127.0.0.1/api", &cfg).is_ok());
}
4224
#[test]
/// Checks `is_private_ip` against a table of IPv4, IPv6 and IPv4-mapped IPv6
/// addresses: every entry of the first list must be classified as private,
/// every entry of the second as not private.
fn test_is_private_ip_ranges() {
    let expected_private = [
        "10.0.0.1",
        "172.16.1.10",
        "192.168.1.1",
        "127.0.0.1",
        "169.254.1.1",
        "0.1.2.3",
        "::1",
        "fc00::1",
        "fd12::1",
        "fe80::1",
        "::ffff:10.0.0.1",
        "::ffff:192.168.1.1",
        "::ffff:127.0.0.1",
    ];
    for addr in expected_private {
        assert!(
            is_private_ip(&addr.parse().unwrap()),
            "{addr} should be classified as private"
        );
    }

    let expected_public = ["8.8.8.8", "::ffff:8.8.8.8", "2001:4860:4860::8888"];
    for addr in expected_public {
        assert!(
            !is_private_ip(&addr.parse().unwrap()),
            "{addr} should not be classified as private"
        );
    }
}
4247
4248 #[tokio::test]
4249 async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4250 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); cfg.allow_private_ips = false;
4252
4253 let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4254 .await
4255 .expect_err("localhost must resolve to loopback and be blocked");
4256
4257 let msg = err.to_string();
4258 assert!(msg.contains("Target resolved to private IP"));
4259 }
4260
#[test]
/// Table-driven check of `title_case_header`: each dash-separated segment is
/// expected to come back with its first letter upper-cased, and the empty
/// string is expected to stay empty.
fn test_title_case_header() {
    let cases = [
        ("content-type", "Content-Type"),
        ("authorization", "Authorization"),
        ("x-custom-header", "X-Custom-Header"),
        ("host", "Host"),
        ("x-b3-traceid", "X-B3-Traceid"),
        ("single", "Single"),
        ("", ""),
    ];

    for (input, expected) in cases {
        assert_eq!(title_case_header(input), expected);
    }
}
4271
#[test]
/// Checks how `HttpProducer::resolve_url` combines its inputs.
///
/// With only `CamelHttpPath` set, the result must start with the configured
/// base plus the path and still contain the configured `foo=bar` query. With
/// `CamelHttpUri` and `CamelHttpQuery` also set, the result must be exactly
/// `http://other.test/root/next?a=1&b=2`.
fn test_resolve_url_combines_path_and_query_sources() {
    let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
    let mut exchange = Exchange::new(Message::default());

    // Path header only: base URL and endpoint query are kept.
    exchange
        .input
        .set_header("CamelHttpPath", serde_json::Value::from("next"));
    let resolved = HttpProducer::resolve_url(&exchange, &cfg);
    assert!(resolved.starts_with("http://example.com/base/next?"));
    assert!(resolved.contains("foo=bar"));

    // URI and query headers override the configured base and query.
    exchange.input.set_header(
        "CamelHttpUri",
        serde_json::Value::from("http://other.test/root"),
    );
    exchange
        .input
        .set_header("CamelHttpQuery", serde_json::Value::from("a=1&b=2"));

    let overridden = HttpProducer::resolve_url(&exchange, &cfg);
    assert_eq!(overridden, "http://other.test/root/next?a=1&b=2");
}
4296
#[test]
/// Boundary checks for two helpers: `HttpProducer::is_ok_status` treats both
/// ends of the `(200, 299)` range as OK and the values just outside as not OK;
/// `exceeds_max_response_body` is false at the limit and true one above it.
fn test_http_producer_helpers_status_and_size_boundaries() {
    let ok_range = (200, 299);
    for (status, expected_ok) in [(200, true), (299, true), (199, false), (300, false)] {
        assert_eq!(HttpProducer::is_ok_status(status, ok_range), expected_ok);
    }

    assert!(!exceeds_max_response_body(10, 10));
    assert!(exceeds_max_response_body(11, 10));
}
4307
4308 async fn setup_consumer_on_free_port(
4313 path: &str,
4314 ) -> (
4315 u16,
4316 tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4317 tokio_util::sync::CancellationToken,
4318 ) {
4319 use camel_component_api::ConsumerContext;
4320
4321 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4322 let port = listener.local_addr().unwrap().port();
4323 drop(listener);
4324
4325 let consumer_cfg = HttpServerConfig {
4326 host: "127.0.0.1".to_string(),
4327 port,
4328 path: path.to_string(),
4329 max_request_body: 2 * 1024 * 1024,
4330 max_response_body: 10 * 1024 * 1024,
4331 max_inflight_requests: 1024,
4332 };
4333 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
4334
4335 let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4336 let token = tokio_util::sync::CancellationToken::new();
4337 let ctx = ConsumerContext::new(tx, token.clone(), "http-test-route".to_string());
4338
4339 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4340 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4341
4342 (port, rx, token)
4343 }
4344
4345 #[tokio::test]
4346 async fn test_content_type_inferred_for_json_body() {
4347 let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4348
4349 let client = reqwest::Client::new();
4350 let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4351
4352 let (http_result, _) = tokio::join!(send_fut, async {
4353 if let Some(mut envelope) = rx.recv().await {
4354 envelope.exchange.input.body =
4355 camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4356 if let Some(reply_tx) = envelope.reply_tx {
4357 let _ = reply_tx.send(Ok(envelope.exchange));
4358 }
4359 }
4360 });
4361
4362 let resp = http_result.unwrap();
4363 assert_eq!(resp.status().as_u16(), 200);
4364 let ct = resp
4365 .headers()
4366 .get("content-type")
4367 .expect("Content-Type header should be present");
4368 assert_eq!(ct, "application/json");
4369 let body = resp.text().await.unwrap();
4370 assert_eq!(body, r#"{"message":"hello"}"#);
4371
4372 token.cancel();
4373 }
4374
4375 #[tokio::test]
4376 async fn test_content_type_inferred_for_text_body() {
4377 let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4378
4379 let client = reqwest::Client::new();
4380 let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4381
4382 let (http_result, _) = tokio::join!(send_fut, async {
4383 if let Some(mut envelope) = rx.recv().await {
4384 envelope.exchange.input.body =
4385 camel_component_api::Body::Text("plain text response".to_string());
4386 if let Some(reply_tx) = envelope.reply_tx {
4387 let _ = reply_tx.send(Ok(envelope.exchange));
4388 }
4389 }
4390 });
4391
4392 let resp = http_result.unwrap();
4393 assert_eq!(resp.status().as_u16(), 200);
4394 let ct = resp
4395 .headers()
4396 .get("content-type")
4397 .expect("Content-Type header should be present");
4398 assert_eq!(ct, "text/plain; charset=utf-8");
4399 let body = resp.text().await.unwrap();
4400 assert_eq!(body, "plain text response");
4401
4402 token.cancel();
4403 }
4404
4405 #[tokio::test]
4406 async fn test_content_type_inferred_for_xml_body() {
4407 let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4408
4409 let client = reqwest::Client::new();
4410 let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4411
4412 let (http_result, _) = tokio::join!(send_fut, async {
4413 if let Some(mut envelope) = rx.recv().await {
4414 envelope.exchange.input.body =
4415 camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4416 if let Some(reply_tx) = envelope.reply_tx {
4417 let _ = reply_tx.send(Ok(envelope.exchange));
4418 }
4419 }
4420 });
4421
4422 let resp = http_result.unwrap();
4423 assert_eq!(resp.status().as_u16(), 200);
4424 let ct = resp
4425 .headers()
4426 .get("content-type")
4427 .expect("Content-Type header should be present");
4428 assert_eq!(ct, "application/xml");
4429 let body = resp.text().await.unwrap();
4430 assert_eq!(body, "<root><item>value</item></root>");
4431
4432 token.cancel();
4433 }
4434
4435 #[tokio::test]
4436 async fn test_no_content_type_for_empty_body() {
4437 let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4438
4439 let client = reqwest::Client::new();
4440 let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4441
4442 let (http_result, _) = tokio::join!(send_fut, async {
4443 if let Some(mut envelope) = rx.recv().await {
4444 envelope.exchange.input.body = camel_component_api::Body::Empty;
4445 if let Some(reply_tx) = envelope.reply_tx {
4446 let _ = reply_tx.send(Ok(envelope.exchange));
4447 }
4448 }
4449 });
4450
4451 let resp = http_result.unwrap();
4452 assert_eq!(resp.status().as_u16(), 200);
4453 assert!(
4454 resp.headers().get("content-type").is_none(),
4455 "Empty body should not set Content-Type"
4456 );
4457
4458 token.cancel();
4459 }
4460
4461 #[tokio::test]
4462 async fn test_no_content_type_for_raw_bytes_body() {
4463 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4464
4465 let client = reqwest::Client::new();
4466 let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4467
4468 let (http_result, _) = tokio::join!(send_fut, async {
4469 if let Some(mut envelope) = rx.recv().await {
4470 envelope.exchange.input.body =
4471 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4472 if let Some(reply_tx) = envelope.reply_tx {
4473 let _ = reply_tx.send(Ok(envelope.exchange));
4474 }
4475 }
4476 });
4477
4478 let resp = http_result.unwrap();
4479 assert_eq!(resp.status().as_u16(), 200);
4480 assert!(
4481 resp.headers().get("content-type").is_none(),
4482 "Raw Bytes body should not set Content-Type"
4483 );
4484
4485 token.cancel();
4486 }
4487
4488 #[tokio::test]
4489 async fn test_content_type_from_stream_metadata() {
4490 use camel_component_api::{StreamBody, StreamMetadata};
4491 use futures::stream;
4492
4493 let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4494
4495 let client = reqwest::Client::new();
4496 let send_fut = client
4497 .get(format!("http://127.0.0.1:{port}/stream-ct"))
4498 .send();
4499
4500 let (http_result, _) = tokio::join!(send_fut, async {
4501 if let Some(mut envelope) = rx.recv().await {
4502 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4503 vec![Ok(bytes::Bytes::from("audio data"))];
4504 let stream = Box::pin(stream::iter(chunks));
4505 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4506 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4507 metadata: StreamMetadata {
4508 size_hint: None,
4509 content_type: Some("audio/mpeg".to_string()),
4510 origin: None,
4511 },
4512 });
4513 if let Some(reply_tx) = envelope.reply_tx {
4514 let _ = reply_tx.send(Ok(envelope.exchange));
4515 }
4516 }
4517 });
4518
4519 let resp = http_result.unwrap();
4520 assert_eq!(resp.status().as_u16(), 200);
4521 let ct = resp
4522 .headers()
4523 .get("content-type")
4524 .expect("Content-Type header should be present");
4525 assert_eq!(ct, "audio/mpeg");
4526 let body = resp.text().await.unwrap();
4527 assert_eq!(body, "audio data");
4528
4529 token.cancel();
4530 }
4531
4532 #[tokio::test]
4533 async fn test_user_content_type_overrides_inferred() {
4534 let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4535
4536 let client = reqwest::Client::new();
4537 let send_fut = client
4538 .get(format!("http://127.0.0.1:{port}/override-ct"))
4539 .send();
4540
4541 let (http_result, _) = tokio::join!(send_fut, async {
4542 if let Some(mut envelope) = rx.recv().await {
4543 envelope.exchange.input.body =
4544 camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4545 envelope.exchange.input.set_header(
4546 "Content-Type",
4547 serde_json::Value::String("text/html".to_string()),
4548 );
4549 if let Some(reply_tx) = envelope.reply_tx {
4550 let _ = reply_tx.send(Ok(envelope.exchange));
4551 }
4552 }
4553 });
4554
4555 let resp = http_result.unwrap();
4556 assert_eq!(resp.status().as_u16(), 200);
4557 let ct = resp
4558 .headers()
4559 .get("content-type")
4560 .expect("Content-Type header should be present");
4561 assert_eq!(
4562 ct, "text/html",
4563 "User-set Content-Type should take precedence over inferred type"
4564 );
4565
4566 token.cancel();
4567 }
4568
4569 #[tokio::test]
4570 async fn test_user_content_type_with_bytes_body() {
4571 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4572
4573 let client = reqwest::Client::new();
4574 let send_fut = client
4575 .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4576 .send();
4577
4578 let (http_result, _) = tokio::join!(send_fut, async {
4579 if let Some(mut envelope) = rx.recv().await {
4580 envelope.exchange.input.body =
4581 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4582 envelope.exchange.input.set_header(
4583 "Content-Type",
4584 serde_json::Value::String("application/json".to_string()),
4585 );
4586 if let Some(reply_tx) = envelope.reply_tx {
4587 let _ = reply_tx.send(Ok(envelope.exchange));
4588 }
4589 }
4590 });
4591
4592 let resp = http_result.unwrap();
4593 assert_eq!(resp.status().as_u16(), 200);
4594 let ct = resp
4595 .headers()
4596 .get("content-type")
4597 .expect("Content-Type header should be present for Bytes body with user header");
4598 assert_eq!(
4599 ct, "application/json",
4600 "User Content-Type should be sent for Bytes body"
4601 );
4602
4603 token.cancel();
4604 }
4605
4606 #[tokio::test]
4611 async fn monitor_task_silent_on_clean_exit() {
4612 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4613 monitor_axum_task(
4615 handle,
4616 "127.0.0.1:0".to_string(),
4617 noop_rt(),
4618 "test-monitor".into(),
4619 )
4620 .await;
4621 }
4622
4623 #[tokio::test]
4624 async fn monitor_task_handles_panicked_task() {
4625 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4626 panic!("simulated server crash");
4627 });
4628 monitor_axum_task(
4630 handle,
4631 "127.0.0.1:9999".to_string(),
4632 noop_rt(),
4633 "test-monitor".into(),
4634 )
4635 .await;
4636 }
4637
/// The `Debug` rendering of `HttpAuth::Basic` must not contain the password
/// but should still contain the username.
#[test]
fn http_auth_basic_debug_redacts_password() {
    let debug = format!(
        "{:?}",
        HttpAuth::Basic {
            username: String::from("admin"),
            password: String::from("hunter2"),
        }
    );

    let leaks_password = debug.contains("hunter2");
    assert!(!leaks_password, "password must be redacted: {debug}");

    let shows_username = debug.contains("admin");
    assert!(shows_username, "username should appear: {debug}");
}
4655
/// The `Debug` rendering of `HttpAuth::Bearer` must not contain the leading
/// portion of the token.
#[test]
fn http_auth_bearer_debug_redacts_token() {
    let debug = format!(
        "{:?}",
        HttpAuth::Bearer {
            token: String::from("eyJhbGciOiJIUzI1NiJ9.secret"),
        }
    );

    let leaks_token = debug.contains("eyJhbGci");
    assert!(!leaks_token, "token must be redacted: {debug}");
}
4667
/// The `Debug` rendering of `HttpAuth::None` should still name the variant.
#[test]
fn http_auth_none_debug_shows_variant() {
    let auth = HttpAuth::None;
    let debug = format!("{:?}", auth);

    let names_variant = debug.contains("None");
    assert!(names_variant, "None variant should appear: {debug}");
}
4676
/// Parses a URI carrying Basic-auth query parameters and checks that the
/// `Debug` rendering of the resulting `HttpEndpointConfig` does not contain
/// the password.
#[test]
fn http_endpoint_config_debug_redacts_auth_credentials() {
    let uri = "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123";
    let config = HttpEndpointConfig::from_uri(uri).unwrap();

    let debug = format!("{:?}", config);
    let leaks_password = debug.contains("secret123");
    assert!(
        !leaks_password,
        "password must be redacted in HttpEndpointConfig debug: {debug}"
    );
}
4689
4690 use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4695 use tower_http::services::ServeDir;
4696
/// Returns a freshly constructed `HttpRouteRegistry` (via `HttpRouteRegistry::new()`)
/// for tests to register mounts on.
fn make_test_registry() -> HttpRouteRegistry {
    HttpRouteRegistry::new()
}
4700
4701 fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4702 AppState {
4703 registry,
4704 max_request_body: 2 * 1024 * 1024,
4705 max_response_body: 10 * 1024 * 1024,
4706 inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4707 }
4708 }
4709
4710 #[allow(clippy::await_holding_lock)]
4711 #[tokio::test]
4712 async fn test_static_file_serving_serves_file_contents() {
4713 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4714 ServerRegistry::reset();
4715
4716 let temp_dir =
4718 std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4719 std::fs::create_dir_all(&temp_dir).unwrap();
4720 std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4721 std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4722
4723 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4724
4725 let registry = make_test_registry();
4726 let serve_dir = ServeDir::new(&canonical_dir)
4727 .precompressed_gzip()
4728 .precompressed_br()
4729 .append_index_html_on_directories(true);
4730
4731 let mount = StaticMount {
4732 mount_path: "/".to_string(),
4733 mode: MountMode::Static,
4734 dir: canonical_dir.clone(),
4735 cache_control: "public, max-age=3600".to_string(),
4736 error_pages: std::collections::HashMap::new(),
4737 serve_dir,
4738 };
4739 registry.register_static_mount(mount).await.unwrap();
4740
4741 let state = make_test_state(registry);
4742
4743 let req = Request::builder()
4745 .uri("/hello.txt")
4746 .body(AxumBody::empty())
4747 .unwrap();
4748 let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4749 assert_eq!(resp.status(), StatusCode::OK);
4750 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4751 .await
4752 .unwrap();
4753 assert_eq!(&body[..], b"Hello, static world!");
4754
4755 let req = Request::builder()
4757 .uri("/style.css")
4758 .body(AxumBody::empty())
4759 .unwrap();
4760 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4761 assert_eq!(resp.status(), StatusCode::OK);
4762 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4763 .await
4764 .unwrap();
4765 assert_eq!(&body[..], b"body { color: red; }");
4766
4767 let req = Request::builder()
4769 .uri("/missing.txt")
4770 .body(AxumBody::empty())
4771 .unwrap();
4772 let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4773 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4774
4775 std::fs::remove_dir_all(&temp_dir).ok();
4777 }
4778
4779 #[allow(clippy::await_holding_lock)]
4780 #[tokio::test]
4781 async fn test_spa_fallback_serves_index_for_unknown_paths() {
4782 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4783 ServerRegistry::reset();
4784
4785 let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4786 std::fs::create_dir_all(&temp_dir).unwrap();
4787 std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4788 std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4789
4790 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4791
4792 let registry = make_test_registry();
4793 let serve_dir = ServeDir::new(&canonical_dir)
4794 .precompressed_gzip()
4795 .precompressed_br()
4796 .append_index_html_on_directories(true);
4797
4798 let mount = StaticMount {
4799 mount_path: "/".to_string(),
4800 mode: MountMode::Spa,
4801 dir: canonical_dir.clone(),
4802 cache_control: "public, max-age=0".to_string(),
4803 error_pages: std::collections::HashMap::new(),
4804 serve_dir,
4805 };
4806 registry.register_static_mount(mount).await.unwrap();
4808
4809 let state = make_test_state(registry);
4810
4811 let req = Request::builder()
4813 .method("GET")
4814 .uri("/dashboard")
4815 .header("Accept", "text/html")
4816 .body(AxumBody::empty())
4817 .unwrap();
4818 let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4819 assert_eq!(resp.status(), StatusCode::OK);
4820 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4821 .await
4822 .unwrap();
4823 assert_eq!(&body[..], b"<h1>SPA App</h1>");
4824
4825 let req = Request::builder()
4827 .method("GET")
4828 .uri("/app.js")
4829 .body(AxumBody::empty())
4830 .unwrap();
4831 let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4832 assert_eq!(resp.status(), StatusCode::OK);
4833 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4834 .await
4835 .unwrap();
4836 assert_eq!(&body[..], b"console.log('app')");
4837
4838 let req = Request::builder()
4840 .method("GET")
4841 .uri("/api/data")
4842 .header("Accept", "application/json")
4843 .body(AxumBody::empty())
4844 .unwrap();
4845 let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4846 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4847
4848 let req = Request::builder()
4850 .method("GET")
4851 .uri("/style.css")
4852 .header("Accept", "text/html")
4853 .body(AxumBody::empty())
4854 .unwrap();
4855 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4856 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4857
4858 std::fs::remove_dir_all(&temp_dir).ok();
4860 }
4861
4862 #[allow(clippy::await_holding_lock)]
4863 #[tokio::test]
4864 async fn test_error_page_mapping_serves_custom_404() {
4865 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4866 ServerRegistry::reset();
4867
4868 let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4869 let errors_dir = temp_dir.join("errors");
4870 std::fs::create_dir_all(&errors_dir).unwrap();
4871 std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4872 std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4873
4874 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4875 let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4876
4877 let registry = make_test_registry();
4878 let serve_dir = ServeDir::new(&canonical_dir)
4879 .precompressed_gzip()
4880 .precompressed_br()
4881 .append_index_html_on_directories(true);
4882
4883 let mut error_pages = std::collections::HashMap::new();
4884 error_pages.insert(404, canonical_404);
4885
4886 let mount = StaticMount {
4887 mount_path: "/".to_string(),
4888 mode: MountMode::Static,
4889 dir: canonical_dir.clone(),
4890 cache_control: "public, max-age=0".to_string(),
4891 error_pages,
4892 serve_dir,
4893 };
4894 registry.register_static_mount(mount).await.unwrap();
4895
4896 let state = make_test_state(registry);
4897
4898 let req = Request::builder()
4900 .method("GET")
4901 .uri("/missing.html")
4902 .body(AxumBody::empty())
4903 .unwrap();
4904 let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4905 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4906 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4907 .await
4908 .unwrap();
4909 assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4910
4911 let req = Request::builder()
4913 .method("GET")
4914 .uri("/index.html")
4915 .body(AxumBody::empty())
4916 .unwrap();
4917 let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4918 assert_eq!(resp.status(), StatusCode::OK);
4919 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4920 .await
4921 .unwrap();
4922 assert_eq!(&body[..], b"<h1>Home</h1>");
4923
4924 std::fs::remove_dir_all(&temp_dir).ok();
4926 }
4927}