1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
24#[derive(Debug, Clone)]
85pub struct HttpEndpointConfig {
86 pub base_url: String,
87 pub http_method: Option<String>,
88 pub throw_exception_on_failure: bool,
89 pub ok_status_code_range: (u16, u16),
90 pub response_timeout: Option<Duration>,
91 pub query_params: HashMap<String, String>,
92 pub allow_private_ips: bool,
93 pub blocked_hosts: Vec<String>,
94 pub max_body_size: usize,
95}
96
97const HTTP_CAMEL_OPTIONS: &[&str] = &[
99 "httpMethod",
100 "throwExceptionOnFailure",
101 "okStatusCodeRange",
102 "followRedirects",
103 "connectTimeout",
104 "responseTimeout",
105 "allowPrivateIps",
106 "blockedHosts",
107 "maxBodySize",
108];
109
110impl UriConfig for HttpEndpointConfig {
111 fn scheme() -> &'static str {
113 "http"
114 }
115
116 fn from_uri(uri: &str) -> Result<Self, CamelError> {
117 let parts = parse_uri(uri)?;
118 Self::from_components(parts)
119 }
120
121 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122 if parts.scheme != "http" && parts.scheme != "https" {
124 return Err(CamelError::InvalidUri(format!(
125 "expected scheme 'http' or 'https', got '{}'",
126 parts.scheme
127 )));
128 }
129
130 let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134 let http_method = parts.params.get("httpMethod").cloned();
135
136 let throw_exception_on_failure = parts
137 .params
138 .get("throwExceptionOnFailure")
139 .map(|v| v != "false")
140 .unwrap_or(true);
141
142 let ok_status_code_range = parts
144 .params
145 .get("okStatusCodeRange")
146 .and_then(|v| {
147 let (start, end) = v.split_once('-')?;
148 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149 })
150 .unwrap_or((200, 299));
151
152 let response_timeout = parts
153 .params
154 .get("responseTimeout")
155 .and_then(|v| v.parse::<u64>().ok())
156 .map(Duration::from_millis);
157
158 let allow_private_ips = parts
160 .params
161 .get("allowPrivateIps")
162 .map(|v| v == "true")
163 .unwrap_or(false); let blocked_hosts = parts
167 .params
168 .get("blockedHosts")
169 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170 .unwrap_or_default();
171
172 let max_body_size = parts
173 .params
174 .get("maxBodySize")
175 .and_then(|v| v.parse::<usize>().ok())
176 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
180 .params
181 .into_iter()
182 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183 .collect();
184
185 Ok(Self {
186 base_url,
187 http_method,
188 throw_exception_on_failure,
189 ok_status_code_range,
190 response_timeout,
191 query_params,
192 allow_private_ips,
193 blocked_hosts,
194 max_body_size,
195 })
196 }
197}
198
199impl HttpEndpointConfig {
200 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201 let parts = parse_uri(uri)?;
202 let mut endpoint = Self::from_components(parts.clone())?;
203 if endpoint.response_timeout.is_none() {
204 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205 }
206 if !parts.params.contains_key("allowPrivateIps") {
207 endpoint.allow_private_ips = config.allow_private_ips;
208 }
209 if !parts.params.contains_key("blockedHosts") {
210 endpoint.blocked_hosts = config.blocked_hosts.clone();
211 }
212 if !parts.params.contains_key("maxBodySize") {
213 endpoint.max_body_size = config.max_body_size;
214 }
215 Ok(endpoint)
216 }
217}
218
219#[derive(Debug, Clone)]
225pub struct HttpServerConfig {
226 pub host: String,
228 pub port: u16,
230 pub path: String,
232 pub max_request_body: usize,
234 pub max_response_body: usize,
236 pub max_inflight_requests: usize,
238}
239
240impl UriConfig for HttpServerConfig {
241 fn scheme() -> &'static str {
243 "http"
244 }
245
246 fn from_uri(uri: &str) -> Result<Self, CamelError> {
247 let parts = parse_uri(uri)?;
248 Self::from_components(parts)
249 }
250
251 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
252 if parts.scheme != "http" && parts.scheme != "https" {
254 return Err(CamelError::InvalidUri(format!(
255 "expected scheme 'http' or 'https', got '{}'",
256 parts.scheme
257 )));
258 }
259
260 let authority_and_path = parts.path.trim_start_matches('/');
263
264 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
266 (&authority_and_path[..idx], &authority_and_path[idx..])
267 } else {
268 (authority_and_path, "/")
269 };
270
271 let path = if path_suffix.is_empty() {
272 "/"
273 } else {
274 path_suffix
275 }
276 .to_string();
277
278 let (host, port) = if let Some(colon) = authority.rfind(':') {
280 let port_str = &authority[colon + 1..];
281 match port_str.parse::<u16>() {
282 Ok(p) => (authority[..colon].to_string(), p),
283 Err(_) => {
284 return Err(CamelError::InvalidUri(format!(
285 "invalid port '{}' in authority",
286 port_str
287 )));
288 }
289 }
290 } else {
291 let default_port = if parts.scheme == "https" { 443 } else { 80 };
293 (authority.to_string(), default_port)
294 };
295
296 let max_request_body = parts
297 .params
298 .get("maxRequestBody")
299 .and_then(|v| v.parse::<usize>().ok())
300 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
303 .params
304 .get("maxResponseBody")
305 .and_then(|v| v.parse::<usize>().ok())
306 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
309 .params
310 .get("maxInflightRequests")
311 .and_then(|v| v.parse::<usize>().ok())
312 .unwrap_or(1024);
313
314 Ok(Self {
315 host,
316 port,
317 path,
318 max_request_body,
319 max_response_body,
320 max_inflight_requests,
321 })
322 }
323}
324
325impl HttpServerConfig {
326 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
327 let parts = parse_uri(uri)?;
328 let mut server = Self::from_components(parts.clone())?;
329 if !parts.params.contains_key("maxRequestBody") {
330 server.max_request_body = config.max_request_body;
331 }
332 if !parts.params.contains_key("maxResponseBody") {
333 server.max_response_body = config.max_body_size;
334 }
335 Ok(server)
336 }
337}
338
339pub(crate) enum HttpReplyBody {
345 Bytes(bytes::Bytes),
346 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
347}
348
349pub(crate) struct RequestEnvelope {
352 pub(crate) method: String,
353 pub(crate) path: String,
354 pub(crate) query: String,
355 pub(crate) headers: http::HeaderMap,
356 pub(crate) body: StreamBody,
357 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
358}
359
360pub(crate) struct HttpReply {
362 pub(crate) status: u16,
363 pub(crate) headers: Vec<(String, String)>,
364 pub(crate) body: HttpReplyBody,
365}
366
367pub(crate) type DispatchTable =
373 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
374
375type ServerKey = (String, u16);
376
377#[allow(dead_code)]
379struct ServerHandle {
380 dispatch: DispatchTable,
381 max_request_body: usize,
382 max_response_body: usize,
383 max_inflight_requests: usize,
384 inflight: Arc<tokio::sync::Semaphore>,
385 _task: tokio::task::JoinHandle<()>,
387}
388
389pub struct ServerRegistry {
391 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
392}
393
394impl ServerRegistry {
395 pub fn global() -> &'static Self {
397 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
398 INSTANCE.get_or_init(|| ServerRegistry {
399 inner: Mutex::new(HashMap::new()),
400 })
401 }
402
403 pub(crate) async fn get_or_spawn(
406 &'static self,
407 host: &str,
408 port: u16,
409 max_request_body: usize,
410 max_response_body: usize,
411 max_inflight_requests: usize,
412 ) -> Result<DispatchTable, CamelError> {
413 let host_owned = host.to_string();
414
415 let cell = {
416 let mut guard = self.inner.lock().map_err(|_| {
417 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
418 })?;
419 let key = (host.to_string(), port);
420 guard
421 .entry(key)
422 .or_insert_with(|| Arc::new(OnceCell::new()))
423 .clone()
424 };
425
426 if let Some(existing) = cell.get()
427 && existing.max_request_body != max_request_body
428 {
429 return Err(CamelError::EndpointCreationFailed(format!(
430 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
431 existing.max_request_body, max_request_body
432 )));
433 }
434
435 if let Some(existing) = cell.get()
436 && existing.max_response_body != max_response_body
437 {
438 return Err(CamelError::EndpointCreationFailed(format!(
439 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
440 existing.max_response_body, max_response_body
441 )));
442 }
443
444 if let Some(existing) = cell.get()
445 && existing.max_inflight_requests != max_inflight_requests
446 {
447 return Err(CamelError::EndpointCreationFailed(format!(
448 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
449 existing.max_inflight_requests, max_inflight_requests
450 )));
451 }
452
453 let handle = cell
454 .get_or_try_init(|| async {
455 let addr = format!("{host_owned}:{port}");
456 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
457 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
458 })?;
459 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
460 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
461 let task = tokio::spawn(run_axum_server(
462 listener,
463 Arc::clone(&dispatch),
464 max_request_body,
465 max_response_body,
466 Arc::clone(&inflight),
467 ));
468 Ok::<ServerHandle, CamelError>(ServerHandle {
469 dispatch,
470 max_request_body,
471 max_response_body,
472 max_inflight_requests,
473 inflight,
474 _task: task,
475 })
476 })
477 .await?;
478
479 Ok(Arc::clone(&handle.dispatch))
480 }
481}
482
483use axum::{
488 Router,
489 body::Body as AxumBody,
490 extract::{Request, State},
491 http::{Response, StatusCode},
492 response::IntoResponse,
493};
494
495#[derive(Clone)]
496struct AppState {
497 dispatch: DispatchTable,
498 max_request_body: usize,
499 max_response_body: usize,
500 inflight: Arc<tokio::sync::Semaphore>,
501}
502
503async fn run_axum_server(
504 listener: tokio::net::TcpListener,
505 dispatch: DispatchTable,
506 max_request_body: usize,
507 max_response_body: usize,
508 inflight: Arc<tokio::sync::Semaphore>,
509) {
510 let state = AppState {
511 dispatch,
512 max_request_body,
513 max_response_body,
514 inflight,
515 };
516 let app = Router::new().fallback(dispatch_handler).with_state(state);
517
518 axum::serve(listener, app).await.unwrap_or_else(|e| {
519 tracing::error!(error = %e, "Axum server error");
520 });
521}
522
523async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
524 let method = req.method().to_string();
525 let path = req.uri().path().to_string();
526 let query = req.uri().query().unwrap_or("").to_string();
527 let headers = req.headers().clone();
528
529 let content_length: Option<u64> = headers
531 .get(http::header::CONTENT_LENGTH)
532 .and_then(|v| v.to_str().ok())
533 .and_then(|s| s.parse().ok());
534
535 if let Some(len) = content_length
536 && len > state.max_request_body as u64
537 {
538 return Response::builder()
539 .status(StatusCode::PAYLOAD_TOO_LARGE)
540 .body(AxumBody::from("Request body exceeds configured limit"))
541 .expect("infallible");
542 }
543
544 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
545 Ok(permit) => permit,
546 Err(_) => {
547 return Response::builder()
548 .status(StatusCode::SERVICE_UNAVAILABLE)
549 .body(AxumBody::from("Service Unavailable"))
550 .expect("infallible");
551 }
552 };
553
554 let content_type = headers
556 .get(http::header::CONTENT_TYPE)
557 .and_then(|v| v.to_str().ok())
558 .map(|s| s.to_string());
559
560 let data_stream: BodyDataStream = req.into_body().into_data_stream();
561 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
562 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
563
564 let stream_body = StreamBody {
565 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
566 metadata: StreamMetadata {
567 size_hint: content_length,
568 content_type,
569 origin: None,
570 },
571 };
572
573 let sender = {
575 let table = state.dispatch.read().await;
576 table.get(&path).cloned()
577 };
578 let Some(sender) = sender else {
579 return Response::builder()
580 .status(StatusCode::NOT_FOUND)
581 .body(AxumBody::from("No consumer registered for this path"))
582 .expect("infallible");
583 };
584
585 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
586 let envelope = RequestEnvelope {
587 method,
588 path,
589 query,
590 headers,
591 body: stream_body,
592 reply_tx,
593 };
594
595 if sender.send(envelope).await.is_err() {
596 return Response::builder()
597 .status(StatusCode::SERVICE_UNAVAILABLE)
598 .body(AxumBody::from("Consumer unavailable"))
599 .expect("infallible");
600 }
601
602 match reply_rx.await {
603 Ok(reply) => {
604 let reply = match reply.body {
605 HttpReplyBody::Bytes(b)
606 if exceeds_max_response_body(b.len(), state.max_response_body) =>
607 {
608 HttpReply {
609 status: 500,
610 headers: vec![],
611 body: HttpReplyBody::Bytes(bytes::Bytes::from(
612 "Response body exceeds configured limit",
613 )),
614 }
615 }
616 _ => reply,
617 };
618
619 let status =
620 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
621 let mut builder = Response::builder().status(status);
622 for (k, v) in &reply.headers {
623 builder = builder.header(k.as_str(), v.as_str());
624 }
625 match reply.body {
626 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
627 Response::builder()
628 .status(StatusCode::INTERNAL_SERVER_ERROR)
629 .body(AxumBody::from("Invalid response headers from consumer"))
630 .expect("infallible")
631 }),
632 HttpReplyBody::Stream(stream) => builder
633 .body(AxumBody::from_stream(stream))
634 .unwrap_or_else(|_| {
635 Response::builder()
636 .status(StatusCode::INTERNAL_SERVER_ERROR)
637 .body(AxumBody::from("Invalid response headers from consumer"))
638 .expect("infallible")
639 }),
640 }
641 }
642 Err(_) => Response::builder()
643 .status(StatusCode::INTERNAL_SERVER_ERROR)
644 .body(AxumBody::from("Pipeline error"))
645 .expect("Response::builder() with a known-valid status code and body is infallible"),
646 }
647}
648
649fn exceeds_max_response_body(len: usize, max: usize) -> bool {
650 len > max
651}
652
653pub struct HttpConsumer {
658 config: HttpServerConfig,
659}
660
661impl HttpConsumer {
662 pub fn new(config: HttpServerConfig) -> Self {
663 Self { config }
664 }
665}
666
667#[async_trait::async_trait]
668impl Consumer for HttpConsumer {
669 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
670 use camel_component_api::{Body, Exchange, Message};
671
672 let dispatch = ServerRegistry::global()
673 .get_or_spawn(
674 &self.config.host,
675 self.config.port,
676 self.config.max_request_body,
677 self.config.max_response_body,
678 self.config.max_inflight_requests,
679 )
680 .await?;
681
682 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
684 {
685 let mut table = dispatch.write().await;
686 table.insert(self.config.path.clone(), env_tx);
687 }
688
689 let path = self.config.path.clone();
690 let cancel_token = ctx.cancel_token();
691 loop {
692 tokio::select! {
693 _ = ctx.cancelled() => {
694 break;
695 }
696 envelope = env_rx.recv() => {
697 let Some(envelope) = envelope else { break; };
698
699 let mut msg = Message::default();
701
702 msg.set_header("CamelHttpMethod",
704 serde_json::Value::String(envelope.method.clone()));
705 msg.set_header("CamelHttpPath",
706 serde_json::Value::String(envelope.path.clone()));
707 msg.set_header("CamelHttpQuery",
708 serde_json::Value::String(envelope.query.clone()));
709
710 for (k, v) in &envelope.headers {
712 if let Ok(val_str) = v.to_str() {
713 msg.set_header(
714 k.as_str(),
715 serde_json::Value::String(val_str.to_string()),
716 );
717 }
718 }
719
720 msg.body = Body::Stream(envelope.body);
723
724 #[allow(unused_mut)]
725 let mut exchange = Exchange::new(msg);
726
727 #[cfg(feature = "otel")]
729 {
730 let headers: HashMap<String, String> = envelope
731 .headers
732 .iter()
733 .filter_map(|(k, v)| {
734 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
735 })
736 .collect();
737 camel_otel::extract_into_exchange(&mut exchange, &headers);
738 }
739
740 let reply_tx = envelope.reply_tx;
741 let sender = ctx.sender().clone();
742 let path_clone = path.clone();
743 let cancel = cancel_token.clone();
744
745 tokio::spawn(async move {
765 if cancel.is_cancelled() {
773 let _ = reply_tx.send(HttpReply {
774 status: 503,
775 headers: vec![],
776 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
777 });
778 return;
779 }
780
781 let (tx, rx) = tokio::sync::oneshot::channel();
783 let envelope = camel_component_api::consumer::ExchangeEnvelope {
784 exchange,
785 reply_tx: Some(tx),
786 };
787
788 let result = match sender.send(envelope).await {
789 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
790 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
791 }
792 .and_then(|r| r);
793
794 let reply = match result {
795 Ok(out) => {
796 let status = out
797 .input
798 .header("CamelHttpResponseCode")
799 .and_then(|v| v.as_u64())
800 .map(|s| s as u16)
801 .unwrap_or(200);
802
803 let reply_body: HttpReplyBody = match out.input.body {
804 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
805 Body::Bytes(b) => HttpReplyBody::Bytes(b),
806 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
807 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
808 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
809 v.to_string().into_bytes(),
810 )),
811 Body::Stream(s) => {
812 match s.stream.lock().await.take() {
813 Some(stream) => HttpReplyBody::Stream(stream),
814 None => {
815 tracing::error!(
816 "Body::Stream already consumed before HTTP reply — returning 500"
817 );
818 let error_reply = HttpReply {
819 status: 500,
820 headers: vec![],
821 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
822 };
823 if reply_tx.send(error_reply).is_err() {
824 debug!("reply_tx dropped before error reply could be sent");
825 }
826 return;
827 }
828 }
829 }
830 };
831
832 let resp_headers: Vec<(String, String)> = out
833 .input
834 .headers
835 .iter()
836 .filter(|(k, _)| !k.starts_with("Camel"))
838 .filter(|(k, _)| {
841 !matches!(
842 k.to_lowercase().as_str(),
843 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
878 })
879 .filter_map(|(k, v)| {
880 v.as_str().map(|s| (k.clone(), s.to_string()))
881 })
882 .collect();
883
884 HttpReply {
885 status,
886 headers: resp_headers,
887 body: reply_body,
888 }
889 }
890 Err(e) => {
891 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
892 HttpReply {
893 status: 500,
894 headers: vec![],
895 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
896 }
897 }
898 };
899
900 let _ = reply_tx.send(reply);
902 });
903 }
904 }
905 }
906
907 {
909 let mut table = dispatch.write().await;
910 table.remove(&path);
911 }
912
913 Ok(())
914 }
915
916 async fn stop(&mut self) -> Result<(), CamelError> {
917 Ok(())
918 }
919
920 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
921 camel_component_api::ConcurrencyModel::Concurrent { max: None }
922 }
923}
924
925pub struct HttpComponent {
930 client: reqwest::Client,
931 config: HttpConfig,
932}
933
934fn build_client(config: &HttpConfig) -> reqwest::Client {
935 let mut builder = reqwest::Client::builder()
936 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
937 .pool_max_idle_per_host(config.pool_max_idle_per_host)
938 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
939
940 if !config.follow_redirects {
941 builder = builder.redirect(reqwest::redirect::Policy::none());
942 }
943
944 builder
945 .build()
946 .expect("reqwest::Client::build() with valid config should not fail")
947}
948
949impl HttpComponent {
950 pub fn new() -> Self {
951 let config = HttpConfig::default();
952 let client = build_client(&config);
953 Self { client, config }
954 }
955
956 pub fn with_config(config: HttpConfig) -> Self {
957 let client = build_client(&config);
958 Self { client, config }
959 }
960
961 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
962 match config {
963 Some(cfg) => Self::with_config(cfg),
964 None => Self::new(),
965 }
966 }
967}
968
969impl Default for HttpComponent {
970 fn default() -> Self {
971 Self::new()
972 }
973}
974
975impl Component for HttpComponent {
976 fn scheme(&self) -> &str {
977 "http"
978 }
979
980 fn create_endpoint(
981 &self,
982 uri: &str,
983 _ctx: &dyn camel_component_api::ComponentContext,
984 ) -> Result<Box<dyn Endpoint>, CamelError> {
985 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
986 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
987 Ok(Box::new(HttpEndpoint {
988 uri: uri.to_string(),
989 config,
990 server_config,
991 client: self.client.clone(),
992 }))
993 }
994}
995
996pub struct HttpsComponent {
997 client: reqwest::Client,
998 config: HttpConfig,
999}
1000
1001impl HttpsComponent {
1002 pub fn new() -> Self {
1003 let config = HttpConfig::default();
1004 let client = build_client(&config);
1005 Self { client, config }
1006 }
1007
1008 pub fn with_config(config: HttpConfig) -> Self {
1009 let client = build_client(&config);
1010 Self { client, config }
1011 }
1012
1013 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1014 match config {
1015 Some(cfg) => Self::with_config(cfg),
1016 None => Self::new(),
1017 }
1018 }
1019}
1020
1021impl Default for HttpsComponent {
1022 fn default() -> Self {
1023 Self::new()
1024 }
1025}
1026
1027impl Component for HttpsComponent {
1028 fn scheme(&self) -> &str {
1029 "https"
1030 }
1031
1032 fn create_endpoint(
1033 &self,
1034 uri: &str,
1035 _ctx: &dyn camel_component_api::ComponentContext,
1036 ) -> Result<Box<dyn Endpoint>, CamelError> {
1037 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1038 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1039 Ok(Box::new(HttpEndpoint {
1040 uri: uri.to_string(),
1041 config,
1042 server_config,
1043 client: self.client.clone(),
1044 }))
1045 }
1046}
1047
1048struct HttpEndpoint {
1053 uri: String,
1054 config: HttpEndpointConfig,
1055 server_config: HttpServerConfig,
1056 client: reqwest::Client,
1057}
1058
1059impl Endpoint for HttpEndpoint {
1060 fn uri(&self) -> &str {
1061 &self.uri
1062 }
1063
1064 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1065 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1066 }
1067
1068 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1069 Ok(BoxProcessor::new(HttpProducer {
1070 config: Arc::new(self.config.clone()),
1071 client: self.client.clone(),
1072 }))
1073 }
1074}
1075
1076fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1081 let parsed = url::Url::parse(url)
1082 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1083
1084 if let Some(host) = parsed.host_str()
1086 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1087 {
1088 return Err(CamelError::ProcessorError(format!(
1089 "Host '{}' is blocked",
1090 host
1091 )));
1092 }
1093
1094 if !config.allow_private_ips
1096 && let Some(host) = parsed.host()
1097 {
1098 match host {
1099 url::Host::Ipv4(ip) => {
1100 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1101 return Err(CamelError::ProcessorError(format!(
1102 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1103 ip
1104 )));
1105 }
1106 }
1107 url::Host::Ipv6(ip) => {
1108 if ip.is_loopback() {
1109 return Err(CamelError::ProcessorError(format!(
1110 "Loopback IP '{}' not allowed",
1111 ip
1112 )));
1113 }
1114 }
1115 url::Host::Domain(domain) => {
1116 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1118 if blocked_domains.contains(&domain) {
1119 return Err(CamelError::ProcessorError(format!(
1120 "Domain '{}' is not allowed",
1121 domain
1122 )));
1123 }
1124 }
1125 }
1126 }
1127
1128 Ok(())
1129}
1130
1131#[derive(Clone)]
1136struct HttpProducer {
1137 config: Arc<HttpEndpointConfig>,
1138 client: reqwest::Client,
1139}
1140
1141impl HttpProducer {
1142 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1143 if let Some(ref method) = config.http_method {
1144 return method.to_uppercase();
1145 }
1146 if let Some(method) = exchange
1147 .input
1148 .header("CamelHttpMethod")
1149 .and_then(|v| v.as_str())
1150 {
1151 return method.to_uppercase();
1152 }
1153 if !exchange.input.body.is_empty() {
1154 return "POST".to_string();
1155 }
1156 "GET".to_string()
1157 }
1158
1159 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1160 if let Some(uri) = exchange
1161 .input
1162 .header("CamelHttpUri")
1163 .and_then(|v| v.as_str())
1164 {
1165 let mut url = uri.to_string();
1166 if let Some(path) = exchange
1167 .input
1168 .header("CamelHttpPath")
1169 .and_then(|v| v.as_str())
1170 {
1171 if !url.ends_with('/') && !path.starts_with('/') {
1172 url.push('/');
1173 }
1174 url.push_str(path);
1175 }
1176 if let Some(query) = exchange
1177 .input
1178 .header("CamelHttpQuery")
1179 .and_then(|v| v.as_str())
1180 {
1181 url.push('?');
1182 url.push_str(query);
1183 }
1184 return url;
1185 }
1186
1187 let mut url = config.base_url.clone();
1188
1189 if let Some(path) = exchange
1190 .input
1191 .header("CamelHttpPath")
1192 .and_then(|v| v.as_str())
1193 {
1194 if !url.ends_with('/') && !path.starts_with('/') {
1195 url.push('/');
1196 }
1197 url.push_str(path);
1198 }
1199
1200 if let Some(query) = exchange
1201 .input
1202 .header("CamelHttpQuery")
1203 .and_then(|v| v.as_str())
1204 {
1205 url.push('?');
1206 url.push_str(query);
1207 } else if !config.query_params.is_empty() {
1208 url.push('?');
1210 let query_string: String = config
1211 .query_params
1212 .iter()
1213 .map(|(k, v)| format!("{k}={v}"))
1214 .collect::<Vec<_>>()
1215 .join("&");
1216 url.push_str(&query_string);
1217 }
1218
1219 url
1220 }
1221
1222 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1223 status >= range.0 && status <= range.1
1224 }
1225}
1226
1227impl Service<Exchange> for HttpProducer {
1228 type Response = Exchange;
1229 type Error = CamelError;
1230 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1231
1232 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1233 Poll::Ready(Ok(()))
1234 }
1235
1236 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1237 let config = self.config.clone();
1238 let client = self.client.clone();
1239
1240 Box::pin(async move {
1241 let method_str = HttpProducer::resolve_method(&exchange, &config);
1242 let url = HttpProducer::resolve_url(&exchange, &config);
1243
1244 validate_url_for_ssrf(&url, &config)?;
1246
1247 debug!(
1248 correlation_id = %exchange.correlation_id(),
1249 method = %method_str,
1250 url = %url,
1251 "HTTP request"
1252 );
1253
1254 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1255 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1256 })?;
1257
1258 let mut request = client.request(method, &url);
1259
1260 if let Some(timeout) = config.response_timeout {
1261 request = request.timeout(timeout);
1262 }
1263
1264 #[cfg(feature = "otel")]
1266 {
1267 let mut otel_headers = HashMap::new();
1268 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1269 for (k, v) in otel_headers {
1270 if let (Ok(name), Ok(val)) = (
1271 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1272 reqwest::header::HeaderValue::from_str(&v),
1273 ) {
1274 request = request.header(name, val);
1275 }
1276 }
1277 }
1278
1279 for (key, value) in &exchange.input.headers {
1280 if !key.starts_with("Camel")
1281 && let Some(val_str) = value.as_str()
1282 && let (Ok(name), Ok(val)) = (
1283 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1284 reqwest::header::HeaderValue::from_str(val_str),
1285 )
1286 {
1287 request = request.header(name, val);
1288 }
1289 }
1290
1291 match exchange.input.body {
1292 Body::Stream(ref s) => {
1293 let mut stream_lock = s.stream.lock().await;
1294 if let Some(stream) = stream_lock.take() {
1295 request = request.body(reqwest::Body::wrap_stream(stream));
1296 } else {
1297 return Err(CamelError::AlreadyConsumed);
1298 }
1299 }
1300 _ => {
1301 let body = std::mem::take(&mut exchange.input.body);
1303 let bytes = body.into_bytes(config.max_body_size).await?;
1304 if !bytes.is_empty() {
1305 request = request.body(bytes);
1306 }
1307 }
1308 }
1309
1310 let response = request
1311 .send()
1312 .await
1313 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1314
1315 let status_code = response.status().as_u16();
1316 let status_text = response
1317 .status()
1318 .canonical_reason()
1319 .unwrap_or("Unknown")
1320 .to_string();
1321
1322 for (key, value) in response.headers() {
1323 if let Ok(val_str) = value.to_str() {
1324 exchange
1325 .input
1326 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1327 }
1328 }
1329
1330 exchange.input.set_header(
1331 "CamelHttpResponseCode",
1332 serde_json::Value::Number(status_code.into()),
1333 );
1334 exchange.input.set_header(
1335 "CamelHttpResponseText",
1336 serde_json::Value::String(status_text.clone()),
1337 );
1338
1339 let response_body = response.bytes().await.map_err(|e| {
1340 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1341 })?;
1342
1343 if config.throw_exception_on_failure
1344 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1345 {
1346 return Err(CamelError::HttpOperationFailed {
1347 method: method_str,
1348 url,
1349 status_code,
1350 status_text,
1351 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1352 });
1353 }
1354
1355 if !response_body.is_empty() {
1356 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1357 }
1358
1359 debug!(
1360 correlation_id = %exchange.correlation_id(),
1361 status = status_code,
1362 url = %url,
1363 "HTTP response"
1364 );
1365 Ok(exchange)
1366 })
1367 }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372 use super::*;
1373 use camel_component_api::{Message, NoOpComponentContext};
1374 use std::sync::Arc;
1375 use std::time::Duration;
1376
1377 fn test_producer_ctx() -> ProducerContext {
1378 ProducerContext::new()
1379 }
1380
1381 #[test]
1382 fn test_http_config_defaults() {
1383 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1384 assert_eq!(config.base_url, "http://localhost:8080/api");
1385 assert!(config.http_method.is_none());
1386 assert!(config.throw_exception_on_failure);
1387 assert_eq!(config.ok_status_code_range, (200, 299));
1388 assert!(config.response_timeout.is_none());
1389 }
1390
1391 #[test]
1392 fn test_http_config_scheme() {
1393 assert_eq!(HttpEndpointConfig::scheme(), "http");
1395 }
1396
1397 #[test]
1398 fn test_http_config_from_components() {
1399 let components = camel_component_api::UriComponents {
1401 scheme: "https".to_string(),
1402 path: "//api.example.com/v1".to_string(),
1403 params: std::collections::HashMap::from([(
1404 "httpMethod".to_string(),
1405 "POST".to_string(),
1406 )]),
1407 };
1408 let config = HttpEndpointConfig::from_components(components).unwrap();
1409 assert_eq!(config.base_url, "https://api.example.com/v1");
1410 assert_eq!(config.http_method, Some("POST".to_string()));
1411 }
1412
1413 #[test]
1414 fn test_http_config_with_options() {
1415 let config = HttpEndpointConfig::from_uri(
1416 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1417 ).unwrap();
1418 assert_eq!(config.base_url, "https://api.example.com/v1");
1419 assert_eq!(config.http_method, Some("PUT".to_string()));
1420 assert!(!config.throw_exception_on_failure);
1421 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1422 }
1423
1424 #[test]
1425 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1426 let config = HttpConfig::default()
1427 .with_response_timeout_ms(999)
1428 .with_allow_private_ips(true)
1429 .with_blocked_hosts(vec!["evil.com".to_string()])
1430 .with_max_body_size(12345);
1431 let endpoint =
1432 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1433 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1434 assert!(endpoint.allow_private_ips);
1435 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1436 assert_eq!(endpoint.max_body_size, 12345);
1437 }
1438
1439 #[test]
1440 fn test_from_uri_with_defaults_uri_overrides_config() {
1441 let config = HttpConfig::default()
1442 .with_response_timeout_ms(999)
1443 .with_allow_private_ips(true)
1444 .with_blocked_hosts(vec!["evil.com".to_string()])
1445 .with_max_body_size(12345);
1446 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1447 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1448 &config,
1449 )
1450 .unwrap();
1451 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1452 assert!(!endpoint.allow_private_ips);
1453 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1454 assert_eq!(endpoint.max_body_size, 99);
1455 }
1456
1457 #[test]
1458 fn test_http_config_ok_status_range() {
1459 let config =
1460 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1461 assert_eq!(config.ok_status_code_range, (200, 204));
1462 }
1463
1464 #[test]
1465 fn test_http_config_wrong_scheme() {
1466 let result = HttpEndpointConfig::from_uri("file:/tmp");
1467 assert!(result.is_err());
1468 }
1469
1470 #[test]
1471 fn test_http_component_scheme() {
1472 let component = HttpComponent::new();
1473 assert_eq!(component.scheme(), "http");
1474 }
1475
1476 #[test]
1477 fn test_https_component_scheme() {
1478 let component = HttpsComponent::new();
1479 assert_eq!(component.scheme(), "https");
1480 }
1481
1482 #[test]
1483 fn test_http_endpoint_creates_consumer() {
1484 let component = HttpComponent::new();
1485 let ctx = NoOpComponentContext;
1486 let endpoint = component
1487 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1488 .unwrap();
1489 assert!(endpoint.create_consumer().is_ok());
1490 }
1491
1492 #[test]
1493 fn test_https_endpoint_creates_consumer() {
1494 let component = HttpsComponent::new();
1495 let ctx = NoOpComponentContext;
1496 let endpoint = component
1497 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1498 .unwrap();
1499 assert!(endpoint.create_consumer().is_ok());
1500 }
1501
1502 #[test]
1503 fn test_http_endpoint_creates_producer() {
1504 let ctx = test_producer_ctx();
1505 let component = HttpComponent::new();
1506 let endpoint_ctx = NoOpComponentContext;
1507 let endpoint = component
1508 .create_endpoint("http://localhost/api", &endpoint_ctx)
1509 .unwrap();
1510 assert!(endpoint.create_producer(&ctx).is_ok());
1511 }
1512
1513 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1518 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1519 let addr = listener.local_addr().unwrap();
1520 let url = format!("http://127.0.0.1:{}", addr.port());
1521
1522 let handle = tokio::spawn(async move {
1523 loop {
1524 if let Ok((mut stream, _)) = listener.accept().await {
1525 tokio::spawn(async move {
1526 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1527 let mut buf = vec![0u8; 4096];
1528 let n = stream.read(&mut buf).await.unwrap_or(0);
1529 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1530
1531 let method = request.split_whitespace().next().unwrap_or("GET");
1532
1533 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1534 let response = format!(
1535 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1536 body.len(),
1537 body
1538 );
1539 let _ = stream.write_all(response.as_bytes()).await;
1540 });
1541 }
1542 }
1543 });
1544
1545 (url, handle)
1546 }
1547
1548 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1549 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1550 let addr = listener.local_addr().unwrap();
1551 let url = format!("http://127.0.0.1:{}", addr.port());
1552
1553 let handle = tokio::spawn(async move {
1554 loop {
1555 if let Ok((mut stream, _)) = listener.accept().await {
1556 let status = status;
1557 tokio::spawn(async move {
1558 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1559 let mut buf = vec![0u8; 4096];
1560 let _ = stream.read(&mut buf).await;
1561
1562 let status_text = match status {
1563 404 => "Not Found",
1564 500 => "Internal Server Error",
1565 _ => "Error",
1566 };
1567 let body = "error body";
1568 let response = format!(
1569 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1570 status,
1571 status_text,
1572 body.len(),
1573 body
1574 );
1575 let _ = stream.write_all(response.as_bytes()).await;
1576 });
1577 }
1578 }
1579 });
1580
1581 (url, handle)
1582 }
1583
1584 #[tokio::test]
1585 async fn test_http_producer_get_request() {
1586 use tower::ServiceExt;
1587
1588 let (url, _handle) = start_test_server().await;
1589 let ctx = test_producer_ctx();
1590
1591 let component = HttpComponent::new();
1592 let endpoint_ctx = NoOpComponentContext;
1593 let endpoint = component
1594 .create_endpoint(
1595 &format!("{url}/api/test?allowPrivateIps=true"),
1596 &endpoint_ctx,
1597 )
1598 .unwrap();
1599 let producer = endpoint.create_producer(&ctx).unwrap();
1600
1601 let exchange = Exchange::new(Message::default());
1602 let result = producer.oneshot(exchange).await.unwrap();
1603
1604 let status = result
1605 .input
1606 .header("CamelHttpResponseCode")
1607 .and_then(|v| v.as_u64())
1608 .unwrap();
1609 assert_eq!(status, 200);
1610
1611 assert!(!result.input.body.is_empty());
1612 }
1613
1614 #[tokio::test]
1615 async fn test_http_producer_post_with_body() {
1616 use tower::ServiceExt;
1617
1618 let (url, _handle) = start_test_server().await;
1619 let ctx = test_producer_ctx();
1620
1621 let component = HttpComponent::new();
1622 let endpoint_ctx = NoOpComponentContext;
1623 let endpoint = component
1624 .create_endpoint(
1625 &format!("{url}/api/data?allowPrivateIps=true"),
1626 &endpoint_ctx,
1627 )
1628 .unwrap();
1629 let producer = endpoint.create_producer(&ctx).unwrap();
1630
1631 let exchange = Exchange::new(Message::new("request body"));
1632 let result = producer.oneshot(exchange).await.unwrap();
1633
1634 let status = result
1635 .input
1636 .header("CamelHttpResponseCode")
1637 .and_then(|v| v.as_u64())
1638 .unwrap();
1639 assert_eq!(status, 200);
1640 }
1641
1642 #[tokio::test]
1643 async fn test_http_producer_method_from_header() {
1644 use tower::ServiceExt;
1645
1646 let (url, _handle) = start_test_server().await;
1647 let ctx = test_producer_ctx();
1648
1649 let component = HttpComponent::new();
1650 let endpoint_ctx = NoOpComponentContext;
1651 let endpoint = component
1652 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1653 .unwrap();
1654 let producer = endpoint.create_producer(&ctx).unwrap();
1655
1656 let mut exchange = Exchange::new(Message::default());
1657 exchange.input.set_header(
1658 "CamelHttpMethod",
1659 serde_json::Value::String("DELETE".to_string()),
1660 );
1661
1662 let result = producer.oneshot(exchange).await.unwrap();
1663 let status = result
1664 .input
1665 .header("CamelHttpResponseCode")
1666 .and_then(|v| v.as_u64())
1667 .unwrap();
1668 assert_eq!(status, 200);
1669 }
1670
1671 #[tokio::test]
1672 async fn test_http_producer_forced_method() {
1673 use tower::ServiceExt;
1674
1675 let (url, _handle) = start_test_server().await;
1676 let ctx = test_producer_ctx();
1677
1678 let component = HttpComponent::new();
1679 let endpoint_ctx = NoOpComponentContext;
1680 let endpoint = component
1681 .create_endpoint(
1682 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1683 &endpoint_ctx,
1684 )
1685 .unwrap();
1686 let producer = endpoint.create_producer(&ctx).unwrap();
1687
1688 let exchange = Exchange::new(Message::default());
1689 let result = producer.oneshot(exchange).await.unwrap();
1690
1691 let status = result
1692 .input
1693 .header("CamelHttpResponseCode")
1694 .and_then(|v| v.as_u64())
1695 .unwrap();
1696 assert_eq!(status, 200);
1697 }
1698
1699 #[tokio::test]
1700 async fn test_http_producer_throw_exception_on_failure() {
1701 use tower::ServiceExt;
1702
1703 let (url, _handle) = start_status_server(404).await;
1704 let ctx = test_producer_ctx();
1705
1706 let component = HttpComponent::new();
1707 let endpoint_ctx = NoOpComponentContext;
1708 let endpoint = component
1709 .create_endpoint(
1710 &format!("{url}/not-found?allowPrivateIps=true"),
1711 &endpoint_ctx,
1712 )
1713 .unwrap();
1714 let producer = endpoint.create_producer(&ctx).unwrap();
1715
1716 let exchange = Exchange::new(Message::default());
1717 let result = producer.oneshot(exchange).await;
1718 assert!(result.is_err());
1719
1720 match result.unwrap_err() {
1721 CamelError::HttpOperationFailed { status_code, .. } => {
1722 assert_eq!(status_code, 404);
1723 }
1724 e => panic!("Expected HttpOperationFailed, got: {e}"),
1725 }
1726 }
1727
1728 #[tokio::test]
1729 async fn test_http_producer_no_throw_on_failure() {
1730 use tower::ServiceExt;
1731
1732 let (url, _handle) = start_status_server(500).await;
1733 let ctx = test_producer_ctx();
1734
1735 let component = HttpComponent::new();
1736 let endpoint_ctx = NoOpComponentContext;
1737 let endpoint = component
1738 .create_endpoint(
1739 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1740 &endpoint_ctx,
1741 )
1742 .unwrap();
1743 let producer = endpoint.create_producer(&ctx).unwrap();
1744
1745 let exchange = Exchange::new(Message::default());
1746 let result = producer.oneshot(exchange).await.unwrap();
1747
1748 let status = result
1749 .input
1750 .header("CamelHttpResponseCode")
1751 .and_then(|v| v.as_u64())
1752 .unwrap();
1753 assert_eq!(status, 500);
1754 }
1755
1756 #[tokio::test]
1757 async fn test_http_producer_uri_override() {
1758 use tower::ServiceExt;
1759
1760 let (url, _handle) = start_test_server().await;
1761 let ctx = test_producer_ctx();
1762
1763 let component = HttpComponent::new();
1764 let endpoint_ctx = NoOpComponentContext;
1765 let endpoint = component
1766 .create_endpoint(
1767 "http://localhost:1/does-not-exist?allowPrivateIps=true",
1768 &endpoint_ctx,
1769 )
1770 .unwrap();
1771 let producer = endpoint.create_producer(&ctx).unwrap();
1772
1773 let mut exchange = Exchange::new(Message::default());
1774 exchange.input.set_header(
1775 "CamelHttpUri",
1776 serde_json::Value::String(format!("{url}/api")),
1777 );
1778
1779 let result = producer.oneshot(exchange).await.unwrap();
1780 let status = result
1781 .input
1782 .header("CamelHttpResponseCode")
1783 .and_then(|v| v.as_u64())
1784 .unwrap();
1785 assert_eq!(status, 200);
1786 }
1787
1788 #[tokio::test]
1789 async fn test_http_producer_response_headers_mapped() {
1790 use tower::ServiceExt;
1791
1792 let (url, _handle) = start_test_server().await;
1793 let ctx = test_producer_ctx();
1794
1795 let component = HttpComponent::new();
1796 let endpoint_ctx = NoOpComponentContext;
1797 let endpoint = component
1798 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1799 .unwrap();
1800 let producer = endpoint.create_producer(&ctx).unwrap();
1801
1802 let exchange = Exchange::new(Message::default());
1803 let result = producer.oneshot(exchange).await.unwrap();
1804
1805 assert!(
1806 result.input.header("content-type").is_some()
1807 || result.input.header("Content-Type").is_some()
1808 );
1809 assert!(result.input.header("CamelHttpResponseText").is_some());
1810 }
1811
1812 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1817 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1818 let addr = listener.local_addr().unwrap();
1819 let url = format!("http://127.0.0.1:{}", addr.port());
1820
1821 let handle = tokio::spawn(async move {
1822 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1823 loop {
1824 if let Ok((mut stream, _)) = listener.accept().await {
1825 tokio::spawn(async move {
1826 let mut buf = vec![0u8; 4096];
1827 let n = stream.read(&mut buf).await.unwrap_or(0);
1828 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1829
1830 if request.contains("GET /final") {
1832 let body = r#"{"status":"final"}"#;
1833 let response = format!(
1834 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1835 body.len(),
1836 body
1837 );
1838 let _ = stream.write_all(response.as_bytes()).await;
1839 } else {
1840 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1842 let _ = stream.write_all(response.as_bytes()).await;
1843 }
1844 });
1845 }
1846 }
1847 });
1848
1849 (url, handle)
1850 }
1851
1852 #[tokio::test]
1853 async fn test_follow_redirects_false_does_not_follow() {
1854 use tower::ServiceExt;
1855
1856 let (url, _handle) = start_redirect_server().await;
1857 let ctx = test_producer_ctx();
1858
1859 let component =
1860 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1861 let endpoint_ctx = NoOpComponentContext;
1862 let endpoint = component
1863 .create_endpoint(
1864 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1865 &endpoint_ctx,
1866 )
1867 .unwrap();
1868 let producer = endpoint.create_producer(&ctx).unwrap();
1869
1870 let exchange = Exchange::new(Message::default());
1871 let result = producer.oneshot(exchange).await.unwrap();
1872
1873 let status = result
1875 .input
1876 .header("CamelHttpResponseCode")
1877 .and_then(|v| v.as_u64())
1878 .unwrap();
1879 assert_eq!(
1880 status, 302,
1881 "Should NOT follow redirect when followRedirects=false"
1882 );
1883 }
1884
1885 #[tokio::test]
1886 async fn test_follow_redirects_true_follows_redirect() {
1887 use tower::ServiceExt;
1888
1889 let (url, _handle) = start_redirect_server().await;
1890 let ctx = test_producer_ctx();
1891
1892 let component =
1893 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1894 let endpoint_ctx = NoOpComponentContext;
1895 let endpoint = component
1896 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1897 .unwrap();
1898 let producer = endpoint.create_producer(&ctx).unwrap();
1899
1900 let exchange = Exchange::new(Message::default());
1901 let result = producer.oneshot(exchange).await.unwrap();
1902
1903 let status = result
1905 .input
1906 .header("CamelHttpResponseCode")
1907 .and_then(|v| v.as_u64())
1908 .unwrap();
1909 assert_eq!(
1910 status, 200,
1911 "Should follow redirect when followRedirects=true"
1912 );
1913 }
1914
1915 #[tokio::test]
1916 async fn test_query_params_forwarded_to_http_request() {
1917 use tower::ServiceExt;
1918
1919 let (url, _handle) = start_test_server().await;
1920 let ctx = test_producer_ctx();
1921
1922 let component = HttpComponent::new();
1923 let endpoint_ctx = NoOpComponentContext;
1924 let endpoint = component
1926 .create_endpoint(
1927 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1928 &endpoint_ctx,
1929 )
1930 .unwrap();
1931 let producer = endpoint.create_producer(&ctx).unwrap();
1932
1933 let exchange = Exchange::new(Message::default());
1934 let result = producer.oneshot(exchange).await.unwrap();
1935
1936 let status = result
1939 .input
1940 .header("CamelHttpResponseCode")
1941 .and_then(|v| v.as_u64())
1942 .unwrap();
1943 assert_eq!(status, 200);
1944 }
1945
1946 #[tokio::test]
1947 async fn test_non_camel_query_params_are_forwarded() {
1948 let config = HttpEndpointConfig::from_uri(
1951 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1952 )
1953 .unwrap();
1954
1955 assert!(
1957 config.query_params.contains_key("apiKey"),
1958 "apiKey should be preserved"
1959 );
1960 assert!(
1961 config.query_params.contains_key("token"),
1962 "token should be preserved"
1963 );
1964 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1965 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1966
1967 assert!(
1969 !config.query_params.contains_key("httpMethod"),
1970 "httpMethod should not be forwarded"
1971 );
1972 }
1973
1974 #[tokio::test]
1979 async fn test_http_producer_blocks_metadata_endpoint() {
1980 use tower::ServiceExt;
1981
1982 let ctx = test_producer_ctx();
1983 let component = HttpComponent::new();
1984 let endpoint_ctx = NoOpComponentContext;
1985 let endpoint = component
1986 .create_endpoint(
1987 "http://example.com/api?allowPrivateIps=false",
1988 &endpoint_ctx,
1989 )
1990 .unwrap();
1991 let producer = endpoint.create_producer(&ctx).unwrap();
1992
1993 let mut exchange = Exchange::new(Message::default());
1994 exchange.input.set_header(
1995 "CamelHttpUri",
1996 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1997 );
1998
1999 let result = producer.oneshot(exchange).await;
2000 assert!(result.is_err(), "Should block AWS metadata endpoint");
2001
2002 let err = result.unwrap_err();
2003 assert!(
2004 err.to_string().contains("Private IP"),
2005 "Error should mention private IP blocking, got: {}",
2006 err
2007 );
2008 }
2009
2010 #[test]
2011 fn test_ssrf_config_defaults() {
2012 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2013 assert!(
2014 !config.allow_private_ips,
2015 "Private IPs should be blocked by default"
2016 );
2017 assert!(
2018 config.blocked_hosts.is_empty(),
2019 "Blocked hosts should be empty by default"
2020 );
2021 }
2022
2023 #[test]
2024 fn test_ssrf_config_allow_private_ips() {
2025 let config =
2026 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2027 assert!(
2028 config.allow_private_ips,
2029 "Private IPs should be allowed when explicitly set"
2030 );
2031 }
2032
2033 #[test]
2034 fn test_ssrf_config_blocked_hosts() {
2035 let config = HttpEndpointConfig::from_uri(
2036 "http://example.com/api?blockedHosts=evil.com,malware.net",
2037 )
2038 .unwrap();
2039 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2040 }
2041
2042 #[tokio::test]
2043 async fn test_http_producer_blocks_localhost() {
2044 use tower::ServiceExt;
2045
2046 let ctx = test_producer_ctx();
2047 let component = HttpComponent::new();
2048 let endpoint_ctx = NoOpComponentContext;
2049 let endpoint = component
2050 .create_endpoint("http://example.com/api", &endpoint_ctx)
2051 .unwrap();
2052 let producer = endpoint.create_producer(&ctx).unwrap();
2053
2054 let mut exchange = Exchange::new(Message::default());
2055 exchange.input.set_header(
2056 "CamelHttpUri",
2057 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2058 );
2059
2060 let result = producer.oneshot(exchange).await;
2061 assert!(result.is_err(), "Should block localhost");
2062 }
2063
2064 #[tokio::test]
2065 async fn test_http_producer_blocks_loopback_ip() {
2066 use tower::ServiceExt;
2067
2068 let ctx = test_producer_ctx();
2069 let component = HttpComponent::new();
2070 let endpoint_ctx = NoOpComponentContext;
2071 let endpoint = component
2072 .create_endpoint("http://example.com/api", &endpoint_ctx)
2073 .unwrap();
2074 let producer = endpoint.create_producer(&ctx).unwrap();
2075
2076 let mut exchange = Exchange::new(Message::default());
2077 exchange.input.set_header(
2078 "CamelHttpUri",
2079 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2080 );
2081
2082 let result = producer.oneshot(exchange).await;
2083 assert!(result.is_err(), "Should block loopback IP");
2084 }
2085
2086 #[tokio::test]
2087 async fn test_http_producer_allows_private_ip_when_enabled() {
2088 use tower::ServiceExt;
2089
2090 let ctx = test_producer_ctx();
2091 let component = HttpComponent::new();
2092 let endpoint_ctx = NoOpComponentContext;
2093 let endpoint = component
2096 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2097 .unwrap();
2098 let producer = endpoint.create_producer(&ctx).unwrap();
2099
2100 let exchange = Exchange::new(Message::default());
2101
2102 let result = producer.oneshot(exchange).await;
2105 if let Err(ref e) = result {
2107 let err_str = e.to_string();
2108 assert!(
2109 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2110 "Should not be SSRF error, got: {}",
2111 err_str
2112 );
2113 }
2114 }
2115
2116 #[test]
2121 fn test_http_server_config_parse() {
2122 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2123 assert_eq!(cfg.host, "0.0.0.0");
2124 assert_eq!(cfg.port, 8080);
2125 assert_eq!(cfg.path, "/orders");
2126 assert_eq!(cfg.max_inflight_requests, 1024);
2127 }
2128
2129 #[test]
2130 fn test_http_server_config_scheme() {
2131 assert_eq!(HttpServerConfig::scheme(), "http");
2133 }
2134
2135 #[test]
2136 fn test_http_server_config_from_components() {
2137 let components = camel_component_api::UriComponents {
2139 scheme: "https".to_string(),
2140 path: "//0.0.0.0:8443/api".to_string(),
2141 params: std::collections::HashMap::from([
2142 ("maxRequestBody".to_string(), "5242880".to_string()),
2143 ("maxInflightRequests".to_string(), "7".to_string()),
2144 ]),
2145 };
2146 let cfg = HttpServerConfig::from_components(components).unwrap();
2147 assert_eq!(cfg.host, "0.0.0.0");
2148 assert_eq!(cfg.port, 8443);
2149 assert_eq!(cfg.path, "/api");
2150 assert_eq!(cfg.max_request_body, 5242880);
2151 assert_eq!(cfg.max_inflight_requests, 7);
2152 }
2153
2154 #[test]
2155 fn test_http_server_config_default_path() {
2156 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2157 assert_eq!(cfg.path, "/");
2158 }
2159
2160 #[test]
2161 fn test_http_server_config_wrong_scheme() {
2162 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2163 }
2164
2165 #[test]
2166 fn test_http_server_config_invalid_port() {
2167 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2168 }
2169
2170 #[test]
2171 fn test_http_server_config_default_port_by_scheme() {
2172 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2174 assert_eq!(cfg_http.port, 80);
2175
2176 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2178 assert_eq!(cfg_https.port, 443);
2179 }
2180
2181 #[test]
2182 fn test_request_envelope_and_reply_are_send() {
2183 fn assert_send<T: Send>() {}
2184 assert_send::<RequestEnvelope>();
2185 assert_send::<HttpReply>();
2186 }
2187
2188 #[test]
2193 fn test_server_registry_global_is_singleton() {
2194 let r1 = ServerRegistry::global();
2195 let r2 = ServerRegistry::global();
2196 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2197 }
2198
2199 #[tokio::test]
2200 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2201 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2202 let port = listener.local_addr().unwrap().port();
2203 drop(listener);
2204
2205 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2206 Arc::new(std::sync::Mutex::new(Vec::new()));
2207
2208 let mut handles = Vec::new();
2209 for _ in 0..4 {
2210 let results = results.clone();
2211 handles.push(tokio::spawn(async move {
2212 let dispatch = ServerRegistry::global()
2213 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2214 .await
2215 .unwrap();
2216 results.lock().unwrap().push(dispatch);
2217 }));
2218 }
2219
2220 for h in handles {
2221 h.await.unwrap();
2222 }
2223
2224 let dispatches = results.lock().unwrap();
2225 assert_eq!(dispatches.len(), 4);
2226 for i in 1..dispatches.len() {
2227 assert!(
2228 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2229 "all concurrent callers should get the same dispatch table"
2230 );
2231 }
2232 }
2233
2234 #[test]
2235 fn test_server_registry_distinguishes_host_and_port() {
2236 let rt = tokio::runtime::Runtime::new().expect("runtime");
2237 rt.block_on(async {
2238 let registry = ServerRegistry::global();
2239 let d1 = registry
2243 .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2244 .await;
2245 let d2 = registry
2246 .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2247 .await;
2248 assert!(d1.is_ok());
2249 assert!(d2.is_ok());
2250 assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2251 });
2252 }
2253
2254 #[tokio::test]
2255 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2256 let registry = ServerRegistry::global();
2257 let d1 = registry
2259 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2260 .await;
2261 assert!(d1.is_ok());
2262
2263 let d2 = registry
2266 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2267 .await;
2268 assert!(d2.is_err());
2269 let err = d2.unwrap_err();
2270 assert!(
2271 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2272 "Expected incompatible maxRequestBody error, got: {}",
2273 err
2274 );
2275 }
2276
2277 #[tokio::test]
2282 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2283 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2284 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2286 let port = listener.local_addr().unwrap().port();
2287 tokio::spawn(run_axum_server(
2288 listener,
2289 dispatch,
2290 2 * 1024 * 1024,
2291 10 * 1024 * 1024,
2292 Arc::new(tokio::sync::Semaphore::new(1024)),
2293 ));
2294
2295 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2297
2298 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2299 .await
2300 .unwrap();
2301 assert_eq!(resp.status().as_u16(), 404);
2302 }
2303
2304 #[tokio::test]
2309 async fn test_http_consumer_start_registers_path() {
2310 use camel_component_api::ConsumerContext;
2311
2312 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2314 let port = listener.local_addr().unwrap().port();
2315 drop(listener); let consumer_cfg = HttpServerConfig {
2318 host: "127.0.0.1".to_string(),
2319 port,
2320 path: "/ping".to_string(),
2321 max_request_body: 2 * 1024 * 1024,
2322 max_response_body: 10 * 1024 * 1024,
2323 max_inflight_requests: 1024,
2324 };
2325 let mut consumer = HttpConsumer::new(consumer_cfg);
2326
2327 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2328 let token = tokio_util::sync::CancellationToken::new();
2329 let ctx = ConsumerContext::new(tx, token.clone());
2330
2331 tokio::spawn(async move {
2332 consumer.start(ctx).await.unwrap();
2333 });
2334
2335 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2336
2337 let client = reqwest::Client::new();
2338 let resp_future = client
2339 .post(format!("http://127.0.0.1:{port}/ping"))
2340 .body("hello world")
2341 .send();
2342
2343 let (http_result, _) = tokio::join!(resp_future, async {
2344 if let Some(mut envelope) = rx.recv().await {
2345 envelope.exchange.input.set_header(
2347 "CamelHttpResponseCode",
2348 serde_json::Value::Number(201.into()),
2349 );
2350 if let Some(reply_tx) = envelope.reply_tx {
2351 let _ = reply_tx.send(Ok(envelope.exchange));
2352 }
2353 }
2354 });
2355
2356 let resp = http_result.unwrap();
2357 assert_eq!(resp.status().as_u16(), 201);
2358
2359 token.cancel();
2360 }
2361
2362 #[tokio::test]
2363 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2364 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2365
2366 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2367 let port = listener.local_addr().unwrap().port();
2368 drop(listener);
2369
2370 let consumer_cfg = HttpServerConfig {
2371 host: "127.0.0.1".to_string(),
2372 port,
2373 path: "/saturation".to_string(),
2374 max_request_body: 2 * 1024 * 1024,
2375 max_response_body: 10 * 1024 * 1024,
2376 max_inflight_requests: 1,
2377 };
2378 let mut consumer = HttpConsumer::new(consumer_cfg);
2379
2380 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2381 let token = tokio_util::sync::CancellationToken::new();
2382 let ctx = ConsumerContext::new(tx, token.clone());
2383 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2384 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2385
2386 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2387 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2388
2389 tokio::spawn(async move {
2390 let mut first_seen_tx = Some(first_seen_tx);
2391 let mut unblock_first_rx = Some(unblock_first_rx);
2392
2393 while let Some(envelope) = rx.recv().await {
2394 if let Some(tx) = first_seen_tx.take() {
2395 let _ = tx.send(());
2396 if let Some(rx_unblock) = unblock_first_rx.take() {
2397 let _ = rx_unblock.await;
2398 }
2399 }
2400
2401 if let Some(reply_tx) = envelope.reply_tx {
2402 let _ = reply_tx.send(Ok(envelope.exchange));
2403 }
2404 }
2405 });
2406
2407 let client = reqwest::Client::new();
2408 let first_req = {
2409 let client = client.clone();
2410 async move {
2411 client
2412 .get(format!("http://127.0.0.1:{port}/saturation"))
2413 .send()
2414 .await
2415 .unwrap()
2416 }
2417 };
2418
2419 let first_handle = tokio::spawn(first_req);
2420 first_seen_rx.await.unwrap();
2421
2422 let second_resp = client
2423 .get(format!("http://127.0.0.1:{port}/saturation"))
2424 .send()
2425 .await
2426 .unwrap();
2427
2428 assert_eq!(second_resp.status().as_u16(), 503);
2429
2430 let _ = unblock_first_tx.send(());
2431 let first_resp = first_handle.await.unwrap();
2432 assert_eq!(first_resp.status().as_u16(), 200);
2433
2434 token.cancel();
2435 }
2436
2437 #[tokio::test]
2438 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
2439 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2440
2441 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2442 let port = listener.local_addr().unwrap().port();
2443 drop(listener);
2444
2445 let consumer_cfg = HttpServerConfig {
2446 host: "127.0.0.1".to_string(),
2447 port,
2448 path: "/limit-bytes".to_string(),
2449 max_request_body: 2 * 1024 * 1024,
2450 max_response_body: 16,
2451 max_inflight_requests: 1024,
2452 };
2453 let mut consumer = HttpConsumer::new(consumer_cfg);
2454
2455 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2456 let token = tokio_util::sync::CancellationToken::new();
2457 let ctx = ConsumerContext::new(tx, token.clone());
2458 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2459 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2460
2461 let client = reqwest::Client::new();
2462 let send_fut = client
2463 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
2464 .send();
2465
2466 let (http_result, _) = tokio::join!(send_fut, async {
2467 if let Some(mut envelope) = rx.recv().await {
2468 envelope.exchange.input.body =
2469 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
2470 if let Some(reply_tx) = envelope.reply_tx {
2471 let _ = reply_tx.send(Ok(envelope.exchange));
2472 }
2473 }
2474 });
2475
2476 let resp = http_result.unwrap();
2477 assert_eq!(resp.status().as_u16(), 500);
2478 let body = resp.text().await.unwrap();
2479 assert_eq!(body, "Response body exceeds configured limit");
2480 token.cancel();
2481 }
2482
2483 #[tokio::test]
2484 async fn test_http_consumer_enforces_max_response_body_for_json() {
2485 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2486
2487 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2488 let port = listener.local_addr().unwrap().port();
2489 drop(listener);
2490
2491 let consumer_cfg = HttpServerConfig {
2492 host: "127.0.0.1".to_string(),
2493 port,
2494 path: "/limit-json".to_string(),
2495 max_request_body: 2 * 1024 * 1024,
2496 max_response_body: 16,
2497 max_inflight_requests: 1024,
2498 };
2499 let mut consumer = HttpConsumer::new(consumer_cfg);
2500
2501 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2502 let token = tokio_util::sync::CancellationToken::new();
2503 let ctx = ConsumerContext::new(tx, token.clone());
2504 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2505 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2506
2507 let client = reqwest::Client::new();
2508 let send_fut = client
2509 .get(format!("http://127.0.0.1:{port}/limit-json"))
2510 .send();
2511
2512 let (http_result, _) = tokio::join!(send_fut, async {
2513 if let Some(mut envelope) = rx.recv().await {
2514 envelope.exchange.input.body = camel_component_api::Body::Json(
2515 serde_json::json!({"message":"this response is bigger than sixteen"}),
2516 );
2517 if let Some(reply_tx) = envelope.reply_tx {
2518 let _ = reply_tx.send(Ok(envelope.exchange));
2519 }
2520 }
2521 });
2522
2523 let resp = http_result.unwrap();
2524 assert_eq!(resp.status().as_u16(), 500);
2525 let body = resp.text().await.unwrap();
2526 assert_eq!(body, "Response body exceeds configured limit");
2527 token.cancel();
2528 }
2529
2530 #[tokio::test]
2531 async fn test_http_consumer_enforces_max_response_body_for_xml() {
2532 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2533
2534 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2535 let port = listener.local_addr().unwrap().port();
2536 drop(listener);
2537
2538 let consumer_cfg = HttpServerConfig {
2539 host: "127.0.0.1".to_string(),
2540 port,
2541 path: "/limit-xml".to_string(),
2542 max_request_body: 2 * 1024 * 1024,
2543 max_response_body: 16,
2544 max_inflight_requests: 1024,
2545 };
2546 let mut consumer = HttpConsumer::new(consumer_cfg);
2547
2548 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2549 let token = tokio_util::sync::CancellationToken::new();
2550 let ctx = ConsumerContext::new(tx, token.clone());
2551 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2552 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2553
2554 let client = reqwest::Client::new();
2555 let send_fut = client
2556 .get(format!("http://127.0.0.1:{port}/limit-xml"))
2557 .send();
2558
2559 let (http_result, _) = tokio::join!(send_fut, async {
2560 if let Some(mut envelope) = rx.recv().await {
2561 envelope.exchange.input.body = camel_component_api::Body::Xml(
2562 "<root><value>way-too-large</value></root>".into(),
2563 );
2564 if let Some(reply_tx) = envelope.reply_tx {
2565 let _ = reply_tx.send(Ok(envelope.exchange));
2566 }
2567 }
2568 });
2569
2570 let resp = http_result.unwrap();
2571 assert_eq!(resp.status().as_u16(), 500);
2572 let body = resp.text().await.unwrap();
2573 assert_eq!(body, "Response body exceeds configured limit");
2574 token.cancel();
2575 }
2576
2577 #[tokio::test]
2578 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
2579 use camel_component_api::{
2580 CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata,
2581 };
2582 use futures::stream;
2583
2584 let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
2585 let port = listener.local_addr().unwrap().port();
2586 drop(listener);
2587
2588 let consumer_cfg = HttpServerConfig {
2589 host: "0.0.0.0".to_string(),
2590 port,
2591 path: "/limit-stream".to_string(),
2592 max_request_body: 2 * 1024 * 1024,
2593 max_response_body: 16,
2594 max_inflight_requests: 1024,
2595 };
2596 let mut consumer = HttpConsumer::new(consumer_cfg);
2597
2598 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2599 let token = tokio_util::sync::CancellationToken::new();
2600 let ctx = ConsumerContext::new(tx, token.clone());
2601 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2602 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2603
2604 let client = reqwest::Client::new();
2605 let send_fut = client
2606 .get(format!("http://127.0.0.1:{port}/limit-stream"))
2607 .send();
2608
2609 let (http_result, _) = tokio::join!(send_fut, async {
2610 if let Some(mut envelope) = rx.recv().await {
2611 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
2612 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
2613 let stream = Box::pin(stream::iter(chunks));
2614 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
2615 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
2616 metadata: StreamMetadata {
2617 size_hint: Some(32),
2618 content_type: Some("application/octet-stream".into()),
2619 origin: None,
2620 },
2621 });
2622 if let Some(reply_tx) = envelope.reply_tx {
2623 let _ = reply_tx.send(Ok(envelope.exchange));
2624 }
2625 }
2626 });
2627
2628 let resp = http_result.unwrap();
2629 assert_eq!(resp.status().as_u16(), 200);
2630 let body = resp.bytes().await.unwrap();
2631 assert_eq!(body.len(), 32);
2632 token.cancel();
2633 }
2634
2635 #[tokio::test]
2640 async fn test_integration_single_consumer_round_trip() {
2641 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2642
2643 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2645 let port = listener.local_addr().unwrap().port();
2646 drop(listener); let component = HttpComponent::new();
2649 let endpoint_ctx = NoOpComponentContext;
2650 let endpoint = component
2651 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2652 .unwrap();
2653 let mut consumer = endpoint.create_consumer().unwrap();
2654
2655 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2656 let token = tokio_util::sync::CancellationToken::new();
2657 let ctx = ConsumerContext::new(tx, token.clone());
2658
2659 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2660 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2661
2662 let client = reqwest::Client::new();
2663 let send_fut = client
2664 .post(format!("http://127.0.0.1:{port}/echo"))
2665 .header("Content-Type", "text/plain")
2666 .body("ping")
2667 .send();
2668
2669 let (http_result, _) = tokio::join!(send_fut, async {
2670 if let Some(mut envelope) = rx.recv().await {
2671 assert_eq!(
2672 envelope.exchange.input.header("CamelHttpMethod"),
2673 Some(&serde_json::Value::String("POST".into()))
2674 );
2675 assert_eq!(
2676 envelope.exchange.input.header("CamelHttpPath"),
2677 Some(&serde_json::Value::String("/echo".into()))
2678 );
2679 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2680 if let Some(reply_tx) = envelope.reply_tx {
2681 let _ = reply_tx.send(Ok(envelope.exchange));
2682 }
2683 }
2684 });
2685
2686 let resp = http_result.unwrap();
2687 assert_eq!(resp.status().as_u16(), 200);
2688 let body = resp.text().await.unwrap();
2689 assert_eq!(body, "pong");
2690
2691 token.cancel();
2692 }
2693
2694 #[tokio::test]
2695 async fn test_integration_two_consumers_shared_port() {
2696 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2697
2698 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2700 let port = listener.local_addr().unwrap().port();
2701 drop(listener);
2702
2703 let component = HttpComponent::new();
2704 let endpoint_ctx = NoOpComponentContext;
2705
2706 let endpoint_a = component
2708 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2709 .unwrap();
2710 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2711
2712 let endpoint_b = component
2714 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2715 .unwrap();
2716 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2717
2718 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2719 let token_a = tokio_util::sync::CancellationToken::new();
2720 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2721
2722 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2723 let token_b = tokio_util::sync::CancellationToken::new();
2724 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2725
2726 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2727 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2728 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729
2730 let client = reqwest::Client::new();
2731
2732 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2734 let (resp_hello, _) = tokio::join!(fut_hello, async {
2735 if let Some(mut envelope) = rx_a.recv().await {
2736 envelope.exchange.input.body =
2737 camel_component_api::Body::Text("hello-response".to_string());
2738 if let Some(reply_tx) = envelope.reply_tx {
2739 let _ = reply_tx.send(Ok(envelope.exchange));
2740 }
2741 }
2742 });
2743
2744 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2746 let (resp_world, _) = tokio::join!(fut_world, async {
2747 if let Some(mut envelope) = rx_b.recv().await {
2748 envelope.exchange.input.body =
2749 camel_component_api::Body::Text("world-response".to_string());
2750 if let Some(reply_tx) = envelope.reply_tx {
2751 let _ = reply_tx.send(Ok(envelope.exchange));
2752 }
2753 }
2754 });
2755
2756 let body_a = resp_hello.unwrap().text().await.unwrap();
2757 let body_b = resp_world.unwrap().text().await.unwrap();
2758
2759 assert_eq!(body_a, "hello-response");
2760 assert_eq!(body_b, "world-response");
2761
2762 token_a.cancel();
2763 token_b.cancel();
2764 }
2765
2766 #[tokio::test]
2767 async fn test_integration_unregistered_path_returns_404() {
2768 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2769
2770 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2772 let port = listener.local_addr().unwrap().port();
2773 drop(listener);
2774
2775 let component = HttpComponent::new();
2776 let endpoint_ctx = NoOpComponentContext;
2777 let endpoint = component
2778 .create_endpoint(
2779 &format!("http://127.0.0.1:{port}/registered"),
2780 &endpoint_ctx,
2781 )
2782 .unwrap();
2783 let mut consumer = endpoint.create_consumer().unwrap();
2784
2785 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2786 let token = tokio_util::sync::CancellationToken::new();
2787 let ctx = ConsumerContext::new(tx, token.clone());
2788
2789 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2790 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2791
2792 let client = reqwest::Client::new();
2793 let resp = client
2794 .get(format!("http://127.0.0.1:{port}/not-there"))
2795 .send()
2796 .await
2797 .unwrap();
2798 assert_eq!(resp.status().as_u16(), 404);
2799
2800 token.cancel();
2801 }
2802
2803 #[test]
2804 fn test_http_consumer_declares_concurrent() {
2805 use camel_component_api::ConcurrencyModel;
2806
2807 let config = HttpServerConfig {
2808 host: "127.0.0.1".to_string(),
2809 port: 19999,
2810 path: "/test".to_string(),
2811 max_request_body: 2 * 1024 * 1024,
2812 max_response_body: 10 * 1024 * 1024,
2813 max_inflight_requests: 1024,
2814 };
2815 let consumer = HttpConsumer::new(config);
2816 assert_eq!(
2817 consumer.concurrency_model(),
2818 ConcurrencyModel::Concurrent { max: None }
2819 );
2820 }
2821
2822 #[tokio::test]
2827 async fn test_http_reply_body_stream_variant_exists() {
2828 use bytes::Bytes;
2829 use camel_component_api::CamelError;
2830 use futures::stream;
2831
2832 let chunks: Vec<Result<Bytes, CamelError>> =
2833 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2834 let stream = Box::pin(stream::iter(chunks));
2835 let reply_body = HttpReplyBody::Stream(stream);
2836 match reply_body {
2838 HttpReplyBody::Stream(_) => {}
2839 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2840 }
2841 }
2842
2843 #[cfg(feature = "otel")]
2848 mod otel_tests {
2849 use super::*;
2850 use camel_component_api::Message;
2851 use tower::ServiceExt;
2852
2853 #[tokio::test]
2854 async fn test_producer_injects_traceparent_header() {
2855 let (url, _handle) = start_test_server_with_header_capture().await;
2856 let ctx = test_producer_ctx();
2857
2858 let component = HttpComponent::new();
2859 let endpoint_ctx = NoOpComponentContext;
2860 let endpoint = component
2861 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2862 .unwrap();
2863 let producer = endpoint.create_producer(&ctx).unwrap();
2864
2865 let mut exchange = Exchange::new(Message::default());
2867 let mut headers = std::collections::HashMap::new();
2868 headers.insert(
2869 "traceparent".to_string(),
2870 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2871 );
2872 camel_otel::extract_into_exchange(&mut exchange, &headers);
2873
2874 let result = producer.oneshot(exchange).await.unwrap();
2875
2876 let status = result
2878 .input
2879 .header("CamelHttpResponseCode")
2880 .and_then(|v| v.as_u64())
2881 .unwrap();
2882 assert_eq!(status, 200);
2883
2884 let traceparent = result.input.header("x-received-traceparent");
2886 assert!(
2887 traceparent.is_some(),
2888 "traceparent header should have been sent"
2889 );
2890
2891 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2892 let parts: Vec<&str> = traceparent_str.split('-').collect();
2894 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2895 assert_eq!(parts[0], "00", "version should be 00");
2896 assert_eq!(
2897 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2898 "trace-id should match"
2899 );
2900 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2901 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2902 }
2903
2904 #[tokio::test]
2905 async fn test_consumer_extracts_traceparent_header() {
2906 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2907
2908 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2910 let port = listener.local_addr().unwrap().port();
2911 drop(listener);
2912
2913 let component = HttpComponent::new();
2914 let endpoint_ctx = NoOpComponentContext;
2915 let endpoint = component
2916 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2917 .unwrap();
2918 let mut consumer = endpoint.create_consumer().unwrap();
2919
2920 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2921 let token = tokio_util::sync::CancellationToken::new();
2922 let ctx = ConsumerContext::new(tx, token.clone());
2923
2924 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2925 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2926
2927 let client = reqwest::Client::new();
2929 let send_fut = client
2930 .post(format!("http://127.0.0.1:{port}/trace"))
2931 .header(
2932 "traceparent",
2933 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2934 )
2935 .body("test")
2936 .send();
2937
2938 let (http_result, _) = tokio::join!(send_fut, async {
2939 if let Some(envelope) = rx.recv().await {
2940 let mut injected_headers = std::collections::HashMap::new();
2943 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2944
2945 assert!(
2946 injected_headers.contains_key("traceparent"),
2947 "Exchange should have traceparent after extraction"
2948 );
2949
2950 let traceparent = injected_headers.get("traceparent").unwrap();
2951 let parts: Vec<&str> = traceparent.split('-').collect();
2952 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2953 assert_eq!(
2954 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2955 "Trace ID should match the original traceparent header"
2956 );
2957
2958 if let Some(reply_tx) = envelope.reply_tx {
2959 let _ = reply_tx.send(Ok(envelope.exchange));
2960 }
2961 }
2962 });
2963
2964 let resp = http_result.unwrap();
2965 assert_eq!(resp.status().as_u16(), 200);
2966
2967 token.cancel();
2968 }
2969
2970 #[tokio::test]
2971 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2972 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2973
2974 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2976 let port = listener.local_addr().unwrap().port();
2977 drop(listener);
2978
2979 let component = HttpComponent::new();
2980 let endpoint_ctx = NoOpComponentContext;
2981 let endpoint = component
2982 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2983 .unwrap();
2984 let mut consumer = endpoint.create_consumer().unwrap();
2985
2986 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2987 let token = tokio_util::sync::CancellationToken::new();
2988 let ctx = ConsumerContext::new(tx, token.clone());
2989
2990 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2991 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2992
2993 let client = reqwest::Client::new();
2995 let send_fut = client
2996 .post(format!("http://127.0.0.1:{port}/trace"))
2997 .header(
2998 "TraceParent",
2999 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3000 )
3001 .body("test")
3002 .send();
3003
3004 let (http_result, _) = tokio::join!(send_fut, async {
3005 if let Some(envelope) = rx.recv().await {
3006 let mut injected_headers = HashMap::new();
3009 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3010
3011 assert!(
3012 injected_headers.contains_key("traceparent"),
3013 "Exchange should have traceparent after extraction from mixed-case header"
3014 );
3015
3016 let traceparent = injected_headers.get("traceparent").unwrap();
3017 let parts: Vec<&str> = traceparent.split('-').collect();
3018 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3019 assert_eq!(
3020 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3021 "Trace ID should match the original mixed-case TraceParent header"
3022 );
3023
3024 if let Some(reply_tx) = envelope.reply_tx {
3025 let _ = reply_tx.send(Ok(envelope.exchange));
3026 }
3027 }
3028 });
3029
3030 let resp = http_result.unwrap();
3031 assert_eq!(resp.status().as_u16(), 200);
3032
3033 token.cancel();
3034 }
3035
3036 #[tokio::test]
3037 async fn test_producer_no_trace_context_no_crash() {
3038 let (url, _handle) = start_test_server().await;
3039 let ctx = test_producer_ctx();
3040
3041 let component = HttpComponent::new();
3042 let endpoint_ctx = NoOpComponentContext;
3043 let endpoint = component
3044 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3045 .unwrap();
3046 let producer = endpoint.create_producer(&ctx).unwrap();
3047
3048 let exchange = Exchange::new(Message::default());
3050
3051 let result = producer.oneshot(exchange).await.unwrap();
3053
3054 let status = result
3056 .input
3057 .header("CamelHttpResponseCode")
3058 .and_then(|v| v.as_u64())
3059 .unwrap();
3060 assert_eq!(status, 200);
3061 }
3062
3063 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3065 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3066 let addr = listener.local_addr().unwrap();
3067 let url = format!("http://127.0.0.1:{}", addr.port());
3068
3069 let handle = tokio::spawn(async move {
3070 loop {
3071 if let Ok((mut stream, _)) = listener.accept().await {
3072 tokio::spawn(async move {
3073 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3074 let mut buf = vec![0u8; 8192];
3075 let n = stream.read(&mut buf).await.unwrap_or(0);
3076 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3077
3078 let traceparent = request
3080 .lines()
3081 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3082 .map(|line| {
3083 line.split(':')
3084 .nth(1)
3085 .map(|s| s.trim().to_string())
3086 .unwrap_or_default()
3087 })
3088 .unwrap_or_default();
3089
3090 let body =
3091 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3092 let response = format!(
3093 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3094 body.len(),
3095 traceparent,
3096 body
3097 );
3098 let _ = stream.write_all(response.as_bytes()).await;
3099 });
3100 }
3101 }
3102 });
3103
3104 (url, handle)
3105 }
3106 }
3107
3108 #[tokio::test]
3117 async fn test_request_body_arrives_as_stream() {
3118 use camel_component_api::Body;
3119 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3120
3121 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3122 let port = listener.local_addr().unwrap().port();
3123 drop(listener);
3124
3125 let component = HttpComponent::new();
3126 let endpoint_ctx = NoOpComponentContext;
3127 let endpoint = component
3128 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3129 .unwrap();
3130 let mut consumer = endpoint.create_consumer().unwrap();
3131
3132 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3133 let token = tokio_util::sync::CancellationToken::new();
3134 let ctx = ConsumerContext::new(tx, token.clone());
3135
3136 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3137 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3138
3139 let client = reqwest::Client::new();
3140 let send_fut = client
3141 .post(format!("http://127.0.0.1:{port}/upload"))
3142 .body("hello streaming world")
3143 .send();
3144
3145 let (http_result, _) = tokio::join!(send_fut, async {
3146 if let Some(mut envelope) = rx.recv().await {
3147 assert!(
3149 matches!(envelope.exchange.input.body, Body::Stream(_)),
3150 "expected Body::Stream, got discriminant {:?}",
3151 std::mem::discriminant(&envelope.exchange.input.body)
3152 );
3153 let bytes = envelope
3155 .exchange
3156 .input
3157 .body
3158 .into_bytes(1024 * 1024)
3159 .await
3160 .unwrap();
3161 assert_eq!(&bytes[..], b"hello streaming world");
3162
3163 envelope.exchange.input.body = camel_component_api::Body::Empty;
3164 if let Some(reply_tx) = envelope.reply_tx {
3165 let _ = reply_tx.send(Ok(envelope.exchange));
3166 }
3167 }
3168 });
3169
3170 let resp = http_result.unwrap();
3171 assert_eq!(resp.status().as_u16(), 200);
3172
3173 token.cancel();
3174 }
3175
3176 #[tokio::test]
3181 async fn test_streaming_response_chunked() {
3182 use bytes::Bytes;
3183 use camel_component_api::Body;
3184 use camel_component_api::CamelError;
3185 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3186 use camel_component_api::{StreamBody, StreamMetadata};
3187 use futures::stream;
3188 use std::sync::Arc;
3189 use tokio::sync::Mutex;
3190
3191 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3192 let port = listener.local_addr().unwrap().port();
3193 drop(listener);
3194
3195 let component = HttpComponent::new();
3196 let endpoint_ctx = NoOpComponentContext;
3197 let endpoint = component
3198 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3199 .unwrap();
3200 let mut consumer = endpoint.create_consumer().unwrap();
3201
3202 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3203 let token = tokio_util::sync::CancellationToken::new();
3204 let ctx = ConsumerContext::new(tx, token.clone());
3205
3206 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3207 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3208
3209 let client = reqwest::Client::new();
3210 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3211
3212 let (http_result, _) = tokio::join!(send_fut, async {
3213 if let Some(mut envelope) = rx.recv().await {
3214 let chunks: Vec<Result<Bytes, CamelError>> =
3216 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3217 let stream = Box::pin(stream::iter(chunks));
3218 envelope.exchange.input.body = Body::Stream(StreamBody {
3219 stream: Arc::new(Mutex::new(Some(stream))),
3220 metadata: StreamMetadata::default(),
3221 });
3222 if let Some(reply_tx) = envelope.reply_tx {
3223 let _ = reply_tx.send(Ok(envelope.exchange));
3224 }
3225 }
3226 });
3227
3228 let resp = http_result.unwrap();
3229 assert_eq!(resp.status().as_u16(), 200);
3230 let body = resp.text().await.unwrap();
3231 assert_eq!(body, "chunk1chunk2");
3232
3233 token.cancel();
3234 }
3235
3236 #[tokio::test]
3241 async fn test_413_when_content_length_exceeds_limit() {
3242 use camel_component_api::ConsumerContext;
3243
3244 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3245 let port = listener.local_addr().unwrap().port();
3246 drop(listener);
3247
3248 let component = HttpComponent::new();
3250 let endpoint_ctx = NoOpComponentContext;
3251 let endpoint = component
3252 .create_endpoint(
3253 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3254 &endpoint_ctx,
3255 )
3256 .unwrap();
3257 let mut consumer = endpoint.create_consumer().unwrap();
3258
3259 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3260 let token = tokio_util::sync::CancellationToken::new();
3261 let ctx = ConsumerContext::new(tx, token.clone());
3262
3263 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3264 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3265
3266 let client = reqwest::Client::new();
3267 let resp = client
3268 .post(format!("http://127.0.0.1:{port}/upload"))
3269 .header("Content-Length", "1000") .body("x".repeat(1000))
3271 .send()
3272 .await
3273 .unwrap();
3274
3275 assert_eq!(resp.status().as_u16(), 413);
3276
3277 token.cancel();
3278 }
3279
3280 #[tokio::test]
3284 async fn test_chunked_upload_without_content_length_bypasses_limit() {
3285 use bytes::Bytes;
3286 use camel_component_api::Body;
3287 use camel_component_api::ConsumerContext;
3288 use futures::stream;
3289
3290 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3291 let port = listener.local_addr().unwrap().port();
3292 drop(listener);
3293
3294 let component = HttpComponent::new();
3296 let endpoint_ctx = NoOpComponentContext;
3297 let endpoint = component
3298 .create_endpoint(
3299 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3300 &endpoint_ctx,
3301 )
3302 .unwrap();
3303 let mut consumer = endpoint.create_consumer().unwrap();
3304
3305 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3306 let token = tokio_util::sync::CancellationToken::new();
3307 let ctx = ConsumerContext::new(tx, token.clone());
3308
3309 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3310 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3311
3312 let client = reqwest::Client::new();
3313
3314 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3318 Ok(Bytes::from("y".repeat(50))),
3319 Ok(Bytes::from("y".repeat(50))),
3320 ];
3321 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3322 let send_fut = client
3323 .post(format!("http://127.0.0.1:{port}/upload"))
3324 .body(stream_body)
3325 .send();
3326
3327 let consumer_fut = async {
3328 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3330 Ok(Some(mut envelope)) => {
3331 assert!(
3332 matches!(envelope.exchange.input.body, Body::Stream(_)),
3333 "expected Body::Stream"
3334 );
3335 envelope.exchange.input.body = camel_component_api::Body::Empty;
3336 if let Some(reply_tx) = envelope.reply_tx {
3337 let _ = reply_tx.send(Ok(envelope.exchange));
3338 }
3339 }
3340 Ok(None) => panic!("consumer channel closed unexpectedly"),
3341 Err(_) => {
3342 }
3345 }
3346 };
3347
3348 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3349
3350 let resp = http_result.unwrap();
3351 assert_ne!(
3353 resp.status().as_u16(),
3354 413,
3355 "chunked upload must not be rejected by maxRequestBody"
3356 );
3357 assert_eq!(resp.status().as_u16(), 200);
3358
3359 token.cancel();
3360 }
3361}