1pub mod bundle;
2pub mod config;
3use crate::config::parse_ok_status_code_range;
4pub use bundle::HttpBundle;
5pub use config::HttpConfig;
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::IpAddr;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::task::{Context, Poll};
13use std::time::Duration;
14
15use tokio::sync::{OnceCell, RwLock};
16use tower::Service;
17use tracing::debug;
18
19use axum::body::BodyDataStream;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
21use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
22use camel_component_api::{UriComponents, UriConfig, parse_uri};
23use futures::TryStreamExt;
24use futures::stream::BoxStream;
25
26#[derive(Debug, Clone)]
87pub struct HttpEndpointConfig {
88 pub base_url: String,
89 pub http_method: Option<String>,
90 pub throw_exception_on_failure: bool,
91 pub ok_status_code_range: (u16, u16),
92 pub response_timeout: Option<Duration>,
93 pub query_params: HashMap<String, String>,
94 pub allow_private_ips: bool,
95 pub blocked_hosts: Vec<String>,
96 pub max_body_size: usize,
97 pub read_timeout_ms: u64,
98 pub max_response_bytes: usize,
99 pub auth: HttpAuth,
100 pub user_agent: Option<String>,
101 pub cookie_handling: CookieHandling,
102 pub bridge_endpoint: bool,
103 pub connection_close: bool,
104 pub skip_request_headers: Vec<String>,
105 pub skip_response_headers: Vec<String>,
106}
107
108#[derive(Debug, Clone, PartialEq)]
109pub enum HttpAuth {
110 None,
111 Basic { username: String, password: String },
112 Bearer { token: String },
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum CookieHandling {
117 Disabled,
118 InMemory,
119}
120
121const HTTP_CAMEL_OPTIONS: &[&str] = &[
123 "httpMethod",
124 "throwExceptionOnFailure",
125 "okStatusCodeRange",
126 "followRedirects",
127 "connectTimeout",
128 "responseTimeout",
129 "allowPrivateIps",
130 "blockedHosts",
131 "maxBodySize",
132 "readTimeout",
133 "maxResponseBytes",
134 "authMethod",
135 "authUsername",
136 "authPassword",
137 "authBearerToken",
138 "userAgent",
139 "cookieHandling",
140 "bridgeEndpoint",
141 "connectionClose",
142 "skipRequestHeaders",
143 "skipResponseHeaders",
144];
145
146impl UriConfig for HttpEndpointConfig {
147 fn scheme() -> &'static str {
149 "http"
150 }
151
152 fn from_uri(uri: &str) -> Result<Self, CamelError> {
153 let parts = parse_uri(uri)?;
154 Self::from_components(parts)
155 }
156
157 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
158 if parts.scheme != "http" && parts.scheme != "https" {
160 return Err(CamelError::InvalidUri(format!(
161 "expected scheme 'http' or 'https', got '{}'",
162 parts.scheme
163 )));
164 }
165
166 let base_url = format!("{}:{}", parts.scheme, parts.path);
169
170 let http_method = parts.params.get("httpMethod").cloned();
171
172 let throw_exception_on_failure = match parts.params.get("throwExceptionOnFailure") {
173 Some(v) => parse_bool_param_http(v).map_err(|e| {
174 CamelError::InvalidUri(format!("invalid value for throwExceptionOnFailure: {e}"))
175 })?,
176 None => true,
177 };
178
179 let ok_status_code_range = match parts.params.get("okStatusCodeRange") {
181 Some(v) => parse_ok_status_code_range(v)?,
182 None => (200, 299),
183 };
184
185 let response_timeout = match parts.params.get("responseTimeout") {
186 Some(v) => Some(v.parse::<u64>().map(Duration::from_millis).map_err(|e| {
187 CamelError::InvalidUri(format!("invalid value for responseTimeout: {e}"))
188 })?),
189 None => None,
190 };
191
192 let allow_private_ips = match parts.params.get("allowPrivateIps") {
194 Some(v) => parse_bool_param_http(v).map_err(|e| {
195 CamelError::InvalidUri(format!("invalid value for allowPrivateIps: {e}"))
196 })?,
197 None => false, };
199
200 let blocked_hosts = parts
202 .params
203 .get("blockedHosts")
204 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
205 .unwrap_or_default();
206
207 let max_body_size = match parts.params.get("maxBodySize") {
208 Some(v) => v.parse::<usize>().map_err(|e| {
209 CamelError::InvalidUri(format!("invalid value for maxBodySize: {e}"))
210 })?,
211 None => 10 * 1024 * 1024, };
213
214 let read_timeout_ms = match parts.params.get("readTimeout") {
215 Some(v) => v.parse::<u64>().map_err(|e| {
216 CamelError::InvalidUri(format!("invalid value for readTimeout: {e}"))
217 })?,
218 None => 30_000, };
220
221 let max_response_bytes = match parts.params.get("maxResponseBytes") {
222 Some(v) => v.parse::<usize>().map_err(|e| {
223 CamelError::InvalidUri(format!("invalid value for maxResponseBytes: {e}"))
224 })?,
225 None => 10 * 1024 * 1024, };
227
228 let auth = parse_auth_from_params(&parts.params)?;
229
230 let user_agent = parts.params.get("userAgent").cloned();
231
232 let cookie_handling = match parts.params.get("cookieHandling") {
233 Some(v) if v.eq_ignore_ascii_case("inmemory") => CookieHandling::InMemory,
234 Some(v) if v.eq_ignore_ascii_case("disabled") => CookieHandling::Disabled,
235 Some(v) => {
236 return Err(CamelError::InvalidUri(format!(
237 "invalid value for cookieHandling: {v} (expected Disabled or InMemory)"
238 )));
239 }
240 None => CookieHandling::Disabled,
241 };
242
243 let bridge_endpoint = match parts.params.get("bridgeEndpoint") {
244 Some(v) => parse_bool_param_http(v).map_err(|e| {
245 CamelError::InvalidUri(format!("invalid value for bridgeEndpoint: {e}"))
246 })?,
247 None => false,
248 };
249
250 let connection_close = match parts.params.get("connectionClose") {
251 Some(v) => parse_bool_param_http(v).map_err(|e| {
252 CamelError::InvalidUri(format!("invalid value for connectionClose: {e}"))
253 })?,
254 None => false,
255 };
256
257 let skip_request_headers = parts
258 .params
259 .get("skipRequestHeaders")
260 .map(|v| {
261 v.split(',')
262 .map(str::trim)
263 .filter(|s| !s.is_empty())
264 .map(|s| s.to_ascii_lowercase())
265 .collect::<Vec<_>>()
266 })
267 .unwrap_or_default();
268
269 let skip_response_headers = parts
270 .params
271 .get("skipResponseHeaders")
272 .map(|v| {
273 v.split(',')
274 .map(str::trim)
275 .filter(|s| !s.is_empty())
276 .map(|s| s.to_ascii_lowercase())
277 .collect::<Vec<_>>()
278 })
279 .unwrap_or_default();
280
281 let query_params: HashMap<String, String> = parts
283 .params
284 .into_iter()
285 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
286 .collect();
287
288 Ok(Self {
289 base_url,
290 http_method,
291 throw_exception_on_failure,
292 ok_status_code_range,
293 response_timeout,
294 query_params,
295 allow_private_ips,
296 blocked_hosts,
297 max_body_size,
298 read_timeout_ms,
299 max_response_bytes,
300 auth,
301 user_agent,
302 cookie_handling,
303 bridge_endpoint,
304 connection_close,
305 skip_request_headers,
306 skip_response_headers,
307 })
308 }
309}
310
311fn parse_auth_from_params(params: &HashMap<String, String>) -> Result<HttpAuth, CamelError> {
312 let Some(method) = params.get("authMethod") else {
313 return Ok(HttpAuth::None);
314 };
315
316 if method.eq_ignore_ascii_case("none") {
317 return Ok(HttpAuth::None);
318 }
319
320 if method.eq_ignore_ascii_case("basic") {
321 let username = params.get("authUsername").cloned().ok_or_else(|| {
322 CamelError::InvalidUri("authUsername is required for authMethod=Basic".to_string())
323 })?;
324 let password = params.get("authPassword").cloned().ok_or_else(|| {
325 CamelError::InvalidUri("authPassword is required for authMethod=Basic".to_string())
326 })?;
327 return Ok(HttpAuth::Basic { username, password });
328 }
329
330 if method.eq_ignore_ascii_case("bearer") {
331 let token = params.get("authBearerToken").cloned().ok_or_else(|| {
332 CamelError::InvalidUri("authBearerToken is required for authMethod=Bearer".to_string())
333 })?;
334 return Ok(HttpAuth::Bearer { token });
335 }
336
337 Err(CamelError::InvalidUri(format!(
338 "invalid value for authMethod: {method} (expected None, Basic, or Bearer)"
339 )))
340}
341
342fn parse_bool_param_http(value: &str) -> Result<bool, CamelError> {
343 match value.to_ascii_lowercase().as_str() {
344 "true" | "1" | "yes" => Ok(true),
345 "false" | "0" | "no" => Ok(false),
346 _ => Err(CamelError::InvalidUri(format!(
347 "invalid boolean value: '{value}'"
348 ))),
349 }
350}
351
352impl HttpEndpointConfig {
353 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
354 let parts = parse_uri(uri)?;
355 let mut endpoint = Self::from_components(parts.clone())?;
356 if endpoint.response_timeout.is_none() {
357 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
358 }
359 if !parts.params.contains_key("allowPrivateIps") {
360 endpoint.allow_private_ips = config.allow_private_ips;
361 }
362 if !parts.params.contains_key("blockedHosts") {
363 endpoint.blocked_hosts = config.blocked_hosts.clone();
364 }
365 if !parts.params.contains_key("maxBodySize") {
366 endpoint.max_body_size = config.max_body_size;
367 }
368 if !parts.params.contains_key("readTimeout") {
369 endpoint.read_timeout_ms = config.read_timeout_ms;
370 }
371 if !parts.params.contains_key("maxResponseBytes") {
372 endpoint.max_response_bytes = config.max_response_bytes;
373 }
374 if !parts.params.contains_key("okStatusCodeRange")
375 && let Some(range) = &config.ok_status_code_range
376 {
377 endpoint.ok_status_code_range = parse_ok_status_code_range(range)?;
378 }
379
380 Ok(endpoint)
381 }
382}
383
384#[derive(Debug, Clone)]
390pub struct HttpServerConfig {
391 pub host: String,
393 pub port: u16,
395 pub path: String,
397 pub max_request_body: usize,
399 pub max_response_body: usize,
401 pub max_inflight_requests: usize,
403}
404
405impl UriConfig for HttpServerConfig {
406 fn scheme() -> &'static str {
408 "http"
409 }
410
411 fn from_uri(uri: &str) -> Result<Self, CamelError> {
412 let parts = parse_uri(uri)?;
413 Self::from_components(parts)
414 }
415
416 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
417 if parts.scheme != "http" && parts.scheme != "https" {
419 return Err(CamelError::InvalidUri(format!(
420 "expected scheme 'http' or 'https', got '{}'",
421 parts.scheme
422 )));
423 }
424
425 let authority_and_path = parts.path.trim_start_matches('/');
428
429 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
431 (&authority_and_path[..idx], &authority_and_path[idx..])
432 } else {
433 (authority_and_path, "/")
434 };
435
436 let path = if path_suffix.is_empty() {
437 "/"
438 } else {
439 path_suffix
440 }
441 .to_string();
442
443 let (host, port) = if let Some(colon) = authority.rfind(':') {
445 let port_str = &authority[colon + 1..];
446 match port_str.parse::<u16>() {
447 Ok(p) => (authority[..colon].to_string(), p),
448 Err(_) => {
449 return Err(CamelError::InvalidUri(format!(
450 "invalid port '{}' in authority",
451 port_str
452 )));
453 }
454 }
455 } else {
456 let default_port = if parts.scheme == "https" { 443 } else { 80 };
458 (authority.to_string(), default_port)
459 };
460
461 let max_request_body = parts
462 .params
463 .get("maxRequestBody")
464 .and_then(|v| v.parse::<usize>().ok())
465 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
468 .params
469 .get("maxResponseBody")
470 .and_then(|v| v.parse::<usize>().ok())
471 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
474 .params
475 .get("maxInflightRequests")
476 .and_then(|v| v.parse::<usize>().ok())
477 .unwrap_or(1024);
478
479 Ok(Self {
480 host,
481 port,
482 path,
483 max_request_body,
484 max_response_body,
485 max_inflight_requests,
486 })
487 }
488}
489
490impl HttpServerConfig {
491 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
492 let parts = parse_uri(uri)?;
493 let mut server = Self::from_components(parts.clone())?;
494 if !parts.params.contains_key("maxRequestBody") {
495 server.max_request_body = config.max_request_body;
496 }
497 if !parts.params.contains_key("maxResponseBody") {
498 server.max_response_body = config.max_body_size;
500 }
501 Ok(server)
502 }
503}
504
505pub(crate) enum HttpReplyBody {
511 Bytes(bytes::Bytes),
512 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
513}
514
515pub(crate) struct RequestEnvelope {
518 pub(crate) method: String,
519 pub(crate) path: String,
520 pub(crate) query: String,
521 pub(crate) headers: http::HeaderMap,
522 pub(crate) body: StreamBody,
523 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
524}
525
526pub(crate) struct HttpReply {
528 pub(crate) status: u16,
529 pub(crate) headers: Vec<(String, String)>,
530 pub(crate) body: HttpReplyBody,
531}
532
533pub(crate) type DispatchTable =
539 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
540
541type ServerKey = (String, u16);
542
543#[allow(dead_code)]
545struct ServerHandle {
546 dispatch: DispatchTable,
547 max_request_body: usize,
548 max_response_body: usize,
549 max_inflight_requests: usize,
550 inflight: Arc<tokio::sync::Semaphore>,
551 _task: tokio::task::JoinHandle<()>,
553}
554
555pub struct ServerRegistry {
557 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
558}
559
560impl ServerRegistry {
561 pub fn global() -> &'static Self {
563 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
564 INSTANCE.get_or_init(|| ServerRegistry {
565 inner: Mutex::new(HashMap::new()),
566 })
567 }
568
569 pub(crate) async fn get_or_spawn(
572 &'static self,
573 host: &str,
574 port: u16,
575 max_request_body: usize,
576 max_response_body: usize,
577 max_inflight_requests: usize,
578 ) -> Result<DispatchTable, CamelError> {
579 let host_owned = host.to_string();
580
581 let cell = {
582 let mut guard = self.inner.lock().map_err(|_| {
583 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
584 })?;
585 let key = (host.to_string(), port);
586 guard
587 .entry(key)
588 .or_insert_with(|| Arc::new(OnceCell::new()))
589 .clone()
590 };
591
592 if let Some(existing) = cell.get()
593 && existing.max_request_body != max_request_body
594 {
595 return Err(CamelError::EndpointCreationFailed(format!(
596 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
597 existing.max_request_body, max_request_body
598 )));
599 }
600
601 if let Some(existing) = cell.get()
602 && existing.max_response_body != max_response_body
603 {
604 return Err(CamelError::EndpointCreationFailed(format!(
605 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
606 existing.max_response_body, max_response_body
607 )));
608 }
609
610 if let Some(existing) = cell.get()
611 && existing.max_inflight_requests != max_inflight_requests
612 {
613 return Err(CamelError::EndpointCreationFailed(format!(
614 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
615 existing.max_inflight_requests, max_inflight_requests
616 )));
617 }
618
619 let handle = cell
620 .get_or_try_init(|| async {
621 let addr = format!("{host_owned}:{port}");
622 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
623 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
624 })?;
625 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
626 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
627 let task = tokio::spawn(run_axum_server(
628 listener,
629 Arc::clone(&dispatch),
630 max_request_body,
631 max_response_body,
632 Arc::clone(&inflight),
633 ));
634 Ok::<ServerHandle, CamelError>(ServerHandle {
635 dispatch,
636 max_request_body,
637 max_response_body,
638 max_inflight_requests,
639 inflight,
640 _task: task,
641 })
642 })
643 .await?;
644
645 Ok(Arc::clone(&handle.dispatch))
646 }
647
648 #[cfg(test)]
655 pub fn reset() {
656 let instance = Self::global();
657 let mut guard = instance
658 .inner
659 .lock()
660 .expect("ServerRegistry lock poisoned during test reset");
661 guard.clear();
662 }
663}
664
665use axum::{
670 Router,
671 body::Body as AxumBody,
672 extract::{Request, State},
673 http::{Response, StatusCode},
674 response::IntoResponse,
675};
676
677#[derive(Clone)]
678struct AppState {
679 dispatch: DispatchTable,
680 max_request_body: usize,
681 max_response_body: usize,
682 inflight: Arc<tokio::sync::Semaphore>,
683}
684
685async fn run_axum_server(
686 listener: tokio::net::TcpListener,
687 dispatch: DispatchTable,
688 max_request_body: usize,
689 max_response_body: usize,
690 inflight: Arc<tokio::sync::Semaphore>,
691) {
692 let state = AppState {
693 dispatch,
694 max_request_body,
695 max_response_body,
696 inflight,
697 };
698 let app = Router::new().fallback(dispatch_handler).with_state(state);
699
700 axum::serve(listener, app).await.unwrap_or_else(|e| {
701 tracing::error!(error = %e, "Axum server error");
702 });
703}
704
705async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
706 let method = req.method().to_string();
707 let path = req.uri().path().to_string();
708 let query = req.uri().query().unwrap_or("").to_string();
709 let headers = req.headers().clone();
710
711 let content_length: Option<u64> = headers
713 .get(http::header::CONTENT_LENGTH)
714 .and_then(|v| v.to_str().ok())
715 .and_then(|s| s.parse().ok());
716
717 if let Some(len) = content_length
718 && len > state.max_request_body as u64
719 {
720 return Response::builder()
721 .status(StatusCode::PAYLOAD_TOO_LARGE)
722 .body(AxumBody::from("Request body exceeds configured limit"))
723 .expect("infallible"); }
725
726 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
727 Ok(permit) => permit,
728 Err(_) => {
729 return Response::builder()
730 .status(StatusCode::SERVICE_UNAVAILABLE)
731 .body(AxumBody::from("Service Unavailable"))
732 .expect("infallible"); }
734 };
735
736 let content_type = headers
738 .get(http::header::CONTENT_TYPE)
739 .and_then(|v| v.to_str().ok())
740 .map(|s| s.to_string());
741
742 let data_stream: BodyDataStream = req.into_body().into_data_stream();
743 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
744 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
745
746 let stream_body = StreamBody {
747 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
748 metadata: StreamMetadata {
749 size_hint: content_length,
750 content_type,
751 origin: None,
752 },
753 };
754
755 let sender = {
757 let table = state.dispatch.read().await;
758 table.get(&path).cloned()
759 };
760 let Some(sender) = sender else {
761 return Response::builder()
762 .status(StatusCode::NOT_FOUND)
763 .body(AxumBody::from("No consumer registered for this path"))
764 .expect("infallible"); };
766
767 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
768 let envelope = RequestEnvelope {
769 method,
770 path,
771 query,
772 headers,
773 body: stream_body,
774 reply_tx,
775 };
776
777 if sender.send(envelope).await.is_err() {
778 return Response::builder()
779 .status(StatusCode::SERVICE_UNAVAILABLE)
780 .body(AxumBody::from("Consumer unavailable"))
781 .expect("infallible"); }
783
784 match reply_rx.await {
785 Ok(reply) => {
786 let reply = match reply.body {
787 HttpReplyBody::Bytes(b)
788 if exceeds_max_response_body(b.len(), state.max_response_body) =>
789 {
790 HttpReply {
791 status: 500,
792 headers: vec![],
793 body: HttpReplyBody::Bytes(bytes::Bytes::from(
794 "Response body exceeds configured limit",
795 )),
796 }
797 }
798 _ => reply,
799 };
800
801 let status =
802 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
803 let mut builder = Response::builder().status(status);
804 for (k, v) in &reply.headers {
805 builder = builder.header(k.as_str(), v.as_str());
806 }
807 match reply.body {
808 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
809 Response::builder()
810 .status(StatusCode::INTERNAL_SERVER_ERROR)
811 .body(AxumBody::from("Invalid response headers from consumer"))
812 .expect("infallible") }),
814 HttpReplyBody::Stream(stream) => builder
815 .body(AxumBody::from_stream(stream))
816 .unwrap_or_else(|_| {
817 Response::builder()
818 .status(StatusCode::INTERNAL_SERVER_ERROR)
819 .body(AxumBody::from("Invalid response headers from consumer"))
820 .expect("infallible") }),
822 }
823 }
824 Err(_) => Response::builder()
825 .status(StatusCode::INTERNAL_SERVER_ERROR)
826 .body(AxumBody::from("Pipeline error"))
827 .expect("Response::builder() with a known-valid status code and body is infallible"), }
829}
830
831fn exceeds_max_response_body(len: usize, max: usize) -> bool {
832 len > max
833}
834
835fn title_case_header(name: &str) -> String {
836 name.split('-')
837 .map(|part| {
838 let mut chars = part.chars();
839 match chars.next() {
840 None => String::new(),
841 Some(first) => first.to_uppercase().chain(chars.as_str().chars()).collect(),
842 }
843 })
844 .collect::<Vec<_>>()
845 .join("-")
846}
847
848pub struct HttpConsumer {
853 config: HttpServerConfig,
854}
855
856impl HttpConsumer {
857 pub fn new(config: HttpServerConfig) -> Self {
858 Self { config }
859 }
860}
861
862#[async_trait::async_trait]
863impl Consumer for HttpConsumer {
864 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
865 use camel_component_api::{Body, Exchange, Message};
866
867 let dispatch = ServerRegistry::global()
868 .get_or_spawn(
869 &self.config.host,
870 self.config.port,
871 self.config.max_request_body,
872 self.config.max_response_body,
873 self.config.max_inflight_requests,
874 )
875 .await?;
876
877 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
879 {
880 let mut table = dispatch.write().await;
881 table.insert(self.config.path.clone(), env_tx);
882 }
883
884 let path = self.config.path.clone();
885 let cancel_token = ctx.cancel_token();
886 loop {
887 tokio::select! {
888 _ = ctx.cancelled() => {
889 break;
890 }
891 envelope = env_rx.recv() => {
892 let Some(envelope) = envelope else { break; };
893
894 let mut msg = Message::default();
896
897 msg.set_header("CamelHttpMethod",
899 serde_json::Value::String(envelope.method.clone()));
900 msg.set_header("CamelHttpPath",
901 serde_json::Value::String(envelope.path.clone()));
902 msg.set_header("CamelHttpQuery",
903 serde_json::Value::String(envelope.query.clone()));
904
905 for (k, v) in &envelope.headers {
907 if let Ok(val_str) = v.to_str() {
908 msg.set_header(
909 title_case_header(k.as_str()),
910 serde_json::Value::String(val_str.to_string()),
911 );
912 }
913 }
914
915 msg.body = Body::Stream(envelope.body);
918
919 #[allow(unused_mut)]
920 let mut exchange = Exchange::new(msg);
921
922 #[cfg(feature = "otel")]
924 {
925 let headers: HashMap<String, String> = envelope
926 .headers
927 .iter()
928 .filter_map(|(k, v)| {
929 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
930 })
931 .collect();
932 camel_otel::extract_into_exchange(&mut exchange, &headers);
933 }
934
935 let reply_tx = envelope.reply_tx;
936 let sender = ctx.sender().clone();
937 let path_clone = path.clone();
938 let cancel = cancel_token.clone();
939
940 tokio::spawn(async move {
960 if cancel.is_cancelled() {
968 let _ = reply_tx.send(HttpReply {
969 status: 503,
970 headers: vec![],
971 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
972 });
973 return;
974 }
975
976 let (tx, rx) = tokio::sync::oneshot::channel();
978 let envelope = camel_component_api::consumer::ExchangeEnvelope {
979 exchange,
980 reply_tx: Some(tx),
981 };
982
983 let result = match sender.send(envelope).await {
984 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
985 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
986 }
987 .and_then(|r| r);
988
989 let reply = match result {
990 Ok(out) => {
991 let status = out
992 .input
993 .header("CamelHttpResponseCode")
994 .and_then(|v| {
995 let raw = v.as_u64()
996 .or_else(|| v.as_str().and_then(|s| s.parse().ok()))?;
997 let code = raw as u16;
998 (100..1000).contains(&code).then_some(code)
999 })
1000 .unwrap_or(200);
1001
1002 let user_content_type = out
1003 .input
1004 .header("Content-Type")
1005 .and_then(|v| v.as_str().map(|s| s.to_string()));
1006
1007 let (reply_body, inferred_content_type): (HttpReplyBody, Option<String>) = match out.input.body {
1008 Body::Empty => (HttpReplyBody::Bytes(bytes::Bytes::new()), None),
1009 Body::Bytes(b) => (HttpReplyBody::Bytes(b), None),
1010 Body::Text(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("text/plain; charset=utf-8".to_string())),
1011 Body::Xml(s) => (HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())), Some("application/xml".to_string())),
1012 Body::Json(v) => (HttpReplyBody::Bytes(bytes::Bytes::from(
1013 v.to_string().into_bytes(),
1014 )), Some("application/json".to_string())),
1015 Body::Stream(s) => {
1016 let ct = s.metadata.content_type.clone();
1017 match s.stream.lock().await.take() {
1018 Some(stream) => (
1019 HttpReplyBody::Stream(stream),
1020 ct,
1021 ),
1022 None => {
1023 tracing::error!(
1024 "Body::Stream already consumed before HTTP reply — returning 500"
1025 );
1026 let error_reply = HttpReply {
1027 status: 500,
1028 headers: vec![],
1029 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1030 };
1031 if reply_tx.send(error_reply).is_err() {
1032 debug!("reply_tx dropped before error reply could be sent");
1033 }
1034 return;
1035 }
1036 }
1037 }
1038 };
1039
1040 let mut resp_headers: Vec<(String, String)> = out
1041 .input
1042 .headers
1043 .iter()
1044 .filter(|(k, _)| !k.starts_with("Camel"))
1045 .filter(|(k, _)| {
1046 !matches!(
1047 k.to_lowercase().as_str(),
1048 "content-length"
1049 | "content-type"
1050 | "transfer-encoding"
1051 | "connection"
1052 | "cache-control"
1053 | "date"
1054 | "pragma"
1055 | "trailer"
1056 | "upgrade"
1057 | "via"
1058 | "warning"
1059 | "host"
1060 | "user-agent"
1061 | "accept"
1062 | "accept-encoding"
1063 | "accept-language"
1064 | "accept-charset"
1065 | "authorization"
1066 | "proxy-authorization"
1067 | "cookie"
1068 | "expect"
1069 | "from"
1070 | "if-match"
1071 | "if-modified-since"
1072 | "if-none-match"
1073 | "if-range"
1074 | "if-unmodified-since"
1075 | "max-forwards"
1076 | "proxy-connection"
1077 | "range"
1078 | "referer"
1079 | "te"
1080 )
1081 })
1082 .filter_map(|(k, v)| {
1083 v.as_str().map(|s| (k.clone(), s.to_string()))
1084 })
1085 .collect();
1086
1087 let content_type = user_content_type
1088 .or(inferred_content_type);
1089 if let Some(ct) = content_type {
1090 resp_headers.push(("Content-Type".to_string(), ct));
1091 }
1092
1093 HttpReply {
1094 status,
1095 headers: resp_headers,
1096 body: reply_body,
1097 }
1098 }
1099 Err(CamelError::Stopped) => {
1100 tracing::debug!(path = %path_clone, "Route stopped — returning 204 No Content");
1101 HttpReply {
1102 status: 204,
1103 headers: vec![],
1104 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
1105 }
1106 }
1107 Err(e) => {
1108 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
1109 HttpReply {
1110 status: 500,
1111 headers: vec![],
1112 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
1113 }
1114 }
1115 };
1116
1117 let _ = reply_tx.send(reply);
1119 });
1120 }
1121 }
1122 }
1123
1124 {
1126 let mut table = dispatch.write().await;
1127 table.remove(&path);
1128 }
1129
1130 Ok(())
1131 }
1132
1133 async fn stop(&mut self) -> Result<(), CamelError> {
1134 Ok(())
1135 }
1136
1137 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1138 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1139 }
1140}
1141
1142pub struct HttpComponent {
1147 config: HttpConfig,
1148}
1149
1150fn build_client(config: &HttpConfig, cookie_handling: CookieHandling) -> reqwest::Client {
1151 let mut builder = reqwest::Client::builder()
1152 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
1153 .pool_max_idle_per_host(config.pool_max_idle_per_host)
1154 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
1155
1156 if !config.follow_redirects {
1157 builder = builder.redirect(reqwest::redirect::Policy::none());
1158 } else if let Some(max_redirects) = config.max_redirects {
1159 builder = builder.redirect(reqwest::redirect::Policy::limited(max_redirects));
1160 }
1161
1162 if let Some(proxy_url) = &config.proxy_url
1163 && let Ok(proxy) = reqwest::Proxy::all(proxy_url)
1164 {
1165 builder = builder.proxy(proxy);
1166 }
1167
1168 if matches!(cookie_handling, CookieHandling::InMemory) {
1169 }
1171
1172 if let Some(tls) = &config.tls
1173 && tls.enabled
1174 {
1175 if tls.insecure || !tls.verify_peer {
1176 builder = builder.danger_accept_invalid_certs(true);
1177 }
1178
1179 if let Some(ca_path) = &tls.ca_cert_path
1180 && let Ok(ca_bytes) = std::fs::read(ca_path)
1181 {
1182 let cert = reqwest::Certificate::from_pem(&ca_bytes)
1183 .or_else(|_| reqwest::Certificate::from_der(&ca_bytes));
1184 if let Ok(ca_cert) = cert {
1185 builder = builder.add_root_certificate(ca_cert);
1186 }
1187 }
1188
1189 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert_path, &tls.client_key_path)
1190 && let (Ok(cert_bytes), Ok(key_bytes)) =
1191 (std::fs::read(cert_path), std::fs::read(key_path))
1192 {
1193 let mut identity_pem = cert_bytes;
1194 identity_pem.extend_from_slice(&key_bytes);
1195 if let Ok(identity) = reqwest::Identity::from_pem(&identity_pem) {
1196 builder = builder.identity(identity);
1197 }
1198 }
1199 }
1200
1201 builder
1202 .build()
1203 .expect("reqwest::Client::build() with valid config should not fail") }
1205
1206impl HttpComponent {
1207 pub fn new() -> Self {
1208 let config = HttpConfig::default();
1209 Self { config }
1210 }
1211
1212 pub fn with_config(config: HttpConfig) -> Self {
1213 Self { config }
1214 }
1215
1216 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1217 match config {
1218 Some(cfg) => Self::with_config(cfg),
1219 None => Self::new(),
1220 }
1221 }
1222}
1223
1224impl Default for HttpComponent {
1225 fn default() -> Self {
1226 Self::new()
1227 }
1228}
1229
1230impl Component for HttpComponent {
1231 fn scheme(&self) -> &str {
1232 "http"
1233 }
1234
1235 fn create_endpoint(
1236 &self,
1237 uri: &str,
1238 _ctx: &dyn camel_component_api::ComponentContext,
1239 ) -> Result<Box<dyn Endpoint>, CamelError> {
1240 self.config.validate()?;
1241 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1242 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1243 let client = build_client(&self.config, config.cookie_handling);
1244 Ok(Box::new(HttpEndpoint {
1245 uri: uri.to_string(),
1246 config,
1247 server_config,
1248 client,
1249 }))
1250 }
1251}
1252
1253pub struct HttpsComponent {
1254 config: HttpConfig,
1255}
1256
1257impl HttpsComponent {
1258 pub fn new() -> Self {
1259 let config = HttpConfig::default();
1260 Self { config }
1261 }
1262
1263 pub fn with_config(config: HttpConfig) -> Self {
1264 Self { config }
1265 }
1266
1267 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1268 match config {
1269 Some(cfg) => Self::with_config(cfg),
1270 None => Self::new(),
1271 }
1272 }
1273}
1274
1275impl Default for HttpsComponent {
1276 fn default() -> Self {
1277 Self::new()
1278 }
1279}
1280
1281impl Component for HttpsComponent {
1282 fn scheme(&self) -> &str {
1283 "https"
1284 }
1285
1286 fn create_endpoint(
1287 &self,
1288 uri: &str,
1289 _ctx: &dyn camel_component_api::ComponentContext,
1290 ) -> Result<Box<dyn Endpoint>, CamelError> {
1291 self.config.validate()?;
1292 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1293 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1294 let client = build_client(&self.config, config.cookie_handling);
1295 Ok(Box::new(HttpEndpoint {
1296 uri: uri.to_string(),
1297 config,
1298 server_config,
1299 client,
1300 }))
1301 }
1302}
1303
1304struct HttpEndpoint {
1309 uri: String,
1310 config: HttpEndpointConfig,
1311 server_config: HttpServerConfig,
1312 client: reqwest::Client,
1313}
1314
1315impl Endpoint for HttpEndpoint {
1316 fn uri(&self) -> &str {
1317 &self.uri
1318 }
1319
1320 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1321 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1322 }
1323
1324 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1325 Ok(BoxProcessor::new(HttpProducer {
1326 config: Arc::new(self.config.clone()),
1327 client: self.client.clone(),
1328 }))
1329 }
1330}
1331
1332fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1337 let parsed = url::Url::parse(url)
1338 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1339
1340 if let Some(host) = parsed.host_str()
1342 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1343 {
1344 return Err(CamelError::ProcessorError(format!(
1345 "Host '{}' is blocked",
1346 host
1347 )));
1348 }
1349
1350 if !config.allow_private_ips
1352 && let Some(host) = parsed.host()
1353 {
1354 match host {
1355 url::Host::Ipv4(ip) => {
1356 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1357 return Err(CamelError::ProcessorError(format!(
1358 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1359 ip
1360 )));
1361 }
1362 }
1363 url::Host::Ipv6(ip) => {
1364 if ip.is_loopback() {
1365 return Err(CamelError::ProcessorError(format!(
1366 "Loopback IP '{}' not allowed",
1367 ip
1368 )));
1369 }
1370 }
1371 url::Host::Domain(domain) => {
1372 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1374 if blocked_domains.contains(&domain) {
1375 return Err(CamelError::ProcessorError(format!(
1376 "Domain '{}' is not allowed",
1377 domain
1378 )));
1379 }
1380 }
1381 }
1382 }
1383
1384 Ok(())
1385}
1386
1387fn is_private_ip(ip: &IpAddr) -> bool {
1388 match ip {
1389 IpAddr::V4(ipv4) => {
1390 ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local() || ipv4.octets()[0] == 0
1391 }
1392 IpAddr::V6(ipv6) => {
1393 let seg0 = ipv6.segments()[0];
1394 ipv6.is_loopback()
1395 || (seg0 & 0xfe00) == 0xfc00
1397 || (seg0 & 0xffc0) == 0xfe80
1399 || ipv6
1401 .to_ipv4_mapped()
1402 .map(|v4| {
1403 v4.is_private()
1404 || v4.is_loopback()
1405 || v4.is_link_local()
1406 || v4.octets()[0] == 0
1407 })
1408 .unwrap_or(false)
1409 }
1410 }
1411}
1412
1413async fn validate_resolved_host_for_ssrf(
1414 url: &str,
1415 config: &HttpEndpointConfig,
1416) -> Result<(), CamelError> {
1417 if config.allow_private_ips {
1418 return Ok(());
1419 }
1420
1421 let parsed = url::Url::parse(url)
1422 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1423 let Some(host) = parsed.host_str() else {
1424 return Ok(());
1425 };
1426 let Some(port) = parsed.port_or_known_default() else {
1427 return Ok(());
1428 };
1429
1430 let resolved = tokio::net::lookup_host((host, port)).await.map_err(|e| {
1431 CamelError::ProcessorError(format!("Failed to resolve host '{}': {}", host, e))
1432 })?;
1433
1434 for addr in resolved {
1435 let ip = addr.ip();
1436 if is_private_ip(&ip) {
1437 return Err(CamelError::ProcessorError(format!(
1438 "Target resolved to private IP: {}",
1439 ip
1440 )));
1441 }
1442 }
1443
1444 Ok(())
1445}
1446
1447#[derive(Clone)]
1452struct HttpProducer {
1453 config: Arc<HttpEndpointConfig>,
1454 client: reqwest::Client,
1455}
1456
1457impl HttpProducer {
1458 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1459 if let Some(ref method) = config.http_method {
1460 return method.to_uppercase();
1461 }
1462 if let Some(method) = exchange
1463 .input
1464 .header("CamelHttpMethod")
1465 .and_then(|v| v.as_str())
1466 {
1467 return method.to_uppercase();
1468 }
1469 if !exchange.input.body.is_empty() {
1470 return "POST".to_string();
1471 }
1472 "GET".to_string()
1473 }
1474
1475 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1476 if let Some(uri) = exchange
1477 .input
1478 .header("CamelHttpUri")
1479 .and_then(|v| v.as_str())
1480 {
1481 let mut url = uri.to_string();
1482 if let Some(path) = exchange
1483 .input
1484 .header("CamelHttpPath")
1485 .and_then(|v| v.as_str())
1486 {
1487 if !url.ends_with('/') && !path.starts_with('/') {
1488 url.push('/');
1489 }
1490 url.push_str(path);
1491 }
1492 if let Some(query) = exchange
1493 .input
1494 .header("CamelHttpQuery")
1495 .and_then(|v| v.as_str())
1496 {
1497 url.push('?');
1498 url.push_str(query);
1499 }
1500 return url;
1501 }
1502
1503 let mut url = config.base_url.clone();
1504
1505 if let Some(path) = exchange
1506 .input
1507 .header("CamelHttpPath")
1508 .and_then(|v| v.as_str())
1509 {
1510 if !url.ends_with('/') && !path.starts_with('/') {
1511 url.push('/');
1512 }
1513 url.push_str(path);
1514 }
1515
1516 if let Some(query) = exchange
1517 .input
1518 .header("CamelHttpQuery")
1519 .and_then(|v| v.as_str())
1520 {
1521 url.push('?');
1522 url.push_str(query);
1523 } else if !config.query_params.is_empty() {
1524 let mut parsed = url::Url::parse(&url).expect("base URL must be valid"); for (k, v) in &config.query_params {
1526 parsed.query_pairs_mut().append_pair(k, v);
1527 }
1528 url = parsed.to_string();
1529 }
1530
1531 url
1532 }
1533
1534 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1535 status >= range.0 && status <= range.1
1536 }
1537}
1538
1539impl Service<Exchange> for HttpProducer {
1540 type Response = Exchange;
1541 type Error = CamelError;
1542 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1543
1544 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1545 Poll::Ready(Ok(()))
1546 }
1547
1548 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1549 let config = self.config.clone();
1550 let client = self.client.clone();
1551
1552 Box::pin(async move {
1553 let method_str = HttpProducer::resolve_method(&exchange, &config);
1554 let url = HttpProducer::resolve_url(&exchange, &config);
1555
1556 validate_url_for_ssrf(&url, &config)?;
1558 validate_resolved_host_for_ssrf(&url, &config).await?;
1559
1560 debug!(
1561 correlation_id = %exchange.correlation_id(),
1562 method = %method_str,
1563 url = %url,
1564 "HTTP request"
1565 );
1566
1567 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1568 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1569 })?;
1570
1571 let mut request = client.request(method, &url);
1572
1573 if let Some(timeout) = config.response_timeout {
1574 request = request.timeout(timeout);
1575 }
1576
1577 if let Some(user_agent) = &config.user_agent
1578 && !config.bridge_endpoint
1579 {
1580 request = request.header("User-Agent", user_agent.clone());
1581 }
1582
1583 #[cfg(feature = "otel")]
1585 let should_inject_otel = !config.bridge_endpoint;
1586 #[cfg(feature = "otel")]
1587 if should_inject_otel {
1588 let mut otel_headers = HashMap::new();
1589 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1590 for (k, v) in otel_headers {
1591 if let (Ok(name), Ok(val)) = (
1592 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1593 reqwest::header::HeaderValue::from_str(&v),
1594 ) {
1595 request = request.header(name, val);
1596 }
1597 }
1598 }
1599
1600 for (key, value) in &exchange.input.headers {
1601 if !key.starts_with("Camel")
1602 && !config
1603 .skip_request_headers
1604 .iter()
1605 .any(|h| h.eq_ignore_ascii_case(key))
1606 && let Some(val_str) = value.as_str()
1607 && let (Ok(name), Ok(val)) = (
1608 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1609 reqwest::header::HeaderValue::from_str(val_str),
1610 )
1611 {
1612 request = request.header(name, val);
1613 }
1614 }
1615
1616 if !config.bridge_endpoint {
1617 match &config.auth {
1618 HttpAuth::None => {}
1619 HttpAuth::Basic { username, password } => {
1620 request = request.basic_auth(username, Some(password));
1621 }
1622 HttpAuth::Bearer { token } => {
1623 request = request.bearer_auth(token);
1624 }
1625 }
1626
1627 if config.connection_close {
1628 request = request.header("Connection", "close");
1629 }
1630 }
1631
1632 match exchange.input.body {
1633 Body::Stream(ref s) => {
1634 let mut stream_lock = s.stream.lock().await;
1635 if let Some(stream) = stream_lock.take() {
1636 request = request.body(reqwest::Body::wrap_stream(stream));
1637 } else {
1638 return Err(CamelError::AlreadyConsumed);
1639 }
1640 }
1641 _ => {
1642 let body = std::mem::take(&mut exchange.input.body);
1644 let bytes = body.into_bytes(config.max_body_size).await?;
1645 if !bytes.is_empty() {
1646 request = request.body(bytes);
1647 }
1648 }
1649 }
1650
1651 let response = request
1652 .send()
1653 .await
1654 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1655
1656 let status_code = response.status().as_u16();
1657 let status_text = response
1658 .status()
1659 .canonical_reason()
1660 .unwrap_or("Unknown")
1661 .to_string();
1662
1663 for (key, value) in response.headers() {
1664 if config
1665 .skip_response_headers
1666 .iter()
1667 .any(|h| h.eq_ignore_ascii_case(key.as_str()))
1668 {
1669 continue;
1670 }
1671 if let Ok(val_str) = value.to_str() {
1672 exchange.input.set_header(
1673 title_case_header(key.as_str()),
1674 serde_json::Value::String(val_str.to_string()),
1675 );
1676 }
1677 }
1678
1679 exchange.input.set_header(
1680 "CamelHttpResponseCode",
1681 serde_json::Value::Number(status_code.into()),
1682 );
1683 exchange.input.set_header(
1684 "CamelHttpResponseText",
1685 serde_json::Value::String(status_text.clone()),
1686 );
1687
1688 let read_timeout = Duration::from_millis(config.read_timeout_ms);
1690 let response_body = tokio::time::timeout(read_timeout, async {
1691 if let Some(content_len) = response.content_length()
1693 && content_len > config.max_response_bytes as u64
1694 {
1695 return Err(CamelError::ProcessorError(format!(
1696 "Response body too large: {} bytes exceeds limit of {} bytes",
1697 content_len, config.max_response_bytes
1698 )));
1699 }
1700 use futures::TryStreamExt;
1702 let mut stream = response.bytes_stream();
1703 let mut total: usize = 0;
1704 let mut collected = Vec::new();
1705 while let Some(chunk) = stream.try_next().await.map_err(|e| {
1706 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1707 })? {
1708 total += chunk.len();
1709 if total > config.max_response_bytes {
1710 return Err(CamelError::ProcessorError(format!(
1711 "Response body too large: {} bytes exceeds limit of {} bytes",
1712 total, config.max_response_bytes
1713 )));
1714 }
1715 collected.push(chunk);
1716 }
1717 let mut result = bytes::BytesMut::with_capacity(total);
1718 for chunk in collected {
1719 result.extend_from_slice(&chunk);
1720 }
1721 Ok::<bytes::Bytes, CamelError>(result.freeze())
1722 })
1723 .await
1724 .map_err(|_| {
1725 CamelError::ProcessorError(format!(
1726 "Read timeout after {}ms",
1727 config.read_timeout_ms
1728 ))
1729 })??;
1730
1731 if config.throw_exception_on_failure
1732 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1733 {
1734 return Err(CamelError::HttpOperationFailed {
1735 method: method_str,
1736 url,
1737 status_code,
1738 status_text,
1739 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1740 });
1741 }
1742
1743 if !response_body.is_empty() {
1744 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1745 }
1746
1747 debug!(
1748 correlation_id = %exchange.correlation_id(),
1749 status = status_code,
1750 url = %url,
1751 "HTTP response"
1752 );
1753 Ok(exchange)
1754 })
1755 }
1756}
1757
1758#[cfg(test)]
1759mod tests {
1760 use super::*;
1761 use camel_component_api::{Message, NoOpComponentContext};
1762 use std::sync::Arc;
1763 use std::time::Duration;
1764
1765 fn test_producer_ctx() -> ProducerContext {
1766 ProducerContext::new()
1767 }
1768
1769 #[test]
1770 fn test_http_config_defaults() {
1771 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1772 assert_eq!(config.base_url, "http://localhost:8080/api");
1773 assert!(config.http_method.is_none());
1774 assert!(config.throw_exception_on_failure);
1775 assert_eq!(config.ok_status_code_range, (200, 299));
1776 assert!(config.response_timeout.is_none());
1777 assert!(matches!(config.auth, HttpAuth::None));
1778 assert!(matches!(config.cookie_handling, CookieHandling::Disabled));
1779 assert!(!config.bridge_endpoint);
1780 assert!(!config.connection_close);
1781 }
1782
1783 #[test]
1784 fn test_http_config_scheme() {
1785 assert_eq!(HttpEndpointConfig::scheme(), "http");
1787 }
1788
1789 #[test]
1790 fn test_http_config_from_components() {
1791 let components = camel_component_api::UriComponents {
1793 scheme: "https".to_string(),
1794 path: "//api.example.com/v1".to_string(),
1795 params: std::collections::HashMap::from([(
1796 "httpMethod".to_string(),
1797 "POST".to_string(),
1798 )]),
1799 };
1800 let config = HttpEndpointConfig::from_components(components).unwrap();
1801 assert_eq!(config.base_url, "https://api.example.com/v1");
1802 assert_eq!(config.http_method, Some("POST".to_string()));
1803 }
1804
1805 #[test]
1806 fn test_http_config_with_options() {
1807 let config = HttpEndpointConfig::from_uri(
1808 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1809 ).unwrap();
1810 assert_eq!(config.base_url, "https://api.example.com/v1");
1811 assert_eq!(config.http_method, Some("PUT".to_string()));
1812 assert!(!config.throw_exception_on_failure);
1813 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1814 }
1815
1816 #[test]
1817 fn test_http_endpoint_config_auth_and_headers_options() {
1818 let config = HttpEndpointConfig::from_uri(
1819 "http://localhost/api?authMethod=Basic&authUsername=u&authPassword=p&userAgent=camel-test&bridgeEndpoint=true&connectionClose=true&skipRequestHeaders=Authorization,X-Secret&skipResponseHeaders=Set-Cookie&cookieHandling=InMemory",
1820 )
1821 .unwrap();
1822
1823 assert!(matches!(
1824 config.auth,
1825 HttpAuth::Basic { username, password } if username == "u" && password == "p"
1826 ));
1827 assert_eq!(config.user_agent.as_deref(), Some("camel-test"));
1828 assert!(matches!(config.cookie_handling, CookieHandling::InMemory));
1829 assert!(config.bridge_endpoint);
1830 assert!(config.connection_close);
1831 assert_eq!(
1832 config.skip_request_headers,
1833 vec!["authorization".to_string(), "x-secret".to_string()]
1834 );
1835 assert_eq!(config.skip_response_headers, vec!["set-cookie".to_string()]);
1836 }
1837
1838 #[test]
1839 fn test_http_endpoint_config_bearer_auth() {
1840 let config = HttpEndpointConfig::from_uri(
1841 "http://localhost/api?authMethod=Bearer&authBearerToken=t",
1842 )
1843 .unwrap();
1844 assert!(matches!(
1845 config.auth,
1846 HttpAuth::Bearer { token } if token == "t"
1847 ));
1848 }
1849
1850 #[test]
1851 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1852 let config = HttpConfig::default()
1853 .with_response_timeout_ms(999)
1854 .with_allow_private_ips(true)
1855 .with_blocked_hosts(vec!["evil.com".to_string()])
1856 .with_max_body_size(12345);
1857 let endpoint =
1858 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1859 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1860 assert!(endpoint.allow_private_ips);
1861 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1862 assert_eq!(endpoint.max_body_size, 12345);
1863 }
1864
1865 #[test]
1866 fn test_from_uri_with_defaults_uri_overrides_config() {
1867 let config = HttpConfig::default()
1868 .with_response_timeout_ms(999)
1869 .with_allow_private_ips(true)
1870 .with_blocked_hosts(vec!["evil.com".to_string()])
1871 .with_max_body_size(12345);
1872 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1873 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1874 &config,
1875 )
1876 .unwrap();
1877 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1878 assert!(!endpoint.allow_private_ips);
1879 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1880 assert_eq!(endpoint.max_body_size, 99);
1881 }
1882
1883 #[test]
1884 fn test_http_config_ok_status_range() {
1885 let config =
1886 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1887 assert_eq!(config.ok_status_code_range, (200, 204));
1888 }
1889
1890 #[test]
1891 fn test_http_config_wrong_scheme() {
1892 let result = HttpEndpointConfig::from_uri("file:/tmp");
1893 assert!(result.is_err());
1894 }
1895
1896 #[test]
1897 fn test_http_component_scheme() {
1898 let component = HttpComponent::new();
1899 assert_eq!(component.scheme(), "http");
1900 }
1901
1902 #[test]
1903 fn test_https_component_scheme() {
1904 let component = HttpsComponent::new();
1905 assert_eq!(component.scheme(), "https");
1906 }
1907
1908 #[test]
1909 fn test_http_endpoint_creates_consumer() {
1910 let component = HttpComponent::new();
1911 let ctx = NoOpComponentContext;
1912 let endpoint = component
1913 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1914 .unwrap();
1915 assert!(endpoint.create_consumer().is_ok());
1916 }
1917
1918 #[test]
1919 fn test_https_endpoint_creates_consumer() {
1920 let component = HttpsComponent::new();
1921 let ctx = NoOpComponentContext;
1922 let endpoint = component
1923 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1924 .unwrap();
1925 assert!(endpoint.create_consumer().is_ok());
1926 }
1927
1928 #[test]
1929 fn test_http_endpoint_creates_producer() {
1930 let ctx = test_producer_ctx();
1931 let component = HttpComponent::new();
1932 let endpoint_ctx = NoOpComponentContext;
1933 let endpoint = component
1934 .create_endpoint("http://localhost/api", &endpoint_ctx)
1935 .unwrap();
1936 assert!(endpoint.create_producer(&ctx).is_ok());
1937 }
1938
1939 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1944 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1945 let addr = listener.local_addr().unwrap();
1946 let url = format!("http://127.0.0.1:{}", addr.port());
1947
1948 let handle = tokio::spawn(async move {
1949 loop {
1950 if let Ok((mut stream, _)) = listener.accept().await {
1951 tokio::spawn(async move {
1952 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953 let mut buf = vec![0u8; 4096];
1954 let n = stream.read(&mut buf).await.unwrap_or(0);
1955 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1956
1957 let method = request.split_whitespace().next().unwrap_or("GET");
1958
1959 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1960 let response = format!(
1961 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1962 body.len(),
1963 body
1964 );
1965 let _ = stream.write_all(response.as_bytes()).await;
1966 });
1967 }
1968 }
1969 });
1970
1971 (url, handle)
1972 }
1973
1974 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1975 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1976 let addr = listener.local_addr().unwrap();
1977 let url = format!("http://127.0.0.1:{}", addr.port());
1978
1979 let handle = tokio::spawn(async move {
1980 loop {
1981 if let Ok((mut stream, _)) = listener.accept().await {
1982 let status = status;
1983 tokio::spawn(async move {
1984 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1985 let mut buf = vec![0u8; 4096];
1986 let _ = stream.read(&mut buf).await;
1987
1988 let status_text = match status {
1989 404 => "Not Found",
1990 500 => "Internal Server Error",
1991 _ => "Error",
1992 };
1993 let body = "error body";
1994 let response = format!(
1995 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1996 status,
1997 status_text,
1998 body.len(),
1999 body
2000 );
2001 let _ = stream.write_all(response.as_bytes()).await;
2002 });
2003 }
2004 }
2005 });
2006
2007 (url, handle)
2008 }
2009
2010 #[tokio::test]
2011 async fn test_http_producer_get_request() {
2012 use tower::ServiceExt;
2013
2014 let (url, _handle) = start_test_server().await;
2015 let ctx = test_producer_ctx();
2016
2017 let component = HttpComponent::new();
2018 let endpoint_ctx = NoOpComponentContext;
2019 let endpoint = component
2020 .create_endpoint(
2021 &format!("{url}/api/test?allowPrivateIps=true"),
2022 &endpoint_ctx,
2023 )
2024 .unwrap();
2025 let producer = endpoint.create_producer(&ctx).unwrap();
2026
2027 let exchange = Exchange::new(Message::default());
2028 let result = producer.oneshot(exchange).await.unwrap();
2029
2030 let status = result
2031 .input
2032 .header("CamelHttpResponseCode")
2033 .and_then(|v| v.as_u64())
2034 .unwrap();
2035 assert_eq!(status, 200);
2036
2037 assert!(!result.input.body.is_empty());
2038 }
2039
2040 #[tokio::test]
2041 async fn test_http_producer_post_with_body() {
2042 use tower::ServiceExt;
2043
2044 let (url, _handle) = start_test_server().await;
2045 let ctx = test_producer_ctx();
2046
2047 let component = HttpComponent::new();
2048 let endpoint_ctx = NoOpComponentContext;
2049 let endpoint = component
2050 .create_endpoint(
2051 &format!("{url}/api/data?allowPrivateIps=true"),
2052 &endpoint_ctx,
2053 )
2054 .unwrap();
2055 let producer = endpoint.create_producer(&ctx).unwrap();
2056
2057 let exchange = Exchange::new(Message::new("request body"));
2058 let result = producer.oneshot(exchange).await.unwrap();
2059
2060 let status = result
2061 .input
2062 .header("CamelHttpResponseCode")
2063 .and_then(|v| v.as_u64())
2064 .unwrap();
2065 assert_eq!(status, 200);
2066 }
2067
2068 #[tokio::test]
2069 async fn test_http_producer_method_from_header() {
2070 use tower::ServiceExt;
2071
2072 let (url, _handle) = start_test_server().await;
2073 let ctx = test_producer_ctx();
2074
2075 let component = HttpComponent::new();
2076 let endpoint_ctx = NoOpComponentContext;
2077 let endpoint = component
2078 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2079 .unwrap();
2080 let producer = endpoint.create_producer(&ctx).unwrap();
2081
2082 let mut exchange = Exchange::new(Message::default());
2083 exchange.input.set_header(
2084 "CamelHttpMethod",
2085 serde_json::Value::String("DELETE".to_string()),
2086 );
2087
2088 let result = producer.oneshot(exchange).await.unwrap();
2089 let status = result
2090 .input
2091 .header("CamelHttpResponseCode")
2092 .and_then(|v| v.as_u64())
2093 .unwrap();
2094 assert_eq!(status, 200);
2095 }
2096
2097 #[tokio::test]
2098 async fn test_http_producer_forced_method() {
2099 use tower::ServiceExt;
2100
2101 let (url, _handle) = start_test_server().await;
2102 let ctx = test_producer_ctx();
2103
2104 let component = HttpComponent::new();
2105 let endpoint_ctx = NoOpComponentContext;
2106 let endpoint = component
2107 .create_endpoint(
2108 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
2109 &endpoint_ctx,
2110 )
2111 .unwrap();
2112 let producer = endpoint.create_producer(&ctx).unwrap();
2113
2114 let exchange = Exchange::new(Message::default());
2115 let result = producer.oneshot(exchange).await.unwrap();
2116
2117 let status = result
2118 .input
2119 .header("CamelHttpResponseCode")
2120 .and_then(|v| v.as_u64())
2121 .unwrap();
2122 assert_eq!(status, 200);
2123 }
2124
2125 #[tokio::test]
2126 async fn test_http_producer_throw_exception_on_failure() {
2127 use tower::ServiceExt;
2128
2129 let (url, _handle) = start_status_server(404).await;
2130 let ctx = test_producer_ctx();
2131
2132 let component = HttpComponent::new();
2133 let endpoint_ctx = NoOpComponentContext;
2134 let endpoint = component
2135 .create_endpoint(
2136 &format!("{url}/not-found?allowPrivateIps=true"),
2137 &endpoint_ctx,
2138 )
2139 .unwrap();
2140 let producer = endpoint.create_producer(&ctx).unwrap();
2141
2142 let exchange = Exchange::new(Message::default());
2143 let result = producer.oneshot(exchange).await;
2144 assert!(result.is_err());
2145
2146 match result.unwrap_err() {
2147 CamelError::HttpOperationFailed { status_code, .. } => {
2148 assert_eq!(status_code, 404);
2149 }
2150 e => panic!("Expected HttpOperationFailed, got: {e}"),
2151 }
2152 }
2153
2154 #[tokio::test]
2155 async fn test_http_producer_no_throw_on_failure() {
2156 use tower::ServiceExt;
2157
2158 let (url, _handle) = start_status_server(500).await;
2159 let ctx = test_producer_ctx();
2160
2161 let component = HttpComponent::new();
2162 let endpoint_ctx = NoOpComponentContext;
2163 let endpoint = component
2164 .create_endpoint(
2165 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
2166 &endpoint_ctx,
2167 )
2168 .unwrap();
2169 let producer = endpoint.create_producer(&ctx).unwrap();
2170
2171 let exchange = Exchange::new(Message::default());
2172 let result = producer.oneshot(exchange).await.unwrap();
2173
2174 let status = result
2175 .input
2176 .header("CamelHttpResponseCode")
2177 .and_then(|v| v.as_u64())
2178 .unwrap();
2179 assert_eq!(status, 500);
2180 }
2181
2182 #[tokio::test]
2183 async fn test_http_producer_uri_override() {
2184 use tower::ServiceExt;
2185
2186 let (url, _handle) = start_test_server().await;
2187 let ctx = test_producer_ctx();
2188
2189 let component = HttpComponent::new();
2190 let endpoint_ctx = NoOpComponentContext;
2191 let endpoint = component
2192 .create_endpoint(
2193 "http://localhost:1/does-not-exist?allowPrivateIps=true",
2194 &endpoint_ctx,
2195 )
2196 .unwrap();
2197 let producer = endpoint.create_producer(&ctx).unwrap();
2198
2199 let mut exchange = Exchange::new(Message::default());
2200 exchange.input.set_header(
2201 "CamelHttpUri",
2202 serde_json::Value::String(format!("{url}/api")),
2203 );
2204
2205 let result = producer.oneshot(exchange).await.unwrap();
2206 let status = result
2207 .input
2208 .header("CamelHttpResponseCode")
2209 .and_then(|v| v.as_u64())
2210 .unwrap();
2211 assert_eq!(status, 200);
2212 }
2213
2214 #[tokio::test]
2215 async fn test_http_producer_response_headers_mapped() {
2216 use tower::ServiceExt;
2217
2218 let (url, _handle) = start_test_server().await;
2219 let ctx = test_producer_ctx();
2220
2221 let component = HttpComponent::new();
2222 let endpoint_ctx = NoOpComponentContext;
2223 let endpoint = component
2224 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2225 .unwrap();
2226 let producer = endpoint.create_producer(&ctx).unwrap();
2227
2228 let exchange = Exchange::new(Message::default());
2229 let result = producer.oneshot(exchange).await.unwrap();
2230
2231 assert!(
2232 result.input.header("Content-Type").is_some(),
2233 "Response should have Content-Type header"
2234 );
2235 assert!(result.input.header("CamelHttpResponseText").is_some());
2236 }
2237
2238 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
2243 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2244 let addr = listener.local_addr().unwrap();
2245 let url = format!("http://127.0.0.1:{}", addr.port());
2246
2247 let handle = tokio::spawn(async move {
2248 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2249 loop {
2250 if let Ok((mut stream, _)) = listener.accept().await {
2251 tokio::spawn(async move {
2252 let mut buf = vec![0u8; 4096];
2253 let n = stream.read(&mut buf).await.unwrap_or(0);
2254 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2255
2256 if request.contains("GET /final") {
2258 let body = r#"{"status":"final"}"#;
2259 let response = format!(
2260 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
2261 body.len(),
2262 body
2263 );
2264 let _ = stream.write_all(response.as_bytes()).await;
2265 } else {
2266 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
2268 let _ = stream.write_all(response.as_bytes()).await;
2269 }
2270 });
2271 }
2272 }
2273 });
2274
2275 (url, handle)
2276 }
2277
2278 #[tokio::test]
2279 async fn test_follow_redirects_false_does_not_follow() {
2280 use tower::ServiceExt;
2281
2282 let (url, _handle) = start_redirect_server().await;
2283 let ctx = test_producer_ctx();
2284
2285 let component =
2286 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
2287 let endpoint_ctx = NoOpComponentContext;
2288 let endpoint = component
2289 .create_endpoint(
2290 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
2291 &endpoint_ctx,
2292 )
2293 .unwrap();
2294 let producer = endpoint.create_producer(&ctx).unwrap();
2295
2296 let exchange = Exchange::new(Message::default());
2297 let result = producer.oneshot(exchange).await.unwrap();
2298
2299 let status = result
2301 .input
2302 .header("CamelHttpResponseCode")
2303 .and_then(|v| v.as_u64())
2304 .unwrap();
2305 assert_eq!(
2306 status, 302,
2307 "Should NOT follow redirect when followRedirects=false"
2308 );
2309 }
2310
2311 #[tokio::test]
2312 async fn test_follow_redirects_true_follows_redirect() {
2313 use tower::ServiceExt;
2314
2315 let (url, _handle) = start_redirect_server().await;
2316 let ctx = test_producer_ctx();
2317
2318 let component =
2319 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
2320 let endpoint_ctx = NoOpComponentContext;
2321 let endpoint = component
2322 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
2323 .unwrap();
2324 let producer = endpoint.create_producer(&ctx).unwrap();
2325
2326 let exchange = Exchange::new(Message::default());
2327 let result = producer.oneshot(exchange).await.unwrap();
2328
2329 let status = result
2331 .input
2332 .header("CamelHttpResponseCode")
2333 .and_then(|v| v.as_u64())
2334 .unwrap();
2335 assert_eq!(
2336 status, 200,
2337 "Should follow redirect when followRedirects=true"
2338 );
2339 }
2340
2341 #[tokio::test]
2342 async fn test_query_params_forwarded_to_http_request() {
2343 use tower::ServiceExt;
2344
2345 let (url, _handle) = start_test_server().await;
2346 let ctx = test_producer_ctx();
2347
2348 let component = HttpComponent::new();
2349 let endpoint_ctx = NoOpComponentContext;
2350 let endpoint = component
2352 .create_endpoint(
2353 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
2354 &endpoint_ctx,
2355 )
2356 .unwrap();
2357 let producer = endpoint.create_producer(&ctx).unwrap();
2358
2359 let exchange = Exchange::new(Message::default());
2360 let result = producer.oneshot(exchange).await.unwrap();
2361
2362 let status = result
2365 .input
2366 .header("CamelHttpResponseCode")
2367 .and_then(|v| v.as_u64())
2368 .unwrap();
2369 assert_eq!(status, 200);
2370 }
2371
2372 #[tokio::test]
2373 async fn test_non_camel_query_params_are_forwarded() {
2374 let config = HttpEndpointConfig::from_uri(
2377 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
2378 )
2379 .unwrap();
2380
2381 assert!(
2383 config.query_params.contains_key("apiKey"),
2384 "apiKey should be preserved"
2385 );
2386 assert!(
2387 config.query_params.contains_key("token"),
2388 "token should be preserved"
2389 );
2390 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
2391 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
2392
2393 assert!(
2395 !config.query_params.contains_key("httpMethod"),
2396 "httpMethod should not be forwarded"
2397 );
2398 }
2399
2400 #[test]
2401 fn test_query_params_are_url_encoded_when_resolving_url() {
2402 let config =
2403 HttpEndpointConfig::from_uri("http://example.com/api?q=hello world&tag=a+b").unwrap();
2404 let exchange = Exchange::new(Message::default());
2405
2406 let url = HttpProducer::resolve_url(&exchange, &config);
2407
2408 assert!(url.contains("q=hello+world"), "url was: {url}");
2409 assert!(url.contains("tag=a%2Bb"), "url was: {url}");
2410 }
2411
2412 async fn start_slow_server(delay_ms: u64) -> (String, tokio::task::JoinHandle<()>) {
2417 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2418 let addr = listener.local_addr().unwrap();
2419 let url = format!("http://127.0.0.1:{}", addr.port());
2420
2421 let handle = tokio::spawn(async move {
2422 loop {
2423 if let Ok((mut stream, _)) = listener.accept().await {
2424 let delay = delay_ms;
2425 tokio::spawn(async move {
2426 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2427 let mut buf = vec![0u8; 4096];
2428 let _ = stream.read(&mut buf).await;
2429 let headers = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n\r\n";
2431 let _ = stream.write_all(headers.as_bytes()).await;
2432 tokio::time::sleep(Duration::from_millis(delay)).await;
2434 let body = r#"{"status":"slow"}"#;
2435 let chunk = format!("{:x}\r\n{}\r\n0\r\n\r\n", body.len(), body);
2436 let _ = stream.write_all(chunk.as_bytes()).await;
2437 });
2438 }
2439 }
2440 });
2441
2442 (url, handle)
2443 }
2444
2445 #[tokio::test]
2446 async fn test_http_producer_timeout() {
2447 use tower::ServiceExt;
2448
2449 let (url, _handle) = start_slow_server(500).await;
2451 let ctx = test_producer_ctx();
2452
2453 let component = HttpComponent::with_config(
2454 HttpConfig::default()
2455 .with_read_timeout_ms(100)
2456 .with_response_timeout_ms(30_000), );
2458 let endpoint_ctx = NoOpComponentContext;
2459 let endpoint = component
2460 .create_endpoint(&format!("{url}/slow?allowPrivateIps=true"), &endpoint_ctx)
2461 .unwrap();
2462 let producer = endpoint.create_producer(&ctx).unwrap();
2463
2464 let exchange = Exchange::new(Message::default());
2465 let result = producer.oneshot(exchange).await;
2466
2467 assert!(result.is_err(), "Expected timeout error, got: {:?}", result);
2468 let err = result.unwrap_err().to_string();
2469 assert!(
2470 err.contains("Read timeout") || err.contains("timeout"),
2471 "Error should mention timeout, got: {}",
2472 err
2473 );
2474 }
2475
2476 #[tokio::test]
2477 async fn test_http_producer_no_timeout_when_fast() {
2478 use tower::ServiceExt;
2479
2480 let (url, _handle) = start_test_server().await;
2481 let ctx = test_producer_ctx();
2482
2483 let component =
2484 HttpComponent::with_config(HttpConfig::default().with_read_timeout_ms(5_000));
2485 let endpoint_ctx = NoOpComponentContext;
2486 let endpoint = component
2487 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2488 .unwrap();
2489 let producer = endpoint.create_producer(&ctx).unwrap();
2490
2491 let exchange = Exchange::new(Message::default());
2492 let result = producer.oneshot(exchange).await.unwrap();
2493
2494 let status = result
2495 .input
2496 .header("CamelHttpResponseCode")
2497 .and_then(|v| v.as_u64())
2498 .unwrap();
2499 assert_eq!(status, 200);
2500 }
2501
2502 #[tokio::test]
2507 async fn test_http_producer_blocks_metadata_endpoint() {
2508 use tower::ServiceExt;
2509
2510 let ctx = test_producer_ctx();
2511 let component = HttpComponent::new();
2512 let endpoint_ctx = NoOpComponentContext;
2513 let endpoint = component
2514 .create_endpoint(
2515 "http://example.com/api?allowPrivateIps=false",
2516 &endpoint_ctx,
2517 )
2518 .unwrap();
2519 let producer = endpoint.create_producer(&ctx).unwrap();
2520
2521 let mut exchange = Exchange::new(Message::default());
2522 exchange.input.set_header(
2523 "CamelHttpUri",
2524 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
2525 );
2526
2527 let result = producer.oneshot(exchange).await;
2528 assert!(result.is_err(), "Should block AWS metadata endpoint");
2529
2530 let err = result.unwrap_err();
2531 assert!(
2532 err.to_string().contains("Private IP"),
2533 "Error should mention private IP blocking, got: {}",
2534 err
2535 );
2536 }
2537
2538 #[test]
2539 fn test_ssrf_config_defaults() {
2540 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2541 assert!(
2542 !config.allow_private_ips,
2543 "Private IPs should be blocked by default"
2544 );
2545 assert!(
2546 config.blocked_hosts.is_empty(),
2547 "Blocked hosts should be empty by default"
2548 );
2549 }
2550
2551 #[test]
2552 fn test_ssrf_config_allow_private_ips() {
2553 let config =
2554 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2555 assert!(
2556 config.allow_private_ips,
2557 "Private IPs should be allowed when explicitly set"
2558 );
2559 }
2560
2561 #[test]
2562 fn test_ssrf_config_blocked_hosts() {
2563 let config = HttpEndpointConfig::from_uri(
2564 "http://example.com/api?blockedHosts=evil.com,malware.net",
2565 )
2566 .unwrap();
2567 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2568 }
2569
2570 #[tokio::test]
2571 async fn test_http_producer_blocks_localhost() {
2572 use tower::ServiceExt;
2573
2574 let ctx = test_producer_ctx();
2575 let component = HttpComponent::new();
2576 let endpoint_ctx = NoOpComponentContext;
2577 let endpoint = component
2578 .create_endpoint("http://example.com/api", &endpoint_ctx)
2579 .unwrap();
2580 let producer = endpoint.create_producer(&ctx).unwrap();
2581
2582 let mut exchange = Exchange::new(Message::default());
2583 exchange.input.set_header(
2584 "CamelHttpUri",
2585 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2586 );
2587
2588 let result = producer.oneshot(exchange).await;
2589 assert!(result.is_err(), "Should block localhost");
2590 }
2591
2592 #[tokio::test]
2593 async fn test_http_producer_blocks_loopback_ip() {
2594 use tower::ServiceExt;
2595
2596 let ctx = test_producer_ctx();
2597 let component = HttpComponent::new();
2598 let endpoint_ctx = NoOpComponentContext;
2599 let endpoint = component
2600 .create_endpoint("http://example.com/api", &endpoint_ctx)
2601 .unwrap();
2602 let producer = endpoint.create_producer(&ctx).unwrap();
2603
2604 let mut exchange = Exchange::new(Message::default());
2605 exchange.input.set_header(
2606 "CamelHttpUri",
2607 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2608 );
2609
2610 let result = producer.oneshot(exchange).await;
2611 assert!(result.is_err(), "Should block loopback IP");
2612 }
2613
2614 #[tokio::test]
2615 async fn test_http_producer_allows_private_ip_when_enabled() {
2616 use tower::ServiceExt;
2617
2618 let ctx = test_producer_ctx();
2619 let component = HttpComponent::new();
2620 let endpoint_ctx = NoOpComponentContext;
2621 let endpoint = component
2624 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2625 .unwrap();
2626 let producer = endpoint.create_producer(&ctx).unwrap();
2627
2628 let exchange = Exchange::new(Message::default());
2629
2630 let result = producer.oneshot(exchange).await;
2633 if let Err(ref e) = result {
2635 let err_str = e.to_string();
2636 assert!(
2637 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2638 "Should not be SSRF error, got: {}",
2639 err_str
2640 );
2641 }
2642 }
2643
2644 #[test]
2649 fn test_http_server_config_parse() {
2650 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2651 assert_eq!(cfg.host, "0.0.0.0");
2652 assert_eq!(cfg.port, 8080);
2653 assert_eq!(cfg.path, "/orders");
2654 assert_eq!(cfg.max_inflight_requests, 1024);
2655 }
2656
2657 #[test]
2658 fn test_http_server_config_scheme() {
2659 assert_eq!(HttpServerConfig::scheme(), "http");
2661 }
2662
2663 #[test]
2664 fn test_http_server_config_from_components() {
2665 let components = camel_component_api::UriComponents {
2667 scheme: "https".to_string(),
2668 path: "//0.0.0.0:8443/api".to_string(),
2669 params: std::collections::HashMap::from([
2670 ("maxRequestBody".to_string(), "5242880".to_string()),
2671 ("maxInflightRequests".to_string(), "7".to_string()),
2672 ]),
2673 };
2674 let cfg = HttpServerConfig::from_components(components).unwrap();
2675 assert_eq!(cfg.host, "0.0.0.0");
2676 assert_eq!(cfg.port, 8443);
2677 assert_eq!(cfg.path, "/api");
2678 assert_eq!(cfg.max_request_body, 5242880);
2679 assert_eq!(cfg.max_inflight_requests, 7);
2680 }
2681
2682 #[test]
2683 fn test_http_server_config_default_path() {
2684 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2685 assert_eq!(cfg.path, "/");
2686 }
2687
2688 #[test]
2689 fn test_http_server_config_wrong_scheme() {
2690 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2691 }
2692
2693 #[test]
2694 fn test_http_server_config_invalid_port() {
2695 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2696 }
2697
2698 #[test]
2699 fn test_http_server_config_default_port_by_scheme() {
2700 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2702 assert_eq!(cfg_http.port, 80);
2703
2704 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2706 assert_eq!(cfg_https.port, 443);
2707 }
2708
2709 #[test]
2710 fn test_request_envelope_and_reply_are_send() {
2711 fn assert_send<T: Send>() {}
2712 assert_send::<RequestEnvelope>();
2713 assert_send::<HttpReply>();
2714 }
2715
2716 static REGISTRY_TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
2730
2731 #[test]
2732 fn test_server_registry_global_is_singleton() {
2733 let r1 = ServerRegistry::global();
2734 let r2 = ServerRegistry::global();
2735 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2736 }
2737
2738 #[tokio::test]
2739 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2740 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2741 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2742 let port = listener.local_addr().unwrap().port();
2743 drop(listener);
2744
2745 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2746 Arc::new(std::sync::Mutex::new(Vec::new()));
2747
2748 let mut handles = Vec::new();
2749 for _ in 0..4 {
2750 let results = results.clone();
2751 handles.push(tokio::spawn(async move {
2752 let dispatch = ServerRegistry::global()
2753 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2754 .await
2755 .unwrap();
2756 results.lock().unwrap().push(dispatch);
2757 }));
2758 }
2759
2760 for h in handles {
2761 h.await.unwrap();
2762 }
2763
2764 let dispatches = results.lock().unwrap();
2765 assert_eq!(dispatches.len(), 4);
2766 for i in 1..dispatches.len() {
2767 assert!(
2768 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2769 "all concurrent callers should get the same dispatch table"
2770 );
2771 }
2772 }
2773
2774 #[test]
2775 fn test_server_registry_distinguishes_host_and_port() {
2776 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2777 let rt = tokio::runtime::Runtime::new().expect("runtime");
2778 rt.block_on(async {
2779 let registry = ServerRegistry::global();
2780 let d1 = registry
2784 .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2785 .await;
2786 let d2 = registry
2787 .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2788 .await;
2789 assert!(d1.is_ok());
2790 assert!(d2.is_ok());
2791 assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2792 });
2793 }
2794
2795 #[tokio::test]
2796 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2797 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2798 let registry = ServerRegistry::global();
2799 let d1 = registry
2801 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2802 .await;
2803 assert!(d1.is_ok());
2804
2805 let d2 = registry
2808 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2809 .await;
2810 assert!(d2.is_err());
2811 let err = d2.unwrap_err();
2812 assert!(
2813 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2814 "Expected incompatible maxRequestBody error, got: {}",
2815 err
2816 );
2817 }
2818
2819 #[test]
2820 fn test_server_registry_reset_clears_entries() {
2821 let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
2822 let rt = tokio::runtime::Runtime::new().expect("runtime");
2823 rt.block_on(async {
2824 let d1 = ServerRegistry::global()
2826 .get_or_spawn("127.0.0.1", 9992, 1024 * 1024, 10 * 1024 * 1024, 1024)
2827 .await;
2828 assert!(d1.is_ok());
2829
2830 let guard = ServerRegistry::global().inner.lock().expect("lock");
2832 assert!(guard.contains_key(&("127.0.0.1".to_string(), 9992)));
2833 drop(guard);
2834
2835 ServerRegistry::reset();
2837
2838 let guard = ServerRegistry::global().inner.lock().expect("lock");
2840 assert!(
2841 guard.is_empty(),
2842 "registry should be empty after reset, has {} entries",
2843 guard.len()
2844 );
2845 });
2846 }
2847
2848 #[tokio::test]
2853 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2854 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2855 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2857 let port = listener.local_addr().unwrap().port();
2858 tokio::spawn(run_axum_server(
2859 listener,
2860 dispatch,
2861 2 * 1024 * 1024,
2862 10 * 1024 * 1024,
2863 Arc::new(tokio::sync::Semaphore::new(1024)),
2864 ));
2865
2866 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2868
2869 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2870 .await
2871 .unwrap();
2872 assert_eq!(resp.status().as_u16(), 404);
2873 }
2874
2875 #[tokio::test]
2880 async fn test_http_consumer_start_registers_path() {
2881 use camel_component_api::ConsumerContext;
2882
2883 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2885 let port = listener.local_addr().unwrap().port();
2886 drop(listener); let consumer_cfg = HttpServerConfig {
2889 host: "127.0.0.1".to_string(),
2890 port,
2891 path: "/ping".to_string(),
2892 max_request_body: 2 * 1024 * 1024,
2893 max_response_body: 10 * 1024 * 1024,
2894 max_inflight_requests: 1024,
2895 };
2896 let mut consumer = HttpConsumer::new(consumer_cfg);
2897
2898 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2899 let token = tokio_util::sync::CancellationToken::new();
2900 let ctx = ConsumerContext::new(tx, token.clone());
2901
2902 tokio::spawn(async move {
2903 consumer.start(ctx).await.unwrap();
2904 });
2905
2906 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2907
2908 let client = reqwest::Client::new();
2909 let resp_future = client
2910 .post(format!("http://127.0.0.1:{port}/ping"))
2911 .body("hello world")
2912 .send();
2913
2914 let (http_result, _) = tokio::join!(resp_future, async {
2915 if let Some(mut envelope) = rx.recv().await {
2916 envelope.exchange.input.set_header(
2918 "CamelHttpResponseCode",
2919 serde_json::Value::Number(201.into()),
2920 );
2921 if let Some(reply_tx) = envelope.reply_tx {
2922 let _ = reply_tx.send(Ok(envelope.exchange));
2923 }
2924 }
2925 });
2926
2927 let resp = http_result.unwrap();
2928 assert_eq!(resp.status().as_u16(), 201);
2929
2930 token.cancel();
2931 }
2932
2933 #[tokio::test]
2934 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2935 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2936
2937 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2938 let port = listener.local_addr().unwrap().port();
2939 drop(listener);
2940
2941 let consumer_cfg = HttpServerConfig {
2942 host: "127.0.0.1".to_string(),
2943 port,
2944 path: "/saturation".to_string(),
2945 max_request_body: 2 * 1024 * 1024,
2946 max_response_body: 10 * 1024 * 1024,
2947 max_inflight_requests: 1,
2948 };
2949 let mut consumer = HttpConsumer::new(consumer_cfg);
2950
2951 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2952 let token = tokio_util::sync::CancellationToken::new();
2953 let ctx = ConsumerContext::new(tx, token.clone());
2954 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2955 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2956
2957 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2958 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2959
2960 tokio::spawn(async move {
2961 let mut first_seen_tx = Some(first_seen_tx);
2962 let mut unblock_first_rx = Some(unblock_first_rx);
2963
2964 while let Some(envelope) = rx.recv().await {
2965 if let Some(tx) = first_seen_tx.take() {
2966 let _ = tx.send(());
2967 if let Some(rx_unblock) = unblock_first_rx.take() {
2968 let _ = rx_unblock.await;
2969 }
2970 }
2971
2972 if let Some(reply_tx) = envelope.reply_tx {
2973 let _ = reply_tx.send(Ok(envelope.exchange));
2974 }
2975 }
2976 });
2977
2978 let client = reqwest::Client::new();
2979 let first_req = {
2980 let client = client.clone();
2981 async move {
2982 client
2983 .get(format!("http://127.0.0.1:{port}/saturation"))
2984 .send()
2985 .await
2986 .unwrap()
2987 }
2988 };
2989
2990 let first_handle = tokio::spawn(first_req);
2991 first_seen_rx.await.unwrap();
2992
2993 let second_resp = client
2994 .get(format!("http://127.0.0.1:{port}/saturation"))
2995 .send()
2996 .await
2997 .unwrap();
2998
2999 assert_eq!(second_resp.status().as_u16(), 503);
3000
3001 let _ = unblock_first_tx.send(());
3002 let first_resp = first_handle.await.unwrap();
3003 assert_eq!(first_resp.status().as_u16(), 200);
3004
3005 token.cancel();
3006 }
3007
3008 #[tokio::test]
3009 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
3010 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3011
3012 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3013 let port = listener.local_addr().unwrap().port();
3014 drop(listener);
3015
3016 let consumer_cfg = HttpServerConfig {
3017 host: "127.0.0.1".to_string(),
3018 port,
3019 path: "/limit-bytes".to_string(),
3020 max_request_body: 2 * 1024 * 1024,
3021 max_response_body: 16,
3022 max_inflight_requests: 1024,
3023 };
3024 let mut consumer = HttpConsumer::new(consumer_cfg);
3025
3026 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3027 let token = tokio_util::sync::CancellationToken::new();
3028 let ctx = ConsumerContext::new(tx, token.clone());
3029 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3030 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3031
3032 let client = reqwest::Client::new();
3033 let send_fut = client
3034 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
3035 .send();
3036
3037 let (http_result, _) = tokio::join!(send_fut, async {
3038 if let Some(mut envelope) = rx.recv().await {
3039 envelope.exchange.input.body =
3040 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
3041 if let Some(reply_tx) = envelope.reply_tx {
3042 let _ = reply_tx.send(Ok(envelope.exchange));
3043 }
3044 }
3045 });
3046
3047 let resp = http_result.unwrap();
3048 assert_eq!(resp.status().as_u16(), 500);
3049 let body = resp.text().await.unwrap();
3050 assert_eq!(body, "Response body exceeds configured limit");
3051 token.cancel();
3052 }
3053
3054 #[tokio::test]
3055 async fn test_http_consumer_enforces_max_response_body_for_json() {
3056 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3057
3058 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3059 let port = listener.local_addr().unwrap().port();
3060 drop(listener);
3061
3062 let consumer_cfg = HttpServerConfig {
3063 host: "127.0.0.1".to_string(),
3064 port,
3065 path: "/limit-json".to_string(),
3066 max_request_body: 2 * 1024 * 1024,
3067 max_response_body: 16,
3068 max_inflight_requests: 1024,
3069 };
3070 let mut consumer = HttpConsumer::new(consumer_cfg);
3071
3072 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3073 let token = tokio_util::sync::CancellationToken::new();
3074 let ctx = ConsumerContext::new(tx, token.clone());
3075 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3076 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3077
3078 let client = reqwest::Client::new();
3079 let send_fut = client
3080 .get(format!("http://127.0.0.1:{port}/limit-json"))
3081 .send();
3082
3083 let (http_result, _) = tokio::join!(send_fut, async {
3084 if let Some(mut envelope) = rx.recv().await {
3085 envelope.exchange.input.body = camel_component_api::Body::Json(
3086 serde_json::json!({"message":"this response is bigger than sixteen"}),
3087 );
3088 if let Some(reply_tx) = envelope.reply_tx {
3089 let _ = reply_tx.send(Ok(envelope.exchange));
3090 }
3091 }
3092 });
3093
3094 let resp = http_result.unwrap();
3095 assert_eq!(resp.status().as_u16(), 500);
3096 let body = resp.text().await.unwrap();
3097 assert_eq!(body, "Response body exceeds configured limit");
3098 token.cancel();
3099 }
3100
3101 #[tokio::test]
3102 async fn test_http_consumer_enforces_max_response_body_for_xml() {
3103 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3104
3105 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3106 let port = listener.local_addr().unwrap().port();
3107 drop(listener);
3108
3109 let consumer_cfg = HttpServerConfig {
3110 host: "127.0.0.1".to_string(),
3111 port,
3112 path: "/limit-xml".to_string(),
3113 max_request_body: 2 * 1024 * 1024,
3114 max_response_body: 16,
3115 max_inflight_requests: 1024,
3116 };
3117 let mut consumer = HttpConsumer::new(consumer_cfg);
3118
3119 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3120 let token = tokio_util::sync::CancellationToken::new();
3121 let ctx = ConsumerContext::new(tx, token.clone());
3122 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3123 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3124
3125 let client = reqwest::Client::new();
3126 let send_fut = client
3127 .get(format!("http://127.0.0.1:{port}/limit-xml"))
3128 .send();
3129
3130 let (http_result, _) = tokio::join!(send_fut, async {
3131 if let Some(mut envelope) = rx.recv().await {
3132 envelope.exchange.input.body = camel_component_api::Body::Xml(
3133 "<root><value>way-too-large</value></root>".into(),
3134 );
3135 if let Some(reply_tx) = envelope.reply_tx {
3136 let _ = reply_tx.send(Ok(envelope.exchange));
3137 }
3138 }
3139 });
3140
3141 let resp = http_result.unwrap();
3142 assert_eq!(resp.status().as_u16(), 500);
3143 let body = resp.text().await.unwrap();
3144 assert_eq!(body, "Response body exceeds configured limit");
3145 token.cancel();
3146 }
3147
3148 #[tokio::test]
3149 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
3150 use camel_component_api::{
3151 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
3152 };
3153 use futures::stream;
3154
3155 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
3156 let port = listener.local_addr().unwrap().port();
3157 drop(listener);
3158
3159 let consumer_cfg = HttpServerConfig {
3160 host: "0.0.0.0".to_string(),
3161 port,
3162 path: "/limit-stream".to_string(),
3163 max_request_body: 2 * 1024 * 1024,
3164 max_response_body: 16,
3165 max_inflight_requests: 1024,
3166 };
3167 let mut consumer = HttpConsumer::new(consumer_cfg);
3168
3169 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3170 let token = tokio_util::sync::CancellationToken::new();
3171 let ctx = ConsumerContext::new(tx, token.clone());
3172 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3173 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3174
3175 let client = reqwest::Client::new();
3176 let send_fut = client
3177 .get(format!("http://127.0.0.1:{port}/limit-stream"))
3178 .send();
3179
3180 let (http_result, _) = tokio::join!(send_fut, async {
3181 if let Some(mut envelope) = rx.recv().await {
3182 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
3183 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
3184 let stream = Box::pin(stream::iter(chunks));
3185 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
3186 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
3187 metadata: StreamMetadata {
3188 size_hint: Some(32),
3189 content_type: Some("application/octet-stream".into()),
3190 origin: None,
3191 },
3192 });
3193 if let Some(reply_tx) = envelope.reply_tx {
3194 let _ = reply_tx.send(Ok(envelope.exchange));
3195 }
3196 }
3197 });
3198
3199 let resp = http_result.unwrap();
3200 assert_eq!(resp.status().as_u16(), 200);
3201 let body = resp.bytes().await.unwrap();
3202 assert_eq!(body.len(), 32);
3203 token.cancel();
3204 }
3205
3206 #[tokio::test]
3211 async fn test_integration_single_consumer_round_trip() {
3212 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3213
3214 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3216 let port = listener.local_addr().unwrap().port();
3217 drop(listener); let component = HttpComponent::new();
3220 let endpoint_ctx = NoOpComponentContext;
3221 let endpoint = component
3222 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
3223 .unwrap();
3224 let mut consumer = endpoint.create_consumer().unwrap();
3225
3226 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3227 let token = tokio_util::sync::CancellationToken::new();
3228 let ctx = ConsumerContext::new(tx, token.clone());
3229
3230 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3231 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3232
3233 let client = reqwest::Client::new();
3234 let send_fut = client
3235 .post(format!("http://127.0.0.1:{port}/echo"))
3236 .header("Content-Type", "text/plain")
3237 .body("ping")
3238 .send();
3239
3240 let (http_result, _) = tokio::join!(send_fut, async {
3241 if let Some(mut envelope) = rx.recv().await {
3242 assert_eq!(
3243 envelope.exchange.input.header("CamelHttpMethod"),
3244 Some(&serde_json::Value::String("POST".into()))
3245 );
3246 assert_eq!(
3247 envelope.exchange.input.header("CamelHttpPath"),
3248 Some(&serde_json::Value::String("/echo".into()))
3249 );
3250 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
3251 if let Some(reply_tx) = envelope.reply_tx {
3252 let _ = reply_tx.send(Ok(envelope.exchange));
3253 }
3254 }
3255 });
3256
3257 let resp = http_result.unwrap();
3258 assert_eq!(resp.status().as_u16(), 200);
3259 let body = resp.text().await.unwrap();
3260 assert_eq!(body, "pong");
3261
3262 token.cancel();
3263 }
3264
3265 #[tokio::test]
3266 async fn test_integration_two_consumers_shared_port() {
3267 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3268
3269 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3271 let port = listener.local_addr().unwrap().port();
3272 drop(listener);
3273
3274 let component = HttpComponent::new();
3275 let endpoint_ctx = NoOpComponentContext;
3276
3277 let endpoint_a = component
3279 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
3280 .unwrap();
3281 let mut consumer_a = endpoint_a.create_consumer().unwrap();
3282
3283 let endpoint_b = component
3285 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
3286 .unwrap();
3287 let mut consumer_b = endpoint_b.create_consumer().unwrap();
3288
3289 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3290 let token_a = tokio_util::sync::CancellationToken::new();
3291 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
3292
3293 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3294 let token_b = tokio_util::sync::CancellationToken::new();
3295 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
3296
3297 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
3298 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
3299 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3300
3301 let client = reqwest::Client::new();
3302
3303 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
3305 let (resp_hello, _) = tokio::join!(fut_hello, async {
3306 if let Some(mut envelope) = rx_a.recv().await {
3307 envelope.exchange.input.body =
3308 camel_component_api::Body::Text("hello-response".to_string());
3309 if let Some(reply_tx) = envelope.reply_tx {
3310 let _ = reply_tx.send(Ok(envelope.exchange));
3311 }
3312 }
3313 });
3314
3315 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
3317 let (resp_world, _) = tokio::join!(fut_world, async {
3318 if let Some(mut envelope) = rx_b.recv().await {
3319 envelope.exchange.input.body =
3320 camel_component_api::Body::Text("world-response".to_string());
3321 if let Some(reply_tx) = envelope.reply_tx {
3322 let _ = reply_tx.send(Ok(envelope.exchange));
3323 }
3324 }
3325 });
3326
3327 let body_a = resp_hello.unwrap().text().await.unwrap();
3328 let body_b = resp_world.unwrap().text().await.unwrap();
3329
3330 assert_eq!(body_a, "hello-response");
3331 assert_eq!(body_b, "world-response");
3332
3333 token_a.cancel();
3334 token_b.cancel();
3335 }
3336
3337 #[tokio::test]
3338 async fn test_integration_unregistered_path_returns_404() {
3339 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3340
3341 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3343 let port = listener.local_addr().unwrap().port();
3344 drop(listener);
3345
3346 let component = HttpComponent::new();
3347 let endpoint_ctx = NoOpComponentContext;
3348 let endpoint = component
3349 .create_endpoint(
3350 &format!("http://127.0.0.1:{port}/registered"),
3351 &endpoint_ctx,
3352 )
3353 .unwrap();
3354 let mut consumer = endpoint.create_consumer().unwrap();
3355
3356 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3357 let token = tokio_util::sync::CancellationToken::new();
3358 let ctx = ConsumerContext::new(tx, token.clone());
3359
3360 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3361
3362 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
3364 loop {
3365 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
3366 .await
3367 .is_ok()
3368 {
3369 break;
3370 }
3371 if std::time::Instant::now() >= deadline {
3372 panic!("HTTP server did not start within 5s on port {port}");
3373 }
3374 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3375 }
3376
3377 let client = reqwest::Client::new();
3378 let resp = client
3379 .get(format!("http://127.0.0.1:{port}/not-there"))
3380 .send()
3381 .await
3382 .unwrap();
3383 assert_eq!(resp.status().as_u16(), 404);
3384
3385 token.cancel();
3386 }
3387
3388 #[test]
3389 fn test_http_consumer_declares_concurrent() {
3390 use camel_component_api::ConcurrencyModel;
3391
3392 let config = HttpServerConfig {
3393 host: "127.0.0.1".to_string(),
3394 port: 19999,
3395 path: "/test".to_string(),
3396 max_request_body: 2 * 1024 * 1024,
3397 max_response_body: 10 * 1024 * 1024,
3398 max_inflight_requests: 1024,
3399 };
3400 let consumer = HttpConsumer::new(config);
3401 assert_eq!(
3402 consumer.concurrency_model(),
3403 ConcurrencyModel::Concurrent { max: None }
3404 );
3405 }
3406
3407 #[tokio::test]
3412 async fn test_http_reply_body_stream_variant_exists() {
3413 use bytes::Bytes;
3414 use camel_component_api::CamelError;
3415 use futures::stream;
3416
3417 let chunks: Vec<Result<Bytes, CamelError>> =
3418 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
3419 let stream = Box::pin(stream::iter(chunks));
3420 let reply_body = HttpReplyBody::Stream(stream);
3421 match reply_body {
3423 HttpReplyBody::Stream(_) => {}
3424 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
3425 }
3426 }
3427
3428 #[cfg(feature = "otel")]
3433 mod otel_tests {
3434 use super::*;
3435 use camel_component_api::Message;
3436 use tower::ServiceExt;
3437
3438 #[tokio::test]
3439 async fn test_producer_injects_traceparent_header() {
3440 let (url, _handle) = start_test_server_with_header_capture().await;
3441 let ctx = test_producer_ctx();
3442
3443 let component = HttpComponent::new();
3444 let endpoint_ctx = NoOpComponentContext;
3445 let endpoint = component
3446 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3447 .unwrap();
3448 let producer = endpoint.create_producer(&ctx).unwrap();
3449
3450 let mut exchange = Exchange::new(Message::default());
3452 let mut headers = std::collections::HashMap::new();
3453 headers.insert(
3454 "traceparent".to_string(),
3455 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
3456 );
3457 camel_otel::extract_into_exchange(&mut exchange, &headers);
3458
3459 let result = producer.oneshot(exchange).await.unwrap();
3460
3461 let status = result
3463 .input
3464 .header("CamelHttpResponseCode")
3465 .and_then(|v| v.as_u64())
3466 .unwrap();
3467 assert_eq!(status, 200);
3468
3469 let traceparent = result.input.header("X-Received-Traceparent");
3471 assert!(
3472 traceparent.is_some(),
3473 "traceparent header should have been sent"
3474 );
3475
3476 let traceparent_str = traceparent.unwrap().as_str().unwrap();
3477 let parts: Vec<&str> = traceparent_str.split('-').collect();
3479 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3480 assert_eq!(parts[0], "00", "version should be 00");
3481 assert_eq!(
3482 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3483 "trace-id should match"
3484 );
3485 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
3486 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
3487 }
3488
3489 #[tokio::test]
3490 async fn test_consumer_extracts_traceparent_header() {
3491 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3492
3493 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3495 let port = listener.local_addr().unwrap().port();
3496 drop(listener);
3497
3498 let component = HttpComponent::new();
3499 let endpoint_ctx = NoOpComponentContext;
3500 let endpoint = component
3501 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3502 .unwrap();
3503 let mut consumer = endpoint.create_consumer().unwrap();
3504
3505 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3506 let token = tokio_util::sync::CancellationToken::new();
3507 let ctx = ConsumerContext::new(tx, token.clone());
3508
3509 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3510 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3511
3512 let client = reqwest::Client::new();
3514 let send_fut = client
3515 .post(format!("http://127.0.0.1:{port}/trace"))
3516 .header(
3517 "traceparent",
3518 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3519 )
3520 .body("test")
3521 .send();
3522
3523 let (http_result, _) = tokio::join!(send_fut, async {
3524 if let Some(envelope) = rx.recv().await {
3525 let mut injected_headers = std::collections::HashMap::new();
3528 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3529
3530 assert!(
3531 injected_headers.contains_key("traceparent"),
3532 "Exchange should have traceparent after extraction"
3533 );
3534
3535 let traceparent = injected_headers.get("traceparent").unwrap();
3536 let parts: Vec<&str> = traceparent.split('-').collect();
3537 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3538 assert_eq!(
3539 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3540 "Trace ID should match the original traceparent header"
3541 );
3542
3543 if let Some(reply_tx) = envelope.reply_tx {
3544 let _ = reply_tx.send(Ok(envelope.exchange));
3545 }
3546 }
3547 });
3548
3549 let resp = http_result.unwrap();
3550 assert_eq!(resp.status().as_u16(), 200);
3551
3552 token.cancel();
3553 }
3554
3555 #[tokio::test]
3556 async fn test_consumer_extracts_mixed_case_traceparent_header() {
3557 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3558
3559 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3561 let port = listener.local_addr().unwrap().port();
3562 drop(listener);
3563
3564 let component = HttpComponent::new();
3565 let endpoint_ctx = NoOpComponentContext;
3566 let endpoint = component
3567 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
3568 .unwrap();
3569 let mut consumer = endpoint.create_consumer().unwrap();
3570
3571 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3572 let token = tokio_util::sync::CancellationToken::new();
3573 let ctx = ConsumerContext::new(tx, token.clone());
3574
3575 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3576 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3577
3578 let client = reqwest::Client::new();
3580 let send_fut = client
3581 .post(format!("http://127.0.0.1:{port}/trace"))
3582 .header(
3583 "TraceParent",
3584 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3585 )
3586 .body("test")
3587 .send();
3588
3589 let (http_result, _) = tokio::join!(send_fut, async {
3590 if let Some(envelope) = rx.recv().await {
3591 let mut injected_headers = HashMap::new();
3594 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3595
3596 assert!(
3597 injected_headers.contains_key("traceparent"),
3598 "Exchange should have traceparent after extraction from mixed-case header"
3599 );
3600
3601 let traceparent = injected_headers.get("traceparent").unwrap();
3602 let parts: Vec<&str> = traceparent.split('-').collect();
3603 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3604 assert_eq!(
3605 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3606 "Trace ID should match the original mixed-case TraceParent header"
3607 );
3608
3609 if let Some(reply_tx) = envelope.reply_tx {
3610 let _ = reply_tx.send(Ok(envelope.exchange));
3611 }
3612 }
3613 });
3614
3615 let resp = http_result.unwrap();
3616 assert_eq!(resp.status().as_u16(), 200);
3617
3618 token.cancel();
3619 }
3620
3621 #[tokio::test]
3622 async fn test_producer_no_trace_context_no_crash() {
3623 let (url, _handle) = start_test_server().await;
3624 let ctx = test_producer_ctx();
3625
3626 let component = HttpComponent::new();
3627 let endpoint_ctx = NoOpComponentContext;
3628 let endpoint = component
3629 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3630 .unwrap();
3631 let producer = endpoint.create_producer(&ctx).unwrap();
3632
3633 let exchange = Exchange::new(Message::default());
3635
3636 let result = producer.oneshot(exchange).await.unwrap();
3638
3639 let status = result
3641 .input
3642 .header("CamelHttpResponseCode")
3643 .and_then(|v| v.as_u64())
3644 .unwrap();
3645 assert_eq!(status, 200);
3646 }
3647
3648 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3650 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3651 let addr = listener.local_addr().unwrap();
3652 let url = format!("http://127.0.0.1:{}", addr.port());
3653
3654 let handle = tokio::spawn(async move {
3655 loop {
3656 if let Ok((mut stream, _)) = listener.accept().await {
3657 tokio::spawn(async move {
3658 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3659 let mut buf = vec![0u8; 8192];
3660 let n = stream.read(&mut buf).await.unwrap_or(0);
3661 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3662
3663 let traceparent = request
3665 .lines()
3666 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3667 .map(|line| {
3668 line.split(':')
3669 .nth(1)
3670 .map(|s| s.trim().to_string())
3671 .unwrap_or_default()
3672 })
3673 .unwrap_or_default();
3674
3675 let body =
3676 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3677 let response = format!(
3678 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3679 body.len(),
3680 traceparent,
3681 body
3682 );
3683 let _ = stream.write_all(response.as_bytes()).await;
3684 });
3685 }
3686 }
3687 });
3688
3689 (url, handle)
3690 }
3691 }
3692
3693 #[tokio::test]
3702 async fn test_request_body_arrives_as_stream() {
3703 use camel_component_api::Body;
3704 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3705
3706 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3707 let port = listener.local_addr().unwrap().port();
3708 drop(listener);
3709
3710 let component = HttpComponent::new();
3711 let endpoint_ctx = NoOpComponentContext;
3712 let endpoint = component
3713 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3714 .unwrap();
3715 let mut consumer = endpoint.create_consumer().unwrap();
3716
3717 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3718 let token = tokio_util::sync::CancellationToken::new();
3719 let ctx = ConsumerContext::new(tx, token.clone());
3720
3721 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3722 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3723
3724 let client = reqwest::Client::new();
3725 let send_fut = client
3726 .post(format!("http://127.0.0.1:{port}/upload"))
3727 .body("hello streaming world")
3728 .send();
3729
3730 let (http_result, _) = tokio::join!(send_fut, async {
3731 if let Some(mut envelope) = rx.recv().await {
3732 assert!(
3734 matches!(envelope.exchange.input.body, Body::Stream(_)),
3735 "expected Body::Stream, got discriminant {:?}",
3736 std::mem::discriminant(&envelope.exchange.input.body)
3737 );
3738 let bytes = envelope
3740 .exchange
3741 .input
3742 .body
3743 .into_bytes(1024 * 1024)
3744 .await
3745 .unwrap();
3746 assert_eq!(&bytes[..], b"hello streaming world");
3747
3748 envelope.exchange.input.body = camel_component_api::Body::Empty;
3749 if let Some(reply_tx) = envelope.reply_tx {
3750 let _ = reply_tx.send(Ok(envelope.exchange));
3751 }
3752 }
3753 });
3754
3755 let resp = http_result.unwrap();
3756 assert_eq!(resp.status().as_u16(), 200);
3757
3758 token.cancel();
3759 }
3760
3761 #[tokio::test]
3766 async fn test_streaming_response_chunked() {
3767 use bytes::Bytes;
3768 use camel_component_api::Body;
3769 use camel_component_api::CamelError;
3770 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3771 use camel_component_api::{StreamBody, StreamMetadata};
3772 use futures::stream;
3773 use std::sync::Arc;
3774 use tokio::sync::Mutex;
3775
3776 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3777 let port = listener.local_addr().unwrap().port();
3778 drop(listener);
3779
3780 let component = HttpComponent::new();
3781 let endpoint_ctx = NoOpComponentContext;
3782 let endpoint = component
3783 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3784 .unwrap();
3785 let mut consumer = endpoint.create_consumer().unwrap();
3786
3787 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3788 let token = tokio_util::sync::CancellationToken::new();
3789 let ctx = ConsumerContext::new(tx, token.clone());
3790
3791 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3792 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3793
3794 let client = reqwest::Client::new();
3795 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3796
3797 let (http_result, _) = tokio::join!(send_fut, async {
3798 if let Some(mut envelope) = rx.recv().await {
3799 let chunks: Vec<Result<Bytes, CamelError>> =
3801 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3802 let stream = Box::pin(stream::iter(chunks));
3803 envelope.exchange.input.body = Body::Stream(StreamBody {
3804 stream: Arc::new(Mutex::new(Some(stream))),
3805 metadata: StreamMetadata::default(),
3806 });
3807 if let Some(reply_tx) = envelope.reply_tx {
3808 let _ = reply_tx.send(Ok(envelope.exchange));
3809 }
3810 }
3811 });
3812
3813 let resp = http_result.unwrap();
3814 assert_eq!(resp.status().as_u16(), 200);
3815 let body = resp.text().await.unwrap();
3816 assert_eq!(body, "chunk1chunk2");
3817
3818 token.cancel();
3819 }
3820
3821 #[tokio::test]
3826 async fn test_413_when_content_length_exceeds_limit() {
3827 use camel_component_api::ConsumerContext;
3828
3829 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3830 let port = listener.local_addr().unwrap().port();
3831 drop(listener);
3832
3833 let component = HttpComponent::new();
3835 let endpoint_ctx = NoOpComponentContext;
3836 let endpoint = component
3837 .create_endpoint(
3838 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3839 &endpoint_ctx,
3840 )
3841 .unwrap();
3842 let mut consumer = endpoint.create_consumer().unwrap();
3843
3844 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3845 let token = tokio_util::sync::CancellationToken::new();
3846 let ctx = ConsumerContext::new(tx, token.clone());
3847
3848 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3849 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3850
3851 let client = reqwest::Client::new();
3852 let resp = client
3853 .post(format!("http://127.0.0.1:{port}/upload"))
3854 .header("Content-Length", "1000") .body("x".repeat(1000))
3856 .send()
3857 .await
3858 .unwrap();
3859
3860 assert_eq!(resp.status().as_u16(), 413);
3861
3862 token.cancel();
3863 }
3864
3865 #[tokio::test]
3869 async fn test_chunked_upload_without_content_length_bypasses_limit() {
3870 use bytes::Bytes;
3871 use camel_component_api::Body;
3872 use camel_component_api::ConsumerContext;
3873 use futures::stream;
3874
3875 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3876 let port = listener.local_addr().unwrap().port();
3877 drop(listener);
3878
3879 let component = HttpComponent::new();
3881 let endpoint_ctx = NoOpComponentContext;
3882 let endpoint = component
3883 .create_endpoint(
3884 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3885 &endpoint_ctx,
3886 )
3887 .unwrap();
3888 let mut consumer = endpoint.create_consumer().unwrap();
3889
3890 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3891 let token = tokio_util::sync::CancellationToken::new();
3892 let ctx = ConsumerContext::new(tx, token.clone());
3893
3894 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3895 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3896
3897 let client = reqwest::Client::new();
3898
3899 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3903 Ok(Bytes::from("y".repeat(50))),
3904 Ok(Bytes::from("y".repeat(50))),
3905 ];
3906 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3907 let send_fut = client
3908 .post(format!("http://127.0.0.1:{port}/upload"))
3909 .body(stream_body)
3910 .send();
3911
3912 let consumer_fut = async {
3913 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3915 Ok(Some(mut envelope)) => {
3916 assert!(
3917 matches!(envelope.exchange.input.body, Body::Stream(_)),
3918 "expected Body::Stream"
3919 );
3920 envelope.exchange.input.body = camel_component_api::Body::Empty;
3921 if let Some(reply_tx) = envelope.reply_tx {
3922 let _ = reply_tx.send(Ok(envelope.exchange));
3923 }
3924 }
3925 Ok(None) => panic!("consumer channel closed unexpectedly"),
3926 Err(_) => {
3927 }
3930 }
3931 };
3932
3933 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3934
3935 let resp = http_result.unwrap();
3936 assert_ne!(
3938 resp.status().as_u16(),
3939 413,
3940 "chunked upload must not be rejected by maxRequestBody"
3941 );
3942 assert_eq!(resp.status().as_u16(), 200);
3943
3944 token.cancel();
3945 }
3946
3947 #[test]
3948 fn test_validate_url_for_ssrf_blocks_and_allows_hosts() {
3949 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap();
3950 cfg.blocked_hosts = vec!["blocked.local".to_string()];
3951 cfg.allow_private_ips = false;
3952
3953 let blocked = validate_url_for_ssrf("http://blocked.local/api", &cfg);
3954 assert!(blocked.is_err());
3955
3956 let private_ip = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
3957 assert!(private_ip.is_err());
3958
3959 cfg.allow_private_ips = true;
3960 let allowed = validate_url_for_ssrf("http://127.0.0.1/api", &cfg);
3961 assert!(allowed.is_ok());
3962 }
3963
3964 #[test]
3965 fn test_is_private_ip_ranges() {
3966 assert!(is_private_ip(&"10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"172.16.1.10".parse().unwrap())); assert!(is_private_ip(&"192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"127.0.0.1".parse().unwrap())); assert!(is_private_ip(&"169.254.1.1".parse().unwrap())); assert!(is_private_ip(&"0.1.2.3".parse().unwrap())); assert!(is_private_ip(&"::1".parse().unwrap())); assert!(is_private_ip(&"fc00::1".parse().unwrap())); assert!(is_private_ip(&"fd12::1".parse().unwrap())); assert!(is_private_ip(&"fe80::1".parse().unwrap())); assert!(is_private_ip(&"::ffff:10.0.0.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:192.168.1.1".parse().unwrap())); assert!(is_private_ip(&"::ffff:127.0.0.1".parse().unwrap())); assert!(!is_private_ip(&"8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"::ffff:8.8.8.8".parse().unwrap())); assert!(!is_private_ip(&"2001:4860:4860::8888".parse().unwrap())); }
3986
3987 #[tokio::test]
3988 async fn test_validate_resolved_host_for_ssrf_blocks_resolved_private_ip() {
3989 let mut cfg = HttpEndpointConfig::from_uri("http://example.com").unwrap(); cfg.allow_private_ips = false;
3991
3992 let err = validate_resolved_host_for_ssrf("http://localhost", &cfg)
3993 .await
3994 .expect_err("localhost must resolve to loopback and be blocked");
3995
3996 let msg = err.to_string();
3997 assert!(msg.contains("Target resolved to private IP"));
3998 }
3999
4000 #[test]
4001 fn test_title_case_header() {
4002 assert_eq!(title_case_header("content-type"), "Content-Type");
4003 assert_eq!(title_case_header("authorization"), "Authorization");
4004 assert_eq!(title_case_header("x-custom-header"), "X-Custom-Header");
4005 assert_eq!(title_case_header("host"), "Host");
4006 assert_eq!(title_case_header("x-b3-traceid"), "X-B3-Traceid");
4007 assert_eq!(title_case_header("single"), "Single");
4008 assert_eq!(title_case_header(""), "");
4009 }
4010
4011 #[test]
4012 fn test_resolve_url_combines_path_and_query_sources() {
4013 let cfg = HttpEndpointConfig::from_uri("http://example.com/base?foo=bar").unwrap();
4014 let mut exchange = Exchange::new(Message::default());
4015 exchange.input.set_header(
4016 "CamelHttpPath",
4017 serde_json::Value::String("next".to_string()),
4018 );
4019 let url = HttpProducer::resolve_url(&exchange, &cfg);
4020 assert!(url.starts_with("http://example.com/base/next?"));
4021 assert!(url.contains("foo=bar"));
4022
4023 exchange.input.set_header(
4024 "CamelHttpUri",
4025 serde_json::Value::String("http://other.test/root".to_string()),
4026 );
4027 exchange.input.set_header(
4028 "CamelHttpQuery",
4029 serde_json::Value::String("a=1&b=2".to_string()),
4030 );
4031
4032 let override_url = HttpProducer::resolve_url(&exchange, &cfg);
4033 assert_eq!(override_url, "http://other.test/root/next?a=1&b=2");
4034 }
4035
4036 #[test]
4037 fn test_http_producer_helpers_status_and_size_boundaries() {
4038 assert!(HttpProducer::is_ok_status(200, (200, 299)));
4039 assert!(HttpProducer::is_ok_status(299, (200, 299)));
4040 assert!(!HttpProducer::is_ok_status(199, (200, 299)));
4041 assert!(!HttpProducer::is_ok_status(300, (200, 299)));
4042
4043 assert!(!exceeds_max_response_body(10, 10));
4044 assert!(exceeds_max_response_body(11, 10));
4045 }
4046
4047 async fn setup_consumer_on_free_port(
4052 path: &str,
4053 ) -> (
4054 u16,
4055 tokio::sync::mpsc::Receiver<camel_component_api::ExchangeEnvelope>,
4056 tokio_util::sync::CancellationToken,
4057 ) {
4058 use camel_component_api::ConsumerContext;
4059
4060 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4061 let port = listener.local_addr().unwrap().port();
4062 drop(listener);
4063
4064 let consumer_cfg = HttpServerConfig {
4065 host: "127.0.0.1".to_string(),
4066 port,
4067 path: path.to_string(),
4068 max_request_body: 2 * 1024 * 1024,
4069 max_response_body: 10 * 1024 * 1024,
4070 max_inflight_requests: 1024,
4071 };
4072 let mut consumer = HttpConsumer::new(consumer_cfg);
4073
4074 let (tx, rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
4075 let token = tokio_util::sync::CancellationToken::new();
4076 let ctx = ConsumerContext::new(tx, token.clone());
4077
4078 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
4079 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
4080
4081 (port, rx, token)
4082 }
4083
4084 #[tokio::test]
4085 async fn test_content_type_inferred_for_json_body() {
4086 let (port, mut rx, token) = setup_consumer_on_free_port("/json").await;
4087
4088 let client = reqwest::Client::new();
4089 let send_fut = client.get(format!("http://127.0.0.1:{port}/json")).send();
4090
4091 let (http_result, _) = tokio::join!(send_fut, async {
4092 if let Some(mut envelope) = rx.recv().await {
4093 envelope.exchange.input.body =
4094 camel_component_api::Body::Json(serde_json::json!({"message": "hello"}));
4095 if let Some(reply_tx) = envelope.reply_tx {
4096 let _ = reply_tx.send(Ok(envelope.exchange));
4097 }
4098 }
4099 });
4100
4101 let resp = http_result.unwrap();
4102 assert_eq!(resp.status().as_u16(), 200);
4103 let ct = resp
4104 .headers()
4105 .get("content-type")
4106 .expect("Content-Type header should be present");
4107 assert_eq!(ct, "application/json");
4108 let body = resp.text().await.unwrap();
4109 assert_eq!(body, r#"{"message":"hello"}"#);
4110
4111 token.cancel();
4112 }
4113
4114 #[tokio::test]
4115 async fn test_content_type_inferred_for_text_body() {
4116 let (port, mut rx, token) = setup_consumer_on_free_port("/text").await;
4117
4118 let client = reqwest::Client::new();
4119 let send_fut = client.get(format!("http://127.0.0.1:{port}/text")).send();
4120
4121 let (http_result, _) = tokio::join!(send_fut, async {
4122 if let Some(mut envelope) = rx.recv().await {
4123 envelope.exchange.input.body =
4124 camel_component_api::Body::Text("plain text response".to_string());
4125 if let Some(reply_tx) = envelope.reply_tx {
4126 let _ = reply_tx.send(Ok(envelope.exchange));
4127 }
4128 }
4129 });
4130
4131 let resp = http_result.unwrap();
4132 assert_eq!(resp.status().as_u16(), 200);
4133 let ct = resp
4134 .headers()
4135 .get("content-type")
4136 .expect("Content-Type header should be present");
4137 assert_eq!(ct, "text/plain; charset=utf-8");
4138 let body = resp.text().await.unwrap();
4139 assert_eq!(body, "plain text response");
4140
4141 token.cancel();
4142 }
4143
4144 #[tokio::test]
4145 async fn test_content_type_inferred_for_xml_body() {
4146 let (port, mut rx, token) = setup_consumer_on_free_port("/xml").await;
4147
4148 let client = reqwest::Client::new();
4149 let send_fut = client.get(format!("http://127.0.0.1:{port}/xml")).send();
4150
4151 let (http_result, _) = tokio::join!(send_fut, async {
4152 if let Some(mut envelope) = rx.recv().await {
4153 envelope.exchange.input.body =
4154 camel_component_api::Body::Xml("<root><item>value</item></root>".to_string());
4155 if let Some(reply_tx) = envelope.reply_tx {
4156 let _ = reply_tx.send(Ok(envelope.exchange));
4157 }
4158 }
4159 });
4160
4161 let resp = http_result.unwrap();
4162 assert_eq!(resp.status().as_u16(), 200);
4163 let ct = resp
4164 .headers()
4165 .get("content-type")
4166 .expect("Content-Type header should be present");
4167 assert_eq!(ct, "application/xml");
4168 let body = resp.text().await.unwrap();
4169 assert_eq!(body, "<root><item>value</item></root>");
4170
4171 token.cancel();
4172 }
4173
4174 #[tokio::test]
4175 async fn test_no_content_type_for_empty_body() {
4176 let (port, mut rx, token) = setup_consumer_on_free_port("/empty").await;
4177
4178 let client = reqwest::Client::new();
4179 let send_fut = client.get(format!("http://127.0.0.1:{port}/empty")).send();
4180
4181 let (http_result, _) = tokio::join!(send_fut, async {
4182 if let Some(mut envelope) = rx.recv().await {
4183 envelope.exchange.input.body = camel_component_api::Body::Empty;
4184 if let Some(reply_tx) = envelope.reply_tx {
4185 let _ = reply_tx.send(Ok(envelope.exchange));
4186 }
4187 }
4188 });
4189
4190 let resp = http_result.unwrap();
4191 assert_eq!(resp.status().as_u16(), 200);
4192 assert!(
4193 resp.headers().get("content-type").is_none(),
4194 "Empty body should not set Content-Type"
4195 );
4196
4197 token.cancel();
4198 }
4199
4200 #[tokio::test]
4201 async fn test_no_content_type_for_raw_bytes_body() {
4202 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes").await;
4203
4204 let client = reqwest::Client::new();
4205 let send_fut = client.get(format!("http://127.0.0.1:{port}/bytes")).send();
4206
4207 let (http_result, _) = tokio::join!(send_fut, async {
4208 if let Some(mut envelope) = rx.recv().await {
4209 envelope.exchange.input.body =
4210 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"\x00\x01\x02"));
4211 if let Some(reply_tx) = envelope.reply_tx {
4212 let _ = reply_tx.send(Ok(envelope.exchange));
4213 }
4214 }
4215 });
4216
4217 let resp = http_result.unwrap();
4218 assert_eq!(resp.status().as_u16(), 200);
4219 assert!(
4220 resp.headers().get("content-type").is_none(),
4221 "Raw Bytes body should not set Content-Type"
4222 );
4223
4224 token.cancel();
4225 }
4226
4227 #[tokio::test]
4228 async fn test_content_type_from_stream_metadata() {
4229 use camel_component_api::{StreamBody, StreamMetadata};
4230 use futures::stream;
4231
4232 let (port, mut rx, token) = setup_consumer_on_free_port("/stream-ct").await;
4233
4234 let client = reqwest::Client::new();
4235 let send_fut = client
4236 .get(format!("http://127.0.0.1:{port}/stream-ct"))
4237 .send();
4238
4239 let (http_result, _) = tokio::join!(send_fut, async {
4240 if let Some(mut envelope) = rx.recv().await {
4241 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
4242 vec![Ok(bytes::Bytes::from("audio data"))];
4243 let stream = Box::pin(stream::iter(chunks));
4244 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
4245 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
4246 metadata: StreamMetadata {
4247 size_hint: None,
4248 content_type: Some("audio/mpeg".to_string()),
4249 origin: None,
4250 },
4251 });
4252 if let Some(reply_tx) = envelope.reply_tx {
4253 let _ = reply_tx.send(Ok(envelope.exchange));
4254 }
4255 }
4256 });
4257
4258 let resp = http_result.unwrap();
4259 assert_eq!(resp.status().as_u16(), 200);
4260 let ct = resp
4261 .headers()
4262 .get("content-type")
4263 .expect("Content-Type header should be present");
4264 assert_eq!(ct, "audio/mpeg");
4265 let body = resp.text().await.unwrap();
4266 assert_eq!(body, "audio data");
4267
4268 token.cancel();
4269 }
4270
4271 #[tokio::test]
4272 async fn test_user_content_type_overrides_inferred() {
4273 let (port, mut rx, token) = setup_consumer_on_free_port("/override-ct").await;
4274
4275 let client = reqwest::Client::new();
4276 let send_fut = client
4277 .get(format!("http://127.0.0.1:{port}/override-ct"))
4278 .send();
4279
4280 let (http_result, _) = tokio::join!(send_fut, async {
4281 if let Some(mut envelope) = rx.recv().await {
4282 envelope.exchange.input.body =
4283 camel_component_api::Body::Json(serde_json::json!({"ok": true}));
4284 envelope.exchange.input.set_header(
4285 "Content-Type",
4286 serde_json::Value::String("text/html".to_string()),
4287 );
4288 if let Some(reply_tx) = envelope.reply_tx {
4289 let _ = reply_tx.send(Ok(envelope.exchange));
4290 }
4291 }
4292 });
4293
4294 let resp = http_result.unwrap();
4295 assert_eq!(resp.status().as_u16(), 200);
4296 let ct = resp
4297 .headers()
4298 .get("content-type")
4299 .expect("Content-Type header should be present");
4300 assert_eq!(
4301 ct, "text/html",
4302 "User-set Content-Type should take precedence over inferred type"
4303 );
4304
4305 token.cancel();
4306 }
4307
4308 #[tokio::test]
4309 async fn test_user_content_type_with_bytes_body() {
4310 let (port, mut rx, token) = setup_consumer_on_free_port("/bytes-ct").await;
4311
4312 let client = reqwest::Client::new();
4313 let send_fut = client
4314 .get(format!("http://127.0.0.1:{port}/bytes-ct"))
4315 .send();
4316
4317 let (http_result, _) = tokio::join!(send_fut, async {
4318 if let Some(mut envelope) = rx.recv().await {
4319 envelope.exchange.input.body =
4320 camel_component_api::Body::Bytes(bytes::Bytes::from_static(b"{\"ok\":true}"));
4321 envelope.exchange.input.set_header(
4322 "Content-Type",
4323 serde_json::Value::String("application/json".to_string()),
4324 );
4325 if let Some(reply_tx) = envelope.reply_tx {
4326 let _ = reply_tx.send(Ok(envelope.exchange));
4327 }
4328 }
4329 });
4330
4331 let resp = http_result.unwrap();
4332 assert_eq!(resp.status().as_u16(), 200);
4333 let ct = resp
4334 .headers()
4335 .get("content-type")
4336 .expect("Content-Type header should be present for Bytes body with user header");
4337 assert_eq!(
4338 ct, "application/json",
4339 "User Content-Type should be sent for Bytes body"
4340 );
4341
4342 token.cancel();
4343 }
4344}