1pub mod auth;
2pub mod bundle;
3pub mod config;
4pub mod health;
5use crate::config::parse_ok_status_code_range;
6pub use bundle::HttpBundle;
7pub use config::HttpConfig;
8pub use health::HttpHealthCheck;
9
10use std::collections::HashMap;
11use std::future::Future;
12use std::net::IpAddr;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex, OnceLock};
15use std::task::{Context, Poll};
16use std::time::Duration;
17
18use tokio::sync::{OnceCell, RwLock};
19use tower::Layer;
20use tower::Service;
21use tracing::debug;
22
23use axum::body::BodyDataStream;
24use camel_auth::bearer_token_layer::BearerTokenLayer;
25use camel_auth::oauth2::TokenProvider;
26use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
27use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
28use camel_component_api::{UriComponents, UriConfig, parse_uri};
29use futures::TryStreamExt;
30use futures::stream::BoxStream;
31
32#[derive(Debug, Clone)]
93pub struct HttpEndpointConfig {
94 pub base_url: String,
95 pub http_method: Option<String>,
96 pub throw_exception_on_failure: bool,
97 pub ok_status_code_range: (u16, u16),
98 pub response_timeout: Option<Duration>,
99 pub query_params: HashMap<String, String>,
100 pub allow_private_ips: bool,
101 pub blocked_hosts: Vec<String>,
102 pub max_body_size: usize,
103 pub read_timeout_ms: u64,
104 pub max_response_bytes: usize,
105 pub auth: HttpAuth,
106 pub token_provider: Option<Arc<dyn TokenProvider>>,
107 pub user_agent: Option<String>,
108 pub cookie_handling: CookieHandling,
109 pub bridge_endpoint: bool,
110 pub connection_close: bool,
111 pub skip_request_headers: Vec<String>,
112 pub skip_response_headers: Vec<String>,
113}
114
115#[derive(Clone, PartialEq)]
116pub enum HttpAuth {
117 None,
118 Basic { username: String, password: String },
119 Bearer { token: String },
120}
121
122impl std::fmt::Debug for HttpAuth {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 match self {
125 HttpAuth::None => f.write_str("None"),
126 HttpAuth::Basic { username, .. } => f
127 .debug_struct("Basic")
128 .field("username", username)
129 .field("password", &"***")
130 .finish(),
131 HttpAuth::Bearer { .. } => f.debug_struct("Bearer").field("token", &"***").finish(),
132 }
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq)]
137pub enum CookieHandling {
138 Disabled,
139 InMemory,
140}
141
142const HTTP_CAMEL_OPTIONS: &[&str] = &[
144 "httpMethod",
145 "throwExceptionOnFailure",
146 "okStatusCodeRange",
147 "followRedirects",
148 "connectTimeout",
149 "responseTimeout",
150 "allowPrivateIps",
151 "blockedHosts",
152 "maxBodySize",
153 "readTimeout",
154 "maxResponseBytes",
155 "authMethod",
156 "authUsername",
157 "authPassword",
158 "authBearerToken",
159 "userAgent",
160 "cookieHandling",
161 "bridgeEndpoint",
162 "connectionClose",
163 "skipRequestHeaders",
164 "skipResponseHeaders",
165];
166
167impl UriConfig for HttpEndpointConfig {
168 fn scheme() -> &'static str {
170 "http"
171 }
172
173 fn from_uri(uri: &str) -> Result<Self, CamelError> {
174 let parts = parse_uri(uri)?;
175 Self::from_components(parts)
176 }
177
178 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
179 if parts.scheme != "http" && parts.scheme != "https" {
181 return Err(CamelError::InvalidUri(format!(
182 "expected scheme 'http' or 'https', got '{}'",
183 parts.scheme
184 )));
185 }
186
187 let base_url = format!("{}:{}", parts.scheme, parts.path);
190
191 let http_method = parts.params.get("httpMethod").cloned();
192
193 let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
194 Some(v) => parse_bool_param_http(v).map_err(|e| {
195 CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
196 })?,
197 None => true,
198 };
199
200 let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
202 Some(v) => parse_ok_status_code_range(v)?,
203 None => (200, 299),
204 };
205
206 let response_timeout = match parts.params.get("responseTimeout") {
207 Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
208 CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
209 })?),
210 None => None,
211 };
212
213 let allow_private_ips = match parts.params.get("allowPrivateIps") {
215 Some(v) => parse_bool_param_http(v).map_err(|e| {
216 CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
217 })?,
218 None => false, };
220
221 let blocked_hosts = parts
223 .params
224 .get("blockedHosts")
225 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
226 .unwrap_or_default();
227
228 let max_body_size = match parts.params.get("maxBodySize") {
229 Some(v) => v.parse::<usize>().map_err(|e| {
230 CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
231 })?,
232 None => 10 * 1024 * 1024, };
234
235 let read_timeout_ms = match parts.params.get("readTimeout") {
236 Some(v) => v.parse::<u64>().map_err(|e| {
237 CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
238 })?,
239 None => 30_000, };
241
242 let max_response_bytes = match parts.params.get("maxResponseBytes") {
243 Some(v) => v.parse::<usize>().map_err(|e| {
244 CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
245 })?,
246 None => 10 * 1024 * 1024, };
248
249 let auth = parse_auth_from_params(&parts.params)?;
250
251 let user_agent = parts.params.get("userAgent").cloned();
252
253 let cookie_handling = match parts.params.get("cookieHandling") {
254 Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
255 Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
256 Some(v) => {
257 return Err(CamelError::InvalidUri(format!(
258 "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
259 )));
260 }
261 None => CookieHandling::Disabled,
262 };
263
264 let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
265 Some(v) => parse_bool_param_http(v).map_err(|e| {
266 CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
267 })?,
268 None => false,
269 };
270
271 let connection_close = match parts.params.get("connectionClose") {
272 Some(v) => parse_bool_param_http(v).map_err(|e| {
273 CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
274 })?,
275 None => false,
276 };
277
278 let skip_request_headers = parts
279 .params
280 .get("skipRequestHeaders")
281 .map(|v| {
282 v.split(',')
283 .map(str::trim)
284 .filter(|s| !s.is_empty())
285 .map(|s| s.to_ascii_lowercase())
286 .collect::<Vec<_>>()
287 })
288 .unwrap_or_default();
289
290 let skip_response_headers = parts
291 .params
292 .get("skipResponseHeaders")
293 .map(|v| {
294 v.split(',')
295 .map(str::trim)
296 .filter(|s| !s.is_empty())
297 .map(|s| s.to_ascii_lowercase())
298 .collect::<Vec<_>>()
299 })
300 .unwrap_or_default();
301
302 let query_params: HashMap<String, String> = parts
304 .params
305 .into_iter()
306 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
307 .collect();
308
309 Ok(Self {
310 base_url,
311 http_method,
312 throw_exception_on_failure,
313 ok_status_code_range,
314 response_timeout,
315 query_params,
316 allow_private_ips,
317 blocked_hosts,
318 max_body_size,
319 read_timeout_ms,
320 max_response_bytes,
321 auth,
322 token_provider: None,
323 user_agent,
324 cookie_handling,
325 bridge_endpoint,
326 connection_close,
327 skip_request_headers,
328 skip_response_headers,
329 })
330 }
331}
332
333fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
334 let Some(method) = params.get("authMethod") else {
335 return Ok(HttpAuth::None);
336 };
337
338 if method.eq_ignore_ascii_case("none") {
339 return Ok(HttpAuth::None);
340 }
341
342 if method.eq_ignore_ascii_case("basic") {
343 let username = params.get("authUsername").cloned().ok_or_else(|| {
344 CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
345 })?;
346 let password = params.get("authPassword").cloned().ok_or_else(|| {
347 CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
348 })?;
349 return Ok(HttpAuth::Basic { username, password });
350 }
351
352 if method.eq_ignore_ascii_case("bearer") {
353 let token = params.get("authBearerToken").cloned().ok_or_else(|| {
354 CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
355 })?;
356 return Ok(HttpAuth::Bearer { token });
357 }
358
359 Err(CamelError::InvalidUri(format!(
360 "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
361 )))
362}
363
364fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
365 match value.to_ascii_lowercase().as_str() {
366 "true" | "1" | "yes" => Ok(true),
367 "false" | "0" | "no" => Ok(false),
368 _ => Err(CamelError::InvalidUri(format!(
369 "invalid boolean value: '{value}'"
370 ))),
371 }
372}
373
374impl HttpEndpointConfig {
375 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
376 let parts = parse_uri(uri)?;
377 let mut endpoint = Self::from_components(parts.clone())?;
378 if endpoint.response_timeout.is_none() {
379 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
380 }
381 if !parts.params.contains_key("allowPrivateIps") {
382 endpoint.allow_private_ips = config.allow_private_ips;
383 }
384 if !parts.params.contains_key("blockedHosts") {
385 endpoint.blocked_hosts = config.blocked_hosts.clone();
386 }
387 if !parts.params.contains_key("maxBodySize") {
388 endpoint.max_body_size = config.max_body_size;
389 }
390 if !parts.params.contains_key("readTimeout") {
391 endpoint.read_timeout_ms = config.read_timeout_ms;
392 }
393 if !parts.params.contains_key("maxResponseBytes") {
394 endpoint.max_response_bytes = config.max_response_bytes;
395 }
396 if !parts.params.contains_key("okStatusCodeRange")
397 && let Some(range) = &config.ok_status_code_range
398 {
399 endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
400 }
401
402 Ok(endpoint)
403 }
404}
405
406#[derive(Debug, Clone)]
412pub struct HttpServerConfig {
413 pub host: String,
415 pub port: u16,
417 pub path: String,
419 pub max_request_body: usize,
421 pub max_response_body: usize,
423 pub max_inflight_requests: usize,
425}
426
427impl UriConfig for HttpServerConfig {
428 fn scheme() -> &'static str {
430 "http"
431 }
432
433 fn from_uri(uri: &str) -> Result<Self, CamelError> {
434 let parts = parse_uri(uri)?;
435 Self::from_components(parts)
436 }
437
438 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
439 if parts.scheme != "http" && parts.scheme != "https" {
441 return Err(CamelError::InvalidUri(format!(
442 "expected scheme 'http' or 'https', got '{}'",
443 parts.scheme
444 )));
445 }
446
447 let authority_and_path = parts.path.trim_start_matches('/');
450
451 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
453 (&authority_and_path[..idx], &authority_and_path[idx..])
454 } else {
455 (authority_and_path, "/")
456 };
457
458 let path = if path_suffix.is_empty() {
459 "/"
460 } else {
461 path_suffix
462 }
463 .to_string();
464
465 let (host, port) = if let Some(colon) = authority.rfind(':') {
467 let port_str = &authority[colon + 1..];
468 match port_str.parse::<u16>() {
469 Ok(p) => (authority[..colon].to_string(), p),
470 Err(_) => {
471 return Err(CamelError::InvalidUri(format!(
472 "invalid port '{}' in authority",
473 port_str
474 )));
475 }
476 }
477 } else {
478 let default_port = if parts.scheme == "https" { 443 } else { 80 };
480 (authority.to_string(), default_port)
481 };
482
483 let max_request_body = parts
484 .params
485 .get("maxRequestBody")
486 .and_then(|v| v.parse::<usize>().ok())
487 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
490 .params
491 .get("maxResponseBody")
492 .and_then(|v| v.parse::<usize>().ok())
493 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
496 .params
497 .get("maxInflightRequests")
498 .and_then(|v| v.parse::<usize>().ok())
499 .unwrap_or(1024);
500
501 Ok(Self {
502 host,
503 port,
504 path,
505 max_request_body,
506 max_response_body,
507 max_inflight_requests,
508 })
509 }
510}
511
512impl HttpServerConfig {
513 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
514 let parts = parse_uri(uri)?;
515 let mut server = Self::from_components(parts.clone())?;
516 if !parts.params.contains_key("maxRequestBody") {
517 server.max_request_body = config.max_request_body;
518 }
519 if !parts.params.contains_key("maxResponseBody") {
520 server.max_response_body = config.max_body_size;
522 }
523 Ok(server)
524 }
525}
526
527pub(crate) enum HttpReplyBody {
533 Bytes(bytes::Bytes),
534 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
535}
536
537pub(crate) struct RequestEnvelope {
540 pub(crate) method: String,
541 pub(crate) path: String,
542 pub(crate) query: String,
543 pub(crate) headers: http::HeaderMap,
544 pub(crate) body: StreamBody,
545 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
546}
547
548pub(crate) struct HttpReply {
550 pub(crate) status: u16,
551 pub(crate) headers: Vec<(String, String)>,
552 pub(crate) body: HttpReplyBody,
553}
554
555pub(crate) type DispatchTable =
561 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
562
563type ServerKey = (String, u16);
564
565struct ServerHandle {
567 dispatch: DispatchTable,
568 max_request_body: usize,
569 max_response_body: usize,
570 max_inflight_requests: usize,
571}
572
573pub struct ServerRegistry {
575 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
576}
577
578impl ServerRegistry {
579 pub fn global() -> &'static Self {
581 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
582 INSTANCE.get_or_init(|| ServerRegistry {
583 inner: Mutex::new(HashMap::new()),
584 })
585 }
586
587 pub(crate) async fn get_or_spawn(
590 &'static self,
591 host: &str,
592 port: u16,
593 max_request_body: usize,
594 max_response_body: usize,
595 max_inflight_requests: usize,
596 ) -> Result<DispatchTable, CamelError> {
597 let host_owned = host.to_string();
598
599 let cell = {
600 let mut guard = self.inner.lock().map_err(|_| {
601 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
602 })?;
603 let key = (host.to_string(), port);
604 guard
605 .entry(key)
606 .or_insert_with(|| Arc::new(OnceCell::new()))
607 .clone()
608 };
609
610 if let Some(existing) = cell.get()
611 && existing.max_request_body != max_request_body
612 {
613 return Err(CamelError::EndpointCreationFailed(format!(
614 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
615 existing.max_request_body, max_request_body
616 )));
617 }
618
619 if let Some(existing) = cell.get()
620 && existing.max_response_body != max_response_body
621 {
622 return Err(CamelError::EndpointCreationFailed(format!(
623 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
624 existing.max_response_body, max_response_body
625 )));
626 }
627
628 if let Some(existing) = cell.get()
629 && existing.max_inflight_requests != max_inflight_requests
630 {
631 return Err(CamelError::EndpointCreationFailed(format!(
632 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
633 existing.max_inflight_requests, max_inflight_requests
634 )));
635 }
636
637 let handle = cell
638 .get_or_try_init(|| async {
639 let addr = format!("{host_owned}:{port}");
640 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
641 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
642 })?;
643 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
644 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
645 let task = tokio::spawn(run_axum_server(
646 listener,
647 Arc::clone(&dispatch),
648 max_request_body,
649 max_response_body,
650 Arc::clone(&inflight),
651 ));
652 let addr_for_monitor = format!("{host_owned}:{port}");
653 tokio::spawn(monitor_axum_task(task, addr_for_monitor));
654 Ok::<ServerHandle, CamelError>(ServerHandle {
655 dispatch,
656 max_request_body,
657 max_response_body,
658 max_inflight_requests,
659 })
660 })
661 .await?;
662
663 Ok(Arc::clone(&handle.dispatch))
664 }
665
666 #[cfg(test)]
673 pub fn reset() {
674 let instance = Self::global();
675 let mut guard = instance
676 .inner
677 .lock()
678 .expect("ServerRegistry lock poisoned during test reset");
679 guard.clear();
680 }
681}
682
683use axum::{
688 Router,
689 body::Body as AxumBody,
690 extract::{Request, State},
691 http::{Response, StatusCode},
692 response::IntoResponse,
693};
694
695#[derive(Clone)]
696struct AppState {
697 dispatch: DispatchTable,
698 max_request_body: usize,
699 max_response_body: usize,
700 inflight: Arc<tokio::sync::Semaphore>,
701}
702
703async fn run_axum_server(
704 listener: tokio::net::TcpListener,
705 dispatch: DispatchTable,
706 max_request_body: usize,
707 max_response_body: usize,
708 inflight: Arc<tokio::sync::Semaphore>,
709) {
710 let state = AppState {
711 dispatch,
712 max_request_body,
713 max_response_body,
714 inflight,
715 };
716 let app = Router::new().fallback(dispatch_handler).with_state(state);
717
718 axum::serve(listener, app).await.unwrap_or_else(|e| {
719 tracing::error!(error = %e, "Axum server error");
720 });
721}
722
723async fn monitor_axum_task(handle: tokio::task::JoinHandle<()>, addr: String) {
731 match handle.await {
732 Ok(()) => {
733 }
735 Err(join_err) => {
736 tracing::error!(
737 addr = %addr,
738 error = %join_err,
739 "Axum server task exited unexpectedly — all routes on this port are now dead"
740 );
741 }
742 }
743}
744
745async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
746 let method = req.method().to_string();
747 let path = req.uri().path().to_string();
748 let query = req.uri().query().unwrap_or("").to_string();
749 let headers = req.headers().clone();
750
751 let content_length: Option<u64> = headers
753 .get(http::header::CONTENT_LENGTH)
754 .and_then(|v| v.to_str().ok())
755 .and_then(|s| s.parse().ok());
756
757 if let Some(len) = content_length
758 && len > state.max_request_body as u64
759 {
760 return Response::builder()
761 .status(StatusCode::PAYLOAD_TOO_LARGE)
762 .body(AxumBody::from("Request body exceeds configured limit"))
763 .expect("infallible"); }
765
766 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
767 Ok(permit) => permit,
768 Err(_) => {
769 return Response::builder()
770 .status(StatusCode::SERVICE_UNAVAILABLE)
771 .body(AxumBody::from("Service Unavailable"))
772 .expect("infallible"); }
774 };
775
776 let content_type = headers
778 .get(http::header::CONTENT_TYPE)
779 .and_then(|v| v.to_str().ok())
780 .map(|s| s.to_string());
781
782 let data_stream: BodyDataStream = req.into_body().into_data_stream();
783 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
784 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
785
786 let stream_body = StreamBody {
787 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
788 metadata: StreamMetadata {
789 size_hint: content_length,
790 content_type,
791 origin: None,
792 },
793 };
794
795 let sender = {
797 let table = state.dispatch.read().await;
798 table.get(&path).cloned()
799 };
800 let Some(sender) = sender else {
801 return Response::builder()
802 .status(StatusCode::NOT_FOUND)
803 .body(AxumBody::from("No consumer registered for this path"))
804 .expect("infallible"); };
806
807 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
808 let envelope = RequestEnvelope {
809 method,
810 path,
811 query,
812 headers,
813 body: stream_body,
814 reply_tx,
815 };
816
817 if sender.send(envelope).await.is_err() {
818 return Response::builder()
819 .status(StatusCode::SERVICE_UNAVAILABLE)
820 .body(AxumBody::from("Consumer unavailable"))
821 .expect("infallible"); }
823
824 match reply_rx.await {
825 Ok(reply) => {
826 let reply = match reply.body {
827 HttpReplyBody::Bytes(b)
828 if exceeds_max_response_body(b.len(), state.max_response_body) =>
829 {
830 HttpReply {
831 status: 500,
832 headers: vec![],
833 body: HttpReplyBody::Bytes(bytes::Bytes::from(
834 "Response body exceeds configured limit",
835 )),
836 }
837 }
838 _ => reply,
839 };
840
841 let status =
842 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
843 let mut builder = Response::builder().status(status);
844 for (k, v) in &reply.headers {
845 builder = builder.header(k.as_str(), v.as_str());
846 }
847 match reply.body {
848 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
849 Response::builder()
850 .status(StatusCode::INTERNAL_SERVER_ERROR)
851 .body(AxumBody::from("Invalid response headers from consumer"))
852 .expect("infallible") }),
854 HttpReplyBody::Stream(stream) => builder
855 .body(AxumBody::from_stream(stream))
856 .unwrap_or_else(|_| {
857 Response::builder()
858 .status(StatusCode::INTERNAL_SERVER_ERROR)
859 .body(AxumBody::from("Invalid response headers from consumer"))
860 .expect("infallible") }),
862 }
863 }
864 Err(_) => Response::builder()
865 .status(StatusCode::INTERNAL_SERVER_ERROR)
866 .body(AxumBody::from("Pipeline error"))
867 .expect("Response::builder() with a known-valid status code and body is infallible"), }
869}
870
871fn exceeds_max_response_body(len: usize, max: usize) -> bool {
872 len > max
873}
874
875fn title_case_header(name: &str) -> String {
876 name.split('-')
877 .map(|part| {
878 let mut chars = part.chars();
879 match chars.next() {
880 None => String::new(),
881 Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
882 }
883 })
884 .collect::<Vec<_>>()
885 .join("-")
886}
887
888pub struct HttpConsumer {
893 config: HttpServerConfig,
894}
895
896impl HttpConsumer {
897 pub fn new(config: HttpServerConfig) -> Self {
898 Self { config }
899 }
900}
901
902#[async_trait::async_trait]
903impl Consumer for HttpConsumer {
904 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
905 use camel_component_api::{Body, Exchange, Message};
906
907 let dispatch = ServerRegistry::global()
908 .get_or_spawn(
909 &self.config.host,
910 self.config.port,
911 self.config.max_request_body,
912 self.config.max_response_body,
913 self.config.max_inflight_requests,
914 )
915 .await?;
916
917 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
919 {
920 let mut table = dispatch.write().await;
921 table.insert(self.config.path.clone(), env_tx);
922 }
923
924 let path = self.config.path.clone();
925 let cancel_token = ctx.cancel_token();
926 loop {
927 tokio::select! {
928 _ = ctx.cancelled() => {
929 break;
930 }
931 envelope = env_rx.recv() => {
932 let Some(envelope) = envelope else { break; };
933
934 let mut msg = Message::default();
936
937 msg.set_header("CamelHttpMethod",
939 serde_json::Value::String(envelope.method.clone()));
940 msg.set_header("CamelHttpPath",
941 serde_json::Value::String(envelope.path.clone()));
942 msg.set_header("CamelHttpQuery",
943 serde_json::Value::String(envelope.query.clone()));
944
945 for (k, v) in &envelope.headers {
947 if let Ok(val_str) = v.to_str() {
948 msg.set_header(
949 title_case_header(k.as_str()),
950 serde_json::Value::String(val_str.to_string()),
951 );
952 }
953 }
954
955 msg.body = Body::Stream(envelope.body);
958
959 #[allow(unused_mut)]
960 let mut exchange = Exchange::new(msg);
961
962 #[cfg(feature = "otel")]
964 {
965 let headers: HashMap<String, String> = envelope
966 .headers
967 .iter()
968 .filter_map(|(k, v)| {
969 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
970 })
971 .collect();
972 camel_otel::extract_into_exchange(&mut exchange, &headers);
973 }
974
975 let reply_tx = envelope.reply_tx;
976 let sender = ctx.sender().clone();
977 let path_clone = path.clone();
978 let cancel = cancel_token.clone();
979
980 tokio::spawn(async move {
1000 if cancel.is_cancelled() {
1008 let _ = reply_tx.send(HttpReply {
1009 status: 503,
1010 headers: vec![],
1011 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
1012 });
1013 return;
1014 }
1015
1016 let (tx, rx) = tokio::sync::oneshot::channel();
1018 let envelope = camel_component_api::consumer::ExchangeEnvelope {
1019 exchange,
1020 reply_tx: Some(tx),
1021 };
1022
1023 let result = match sender.send(envelope).await {
1024 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
1025 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
1026 }
1027 .and_then(|r| r);
1028
1029 let reply = match result {
1030 Ok(out) => {
1031 let status = out
1032 .input
1033 .header("CamelHttpResponseCode")
1034 .and_then(|v| {
1035 let raw = v.as_u64()
1036 .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
1037 let code = raw as u16;
1038 (100..1000).contains(&code).then_some(code)
1039 })
1040 .unwrap_or(200);
1041
1042 let user_content_type = out
1043 .input
1044 .header("Content-Type")
1045 .and_then(|v| v.as_str().map(|s| s.to_string()));
1046
1047 let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1048 Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1049 Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1050 Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1051 Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1052 Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1053 v.to_string().into_bytes(),
1054 )), Some("application/json".to_string())),
1055 Body::Stream(s) => {
1056 let ct = s.metadata.content_type.clone();
1057 match s.stream.lock().await.take() {
1058 Some(stream) => (
1059 HttpReplyBody::Stream(stream),
1060 ct,
1061 ),
1062 None => {
1063 tracing::error!(
1064 "Body::Stream already consumed before HTTP reply — returning 500"
1065 );
1066 let error_reply = HttpReply {
1067 status: 500,
1068 headers: vec![],
1069 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1070 };
1071 if reply_tx.send(error_reply).is_err() {
1072 debug!("reply_tx dropped before error reply could be sent");
1073 }
1074 return;
1075 }
1076 }
1077 }
1078 };
1079
1080 let mut resp_headers: Vec<(String, String)> = out
1081 .input
1082 .headers
1083 .iter()
1084 .filter(|(k, _)| !k.starts_with("Camel"))
1085 .filter(|(k, _)| {
1086 !matches!(
1087 k.to_lowercase().as_str(),
1088 "content-length"
1089 | "content-type"
1090 | "transfer-encoding"
1091 | "connection"
1092 | "cache-control"
1093 | "date"
1094 | "pragma"
1095 | "trailer"
1096 | "upgrade"
1097 | "via"
1098 | "warning"
1099 | "host"
1100 | "user-agent"
1101 | "accept"
1102 | "accept-encoding"
1103 | "accept-language"
1104 | "accept-charset"
1105 | "authorization"
1106 | "proxy-authorization"
1107 | "cookie"
1108 | "expect"
1109 | "from"
1110 | "if-match"
1111 | "if-modified-since"
1112 | "if-none-match"
1113 | "if-range"
1114 | "if-unmodified-since"
1115 | "max-forwards"
1116 | "proxy-connection"
1117 | "range"
1118 | "referer"
1119 | "te"
1120 )
1121 })
1122 .filter_map(|(k, v)| {
1123 v.as_str().map(|s| (k.clone(), s.to_string()))
1124 })
1125 .collect();
1126
1127 let content_type = user_content_type
1128 .or(inferred_content_type);
1129 if let Some(ct) = content_type {
1130 resp_headers.push(("Content-Type".to_string(), ct));
1131 }
1132
1133 HttpReply {
1134 status,
1135 headers: resp_headers,
1136 body: reply_body,
1137 }
1138 }
1139 Err(CamelError::Stopped) => {
1140 tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1141 HttpReply {
1142 status: 204,
1143 headers: vec![],
1144 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1145 }
1146 }
1147 Err(CamelError::Unauthenticated(msg)) => {
1148 tracing::warn!(error = %msg, path = %path_clone, "Authentication failed");
1149 HttpReply {
1150 status: 401,
1151 headers: vec![("WWW-Authenticate".to_string(), "Bearer".to_string())],
1152 body: HttpReplyBody::Bytes(bytes::Bytes::from("Unauthorized")),
1153 }
1154 }
1155 Err(CamelError::Unauthorized(msg)) => {
1156 tracing::warn!(error = %msg, path = %path_clone, "Authorization failed");
1157 HttpReply {
1158 status: 403,
1159 headers: vec![],
1160 body: HttpReplyBody::Bytes(bytes::Bytes::from("Forbidden")),
1161 }
1162 }
1163 Err(e) => {
1164 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1165 HttpReply {
1166 status: 500,
1167 headers: vec![],
1168 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1169 }
1170 }
1171 };
1172
1173 let _ = reply_tx.send(reply);
1175 });
1176 }
1177 }
1178 }
1179
1180 {
1182 let mut table = dispatch.write().await;
1183 table.remove(&path);
1184 }
1185
1186 Ok(())
1187 }
1188
1189 async fn stop(&mut self) -> Result<(), CamelError> {
1190 Ok(())
1191 }
1192
1193 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1194 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1195 }
1196}
1197
1198pub struct HttpComponent {
1203 config: HttpConfig,
1204}
1205
1206fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1207 let mut builder = reqwest::Client::builder()
1208 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1209 .pool_max_idle_per_host(config.pool_max_idle_per_host)
1210 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1211
1212 if !config.follow_redirects {
1213 builder = builder.redirect(reqwest::redirect::Policy::none());
1214 } else if let Some(max_redirects) = config.max_redirects {
1215 builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1216 }
1217
1218 if let Some(proxy_url) = &config.proxy_url
1219 && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1220 {
1221 builder = builder.proxy(proxy);
1222 }
1223
1224 if matches!(cookie_handling, CookieHandling::InMemory) {
1225 }
1227
1228 if let Some(tls) = &config.tls
1229 && tls.enabled
1230 {
1231 if tls.insecure || !tls.verify_peer {
1232 builder = builder.danger_accept_invalid_certs(true);
1233 }
1234
1235 if let Some(ca_path) = &tls.ca_cert_path
1236 && let Ok(ca_bytes) = std::fs::read(ca_path)
1237 {
1238 let cert = reqwest::Certificate::from_pem(&ca_bytes)
1239 .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1240 if let Ok(ca_cert) = cert {
1241 builder = builder.add_root_certificate(ca_cert);
1242 }
1243 }
1244
1245 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1246 && let (Ok(cert_bytes), Ok(key_bytes)) =
1247 (std::fs::read(cert_path), std::fs::read(key_path))
1248 {
1249 let mut identity_pem = cert_bytes;
1250 identity_pem.extend_from_slice(&key_bytes);
1251 if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1252 builder = builder.identity(identity);
1253 }
1254 }
1255 }
1256
1257 builder
1258 .build()
1259 .expect("reqwest::Client::build() with valid config should not fail") }
1261
1262impl HttpComponent {
1263 pub fn new() -> Self {
1264 let config = HttpConfig::default();
1265 Self { config }
1266 }
1267
1268 pub fn with_config(config: HttpConfig) -> Self {
1269 Self { config }
1270 }
1271
1272 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1273 match config {
1274 Some(cfg) => Self::with_config(cfg),
1275 None => Self::new(),
1276 }
1277 }
1278}
1279
1280impl Default for HttpComponent {
1281 fn default() -> Self {
1282 Self::new()
1283 }
1284}
1285
1286impl Component for HttpComponent {
1287 fn scheme(&self) -> &str {
1288 "http"
1289 }
1290
1291 fn create_endpoint(
1292 &self,
1293 uri: &str,
1294 ctx: &dyn camel_component_api::ComponentContext,
1295 ) -> Result<Box<dyn Endpoint>, CamelError> {
1296 self.config.validate()?;
1297 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1298 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1299 let client = build_client(&self.config, config.cookie_handling);
1300 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1301 server_config.host.clone(),
1302 server_config.port,
1303 )));
1304 Ok(Box::new(HttpEndpoint {
1305 uri: uri.to_string(),
1306 config,
1307 server_config,
1308 client,
1309 }))
1310 }
1311}
1312
1313pub struct HttpsComponent {
1314 config: HttpConfig,
1315}
1316
1317impl HttpsComponent {
1318 pub fn new() -> Self {
1319 let config = HttpConfig::default();
1320 Self { config }
1321 }
1322
1323 pub fn with_config(config: HttpConfig) -> Self {
1324 Self { config }
1325 }
1326
1327 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1328 match config {
1329 Some(cfg) => Self::with_config(cfg),
1330 None => Self::new(),
1331 }
1332 }
1333}
1334
1335impl Default for HttpsComponent {
1336 fn default() -> Self {
1337 Self::new()
1338 }
1339}
1340
1341impl Component for HttpsComponent {
1342 fn scheme(&self) -> &str {
1343 "https"
1344 }
1345
1346 fn create_endpoint(
1347 &self,
1348 uri: &str,
1349 ctx: &dyn camel_component_api::ComponentContext,
1350 ) -> Result<Box<dyn Endpoint>, CamelError> {
1351 self.config.validate()?;
1352 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1353 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1354 let client = build_client(&self.config, config.cookie_handling);
1355 ctx.register_current_route_health_check(Arc::new(HttpHealthCheck::new(
1356 server_config.host.clone(),
1357 server_config.port,
1358 )));
1359 Ok(Box::new(HttpEndpoint {
1360 uri: uri.to_string(),
1361 config,
1362 server_config,
1363 client,
1364 }))
1365 }
1366}
1367
1368struct HttpEndpoint {
1373 uri: String,
1374 config: HttpEndpointConfig,
1375 server_config: HttpServerConfig,
1376 client: reqwest::Client,
1377}
1378
1379impl Endpoint for HttpEndpoint {
1380 fn uri(&self) -> &str {
1381 &self.uri
1382 }
1383
1384 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1385 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1386 }
1387
1388 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1389 let producer = HttpProducer {
1390 config: Arc::new(self.config.clone()),
1391 client: self.client.clone(),
1392 };
1393 if let Some(ref provider) = self.config.token_provider {
1394 let layer = BearerTokenLayer::new(Arc::clone(provider));
1395 Ok(BoxProcessor::new(layer.layer(producer)))
1396 } else {
1397 Ok(BoxProcessor::new(producer))
1398 }
1399 }
1400}
1401
1402fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1407 let parsed = url::Url::parse(url)
1408 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1409
1410 if let Some(host) = parsed.host_str()
1412 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1413 {
1414 return Err(CamelError::ProcessorError(format!(
1415 "Host '{}' is blocked",
1416 host
1417 )));
1418 }
1419
1420 if !config.allow_private_ips
1422 && let Some(host) = parsed.host()
1423 {
1424 match host {
1425 url::Host::Ipv4(ip) => {
1426 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1427 return Err(CamelError::ProcessorError(format!(
1428 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1429 ip
1430 )));
1431 }
1432 }
1433 url::Host::Ipv6(ip) => {
1434 if ip.is_loopback() {
1435 return Err(CamelError::ProcessorError(format!(
1436 "Loopback IP '{}' not allowed",
1437 ip
1438 )));
1439 }
1440 }
1441 url::Host::Domain(domain) => {
1442 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1444 if blocked_domains.contains(&domain) {
1445 return Err(CamelError::ProcessorError(format!(
1446 "Domain '{}' is not allowed",
1447 domain
1448 )));
1449 }
1450 }
1451 }
1452 }
1453
1454 Ok(())
1455}
1456
1457fn is_private_ip(ip: &IpAddr) -> bool {
1458 match ip {
1459 IpAddr::V4(ipv4) => {
1460 ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1461 }
1462 IpAddr::V6(ipv6) => {
1463 let seg0 = ipv6.segments()[0];
1464 ipv6.is_loopback()
1465 || (seg0 & 0xfe00) == 0xfc00
1467 || (seg0 & 0xffc0) == 0xfe80
1469 || ipv6
1471 .to_ipv4_mapped()
1472 .map(|v4| {
1473 v4.is_private()
1474 || v4.is_loopback()
1475 || v4.is_link_local()
1476 || v4.octets()[0] == 0
1477 })
1478 .unwrap_or(false)
1479 }
1480 }
1481}
1482
1483async fn validate_resolved_host_for_ssrf(
1484 url: &str,
1485 config: &HttpEndpointConfig,
1486) -> Result<(), CamelError> {
1487 if config.allow_private_ips {
1488 return Ok(());
1489 }
1490
1491 let parsed = url::Url::parse(url)
1492 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1493 let Some(host) = parsed.host_str() else {
1494 return Ok(());
1495 };
1496 let Some(port) = parsed.port_or_known_default() else {
1497 return Ok(());
1498 };
1499
1500 let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1501 CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1502 })?;
1503
1504 for addr in resolved {
1505 let ip = addr.ip();
1506 if is_private_ip(&ip) {
1507 return Err(CamelError::ProcessorError(format!(
1508 "Target resolved to private IP: {}",
1509 ip
1510 )));
1511 }
1512 }
1513
1514 Ok(())
1515}
1516
1517#[derive(Clone)]
1522struct HttpProducer {
1523 config: Arc<HttpEndpointConfig>,
1524 client: reqwest::Client,
1525}
1526
1527impl HttpProducer {
1528 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1529 if let Some(ref method) = config.http_method {
1530 return method.to_uppercase();
1531 }
1532 if let Some(method) = exchange
1533 .input
1534 .header("CamelHttpMethod")
1535 .and_then(|v| v.as_str())
1536 {
1537 return method.to_uppercase();
1538 }
1539 if !exchange.input.body.is_empty() {
1540 return "POST".to_string();
1541 }
1542 "GET".to_string()
1543 }
1544
1545 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1546 if let Some(uri) = exchange
1547 .input
1548 .header("CamelHttpUri")
1549 .and_then(|v| v.as_str())
1550 {
1551 let mut url = uri.to_string();
1552 if let Some(path) = exchange
1553 .input
1554 .header("CamelHttpPath")
1555 .and_then(|v| v.as_str())
1556 {
1557 if !url.ends_with('/') && !path.starts_with('/') {
1558 url.push('/');
1559 }
1560 url.push_str(path);
1561 }
1562 if let Some(query) = exchange
1563 .input
1564 .header("CamelHttpQuery")
1565 .and_then(|v| v.as_str())
1566 {
1567 url.push('?');
1568 url.push_str(query);
1569 }
1570 return url;
1571 }
1572
1573 let mut url = config.base_url.clone();
1574
1575 if let Some(path) = exchange
1576 .input
1577 .header("CamelHttpPath")
1578 .and_then(|v| v.as_str())
1579 {
1580 if !url.ends_with('/') && !path.starts_with('/') {
1581 url.push('/');
1582 }
1583 url.push_str(path);
1584 }
1585
1586 if let Some(query) = exchange
1587 .input
1588 .header("CamelHttpQuery")
1589 .and_then(|v| v.as_str())
1590 {
1591 url.push('?');
1592 url.push_str(query);
1593 } else if !config.query_params.is_empty() {
1594 let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); for (k, v) in &config.query_params {
1596 parsed.query_pairs_mut().append_pair(k, v);
1597 }
1598 url = parsed.to_string();
1599 }
1600
1601 url
1602 }
1603
1604 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1605 status >= range.0 && status <= range.1
1606 }
1607}
1608
1609impl Service<Exchange> for HttpProducer {
1610 type Response = Exchange;
1611 type Error = CamelError;
1612 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1613
1614 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1615 Poll::Ready(Ok(()))
1616 }
1617
1618 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1619 let config = self.config.clone();
1620 let client = self.client.clone();
1621
1622 Box::pin(async move {
1623 let method_str = HttpProducer::resolve_method(&exchange, &config);
1624 let url = HttpProducer::resolve_url(&exchange, &config);
1625
1626 validate_url_for_ssrf(&url, &config)?;
1628 validate_resolved_host_for_ssrf(&url, &config).await?;
1629
1630 debug!(
1631 correlation_id = %exchange.correlation_id(),
1632 method = %method_str,
1633 url = %url,
1634 "HTTP request"
1635 );
1636
1637 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1638 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1639 })?;
1640
1641 let mut request = client.request(method, &url);
1642
1643 if let Some(timeout) = config.response_timeout {
1644 request = request.timeout(timeout);
1645 }
1646
1647 if let Some(user_agent) = &config.user_agent
1648 && !config.bridge_endpoint
1649 {
1650 request = request.header("User-Agent", user_agent.clone());
1651 }
1652
1653 #[cfg(feature = "otel")]
1655 let should_inject_otel = !config.bridge_endpoint;
1656 #[cfg(feature = "otel")]
1657 if should_inject_otel {
1658 let mut otel_headers = HashMap::new();
1659 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1660 for (k, v) in otel_headers {
1661 if let (Ok(name), Ok(val)) = (
1662 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1663 reqwest::header::HeaderValue::from_str(&v),
1664 ) {
1665 request = request.header(name, val);
1666 }
1667 }
1668 }
1669
1670 for (key, value) in &exchange.input.headers {
1671 if !key.starts_with("Camel")
1672 && !config
1673 .skip_request_headers
1674 .iter()
1675 .any(|h| h.eq_ignore_ascii_case(key))
1676 && let Some(val_str) = value.as_str()
1677 && let (Ok(name), Ok(val)) = (
1678 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1679 reqwest::header::HeaderValue::from_str(val_str),
1680 )
1681 {
1682 request = request.header(name, val);
1683 }
1684 }
1685
1686 if !config.bridge_endpoint {
1687 match &config.auth {
1688 HttpAuth::None => {}
1689 HttpAuth::Basic { username, password } => {
1690 request = request.basic_auth(username, Some(password));
1691 }
1692 HttpAuth::Bearer { token } => {
1693 request = request.bearer_auth(token);
1694 }
1695 }
1696
1697 if config.connection_close {
1698 request = request.header("Connection", "close");
1699 }
1700 }
1701
1702 match exchange.input.body {
1703 Body::Stream(ref s) => {
1704 let mut stream_lock = s.stream.lock().await;
1705 if let Some(stream) = stream_lock.take() {
1706 request = request.body(reqwest::Body::wrap_stream(stream));
1707 } else {
1708 return Err(CamelError::AlreadyConsumed);
1709 }
1710 }
1711 _ => {
1712 let body = std::mem::take(&mut exchange.input.body);
1714 let bytes = body.into_bytes(config.max_body_size).await?;
1715 if !bytes.is_empty() {
1716 request = request.body(bytes);
1717 }
1718 }
1719 }
1720
1721 let response = request
1722 .send()
1723 .await
1724 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1725
1726 let status_code = response.status().as_u16();
1727 let status_text = response
1728 .status()
1729 .canonical_reason()
1730 .unwrap_or("Unknown")
1731 .to_string();
1732
1733 for (key, value) in response.headers() {
1734 if config
1735 .skip_response_headers
1736 .iter()
1737 .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1738 {
1739 continue;
1740 }
1741 if let Ok(val_str) = value.to_str() {
1742 exchange.input.set_header(
1743 title_case_header(key.as_str()),
1744 serde_json::Value::String(val_str.to_string()),
1745 );
1746 }
1747 }
1748
1749 exchange.input.set_header(
1750 "CamelHttpResponseCode",
1751 serde_json::Value::Number(status_code.into()),
1752 );
1753 exchange.input.set_header(
1754 "CamelHttpResponseText",
1755 serde_json::Value::String(status_text.clone()),
1756 );
1757
1758 let read_timeout = Duration::from_millis(config.read_timeout_ms);
1760 let response_body = tokio::time::timeout(read_timeout, async {
1761 if let Some(content_len) = response.content_length()
1763 && content_len > config.max_response_bytes as u64
1764 {
1765 return Err(CamelError::ProcessorError(format!(
1766 "Response body too large: {} bytes exceeds limit of {} bytes",
1767 content_len, config.max_response_bytes
1768 )));
1769 }
1770 use futures::TryStreamExt;
1772 let mut stream = response.bytes_stream();
1773 let mut total: usize = 0;
1774 let mut collected = Vec::new();
1775 while let Some(chunk) = stream.try_next().await.map_err(|e| {
1776 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1777 })? {
1778 total += chunk.len();
1779 if total > config.max_response_bytes {
1780 return Err(CamelError::ProcessorError(format!(
1781 "Response body too large: {} bytes exceeds limit of {} bytes",
1782 total, config.max_response_bytes
1783 )));
1784 }
1785 collected.push(chunk);
1786 }
1787 let mut result = bytes::BytesMut::with_capacity(total);
1788 for chunk in collected {
1789 result.extend_from_slice(&chunk);
1790 }
1791 Ok::<bytes::Bytes, CamelError>(result.freeze())
1792 })
1793 .await
1794 .map_err(|_| {
1795 CamelError::ProcessorError(format!(
1796 "Read timeout after {}ms",
1797 config.read_timeout_ms
1798 ))
1799 })??;
1800
1801 if config.throw_exception_on_failure
1802 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1803 {
1804 return Err(CamelError::HttpOperationFailed {
1805 method: method_str,
1806 url,
1807 status_code,
1808 status_text,
1809 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1810 });
1811 }
1812
1813 if !response_body.is_empty() {
1814 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1815 }
1816
1817 debug!(
1818 correlation_id = %exchange.correlation_id(),
1819 status = status_code,
1820 url = %url,
1821 "HTTP response"
1822 );
1823 Ok(exchange)
1824 })
1825 }
1826}
1827
1828#[cfg(test)]
1829mod tests {
1830 use super::*;
1831 use camel_component_api::{Message, NoOpComponentContext};
1832 use std::sync::Arc;
1833 use std::time::Duration;
1834
1835 fn test_producer_ctx() -> ProducerContext {
1836 ProducerContext::new()
1837 }
1838
1839 #[test]
1840 fn test_http_config_defaults() {
1841 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1842 assert_eq!(config.base_url, "http://localhost:8080/api");
1843 assert!(config.http_method.is_none());
1844 assert!(config.throw_exception_on_failure);
1845 assert_eq!(config.ok_status_code_range, (200, 299));
1846 assert!(config.response_timeout.is_none());
1847 assert!(matches!(config.auth, HttpAuth::None));
1848 assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1849 assert!(!config.bridge_endpoint);
1850 assert!(!config.connection_close);
1851 }
1852
1853 #[test]
1854 fn test_http_config_scheme() {
1855 assert_eq!(HttpEndpointConfig::scheme(), "http");
1857 }
1858
1859 #[test]
1860 fn test_http_config_from_components() {
1861 let components = camel_component_api::UriComponents {
1863 scheme: "https".to_string(),
1864 path: "//api.example.com/v1".to_string(),
1865 params: std::collections::HashMap::from([(
1866 "httpMethod".to_string(),
1867 "POST".to_string(),
1868 )]),
1869 };
1870 let config = HttpEndpointConfig::from_components(components).unwrap();
1871 assert_eq!(config.base_url, "https://api.example.com/v1");
1872 assert_eq!(config.http_method, Some("POST".to_string()));
1873 }
1874
1875 #[test]
1876 fn test_http_config_with_options() {
1877 let config = HttpEndpointConfig::from_uri(
1878 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1879 ).unwrap();
1880 assert_eq!(config.base_url, "https://api.example.com/v1");
1881 assert_eq!(config.http_method, Some("PUT".to_string()));
1882 assert!(!config.throw_exception_on_failure);
1883 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1884 }
1885
1886 #[test]
1887 fn test_http_endpoint_config_auth_and_headers_options() {
1888 let config = HttpEndpointConfig::from_uri(
1889 "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1890 )
1891 .unwrap();
1892
1893 assert!(matches!(
1894 config.auth,
1895 HttpAuth::Basic { username, password } if username == "u" && password == "p"
1896 ));
1897 assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1898 assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1899 assert!(config.bridge_endpoint);
1900 assert!(config.connection_close);
1901 assert_eq!(
1902 config.skip_request_headers,
1903 vec!["authorization".to_string(), "x-secret".to_string()]
1904 );
1905 assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1906 }
1907
1908 #[test]
1909 fn test_http_endpoint_config_bearer_auth() {
1910 let config = HttpEndpointConfig::from_uri(
1911 "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1912 )
1913 .unwrap();
1914 assert!(matches!(
1915 config.auth,
1916 HttpAuth::Bearer { token } if token == "t"
1917 ));
1918 }
1919
1920 #[test]
1921 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1922 let config = HttpConfig::default()
1923 .with_response_timeout_ms(999)
1924 .with_allow_private_ips(true)
1925 .with_blocked_hosts(vec!["evil.com".to_string()])
1926 .with_max_body_size(12345);
1927 let endpoint =
1928 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1929 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1930 assert!(endpoint.allow_private_ips);
1931 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1932 assert_eq!(endpoint.max_body_size, 12345);
1933 }
1934
1935 #[test]
1936 fn test_from_uri_with_defaults_uri_overrides_config() {
1937 let config = HttpConfig::default()
1938 .with_response_timeout_ms(999)
1939 .with_allow_private_ips(true)
1940 .with_blocked_hosts(vec!["evil.com".to_string()])
1941 .with_max_body_size(12345);
1942 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1943 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1944 &config,
1945 )
1946 .unwrap();
1947 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1948 assert!(!endpoint.allow_private_ips);
1949 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1950 assert_eq!(endpoint.max_body_size, 99);
1951 }
1952
1953 #[test]
1954 fn test_http_config_ok_status_range() {
1955 let config =
1956 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1957 assert_eq!(config.ok_status_code_range, (200, 204));
1958 }
1959
1960 #[test]
1961 fn test_http_config_wrong_scheme() {
1962 let result = HttpEndpointConfig::from_uri("file:/tmp");
1963 assert!(result.is_err());
1964 }
1965
1966 #[test]
1967 fn test_http_component_scheme() {
1968 let component = HttpComponent::new();
1969 assert_eq!(component.scheme(), "http");
1970 }
1971
1972 #[test]
1973 fn test_https_component_scheme() {
1974 let component = HttpsComponent::new();
1975 assert_eq!(component.scheme(), "https");
1976 }
1977
1978 #[test]
1979 fn test_http_endpoint_creates_consumer() {
1980 let component = HttpComponent::new();
1981 let ctx = NoOpComponentContext;
1982 let endpoint = component
1983 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1984 .unwrap();
1985 assert!(endpoint.create_consumer().is_ok());
1986 }
1987
1988 #[test]
1989 fn test_https_endpoint_creates_consumer() {
1990 let component = HttpsComponent::new();
1991 let ctx = NoOpComponentContext;
1992 let endpoint = component
1993 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1994 .unwrap();
1995 assert!(endpoint.create_consumer().is_ok());
1996 }
1997
1998 #[test]
1999 fn test_http_endpoint_creates_producer() {
2000 let ctx = test_producer_ctx();
2001 let component = HttpComponent::new();
2002 let endpoint_ctx = NoOpComponentContext;
2003 let endpoint = component
2004 .create_endpoint("http://localhost/api", &endpoint_ctx)
2005 .unwrap();
2006 assert!(endpoint.create_producer(&ctx).is_ok());
2007 }
2008
2009 #[tokio::test]
2014 async fn test_producer_with_token_provider() {
2015 use camel_auth::oauth2::TokenProvider;
2016 use tower::ServiceExt;
2017
2018 let captured_auth: Arc<std::sync::Mutex<Option<String>>> =
2019 Arc::new(std::sync::Mutex::new(None));
2020 let captured_clone = Arc::clone(&captured_auth);
2021
2022 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2023 let port = listener.local_addr().unwrap().port();
2024
2025 let _handle = tokio::spawn(async move {
2026 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2027 if let Ok((mut stream, _)) = listener.accept().await {
2028 let mut buf = vec![0u8; 8192];
2029 let n = stream.read(&mut buf).await.unwrap_or(0);
2030 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2031 let auth = request
2032 .lines()
2033 .find(|l| l.to_lowercase().starts_with("authorization:"))
2034 .map(|l| {
2035 l.split(':')
2036 .nth(1)
2037 .map(|s| s.trim().to_string())
2038 .unwrap_or_default()
2039 });
2040 *captured_clone.lock().unwrap() = auth;
2041 let body = r#"{"echo":"ok"}"#;
2042 let resp = format!(
2043 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2044 body.len(),
2045 body
2046 );
2047 let _ = stream.write_all(resp.as_bytes()).await;
2048 }
2049 });
2050
2051 #[derive(Debug)]
2052 struct StaticProvider;
2053 #[async_trait::async_trait]
2054 impl TokenProvider for StaticProvider {
2055 async fn get_token(&self) -> Result<String, camel_auth::types::AuthError> {
2056 Ok("injected-token".into())
2057 }
2058 }
2059
2060 let uri = format!("http://127.0.0.1:{}/api?allowPrivateIps=true", port);
2061 let ctx = test_producer_ctx();
2062 let component = HttpComponent::new();
2063 let endpoint_ctx = NoOpComponentContext;
2064 let endpoint = component.create_endpoint(&uri, &endpoint_ctx).unwrap();
2065 let producer = endpoint.create_producer(&ctx).unwrap();
2066
2067 let mut exchange = Exchange::new(Message::new("hello"));
2068
2069 let layer = BearerTokenLayer::new(Arc::new(StaticProvider));
2070 let mut layered = layer.layer(producer);
2071 let result = layered.ready().await.unwrap().call(exchange).await;
2072 assert!(result.is_ok(), "producer call failed: {:?}", result);
2073
2074 tokio::time::sleep(Duration::from_millis(100)).await;
2075 let auth = captured_auth.lock().unwrap().take();
2076 assert_eq!(auth.as_deref(), Some("Bearer injected-token"));
2077 }
2078
2079 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
2080 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2081 let addr = listener.local_addr().unwrap();
2082 let url = format!("http://127.0.0.1:{}", addr.port());
2083
2084 let handle = tokio::spawn(async move {
2085 loop {
2086 if let Ok((mut stream, _)) = listener.accept().await {
2087 tokio::spawn(async move {
2088 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2089 let mut buf = vec![0u8; 4096];
2090 let n = stream.read(&mut buf).await.unwrap_or(0);
2091 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2092
2093 let method = request.split_whitespace().next().unwrap_or("GET");
2094
2095 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
2096 let response = format!(
2097 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
2098 body.len(),
2099 body
2100 );
2101 let _ = stream.write_all(response.as_bytes()).await;
2102 });
2103 }
2104 }
2105 });
2106
2107 (url, handle)
2108 }
2109
2110 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
2111 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2112 let addr = listener.local_addr().unwrap();
2113 let url = format!("http://127.0.0.1:{}", addr.port());
2114
2115 let handle = tokio::spawn(async move {
2116 loop {
2117 if let Ok((mut stream, _)) = listener.accept().await {
2118 let status = status;
2119 tokio::spawn(async move {
2120 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2121 let mut buf = vec![0u8; 4096];
2122 let _ = stream.read(&mut buf).await;
2123
2124 let status_text = match status {
2125 404 => "Not Found",
2126 500 => "Internal Server Error",
2127 _ => "Error",
2128 };
2129 let body = "error body";
2130 let response = format!(
2131 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
2132 status,
2133 status_text,
2134 body.len(),
2135 body
2136 );
2137 let _ = stream.write_all(response.as_bytes()).await;
2138 });
2139 }
2140 }
2141 });
2142
2143 (url, handle)
2144 }
2145
2146 #[tokio::test]
2147 async fn test_http_producer_get_request() {
2148 use tower::ServiceExt;
2149
2150 let (url, _handle) = start_test_server().await;
2151 let ctx = test_producer_ctx();
2152
2153 let component = HttpComponent::new();
2154 let endpoint_ctx = NoOpComponentContext;
2155 let endpoint = component
2156 .create_endpoint(
2157 &format!("{url}/api/test?allowPrivateIps=true"),
2158 &endpoint_ctx,
2159 )
2160 .unwrap();
2161 let producer = endpoint.create_producer(&ctx).unwrap();
2162
2163 let exchange = Exchange::new(Message::default());
2164 let result = producer.oneshot(exchange).await.unwrap();
2165
2166 let status = result
2167 .input
2168 .header("CamelHttpResponseCode")
2169 .and_then(|v| v.as_u64())
2170 .unwrap();
2171 assert_eq!(status, 200);
2172
2173 assert!(!result.input.body.is_empty());
2174 }
2175
2176 #[tokio::test]
2177 async fn test_http_producer_post_with_body() {
2178 use tower::ServiceExt;
2179
2180 let (url, _handle) = start_test_server().await;
2181 let ctx = test_producer_ctx();
2182
2183 let component = HttpComponent::new();
2184 let endpoint_ctx = NoOpComponentContext;
2185 let endpoint = component
2186 .create_endpoint(
2187 &format!("{url}/api/data?allowPrivateIps=true"),
2188 &endpoint_ctx,
2189 )
2190 .unwrap();
2191 let producer = endpoint.create_producer(&ctx).unwrap();
2192
2193 let exchange = Exchange::new(Message::new("request body"));
2194 let result = producer.oneshot(exchange).await.unwrap();
2195
2196 let status = result
2197 .input
2198 .header("CamelHttpResponseCode")
2199 .and_then(|v| v.as_u64())
2200 .unwrap();
2201 assert_eq!(status, 200);
2202 }
2203
2204 #[tokio::test]
2205 async fn test_http_producer_method_from_header() {
2206 use tower::ServiceExt;
2207
2208 let (url, _handle) = start_test_server().await;
2209 let ctx = test_producer_ctx();
2210
2211 let component = HttpComponent::new();
2212 let endpoint_ctx = NoOpComponentContext;
2213 let endpoint = component
2214 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2215 .unwrap();
2216 let producer = endpoint.create_producer(&ctx).unwrap();
2217
2218 let mut exchange = Exchange::new(Message::default());
2219 exchange.input.set_header(
2220 "CamelHttpMethod",
2221 serde_json::Value::String("DELETE".to_string()),
2222 );
2223
2224 let result = producer.oneshot(exchange).await.unwrap();
2225 let status = result
2226 .input
2227 .header("CamelHttpResponseCode")
2228 .and_then(|v| v.as_u64())
2229 .unwrap();
2230 assert_eq!(status, 200);
2231 }
2232
2233 #[tokio::test]
2234 async fn test_http_producer_forced_method() {
2235 use tower::ServiceExt;
2236
2237 let (url, _handle) = start_test_server().await;
2238 let ctx = test_producer_ctx();
2239
2240 let component = HttpComponent::new();
2241 let endpoint_ctx = NoOpComponentContext;
2242 let endpoint = component
2243 .create_endpoint(
2244 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2245 &endpoint_ctx,
2246 )
2247 .unwrap();
2248 let producer = endpoint.create_producer(&ctx).unwrap();
2249
2250 let exchange = Exchange::new(Message::default());
2251 let result = producer.oneshot(exchange).await.unwrap();
2252
2253 let status = result
2254 .input
2255 .header("CamelHttpResponseCode")
2256 .and_then(|v| v.as_u64())
2257 .unwrap();
2258 assert_eq!(status, 200);
2259 }
2260
2261 #[tokio::test]
2262 async fn test_http_producer_throw_exception_on_failure() {
2263 use tower::ServiceExt;
2264
2265 let (url, _handle) = start_status_server(404).await;
2266 let ctx = test_producer_ctx();
2267
2268 let component = HttpComponent::new();
2269 let endpoint_ctx = NoOpComponentContext;
2270 let endpoint = component
2271 .create_endpoint(
2272 &format!("{url}/not-found?allowPrivateIps=true"),
2273 &endpoint_ctx,
2274 )
2275 .unwrap();
2276 let producer = endpoint.create_producer(&ctx).unwrap();
2277
2278 let exchange = Exchange::new(Message::default());
2279 let result = producer.oneshot(exchange).await;
2280 assert!(result.is_err());
2281
2282 match result.unwrap_err() {
2283 CamelError::HttpOperationFailed { status_code, .. } => {
2284 assert_eq!(status_code, 404);
2285 }
2286 e => panic!("Expected HttpOperationFailed, got: {e}"),
2287 }
2288 }
2289
2290 #[tokio::test]
2291 async fn test_http_producer_no_throw_on_failure() {
2292 use tower::ServiceExt;
2293
2294 let (url, _handle) = start_status_server(500).await;
2295 let ctx = test_producer_ctx();
2296
2297 let component = HttpComponent::new();
2298 let endpoint_ctx = NoOpComponentContext;
2299 let endpoint = component
2300 .create_endpoint(
2301 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2302 &endpoint_ctx,
2303 )
2304 .unwrap();
2305 let producer = endpoint.create_producer(&ctx).unwrap();
2306
2307 let exchange = Exchange::new(Message::default());
2308 let result = producer.oneshot(exchange).await.unwrap();
2309
2310 let status = result
2311 .input
2312 .header("CamelHttpResponseCode")
2313 .and_then(|v| v.as_u64())
2314 .unwrap();
2315 assert_eq!(status, 500);
2316 }
2317
2318 #[tokio::test]
2319 async fn test_http_producer_uri_override() {
2320 use tower::ServiceExt;
2321
2322 let (url, _handle) = start_test_server().await;
2323 let ctx = test_producer_ctx();
2324
2325 let component = HttpComponent::new();
2326 let endpoint_ctx = NoOpComponentContext;
2327 let endpoint = component
2328 .create_endpoint(
2329 "http://localhost:1/does-not-exist?allowPrivateIps=true",
2330 &endpoint_ctx,
2331 )
2332 .unwrap();
2333 let producer = endpoint.create_producer(&ctx).unwrap();
2334
2335 let mut exchange = Exchange::new(Message::default());
2336 exchange.input.set_header(
2337 "CamelHttpUri",
2338 serde_json::Value::String(format!("{url}/api")),
2339 );
2340
2341 let result = producer.oneshot(exchange).await.unwrap();
2342 let status = result
2343 .input
2344 .header("CamelHttpResponseCode")
2345 .and_then(|v| v.as_u64())
2346 .unwrap();
2347 assert_eq!(status, 200);
2348 }
2349
2350 #[tokio::test]
2351 async fn test_http_producer_response_headers_mapped() {
2352 use tower::ServiceExt;
2353
2354 let (url, _handle) = start_test_server().await;
2355 let ctx = test_producer_ctx();
2356
2357 let component = HttpComponent::new();
2358 let endpoint_ctx = NoOpComponentContext;
2359 let endpoint = component
2360 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2361 .unwrap();
2362 let producer = endpoint.create_producer(&ctx).unwrap();
2363
2364 let exchange = Exchange::new(Message::default());
2365 let result = producer.oneshot(exchange).await.unwrap();
2366
2367 assert!(
2368 result.input.header("Content-Type").is_some(),
2369 "Response should have Content-Type header"
2370 );
2371 assert!(result.input.header("CamelHttpResponseText").is_some());
2372 }
2373
2374 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2379 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2380 let addr = listener.local_addr().unwrap();
2381 let url = format!("http://127.0.0.1:{}", addr.port());
2382
2383 let handle = tokio::spawn(async move {
2384 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2385 loop {
2386 if let Ok((mut stream, _)) = listener.accept().await {
2387 tokio::spawn(async move {
2388 let mut buf = vec![0u8; 4096];
2389 let n = stream.read(&mut buf).await.unwrap_or(0);
2390 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2391
2392 if request.contains("GET /final") {
2394 let body = r#"{"status":"final"}"#;
2395 let response = format!(
2396 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2397 body.len(),
2398 body
2399 );
2400 let _ = stream.write_all(response.as_bytes()).await;
2401 } else {
2402 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2404 let _ = stream.write_all(response.as_bytes()).await;
2405 }
2406 });
2407 }
2408 }
2409 });
2410
2411 (url, handle)
2412 }
2413
2414 #[tokio::test]
2415 async fn test_follow_redirects_false_does_not_follow() {
2416 use tower::ServiceExt;
2417
2418 let (url, _handle) = start_redirect_server().await;
2419 let ctx = test_producer_ctx();
2420
2421 let component =
2422 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2423 let endpoint_ctx = NoOpComponentContext;
2424 let endpoint = component
2425 .create_endpoint(
2426 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2427 &endpoint_ctx,
2428 )
2429 .unwrap();
2430 let producer = endpoint.create_producer(&ctx).unwrap();
2431
2432 let exchange = Exchange::new(Message::default());
2433 let result = producer.oneshot(exchange).await.unwrap();
2434
2435 let status = result
2437 .input
2438 .header("CamelHttpResponseCode")
2439 .and_then(|v| v.as_u64())
2440 .unwrap();
2441 assert_eq!(
2442 status, 302,
2443 "Should NOT follow redirect when followRedirects=false"
2444 );
2445 }
2446
2447 #[tokio::test]
2448 async fn test_follow_redirects_true_follows_redirect() {
2449 use tower::ServiceExt;
2450
2451 let (url, _handle) = start_redirect_server().await;
2452 let ctx = test_producer_ctx();
2453
2454 let component =
2455 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2456 let endpoint_ctx = NoOpComponentContext;
2457 let endpoint = component
2458 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2459 .unwrap();
2460 let producer = endpoint.create_producer(&ctx).unwrap();
2461
2462 let exchange = Exchange::new(Message::default());
2463 let result = producer.oneshot(exchange).await.unwrap();
2464
2465 let status = result
2467 .input
2468 .header("CamelHttpResponseCode")
2469 .and_then(|v| v.as_u64())
2470 .unwrap();
2471 assert_eq!(
2472 status, 200,
2473 "Should follow redirect when followRedirects=true"
2474 );
2475 }
2476
2477 #[tokio::test]
2478 async fn test_query_params_forwarded_to_http_request() {
2479 use tower::ServiceExt;
2480
2481 let (url, _handle) = start_test_server().await;
2482 let ctx = test_producer_ctx();
2483
2484 let component = HttpComponent::new();
2485 let endpoint_ctx = NoOpComponentContext;
2486 let endpoint = component
2488 .create_endpoint(
2489 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2490 &endpoint_ctx,
2491 )
2492 .unwrap();
2493 let producer = endpoint.create_producer(&ctx).unwrap();
2494
2495 let exchange = Exchange::new(Message::default());
2496 let result = producer.oneshot(exchange).await.unwrap();
2497
2498 let status = result
2501 .input
2502 .header("CamelHttpResponseCode")
2503 .and_then(|v| v.as_u64())
2504 .unwrap();
2505 assert_eq!(status, 200);
2506 }
2507
2508 #[tokio::test]
2509 async fn test_non_camel_query_params_are_forwarded() {
2510 let config = HttpEndpointConfig::from_uri(
2513 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2514 )
2515 .unwrap();
2516
2517 assert!(
2519 config.query_params.contains_key("apiKey"),
2520 "apiKey should be preserved"
2521 );
2522 assert!(
2523 config.query_params.contains_key("token"),
2524 "token should be preserved"
2525 );
2526 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2527 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2528
2529 assert!(
2531 !config.query_params.contains_key("httpMethod"),
2532 "httpMethod should not be forwarded"
2533 );
2534 }
2535
2536 #[test]
2537 fn test_query_params_are_url_encoded_when_resolving_url() {
2538 let config =
2539 HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2540 let exchange = Exchange::new(Message::default());
2541
2542 let url = HttpProducer::resolve_url(&exchange, &config);
2543
2544 assert!(url.contains("q=hello+world"), "url was: {url}");
2545 assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2546 }
2547
2548 async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2553 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2554 let addr = listener.local_addr().unwrap();
2555 let url = format!("http://127.0.0.1:{}", addr.port());
2556
2557 let handle = tokio::spawn(async move {
2558 loop {
2559 if let Ok((mut stream, _)) = listener.accept().await {
2560 let delay = delay_ms;
2561 tokio::spawn(async move {
2562 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2563 let mut buf = vec![0u8; 4096];
2564 let _ = stream.read(&mut buf).await;
2565 let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2567 let _ = stream.write_all(headers.as_bytes()).await;
2568 tokio::time::sleep(Duration::from_millis(delay)).await;
2570 let body = r#"{"status":"slow"}"#;
2571 let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2572 let _ = stream.write_all(chunk.as_bytes()).await;
2573 });
2574 }
2575 }
2576 });
2577
2578 (url, handle)
2579 }
2580
2581 #[tokio::test]
2582 async fn test_http_producer_timeout() {
2583 use tower::ServiceExt;
2584
2585 let (url, _handle) = start_slow_server(500).await;
2587 let ctx = test_producer_ctx();
2588
2589 let component = HttpComponent::with_config(
2590 HttpConfig::default()
2591 .with_read_timeout_ms(100)
2592 .with_response_timeout_ms(30_000), );
2594 let endpoint_ctx = NoOpComponentContext;
2595 let endpoint = component
2596 .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2597 .unwrap();
2598 let producer = endpoint.create_producer(&ctx).unwrap();
2599
2600 let exchange = Exchange::new(Message::default());
2601 let result = producer.oneshot(exchange).await;
2602
2603 assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2604 let err = result.unwrap_err().to_string();
2605 assert!(
2606 err.contains("Read timeout") || err.contains("timeout"),
2607 "Error should mention timeout, got: {}",
2608 err
2609 );
2610 }
2611
2612 #[tokio::test]
2613 async fn test_http_producer_no_timeout_when_fast() {
2614 use tower::ServiceExt;
2615
2616 let (url, _handle) = start_test_server().await;
2617 let ctx = test_producer_ctx();
2618
2619 let component =
2620 HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2621 let endpoint_ctx = NoOpComponentContext;
2622 let endpoint = component
2623 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2624 .unwrap();
2625 let producer = endpoint.create_producer(&ctx).unwrap();
2626
2627 let exchange = Exchange::new(Message::default());
2628 let result = producer.oneshot(exchange).await.unwrap();
2629
2630 let status = result
2631 .input
2632 .header("CamelHttpResponseCode")
2633 .and_then(|v| v.as_u64())
2634 .unwrap();
2635 assert_eq!(status, 200);
2636 }
2637
2638 #[tokio::test]
2643 async fn test_http_producer_blocks_metadata_endpoint() {
2644 use tower::ServiceExt;
2645
2646 let ctx = test_producer_ctx();
2647 let component = HttpComponent::new();
2648 let endpoint_ctx = NoOpComponentContext;
2649 let endpoint = component
2650 .create_endpoint(
2651 "http://example.com/api?allowPrivateIps=false",
2652 &endpoint_ctx,
2653 )
2654 .unwrap();
2655 let producer = endpoint.create_producer(&ctx).unwrap();
2656
2657 let mut exchange = Exchange::new(Message::default());
2658 exchange.input.set_header(
2659 "CamelHttpUri",
2660 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2661 );
2662
2663 let result = producer.oneshot(exchange).await;
2664 assert!(result.is_err(), "Should block AWS metadata endpoint");
2665
2666 let err = result.unwrap_err();
2667 assert!(
2668 err.to_string().contains("Private IP"),
2669 "Error should mention private IP blocking, got: {}",
2670 err
2671 );
2672 }
2673
2674 #[test]
2675 fn test_ssrf_config_defaults() {
2676 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2677 assert!(
2678 !config.allow_private_ips,
2679 "Private IPs should be blocked by default"
2680 );
2681 assert!(
2682 config.blocked_hosts.is_empty(),
2683 "Blocked hosts should be empty by default"
2684 );
2685 }
2686
2687 #[test]
2688 fn test_ssrf_config_allow_private_ips() {
2689 let config =
2690 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2691 assert!(
2692 config.allow_private_ips,
2693 "Private IPs should be allowed when explicitly set"
2694 );
2695 }
2696
2697 #[test]
2698 fn test_ssrf_config_blocked_hosts() {
2699 let config = HttpEndpointConfig::from_uri(
2700 "http://example.com/api?blockedHosts=evil.com,malware.net",
2701 )
2702 .unwrap();
2703 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2704 }
2705
2706 #[tokio::test]
2707 async fn test_http_producer_blocks_localhost() {
2708 use tower::ServiceExt;
2709
2710 let ctx = test_producer_ctx();
2711 let component = HttpComponent::new();
2712 let endpoint_ctx = NoOpComponentContext;
2713 let endpoint = component
2714 .create_endpoint("http://example.com/api", &endpoint_ctx)
2715 .unwrap();
2716 let producer = endpoint.create_producer(&ctx).unwrap();
2717
2718 let mut exchange = Exchange::new(Message::default());
2719 exchange.input.set_header(
2720 "CamelHttpUri",
2721 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2722 );
2723
2724 let result = producer.oneshot(exchange).await;
2725 assert!(result.is_err(), "Should block localhost");
2726 }
2727
2728 #[tokio::test]
2729 async fn test_http_producer_blocks_loopback_ip() {
2730 use tower::ServiceExt;
2731
2732 let ctx = test_producer_ctx();
2733 let component = HttpComponent::new();
2734 let endpoint_ctx = NoOpComponentContext;
2735 let endpoint = component
2736 .create_endpoint("http://example.com/api", &endpoint_ctx)
2737 .unwrap();
2738 let producer = endpoint.create_producer(&ctx).unwrap();
2739
2740 let mut exchange = Exchange::new(Message::default());
2741 exchange.input.set_header(
2742 "CamelHttpUri",
2743 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2744 );
2745
2746 let result = producer.oneshot(exchange).await;
2747 assert!(result.is_err(), "Should block loopback IP");
2748 }
2749
2750 #[tokio::test]
2751 async fn test_http_producer_allows_private_ip_when_enabled() {
2752 use tower::ServiceExt;
2753
2754 let ctx = test_producer_ctx();
2755 let component = HttpComponent::new();
2756 let endpoint_ctx = NoOpComponentContext;
2757 let endpoint = component
2760 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2761 .unwrap();
2762 let producer = endpoint.create_producer(&ctx).unwrap();
2763
2764 let exchange = Exchange::new(Message::default());
2765
2766 let result = producer.oneshot(exchange).await;
2769 if let Err(ref e) = result {
2771 let err_str = e.to_string();
2772 assert!(
2773 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2774 "Should not be SSRF error, got: {}",
2775 err_str
2776 );
2777 }
2778 }
2779
2780 #[test]
2785 fn test_http_server_config_parse() {
2786 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2787 assert_eq!(cfg.host, "0.0.0.0");
2788 assert_eq!(cfg.port, 8080);
2789 assert_eq!(cfg.path, "/orders");
2790 assert_eq!(cfg.max_inflight_requests, 1024);
2791 }
2792
2793 #[test]
2794 fn test_http_server_config_scheme() {
2795 assert_eq!(HttpServerConfig::scheme(), "http");
2797 }
2798
2799 #[test]
2800 fn test_http_server_config_from_components() {
2801 let components = camel_component_api::UriComponents {
2803 scheme: "https".to_string(),
2804 path: "//0.0.0.0:8443/api".to_string(),
2805 params: std::collections::HashMap::from([
2806 ("maxRequestBody".to_string(), "5242880".to_string()),
2807 ("maxInflightRequests".to_string(), "7".to_string()),
2808 ]),
2809 };
2810 let cfg = HttpServerConfig::from_components(components).unwrap();
2811 assert_eq!(cfg.host, "0.0.0.0");
2812 assert_eq!(cfg.port, 8443);
2813 assert_eq!(cfg.path, "/api");
2814 assert_eq!(cfg.max_request_body, 5242880);
2815 assert_eq!(cfg.max_inflight_requests, 7);
2816 }
2817
2818 #[test]
2819 fn test_http_server_config_default_path() {
2820 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2821 assert_eq!(cfg.path, "/");
2822 }
2823
2824 #[test]
2825 fn test_http_server_config_wrong_scheme() {
2826 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2827 }
2828
2829 #[test]
2830 fn test_http_server_config_invalid_port() {
2831 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2832 }
2833
2834 #[test]
2835 fn test_http_server_config_default_port_by_scheme() {
2836 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2838 assert_eq!(cfg_http.port, 80);
2839
2840 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2842 assert_eq!(cfg_https.port, 443);
2843 }
2844
2845 #[test]
2846 fn test_request_envelope_and_reply_are_send() {
2847 fn assert_send<T: Send>() {}
2848 assert_send::<RequestEnvelope>();
2849 assert_send::<HttpReply>();
2850 }
2851
2852 static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
2866
2867 #[test]
2868 fn test_server_registry_global_is_singleton() {
2869 let r1 = ServerRegistry::global();
2870 let r2 = ServerRegistry::global();
2871 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2872 }
2873
2874 #[tokio::test]
2875 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2876 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2877 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2878 let port = listener.local_addr().unwrap().port();
2879 drop(listener);
2880
2881 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2882 Arc::new(std::sync::Mutex::new(Vec::new()));
2883
2884 let mut handles = Vec::new();
2885 for _ in 0..4 {
2886 let results = results.clone();
2887 handles.push(tokio::spawn(async move {
2888 let dispatch = ServerRegistry::global()
2889 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2890 .await
2891 .unwrap();
2892 results.lock().unwrap().push(dispatch);
2893 }));
2894 }
2895
2896 for h in handles {
2897 h.await.unwrap();
2898 }
2899
2900 let dispatches = results.lock().unwrap();
2901 assert_eq!(dispatches.len(), 4);
2902 for i in 1..dispatches.len() {
2903 assert!(
2904 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2905 "all concurrent callers should get the same dispatch table"
2906 );
2907 }
2908 }
2909
2910 #[test]
2911 fn test_server_registry_distinguishes_host_and_port() {
2912 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2913 let rt = tokio::runtime::Runtime::new().expect("runtime");
2914 rt.block_on(async {
2915 let registry = ServerRegistry::global();
2916 let d1 = registry
2920 .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2921 .await;
2922 let d2 = registry
2923 .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2924 .await;
2925 assert!(d1.is_ok());
2926 assert!(d2.is_ok());
2927 assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2928 });
2929 }
2930
2931 #[tokio::test]
2932 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2933 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2934 let registry = ServerRegistry::global();
2935 let d1 = registry
2937 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2938 .await;
2939 assert!(d1.is_ok());
2940
2941 let d2 = registry
2944 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2945 .await;
2946 assert!(d2.is_err());
2947 let err = d2.unwrap_err();
2948 assert!(
2949 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2950 "Expected incompatible maxRequestBody error, got: {}",
2951 err
2952 );
2953 }
2954
2955 #[test]
2956 fn test_server_registry_reset_clears_entries() {
2957 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2958 let rt = tokio::runtime::Runtime::new().expect("runtime");
2959 rt.block_on(async {
2960 let d1 = ServerRegistry::global()
2962 .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2963 .await;
2964 assert!(d1.is_ok());
2965
2966 let guard = ServerRegistry::global().inner.lock().expect("lock");
2968 assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2969 drop(guard);
2970
2971 ServerRegistry::reset();
2973
2974 let guard = ServerRegistry::global().inner.lock().expect("lock");
2976 assert!(
2977 guard.is_empty(),
2978 "registry should be empty after reset, has {} entries",
2979 guard.len()
2980 );
2981 });
2982 }
2983
2984 #[tokio::test]
2989 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2990 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2991 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2993 let port = listener.local_addr().unwrap().port();
2994 tokio::spawn(run_axum_server(
2995 listener,
2996 dispatch,
2997 2 * 1024 * 1024,
2998 10 * 1024 * 1024,
2999 Arc::new(tokio::sync::Semaphore::new(1024)),
3000 ));
3001
3002 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3004
3005 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
3006 .await
3007 .unwrap();
3008 assert_eq!(resp.status().as_u16(), 404);
3009 }
3010
3011 #[tokio::test]
3016 async fn test_http_consumer_start_registers_path() {
3017 use camel_component_api::ConsumerContext;
3018
3019 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3021 let port = listener.local_addr().unwrap().port();
3022 drop(listener); let consumer_cfg = HttpServerConfig {
3025 host: "127.0.0.1".to_string(),
3026 port,
3027 path: "/ping".to_string(),
3028 max_request_body: 2 * 1024 * 1024,
3029 max_response_body: 10 * 1024 * 1024,
3030 max_inflight_requests: 1024,
3031 };
3032 let mut consumer = HttpConsumer::new(consumer_cfg);
3033
3034 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3035 let token = tokio_util::sync::CancellationToken::new();
3036 let ctx = ConsumerContext::new(tx, token.clone());
3037
3038 tokio::spawn(async move {
3039 consumer.start(ctx).await.unwrap();
3040 });
3041
3042 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3043
3044 let client = reqwest::Client::new();
3045 let resp_future = client
3046 .post(format!("http://127.0.0.1:{port}/ping"))
3047 .body("hello world")
3048 .send();
3049
3050 let (http_result, _) = tokio::join!(resp_future, async {
3051 if let Some(mut envelope) = rx.recv().await {
3052 envelope.exchange.input.set_header(
3054 "CamelHttpResponseCode",
3055 serde_json::Value::Number(201.into()),
3056 );
3057 if let Some(reply_tx) = envelope.reply_tx {
3058 let _ = reply_tx.send(Ok(envelope.exchange));
3059 }
3060 }
3061 });
3062
3063 let resp = http_result.unwrap();
3064 assert_eq!(resp.status().as_u16(), 201);
3065
3066 token.cancel();
3067 }
3068
3069 #[tokio::test]
3070 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
3071 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3072
3073 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3074 let port = listener.local_addr().unwrap().port();
3075 drop(listener);
3076
3077 let consumer_cfg = HttpServerConfig {
3078 host: "127.0.0.1".to_string(),
3079 port,
3080 path: "/saturation".to_string(),
3081 max_request_body: 2 * 1024 * 1024,
3082 max_response_body: 10 * 1024 * 1024,
3083 max_inflight_requests: 1,
3084 };
3085 let mut consumer = HttpConsumer::new(consumer_cfg);
3086
3087 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3088 let token = tokio_util::sync::CancellationToken::new();
3089 let ctx = ConsumerContext::new(tx, token.clone());
3090 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3091 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3092
3093 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
3094 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
3095
3096 tokio::spawn(async move {
3097 let mut first_seen_tx = Some(first_seen_tx);
3098 let mut unblock_first_rx = Some(unblock_first_rx);
3099
3100 while let Some(envelope) = rx.recv().await {
3101 if let Some(tx) = first_seen_tx.take() {
3102 let _ = tx.send(());
3103 if let Some(rx_unblock) = unblock_first_rx.take() {
3104 let _ = rx_unblock.await;
3105 }
3106 }
3107
3108 if let Some(reply_tx) = envelope.reply_tx {
3109 let _ = reply_tx.send(Ok(envelope.exchange));
3110 }
3111 }
3112 });
3113
3114 let client = reqwest::Client::new();
3115 let first_req = {
3116 let client = client.clone();
3117 async move {
3118 client
3119 .get(format!("http://127.0.0.1:{port}/saturation"))
3120 .send()
3121 .await
3122 .unwrap()
3123 }
3124 };
3125
3126 let first_handle = tokio::spawn(first_req);
3127 first_seen_rx.await.unwrap();
3128
3129 let second_resp = client
3130 .get(format!("http://127.0.0.1:{port}/saturation"))
3131 .send()
3132 .await
3133 .unwrap();
3134
3135 assert_eq!(second_resp.status().as_u16(), 503);
3136
3137 let _ = unblock_first_tx.send(());
3138 let first_resp = first_handle.await.unwrap();
3139 assert_eq!(first_resp.status().as_u16(), 200);
3140
3141 token.cancel();
3142 }
3143
3144 #[tokio::test]
3145 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3146 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3147
3148 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3149 let port = listener.local_addr().unwrap().port();
3150 drop(listener);
3151
3152 let consumer_cfg = HttpServerConfig {
3153 host: "127.0.0.1".to_string(),
3154 port,
3155 path: "/limit-bytes".to_string(),
3156 max_request_body: 2 * 1024 * 1024,
3157 max_response_body: 16,
3158 max_inflight_requests: 1024,
3159 };
3160 let mut consumer = HttpConsumer::new(consumer_cfg);
3161
3162 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3163 let token = tokio_util::sync::CancellationToken::new();
3164 let ctx = ConsumerContext::new(tx, token.clone());
3165 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3166 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3167
3168 let client = reqwest::Client::new();
3169 let send_fut = client
3170 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3171 .send();
3172
3173 let (http_result, _) = tokio::join!(send_fut, async {
3174 if let Some(mut envelope) = rx.recv().await {
3175 envelope.exchange.input.body =
3176 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3177 if let Some(reply_tx) = envelope.reply_tx {
3178 let _ = reply_tx.send(Ok(envelope.exchange));
3179 }
3180 }
3181 });
3182
3183 let resp = http_result.unwrap();
3184 assert_eq!(resp.status().as_u16(), 500);
3185 let body = resp.text().await.unwrap();
3186 assert_eq!(body, "Response body exceeds configured limit");
3187 token.cancel();
3188 }
3189
3190 #[tokio::test]
3191 async fn test_http_consumer_enforces_max_response_body_for_json() {
3192 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3193
3194 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3195 let port = listener.local_addr().unwrap().port();
3196 drop(listener);
3197
3198 let consumer_cfg = HttpServerConfig {
3199 host: "127.0.0.1".to_string(),
3200 port,
3201 path: "/limit-json".to_string(),
3202 max_request_body: 2 * 1024 * 1024,
3203 max_response_body: 16,
3204 max_inflight_requests: 1024,
3205 };
3206 let mut consumer = HttpConsumer::new(consumer_cfg);
3207
3208 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3209 let token = tokio_util::sync::CancellationToken::new();
3210 let ctx = ConsumerContext::new(tx, token.clone());
3211 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3212 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3213
3214 let client = reqwest::Client::new();
3215 let send_fut = client
3216 .get(format!("http://127.0.0.1:{port}/limit-json"))
3217 .send();
3218
3219 let (http_result, _) = tokio::join!(send_fut, async {
3220 if let Some(mut envelope) = rx.recv().await {
3221 envelope.exchange.input.body = camel_component_api::Body::Json(
3222 serde_json::json!({"message":"this response is bigger than sixteen"}),
3223 );
3224 if let Some(reply_tx) = envelope.reply_tx {
3225 let _ = reply_tx.send(Ok(envelope.exchange));
3226 }
3227 }
3228 });
3229
3230 let resp = http_result.unwrap();
3231 assert_eq!(resp.status().as_u16(), 500);
3232 let body = resp.text().await.unwrap();
3233 assert_eq!(body, "Response body exceeds configured limit");
3234 token.cancel();
3235 }
3236
3237 #[tokio::test]
3238 async fn test_http_consumer_enforces_max_response_body_for_xml() {
3239 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3240
3241 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3242 let port = listener.local_addr().unwrap().port();
3243 drop(listener);
3244
3245 let consumer_cfg = HttpServerConfig {
3246 host: "127.0.0.1".to_string(),
3247 port,
3248 path: "/limit-xml".to_string(),
3249 max_request_body: 2 * 1024 * 1024,
3250 max_response_body: 16,
3251 max_inflight_requests: 1024,
3252 };
3253 let mut consumer = HttpConsumer::new(consumer_cfg);
3254
3255 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3256 let token = tokio_util::sync::CancellationToken::new();
3257 let ctx = ConsumerContext::new(tx, token.clone());
3258 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3259 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3260
3261 let client = reqwest::Client::new();
3262 let send_fut = client
3263 .get(format!("http://127.0.0.1:{port}/limit-xml"))
3264 .send();
3265
3266 let (http_result, _) = tokio::join!(send_fut, async {
3267 if let Some(mut envelope) = rx.recv().await {
3268 envelope.exchange.input.body = camel_component_api::Body::Xml(
3269 "<root><value>way-too-large</value></root>".into(),
3270 );
3271 if let Some(reply_tx) = envelope.reply_tx {
3272 let _ = reply_tx.send(Ok(envelope.exchange));
3273 }
3274 }
3275 });
3276
3277 let resp = http_result.unwrap();
3278 assert_eq!(resp.status().as_u16(), 500);
3279 let body = resp.text().await.unwrap();
3280 assert_eq!(body, "Response body exceeds configured limit");
3281 token.cancel();
3282 }
3283
3284 #[tokio::test]
3285 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3286 use camel_component_api::{
3287 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3288 };
3289 use futures::stream;
3290
3291 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3292 let port = listener.local_addr().unwrap().port();
3293 drop(listener);
3294
3295 let consumer_cfg = HttpServerConfig {
3296 host: "0.0.0.0".to_string(),
3297 port,
3298 path: "/limit-stream".to_string(),
3299 max_request_body: 2 * 1024 * 1024,
3300 max_response_body: 16,
3301 max_inflight_requests: 1024,
3302 };
3303 let mut consumer = HttpConsumer::new(consumer_cfg);
3304
3305 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3306 let token = tokio_util::sync::CancellationToken::new();
3307 let ctx = ConsumerContext::new(tx, token.clone());
3308 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3309 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3310
3311 let client = reqwest::Client::new();
3312 let send_fut = client
3313 .get(format!("http://127.0.0.1:{port}/limit-stream"))
3314 .send();
3315
3316 let (http_result, _) = tokio::join!(send_fut, async {
3317 if let Some(mut envelope) = rx.recv().await {
3318 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3319 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3320 let stream = Box::pin(stream::iter(chunks));
3321 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3322 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3323 metadata: StreamMetadata {
3324 size_hint: Some(32),
3325 content_type: Some("application/octet-stream".into()),
3326 origin: None,
3327 },
3328 });
3329 if let Some(reply_tx) = envelope.reply_tx {
3330 let _ = reply_tx.send(Ok(envelope.exchange));
3331 }
3332 }
3333 });
3334
3335 let resp = http_result.unwrap();
3336 assert_eq!(resp.status().as_u16(), 200);
3337 let body = resp.bytes().await.unwrap();
3338 assert_eq!(body.len(), 32);
3339 token.cancel();
3340 }
3341
3342 #[tokio::test]
3347 async fn test_integration_single_consumer_round_trip() {
3348 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3349
3350 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3352 let port = listener.local_addr().unwrap().port();
3353 drop(listener); let component = HttpComponent::new();
3356 let endpoint_ctx = NoOpComponentContext;
3357 let endpoint = component
3358 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3359 .unwrap();
3360 let mut consumer = endpoint.create_consumer().unwrap();
3361
3362 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3363 let token = tokio_util::sync::CancellationToken::new();
3364 let ctx = ConsumerContext::new(tx, token.clone());
3365
3366 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3367 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3368
3369 let client = reqwest::Client::new();
3370 let send_fut = client
3371 .post(format!("http://127.0.0.1:{port}/echo"))
3372 .header("Content-Type", "text/plain")
3373 .body("ping")
3374 .send();
3375
3376 let (http_result, _) = tokio::join!(send_fut, async {
3377 if let Some(mut envelope) = rx.recv().await {
3378 assert_eq!(
3379 envelope.exchange.input.header("CamelHttpMethod"),
3380 Some(&serde_json::Value::String("POST".into()))
3381 );
3382 assert_eq!(
3383 envelope.exchange.input.header("CamelHttpPath"),
3384 Some(&serde_json::Value::String("/echo".into()))
3385 );
3386 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3387 if let Some(reply_tx) = envelope.reply_tx {
3388 let _ = reply_tx.send(Ok(envelope.exchange));
3389 }
3390 }
3391 });
3392
3393 let resp = http_result.unwrap();
3394 assert_eq!(resp.status().as_u16(), 200);
3395 let body = resp.text().await.unwrap();
3396 assert_eq!(body, "pong");
3397
3398 token.cancel();
3399 }
3400
3401 #[tokio::test]
3402 async fn test_integration_two_consumers_shared_port() {
3403 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3404
3405 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3407 let port = listener.local_addr().unwrap().port();
3408 drop(listener);
3409
3410 let component = HttpComponent::new();
3411 let endpoint_ctx = NoOpComponentContext;
3412
3413 let endpoint_a = component
3415 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3416 .unwrap();
3417 let mut consumer_a = endpoint_a.create_consumer().unwrap();
3418
3419 let endpoint_b = component
3421 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3422 .unwrap();
3423 let mut consumer_b = endpoint_b.create_consumer().unwrap();
3424
3425 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3426 let token_a = tokio_util::sync::CancellationToken::new();
3427 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3428
3429 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3430 let token_b = tokio_util::sync::CancellationToken::new();
3431 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3432
3433 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3434 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3435 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3436
3437 let client = reqwest::Client::new();
3438
3439 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3441 let (resp_hello, _) = tokio::join!(fut_hello, async {
3442 if let Some(mut envelope) = rx_a.recv().await {
3443 envelope.exchange.input.body =
3444 camel_component_api::Body::Text("hello-response".to_string());
3445 if let Some(reply_tx) = envelope.reply_tx {
3446 let _ = reply_tx.send(Ok(envelope.exchange));
3447 }
3448 }
3449 });
3450
3451 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3453 let (resp_world, _) = tokio::join!(fut_world, async {
3454 if let Some(mut envelope) = rx_b.recv().await {
3455 envelope.exchange.input.body =
3456 camel_component_api::Body::Text("world-response".to_string());
3457 if let Some(reply_tx) = envelope.reply_tx {
3458 let _ = reply_tx.send(Ok(envelope.exchange));
3459 }
3460 }
3461 });
3462
3463 let body_a = resp_hello.unwrap().text().await.unwrap();
3464 let body_b = resp_world.unwrap().text().await.unwrap();
3465
3466 assert_eq!(body_a, "hello-response");
3467 assert_eq!(body_b, "world-response");
3468
3469 token_a.cancel();
3470 token_b.cancel();
3471 }
3472
3473 #[tokio::test]
3474 async fn test_integration_unregistered_path_returns_404() {
3475 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3476
3477 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3479 let port = listener.local_addr().unwrap().port();
3480 drop(listener);
3481
3482 let component = HttpComponent::new();
3483 let endpoint_ctx = NoOpComponentContext;
3484 let endpoint = component
3485 .create_endpoint(
3486 &format!("http://127.0.0.1:{port}/registered"),
3487 &endpoint_ctx,
3488 )
3489 .unwrap();
3490 let mut consumer = endpoint.create_consumer().unwrap();
3491
3492 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3493 let token = tokio_util::sync::CancellationToken::new();
3494 let ctx = ConsumerContext::new(tx, token.clone());
3495
3496 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3497
3498 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3500 loop {
3501 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3502 .await
3503 .is_ok()
3504 {
3505 break;
3506 }
3507 if std::time::Instant::now() >= deadline {
3508 panic!("HTTP server did not start within 5s on port {port}");
3509 }
3510 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3511 }
3512
3513 let client = reqwest::Client::new();
3514 let resp = client
3515 .get(format!("http://127.0.0.1:{port}/not-there"))
3516 .send()
3517 .await
3518 .unwrap();
3519 assert_eq!(resp.status().as_u16(), 404);
3520
3521 token.cancel();
3522 }
3523
3524 #[test]
3525 fn test_http_consumer_declares_concurrent() {
3526 use camel_component_api::ConcurrencyModel;
3527
3528 let config = HttpServerConfig {
3529 host: "127.0.0.1".to_string(),
3530 port: 19999,
3531 path: "/test".to_string(),
3532 max_request_body: 2 * 1024 * 1024,
3533 max_response_body: 10 * 1024 * 1024,
3534 max_inflight_requests: 1024,
3535 };
3536 let consumer = HttpConsumer::new(config);
3537 assert_eq!(
3538 consumer.concurrency_model(),
3539 ConcurrencyModel::Concurrent { max: None }
3540 );
3541 }
3542
3543 #[tokio::test]
3548 async fn test_http_reply_body_stream_variant_exists() {
3549 use bytes::Bytes;
3550 use camel_component_api::CamelError;
3551 use futures::stream;
3552
3553 let chunks: Vec<Result<Bytes, CamelError>> =
3554 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3555 let stream = Box::pin(stream::iter(chunks));
3556 let reply_body = HttpReplyBody::Stream(stream);
3557 match reply_body {
3559 HttpReplyBody::Stream(_) => {}
3560 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3561 }
3562 }
3563
3564 #[cfg(feature = "otel")]
3569 mod otel_tests {
3570 use super::*;
3571 use camel_component_api::Message;
3572 use tower::ServiceExt;
3573
3574 #[tokio::test]
3575 async fn test_producer_injects_traceparent_header() {
3576 let (url, _handle) = start_test_server_with_header_capture().await;
3577 let ctx = test_producer_ctx();
3578
3579 let component = HttpComponent::new();
3580 let endpoint_ctx = NoOpComponentContext;
3581 let endpoint = component
3582 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3583 .unwrap();
3584 let producer = endpoint.create_producer(&ctx).unwrap();
3585
3586 let mut exchange = Exchange::new(Message::default());
3588 let mut headers = std::collections::HashMap::new();
3589 headers.insert(
3590 "traceparent".to_string(),
3591 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3592 );
3593 camel_otel::extract_into_exchange(&mut exchange, &headers);
3594
3595 let result = producer.oneshot(exchange).await.unwrap();
3596
3597 let status = result
3599 .input
3600 .header("CamelHttpResponseCode")
3601 .and_then(|v| v.as_u64())
3602 .unwrap();
3603 assert_eq!(status, 200);
3604
3605 let traceparent = result.input.header("X-Received-Traceparent");
3607 assert!(
3608 traceparent.is_some(),
3609 "traceparent header should have been sent"
3610 );
3611
3612 let traceparent_str = traceparent.unwrap().as_str().unwrap();
3613 let parts: Vec<&str> = traceparent_str.split('-').collect();
3615 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3616 assert_eq!(parts[0], "00", "version should be 00");
3617 assert_eq!(
3618 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3619 "trace-id should match"
3620 );
3621 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3622 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3623 }
3624
3625 #[tokio::test]
3626 async fn test_consumer_extracts_traceparent_header() {
3627 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3628
3629 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3631 let port = listener.local_addr().unwrap().port();
3632 drop(listener);
3633
3634 let component = HttpComponent::new();
3635 let endpoint_ctx = NoOpComponentContext;
3636 let endpoint = component
3637 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3638 .unwrap();
3639 let mut consumer = endpoint.create_consumer().unwrap();
3640
3641 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3642 let token = tokio_util::sync::CancellationToken::new();
3643 let ctx = ConsumerContext::new(tx, token.clone());
3644
3645 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3646 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3647
3648 let client = reqwest::Client::new();
3650 let send_fut = client
3651 .post(format!("http://127.0.0.1:{port}/trace"))
3652 .header(
3653 "traceparent",
3654 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3655 )
3656 .body("test")
3657 .send();
3658
3659 let (http_result, _) = tokio::join!(send_fut, async {
3660 if let Some(envelope) = rx.recv().await {
3661 let mut injected_headers = std::collections::HashMap::new();
3664 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3665
3666 assert!(
3667 injected_headers.contains_key("traceparent"),
3668 "Exchange should have traceparent after extraction"
3669 );
3670
3671 let traceparent = injected_headers.get("traceparent").unwrap();
3672 let parts: Vec<&str> = traceparent.split('-').collect();
3673 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3674 assert_eq!(
3675 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3676 "Trace ID should match the original traceparent header"
3677 );
3678
3679 if let Some(reply_tx) = envelope.reply_tx {
3680 let _ = reply_tx.send(Ok(envelope.exchange));
3681 }
3682 }
3683 });
3684
3685 let resp = http_result.unwrap();
3686 assert_eq!(resp.status().as_u16(), 200);
3687
3688 token.cancel();
3689 }
3690
3691 #[tokio::test]
3692 async fn test_consumer_extracts_mixed_case_traceparent_header() {
3693 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3694
3695 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3697 let port = listener.local_addr().unwrap().port();
3698 drop(listener);
3699
3700 let component = HttpComponent::new();
3701 let endpoint_ctx = NoOpComponentContext;
3702 let endpoint = component
3703 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3704 .unwrap();
3705 let mut consumer = endpoint.create_consumer().unwrap();
3706
3707 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3708 let token = tokio_util::sync::CancellationToken::new();
3709 let ctx = ConsumerContext::new(tx, token.clone());
3710
3711 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3712 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3713
3714 let client = reqwest::Client::new();
3716 let send_fut = client
3717 .post(format!("http://127.0.0.1:{port}/trace"))
3718 .header(
3719 "TraceParent",
3720 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3721 )
3722 .body("test")
3723 .send();
3724
3725 let (http_result, _) = tokio::join!(send_fut, async {
3726 if let Some(envelope) = rx.recv().await {
3727 let mut injected_headers = HashMap::new();
3730 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3731
3732 assert!(
3733 injected_headers.contains_key("traceparent"),
3734 "Exchange should have traceparent after extraction from mixed-case header"
3735 );
3736
3737 let traceparent = injected_headers.get("traceparent").unwrap();
3738 let parts: Vec<&str> = traceparent.split('-').collect();
3739 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3740 assert_eq!(
3741 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3742 "Trace ID should match the original mixed-case TraceParent header"
3743 );
3744
3745 if let Some(reply_tx) = envelope.reply_tx {
3746 let _ = reply_tx.send(Ok(envelope.exchange));
3747 }
3748 }
3749 });
3750
3751 let resp = http_result.unwrap();
3752 assert_eq!(resp.status().as_u16(), 200);
3753
3754 token.cancel();
3755 }
3756
3757 #[tokio::test]
3758 async fn test_producer_no_trace_context_no_crash() {
3759 let (url, _handle) = start_test_server().await;
3760 let ctx = test_producer_ctx();
3761
3762 let component = HttpComponent::new();
3763 let endpoint_ctx = NoOpComponentContext;
3764 let endpoint = component
3765 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3766 .unwrap();
3767 let producer = endpoint.create_producer(&ctx).unwrap();
3768
3769 let exchange = Exchange::new(Message::default());
3771
3772 let result = producer.oneshot(exchange).await.unwrap();
3774
3775 let status = result
3777 .input
3778 .header("CamelHttpResponseCode")
3779 .and_then(|v| v.as_u64())
3780 .unwrap();
3781 assert_eq!(status, 200);
3782 }
3783
3784 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3786 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3787 let addr = listener.local_addr().unwrap();
3788 let url = format!("http://127.0.0.1:{}", addr.port());
3789
3790 let handle = tokio::spawn(async move {
3791 loop {
3792 if let Ok((mut stream, _)) = listener.accept().await {
3793 tokio::spawn(async move {
3794 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3795 let mut buf = vec![0u8; 8192];
3796 let n = stream.read(&mut buf).await.unwrap_or(0);
3797 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3798
3799 let traceparent = request
3801 .lines()
3802 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3803 .map(|line| {
3804 line.split(':')
3805 .nth(1)
3806 .map(|s| s.trim().to_string())
3807 .unwrap_or_default()
3808 })
3809 .unwrap_or_default();
3810
3811 let body =
3812 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3813 let response = format!(
3814 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3815 body.len(),
3816 traceparent,
3817 body
3818 );
3819 let _ = stream.write_all(response.as_bytes()).await;
3820 });
3821 }
3822 }
3823 });
3824
3825 (url, handle)
3826 }
3827 }
3828
3829 #[tokio::test]
3838 async fn test_request_body_arrives_as_stream() {
3839 use camel_component_api::Body;
3840 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3841
3842 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3843 let port = listener.local_addr().unwrap().port();
3844 drop(listener);
3845
3846 let component = HttpComponent::new();
3847 let endpoint_ctx = NoOpComponentContext;
3848 let endpoint = component
3849 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3850 .unwrap();
3851 let mut consumer = endpoint.create_consumer().unwrap();
3852
3853 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3854 let token = tokio_util::sync::CancellationToken::new();
3855 let ctx = ConsumerContext::new(tx, token.clone());
3856
3857 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3858 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3859
3860 let client = reqwest::Client::new();
3861 let send_fut = client
3862 .post(format!("http://127.0.0.1:{port}/upload"))
3863 .body("hello streaming world")
3864 .send();
3865
3866 let (http_result, _) = tokio::join!(send_fut, async {
3867 if let Some(mut envelope) = rx.recv().await {
3868 assert!(
3870 matches!(envelope.exchange.input.body, Body::Stream(_)),
3871 "expected Body::Stream, got discriminant {:?}",
3872 std::mem::discriminant(&envelope.exchange.input.body)
3873 );
3874 let bytes = envelope
3876 .exchange
3877 .input
3878 .body
3879 .into_bytes(1024 * 1024)
3880 .await
3881 .unwrap();
3882 assert_eq!(&bytes[..], b"hello streaming world");
3883
3884 envelope.exchange.input.body = camel_component_api::Body::Empty;
3885 if let Some(reply_tx) = envelope.reply_tx {
3886 let _ = reply_tx.send(Ok(envelope.exchange));
3887 }
3888 }
3889 });
3890
3891 let resp = http_result.unwrap();
3892 assert_eq!(resp.status().as_u16(), 200);
3893
3894 token.cancel();
3895 }
3896
3897 #[tokio::test]
3902 async fn test_streaming_response_chunked() {
3903 use bytes::Bytes;
3904 use camel_component_api::Body;
3905 use camel_component_api::CamelError;
3906 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3907 use camel_component_api::{StreamBody, StreamMetadata};
3908 use futures::stream;
3909 use std::sync::Arc;
3910 use tokio::sync::Mutex;
3911
3912 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3913 let port = listener.local_addr().unwrap().port();
3914 drop(listener);
3915
3916 let component = HttpComponent::new();
3917 let endpoint_ctx = NoOpComponentContext;
3918 let endpoint = component
3919 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3920 .unwrap();
3921 let mut consumer = endpoint.create_consumer().unwrap();
3922
3923 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3924 let token = tokio_util::sync::CancellationToken::new();
3925 let ctx = ConsumerContext::new(tx, token.clone());
3926
3927 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3928 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3929
3930 let client = reqwest::Client::new();
3931 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3932
3933 let (http_result, _) = tokio::join!(send_fut, async {
3934 if let Some(mut envelope) = rx.recv().await {
3935 let chunks: Vec<Result<Bytes, CamelError>> =
3937 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3938 let stream = Box::pin(stream::iter(chunks));
3939 envelope.exchange.input.body = Body::Stream(StreamBody {
3940 stream: Arc::new(Mutex::new(Some(stream))),
3941 metadata: StreamMetadata::default(),
3942 });
3943 if let Some(reply_tx) = envelope.reply_tx {
3944 let _ = reply_tx.send(Ok(envelope.exchange));
3945 }
3946 }
3947 });
3948
3949 let resp = http_result.unwrap();
3950 assert_eq!(resp.status().as_u16(), 200);
3951 let body = resp.text().await.unwrap();
3952 assert_eq!(body, "chunk1chunk2");
3953
3954 token.cancel();
3955 }
3956
3957 #[tokio::test]
3962 async fn test_413_when_content_length_exceeds_limit() {
3963 use camel_component_api::ConsumerContext;
3964
3965 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3966 let port = listener.local_addr().unwrap().port();
3967 drop(listener);
3968
3969 let component = HttpComponent::new();
3971 let endpoint_ctx = NoOpComponentContext;
3972 let endpoint = component
3973 .create_endpoint(
3974 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3975 &endpoint_ctx,
3976 )
3977 .unwrap();
3978 let mut consumer = endpoint.create_consumer().unwrap();
3979
3980 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3981 let token = tokio_util::sync::CancellationToken::new();
3982 let ctx = ConsumerContext::new(tx, token.clone());
3983
3984 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3985 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3986
3987 let client = reqwest::Client::new();
3988 let resp = client
3989 .post(format!("http://127.0.0.1:{port}/upload"))
3990 .header("Content-Length", "1000") .body("x".repeat(1000))
3992 .send()
3993 .await
3994 .unwrap();
3995
3996 assert_eq!(resp.status().as_u16(), 413);
3997
3998 token.cancel();
3999 }
4000
4001 #[tokio::test]
4005 async fn test_chunked_upload_without_content_length_bypasses_limit() {
4006 use bytes::Bytes;
4007 use camel_component_api::Body;
4008 use camel_component_api::ConsumerContext;
4009 use futures::stream;
4010
4011 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4012 let port = listener.local_addr().unwrap().port();
4013 drop(listener);
4014
4015 let component = HttpComponent::new();
4017 let endpoint_ctx = NoOpComponentContext;
4018 let endpoint = component
4019 .create_endpoint(
4020 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
4021 &endpoint_ctx,
4022 )
4023 .unwrap();
4024 let mut consumer = endpoint.create_consumer().unwrap();
4025
4026 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4027 let token = tokio_util::sync::CancellationToken::new();
4028 let ctx = ConsumerContext::new(tx, token.clone());
4029
4030 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4031 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4032
4033 let client = reqwest::Client::new();
4034
4035 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
4039 Ok(Bytes::from("y".repeat(50))),
4040 Ok(Bytes::from("y".repeat(50))),
4041 ];
4042 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
4043 let send_fut = client
4044 .post(format!("http://127.0.0.1:{port}/upload"))
4045 .body(stream_body)
4046 .send();
4047
4048 let consumer_fut = async {
4049 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
4051 Ok(Some(mut envelope)) => {
4052 assert!(
4053 matches!(envelope.exchange.input.body, Body::Stream(_)),
4054 "expected Body::Stream"
4055 );
4056 envelope.exchange.input.body = camel_component_api::Body::Empty;
4057 if let Some(reply_tx) = envelope.reply_tx {
4058 let _ = reply_tx.send(Ok(envelope.exchange));
4059 }
4060 }
4061 Ok(None) => panic!("consumer channel closed unexpectedly"),
4062 Err(_) => {
4063 }
4066 }
4067 };
4068
4069 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
4070
4071 let resp = http_result.unwrap();
4072 assert_ne!(
4074 resp.status().as_u16(),
4075 413,
4076 "chunked upload must not be rejected by maxRequestBody"
4077 );
4078 assert_eq!(resp.status().as_u16(), 200);
4079
4080 token.cancel();
4081 }
4082
4083 #[test]
4084 fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
4085 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
4086 cfg.blocked_hosts = vec!["blocked.local".to_string()];
4087 cfg.allow_private_ips = false;
4088
4089 let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
4090 assert!(blocked.is_err());
4091
4092 let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4093 assert!(private_ip.is_err());
4094
4095 cfg.allow_private_ips = true;
4096 let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
4097 assert!(allowed.is_ok());
4098 }
4099
4100 #[test]
4101 fn test_is_private_ip_ranges() {
4102 assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); assert!(is_private_ip(&"::1".parse().unwrap())); assert!(is_private_ip(&"fc00::1".parse().unwrap())); assert!(is_private_ip(&"fd12::1".parse().unwrap())); assert!(is_private_ip(&"fe80::1".parse().unwrap())); assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); }
4122
4123 #[tokio::test]
4124 async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
4125 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); cfg.allow_private_ips = false;
4127
4128 let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
4129 .await
4130 .expect_err("localhost must resolve to loopback and be blocked");
4131
4132 let msg = err.to_string();
4133 assert!(msg.contains("Target resolved to private IP"));
4134 }
4135
4136 #[test]
4137 fn test_title_case_header() {
4138 assert_eq!(title_case_header("content-type"), "Content-Type");
4139 assert_eq!(title_case_header("authorization"), "Authorization");
4140 assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4141 assert_eq!(title_case_header("host"), "Host");
4142 assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4143 assert_eq!(title_case_header("single"), "Single");
4144 assert_eq!(title_case_header(""), "");
4145 }
4146
4147 #[test]
4148 fn test_resolve_url_combines_path_and_query_sources() {
4149 let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4150 let mut exchange = Exchange::new(Message::default());
4151 exchange.input.set_header(
4152 "CamelHttpPath",
4153 serde_json::Value::String("next".to_string()),
4154 );
4155 let url = HttpProducer::resolve_url(&exchange, &cfg);
4156 assert!(url.starts_with("http://example.com/base/next?"));
4157 assert!(url.contains("foo=bar"));
4158
4159 exchange.input.set_header(
4160 "CamelHttpUri",
4161 serde_json::Value::String("http://other.test/root".to_string()),
4162 );
4163 exchange.input.set_header(
4164 "CamelHttpQuery",
4165 serde_json::Value::String("a=1&b=2".to_string()),
4166 );
4167
4168 let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4169 assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4170 }
4171
4172 #[test]
4173 fn test_http_producer_helpers_status_and_size_boundaries() {
4174 assert!(HttpProducer::is_ok_status(200, (200, 299)));
4175 assert!(HttpProducer::is_ok_status(299, (200, 299)));
4176 assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4177 assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4178
4179 assert!(!exceeds_max_response_body(10, 10));
4180 assert!(exceeds_max_response_body(11, 10));
4181 }
4182
4183 async fn setup_consumer_on_free_port(
4188 path: &str,
4189 ) -> (
4190 u16,
4191 tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4192 tokio_util::sync::CancellationToken,
4193 ) {
4194 use camel_component_api::ConsumerContext;
4195
4196 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4197 let port = listener.local_addr().unwrap().port();
4198 drop(listener);
4199
4200 let consumer_cfg = HttpServerConfig {
4201 host: "127.0.0.1".to_string(),
4202 port,
4203 path: path.to_string(),
4204 max_request_body: 2 * 1024 * 1024,
4205 max_response_body: 10 * 1024 * 1024,
4206 max_inflight_requests: 1024,
4207 };
4208 let mut consumer = HttpConsumer::new(consumer_cfg);
4209
4210 let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4211 let token = tokio_util::sync::CancellationToken::new();
4212 let ctx = ConsumerContext::new(tx, token.clone());
4213
4214 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4215 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4216
4217 (port, rx, token)
4218 }
4219
4220 #[tokio::test]
4221 async fn test_content_type_inferred_for_json_body() {
4222 let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4223
4224 let client = reqwest::Client::new();
4225 let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4226
4227 let (http_result, _) = tokio::join!(send_fut, async {
4228 if let Some(mut envelope) = rx.recv().await {
4229 envelope.exchange.input.body =
4230 camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4231 if let Some(reply_tx) = envelope.reply_tx {
4232 let _ = reply_tx.send(Ok(envelope.exchange));
4233 }
4234 }
4235 });
4236
4237 let resp = http_result.unwrap();
4238 assert_eq!(resp.status().as_u16(), 200);
4239 let ct = resp
4240 .headers()
4241 .get("content-type")
4242 .expect("Content-Type header should be present");
4243 assert_eq!(ct, "application/json");
4244 let body = resp.text().await.unwrap();
4245 assert_eq!(body, r#"{"message":"hello"}"#);
4246
4247 token.cancel();
4248 }
4249
4250 #[tokio::test]
4251 async fn test_content_type_inferred_for_text_body() {
4252 let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4253
4254 let client = reqwest::Client::new();
4255 let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4256
4257 let (http_result, _) = tokio::join!(send_fut, async {
4258 if let Some(mut envelope) = rx.recv().await {
4259 envelope.exchange.input.body =
4260 camel_component_api::Body::Text("plain text response".to_string());
4261 if let Some(reply_tx) = envelope.reply_tx {
4262 let _ = reply_tx.send(Ok(envelope.exchange));
4263 }
4264 }
4265 });
4266
4267 let resp = http_result.unwrap();
4268 assert_eq!(resp.status().as_u16(), 200);
4269 let ct = resp
4270 .headers()
4271 .get("content-type")
4272 .expect("Content-Type header should be present");
4273 assert_eq!(ct, "text/plain; charset=utf-8");
4274 let body = resp.text().await.unwrap();
4275 assert_eq!(body, "plain text response");
4276
4277 token.cancel();
4278 }
4279
4280 #[tokio::test]
4281 async fn test_content_type_inferred_for_xml_body() {
4282 let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4283
4284 let client = reqwest::Client::new();
4285 let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4286
4287 let (http_result, _) = tokio::join!(send_fut, async {
4288 if let Some(mut envelope) = rx.recv().await {
4289 envelope.exchange.input.body =
4290 camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4291 if let Some(reply_tx) = envelope.reply_tx {
4292 let _ = reply_tx.send(Ok(envelope.exchange));
4293 }
4294 }
4295 });
4296
4297 let resp = http_result.unwrap();
4298 assert_eq!(resp.status().as_u16(), 200);
4299 let ct = resp
4300 .headers()
4301 .get("content-type")
4302 .expect("Content-Type header should be present");
4303 assert_eq!(ct, "application/xml");
4304 let body = resp.text().await.unwrap();
4305 assert_eq!(body, "<root><item>value</item></root>");
4306
4307 token.cancel();
4308 }
4309
4310 #[tokio::test]
4311 async fn test_no_content_type_for_empty_body() {
4312 let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4313
4314 let client = reqwest::Client::new();
4315 let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4316
4317 let (http_result, _) = tokio::join!(send_fut, async {
4318 if let Some(mut envelope) = rx.recv().await {
4319 envelope.exchange.input.body = camel_component_api::Body::Empty;
4320 if let Some(reply_tx) = envelope.reply_tx {
4321 let _ = reply_tx.send(Ok(envelope.exchange));
4322 }
4323 }
4324 });
4325
4326 let resp = http_result.unwrap();
4327 assert_eq!(resp.status().as_u16(), 200);
4328 assert!(
4329 resp.headers().get("content-type").is_none(),
4330 "Empty body should not set Content-Type"
4331 );
4332
4333 token.cancel();
4334 }
4335
4336 #[tokio::test]
4337 async fn test_no_content_type_for_raw_bytes_body() {
4338 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4339
4340 let client = reqwest::Client::new();
4341 let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4342
4343 let (http_result, _) = tokio::join!(send_fut, async {
4344 if let Some(mut envelope) = rx.recv().await {
4345 envelope.exchange.input.body =
4346 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4347 if let Some(reply_tx) = envelope.reply_tx {
4348 let _ = reply_tx.send(Ok(envelope.exchange));
4349 }
4350 }
4351 });
4352
4353 let resp = http_result.unwrap();
4354 assert_eq!(resp.status().as_u16(), 200);
4355 assert!(
4356 resp.headers().get("content-type").is_none(),
4357 "Raw Bytes body should not set Content-Type"
4358 );
4359
4360 token.cancel();
4361 }
4362
4363 #[tokio::test]
4364 async fn test_content_type_from_stream_metadata() {
4365 use camel_component_api::{StreamBody, StreamMetadata};
4366 use futures::stream;
4367
4368 let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4369
4370 let client = reqwest::Client::new();
4371 let send_fut = client
4372 .get(format!("http://127.0.0.1:{port}/stream-ct"))
4373 .send();
4374
4375 let (http_result, _) = tokio::join!(send_fut, async {
4376 if let Some(mut envelope) = rx.recv().await {
4377 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4378 vec![Ok(bytes::Bytes::from("audio data"))];
4379 let stream = Box::pin(stream::iter(chunks));
4380 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4381 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4382 metadata: StreamMetadata {
4383 size_hint: None,
4384 content_type: Some("audio/mpeg".to_string()),
4385 origin: None,
4386 },
4387 });
4388 if let Some(reply_tx) = envelope.reply_tx {
4389 let _ = reply_tx.send(Ok(envelope.exchange));
4390 }
4391 }
4392 });
4393
4394 let resp = http_result.unwrap();
4395 assert_eq!(resp.status().as_u16(), 200);
4396 let ct = resp
4397 .headers()
4398 .get("content-type")
4399 .expect("Content-Type header should be present");
4400 assert_eq!(ct, "audio/mpeg");
4401 let body = resp.text().await.unwrap();
4402 assert_eq!(body, "audio data");
4403
4404 token.cancel();
4405 }
4406
4407 #[tokio::test]
4408 async fn test_user_content_type_overrides_inferred() {
4409 let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4410
4411 let client = reqwest::Client::new();
4412 let send_fut = client
4413 .get(format!("http://127.0.0.1:{port}/override-ct"))
4414 .send();
4415
4416 let (http_result, _) = tokio::join!(send_fut, async {
4417 if let Some(mut envelope) = rx.recv().await {
4418 envelope.exchange.input.body =
4419 camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4420 envelope.exchange.input.set_header(
4421 "Content-Type",
4422 serde_json::Value::String("text/html".to_string()),
4423 );
4424 if let Some(reply_tx) = envelope.reply_tx {
4425 let _ = reply_tx.send(Ok(envelope.exchange));
4426 }
4427 }
4428 });
4429
4430 let resp = http_result.unwrap();
4431 assert_eq!(resp.status().as_u16(), 200);
4432 let ct = resp
4433 .headers()
4434 .get("content-type")
4435 .expect("Content-Type header should be present");
4436 assert_eq!(
4437 ct, "text/html",
4438 "User-set Content-Type should take precedence over inferred type"
4439 );
4440
4441 token.cancel();
4442 }
4443
4444 #[tokio::test]
4445 async fn test_user_content_type_with_bytes_body() {
4446 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4447
4448 let client = reqwest::Client::new();
4449 let send_fut = client
4450 .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4451 .send();
4452
4453 let (http_result, _) = tokio::join!(send_fut, async {
4454 if let Some(mut envelope) = rx.recv().await {
4455 envelope.exchange.input.body =
4456 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4457 envelope.exchange.input.set_header(
4458 "Content-Type",
4459 serde_json::Value::String("application/json".to_string()),
4460 );
4461 if let Some(reply_tx) = envelope.reply_tx {
4462 let _ = reply_tx.send(Ok(envelope.exchange));
4463 }
4464 }
4465 });
4466
4467 let resp = http_result.unwrap();
4468 assert_eq!(resp.status().as_u16(), 200);
4469 let ct = resp
4470 .headers()
4471 .get("content-type")
4472 .expect("Content-Type header should be present for Bytes body with user header");
4473 assert_eq!(
4474 ct, "application/json",
4475 "User Content-Type should be sent for Bytes body"
4476 );
4477
4478 token.cancel();
4479 }
4480
4481 #[tokio::test]
4486 async fn monitor_task_silent_on_clean_exit() {
4487 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {});
4488 monitor_axum_task(handle, "127.0.0.1:0".to_string()).await;
4490 }
4491
4492 #[tokio::test]
4493 async fn monitor_task_handles_panicked_task() {
4494 let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
4495 panic!("simulated server crash");
4496 });
4497 monitor_axum_task(handle, "127.0.0.1:9999".to_string()).await;
4499 }
4500
4501 #[test]
4506 fn http_auth_basic_debug_redacts_password() {
4507 let auth = HttpAuth::Basic {
4508 username: "admin".to_string(),
4509 password: "hunter2".to_string(),
4510 };
4511 let debug = format!("{:?}", auth);
4512 assert!(
4513 !debug.contains("hunter2"),
4514 "password must be redacted: {debug}"
4515 );
4516 assert!(debug.contains("admin"), "username should appear: {debug}");
4517 }
4518
4519 #[test]
4520 fn http_auth_bearer_debug_redacts_token() {
4521 let auth = HttpAuth::Bearer {
4522 token: "eyJhbGciOiJIUzI1NiJ9.secret".to_string(),
4523 };
4524 let debug = format!("{:?}", auth);
4525 assert!(
4526 !debug.contains("eyJhbGci"),
4527 "token must be redacted: {debug}"
4528 );
4529 }
4530
4531 #[test]
4532 fn http_auth_none_debug_shows_variant() {
4533 let debug = format!("{:?}", HttpAuth::None);
4534 assert!(
4535 debug.contains("None"),
4536 "None variant should appear: {debug}"
4537 );
4538 }
4539
4540 #[test]
4541 fn http_endpoint_config_debug_redacts_auth_credentials() {
4542 let config = HttpEndpointConfig::from_uri(
4543 "http://localhost/api?authMethod=Basic&authUsername=admin&authPassword=secret123",
4544 )
4545 .unwrap();
4546 let debug = format!("{:?}", config);
4547 assert!(
4548 !debug.contains("secret123"),
4549 "password must be redacted in HttpEndpointConfig debug: {debug}"
4550 );
4551 }
4552}