1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5pub mod registry;
6pub mod static_config;
7pub mod static_dispatch;
8pub mod static_endpoint;
9use crate::config::parse_ok_status_code_range;
10pub use bundle::HttpBundle;
11pub use bundle::HttpStaticBundle;
12pub use config::HttpConfig;
13pub use health::HttpHealthCheck;
14pub use registry::HttpRouteRegistry;
15pub use static_config::HttpStaticConfig;
16pub use static_endpoint::{HttpStaticComponent, HttpStaticConsumer, HttpStaticEndpoint};
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::net::IpAddr;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex, OnceLock};
23use std::task::{Context, Poll};
24use std::time::Duration;
25
26use tokio::sync::OnceCell;
27use tower::Layer;
28use tower::Service;
29use tracing::debug;
30
31use axum::body::BodyDataStream;
32use camel_auth::bearer_token_layer::BearerTokenLayer;
33use camel_auth::oauth2::TokenProvider;
34use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
35use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
36use camel_component_api::{UriComponents, UriConfig, parse_uri};
37use futures::TryStreamExt;
38use futures::stream::BoxStream;
39
40#[derive(Debug, Clone)]
101pub struct HttpEndpointConfig {
102 pub base_url: String,
103 pub http_method: Option<String>,
104 pub throw_exception_on_failure: bool,
105 pub ok_status_code_range: (u16, u16),
106 pub response_timeout: Option<Duration>,
107 pub query_params: HashMap<String, String>,
108 pub allow_private_ips: bool,
109 pub blocked_hosts: Vec<String>,
110 pub max_body_size: usize,
111 pub read_timeout_ms: u64,
112 pub max_response_bytes: usize,
113 pub auth: HttpAuth,
114 pub token_provider: Option<Arc<dyn TokenProvider>>,
115 pub user_agent: Option<String>,
116 pub cookie_handling: CookieHandling,
117 pub bridge_endpoint: bool,
118 pub connection_close: bool,
119 pub skip_request_headers: Vec<String>,
120 pub skip_response_headers: Vec<String>,
121}
122
/// Authentication selected for an HTTP endpoint.
///
/// The hand-written `Debug` implementation below never prints secrets: the
/// password and the bearer token are always rendered as `***`.
#[derive(Clone, PartialEq)]
pub enum HttpAuth {
    /// No credentials.
    None,
    /// Username/password pair.
    Basic { username: String, password: String },
    /// Bearer token.
    Bearer { token: String },
}

impl std::fmt::Debug for HttpAuth {
    /// Formats the value like a derived `Debug` would, with secrets masked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder shown instead of any secret value.
        const REDACTED: &str = "***";

        match self {
            Self::None => f.write_str("None"),
            Self::Basic {
                username,
                password: _,
            } => {
                let mut out = f.debug_struct("Basic");
                out.field("username", username);
                out.field("password", &REDACTED);
                out.finish()
            }
            Self::Bearer { token: _ } => {
                let mut out = f.debug_struct("Bearer");
                out.field("token", &REDACTED);
                out.finish()
            }
        }
    }
}
143
/// Cookie policy chosen through the `cookieHandling` endpoint option.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CookieHandling {
    /// Selected by the value `disabled`; also used when the option is absent.
    Disabled,
    /// Selected by the value `inmemory` (matched case-insensitively).
    InMemory,
}
149
/// URI parameter names that are treated as component options.
///
/// `HttpEndpointConfig::from_components` removes every name listed here from
/// the parameters it keeps as `query_params`.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    // Request/response behaviour.
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    // Timeouts.
    "connectTimeout",
    "responseTimeout",
    // Host restrictions and size limits.
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
    "readTimeout",
    "maxResponseBytes",
    // Authentication.
    "authMethod",
    "authUsername",
    "authPassword",
    "authBearerToken",
    // Headers, cookies and connection handling.
    "userAgent",
    "cookieHandling",
    "bridgeEndpoint",
    "connectionClose",
    "skipRequestHeaders",
    "skipResponseHeaders",
];
174
175impl UriConfig for HttpEndpointConfig {
176 fn scheme() -> &'static str {
178 "http"
179 }
180
181 fn from_uri(uri: &str) -> Result<Self, CamelError> {
182 let parts = parse_uri(uri)?;
183 Self::from_components(parts)
184 }
185
186 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
187 if parts.scheme != "http" && parts.scheme != "https" {
189 return Err(CamelError::InvalidUri(format!(
190 "expected scheme 'http' or 'https', got '{}'",
191 parts.scheme
192 )));
193 }
194
195 let base_url = format!("{}:{}", parts.scheme, parts.path);
198
199 let http_method = parts.params.get("httpMethod").cloned();
200
201 let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
202 Some(v) => parse_bool_param_http(v).map_err(|e| {
203 CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
204 })?,
205 None => true,
206 };
207
208 let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
210 Some(v) => parse_ok_status_code_range(v)?,
211 None => (200, 299),
212 };
213
214 let response_timeout = match parts.params.get("responseTimeout") {
215 Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
216 CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
217 })?),
218 None => None,
219 };
220
221 let allow_private_ips = match parts.params.get("allowPrivateIps") {
223 Some(v) => parse_bool_param_http(v).map_err(|e| {
224 CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
225 })?,
226 None => false, };
228
229 let blocked_hosts = parts
231 .params
232 .get("blockedHosts")
233 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
234 .unwrap_or_default();
235
236 let max_body_size = match parts.params.get("maxBodySize") {
237 Some(v) => v.parse::<usize>().map_err(|e| {
238 CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
239 })?,
240 None => 10 * 1024 * 1024, };
242
243 let read_timeout_ms = match parts.params.get("readTimeout") {
244 Some(v) => v.parse::<u64>().map_err(|e| {
245 CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
246 })?,
247 None => 30_000, };
249
250 let max_response_bytes = match parts.params.get("maxResponseBytes") {
251 Some(v) => v.parse::<usize>().map_err(|e| {
252 CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
253 })?,
254 None => 10 * 1024 * 1024, };
256
257 let auth = parse_auth_from_params(&parts.params)?;
258
259 let user_agent = parts.params.get("userAgent").cloned();
260
261 let cookie_handling = match parts.params.get("cookieHandling") {
262 Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
263 Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
264 Some(v) => {
265 return Err(CamelError::InvalidUri(format!(
266 "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
267 )));
268 }
269 None => CookieHandling::Disabled,
270 };
271
272 let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
273 Some(v) => parse_bool_param_http(v).map_err(|e| {
274 CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
275 })?,
276 None => false,
277 };
278
279 let connection_close = match parts.params.get("connectionClose") {
280 Some(v) => parse_bool_param_http(v).map_err(|e| {
281 CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
282 })?,
283 None => false,
284 };
285
286 let skip_request_headers = parts
287 .params
288 .get("skipRequestHeaders")
289 .map(|v| {
290 v.split(',')
291 .map(str::trim)
292 .filter(|s| !s.is_empty())
293 .map(|s| s.to_ascii_lowercase())
294 .collect::<Vec<_>>()
295 })
296 .unwrap_or_default();
297
298 let skip_response_headers = parts
299 .params
300 .get("skipResponseHeaders")
301 .map(|v| {
302 v.split(',')
303 .map(str::trim)
304 .filter(|s| !s.is_empty())
305 .map(|s| s.to_ascii_lowercase())
306 .collect::<Vec<_>>()
307 })
308 .unwrap_or_default();
309
310 let query_params: HashMap<String, String> = parts
312 .params
313 .into_iter()
314 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
315 .collect();
316
317 Ok(Self {
318 base_url,
319 http_method,
320 throw_exception_on_failure,
321 ok_status_code_range,
322 response_timeout,
323 query_params,
324 allow_private_ips,
325 blocked_hosts,
326 max_body_size,
327 read_timeout_ms,
328 max_response_bytes,
329 auth,
330 token_provider: None,
331 user_agent,
332 cookie_handling,
333 bridge_endpoint,
334 connection_close,
335 skip_request_headers,
336 skip_response_headers,
337 })
338 }
339}
340
341fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
342 let Some(method) = params.get("authMethod") else {
343 return Ok(HttpAuth::None);
344 };
345
346 if method.eq_ignore_ascii_case("none") {
347 return Ok(HttpAuth::None);
348 }
349
350 if method.eq_ignore_ascii_case("basic") {
351 let username = params.get("authUsername").cloned().ok_or_else(|| {
352 CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
353 })?;
354 let password = params.get("authPassword").cloned().ok_or_else(|| {
355 CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
356 })?;
357 return Ok(HttpAuth::Basic { username, password });
358 }
359
360 if method.eq_ignore_ascii_case("bearer") {
361 let token = params.get("authBearerToken").cloned().ok_or_else(|| {
362 CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
363 })?;
364 return Ok(HttpAuth::Bearer { token });
365 }
366
367 Err(CamelError::InvalidUri(format!(
368 "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
369 )))
370}
371
372fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
373 match value.to_ascii_lowercase().as_str() {
374 "true" | "1" | "yes" => Ok(true),
375 "false" | "0" | "no" => Ok(false),
376 _ => Err(CamelError::InvalidUri(format!(
377 "invalid boolean value: '{value}'"
378 ))),
379 }
380}
381
382impl HttpEndpointConfig {
383 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
384 let parts = parse_uri(uri)?;
385 let mut endpoint = Self::from_components(parts.clone())?;
386 if endpoint.response_timeout.is_none() {
387 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
388 }
389 if !parts.params.contains_key("allowPrivateIps") {
390 endpoint.allow_private_ips = config.allow_private_ips;
391 }
392 if !parts.params.contains_key("blockedHosts") {
393 endpoint.blocked_hosts = config.blocked_hosts.clone();
394 }
395 if !parts.params.contains_key("maxBodySize") {
396 endpoint.max_body_size = config.max_body_size;
397 }
398 if !parts.params.contains_key("readTimeout") {
399 endpoint.read_timeout_ms = config.read_timeout_ms;
400 }
401 if !parts.params.contains_key("maxResponseBytes") {
402 endpoint.max_response_bytes = config.max_response_bytes;
403 }
404 if !parts.params.contains_key("okStatusCodeRange")
405 && let Some(range) = &config.ok_status_code_range
406 {
407 endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
408 }
409
410 Ok(endpoint)
411 }
412}
413
/// Server-side (consumer) view of an HTTP endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host part of the URI authority.
    pub host: String,
    /// Port from the authority, or 80/443 depending on the scheme.
    pub port: u16,
    /// Request path; `/` when the URI has none.
    pub path: String,
    /// Value of `maxRequestBody` (defaults to 2 MiB).
    pub max_request_body: usize,
    /// Value of `maxResponseBody` (defaults to 10 MiB).
    pub max_response_body: usize,
    /// Value of `maxInflightRequests` (defaults to 1024).
    pub max_inflight_requests: usize,
}
434
435impl UriConfig for HttpServerConfig {
436 fn scheme() -> &'static str {
438 "http"
439 }
440
441 fn from_uri(uri: &str) -> Result<Self, CamelError> {
442 let parts = parse_uri(uri)?;
443 Self::from_components(parts)
444 }
445
446 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
447 if parts.scheme != "http" && parts.scheme != "https" {
449 return Err(CamelError::InvalidUri(format!(
450 "expected scheme 'http' or 'https', got '{}'",
451 parts.scheme
452 )));
453 }
454
455 let authority_and_path = parts.path.trim_start_matches('/');
458
459 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
461 (&authority_and_path[..idx], &authority_and_path[idx..])
462 } else {
463 (authority_and_path, "/")
464 };
465
466 let path = if path_suffix.is_empty() {
467 "/"
468 } else {
469 path_suffix
470 }
471 .to_string();
472
473 let (host, port) = if let Some(colon) = authority.rfind(':') {
475 let port_str = &authority[colon + 1..];
476 match port_str.parse::<u16>() {
477 Ok(p) => (authority[..colon].to_string(), p),
478 Err(_) => {
479 return Err(CamelError::InvalidUri(format!(
480 "invalid port '{}' in authority",
481 port_str
482 )));
483 }
484 }
485 } else {
486 let default_port = if parts.scheme == "https" { 443 } else { 80 };
488 (authority.to_string(), default_port)
489 };
490
491 let max_request_body = parts
492 .params
493 .get("maxRequestBody")
494 .and_then(|v| v.parse::<usize>().ok())
495 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
498 .params
499 .get("maxResponseBody")
500 .and_then(|v| v.parse::<usize>().ok())
501 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
504 .params
505 .get("maxInflightRequests")
506 .and_then(|v| v.parse::<usize>().ok())
507 .unwrap_or(1024);
508
509 Ok(Self {
510 host,
511 port,
512 path,
513 max_request_body,
514 max_response_body,
515 max_inflight_requests,
516 })
517 }
518}
519
520impl HttpServerConfig {
521 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
522 let parts = parse_uri(uri)?;
523 let mut server = Self::from_components(parts.clone())?;
524 if !parts.params.contains_key("maxRequestBody") {
525 server.max_request_body = config.max_request_body;
526 }
527 if !parts.params.contains_key("maxResponseBody") {
528 server.max_response_body = config.max_body_size;
530 }
531 Ok(server)
532 }
533}
534
535pub enum HttpReplyBody {
543 Bytes(bytes::Bytes),
544 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
545}
546
547pub struct RequestEnvelope {
552 pub method: String,
553 pub path: String,
554 pub query: String,
555 pub headers: http::HeaderMap,
556 pub body: StreamBody,
557 pub reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
558}
559
560pub struct HttpReply {
564 pub status: u16,
565 pub headers: Vec<(String, String)>,
566 pub body: HttpReplyBody,
567}
568
/// `(host, port)` pair identifying one shared listener in [`ServerRegistry`].
type ServerKey = (String, u16);
574
575struct ServerHandle {
577 registry: HttpRouteRegistry,
578 max_request_body: usize,
579 max_response_body: usize,
580 max_inflight_requests: usize,
581}
582
583pub struct ServerRegistry {
585 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
586}
587
588impl ServerRegistry {
589 pub fn global() -> &'static Self {
591 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
592 INSTANCE.get_or_init(|| ServerRegistry {
593 inner: Mutex::new(HashMap::new()),
594 })
595 }
596
597 pub async fn get_or_spawn(
600 &'static self,
601 host: &str,
602 port: u16,
603 max_request_body: usize,
604 max_response_body: usize,
605 max_inflight_requests: usize,
606 ) -> Result<HttpRouteRegistry, CamelError> {
607 let host_owned = host.to_string();
608
609 let cell = {
610 let mut guard = self.inner.lock().map_err(|_| {
611 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
612 })?;
613 let key = (host.to_string(), port);
614 guard
615 .entry(key)
616 .or_insert_with(|| Arc::new(OnceCell::new()))
617 .clone()
618 };
619
620 if let Some(existing) = cell.get()
621 && existing.max_request_body != max_request_body
622 {
623 return Err(CamelError::EndpointCreationFailed(format!(
624 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
625 existing.max_request_body, max_request_body
626 )));
627 }
628
629 if let Some(existing) = cell.get()
630 && existing.max_response_body != max_response_body
631 {
632 return Err(CamelError::EndpointCreationFailed(format!(
633 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
634 existing.max_response_body, max_response_body
635 )));
636 }
637
638 if let Some(existing) = cell.get()
639 && existing.max_inflight_requests != max_inflight_requests
640 {
641 return Err(CamelError::EndpointCreationFailed(format!(
642 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
643 existing.max_inflight_requests, max_inflight_requests
644 )));
645 }
646
647 let handle = cell
648 .get_or_try_init(|| async {
649 let addr = format!("{host_owned}:{port}");
650 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
651 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
652 })?;
653 let registry = HttpRouteRegistry::new();
654 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
655 let task = tokio::spawn(run_axum_server(
656 listener,
657 registry.clone(),
658 max_request_body,
659 max_response_body,
660 Arc::clone(&inflight),
661 ));
662 let addr_for_monitor = format!("{host_owned}:{port}");
663 tokio::spawn(monitor_axum_task(task, addr_for_monitor));
664 Ok::<ServerHandle, CamelError>(ServerHandle {
665 registry,
666 max_request_body,
667 max_response_body,
668 max_inflight_requests,
669 })
670 })
671 .await?;
672
673 Ok(handle.registry.clone())
674 }
675
676 #[cfg(test)]
683 pub fn reset() {
684 let instance = Self::global();
685 let mut guard = instance
686 .inner
687 .lock()
688 .expect("ServerRegistry lock poisoned during test reset");
689 guard.clear();
690 }
691}
692
693use axum::{
698 Router,
699 body::Body as AxumBody,
700 extract::{Request, State},
701 http::{Response, StatusCode},
702 response::IntoResponse,
703};
704
705#[derive(Clone)]
706pub(crate) struct AppState {
707 registry: HttpRouteRegistry,
708 max_request_body: usize,
709 max_response_body: usize,
710 inflight: Arc<tokio::sync::Semaphore>,
711}
712
713async fn run_axum_server(
714 listener: tokio::net::TcpListener,
715 registry: HttpRouteRegistry,
716 max_request_body: usize,
717 max_response_body: usize,
718 inflight: Arc<tokio::sync::Semaphore>,
719) {
720 let state = AppState {
721 registry,
722 max_request_body,
723 max_response_body,
724 inflight,
725 };
726 let app = Router::new().fallback(dispatch_handler).with_state(state);
727
728 axum::serve(listener, app).await.unwrap_or_else(|e| {
729 tracing::error!(error = %e, "Axum server error");
730 });
731}
732
733async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
741 match handle.await {
742 Ok(()) => {
743 }
745 Err(join_err) => {
746 tracing::error!(
747 addr = %addr,
748 error = %join_err,
749 "Axum server task exited unexpectedly — all routes on this port are now dead"
750 );
751 }
752 }
753}
754
755async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
756 let path = req.uri().path().to_owned();
757
758 let api_sender = {
760 let inner = state.registry.inner.read().await;
761 inner.api_routes.get(&path).cloned()
762 }; if let Some(sender) = api_sender {
765 let method = req.method().to_string();
766 let query = req.uri().query().unwrap_or("").to_string();
767 let headers = req.headers().clone();
768
769 let content_length: Option<u64> = headers
771 .get(http::header::CONTENT_LENGTH)
772 .and_then(|v| v.to_str().ok())
773 .and_then(|s| s.parse().ok());
774
775 if let Some(len) = content_length
776 && len > state.max_request_body as u64
777 {
778 return Response::builder()
779 .status(StatusCode::PAYLOAD_TOO_LARGE)
780 .body(AxumBody::from("Request body exceeds configured limit"))
781 .expect("infallible"); }
783
784 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
785 Ok(permit) => permit,
786 Err(_) => {
787 return Response::builder()
788 .status(StatusCode::SERVICE_UNAVAILABLE)
789 .body(AxumBody::from("Service Unavailable"))
790 .expect("infallible"); }
792 };
793
794 let content_type = headers
796 .get(http::header::CONTENT_TYPE)
797 .and_then(|v| v.to_str().ok())
798 .map(|s| s.to_string());
799
800 let data_stream: BodyDataStream = req.into_body().into_data_stream();
801 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
802 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
803
804 let stream_body = StreamBody {
805 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
806 metadata: StreamMetadata {
807 size_hint: content_length,
808 content_type,
809 origin: None,
810 },
811 };
812
813 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
814 let envelope = RequestEnvelope {
815 method,
816 path,
817 query,
818 headers,
819 body: stream_body,
820 reply_tx,
821 };
822
823 if sender.send(envelope).await.is_err() {
824 return Response::builder()
825 .status(StatusCode::SERVICE_UNAVAILABLE)
826 .body(AxumBody::from("Consumer unavailable"))
827 .expect("infallible"); }
829
830 match reply_rx.await {
831 Ok(reply) => {
832 let reply = match reply.body {
833 HttpReplyBody::Bytes(b)
834 if exceeds_max_response_body(b.len(), state.max_response_body) =>
835 {
836 HttpReply {
837 status: 500,
838 headers: vec![],
839 body: HttpReplyBody::Bytes(bytes::Bytes::from(
840 "Response body exceeds configured limit",
841 )),
842 }
843 }
844 _ => reply,
845 };
846
847 let status =
848 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
849 let mut builder = Response::builder().status(status);
850 for (k, v) in &reply.headers {
851 builder = builder.header(k.as_str(), v.as_str());
852 }
853 match reply.body {
854 HttpReplyBody::Bytes(b) => {
855 builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
856 Response::builder()
857 .status(StatusCode::INTERNAL_SERVER_ERROR)
858 .body(AxumBody::from("Invalid response headers from consumer"))
859 .expect("infallible") })
861 }
862 HttpReplyBody::Stream(stream) => builder
863 .body(AxumBody::from_stream(stream))
864 .unwrap_or_else(|_| {
865 Response::builder()
866 .status(StatusCode::INTERNAL_SERVER_ERROR)
867 .body(AxumBody::from("Invalid response headers from consumer"))
868 .expect("infallible") }),
870 }
871 }
872 Err(_) => Response::builder()
873 .status(StatusCode::INTERNAL_SERVER_ERROR)
874 .body(AxumBody::from("Pipeline error"))
875 .expect("infallible"), }
877 } else {
878 static_dispatch::dispatch_static(&state, req, &path).await
880 }
881}
882
/// Reports whether a body of `len` bytes is strictly larger than `max`.
fn exceeds_max_response_body(len: usize, max: usize) -> bool {
    max < len
}
886
/// Upper-cases the first character of every `-`-separated segment of `name`,
/// e.g. `content-type` becomes `Content-Type`. All other characters are kept
/// unchanged.
fn title_case_header(name: &str) -> String {
    let mut titled = String::with_capacity(name.len());
    // True for the first character of the input and right after each '-'.
    let mut segment_start = true;

    for ch in name.chars() {
        if segment_start {
            titled.extend(ch.to_uppercase());
        } else {
            titled.push(ch);
        }
        segment_start = ch == '-';
    }

    titled
}
899
900pub struct HttpConsumer {
905 config: HttpServerConfig,
906 #[allow(dead_code)]
909 runtime: Arc<dyn RuntimeObservability>,
910}
911
912impl HttpConsumer {
913 pub fn new(config: HttpServerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
914 Self { config, runtime }
915 }
916}
917
918#[async_trait::async_trait]
919impl Consumer for HttpConsumer {
920 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
921 use camel_component_api::{Body, Exchange, Message};
922
923 let registry = ServerRegistry::global()
924 .get_or_spawn(
925 &self.config.host,
926 self.config.port,
927 self.config.max_request_body,
928 self.config.max_response_body,
929 self.config.max_inflight_requests,
930 )
931 .await?;
932
933 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
935 registry
936 .register_api_route(self.config.path.clone(), env_tx)
937 .await;
938
939 let path = self.config.path.clone();
940 let registry_for_cleanup = registry.clone();
941 let cancel_token = ctx.cancel_token();
942 loop {
943 tokio::select! {
944 _ = ctx.cancelled() => {
945 break;
946 }
947 envelope = env_rx.recv() => {
948 let Some(envelope) = envelope else { break; };
949
950 let mut msg = Message::default();
952
953 msg.set_header("CamelHttpMethod",
955 serde_json::Value::String(envelope.method.clone()));
956 msg.set_header("CamelHttpPath",
957 serde_json::Value::String(envelope.path.clone()));
958 msg.set_header("CamelHttpQuery",
959 serde_json::Value::String(envelope.query.clone()));
960
961 for (k, v) in &envelope.headers {
963 if let Ok(val_str) = v.to_str() {
964 msg.set_header(
965 title_case_header(k.as_str()),
966 serde_json::Value::String(val_str.to_string()),
967 );
968 }
969 }
970
971 msg.body = Body::Stream(envelope.body);
974
975 #[allow(unused_mut)]
976 let mut exchange = Exchange::new(msg);
977
978 #[cfg(feature = "otel")]
980 {
981 let headers: HashMap<String, String> = envelope
982 .headers
983 .iter()
984 .filter_map(|(k, v)| {
985 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
986 })
987 .collect();
988 camel_otel::extract_into_exchange(&mut exchange, &headers);
989 }
990
991 let reply_tx = envelope.reply_tx;
992 let sender = ctx.sender().clone();
993 let path_clone = path.clone();
994 let cancel = cancel_token.clone();
995
996 tokio::spawn(async move {
1016 if cancel.is_cancelled() {
1024 let _ = reply_tx.send(HttpReply {
1025 status: 503,
1026 headers: vec![],
1027 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1028 });
1029 return;
1030 }
1031
1032 let (tx, rx) = tokio::sync::oneshot::channel();
1034 let envelope = camel_component_api::consumer::ExchangeEnvelope {
1035 exchange,
1036 reply_tx: Some(tx),
1037 };
1038
1039 let result = match sender.send(envelope).await {
1040 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1041 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1042 }
1043 .and_then(|r| r);
1044
1045 let reply = match result {
1046 Ok(out) => {
1047 let status = out
1048 .input
1049 .header("CamelHttpResponseCode")
1050 .and_then(|v| {
1051 let raw = v.as_u64()
1052 .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1053 let code = raw as u16;
1054 (100..1000).contains(&code).then_some(code)
1055 })
1056 .unwrap_or(200);
1057
1058 let user_content_type = out
1059 .input
1060 .header("Content-Type")
1061 .and_then(|v| v.as_str().map(|s| s.to_string()));
1062
1063 let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1064 Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1065 Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1066 Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1067 Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1068 Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1069 v.to_string().into_bytes(),
1070 )), Some("application/json".to_string())),
1071 Body::Stream(s) => {
1072 let ct = s.metadata.content_type.clone();
1073 match s.stream.lock().await.take() {
1074 Some(stream) => (
1075 HttpReplyBody::Stream(stream),
1076 ct,
1077 ),
1078 None => {
1079 tracing::error!(
1080 "Body::Stream already consumed before HTTP reply — returning 500"
1081 );
1082 let error_reply = HttpReply {
1083 status: 500,
1084 headers: vec![],
1085 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1086 };
1087 if reply_tx.send(error_reply).is_err() {
1088 debug!("reply_tx dropped before error reply could be sent");
1089 }
1090 return;
1091 }
1092 }
1093 }
1094 };
1095
1096 let mut resp_headers: Vec<(String, String)> = out
1097 .input
1098 .headers
1099 .iter()
1100 .filter(|(k, _)| !k.starts_with("Camel"))
1101 .filter(|(k, _)| {
1102 !matches!(
1103 k.to_lowercase().as_str(),
1104 "content-length"
1105 | "content-type"
1106 | "transfer-encoding"
1107 | "connection"
1108 | "cache-control"
1109 | "date"
1110 | "pragma"
1111 | "trailer"
1112 | "upgrade"
1113 | "via"
1114 | "warning"
1115 | "host"
1116 | "user-agent"
1117 | "accept"
1118 | "accept-encoding"
1119 | "accept-language"
1120 | "accept-charset"
1121 | "authorization"
1122 | "proxy-authorization"
1123 | "cookie"
1124 | "expect"
1125 | "from"
1126 | "if-match"
1127 | "if-modified-since"
1128 | "if-none-match"
1129 | "if-range"
1130 | "if-unmodified-since"
1131 | "max-forwards"
1132 | "proxy-connection"
1133 | "range"
1134 | "referer"
1135 | "te"
1136 )
1137 })
1138 .filter_map(|(k, v)| {
1139 v.as_str().map(|s| (k.clone(), s.to_string()))
1140 })
1141 .collect();
1142
1143 let content_type = user_content_type
1144 .or(inferred_content_type);
1145 if let Some(ct) = content_type {
1146 resp_headers.push(("Content-Type".to_string(), ct));
1147 }
1148
1149 HttpReply {
1150 status,
1151 headers: resp_headers,
1152 body: reply_body,
1153 }
1154 }
1155 Err(CamelError::Stopped) => {
1156 tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1157 HttpReply {
1158 status: 204,
1159 headers: vec![],
1160 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1161 }
1162 }
1163 Err(CamelError::Unauthenticated(msg)) => {
1164 tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1165 HttpReply {
1166 status: 401,
1167 headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1168 body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1169 }
1170 }
1171 Err(CamelError::Unauthorized(msg)) => {
1172 tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1173 HttpReply {
1174 status: 403,
1175 headers: vec![],
1176 body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1177 }
1178 }
1179 Err(e) => {
1180 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1181 HttpReply {
1182 status: 500,
1183 headers: vec![],
1184 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1185 }
1186 }
1187 };
1188
1189 let _ = reply_tx.send(reply);
1191 });
1192 }
1193 }
1194 }
1195
1196 registry_for_cleanup.unregister_api_route(&path).await;
1198
1199 Ok(())
1200 }
1201
1202 async fn stop(&mut self) -> Result<(), CamelError> {
1203 Ok(())
1204 }
1205
1206 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1207 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1208 }
1209}
1210
1211pub struct HttpComponent {
1216 config: HttpConfig,
1217}
1218
1219fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1220 let mut builder = reqwest::Client::builder()
1221 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1222 .pool_max_idle_per_host(config.pool_max_idle_per_host)
1223 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1224
1225 if !config.follow_redirects {
1226 builder = builder.redirect(reqwest::redirect::Policy::none());
1227 } else if let Some(max_redirects) = config.max_redirects {
1228 builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1229 }
1230
1231 if let Some(proxy_url) = &config.proxy_url
1232 && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1233 {
1234 builder = builder.proxy(proxy);
1235 }
1236
1237 if matches!(cookie_handling, CookieHandling::InMemory) {
1238 }
1240
1241 if let Some(tls) = &config.tls
1242 && tls.enabled
1243 {
1244 if tls.insecure || !tls.verify_peer {
1245 builder = builder.danger_accept_invalid_certs(true);
1246 }
1247
1248 if let Some(ca_path) = &tls.ca_cert_path
1249 && let Ok(ca_bytes) = std::fs::read(ca_path)
1250 {
1251 let cert = reqwest::Certificate::from_pem(&ca_bytes)
1252 .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1253 if let Ok(ca_cert) = cert {
1254 builder = builder.add_root_certificate(ca_cert);
1255 }
1256 }
1257
1258 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1259 && let (Ok(cert_bytes), Ok(key_bytes)) =
1260 (std::fs::read(cert_path), std::fs::read(key_path))
1261 {
1262 let mut identity_pem = cert_bytes;
1263 identity_pem.extend_from_slice(&key_bytes);
1264 if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1265 builder = builder.identity(identity);
1266 }
1267 }
1268 }
1269
1270 builder
1271 .build()
1272 .expect("reqwest::Client::build() with valid config should not fail") }
1274
1275impl HttpComponent {
1276 pub fn new() -> Self {
1277 let config = HttpConfig::default();
1278 Self { config }
1279 }
1280
1281 pub fn with_config(config: HttpConfig) -> Self {
1282 Self { config }
1283 }
1284
1285 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1286 match config {
1287 Some(cfg) => Self::with_config(cfg),
1288 None => Self::new(),
1289 }
1290 }
1291}
1292
1293impl Default for HttpComponent {
1294 fn default() -> Self {
1295 Self::new()
1296 }
1297}
1298
1299impl Component for HttpComponent {
1300 fn scheme(&self) -> &str {
1301 "http"
1302 }
1303
1304 fn create_endpoint(
1305 &self,
1306 uri: &str,
1307 ctx: &dyn camel_component_api::ComponentContext,
1308 ) -> Result<Box<dyn Endpoint>, CamelError> {
1309 self.config.validate()?;
1310 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1311 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1312 let client = build_client(&self.config, config.cookie_handling);
1313 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1314 server_config.host.clone(),
1315 server_config.port,
1316 )));
1317 Ok(Box::new(HttpEndpoint {
1318 uri: uri.to_string(),
1319 config,
1320 server_config,
1321 client,
1322 }))
1323 }
1324}
1325
/// Component registered under the `https` scheme.
pub struct HttpsComponent {
    /// Component-level settings, used as defaults when endpoints are created.
    config: HttpConfig,
}
1329
1330impl HttpsComponent {
1331 pub fn new() -> Self {
1332 let config = HttpConfig::default();
1333 Self { config }
1334 }
1335
1336 pub fn with_config(config: HttpConfig) -> Self {
1337 Self { config }
1338 }
1339
1340 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1341 match config {
1342 Some(cfg) => Self::with_config(cfg),
1343 None => Self::new(),
1344 }
1345 }
1346}
1347
1348impl Default for HttpsComponent {
1349 fn default() -> Self {
1350 Self::new()
1351 }
1352}
1353
1354impl Component for HttpsComponent {
1355 fn scheme(&self) -> &str {
1356 "https"
1357 }
1358
1359 fn create_endpoint(
1360 &self,
1361 uri: &str,
1362 ctx: &dyn camel_component_api::ComponentContext,
1363 ) -> Result<Box<dyn Endpoint>, CamelError> {
1364 self.config.validate()?;
1365 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1366 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1367 let client = build_client(&self.config, config.cookie_handling);
1368 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1369 server_config.host.clone(),
1370 server_config.port,
1371 )));
1372 Ok(Box::new(HttpEndpoint {
1373 uri: uri.to_string(),
1374 config,
1375 server_config,
1376 client,
1377 }))
1378 }
1379}
1380
/// Endpoint built by `HttpComponent` and `HttpsComponent`; it can create
/// both an HTTP consumer and an HTTP producer.
struct HttpEndpoint {
    /// Endpoint URI as passed to `create_endpoint`; returned by `Endpoint::uri`.
    uri: String,
    /// Producer-side settings; cloned into each `HttpProducer`.
    config: HttpEndpointConfig,
    /// Consumer-side settings; cloned into each `HttpConsumer`.
    server_config: HttpServerConfig,
    /// HTTP client created together with the endpoint; each producer gets a clone.
    client: reqwest::Client,
}
1391
1392impl Endpoint for HttpEndpoint {
1393 fn uri(&self) -> &str {
1394 &self.uri
1395 }
1396
1397 fn create_consumer(
1398 &self,
1399 rt: Arc<dyn camel_component_api::RuntimeObservability>,
1400 ) -> Result<Box<dyn Consumer>, CamelError> {
1401 Ok(Box::new(HttpConsumer::new(self.server_config.clone(), rt)))
1402 }
1403
1404 fn create_producer(
1405 &self,
1406 _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1407 _ctx: &ProducerContext,
1408 ) -> Result<BoxProcessor, CamelError> {
1409 let producer = HttpProducer {
1410 config: Arc::new(self.config.clone()),
1411 client: self.client.clone(),
1412 };
1413 if let Some(ref provider) = self.config.token_provider {
1414 let layer = BearerTokenLayer::new(Arc::clone(provider));
1415 Ok(BoxProcessor::new(layer.layer(producer)))
1416 } else {
1417 Ok(BoxProcessor::new(producer))
1418 }
1419 }
1420}
1421
1422fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1427 let parsed = url::Url::parse(url)
1428 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1429
1430 if let Some(host) = parsed.host_str()
1432 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1433 {
1434 return Err(CamelError::ProcessorError(format!(
1435 "Host '{}' is blocked",
1436 host
1437 )));
1438 }
1439
1440 if !config.allow_private_ips
1442 && let Some(host) = parsed.host()
1443 {
1444 match host {
1445 url::Host::Ipv4(ip) => {
1446 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1447 return Err(CamelError::ProcessorError(format!(
1448 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1449 ip
1450 )));
1451 }
1452 }
1453 url::Host::Ipv6(ip) => {
1454 if ip.is_loopback() {
1455 return Err(CamelError::ProcessorError(format!(
1456 "Loopback IP '{}' not allowed",
1457 ip
1458 )));
1459 }
1460 }
1461 url::Host::Domain(domain) => {
1462 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1464 if blocked_domains.contains(&domain) {
1465 return Err(CamelError::ProcessorError(format!(
1466 "Domain '{}' is not allowed",
1467 domain
1468 )));
1469 }
1470 }
1471 }
1472 }
1473
1474 Ok(())
1475}
1476
/// Returns `true` when `ip` must be treated as an internal (non-public) target.
///
/// IPv4: private, loopback and link-local ranges, plus anything in `0.0.0.0/8`.
///
/// IPv6: loopback (`::1`), unspecified (`::`), unique-local (`fc00::/7`),
/// link-local (`fe80::/10`), and IPv4-mapped addresses whose embedded IPv4
/// address is internal by the rule above.
fn is_private_ip(ip: &IpAddr) -> bool {
    /// IPv4 rule, shared by plain IPv4 and IPv4-mapped IPv6 addresses.
    fn is_internal_v4(v4: &std::net::Ipv4Addr) -> bool {
        v4.is_private() || v4.is_loopback() || v4.is_link_local() || v4.octets()[0] == 0
    }

    match ip {
        IpAddr::V4(v4) => is_internal_v4(v4),
        IpAddr::V6(v6) => {
            let seg0 = v6.segments()[0];
            v6.is_loopback()
                // `::` is the IPv6 counterpart of 0.0.0.0, which the IPv4 rule rejects.
                || v6.is_unspecified()
                // fc00::/7 — unique-local addresses.
                || (seg0 & 0xfe00) == 0xfc00
                // fe80::/10 — link-local addresses.
                || (seg0 & 0xffc0) == 0xfe80
                // ::ffff:a.b.c.d — judge by the embedded IPv4 address.
                || v6.to_ipv4_mapped().is_some_and(|v4| is_internal_v4(&v4))
        }
    }
}
1502
1503async fn validate_resolved_host_for_ssrf(
1504 url: &str,
1505 config: &HttpEndpointConfig,
1506) -> Result<(), CamelError> {
1507 if config.allow_private_ips {
1508 return Ok(());
1509 }
1510
1511 let parsed = url::Url::parse(url)
1512 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1513 let Some(host) = parsed.host_str() else {
1514 return Ok(());
1515 };
1516 let Some(port) = parsed.port_or_known_default() else {
1517 return Ok(());
1518 };
1519
1520 let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1521 CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1522 })?;
1523
1524 for addr in resolved {
1525 let ip = addr.ip();
1526 if is_private_ip(&ip) {
1527 return Err(CamelError::ProcessorError(format!(
1528 "Target resolved to private IP: {}",
1529 ip
1530 )));
1531 }
1532 }
1533
1534 Ok(())
1535}
1536
/// Producer side of the HTTP endpoint: a `tower::Service` that turns each
/// exchange into one outbound HTTP request.
#[derive(Clone)]
struct HttpProducer {
    /// Endpoint settings, shared between clones of the producer.
    config: Arc<HttpEndpointConfig>,
    /// HTTP client used to send the requests.
    client: reqwest::Client,
}
1546
1547impl HttpProducer {
1548 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1549 if let Some(ref method) = config.http_method {
1550 return method.to_uppercase();
1551 }
1552 if let Some(method) = exchange
1553 .input
1554 .header("CamelHttpMethod")
1555 .and_then(|v| v.as_str())
1556 {
1557 return method.to_uppercase();
1558 }
1559 if !exchange.input.body.is_empty() {
1560 return "POST".to_string();
1561 }
1562 "GET".to_string()
1563 }
1564
1565 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1566 if let Some(uri) = exchange
1567 .input
1568 .header("CamelHttpUri")
1569 .and_then(|v| v.as_str())
1570 {
1571 let mut url = uri.to_string();
1572 if let Some(path) = exchange
1573 .input
1574 .header("CamelHttpPath")
1575 .and_then(|v| v.as_str())
1576 {
1577 if !url.ends_with('/') && !path.starts_with('/') {
1578 url.push('/');
1579 }
1580 url.push_str(path);
1581 }
1582 if let Some(query) = exchange
1583 .input
1584 .header("CamelHttpQuery")
1585 .and_then(|v| v.as_str())
1586 {
1587 url.push('?');
1588 url.push_str(query);
1589 }
1590 return url;
1591 }
1592
1593 let mut url = config.base_url.clone();
1594
1595 if let Some(path) = exchange
1596 .input
1597 .header("CamelHttpPath")
1598 .and_then(|v| v.as_str())
1599 {
1600 if !url.ends_with('/') && !path.starts_with('/') {
1601 url.push('/');
1602 }
1603 url.push_str(path);
1604 }
1605
1606 if let Some(query) = exchange
1607 .input
1608 .header("CamelHttpQuery")
1609 .and_then(|v| v.as_str())
1610 {
1611 url.push('?');
1612 url.push_str(query);
1613 } else if !config.query_params.is_empty() {
1614 let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); for (k, v) in &config.query_params {
1616 parsed.query_pairs_mut().append_pair(k, v);
1617 }
1618 url = parsed.to_string();
1619 }
1620
1621 url
1622 }
1623
1624 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1625 status >= range.0 && status <= range.1
1626 }
1627}
1628
1629impl Service<Exchange> for HttpProducer {
1630 type Response = Exchange;
1631 type Error = CamelError;
1632 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1633
1634 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1635 Poll::Ready(Ok(()))
1636 }
1637
1638 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1639 let config = self.config.clone();
1640 let client = self.client.clone();
1641
1642 Box::pin(async move {
1643 let method_str = HttpProducer::resolve_method(&exchange, &config);
1644 let url = HttpProducer::resolve_url(&exchange, &config);
1645
1646 validate_url_for_ssrf(&url, &config)?;
1648 validate_resolved_host_for_ssrf(&url, &config).await?;
1649
1650 debug!(
1651 correlation_id = %exchange.correlation_id(),
1652 method = %method_str,
1653 url = %url,
1654 "HTTP request"
1655 );
1656
1657 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1658 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1659 })?;
1660
1661 let mut request = client.request(method, &url);
1662
1663 if let Some(timeout) = config.response_timeout {
1664 request = request.timeout(timeout);
1665 }
1666
1667 if let Some(user_agent) = &config.user_agent
1668 && !config.bridge_endpoint
1669 {
1670 request = request.header("User-Agent", user_agent.clone());
1671 }
1672
1673 #[cfg(feature = "otel")]
1675 let should_inject_otel = !config.bridge_endpoint;
1676 #[cfg(feature = "otel")]
1677 if should_inject_otel {
1678 let mut otel_headers = HashMap::new();
1679 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1680 for (k, v) in otel_headers {
1681 if let (Ok(name), Ok(val)) = (
1682 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1683 reqwest::header::HeaderValue::from_str(&v),
1684 ) {
1685 request = request.header(name, val);
1686 }
1687 }
1688 }
1689
1690 for (key, value) in &exchange.input.headers {
1691 if !key.starts_with("Camel")
1692 && !config
1693 .skip_request_headers
1694 .iter()
1695 .any(|h| h.eq_ignore_ascii_case(key))
1696 && let Some(val_str) = value.as_str()
1697 && let (Ok(name), Ok(val)) = (
1698 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1699 reqwest::header::HeaderValue::from_str(val_str),
1700 )
1701 {
1702 request = request.header(name, val);
1703 }
1704 }
1705
1706 if !config.bridge_endpoint {
1707 match &config.auth {
1708 HttpAuth::None => {}
1709 HttpAuth::Basic { username, password } => {
1710 request = request.basic_auth(username, Some(password));
1711 }
1712 HttpAuth::Bearer { token } => {
1713 request = request.bearer_auth(token);
1714 }
1715 }
1716
1717 if config.connection_close {
1718 request = request.header("Connection", "close");
1719 }
1720 }
1721
1722 match exchange.input.body {
1723 Body::Stream(ref s) => {
1724 let mut stream_lock = s.stream.lock().await;
1725 if let Some(stream) = stream_lock.take() {
1726 request = request.body(reqwest::Body::wrap_stream(stream));
1727 } else {
1728 return Err(CamelError::AlreadyConsumed);
1729 }
1730 }
1731 _ => {
1732 let body = std::mem::take(&mut exchange.input.body);
1734 let bytes = body.into_bytes(config.max_body_size).await?;
1735 if !bytes.is_empty() {
1736 request = request.body(bytes);
1737 }
1738 }
1739 }
1740
1741 let response = request
1742 .send()
1743 .await
1744 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1745
1746 let status_code = response.status().as_u16();
1747 let status_text = response
1748 .status()
1749 .canonical_reason()
1750 .unwrap_or("Unknown")
1751 .to_string();
1752
1753 for (key, value) in response.headers() {
1754 if config
1755 .skip_response_headers
1756 .iter()
1757 .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1758 {
1759 continue;
1760 }
1761 if let Ok(val_str) = value.to_str() {
1762 exchange.input.set_header(
1763 title_case_header(key.as_str()),
1764 serde_json::Value::String(val_str.to_string()),
1765 );
1766 }
1767 }
1768
1769 exchange.input.set_header(
1770 "CamelHttpResponseCode",
1771 serde_json::Value::Number(status_code.into()),
1772 );
1773 exchange.input.set_header(
1774 "CamelHttpResponseText",
1775 serde_json::Value::String(status_text.clone()),
1776 );
1777
1778 let read_timeout = Duration::from_millis(config.read_timeout_ms);
1780 let response_body = tokio::time::timeout(read_timeout, async {
1781 if let Some(content_len) = response.content_length()
1783 && content_len > config.max_response_bytes as u64
1784 {
1785 return Err(CamelError::ProcessorError(format!(
1786 "Response body too large: {} bytes exceeds limit of {} bytes",
1787 content_len, config.max_response_bytes
1788 )));
1789 }
1790 use futures::TryStreamExt;
1792 let mut stream = response.bytes_stream();
1793 let mut total: usize = 0;
1794 let mut collected = Vec::new();
1795 while let Some(chunk) = stream.try_next().await.map_err(|e| {
1796 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1797 })? {
1798 total += chunk.len();
1799 if total > config.max_response_bytes {
1800 return Err(CamelError::ProcessorError(format!(
1801 "Response body too large: {} bytes exceeds limit of {} bytes",
1802 total, config.max_response_bytes
1803 )));
1804 }
1805 collected.push(chunk);
1806 }
1807 let mut result = bytes::BytesMut::with_capacity(total);
1808 for chunk in collected {
1809 result.extend_from_slice(&chunk);
1810 }
1811 Ok::<bytes::Bytes, CamelError>(result.freeze())
1812 })
1813 .await
1814 .map_err(|_| {
1815 CamelError::ProcessorError(format!(
1816 "Read timeout after {}ms",
1817 config.read_timeout_ms
1818 ))
1819 })??;
1820
1821 if config.throw_exception_on_failure
1822 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1823 {
1824 return Err(CamelError::HttpOperationFailed {
1825 method: method_str,
1826 url,
1827 status_code,
1828 status_text,
1829 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1830 });
1831 }
1832
1833 if !response_body.is_empty() {
1834 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1835 }
1836
1837 debug!(
1838 correlation_id = %exchange.correlation_id(),
1839 status = status_code,
1840 url = %url,
1841 "HTTP response"
1842 );
1843 Ok(exchange)
1844 })
1845 }
1846}
1847
/// Test-only, crate-wide lock.
///
/// NOTE(review): presumably held by tests that touch shared route-registry
/// state so they do not run concurrently — confirm against its users.
#[cfg(test)]
pub(crate) static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
1859
1860#[cfg(test)]
1861mod tests {
1862 use camel_component_api::test_support::PanicRuntimeObservability;
1863 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1864 std::sync::Arc::new(PanicRuntimeObservability)
1865 }
1866 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1867 std::sync::Arc::new(PanicRuntimeObservability)
1868 }
1869
1870 use super::*;
1871 use camel_component_api::{Message, NoOpComponentContext};
1872 use std::sync::Arc;
1873 use std::time::Duration;
1874
1875 fn test_producer_ctx() -> ProducerContext {
1876 ProducerContext::new()
1877 }
1878
1879 #[test]
1880 fn test_http_config_defaults() {
1881 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1882 assert_eq!(config.base_url, "http://localhost:8080/api");
1883 assert!(config.http_method.is_none());
1884 assert!(config.throw_exception_on_failure);
1885 assert_eq!(config.ok_status_code_range, (200, 299));
1886 assert!(config.response_timeout.is_none());
1887 assert!(matches!(config.auth, HttpAuth::None));
1888 assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1889 assert!(!config.bridge_endpoint);
1890 assert!(!config.connection_close);
1891 }
1892
1893 #[test]
1894 fn test_http_config_scheme() {
1895 assert_eq!(HttpEndpointConfig::scheme(), "http");
1897 }
1898
1899 #[test]
1900 fn test_http_config_from_components() {
1901 let components = camel_component_api::UriComponents {
1903 scheme: "https".to_string(),
1904 path: "//api.example.com/v1".to_string(),
1905 params: std::collections::HashMap::from([(
1906 "httpMethod".to_string(),
1907 "POST".to_string(),
1908 )]),
1909 };
1910 let config = HttpEndpointConfig::from_components(components).unwrap();
1911 assert_eq!(config.base_url, "https://api.example.com/v1");
1912 assert_eq!(config.http_method, Some("POST".to_string()));
1913 }
1914
1915 #[test]
1916 fn test_http_config_with_options() {
1917 let config = HttpEndpointConfig::from_uri(
1918 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1919 ).unwrap();
1920 assert_eq!(config.base_url, "https://api.example.com/v1");
1921 assert_eq!(config.http_method, Some("PUT".to_string()));
1922 assert!(!config.throw_exception_on_failure);
1923 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1924 }
1925
1926 #[test]
1927 fn test_http_endpoint_config_auth_and_headers_options() {
1928 let config = HttpEndpointConfig::from_uri(
1929 "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1930 )
1931 .unwrap();
1932
1933 assert!(matches!(
1934 config.auth,
1935 HttpAuth::Basic { username, password } if username == "u" && password == "p"
1936 ));
1937 assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1938 assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1939 assert!(config.bridge_endpoint);
1940 assert!(config.connection_close);
1941 assert_eq!(
1942 config.skip_request_headers,
1943 vec!["authorization".to_string(), "x-secret".to_string()]
1944 );
1945 assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1946 }
1947
1948 #[test]
1949 fn test_http_endpoint_config_bearer_auth() {
1950 let config = HttpEndpointConfig::from_uri(
1951 "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1952 )
1953 .unwrap();
1954 assert!(matches!(
1955 config.auth,
1956 HttpAuth::Bearer { token } if token == "t"
1957 ));
1958 }
1959
1960 #[test]
1961 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1962 let config = HttpConfig::default()
1963 .with_response_timeout_ms(999)
1964 .with_allow_private_ips(true)
1965 .with_blocked_hosts(vec!["evil.com".to_string()])
1966 .with_max_body_size(12345);
1967 let endpoint =
1968 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1969 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1970 assert!(endpoint.allow_private_ips);
1971 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1972 assert_eq!(endpoint.max_body_size, 12345);
1973 }
1974
1975 #[test]
1976 fn test_from_uri_with_defaults_uri_overrides_config() {
1977 let config = HttpConfig::default()
1978 .with_response_timeout_ms(999)
1979 .with_allow_private_ips(true)
1980 .with_blocked_hosts(vec!["evil.com".to_string()])
1981 .with_max_body_size(12345);
1982 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1983 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1984 &config,
1985 )
1986 .unwrap();
1987 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1988 assert!(!endpoint.allow_private_ips);
1989 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1990 assert_eq!(endpoint.max_body_size, 99);
1991 }
1992
1993 #[test]
1994 fn test_http_config_ok_status_range() {
1995 let config =
1996 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1997 assert_eq!(config.ok_status_code_range, (200, 204));
1998 }
1999
2000 #[test]
2001 fn test_http_config_wrong_scheme() {
2002 let result = HttpEndpointConfig::from_uri("file:/tmp");
2003 assert!(result.is_err());
2004 }
2005
2006 #[test]
2007 fn test_http_component_scheme() {
2008 let component = HttpComponent::new();
2009 assert_eq!(component.scheme(), "http");
2010 }
2011
2012 #[test]
2013 fn test_https_component_scheme() {
2014 let component = HttpsComponent::new();
2015 assert_eq!(component.scheme(), "https");
2016 }
2017
2018 #[test]
2019 fn test_http_endpoint_creates_consumer() {
2020 let component = HttpComponent::new();
2021 let ctx = NoOpComponentContext;
2022 let endpoint = component
2023 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
2024 .unwrap();
2025 assert!(endpoint.create_consumer(rt()).is_ok());
2026 }
2027
2028 #[test]
2029 fn test_https_endpoint_creates_consumer() {
2030 let component = HttpsComponent::new();
2031 let ctx = NoOpComponentContext;
2032 let endpoint = component
2033 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
2034 .unwrap();
2035 assert!(endpoint.create_consumer(rt()).is_ok());
2036 }
2037
2038 #[test]
2039 fn test_http_endpoint_creates_producer() {
2040 let ctx = test_producer_ctx();
2041 let component = HttpComponent::new();
2042 let endpoint_ctx = NoOpComponentContext;
2043 let endpoint = component
2044 .create_endpoint("http://localhost/api", &endpoint_ctx)
2045 .unwrap();
2046 assert!(endpoint.create_producer(rt(), &ctx).is_ok());
2047 }
2048
2049 #[tokio::test]
2054 async fn test_producer_with_token_provider() {
2055 use camel_auth::oauth2::TokenProvider;
2056 use tower::ServiceExt;
2057
2058 let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2059 Arc::new(std::sync::Mutex::new(None));
2060 let captured_clone = Arc::clone(&captured_auth);
2061
2062 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2063 let port = listener.local_addr().unwrap().port();
2064
2065 let _handle = tokio::spawn(async move {
2066 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2067 if let Ok((mut stream, _)) = listener.accept().await {
2068 let mut buf = vec![0u8; 8192];
2069 let n = stream.read(&mut buf).await.unwrap_or(0);
2070 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2071 let auth = request
2072 .lines()
2073 .find(|l| l.to_lowercase().starts_with("authorization:"))
2074 .map(|l| {
2075 l.split(':')
2076 .nth(1)
2077 .map(|s| s.trim().to_string())
2078 .unwrap_or_default()
2079 });
2080 *captured_clone.lock().unwrap() = auth;
2081 let body = r#"{"echo":"ok"}"#;
2082 let resp = format!(
2083 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2084 body.len(),
2085 body
2086 );
2087 let _ = stream.write_all(resp.as_bytes()).await;
2088 }
2089 });
2090
2091 #[derive(Debug)]
2092 struct StaticProvider;
2093 #[async_trait::async_trait]
2094 impl TokenProvider for StaticProvider {
2095 async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2096 Ok("injected-token".into())
2097 }
2098 }
2099
2100 let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2101 let ctx = test_producer_ctx();
2102 let component = HttpComponent::new();
2103 let endpoint_ctx = NoOpComponentContext;
2104 let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2105 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2106
2107 let exchange = Exchange::new(Message::new("hello"));
2108
2109 let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2110 let mut layered = layer.layer(producer);
2111 let result = layered.ready().await.unwrap().call(exchange).await;
2112 assert!(result.is_ok(), "producer call failed: {:?}", result);
2113
2114 tokio::time::sleep(Duration::from_millis(100)).await;
2115 let auth = captured_auth.lock().unwrap().take();
2116 assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2117 }
2118
2119 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2120 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2121 let addr = listener.local_addr().unwrap();
2122 let url = format!("http://127.0.0.1:{}", addr.port());
2123
2124 let handle = tokio::spawn(async move {
2125 loop {
2126 if let Ok((mut stream, _)) = listener.accept().await {
2127 tokio::spawn(async move {
2128 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2129 let mut buf = vec![0u8; 4096];
2130 let n = stream.read(&mut buf).await.unwrap_or(0);
2131 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2132
2133 let method = request.split_whitespace().next().unwrap_or("GET");
2134
2135 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2136 let response = format!(
2137 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2138 body.len(),
2139 body
2140 );
2141 let _ = stream.write_all(response.as_bytes()).await;
2142 });
2143 }
2144 }
2145 });
2146
2147 (url, handle)
2148 }
2149
2150 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2151 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2152 let addr = listener.local_addr().unwrap();
2153 let url = format!("http://127.0.0.1:{}", addr.port());
2154
2155 let handle = tokio::spawn(async move {
2156 loop {
2157 if let Ok((mut stream, _)) = listener.accept().await {
2158 let status = status;
2159 tokio::spawn(async move {
2160 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2161 let mut buf = vec![0u8; 4096];
2162 let _ = stream.read(&mut buf).await;
2163
2164 let status_text = match status {
2165 404 => "Not Found",
2166 500 => "Internal Server Error",
2167 _ => "Error",
2168 };
2169 let body = "error body";
2170 let response = format!(
2171 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2172 status,
2173 status_text,
2174 body.len(),
2175 body
2176 );
2177 let _ = stream.write_all(response.as_bytes()).await;
2178 });
2179 }
2180 }
2181 });
2182
2183 (url, handle)
2184 }
2185
2186 #[tokio::test]
2187 async fn test_http_producer_get_request() {
2188 use tower::ServiceExt;
2189
2190 let (url, _handle) = start_test_server().await;
2191 let ctx = test_producer_ctx();
2192
2193 let component = HttpComponent::new();
2194 let endpoint_ctx = NoOpComponentContext;
2195 let endpoint = component
2196 .create_endpoint(
2197 &format!("{url}/api/test?allowPrivateIps=true"),
2198 &endpoint_ctx,
2199 )
2200 .unwrap();
2201 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2202
2203 let exchange = Exchange::new(Message::default());
2204 let result = producer.oneshot(exchange).await.unwrap();
2205
2206 let status = result
2207 .input
2208 .header("CamelHttpResponseCode")
2209 .and_then(|v| v.as_u64())
2210 .unwrap();
2211 assert_eq!(status, 200);
2212
2213 assert!(!result.input.body.is_empty());
2214 }
2215
2216 #[tokio::test]
2217 async fn test_http_producer_post_with_body() {
2218 use tower::ServiceExt;
2219
2220 let (url, _handle) = start_test_server().await;
2221 let ctx = test_producer_ctx();
2222
2223 let component = HttpComponent::new();
2224 let endpoint_ctx = NoOpComponentContext;
2225 let endpoint = component
2226 .create_endpoint(
2227 &format!("{url}/api/data?allowPrivateIps=true"),
2228 &endpoint_ctx,
2229 )
2230 .unwrap();
2231 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2232
2233 let exchange = Exchange::new(Message::new("request body"));
2234 let result = producer.oneshot(exchange).await.unwrap();
2235
2236 let status = result
2237 .input
2238 .header("CamelHttpResponseCode")
2239 .and_then(|v| v.as_u64())
2240 .unwrap();
2241 assert_eq!(status, 200);
2242 }
2243
2244 #[tokio::test]
2245 async fn test_http_producer_method_from_header() {
2246 use tower::ServiceExt;
2247
2248 let (url, _handle) = start_test_server().await;
2249 let ctx = test_producer_ctx();
2250
2251 let component = HttpComponent::new();
2252 let endpoint_ctx = NoOpComponentContext;
2253 let endpoint = component
2254 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2255 .unwrap();
2256 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2257
2258 let mut exchange = Exchange::new(Message::default());
2259 exchange.input.set_header(
2260 "CamelHttpMethod",
2261 serde_json::Value::String("DELETE".to_string()),
2262 );
2263
2264 let result = producer.oneshot(exchange).await.unwrap();
2265 let status = result
2266 .input
2267 .header("CamelHttpResponseCode")
2268 .and_then(|v| v.as_u64())
2269 .unwrap();
2270 assert_eq!(status, 200);
2271 }
2272
2273 #[tokio::test]
2274 async fn test_http_producer_forced_method() {
2275 use tower::ServiceExt;
2276
2277 let (url, _handle) = start_test_server().await;
2278 let ctx = test_producer_ctx();
2279
2280 let component = HttpComponent::new();
2281 let endpoint_ctx = NoOpComponentContext;
2282 let endpoint = component
2283 .create_endpoint(
2284 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2285 &endpoint_ctx,
2286 )
2287 .unwrap();
2288 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2289
2290 let exchange = Exchange::new(Message::default());
2291 let result = producer.oneshot(exchange).await.unwrap();
2292
2293 let status = result
2294 .input
2295 .header("CamelHttpResponseCode")
2296 .and_then(|v| v.as_u64())
2297 .unwrap();
2298 assert_eq!(status, 200);
2299 }
2300
2301 #[tokio::test]
2302 async fn test_http_producer_throw_exception_on_failure() {
2303 use tower::ServiceExt;
2304
2305 let (url, _handle) = start_status_server(404).await;
2306 let ctx = test_producer_ctx();
2307
2308 let component = HttpComponent::new();
2309 let endpoint_ctx = NoOpComponentContext;
2310 let endpoint = component
2311 .create_endpoint(
2312 &format!("{url}/not-found?allowPrivateIps=true"),
2313 &endpoint_ctx,
2314 )
2315 .unwrap();
2316 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2317
2318 let exchange = Exchange::new(Message::default());
2319 let result = producer.oneshot(exchange).await;
2320 assert!(result.is_err());
2321
2322 match result.unwrap_err() {
2323 CamelError::HttpOperationFailed { status_code, .. } => {
2324 assert_eq!(status_code, 404);
2325 }
2326 e => panic!("Expected HttpOperationFailed, got: {e}"),
2327 }
2328 }
2329
2330 #[tokio::test]
2331 async fn test_http_producer_no_throw_on_failure() {
2332 use tower::ServiceExt;
2333
2334 let (url, _handle) = start_status_server(500).await;
2335 let ctx = test_producer_ctx();
2336
2337 let component = HttpComponent::new();
2338 let endpoint_ctx = NoOpComponentContext;
2339 let endpoint = component
2340 .create_endpoint(
2341 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2342 &endpoint_ctx,
2343 )
2344 .unwrap();
2345 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2346
2347 let exchange = Exchange::new(Message::default());
2348 let result = producer.oneshot(exchange).await.unwrap();
2349
2350 let status = result
2351 .input
2352 .header("CamelHttpResponseCode")
2353 .and_then(|v| v.as_u64())
2354 .unwrap();
2355 assert_eq!(status, 500);
2356 }
2357
2358 #[tokio::test]
2359 async fn test_http_producer_uri_override() {
2360 use tower::ServiceExt;
2361
2362 let (url, _handle) = start_test_server().await;
2363 let ctx = test_producer_ctx();
2364
2365 let component = HttpComponent::new();
2366 let endpoint_ctx = NoOpComponentContext;
2367 let endpoint = component
2368 .create_endpoint(
2369 "http://localhost:1/does-not-exist?allowPrivateIps=true",
2370 &endpoint_ctx,
2371 )
2372 .unwrap();
2373 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2374
2375 let mut exchange = Exchange::new(Message::default());
2376 exchange.input.set_header(
2377 "CamelHttpUri",
2378 serde_json::Value::String(format!("{url}/api")),
2379 );
2380
2381 let result = producer.oneshot(exchange).await.unwrap();
2382 let status = result
2383 .input
2384 .header("CamelHttpResponseCode")
2385 .and_then(|v| v.as_u64())
2386 .unwrap();
2387 assert_eq!(status, 200);
2388 }
2389
2390 #[tokio::test]
2391 async fn test_http_producer_response_headers_mapped() {
2392 use tower::ServiceExt;
2393
2394 let (url, _handle) = start_test_server().await;
2395 let ctx = test_producer_ctx();
2396
2397 let component = HttpComponent::new();
2398 let endpoint_ctx = NoOpComponentContext;
2399 let endpoint = component
2400 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2401 .unwrap();
2402 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2403
2404 let exchange = Exchange::new(Message::default());
2405 let result = producer.oneshot(exchange).await.unwrap();
2406
2407 assert!(
2408 result.input.header("Content-Type").is_some(),
2409 "Response should have Content-Type header"
2410 );
2411 assert!(result.input.header("CamelHttpResponseText").is_some());
2412 }
2413
2414 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2419 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2420 let addr = listener.local_addr().unwrap();
2421 let url = format!("http://127.0.0.1:{}", addr.port());
2422
2423 let handle = tokio::spawn(async move {
2424 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2425 loop {
2426 if let Ok((mut stream, _)) = listener.accept().await {
2427 tokio::spawn(async move {
2428 let mut buf = vec![0u8; 4096];
2429 let n = stream.read(&mut buf).await.unwrap_or(0);
2430 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2431
2432 if request.contains("GET /final") {
2434 let body = r#"{"status":"final"}"#;
2435 let response = format!(
2436 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2437 body.len(),
2438 body
2439 );
2440 let _ = stream.write_all(response.as_bytes()).await;
2441 } else {
2442 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2444 let _ = stream.write_all(response.as_bytes()).await;
2445 }
2446 });
2447 }
2448 }
2449 });
2450
2451 (url, handle)
2452 }
2453
2454 #[tokio::test]
2455 async fn test_follow_redirects_false_does_not_follow() {
2456 use tower::ServiceExt;
2457
2458 let (url, _handle) = start_redirect_server().await;
2459 let ctx = test_producer_ctx();
2460
2461 let component =
2462 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2463 let endpoint_ctx = NoOpComponentContext;
2464 let endpoint = component
2465 .create_endpoint(
2466 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2467 &endpoint_ctx,
2468 )
2469 .unwrap();
2470 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2471
2472 let exchange = Exchange::new(Message::default());
2473 let result = producer.oneshot(exchange).await.unwrap();
2474
2475 let status = result
2477 .input
2478 .header("CamelHttpResponseCode")
2479 .and_then(|v| v.as_u64())
2480 .unwrap();
2481 assert_eq!(
2482 status, 302,
2483 "Should NOT follow redirect when followRedirects=false"
2484 );
2485 }
2486
2487 #[tokio::test]
2488 async fn test_follow_redirects_true_follows_redirect() {
2489 use tower::ServiceExt;
2490
2491 let (url, _handle) = start_redirect_server().await;
2492 let ctx = test_producer_ctx();
2493
2494 let component =
2495 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2496 let endpoint_ctx = NoOpComponentContext;
2497 let endpoint = component
2498 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2499 .unwrap();
2500 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2501
2502 let exchange = Exchange::new(Message::default());
2503 let result = producer.oneshot(exchange).await.unwrap();
2504
2505 let status = result
2507 .input
2508 .header("CamelHttpResponseCode")
2509 .and_then(|v| v.as_u64())
2510 .unwrap();
2511 assert_eq!(
2512 status, 200,
2513 "Should follow redirect when followRedirects=true"
2514 );
2515 }
2516
2517 #[tokio::test]
2518 async fn test_query_params_forwarded_to_http_request() {
2519 use tower::ServiceExt;
2520
2521 let (url, _handle) = start_test_server().await;
2522 let ctx = test_producer_ctx();
2523
2524 let component = HttpComponent::new();
2525 let endpoint_ctx = NoOpComponentContext;
2526 let endpoint = component
2528 .create_endpoint(
2529 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2530 &endpoint_ctx,
2531 )
2532 .unwrap();
2533 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2534
2535 let exchange = Exchange::new(Message::default());
2536 let result = producer.oneshot(exchange).await.unwrap();
2537
2538 let status = result
2541 .input
2542 .header("CamelHttpResponseCode")
2543 .and_then(|v| v.as_u64())
2544 .unwrap();
2545 assert_eq!(status, 200);
2546 }
2547
/// Parsing an endpoint URI must keep unknown query parameters (`apiKey`, `token`)
/// in `query_params` while removing the `httpMethod` option from it.
///
/// This is a plain synchronous test: it only parses a URI and never awaits, so it
/// does not need an async runtime.
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpEndpointConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // Custom parameters survive parsing with their original values.
    assert!(
        config.query_params.contains_key("apiKey"),
        "apiKey should be preserved"
    );
    assert!(
        config.query_params.contains_key("token"),
        "token should be preserved"
    );
    assert_eq!(
        config.query_params.get("apiKey").map(String::as_str),
        Some("secret123")
    );
    assert_eq!(
        config.query_params.get("token").map(String::as_str),
        Some("abc456")
    );

    // Component options must not leak into the forwarded parameters.
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
2575
/// `HttpProducer::resolve_url` must emit URL-encoded query parameters: the space
/// in `hello world` becomes `+` and the literal `+` in `a+b` becomes `%2B`.
#[test]
fn test_query_params_are_url_encoded_when_resolving_url() {
    let uri = "http://example.com/api?q=hello world&tag=a+b";
    let config = HttpEndpointConfig::from_uri(uri).unwrap();
    let exchange = Exchange::new(Message::default());

    let url = HttpProducer::resolve_url(&exchange, &config);

    for expected in ["q=hello+world", "tag=a%2Bb"] {
        assert!(url.contains(expected), "url was: {url}");
    }
}
2587
2588 async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2593 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2594 let addr = listener.local_addr().unwrap();
2595 let url = format!("http://127.0.0.1:{}", addr.port());
2596
2597 let handle = tokio::spawn(async move {
2598 loop {
2599 if let Ok((mut stream, _)) = listener.accept().await {
2600 let delay = delay_ms;
2601 tokio::spawn(async move {
2602 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2603 let mut buf = vec![0u8; 4096];
2604 let _ = stream.read(&mut buf).await;
2605 let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2607 let _ = stream.write_all(headers.as_bytes()).await;
2608 tokio::time::sleep(Duration::from_millis(delay)).await;
2610 let body = r#"{"status":"slow"}"#;
2611 let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2612 let _ = stream.write_all(chunk.as_bytes()).await;
2613 });
2614 }
2615 }
2616 });
2617
2618 (url, handle)
2619 }
2620
2621 #[tokio::test]
2622 async fn test_http_producer_timeout() {
2623 use tower::ServiceExt;
2624
2625 let (url, _handle) = start_slow_server(500).await;
2627 let ctx = test_producer_ctx();
2628
2629 let component = HttpComponent::with_config(
2630 HttpConfig::default()
2631 .with_read_timeout_ms(100)
2632 .with_response_timeout_ms(30_000), );
2634 let endpoint_ctx = NoOpComponentContext;
2635 let endpoint = component
2636 .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2637 .unwrap();
2638 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2639
2640 let exchange = Exchange::new(Message::default());
2641 let result = producer.oneshot(exchange).await;
2642
2643 assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2644 let err = result.unwrap_err().to_string();
2645 assert!(
2646 err.contains("Read timeout") || err.contains("timeout"),
2647 "Error should mention timeout, got: {}",
2648 err
2649 );
2650 }
2651
2652 #[tokio::test]
2653 async fn test_http_producer_no_timeout_when_fast() {
2654 use tower::ServiceExt;
2655
2656 let (url, _handle) = start_test_server().await;
2657 let ctx = test_producer_ctx();
2658
2659 let component =
2660 HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2661 let endpoint_ctx = NoOpComponentContext;
2662 let endpoint = component
2663 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2664 .unwrap();
2665 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2666
2667 let exchange = Exchange::new(Message::default());
2668 let result = producer.oneshot(exchange).await.unwrap();
2669
2670 let status = result
2671 .input
2672 .header("CamelHttpResponseCode")
2673 .and_then(|v| v.as_u64())
2674 .unwrap();
2675 assert_eq!(status, 200);
2676 }
2677
2678 #[tokio::test]
2683 async fn test_http_producer_blocks_metadata_endpoint() {
2684 use tower::ServiceExt;
2685
2686 let ctx = test_producer_ctx();
2687 let component = HttpComponent::new();
2688 let endpoint_ctx = NoOpComponentContext;
2689 let endpoint = component
2690 .create_endpoint(
2691 "http://example.com/api?allowPrivateIps=false",
2692 &endpoint_ctx,
2693 )
2694 .unwrap();
2695 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2696
2697 let mut exchange = Exchange::new(Message::default());
2698 exchange.input.set_header(
2699 "CamelHttpUri",
2700 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2701 );
2702
2703 let result = producer.oneshot(exchange).await;
2704 assert!(result.is_err(), "Should block AWS metadata endpoint");
2705
2706 let err = result.unwrap_err();
2707 assert!(
2708 err.to_string().contains("Private IP"),
2709 "Error should mention private IP blocking, got: {}",
2710 err
2711 );
2712 }
2713
/// A URI without SSRF-related options must yield `allow_private_ips == false`
/// and an empty `blocked_hosts` list.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_blocked = !parsed.allow_private_ips;
    assert!(
        private_ips_blocked,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = parsed.blocked_hosts.is_empty();
    assert!(
        no_blocked_hosts,
        "Blocked hosts should be empty by default"
    );
}
2726
/// `allowPrivateIps=true` in the URI must set `allow_private_ips` on the parsed config.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    assert!(
        parsed.allow_private_ips,
        "Private IPs should be allowed when explicitly set"
    );
}
2736
/// A comma-separated `blockedHosts` option must be split into individual host entries.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();

    assert_eq!(parsed.blocked_hosts, ["evil.com", "malware.net"]);
}
2745
2746 #[tokio::test]
2747 async fn test_http_producer_blocks_localhost() {
2748 use tower::ServiceExt;
2749
2750 let ctx = test_producer_ctx();
2751 let component = HttpComponent::new();
2752 let endpoint_ctx = NoOpComponentContext;
2753 let endpoint = component
2754 .create_endpoint("http://example.com/api", &endpoint_ctx)
2755 .unwrap();
2756 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2757
2758 let mut exchange = Exchange::new(Message::default());
2759 exchange.input.set_header(
2760 "CamelHttpUri",
2761 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2762 );
2763
2764 let result = producer.oneshot(exchange).await;
2765 assert!(result.is_err(), "Should block localhost");
2766 }
2767
2768 #[tokio::test]
2769 async fn test_http_producer_blocks_loopback_ip() {
2770 use tower::ServiceExt;
2771
2772 let ctx = test_producer_ctx();
2773 let component = HttpComponent::new();
2774 let endpoint_ctx = NoOpComponentContext;
2775 let endpoint = component
2776 .create_endpoint("http://example.com/api", &endpoint_ctx)
2777 .unwrap();
2778 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2779
2780 let mut exchange = Exchange::new(Message::default());
2781 exchange.input.set_header(
2782 "CamelHttpUri",
2783 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2784 );
2785
2786 let result = producer.oneshot(exchange).await;
2787 assert!(result.is_err(), "Should block loopback IP");
2788 }
2789
2790 #[tokio::test]
2791 async fn test_http_producer_allows_private_ip_when_enabled() {
2792 use tower::ServiceExt;
2793
2794 let ctx = test_producer_ctx();
2795 let component = HttpComponent::new();
2796 let endpoint_ctx = NoOpComponentContext;
2797 let endpoint = component
2800 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2801 .unwrap();
2802 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
2803
2804 let exchange = Exchange::new(Message::default());
2805
2806 let result = producer.oneshot(exchange).await;
2809 if let Err(ref e) = result {
2811 let err_str = e.to_string();
2812 assert!(
2813 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2814 "Should not be SSRF error, got: {}",
2815 err_str
2816 );
2817 }
2818 }
2819
/// Parsing `http://0.0.0.0:8080/orders` must yield the host, port and path from
/// the URI, plus a `max_inflight_requests` of 1024 when none is given.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();

    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8080);
    assert_eq!(parsed.path, "/orders");
    // No maxInflightRequests in the URI, so 1024 is the value being relied on.
    assert_eq!(parsed.max_inflight_requests, 1024);
}
2832
/// `HttpServerConfig::scheme()` must report `"http"`.
#[test]
fn test_http_server_config_scheme() {
    let scheme = HttpServerConfig::scheme();
    assert_eq!(scheme, "http");
}
2838
/// Building the server config from pre-parsed `UriComponents` must pick up host,
/// port and path from the `//host:port/path` form and honour the
/// `maxRequestBody` / `maxInflightRequests` parameters.
#[test]
fn test_http_server_config_from_components() {
    let params = [
        ("maxRequestBody", "5242880"),
        ("maxInflightRequests", "7"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();

    let parts = camel_component_api::UriComponents {
        scheme: String::from("https"),
        path: String::from("//0.0.0.0:8443/api"),
        params,
    };

    let parsed = HttpServerConfig::from_components(parts).unwrap();
    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8443);
    assert_eq!(parsed.path, "/api");
    assert_eq!(parsed.max_request_body, 5242880);
    assert_eq!(parsed.max_inflight_requests, 7);
}
2857
/// A URI with no path component must parse to the root path `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    assert_eq!(parsed.path, "/");
}
2863
/// A `file:` URI must be rejected by the server config parser.
#[test]
fn test_http_server_config_wrong_scheme() {
    let parsed = HttpServerConfig::from_uri("file:/tmp");
    assert!(parsed.is_err());
}
2868
/// A non-numeric port (`abc`) must make parsing fail.
#[test]
fn test_http_server_config_invalid_port() {
    let parsed = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(parsed.is_err());
}
2873
/// When the URI omits the port, it must default by scheme: 80 for `http`,
/// 443 for `https`.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let cases = [
        ("http://0.0.0.0/orders", 80),
        ("https://0.0.0.0/orders", 443),
    ];

    for (uri, expected_port) in cases {
        let parsed = HttpServerConfig::from_uri(uri).unwrap();
        assert_eq!(parsed.port, expected_port);
    }
}
2884
/// Compile-time check that `RequestEnvelope` and `HttpReply` implement `Send`.
///
/// The test body does nothing at runtime; it fails to compile if either type
/// stops being `Send`.
#[test]
fn test_request_envelope_and_reply_are_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
2891
/// Two calls to `ServerRegistry::global()` must return the same address.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global();
    let second = ServerRegistry::global();

    let same_instance = std::ptr::eq(first as *const _, second as *const _);
    assert!(same_instance);
}
2902
2903 #[allow(clippy::await_holding_lock)]
2904 #[tokio::test]
2905 async fn test_concurrent_get_or_spawn_returns_same_registry() {
2906 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2907 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2908 let port = listener.local_addr().unwrap().port();
2909 drop(listener);
2910
2911 let results: Arc<std::sync::Mutex<Vec<HttpRouteRegistry>>> =
2912 Arc::new(std::sync::Mutex::new(Vec::new()));
2913
2914 let mut handles = Vec::new();
2915 for _ in 0..4 {
2916 let results = results.clone();
2917 handles.push(tokio::spawn(async move {
2918 let registry = ServerRegistry::global()
2919 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2920 .await
2921 .unwrap();
2922 results.lock().unwrap().push(registry);
2923 }));
2924 }
2925
2926 for h in handles {
2927 h.await.unwrap();
2928 }
2929
2930 let registries = results.lock().unwrap();
2931 assert_eq!(registries.len(), 4);
2932 for i in 1..registries.len() {
2933 assert!(
2934 Arc::ptr_eq(®istries[0].inner, ®istries[i].inner),
2935 "all concurrent callers should get same route registry"
2936 );
2937 }
2938 }
2939
/// Registrations for different hosts (`127.0.0.1` vs `0.0.0.0`, both with port 0)
/// must succeed and must not share the same inner route registry.
#[test]
fn test_server_registry_distinguishes_host_and_port() {
    let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
    let runtime = tokio::runtime::Runtime::new().expect("runtime");

    runtime.block_on(async {
        let servers = ServerRegistry::global();

        let loopback = servers
            .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await;
        let wildcard = servers
            .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await;

        assert!(loopback.is_ok());
        assert!(wildcard.is_ok());

        let loopback = loopback.unwrap();
        let wildcard = wildcard.unwrap();
        let shared = Arc::ptr_eq(&loopback.inner, &wildcard.inner);
        assert!(!shared);
    });
}
2960
2961 #[allow(clippy::await_holding_lock)]
2962 #[tokio::test]
2963 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2964 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2965 let registry = ServerRegistry::global();
2966 let d1 = registry
2968 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2969 .await;
2970 assert!(d1.is_ok());
2971
2972 let d2 = registry
2975 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2976 .await;
2977 assert!(d2.is_err());
2978 let err = d2.unwrap_err();
2979 assert!(
2980 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2981 "Expected incompatible maxRequestBody error, got: {}",
2982 err
2983 );
2984 }
2985
/// After `ServerRegistry::reset()` the global registry's map must be empty,
/// even though an entry for `127.0.0.1:9992` was present beforehand.
// NOTE(review): uses the fixed port 9992; the test depends on that port being
// usable on the machine running it.
#[test]
fn test_server_registry_reset_clears_entries() {
    let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
    let runtime = tokio::runtime::Runtime::new().expect("runtime");

    runtime.block_on(async {
        let spawned = ServerRegistry::global()
            .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await;
        assert!(spawned.is_ok());

        // The inner lock is scoped so it is released before `reset()` runs.
        {
            let entries = ServerRegistry::global().inner.lock().expect("lock");
            let key = ("127.0.0.1".to_string(), 9992);
            assert!(entries.contains_key(&key));
        }

        ServerRegistry::reset();

        let entries = ServerRegistry::global().inner.lock().expect("lock");
        assert!(
            entries.is_empty(),
            "registry should be empty after reset, has {} entries",
            entries.len()
        );
    });
}
3014
3015 #[tokio::test]
3020 async fn test_dispatch_handler_returns_404_for_unknown_path() {
3021 let registry = HttpRouteRegistry::new();
3022 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3024 let port = listener.local_addr().unwrap().port();
3025 tokio::spawn(run_axum_server(
3026 listener,
3027 registry,
3028 2 * 1024 * 1024,
3029 10 * 1024 * 1024,
3030 Arc::new(tokio::sync::Semaphore::new(1024)),
3031 ));
3032
3033 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3035
3036 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3037 .await
3038 .unwrap();
3039 assert_eq!(resp.status().as_u16(), 404);
3040 }
3041
3042 #[tokio::test]
3047 async fn test_http_consumer_start_registers_path() {
3048 use camel_component_api::ConsumerContext;
3049
3050 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3052 let port = listener.local_addr().unwrap().port();
3053 drop(listener); let consumer_cfg = HttpServerConfig {
3056 host: "127.0.0.1".to_string(),
3057 port,
3058 path: "/ping".to_string(),
3059 max_request_body: 2 * 1024 * 1024,
3060 max_response_body: 10 * 1024 * 1024,
3061 max_inflight_requests: 1024,
3062 };
3063 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3064
3065 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3066 let token = tokio_util::sync::CancellationToken::new();
3067 let ctx = ConsumerContext::new(tx, token.clone());
3068
3069 tokio::spawn(async move {
3070 consumer.start(ctx).await.unwrap();
3071 });
3072
3073 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3074
3075 let client = reqwest::Client::new();
3076 let resp_future = client
3077 .post(format!("http://127.0.0.1:{port}/ping"))
3078 .body("hello world")
3079 .send();
3080
3081 let (http_result, _) = tokio::join!(resp_future, async {
3082 if let Some(mut envelope) = rx.recv().await {
3083 envelope.exchange.input.set_header(
3085 "CamelHttpResponseCode",
3086 serde_json::Value::Number(201.into()),
3087 );
3088 if let Some(reply_tx) = envelope.reply_tx {
3089 let _ = reply_tx.send(Ok(envelope.exchange));
3090 }
3091 }
3092 });
3093
3094 let resp = http_result.unwrap();
3095 assert_eq!(resp.status().as_u16(), 201);
3096
3097 token.cancel();
3098 }
3099
3100 #[tokio::test]
3101 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3102 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3103
3104 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3105 let port = listener.local_addr().unwrap().port();
3106 drop(listener);
3107
3108 let consumer_cfg = HttpServerConfig {
3109 host: "127.0.0.1".to_string(),
3110 port,
3111 path: "/saturation".to_string(),
3112 max_request_body: 2 * 1024 * 1024,
3113 max_response_body: 10 * 1024 * 1024,
3114 max_inflight_requests: 1,
3115 };
3116 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3117
3118 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3119 let token = tokio_util::sync::CancellationToken::new();
3120 let ctx = ConsumerContext::new(tx, token.clone());
3121 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3122 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3123
3124 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3125 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3126
3127 tokio::spawn(async move {
3128 let mut first_seen_tx = Some(first_seen_tx);
3129 let mut unblock_first_rx = Some(unblock_first_rx);
3130
3131 while let Some(envelope) = rx.recv().await {
3132 if let Some(tx) = first_seen_tx.take() {
3133 let _ = tx.send(());
3134 if let Some(rx_unblock) = unblock_first_rx.take() {
3135 let _ = rx_unblock.await;
3136 }
3137 }
3138
3139 if let Some(reply_tx) = envelope.reply_tx {
3140 let _ = reply_tx.send(Ok(envelope.exchange));
3141 }
3142 }
3143 });
3144
3145 let client = reqwest::Client::new();
3146 let first_req = {
3147 let client = client.clone();
3148 async move {
3149 client
3150 .get(format!("http://127.0.0.1:{port}/saturation"))
3151 .send()
3152 .await
3153 .unwrap()
3154 }
3155 };
3156
3157 let first_handle = tokio::spawn(first_req);
3158 first_seen_rx.await.unwrap();
3159
3160 let second_resp = client
3161 .get(format!("http://127.0.0.1:{port}/saturation"))
3162 .send()
3163 .await
3164 .unwrap();
3165
3166 assert_eq!(second_resp.status().as_u16(), 503);
3167
3168 let _ = unblock_first_tx.send(());
3169 let first_resp = first_handle.await.unwrap();
3170 assert_eq!(first_resp.status().as_u16(), 200);
3171
3172 token.cancel();
3173 }
3174
3175 #[tokio::test]
3176 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3177 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3178
3179 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3180
3181 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3182 let port = listener.local_addr().unwrap().port();
3183 drop(listener);
3184
3185 let consumer_cfg = HttpServerConfig {
3186 host: "127.0.0.1".to_string(),
3187 port,
3188 path: "/limit-bytes".to_string(),
3189 max_request_body: 2 * 1024 * 1024,
3190 max_response_body: 16,
3191 max_inflight_requests: 1024,
3192 };
3193 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3194
3195 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3196 let token = tokio_util::sync::CancellationToken::new();
3197 let ctx = ConsumerContext::new(tx, token.clone());
3198 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3199 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3200
3201 let client = reqwest::Client::new();
3202 let send_fut = client
3203 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3204 .send();
3205
3206 let (http_result, _) = tokio::join!(send_fut, async {
3207 if let Some(mut envelope) = rx.recv().await {
3208 envelope.exchange.input.body =
3209 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3210 if let Some(reply_tx) = envelope.reply_tx {
3211 let _ = reply_tx.send(Ok(envelope.exchange));
3212 }
3213 }
3214 });
3215
3216 let resp = http_result.unwrap();
3217 assert_eq!(resp.status().as_u16(), 500);
3218 let body = resp.text().await.unwrap();
3219 assert_eq!(body, "Response body exceeds configured limit");
3220 token.cancel();
3221 }
3222
3223 #[tokio::test]
3224 async fn test_http_consumer_enforces_max_response_body_for_json() {
3225 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3226
3227 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3228
3229 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3230 let port = listener.local_addr().unwrap().port();
3231 drop(listener);
3232
3233 let consumer_cfg = HttpServerConfig {
3234 host: "127.0.0.1".to_string(),
3235 port,
3236 path: "/limit-json".to_string(),
3237 max_request_body: 2 * 1024 * 1024,
3238 max_response_body: 16,
3239 max_inflight_requests: 1024,
3240 };
3241 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3242
3243 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3244 let token = tokio_util::sync::CancellationToken::new();
3245 let ctx = ConsumerContext::new(tx, token.clone());
3246 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3247 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3248
3249 let client = reqwest::Client::new();
3250 let send_fut = client
3251 .get(format!("http://127.0.0.1:{port}/limit-json"))
3252 .send();
3253
3254 let (http_result, _) = tokio::join!(send_fut, async {
3255 if let Some(mut envelope) = rx.recv().await {
3256 envelope.exchange.input.body = camel_component_api::Body::Json(
3257 serde_json::json!({"message":"this response is bigger than sixteen"}),
3258 );
3259 if let Some(reply_tx) = envelope.reply_tx {
3260 let _ = reply_tx.send(Ok(envelope.exchange));
3261 }
3262 }
3263 });
3264
3265 let resp = http_result.unwrap();
3266 assert_eq!(resp.status().as_u16(), 500);
3267 let body = resp.text().await.unwrap();
3268 assert_eq!(body, "Response body exceeds configured limit");
3269 token.cancel();
3270 }
3271
3272 #[tokio::test]
3273 async fn test_http_consumer_enforces_max_response_body_for_xml() {
3274 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3275
3276 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3277
3278 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3279 let port = listener.local_addr().unwrap().port();
3280 drop(listener);
3281
3282 let consumer_cfg = HttpServerConfig {
3283 host: "127.0.0.1".to_string(),
3284 port,
3285 path: "/limit-xml".to_string(),
3286 max_request_body: 2 * 1024 * 1024,
3287 max_response_body: 16,
3288 max_inflight_requests: 1024,
3289 };
3290 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3291
3292 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3293 let token = tokio_util::sync::CancellationToken::new();
3294 let ctx = ConsumerContext::new(tx, token.clone());
3295 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3296 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3297
3298 let client = reqwest::Client::new();
3299 let send_fut = client
3300 .get(format!("http://127.0.0.1:{port}/limit-xml"))
3301 .send();
3302
3303 let (http_result, _) = tokio::join!(send_fut, async {
3304 if let Some(mut envelope) = rx.recv().await {
3305 envelope.exchange.input.body = camel_component_api::Body::Xml(
3306 "<root><value>way-too-large</value></root>".into(),
3307 );
3308 if let Some(reply_tx) = envelope.reply_tx {
3309 let _ = reply_tx.send(Ok(envelope.exchange));
3310 }
3311 }
3312 });
3313
3314 let resp = http_result.unwrap();
3315 assert_eq!(resp.status().as_u16(), 500);
3316 let body = resp.text().await.unwrap();
3317 assert_eq!(body, "Response body exceeds configured limit");
3318 token.cancel();
3319 }
3320
3321 #[tokio::test]
3322 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3323 use camel_component_api::{
3324 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3325 };
3326 use futures::stream;
3327
3328 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3329
3330 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3331 let port = listener.local_addr().unwrap().port();
3332 drop(listener);
3333
3334 let consumer_cfg = HttpServerConfig {
3335 host: "0.0.0.0".to_string(),
3336 port,
3337 path: "/limit-stream".to_string(),
3338 max_request_body: 2 * 1024 * 1024,
3339 max_response_body: 16,
3340 max_inflight_requests: 1024,
3341 };
3342 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
3343
3344 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3345 let token = tokio_util::sync::CancellationToken::new();
3346 let ctx = ConsumerContext::new(tx, token.clone());
3347 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3348 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3349
3350 let client = reqwest::Client::new();
3351 let send_fut = client
3352 .get(format!("http://127.0.0.1:{port}/limit-stream"))
3353 .send();
3354
3355 let (http_result, _) = tokio::join!(send_fut, async {
3356 if let Some(mut envelope) = rx.recv().await {
3357 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3358 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3359 let stream = Box::pin(stream::iter(chunks));
3360 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3361 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3362 metadata: StreamMetadata {
3363 size_hint: Some(32),
3364 content_type: Some("application/octet-stream".into()),
3365 origin: None,
3366 },
3367 });
3368 if let Some(reply_tx) = envelope.reply_tx {
3369 let _ = reply_tx.send(Ok(envelope.exchange));
3370 }
3371 }
3372 });
3373
3374 let resp = http_result.unwrap();
3375 assert_eq!(resp.status().as_u16(), 200);
3376 let body = resp.bytes().await.unwrap();
3377 assert_eq!(body.len(), 32);
3378 token.cancel();
3379 }
3380
3381 #[tokio::test]
3386 async fn test_integration_single_consumer_round_trip() {
3387 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3388
3389 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3391 let port = listener.local_addr().unwrap().port();
3392 drop(listener); let component = HttpComponent::new();
3395 let endpoint_ctx = NoOpComponentContext;
3396 let endpoint = component
3397 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3398 .unwrap();
3399 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3400
3401 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3402 let token = tokio_util::sync::CancellationToken::new();
3403 let ctx = ConsumerContext::new(tx, token.clone());
3404
3405 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3406 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3407
3408 let client = reqwest::Client::new();
3409 let send_fut = client
3410 .post(format!("http://127.0.0.1:{port}/echo"))
3411 .header("Content-Type", "text/plain")
3412 .body("ping")
3413 .send();
3414
3415 let (http_result, _) = tokio::join!(send_fut, async {
3416 if let Some(mut envelope) = rx.recv().await {
3417 assert_eq!(
3418 envelope.exchange.input.header("CamelHttpMethod"),
3419 Some(&serde_json::Value::String("POST".into()))
3420 );
3421 assert_eq!(
3422 envelope.exchange.input.header("CamelHttpPath"),
3423 Some(&serde_json::Value::String("/echo".into()))
3424 );
3425 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3426 if let Some(reply_tx) = envelope.reply_tx {
3427 let _ = reply_tx.send(Ok(envelope.exchange));
3428 }
3429 }
3430 });
3431
3432 let resp = http_result.unwrap();
3433 assert_eq!(resp.status().as_u16(), 200);
3434 let body = resp.text().await.unwrap();
3435 assert_eq!(body, "pong");
3436
3437 token.cancel();
3438 }
3439
3440 #[tokio::test]
3441 async fn test_integration_two_consumers_shared_port() {
3442 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3443
3444 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
3445
3446 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3448 let port = listener.local_addr().unwrap().port();
3449 drop(listener);
3450
3451 let component = HttpComponent::new();
3452 let endpoint_ctx = NoOpComponentContext;
3453
3454 let endpoint_a = component
3456 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3457 .unwrap();
3458 let mut consumer_a = endpoint_a.create_consumer(rt()).unwrap();
3459
3460 let endpoint_b = component
3462 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3463 .unwrap();
3464 let mut consumer_b = endpoint_b.create_consumer(rt()).unwrap();
3465
3466 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3467 let token_a = tokio_util::sync::CancellationToken::new();
3468 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3469
3470 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3471 let token_b = tokio_util::sync::CancellationToken::new();
3472 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3473
3474 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3475 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3476 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3477
3478 let client = reqwest::Client::new();
3479
3480 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3482 let (resp_hello, _) = tokio::join!(fut_hello, async {
3483 if let Some(mut envelope) = rx_a.recv().await {
3484 envelope.exchange.input.body =
3485 camel_component_api::Body::Text("hello-response".to_string());
3486 if let Some(reply_tx) = envelope.reply_tx {
3487 let _ = reply_tx.send(Ok(envelope.exchange));
3488 }
3489 }
3490 });
3491
3492 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3494 let (resp_world, _) = tokio::join!(fut_world, async {
3495 if let Some(mut envelope) = rx_b.recv().await {
3496 envelope.exchange.input.body =
3497 camel_component_api::Body::Text("world-response".to_string());
3498 if let Some(reply_tx) = envelope.reply_tx {
3499 let _ = reply_tx.send(Ok(envelope.exchange));
3500 }
3501 }
3502 });
3503
3504 let body_a = resp_hello.unwrap().text().await.unwrap();
3505 let body_b = resp_world.unwrap().text().await.unwrap();
3506
3507 assert_eq!(body_a, "hello-response");
3508 assert_eq!(body_b, "world-response");
3509
3510 token_a.cancel();
3511 token_b.cancel();
3512 }
3513
3514 #[tokio::test]
3515 async fn test_integration_unregistered_path_returns_404() {
3516 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3517
3518 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3520 let port = listener.local_addr().unwrap().port();
3521 drop(listener);
3522
3523 let component = HttpComponent::new();
3524 let endpoint_ctx = NoOpComponentContext;
3525 let endpoint = component
3526 .create_endpoint(
3527 &format!("http://127.0.0.1:{port}/registered"),
3528 &endpoint_ctx,
3529 )
3530 .unwrap();
3531 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3532
3533 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3534 let token = tokio_util::sync::CancellationToken::new();
3535 let ctx = ConsumerContext::new(tx, token.clone());
3536
3537 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3538
3539 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3541 loop {
3542 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3543 .await
3544 .is_ok()
3545 {
3546 break;
3547 }
3548 if std::time::Instant::now() >= deadline {
3549 panic!("HTTP server did not start within 5s on port {port}");
3550 }
3551 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3552 }
3553
3554 let client = reqwest::Client::new();
3555 let resp = client
3556 .get(format!("http://127.0.0.1:{port}/not-there"))
3557 .send()
3558 .await
3559 .unwrap();
3560 assert_eq!(resp.status().as_u16(), 404);
3561
3562 token.cancel();
3563 }
3564
/// `HttpConsumer::concurrency_model` must report `Concurrent` with no upper
/// bound (`max: None`).
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component_api::ConcurrencyModel;

    // The consumer is only constructed, never started, so the port is not bound.
    let consumer = HttpConsumer::new(
        HttpServerConfig {
            host: String::from("127.0.0.1"),
            port: 19999,
            path: String::from("/test"),
            max_request_body: 2 * 1024 * 1024,
            max_response_body: 10 * 1024 * 1024,
            max_inflight_requests: 1024,
        },
        test_rt(),
    );

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
3583
3584 #[tokio::test]
3589 async fn test_http_reply_body_stream_variant_exists() {
3590 use bytes::Bytes;
3591 use camel_component_api::CamelError;
3592 use futures::stream;
3593
3594 let chunks: Vec<Result<Bytes, CamelError>> =
3595 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3596 let stream = Box::pin(stream::iter(chunks));
3597 let reply_body = HttpReplyBody::Stream(stream);
3598 match reply_body {
3600 HttpReplyBody::Stream(_) => {}
3601 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3602 }
3603 }
3604
/// Trace-context propagation tests for the HTTP producer and consumer.
/// Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
mod otel_tests {
    use super::*;
    use camel_component_api::Message;
    use tower::ServiceExt;

    /// `traceparent` value used as the upstream trace context in these tests.
    const TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    /// Trace-id segment of [`TRACEPARENT`].
    const TRACE_ID: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
    /// Span-id segment of [`TRACEPARENT`].
    const SPAN_ID: &str = "00f067aa0ba902b7";

    /// A producer whose exchange carries an extracted trace context must send
    /// a `traceparent` header; the capture server echoes it back in
    /// `X-Received-Traceparent`, where each segment is verified.
    #[tokio::test]
    async fn test_producer_injects_traceparent_header() {
        let (url, _handle) = start_test_server_with_header_capture().await;
        let ctx = test_producer_ctx();

        let component = HttpComponent::new();
        let endpoint_ctx = NoOpComponentContext;
        let endpoint = component
            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
            .unwrap();
        let producer = endpoint.create_producer(rt(), &ctx).unwrap();

        // Seed the exchange with an upstream trace context.
        let mut exchange = Exchange::new(Message::default());
        let mut headers = std::collections::HashMap::new();
        headers.insert("traceparent".to_string(), TRACEPARENT.to_string());
        camel_otel::extract_into_exchange(&mut exchange, &headers);

        let result = producer.oneshot(exchange).await.unwrap();

        let status = result
            .input
            .header("CamelHttpResponseCode")
            .and_then(|v| v.as_u64())
            .unwrap();
        assert_eq!(status, 200);

        let traceparent_str = result
            .input
            .header("X-Received-Traceparent")
            .expect("traceparent header should have been sent")
            .as_str()
            .expect("X-Received-Traceparent header should be a string");

        let parts: Vec<&str> = traceparent_str.split('-').collect();
        assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
        assert_eq!(parts[0], "00", "version should be 00");
        assert_eq!(parts[1], TRACE_ID, "trace-id should match");
        assert_eq!(parts[2], SPAN_ID, "span-id should match");
        assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
    }

    /// Shared body of the consumer-side extraction tests.
    ///
    /// Starts a consumer for `/trace` on a free port, POSTs a request carrying
    /// [`TRACEPARENT`] under `header_name`, and asserts that re-injecting the
    /// trace context from the received exchange yields the same trace id.
    async fn assert_consumer_extracts_trace_context(header_name: &str) {
        use camel_component_api::{ConsumerContext, ExchangeEnvelope};

        // Learn a free port, then release it so the consumer can bind it.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let component = HttpComponent::new();
        let endpoint_ctx = NoOpComponentContext;
        let endpoint = component
            .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
            .unwrap();
        let mut consumer = endpoint.create_consumer(rt()).unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
        let token = tokio_util::sync::CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
        // Give the spawned consumer a moment to start.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let client = reqwest::Client::new();
        let send_fut = client
            .post(format!("http://127.0.0.1:{port}/trace"))
            .header(header_name, TRACEPARENT)
            .body("test")
            .send();

        let (http_result, _) = tokio::join!(send_fut, async {
            // Fail loudly instead of silently skipping the assertions below
            // when no exchange is delivered.
            let envelope = rx
                .recv()
                .await
                .expect("consumer channel closed before a request arrived");

            let mut injected_headers = std::collections::HashMap::new();
            camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);

            let traceparent = injected_headers.get("traceparent").unwrap_or_else(|| {
                panic!(
                    "Exchange should have traceparent after extraction from `{header_name}` header"
                )
            });
            let parts: Vec<&str> = traceparent.split('-').collect();
            assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
            assert_eq!(
                parts[1], TRACE_ID,
                "Trace ID should match the original `{header_name}` header"
            );

            if let Some(reply_tx) = envelope.reply_tx {
                let _ = reply_tx.send(Ok(envelope.exchange));
            }
        });

        let resp = http_result.unwrap();
        assert_eq!(resp.status().as_u16(), 200);

        token.cancel();
    }

    /// The consumer extracts the trace context from a lowercase `traceparent`
    /// request header.
    #[tokio::test]
    async fn test_consumer_extracts_traceparent_header() {
        assert_consumer_extracts_trace_context("traceparent").await;
    }

    /// Same as above, with the header name given to the client in mixed case.
    #[tokio::test]
    async fn test_consumer_extracts_mixed_case_traceparent_header() {
        assert_consumer_extracts_trace_context("TraceParent").await;
    }

    /// A producer call on an exchange without any trace context must still
    /// complete and report a 200 response code.
    #[tokio::test]
    async fn test_producer_no_trace_context_no_crash() {
        let (url, _handle) = start_test_server().await;
        let ctx = test_producer_ctx();

        let component = HttpComponent::new();
        let endpoint_ctx = NoOpComponentContext;
        let endpoint = component
            .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
            .unwrap();
        let producer = endpoint.create_producer(rt(), &ctx).unwrap();

        let exchange = Exchange::new(Message::default());

        let result = producer.oneshot(exchange).await.unwrap();

        let status = result
            .input
            .header("CamelHttpResponseCode")
            .and_then(|v| v.as_u64())
            .unwrap();
        assert_eq!(status, 200);
    }

    /// Starts a minimal raw-TCP HTTP server on a free local port.
    ///
    /// For every accepted connection it performs a single read of at most
    /// 8192 bytes, looks for a `traceparent` request header (name matched
    /// ASCII case-insensitively), and replies `200 OK` with the captured value
    /// echoed both in the JSON body and in an `X-Received-Traceparent`
    /// response header. The value is empty when the header is absent.
    ///
    /// Returns the server base URL and the handle of the accept-loop task.
    async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://127.0.0.1:{}", addr.port());

        let handle = tokio::spawn(async move {
            loop {
                if let Ok((mut stream, _)) = listener.accept().await {
                    tokio::spawn(async move {
                        use tokio::io::{AsyncReadExt, AsyncWriteExt};
                        let mut buf = vec![0u8; 8192];
                        let n = stream.read(&mut buf).await.unwrap_or(0);
                        let request = String::from_utf8_lossy(&buf[..n]);

                        // Split each line at the FIRST colon only, so a value
                        // that itself contains ':' is kept whole.
                        let traceparent = request
                            .lines()
                            .filter_map(|line| line.split_once(':'))
                            .find(|(name, _)| name.eq_ignore_ascii_case("traceparent"))
                            .map(|(_, value)| value.trim().to_string())
                            .unwrap_or_default();

                        let body =
                            format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
                        let response = format!(
                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
                            body.len(),
                            traceparent,
                            body
                        );
                        let _ = stream.write_all(response.as_bytes()).await;
                    });
                }
            }
        });

        (url, handle)
    }
}
3869
3870 #[tokio::test]
3879 async fn test_request_body_arrives_as_stream() {
3880 use camel_component_api::Body;
3881 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3882
3883 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3884 let port = listener.local_addr().unwrap().port();
3885 drop(listener);
3886
3887 let component = HttpComponent::new();
3888 let endpoint_ctx = NoOpComponentContext;
3889 let endpoint = component
3890 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3891 .unwrap();
3892 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3893
3894 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3895 let token = tokio_util::sync::CancellationToken::new();
3896 let ctx = ConsumerContext::new(tx, token.clone());
3897
3898 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3899 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3900
3901 let client = reqwest::Client::new();
3902 let send_fut = client
3903 .post(format!("http://127.0.0.1:{port}/upload"))
3904 .body("hello streaming world")
3905 .send();
3906
3907 let (http_result, _) = tokio::join!(send_fut, async {
3908 if let Some(mut envelope) = rx.recv().await {
3909 assert!(
3911 matches!(envelope.exchange.input.body, Body::Stream(_)),
3912 "expected Body::Stream, got discriminant {:?}",
3913 std::mem::discriminant(&envelope.exchange.input.body)
3914 );
3915 let bytes = envelope
3917 .exchange
3918 .input
3919 .body
3920 .into_bytes(1024 * 1024)
3921 .await
3922 .unwrap();
3923 assert_eq!(&bytes[..], b"hello streaming world");
3924
3925 envelope.exchange.input.body = camel_component_api::Body::Empty;
3926 if let Some(reply_tx) = envelope.reply_tx {
3927 let _ = reply_tx.send(Ok(envelope.exchange));
3928 }
3929 }
3930 });
3931
3932 let resp = http_result.unwrap();
3933 assert_eq!(resp.status().as_u16(), 200);
3934
3935 token.cancel();
3936 }
3937
3938 #[tokio::test]
3943 async fn test_streaming_response_chunked() {
3944 use bytes::Bytes;
3945 use camel_component_api::Body;
3946 use camel_component_api::CamelError;
3947 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3948 use camel_component_api::{StreamBody, StreamMetadata};
3949 use futures::stream;
3950 use std::sync::Arc;
3951 use tokio::sync::Mutex;
3952
3953 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3954 let port = listener.local_addr().unwrap().port();
3955 drop(listener);
3956
3957 let component = HttpComponent::new();
3958 let endpoint_ctx = NoOpComponentContext;
3959 let endpoint = component
3960 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3961 .unwrap();
3962 let mut consumer = endpoint.create_consumer(rt()).unwrap();
3963
3964 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3965 let token = tokio_util::sync::CancellationToken::new();
3966 let ctx = ConsumerContext::new(tx, token.clone());
3967
3968 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3969 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3970
3971 let client = reqwest::Client::new();
3972 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3973
3974 let (http_result, _) = tokio::join!(send_fut, async {
3975 if let Some(mut envelope) = rx.recv().await {
3976 let chunks: Vec<Result<Bytes, CamelError>> =
3978 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3979 let stream = Box::pin(stream::iter(chunks));
3980 envelope.exchange.input.body = Body::Stream(StreamBody {
3981 stream: Arc::new(Mutex::new(Some(stream))),
3982 metadata: StreamMetadata::default(),
3983 });
3984 if let Some(reply_tx) = envelope.reply_tx {
3985 let _ = reply_tx.send(Ok(envelope.exchange));
3986 }
3987 }
3988 });
3989
3990 let resp = http_result.unwrap();
3991 assert_eq!(resp.status().as_u16(), 200);
3992 let body = resp.text().await.unwrap();
3993 assert_eq!(body, "chunk1chunk2");
3994
3995 token.cancel();
3996 }
3997
3998 #[tokio::test]
4003 async fn test_413_when_content_length_exceeds_limit() {
4004 use camel_component_api::ConsumerContext;
4005
4006 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4007 let port = listener.local_addr().unwrap().port();
4008 drop(listener);
4009
4010 let component = HttpComponent::new();
4012 let endpoint_ctx = NoOpComponentContext;
4013 let endpoint = component
4014 .create_endpoint(
4015 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
4016 &endpoint_ctx,
4017 )
4018 .unwrap();
4019 let mut consumer = endpoint.create_consumer(rt()).unwrap();
4020
4021 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4022 let token = tokio_util::sync::CancellationToken::new();
4023 let ctx = ConsumerContext::new(tx, token.clone());
4024
4025 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4026 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4027
4028 let client = reqwest::Client::new();
4029 let resp = client
4030 .post(format!("http://127.0.0.1:{port}/upload"))
4031 .header("Content-Length", "1000") .body("x".repeat(1000))
4033 .send()
4034 .await
4035 .unwrap();
4036
4037 assert_eq!(resp.status().as_u16(), 413);
4038
4039 token.cancel();
4040 }
4041
4042 #[tokio::test]
4046 async fn test_chunked_upload_without_content_length_bypasses_limit() {
4047 use bytes::Bytes;
4048 use camel_component_api::Body;
4049 use camel_component_api::ConsumerContext;
4050 use futures::stream;
4051
4052 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4053 let port = listener.local_addr().unwrap().port();
4054 drop(listener);
4055
4056 let component = HttpComponent::new();
4058 let endpoint_ctx = NoOpComponentContext;
4059 let endpoint = component
4060 .create_endpoint(
4061 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4062 &endpoint_ctx,
4063 )
4064 .unwrap();
4065 let mut consumer = endpoint.create_consumer(rt()).unwrap();
4066
4067 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4068 let token = tokio_util::sync::CancellationToken::new();
4069 let ctx = ConsumerContext::new(tx, token.clone());
4070
4071 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4072 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4073
4074 let client = reqwest::Client::new();
4075
4076 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4080 Ok(Bytes::from("y".repeat(50))),
4081 Ok(Bytes::from("y".repeat(50))),
4082 ];
4083 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4084 let send_fut = client
4085 .post(format!("http://127.0.0.1:{port}/upload"))
4086 .body(stream_body)
4087 .send();
4088
4089 let consumer_fut = async {
4090 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4092 Ok(Some(mut envelope)) => {
4093 assert!(
4094 matches!(envelope.exchange.input.body, Body::Stream(_)),
4095 "expected Body::Stream"
4096 );
4097 envelope.exchange.input.body = camel_component_api::Body::Empty;
4098 if let Some(reply_tx) = envelope.reply_tx {
4099 let _ = reply_tx.send(Ok(envelope.exchange));
4100 }
4101 }
4102 Ok(None) => panic!("consumer channel closed unexpectedly"),
4103 Err(_) => {
4104 }
4107 }
4108 };
4109
4110 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4111
4112 let resp = http_result.unwrap();
4113 assert_ne!(
4115 resp.status().as_u16(),
4116 413,
4117 "chunked upload must not be rejected by maxRequestBody"
4118 );
4119 assert_eq!(resp.status().as_u16(), 200);
4120
4121 token.cancel();
4122 }
4123
/// `validate_url_for_ssrf` must reject a host listed in `blocked_hosts`,
/// reject a loopback address while `allow_private_ips` is false, and accept
/// that same address once `allow_private_ips` is switched on.
#[test]
fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
    let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
    cfg.blocked_hosts = vec![String::from("blocked.local")];
    cfg.allow_private_ips = false;

    // Explicitly blocked host name.
    assert!(validate_url_for_ssrf("http://blocked.local/api", &cfg).is_err());

    // Loopback literal with private IPs disallowed.
    assert!(validate_url_for_ssrf("http://127.0.0.1/api", &cfg).is_err());

    // Same URL after opting in to private IPs.
    cfg.allow_private_ips = true;
    assert!(validate_url_for_ssrf("http://127.0.0.1/api", &cfg).is_ok());
}
4140
/// Table-driven check of `is_private_ip`: every address in `PRIVATE` must be
/// classified as private and every address in `PUBLIC` must not.
#[test]
fn test_is_private_ip_ranges() {
    // IPv4 private/loopback/link-local/0.x literals, IPv6 loopback, fc00/fd12
    // and fe80 literals, plus IPv4-mapped IPv6 forms of private addresses.
    const PRIVATE: [&str; 13] = [
        "10.0.0.1",
        "172.16.1.10",
        "192.168.1.1",
        "127.0.0.1",
        "169.254.1.1",
        "0.1.2.3",
        "::1",
        "fc00::1",
        "fd12::1",
        "fe80::1",
        "::ffff:10.0.0.1",
        "::ffff:192.168.1.1",
        "::ffff:127.0.0.1",
    ];
    // Publicly routable addresses, including an IPv4-mapped one.
    const PUBLIC: [&str; 3] = ["8.8.8.8", "::ffff:8.8.8.8", "2001:4860:4860::8888"];

    let classify = |addr: &str| is_private_ip(&addr.parse().unwrap());

    for addr in PRIVATE {
        assert!(classify(addr), "{addr} should be classified as private");
    }
    for addr in PUBLIC {
        assert!(!classify(addr), "{addr} should not be classified as private");
    }
}
4163
4164 #[tokio::test]
4165 async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4166 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); cfg.allow_private_ips = false;
4168
4169 let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4170 .await
4171 .expect_err("localhost must resolve to loopback and be blocked");
4172
4173 let msg = err.to_string();
4174 assert!(msg.contains("Target resolved to private IP"));
4175 }
4176
/// Table-driven check of `title_case_header`: each lowercase header name maps
/// to its dash-separated, capitalized form; the empty string stays empty.
#[test]
fn test_title_case_header() {
    let cases = [
        ("content-type", "Content-Type"),
        ("authorization", "Authorization"),
        ("x-custom-header", "X-Custom-Header"),
        ("host", "Host"),
        // Only the first letter of each dash-separated segment is upper-cased.
        ("x-b3-traceid", "X-B3-Traceid"),
        ("single", "Single"),
        ("", ""),
    ];

    for (raw, expected) in cases {
        assert_eq!(title_case_header(raw), expected);
    }
}
4187
/// `HttpProducer::resolve_url` must append `CamelHttpPath` to the configured
/// base URL while keeping the configured query, and must honor
/// `CamelHttpUri` / `CamelHttpQuery` header overrides when they are present.
#[test]
fn test_resolve_url_combines_path_and_query_sources() {
    let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
    let mut exchange = Exchange::new(Message::default());

    // Only the path header set: base URL + path, configured query retained.
    exchange.input.set_header(
        "CamelHttpPath",
        serde_json::Value::String("next".to_string()),
    );
    let resolved = HttpProducer::resolve_url(&exchange, &cfg);
    assert!(resolved.starts_with("http://example.com/base/next?"));
    assert!(resolved.contains("foo=bar"));

    // URI and query headers added on top of the path header set above.
    let overrides = [
        ("CamelHttpUri", "http://other.test/root"),
        ("CamelHttpQuery", "a=1&b=2"),
    ];
    for (name, value) in overrides {
        exchange
            .input
            .set_header(name, serde_json::Value::String(value.to_string()));
    }

    let override_url = HttpProducer::resolve_url(&exchange, &cfg);
    assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
}
4212
/// Boundary checks: `HttpProducer::is_ok_status` treats the range as
/// inclusive on both ends, and `exceeds_max_response_body` is true only when
/// the size is strictly greater than the limit.
#[test]
fn test_http_producer_helpers_status_and_size_boundaries() {
    // (status, expected result) against the inclusive range 200..=299.
    let status_cases = [(200, true), (299, true), (199, false), (300, false)];
    for (status, expected) in status_cases {
        assert_eq!(HttpProducer::is_ok_status(status, (200, 299)), expected);
    }

    // Exactly at the limit is allowed; one past it is not.
    assert!(!exceeds_max_response_body(10, 10));
    assert!(exceeds_max_response_body(11, 10));
}
4223
4224 async fn setup_consumer_on_free_port(
4229 path: &str,
4230 ) -> (
4231 u16,
4232 tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4233 tokio_util::sync::CancellationToken,
4234 ) {
4235 use camel_component_api::ConsumerContext;
4236
4237 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4238 let port = listener.local_addr().unwrap().port();
4239 drop(listener);
4240
4241 let consumer_cfg = HttpServerConfig {
4242 host: "127.0.0.1".to_string(),
4243 port,
4244 path: path.to_string(),
4245 max_request_body: 2 * 1024 * 1024,
4246 max_response_body: 10 * 1024 * 1024,
4247 max_inflight_requests: 1024,
4248 };
4249 let mut consumer = HttpConsumer::new(consumer_cfg, test_rt());
4250
4251 let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4252 let token = tokio_util::sync::CancellationToken::new();
4253 let ctx = ConsumerContext::new(tx, token.clone());
4254
4255 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4256 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4257
4258 (port, rx, token)
4259 }
4260
4261 #[tokio::test]
4262 async fn test_content_type_inferred_for_json_body() {
4263 let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4264
4265 let client = reqwest::Client::new();
4266 let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4267
4268 let (http_result, _) = tokio::join!(send_fut, async {
4269 if let Some(mut envelope) = rx.recv().await {
4270 envelope.exchange.input.body =
4271 camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4272 if let Some(reply_tx) = envelope.reply_tx {
4273 let _ = reply_tx.send(Ok(envelope.exchange));
4274 }
4275 }
4276 });
4277
4278 let resp = http_result.unwrap();
4279 assert_eq!(resp.status().as_u16(), 200);
4280 let ct = resp
4281 .headers()
4282 .get("content-type")
4283 .expect("Content-Type header should be present");
4284 assert_eq!(ct, "application/json");
4285 let body = resp.text().await.unwrap();
4286 assert_eq!(body, r#"{"message":"hello"}"#);
4287
4288 token.cancel();
4289 }
4290
4291 #[tokio::test]
4292 async fn test_content_type_inferred_for_text_body() {
4293 let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4294
4295 let client = reqwest::Client::new();
4296 let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4297
4298 let (http_result, _) = tokio::join!(send_fut, async {
4299 if let Some(mut envelope) = rx.recv().await {
4300 envelope.exchange.input.body =
4301 camel_component_api::Body::Text("plain text response".to_string());
4302 if let Some(reply_tx) = envelope.reply_tx {
4303 let _ = reply_tx.send(Ok(envelope.exchange));
4304 }
4305 }
4306 });
4307
4308 let resp = http_result.unwrap();
4309 assert_eq!(resp.status().as_u16(), 200);
4310 let ct = resp
4311 .headers()
4312 .get("content-type")
4313 .expect("Content-Type header should be present");
4314 assert_eq!(ct, "text/plain; charset=utf-8");
4315 let body = resp.text().await.unwrap();
4316 assert_eq!(body, "plain text response");
4317
4318 token.cancel();
4319 }
4320
4321 #[tokio::test]
4322 async fn test_content_type_inferred_for_xml_body() {
4323 let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4324
4325 let client = reqwest::Client::new();
4326 let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4327
4328 let (http_result, _) = tokio::join!(send_fut, async {
4329 if let Some(mut envelope) = rx.recv().await {
4330 envelope.exchange.input.body =
4331 camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4332 if let Some(reply_tx) = envelope.reply_tx {
4333 let _ = reply_tx.send(Ok(envelope.exchange));
4334 }
4335 }
4336 });
4337
4338 let resp = http_result.unwrap();
4339 assert_eq!(resp.status().as_u16(), 200);
4340 let ct = resp
4341 .headers()
4342 .get("content-type")
4343 .expect("Content-Type header should be present");
4344 assert_eq!(ct, "application/xml");
4345 let body = resp.text().await.unwrap();
4346 assert_eq!(body, "<root><item>value</item></root>");
4347
4348 token.cancel();
4349 }
4350
4351 #[tokio::test]
4352 async fn test_no_content_type_for_empty_body() {
4353 let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4354
4355 let client = reqwest::Client::new();
4356 let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4357
4358 let (http_result, _) = tokio::join!(send_fut, async {
4359 if let Some(mut envelope) = rx.recv().await {
4360 envelope.exchange.input.body = camel_component_api::Body::Empty;
4361 if let Some(reply_tx) = envelope.reply_tx {
4362 let _ = reply_tx.send(Ok(envelope.exchange));
4363 }
4364 }
4365 });
4366
4367 let resp = http_result.unwrap();
4368 assert_eq!(resp.status().as_u16(), 200);
4369 assert!(
4370 resp.headers().get("content-type").is_none(),
4371 "Empty body should not set Content-Type"
4372 );
4373
4374 token.cancel();
4375 }
4376
4377 #[tokio::test]
4378 async fn test_no_content_type_for_raw_bytes_body() {
4379 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4380
4381 let client = reqwest::Client::new();
4382 let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4383
4384 let (http_result, _) = tokio::join!(send_fut, async {
4385 if let Some(mut envelope) = rx.recv().await {
4386 envelope.exchange.input.body =
4387 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4388 if let Some(reply_tx) = envelope.reply_tx {
4389 let _ = reply_tx.send(Ok(envelope.exchange));
4390 }
4391 }
4392 });
4393
4394 let resp = http_result.unwrap();
4395 assert_eq!(resp.status().as_u16(), 200);
4396 assert!(
4397 resp.headers().get("content-type").is_none(),
4398 "Raw Bytes body should not set Content-Type"
4399 );
4400
4401 token.cancel();
4402 }
4403
4404 #[tokio::test]
4405 async fn test_content_type_from_stream_metadata() {
4406 use camel_component_api::{StreamBody, StreamMetadata};
4407 use futures::stream;
4408
4409 let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4410
4411 let client = reqwest::Client::new();
4412 let send_fut = client
4413 .get(format!("http://127.0.0.1:{port}/stream-ct"))
4414 .send();
4415
4416 let (http_result, _) = tokio::join!(send_fut, async {
4417 if let Some(mut envelope) = rx.recv().await {
4418 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4419 vec![Ok(bytes::Bytes::from("audio data"))];
4420 let stream = Box::pin(stream::iter(chunks));
4421 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4422 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4423 metadata: StreamMetadata {
4424 size_hint: None,
4425 content_type: Some("audio/mpeg".to_string()),
4426 origin: None,
4427 },
4428 });
4429 if let Some(reply_tx) = envelope.reply_tx {
4430 let _ = reply_tx.send(Ok(envelope.exchange));
4431 }
4432 }
4433 });
4434
4435 let resp = http_result.unwrap();
4436 assert_eq!(resp.status().as_u16(), 200);
4437 let ct = resp
4438 .headers()
4439 .get("content-type")
4440 .expect("Content-Type header should be present");
4441 assert_eq!(ct, "audio/mpeg");
4442 let body = resp.text().await.unwrap();
4443 assert_eq!(body, "audio data");
4444
4445 token.cancel();
4446 }
4447
4448 #[tokio::test]
4449 async fn test_user_content_type_overrides_inferred() {
4450 let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4451
4452 let client = reqwest::Client::new();
4453 let send_fut = client
4454 .get(format!("http://127.0.0.1:{port}/override-ct"))
4455 .send();
4456
4457 let (http_result, _) = tokio::join!(send_fut, async {
4458 if let Some(mut envelope) = rx.recv().await {
4459 envelope.exchange.input.body =
4460 camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4461 envelope.exchange.input.set_header(
4462 "Content-Type",
4463 serde_json::Value::String("text/html".to_string()),
4464 );
4465 if let Some(reply_tx) = envelope.reply_tx {
4466 let _ = reply_tx.send(Ok(envelope.exchange));
4467 }
4468 }
4469 });
4470
4471 let resp = http_result.unwrap();
4472 assert_eq!(resp.status().as_u16(), 200);
4473 let ct = resp
4474 .headers()
4475 .get("content-type")
4476 .expect("Content-Type header should be present");
4477 assert_eq!(
4478 ct, "text/html",
4479 "User-set Content-Type should take precedence over inferred type"
4480 );
4481
4482 token.cancel();
4483 }
4484
4485 #[tokio::test]
4486 async fn test_user_content_type_with_bytes_body() {
4487 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4488
4489 let client = reqwest::Client::new();
4490 let send_fut = client
4491 .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4492 .send();
4493
4494 let (http_result, _) = tokio::join!(send_fut, async {
4495 if let Some(mut envelope) = rx.recv().await {
4496 envelope.exchange.input.body =
4497 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4498 envelope.exchange.input.set_header(
4499 "Content-Type",
4500 serde_json::Value::String("application/json".to_string()),
4501 );
4502 if let Some(reply_tx) = envelope.reply_tx {
4503 let _ = reply_tx.send(Ok(envelope.exchange));
4504 }
4505 }
4506 });
4507
4508 let resp = http_result.unwrap();
4509 assert_eq!(resp.status().as_u16(), 200);
4510 let ct = resp
4511 .headers()
4512 .get("content-type")
4513 .expect("Content-Type header should be present for Bytes body with user header");
4514 assert_eq!(
4515 ct, "application/json",
4516 "User Content-Type should be sent for Bytes body"
4517 );
4518
4519 token.cancel();
4520 }
4521
4522 #[tokio::test]
4527 async fn monitor_task_silent_on_clean_exit() {
4528 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4529 monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4531 }
4532
4533 #[tokio::test]
4534 async fn monitor_task_handles_panicked_task() {
4535 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4536 panic!("simulated server crash");
4537 });
4538 monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4540 }
4541
/// The `Debug` output of `HttpAuth::Basic` must not contain the password but
/// must still contain the username.
#[test]
fn http_auth_basic_debug_redacts_password() {
    let debug = format!(
        "{:?}",
        HttpAuth::Basic {
            username: String::from("admin"),
            password: String::from("hunter2"),
        }
    );

    let leaks_password = debug.contains("hunter2");
    assert!(!leaks_password, "password must be redacted: {debug}");

    let shows_username = debug.contains("admin");
    assert!(shows_username, "username should appear: {debug}");
}
4559
/// The `Debug` output of `HttpAuth::Bearer` must not contain the start of
/// the token value.
#[test]
fn http_auth_bearer_debug_redacts_token() {
    let debug = format!(
        "{:?}",
        HttpAuth::Bearer {
            token: String::from("eyJhbGciOiJIUzI1NiJ9.secret"),
        }
    );

    let leaks_token = debug.contains("eyJhbGci");
    assert!(!leaks_token, "token must be redacted: {debug}");
}
4571
/// The `Debug` output of `HttpAuth::None` must mention the variant name.
#[test]
fn http_auth_none_debug_shows_variant() {
    let auth = HttpAuth::None;
    let debug = format!("{auth:?}");

    let names_variant = debug.contains("None");
    assert!(names_variant, "None variant should appear: {debug}");
}
4580
/// The `Debug` output of an `HttpEndpointConfig` parsed from a URI with
/// Basic-auth query parameters must not contain the `authPassword` value.
#[test]
fn http_endpoint_config_debug_redacts_auth_credentials() {
    let uri = "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123";
    let config = HttpEndpointConfig::from_uri(uri).unwrap();

    let debug = format!("{config:?}");
    let leaks_password = debug.contains("secret123");
    assert!(
        !leaks_password,
        "password must be redacted in HttpEndpointConfig debug: {debug}"
    );
}
4593
4594 use crate::registry::{HttpRouteRegistry, MountMode, StaticMount};
4599 use tower_http::services::ServeDir;
4600
4601 fn make_test_registry() -> HttpRouteRegistry {
4602 HttpRouteRegistry::new()
4603 }
4604
4605 fn make_test_state(registry: HttpRouteRegistry) -> AppState {
4606 AppState {
4607 registry,
4608 max_request_body: 2 * 1024 * 1024,
4609 max_response_body: 10 * 1024 * 1024,
4610 inflight: Arc::new(tokio::sync::Semaphore::new(1024)),
4611 }
4612 }
4613
4614 #[allow(clippy::await_holding_lock)]
4615 #[tokio::test]
4616 async fn test_static_file_serving_serves_file_contents() {
4617 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4618 ServerRegistry::reset();
4619
4620 let temp_dir =
4622 std::env::temp_dir().join(format!("http_static_test_{}", std::process::id()));
4623 std::fs::create_dir_all(&temp_dir).unwrap();
4624 std::fs::write(temp_dir.join("hello.txt"), "Hello, static world!").unwrap();
4625 std::fs::write(temp_dir.join("style.css"), "body { color: red; }").unwrap();
4626
4627 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4628
4629 let registry = make_test_registry();
4630 let serve_dir = ServeDir::new(&canonical_dir)
4631 .precompressed_gzip()
4632 .precompressed_br()
4633 .append_index_html_on_directories(true);
4634
4635 let mount = StaticMount {
4636 mount_path: "/".to_string(),
4637 mode: MountMode::Static,
4638 dir: canonical_dir.clone(),
4639 cache_control: "public, max-age=3600".to_string(),
4640 error_pages: std::collections::HashMap::new(),
4641 serve_dir,
4642 };
4643 registry.register_static_mount(mount).await.unwrap();
4644
4645 let state = make_test_state(registry);
4646
4647 let req = Request::builder()
4649 .uri("/hello.txt")
4650 .body(AxumBody::empty())
4651 .unwrap();
4652 let resp = static_dispatch::dispatch_static(&state, req, "/hello.txt").await;
4653 assert_eq!(resp.status(), StatusCode::OK);
4654 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4655 .await
4656 .unwrap();
4657 assert_eq!(&body[..], b"Hello, static world!");
4658
4659 let req = Request::builder()
4661 .uri("/style.css")
4662 .body(AxumBody::empty())
4663 .unwrap();
4664 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4665 assert_eq!(resp.status(), StatusCode::OK);
4666 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4667 .await
4668 .unwrap();
4669 assert_eq!(&body[..], b"body { color: red; }");
4670
4671 let req = Request::builder()
4673 .uri("/missing.txt")
4674 .body(AxumBody::empty())
4675 .unwrap();
4676 let resp = static_dispatch::dispatch_static(&state, req, "/missing.txt").await;
4677 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4678
4679 std::fs::remove_dir_all(&temp_dir).ok();
4681 }
4682
4683 #[allow(clippy::await_holding_lock)]
4684 #[tokio::test]
4685 async fn test_spa_fallback_serves_index_for_unknown_paths() {
4686 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4687 ServerRegistry::reset();
4688
4689 let temp_dir = std::env::temp_dir().join(format!("http_spa_test_{}", std::process::id()));
4690 std::fs::create_dir_all(&temp_dir).unwrap();
4691 std::fs::write(temp_dir.join("index.html"), "<h1>SPA App</h1>").unwrap();
4692 std::fs::write(temp_dir.join("app.js"), "console.log('app')").unwrap();
4693
4694 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4695
4696 let registry = make_test_registry();
4697 let serve_dir = ServeDir::new(&canonical_dir)
4698 .precompressed_gzip()
4699 .precompressed_br()
4700 .append_index_html_on_directories(true);
4701
4702 let mount = StaticMount {
4703 mount_path: "/".to_string(),
4704 mode: MountMode::Spa,
4705 dir: canonical_dir.clone(),
4706 cache_control: "public, max-age=0".to_string(),
4707 error_pages: std::collections::HashMap::new(),
4708 serve_dir,
4709 };
4710 registry.register_static_mount(mount).await.unwrap();
4712
4713 let state = make_test_state(registry);
4714
4715 let req = Request::builder()
4717 .method("GET")
4718 .uri("/dashboard")
4719 .header("Accept", "text/html")
4720 .body(AxumBody::empty())
4721 .unwrap();
4722 let resp = static_dispatch::dispatch_static(&state, req, "/dashboard").await;
4723 assert_eq!(resp.status(), StatusCode::OK);
4724 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4725 .await
4726 .unwrap();
4727 assert_eq!(&body[..], b"<h1>SPA App</h1>");
4728
4729 let req = Request::builder()
4731 .method("GET")
4732 .uri("/app.js")
4733 .body(AxumBody::empty())
4734 .unwrap();
4735 let resp = static_dispatch::dispatch_static(&state, req, "/app.js").await;
4736 assert_eq!(resp.status(), StatusCode::OK);
4737 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4738 .await
4739 .unwrap();
4740 assert_eq!(&body[..], b"console.log('app')");
4741
4742 let req = Request::builder()
4744 .method("GET")
4745 .uri("/api/data")
4746 .header("Accept", "application/json")
4747 .body(AxumBody::empty())
4748 .unwrap();
4749 let resp = static_dispatch::dispatch_static(&state, req, "/api/data").await;
4750 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4751
4752 let req = Request::builder()
4754 .method("GET")
4755 .uri("/style.css")
4756 .header("Accept", "text/html")
4757 .body(AxumBody::empty())
4758 .unwrap();
4759 let resp = static_dispatch::dispatch_static(&state, req, "/style.css").await;
4760 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4761
4762 std::fs::remove_dir_all(&temp_dir).ok();
4764 }
4765
4766 #[allow(clippy::await_holding_lock)]
4767 #[tokio::test]
4768 async fn test_error_page_mapping_serves_custom_404() {
4769 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
4770 ServerRegistry::reset();
4771
4772 let temp_dir = std::env::temp_dir().join(format!("http_error_test_{}", std::process::id()));
4773 let errors_dir = temp_dir.join("errors");
4774 std::fs::create_dir_all(&errors_dir).unwrap();
4775 std::fs::write(temp_dir.join("index.html"), "<h1>Home</h1>").unwrap();
4776 std::fs::write(errors_dir.join("404.html"), "<h1>Custom 404</h1>").unwrap();
4777
4778 let canonical_dir = std::fs::canonicalize(&temp_dir).unwrap();
4779 let canonical_404 = std::fs::canonicalize(errors_dir.join("404.html")).unwrap();
4780
4781 let registry = make_test_registry();
4782 let serve_dir = ServeDir::new(&canonical_dir)
4783 .precompressed_gzip()
4784 .precompressed_br()
4785 .append_index_html_on_directories(true);
4786
4787 let mut error_pages = std::collections::HashMap::new();
4788 error_pages.insert(404, canonical_404);
4789
4790 let mount = StaticMount {
4791 mount_path: "/".to_string(),
4792 mode: MountMode::Static,
4793 dir: canonical_dir.clone(),
4794 cache_control: "public, max-age=0".to_string(),
4795 error_pages,
4796 serve_dir,
4797 };
4798 registry.register_static_mount(mount).await.unwrap();
4799
4800 let state = make_test_state(registry);
4801
4802 let req = Request::builder()
4804 .method("GET")
4805 .uri("/missing.html")
4806 .body(AxumBody::empty())
4807 .unwrap();
4808 let resp = static_dispatch::dispatch_static(&state, req, "/missing.html").await;
4809 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4810 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4811 .await
4812 .unwrap();
4813 assert_eq!(&body[..], b"<h1>Custom 404</h1>");
4814
4815 let req = Request::builder()
4817 .method("GET")
4818 .uri("/index.html")
4819 .body(AxumBody::empty())
4820 .unwrap();
4821 let resp = static_dispatch::dispatch_static(&state, req, "/index.html").await;
4822 assert_eq!(resp.status(), StatusCode::OK);
4823 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4824 .await
4825 .unwrap();
4826 assert_eq!(&body[..], b"<h1>Home</h1>");
4827
4828 std::fs::remove_dir_all(&temp_dir).ok();
4830 }
4831}