1pub mod bundle;
2pub mod config;
3pub use bundle::HttpBundle;
4pub use config::HttpConfig;
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, OnceLock};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use tokio::sync::{OnceCell, RwLock};
14use tower::Service;
15use tracing::debug;
16
17use axum::body::BodyDataStream;
18use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
19use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
20use camel_component_api::{UriComponents, UriConfig, parse_uri};
21use futures::TryStreamExt;
22use futures::stream::BoxStream;
23
24#[derive(Debug, Clone)]
85pub struct HttpEndpointConfig {
86 pub base_url: String,
87 pub http_method: Option<String>,
88 pub throw_exception_on_failure: bool,
89 pub ok_status_code_range: (u16, u16),
90 pub response_timeout: Option<Duration>,
91 pub query_params: HashMap<String, String>,
92 pub allow_private_ips: bool,
93 pub blocked_hosts: Vec<String>,
94 pub max_body_size: usize,
95}
96
97const HTTP_CAMEL_OPTIONS: &[&str] = &[
99 "httpMethod",
100 "throwExceptionOnFailure",
101 "okStatusCodeRange",
102 "followRedirects",
103 "connectTimeout",
104 "responseTimeout",
105 "allowPrivateIps",
106 "blockedHosts",
107 "maxBodySize",
108];
109
110impl UriConfig for HttpEndpointConfig {
111 fn scheme() -> &'static str {
113 "http"
114 }
115
116 fn from_uri(uri: &str) -> Result<Self, CamelError> {
117 let parts = parse_uri(uri)?;
118 Self::from_components(parts)
119 }
120
121 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
122 if parts.scheme != "http" && parts.scheme != "https" {
124 return Err(CamelError::InvalidUri(format!(
125 "expected scheme 'http' or 'https', got '{}'",
126 parts.scheme
127 )));
128 }
129
130 let base_url = format!("{}:{}", parts.scheme, parts.path);
133
134 let http_method = parts.params.get("httpMethod").cloned();
135
136 let throw_exception_on_failure = parts
137 .params
138 .get("throwExceptionOnFailure")
139 .map(|v| v != "false")
140 .unwrap_or(true);
141
142 let ok_status_code_range = parts
144 .params
145 .get("okStatusCodeRange")
146 .and_then(|v| {
147 let (start, end) = v.split_once('-')?;
148 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
149 })
150 .unwrap_or((200, 299));
151
152 let response_timeout = parts
153 .params
154 .get("responseTimeout")
155 .and_then(|v| v.parse::<u64>().ok())
156 .map(Duration::from_millis);
157
158 let allow_private_ips = parts
160 .params
161 .get("allowPrivateIps")
162 .map(|v| v == "true")
163 .unwrap_or(false); let blocked_hosts = parts
167 .params
168 .get("blockedHosts")
169 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
170 .unwrap_or_default();
171
172 let max_body_size = parts
173 .params
174 .get("maxBodySize")
175 .and_then(|v| v.parse::<usize>().ok())
176 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
180 .params
181 .into_iter()
182 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
183 .collect();
184
185 Ok(Self {
186 base_url,
187 http_method,
188 throw_exception_on_failure,
189 ok_status_code_range,
190 response_timeout,
191 query_params,
192 allow_private_ips,
193 blocked_hosts,
194 max_body_size,
195 })
196 }
197}
198
199impl HttpEndpointConfig {
200 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
201 let parts = parse_uri(uri)?;
202 let mut endpoint = Self::from_components(parts.clone())?;
203 if endpoint.response_timeout.is_none() {
204 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
205 }
206 if !parts.params.contains_key("allowPrivateIps") {
207 endpoint.allow_private_ips = config.allow_private_ips;
208 }
209 if !parts.params.contains_key("blockedHosts") {
210 endpoint.blocked_hosts = config.blocked_hosts.clone();
211 }
212 if !parts.params.contains_key("maxBodySize") {
213 endpoint.max_body_size = config.max_body_size;
214 }
215 Ok(endpoint)
216 }
217}
218
219#[derive(Debug, Clone)]
225pub struct HttpServerConfig {
226 pub host: String,
228 pub port: u16,
230 pub path: String,
232 pub max_request_body: usize,
234 pub max_response_body: usize,
236 pub max_inflight_requests: usize,
238}
239
240impl UriConfig for HttpServerConfig {
241 fn scheme() -> &'static str {
243 "http"
244 }
245
246 fn from_uri(uri: &str) -> Result<Self, CamelError> {
247 let parts = parse_uri(uri)?;
248 Self::from_components(parts)
249 }
250
251 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
252 if parts.scheme != "http" && parts.scheme != "https" {
254 return Err(CamelError::InvalidUri(format!(
255 "expected scheme 'http' or 'https', got '{}'",
256 parts.scheme
257 )));
258 }
259
260 let authority_and_path = parts.path.trim_start_matches('/');
263
264 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
266 (&authority_and_path[..idx], &authority_and_path[idx..])
267 } else {
268 (authority_and_path, "/")
269 };
270
271 let path = if path_suffix.is_empty() {
272 "/"
273 } else {
274 path_suffix
275 }
276 .to_string();
277
278 let (host, port) = if let Some(colon) = authority.rfind(':') {
280 let port_str = &authority[colon + 1..];
281 match port_str.parse::<u16>() {
282 Ok(p) => (authority[..colon].to_string(), p),
283 Err(_) => {
284 return Err(CamelError::InvalidUri(format!(
285 "invalid port '{}' in authority",
286 port_str
287 )));
288 }
289 }
290 } else {
291 let default_port = if parts.scheme == "https" { 443 } else { 80 };
293 (authority.to_string(), default_port)
294 };
295
296 let max_request_body = parts
297 .params
298 .get("maxRequestBody")
299 .and_then(|v| v.parse::<usize>().ok())
300 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
303 .params
304 .get("maxResponseBody")
305 .and_then(|v| v.parse::<usize>().ok())
306 .unwrap_or(10 * 1024 * 1024); let max_inflight_requests = parts
309 .params
310 .get("maxInflightRequests")
311 .and_then(|v| v.parse::<usize>().ok())
312 .unwrap_or(1024);
313
314 Ok(Self {
315 host,
316 port,
317 path,
318 max_request_body,
319 max_response_body,
320 max_inflight_requests,
321 })
322 }
323}
324
325impl HttpServerConfig {
326 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
327 let parts = parse_uri(uri)?;
328 let mut server = Self::from_components(parts.clone())?;
329 if !parts.params.contains_key("maxRequestBody") {
330 server.max_request_body = config.max_request_body;
331 }
332 if !parts.params.contains_key("maxResponseBody") {
333 server.max_response_body = config.max_body_size;
334 }
335 Ok(server)
336 }
337}
338
339pub(crate) enum HttpReplyBody {
345 Bytes(bytes::Bytes),
346 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
347}
348
349pub(crate) struct RequestEnvelope {
352 pub(crate) method: String,
353 pub(crate) path: String,
354 pub(crate) query: String,
355 pub(crate) headers: http::HeaderMap,
356 pub(crate) body: StreamBody,
357 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
358}
359
360pub(crate) struct HttpReply {
362 pub(crate) status: u16,
363 pub(crate) headers: Vec<(String, String)>,
364 pub(crate) body: HttpReplyBody,
365}
366
367pub(crate) type DispatchTable =
373 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
374
375type ServerKey = (String, u16);
376
377#[allow(dead_code)]
379struct ServerHandle {
380 dispatch: DispatchTable,
381 max_request_body: usize,
382 max_response_body: usize,
383 max_inflight_requests: usize,
384 inflight: Arc<tokio::sync::Semaphore>,
385 _task: tokio::task::JoinHandle<()>,
387}
388
389pub struct ServerRegistry {
391 inner: Mutex<HashMap<ServerKey, Arc<OnceCell<ServerHandle>>>>,
392}
393
394impl ServerRegistry {
395 pub fn global() -> &'static Self {
397 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
398 INSTANCE.get_or_init(|| ServerRegistry {
399 inner: Mutex::new(HashMap::new()),
400 })
401 }
402
403 pub(crate) async fn get_or_spawn(
406 &'static self,
407 host: &str,
408 port: u16,
409 max_request_body: usize,
410 max_response_body: usize,
411 max_inflight_requests: usize,
412 ) -> Result<DispatchTable, CamelError> {
413 let host_owned = host.to_string();
414
415 let cell = {
416 let mut guard = self.inner.lock().map_err(|_| {
417 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
418 })?;
419 let key = (host.to_string(), port);
420 guard
421 .entry(key)
422 .or_insert_with(|| Arc::new(OnceCell::new()))
423 .clone()
424 };
425
426 if let Some(existing) = cell.get()
427 && existing.max_request_body != max_request_body
428 {
429 return Err(CamelError::EndpointCreationFailed(
430 format!(
431 "incompatible maxRequestBody for shared server (host={host}, port={port}): {} vs {}",
432 existing.max_request_body, max_request_body
433 ),
434 ));
435 }
436
437 if let Some(existing) = cell.get()
438 && existing.max_response_body != max_response_body
439 {
440 return Err(CamelError::EndpointCreationFailed(
441 format!(
442 "incompatible maxResponseBody for shared server (host={host}, port={port}): {} vs {}",
443 existing.max_response_body, max_response_body
444 ),
445 ));
446 }
447
448 if let Some(existing) = cell.get()
449 && existing.max_inflight_requests != max_inflight_requests
450 {
451 return Err(CamelError::EndpointCreationFailed(
452 format!(
453 "incompatible maxInflightRequests for shared server (host={host}, port={port}): {} vs {}",
454 existing.max_inflight_requests, max_inflight_requests
455 ),
456 ));
457 }
458
459 let handle = cell
460 .get_or_try_init(|| async {
461 let addr = format!("{host_owned}:{port}");
462 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
463 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
464 })?;
465 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
466 let inflight = Arc::new(tokio::sync::Semaphore::new(max_inflight_requests));
467 let task = tokio::spawn(run_axum_server(
468 listener,
469 Arc::clone(&dispatch),
470 max_request_body,
471 max_response_body,
472 Arc::clone(&inflight),
473 ));
474 Ok::<ServerHandle, CamelError>(ServerHandle {
475 dispatch,
476 max_request_body,
477 max_response_body,
478 max_inflight_requests,
479 inflight,
480 _task: task,
481 })
482 })
483 .await?;
484
485 Ok(Arc::clone(&handle.dispatch))
486 }
487}
488
489use axum::{
494 Router,
495 body::Body as AxumBody,
496 extract::{Request, State},
497 http::{Response, StatusCode},
498 response::IntoResponse,
499};
500
501#[derive(Clone)]
502struct AppState {
503 dispatch: DispatchTable,
504 max_request_body: usize,
505 max_response_body: usize,
506 inflight: Arc<tokio::sync::Semaphore>,
507}
508
509async fn run_axum_server(
510 listener: tokio::net::TcpListener,
511 dispatch: DispatchTable,
512 max_request_body: usize,
513 max_response_body: usize,
514 inflight: Arc<tokio::sync::Semaphore>,
515) {
516 let state = AppState {
517 dispatch,
518 max_request_body,
519 max_response_body,
520 inflight,
521 };
522 let app = Router::new().fallback(dispatch_handler).with_state(state);
523
524 axum::serve(listener, app).await.unwrap_or_else(|e| {
525 tracing::error!(error = %e, "Axum server error");
526 });
527}
528
529async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
530 let method = req.method().to_string();
531 let path = req.uri().path().to_string();
532 let query = req.uri().query().unwrap_or("").to_string();
533 let headers = req.headers().clone();
534
535 let content_length: Option<u64> = headers
537 .get(http::header::CONTENT_LENGTH)
538 .and_then(|v| v.to_str().ok())
539 .and_then(|s| s.parse().ok());
540
541 if let Some(len) = content_length
542 && len > state.max_request_body as u64
543 {
544 return Response::builder()
545 .status(StatusCode::PAYLOAD_TOO_LARGE)
546 .body(AxumBody::from("Request body exceeds configured limit"))
547 .expect("infallible");
548 }
549
550 let _permit = match Arc::clone(&state.inflight).try_acquire_owned() {
551 Ok(permit) => permit,
552 Err(_) => {
553 return Response::builder()
554 .status(StatusCode::SERVICE_UNAVAILABLE)
555 .body(AxumBody::from("Service Unavailable"))
556 .expect("infallible");
557 }
558 };
559
560 let content_type = headers
562 .get(http::header::CONTENT_TYPE)
563 .and_then(|v| v.to_str().ok())
564 .map(|s| s.to_string());
565
566 let data_stream: BodyDataStream = req.into_body().into_data_stream();
567 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
568 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
569
570 let stream_body = StreamBody {
571 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
572 metadata: StreamMetadata {
573 size_hint: content_length,
574 content_type,
575 origin: None,
576 },
577 };
578
579 let sender = {
581 let table = state.dispatch.read().await;
582 table.get(&path).cloned()
583 };
584 let Some(sender) = sender else {
585 return Response::builder()
586 .status(StatusCode::NOT_FOUND)
587 .body(AxumBody::from("No consumer registered for this path"))
588 .expect("infallible");
589 };
590
591 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
592 let envelope = RequestEnvelope {
593 method,
594 path,
595 query,
596 headers,
597 body: stream_body,
598 reply_tx,
599 };
600
601 if sender.send(envelope).await.is_err() {
602 return Response::builder()
603 .status(StatusCode::SERVICE_UNAVAILABLE)
604 .body(AxumBody::from("Consumer unavailable"))
605 .expect("infallible");
606 }
607
608 match reply_rx.await {
609 Ok(reply) => {
610 let reply = match reply.body {
611 HttpReplyBody::Bytes(b) if exceeds_max_response_body(b.len(), state.max_response_body) => {
612 HttpReply {
613 status: 500,
614 headers: vec![],
615 body: HttpReplyBody::Bytes(bytes::Bytes::from("Response body exceeds configured limit")),
616 }
617 }
618 _ => reply,
619 };
620
621 let status =
622 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
623 let mut builder = Response::builder().status(status);
624 for (k, v) in &reply.headers {
625 builder = builder.header(k.as_str(), v.as_str());
626 }
627 match reply.body {
628 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
629 Response::builder()
630 .status(StatusCode::INTERNAL_SERVER_ERROR)
631 .body(AxumBody::from("Invalid response headers from consumer"))
632 .expect("infallible")
633 }),
634 HttpReplyBody::Stream(stream) => builder
635 .body(AxumBody::from_stream(stream))
636 .unwrap_or_else(|_| {
637 Response::builder()
638 .status(StatusCode::INTERNAL_SERVER_ERROR)
639 .body(AxumBody::from("Invalid response headers from consumer"))
640 .expect("infallible")
641 }),
642 }
643 }
644 Err(_) => Response::builder()
645 .status(StatusCode::INTERNAL_SERVER_ERROR)
646 .body(AxumBody::from("Pipeline error"))
647 .expect("Response::builder() with a known-valid status code and body is infallible"),
648 }
649}
650
651fn exceeds_max_response_body(len: usize, max: usize) -> bool {
652 len > max
653}
654
655pub struct HttpConsumer {
660 config: HttpServerConfig,
661}
662
663impl HttpConsumer {
664 pub fn new(config: HttpServerConfig) -> Self {
665 Self { config }
666 }
667}
668
669#[async_trait::async_trait]
670impl Consumer for HttpConsumer {
671 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
672 use camel_component_api::{Body, Exchange, Message};
673
674 let dispatch = ServerRegistry::global()
675 .get_or_spawn(
676 &self.config.host,
677 self.config.port,
678 self.config.max_request_body,
679 self.config.max_response_body,
680 self.config.max_inflight_requests,
681 )
682 .await?;
683
684 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
686 {
687 let mut table = dispatch.write().await;
688 table.insert(self.config.path.clone(), env_tx);
689 }
690
691 let path = self.config.path.clone();
692 let cancel_token = ctx.cancel_token();
693 loop {
694 tokio::select! {
695 _ = ctx.cancelled() => {
696 break;
697 }
698 envelope = env_rx.recv() => {
699 let Some(envelope) = envelope else { break; };
700
701 let mut msg = Message::default();
703
704 msg.set_header("CamelHttpMethod",
706 serde_json::Value::String(envelope.method.clone()));
707 msg.set_header("CamelHttpPath",
708 serde_json::Value::String(envelope.path.clone()));
709 msg.set_header("CamelHttpQuery",
710 serde_json::Value::String(envelope.query.clone()));
711
712 for (k, v) in &envelope.headers {
714 if let Ok(val_str) = v.to_str() {
715 msg.set_header(
716 k.as_str(),
717 serde_json::Value::String(val_str.to_string()),
718 );
719 }
720 }
721
722 msg.body = Body::Stream(envelope.body);
725
726 #[allow(unused_mut)]
727 let mut exchange = Exchange::new(msg);
728
729 #[cfg(feature = "otel")]
731 {
732 let headers: HashMap<String, String> = envelope
733 .headers
734 .iter()
735 .filter_map(|(k, v)| {
736 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
737 })
738 .collect();
739 camel_otel::extract_into_exchange(&mut exchange, &headers);
740 }
741
742 let reply_tx = envelope.reply_tx;
743 let sender = ctx.sender().clone();
744 let path_clone = path.clone();
745 let cancel = cancel_token.clone();
746
747 tokio::spawn(async move {
767 if cancel.is_cancelled() {
775 let _ = reply_tx.send(HttpReply {
776 status: 503,
777 headers: vec![],
778 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
779 });
780 return;
781 }
782
783 let (tx, rx) = tokio::sync::oneshot::channel();
785 let envelope = camel_component_api::consumer::ExchangeEnvelope {
786 exchange,
787 reply_tx: Some(tx),
788 };
789
790 let result = match sender.send(envelope).await {
791 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
792 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
793 }
794 .and_then(|r| r);
795
796 let reply = match result {
797 Ok(out) => {
798 let status = out
799 .input
800 .header("CamelHttpResponseCode")
801 .and_then(|v| v.as_u64())
802 .map(|s| s as u16)
803 .unwrap_or(200);
804
805 let reply_body: HttpReplyBody = match out.input.body {
806 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
807 Body::Bytes(b) => HttpReplyBody::Bytes(b),
808 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
809 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
810 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
811 v.to_string().into_bytes(),
812 )),
813 Body::Stream(s) => {
814 match s.stream.lock().await.take() {
815 Some(stream) => HttpReplyBody::Stream(stream),
816 None => {
817 tracing::error!(
818 "Body::Stream already consumed before HTTP reply — returning 500"
819 );
820 let error_reply = HttpReply {
821 status: 500,
822 headers: vec![],
823 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
824 };
825 if reply_tx.send(error_reply).is_err() {
826 debug!("reply_tx dropped before error reply could be sent");
827 }
828 return;
829 }
830 }
831 }
832 };
833
834 let resp_headers: Vec<(String, String)> = out
835 .input
836 .headers
837 .iter()
838 .filter(|(k, _)| !k.starts_with("Camel"))
840 .filter(|(k, _)| {
843 !matches!(
844 k.to_lowercase().as_str(),
845 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
880 })
881 .filter_map(|(k, v)| {
882 v.as_str().map(|s| (k.clone(), s.to_string()))
883 })
884 .collect();
885
886 HttpReply {
887 status,
888 headers: resp_headers,
889 body: reply_body,
890 }
891 }
892 Err(e) => {
893 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
894 HttpReply {
895 status: 500,
896 headers: vec![],
897 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
898 }
899 }
900 };
901
902 let _ = reply_tx.send(reply);
904 });
905 }
906 }
907 }
908
909 {
911 let mut table = dispatch.write().await;
912 table.remove(&path);
913 }
914
915 Ok(())
916 }
917
918 async fn stop(&mut self) -> Result<(), CamelError> {
919 Ok(())
920 }
921
922 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
923 camel_component_api::ConcurrencyModel::Concurrent { max: None }
924 }
925}
926
927pub struct HttpComponent {
932 client: reqwest::Client,
933 config: HttpConfig,
934}
935
936fn build_client(config: &HttpConfig) -> reqwest::Client {
937 let mut builder = reqwest::Client::builder()
938 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
939 .pool_max_idle_per_host(config.pool_max_idle_per_host)
940 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
941
942 if !config.follow_redirects {
943 builder = builder.redirect(reqwest::redirect::Policy::none());
944 }
945
946 builder
947 .build()
948 .expect("reqwest::Client::build() with valid config should not fail")
949}
950
951impl HttpComponent {
952 pub fn new() -> Self {
953 let config = HttpConfig::default();
954 let client = build_client(&config);
955 Self { client, config }
956 }
957
958 pub fn with_config(config: HttpConfig) -> Self {
959 let client = build_client(&config);
960 Self { client, config }
961 }
962
963 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
964 match config {
965 Some(cfg) => Self::with_config(cfg),
966 None => Self::new(),
967 }
968 }
969}
970
971impl Default for HttpComponent {
972 fn default() -> Self {
973 Self::new()
974 }
975}
976
977impl Component for HttpComponent {
978 fn scheme(&self) -> &str {
979 "http"
980 }
981
982 fn create_endpoint(
983 &self,
984 uri: &str,
985 _ctx: &dyn camel_component_api::ComponentContext,
986 ) -> Result<Box<dyn Endpoint>, CamelError> {
987 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
988 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
989 Ok(Box::new(HttpEndpoint {
990 uri: uri.to_string(),
991 config,
992 server_config,
993 client: self.client.clone(),
994 }))
995 }
996}
997
998pub struct HttpsComponent {
999 client: reqwest::Client,
1000 config: HttpConfig,
1001}
1002
1003impl HttpsComponent {
1004 pub fn new() -> Self {
1005 let config = HttpConfig::default();
1006 let client = build_client(&config);
1007 Self { client, config }
1008 }
1009
1010 pub fn with_config(config: HttpConfig) -> Self {
1011 let client = build_client(&config);
1012 Self { client, config }
1013 }
1014
1015 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
1016 match config {
1017 Some(cfg) => Self::with_config(cfg),
1018 None => Self::new(),
1019 }
1020 }
1021}
1022
1023impl Default for HttpsComponent {
1024 fn default() -> Self {
1025 Self::new()
1026 }
1027}
1028
1029impl Component for HttpsComponent {
1030 fn scheme(&self) -> &str {
1031 "https"
1032 }
1033
1034 fn create_endpoint(
1035 &self,
1036 uri: &str,
1037 _ctx: &dyn camel_component_api::ComponentContext,
1038 ) -> Result<Box<dyn Endpoint>, CamelError> {
1039 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
1040 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
1041 Ok(Box::new(HttpEndpoint {
1042 uri: uri.to_string(),
1043 config,
1044 server_config,
1045 client: self.client.clone(),
1046 }))
1047 }
1048}
1049
1050struct HttpEndpoint {
1055 uri: String,
1056 config: HttpEndpointConfig,
1057 server_config: HttpServerConfig,
1058 client: reqwest::Client,
1059}
1060
1061impl Endpoint for HttpEndpoint {
1062 fn uri(&self) -> &str {
1063 &self.uri
1064 }
1065
1066 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1067 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
1068 }
1069
1070 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1071 Ok(BoxProcessor::new(HttpProducer {
1072 config: Arc::new(self.config.clone()),
1073 client: self.client.clone(),
1074 }))
1075 }
1076}
1077
1078fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
1083 let parsed = url::Url::parse(url)
1084 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
1085
1086 if let Some(host) = parsed.host_str()
1088 && config.blocked_hosts.iter().any(|blocked| host == blocked)
1089 {
1090 return Err(CamelError::ProcessorError(format!(
1091 "Host '{}' is blocked",
1092 host
1093 )));
1094 }
1095
1096 if !config.allow_private_ips
1098 && let Some(host) = parsed.host()
1099 {
1100 match host {
1101 url::Host::Ipv4(ip) => {
1102 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1103 return Err(CamelError::ProcessorError(format!(
1104 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1105 ip
1106 )));
1107 }
1108 }
1109 url::Host::Ipv6(ip) => {
1110 if ip.is_loopback() {
1111 return Err(CamelError::ProcessorError(format!(
1112 "Loopback IP '{}' not allowed",
1113 ip
1114 )));
1115 }
1116 }
1117 url::Host::Domain(domain) => {
1118 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1120 if blocked_domains.contains(&domain) {
1121 return Err(CamelError::ProcessorError(format!(
1122 "Domain '{}' is not allowed",
1123 domain
1124 )));
1125 }
1126 }
1127 }
1128 }
1129
1130 Ok(())
1131}
1132
1133#[derive(Clone)]
1138struct HttpProducer {
1139 config: Arc<HttpEndpointConfig>,
1140 client: reqwest::Client,
1141}
1142
1143impl HttpProducer {
1144 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1145 if let Some(ref method) = config.http_method {
1146 return method.to_uppercase();
1147 }
1148 if let Some(method) = exchange
1149 .input
1150 .header("CamelHttpMethod")
1151 .and_then(|v| v.as_str())
1152 {
1153 return method.to_uppercase();
1154 }
1155 if !exchange.input.body.is_empty() {
1156 return "POST".to_string();
1157 }
1158 "GET".to_string()
1159 }
1160
1161 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1162 if let Some(uri) = exchange
1163 .input
1164 .header("CamelHttpUri")
1165 .and_then(|v| v.as_str())
1166 {
1167 let mut url = uri.to_string();
1168 if let Some(path) = exchange
1169 .input
1170 .header("CamelHttpPath")
1171 .and_then(|v| v.as_str())
1172 {
1173 if !url.ends_with('/') && !path.starts_with('/') {
1174 url.push('/');
1175 }
1176 url.push_str(path);
1177 }
1178 if let Some(query) = exchange
1179 .input
1180 .header("CamelHttpQuery")
1181 .and_then(|v| v.as_str())
1182 {
1183 url.push('?');
1184 url.push_str(query);
1185 }
1186 return url;
1187 }
1188
1189 let mut url = config.base_url.clone();
1190
1191 if let Some(path) = exchange
1192 .input
1193 .header("CamelHttpPath")
1194 .and_then(|v| v.as_str())
1195 {
1196 if !url.ends_with('/') && !path.starts_with('/') {
1197 url.push('/');
1198 }
1199 url.push_str(path);
1200 }
1201
1202 if let Some(query) = exchange
1203 .input
1204 .header("CamelHttpQuery")
1205 .and_then(|v| v.as_str())
1206 {
1207 url.push('?');
1208 url.push_str(query);
1209 } else if !config.query_params.is_empty() {
1210 url.push('?');
1212 let query_string: String = config
1213 .query_params
1214 .iter()
1215 .map(|(k, v)| format!("{k}={v}"))
1216 .collect::<Vec<_>>()
1217 .join("&");
1218 url.push_str(&query_string);
1219 }
1220
1221 url
1222 }
1223
1224 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1225 status >= range.0 && status <= range.1
1226 }
1227}
1228
1229impl Service<Exchange> for HttpProducer {
1230 type Response = Exchange;
1231 type Error = CamelError;
1232 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1233
1234 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1235 Poll::Ready(Ok(()))
1236 }
1237
1238 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1239 let config = self.config.clone();
1240 let client = self.client.clone();
1241
1242 Box::pin(async move {
1243 let method_str = HttpProducer::resolve_method(&exchange, &config);
1244 let url = HttpProducer::resolve_url(&exchange, &config);
1245
1246 validate_url_for_ssrf(&url, &config)?;
1248
1249 debug!(
1250 correlation_id = %exchange.correlation_id(),
1251 method = %method_str,
1252 url = %url,
1253 "HTTP request"
1254 );
1255
1256 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1257 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1258 })?;
1259
1260 let mut request = client.request(method, &url);
1261
1262 if let Some(timeout) = config.response_timeout {
1263 request = request.timeout(timeout);
1264 }
1265
1266 #[cfg(feature = "otel")]
1268 {
1269 let mut otel_headers = HashMap::new();
1270 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1271 for (k, v) in otel_headers {
1272 if let (Ok(name), Ok(val)) = (
1273 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1274 reqwest::header::HeaderValue::from_str(&v),
1275 ) {
1276 request = request.header(name, val);
1277 }
1278 }
1279 }
1280
1281 for (key, value) in &exchange.input.headers {
1282 if !key.starts_with("Camel")
1283 && let Some(val_str) = value.as_str()
1284 && let (Ok(name), Ok(val)) = (
1285 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1286 reqwest::header::HeaderValue::from_str(val_str),
1287 )
1288 {
1289 request = request.header(name, val);
1290 }
1291 }
1292
1293 match exchange.input.body {
1294 Body::Stream(ref s) => {
1295 let mut stream_lock = s.stream.lock().await;
1296 if let Some(stream) = stream_lock.take() {
1297 request = request.body(reqwest::Body::wrap_stream(stream));
1298 } else {
1299 return Err(CamelError::AlreadyConsumed);
1300 }
1301 }
1302 _ => {
1303 let body = std::mem::take(&mut exchange.input.body);
1305 let bytes = body.into_bytes(config.max_body_size).await?;
1306 if !bytes.is_empty() {
1307 request = request.body(bytes);
1308 }
1309 }
1310 }
1311
1312 let response = request
1313 .send()
1314 .await
1315 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1316
1317 let status_code = response.status().as_u16();
1318 let status_text = response
1319 .status()
1320 .canonical_reason()
1321 .unwrap_or("Unknown")
1322 .to_string();
1323
1324 for (key, value) in response.headers() {
1325 if let Ok(val_str) = value.to_str() {
1326 exchange
1327 .input
1328 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1329 }
1330 }
1331
1332 exchange.input.set_header(
1333 "CamelHttpResponseCode",
1334 serde_json::Value::Number(status_code.into()),
1335 );
1336 exchange.input.set_header(
1337 "CamelHttpResponseText",
1338 serde_json::Value::String(status_text.clone()),
1339 );
1340
1341 let response_body = response.bytes().await.map_err(|e| {
1342 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1343 })?;
1344
1345 if config.throw_exception_on_failure
1346 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1347 {
1348 return Err(CamelError::HttpOperationFailed {
1349 method: method_str,
1350 url,
1351 status_code,
1352 status_text,
1353 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1354 });
1355 }
1356
1357 if !response_body.is_empty() {
1358 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1359 }
1360
1361 debug!(
1362 correlation_id = %exchange.correlation_id(),
1363 status = status_code,
1364 url = %url,
1365 "HTTP response"
1366 );
1367 Ok(exchange)
1368 })
1369 }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374 use super::*;
1375 use camel_component_api::{Message, NoOpComponentContext};
1376 use std::sync::Arc;
1377 use std::time::Duration;
1378
1379 fn test_producer_ctx() -> ProducerContext {
1380 ProducerContext::new()
1381 }
1382
1383 #[test]
1384 fn test_http_config_defaults() {
1385 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1386 assert_eq!(config.base_url, "http://localhost:8080/api");
1387 assert!(config.http_method.is_none());
1388 assert!(config.throw_exception_on_failure);
1389 assert_eq!(config.ok_status_code_range, (200, 299));
1390 assert!(config.response_timeout.is_none());
1391 }
1392
1393 #[test]
1394 fn test_http_config_scheme() {
1395 assert_eq!(HttpEndpointConfig::scheme(), "http");
1397 }
1398
1399 #[test]
1400 fn test_http_config_from_components() {
1401 let components = camel_component_api::UriComponents {
1403 scheme: "https".to_string(),
1404 path: "//api.example.com/v1".to_string(),
1405 params: std::collections::HashMap::from([(
1406 "httpMethod".to_string(),
1407 "POST".to_string(),
1408 )]),
1409 };
1410 let config = HttpEndpointConfig::from_components(components).unwrap();
1411 assert_eq!(config.base_url, "https://api.example.com/v1");
1412 assert_eq!(config.http_method, Some("POST".to_string()));
1413 }
1414
1415 #[test]
1416 fn test_http_config_with_options() {
1417 let config = HttpEndpointConfig::from_uri(
1418 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1419 ).unwrap();
1420 assert_eq!(config.base_url, "https://api.example.com/v1");
1421 assert_eq!(config.http_method, Some("PUT".to_string()));
1422 assert!(!config.throw_exception_on_failure);
1423 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1424 }
1425
1426 #[test]
1427 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1428 let config = HttpConfig::default()
1429 .with_response_timeout_ms(999)
1430 .with_allow_private_ips(true)
1431 .with_blocked_hosts(vec!["evil.com".to_string()])
1432 .with_max_body_size(12345);
1433 let endpoint =
1434 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1435 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1436 assert!(endpoint.allow_private_ips);
1437 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1438 assert_eq!(endpoint.max_body_size, 12345);
1439 }
1440
1441 #[test]
1442 fn test_from_uri_with_defaults_uri_overrides_config() {
1443 let config = HttpConfig::default()
1444 .with_response_timeout_ms(999)
1445 .with_allow_private_ips(true)
1446 .with_blocked_hosts(vec!["evil.com".to_string()])
1447 .with_max_body_size(12345);
1448 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1449 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1450 &config,
1451 )
1452 .unwrap();
1453 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1454 assert!(!endpoint.allow_private_ips);
1455 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1456 assert_eq!(endpoint.max_body_size, 99);
1457 }
1458
1459 #[test]
1460 fn test_http_config_ok_status_range() {
1461 let config =
1462 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1463 assert_eq!(config.ok_status_code_range, (200, 204));
1464 }
1465
1466 #[test]
1467 fn test_http_config_wrong_scheme() {
1468 let result = HttpEndpointConfig::from_uri("file:/tmp");
1469 assert!(result.is_err());
1470 }
1471
1472 #[test]
1473 fn test_http_component_scheme() {
1474 let component = HttpComponent::new();
1475 assert_eq!(component.scheme(), "http");
1476 }
1477
1478 #[test]
1479 fn test_https_component_scheme() {
1480 let component = HttpsComponent::new();
1481 assert_eq!(component.scheme(), "https");
1482 }
1483
1484 #[test]
1485 fn test_http_endpoint_creates_consumer() {
1486 let component = HttpComponent::new();
1487 let ctx = NoOpComponentContext;
1488 let endpoint = component
1489 .create_endpoint("http://0.0.0.0:19100/test", &ctx)
1490 .unwrap();
1491 assert!(endpoint.create_consumer().is_ok());
1492 }
1493
1494 #[test]
1495 fn test_https_endpoint_creates_consumer() {
1496 let component = HttpsComponent::new();
1497 let ctx = NoOpComponentContext;
1498 let endpoint = component
1499 .create_endpoint("https://0.0.0.0:8443/test", &ctx)
1500 .unwrap();
1501 assert!(endpoint.create_consumer().is_ok());
1502 }
1503
1504 #[test]
1505 fn test_http_endpoint_creates_producer() {
1506 let ctx = test_producer_ctx();
1507 let component = HttpComponent::new();
1508 let endpoint_ctx = NoOpComponentContext;
1509 let endpoint = component
1510 .create_endpoint("http://localhost/api", &endpoint_ctx)
1511 .unwrap();
1512 assert!(endpoint.create_producer(&ctx).is_ok());
1513 }
1514
1515 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1520 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1521 let addr = listener.local_addr().unwrap();
1522 let url = format!("http://127.0.0.1:{}", addr.port());
1523
1524 let handle = tokio::spawn(async move {
1525 loop {
1526 if let Ok((mut stream, _)) = listener.accept().await {
1527 tokio::spawn(async move {
1528 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1529 let mut buf = vec![0u8; 4096];
1530 let n = stream.read(&mut buf).await.unwrap_or(0);
1531 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1532
1533 let method = request.split_whitespace().next().unwrap_or("GET");
1534
1535 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1536 let response = format!(
1537 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1538 body.len(),
1539 body
1540 );
1541 let _ = stream.write_all(response.as_bytes()).await;
1542 });
1543 }
1544 }
1545 });
1546
1547 (url, handle)
1548 }
1549
1550 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1551 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1552 let addr = listener.local_addr().unwrap();
1553 let url = format!("http://127.0.0.1:{}", addr.port());
1554
1555 let handle = tokio::spawn(async move {
1556 loop {
1557 if let Ok((mut stream, _)) = listener.accept().await {
1558 let status = status;
1559 tokio::spawn(async move {
1560 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1561 let mut buf = vec![0u8; 4096];
1562 let _ = stream.read(&mut buf).await;
1563
1564 let status_text = match status {
1565 404 => "Not Found",
1566 500 => "Internal Server Error",
1567 _ => "Error",
1568 };
1569 let body = "error body";
1570 let response = format!(
1571 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1572 status,
1573 status_text,
1574 body.len(),
1575 body
1576 );
1577 let _ = stream.write_all(response.as_bytes()).await;
1578 });
1579 }
1580 }
1581 });
1582
1583 (url, handle)
1584 }
1585
1586 #[tokio::test]
1587 async fn test_http_producer_get_request() {
1588 use tower::ServiceExt;
1589
1590 let (url, _handle) = start_test_server().await;
1591 let ctx = test_producer_ctx();
1592
1593 let component = HttpComponent::new();
1594 let endpoint_ctx = NoOpComponentContext;
1595 let endpoint = component
1596 .create_endpoint(
1597 &format!("{url}/api/test?allowPrivateIps=true"),
1598 &endpoint_ctx,
1599 )
1600 .unwrap();
1601 let producer = endpoint.create_producer(&ctx).unwrap();
1602
1603 let exchange = Exchange::new(Message::default());
1604 let result = producer.oneshot(exchange).await.unwrap();
1605
1606 let status = result
1607 .input
1608 .header("CamelHttpResponseCode")
1609 .and_then(|v| v.as_u64())
1610 .unwrap();
1611 assert_eq!(status, 200);
1612
1613 assert!(!result.input.body.is_empty());
1614 }
1615
1616 #[tokio::test]
1617 async fn test_http_producer_post_with_body() {
1618 use tower::ServiceExt;
1619
1620 let (url, _handle) = start_test_server().await;
1621 let ctx = test_producer_ctx();
1622
1623 let component = HttpComponent::new();
1624 let endpoint_ctx = NoOpComponentContext;
1625 let endpoint = component
1626 .create_endpoint(
1627 &format!("{url}/api/data?allowPrivateIps=true"),
1628 &endpoint_ctx,
1629 )
1630 .unwrap();
1631 let producer = endpoint.create_producer(&ctx).unwrap();
1632
1633 let exchange = Exchange::new(Message::new("request body"));
1634 let result = producer.oneshot(exchange).await.unwrap();
1635
1636 let status = result
1637 .input
1638 .header("CamelHttpResponseCode")
1639 .and_then(|v| v.as_u64())
1640 .unwrap();
1641 assert_eq!(status, 200);
1642 }
1643
1644 #[tokio::test]
1645 async fn test_http_producer_method_from_header() {
1646 use tower::ServiceExt;
1647
1648 let (url, _handle) = start_test_server().await;
1649 let ctx = test_producer_ctx();
1650
1651 let component = HttpComponent::new();
1652 let endpoint_ctx = NoOpComponentContext;
1653 let endpoint = component
1654 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1655 .unwrap();
1656 let producer = endpoint.create_producer(&ctx).unwrap();
1657
1658 let mut exchange = Exchange::new(Message::default());
1659 exchange.input.set_header(
1660 "CamelHttpMethod",
1661 serde_json::Value::String("DELETE".to_string()),
1662 );
1663
1664 let result = producer.oneshot(exchange).await.unwrap();
1665 let status = result
1666 .input
1667 .header("CamelHttpResponseCode")
1668 .and_then(|v| v.as_u64())
1669 .unwrap();
1670 assert_eq!(status, 200);
1671 }
1672
1673 #[tokio::test]
1674 async fn test_http_producer_forced_method() {
1675 use tower::ServiceExt;
1676
1677 let (url, _handle) = start_test_server().await;
1678 let ctx = test_producer_ctx();
1679
1680 let component = HttpComponent::new();
1681 let endpoint_ctx = NoOpComponentContext;
1682 let endpoint = component
1683 .create_endpoint(
1684 &format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"),
1685 &endpoint_ctx,
1686 )
1687 .unwrap();
1688 let producer = endpoint.create_producer(&ctx).unwrap();
1689
1690 let exchange = Exchange::new(Message::default());
1691 let result = producer.oneshot(exchange).await.unwrap();
1692
1693 let status = result
1694 .input
1695 .header("CamelHttpResponseCode")
1696 .and_then(|v| v.as_u64())
1697 .unwrap();
1698 assert_eq!(status, 200);
1699 }
1700
1701 #[tokio::test]
1702 async fn test_http_producer_throw_exception_on_failure() {
1703 use tower::ServiceExt;
1704
1705 let (url, _handle) = start_status_server(404).await;
1706 let ctx = test_producer_ctx();
1707
1708 let component = HttpComponent::new();
1709 let endpoint_ctx = NoOpComponentContext;
1710 let endpoint = component
1711 .create_endpoint(
1712 &format!("{url}/not-found?allowPrivateIps=true"),
1713 &endpoint_ctx,
1714 )
1715 .unwrap();
1716 let producer = endpoint.create_producer(&ctx).unwrap();
1717
1718 let exchange = Exchange::new(Message::default());
1719 let result = producer.oneshot(exchange).await;
1720 assert!(result.is_err());
1721
1722 match result.unwrap_err() {
1723 CamelError::HttpOperationFailed { status_code, .. } => {
1724 assert_eq!(status_code, 404);
1725 }
1726 e => panic!("Expected HttpOperationFailed, got: {e}"),
1727 }
1728 }
1729
1730 #[tokio::test]
1731 async fn test_http_producer_no_throw_on_failure() {
1732 use tower::ServiceExt;
1733
1734 let (url, _handle) = start_status_server(500).await;
1735 let ctx = test_producer_ctx();
1736
1737 let component = HttpComponent::new();
1738 let endpoint_ctx = NoOpComponentContext;
1739 let endpoint = component
1740 .create_endpoint(
1741 &format!("{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"),
1742 &endpoint_ctx,
1743 )
1744 .unwrap();
1745 let producer = endpoint.create_producer(&ctx).unwrap();
1746
1747 let exchange = Exchange::new(Message::default());
1748 let result = producer.oneshot(exchange).await.unwrap();
1749
1750 let status = result
1751 .input
1752 .header("CamelHttpResponseCode")
1753 .and_then(|v| v.as_u64())
1754 .unwrap();
1755 assert_eq!(status, 500);
1756 }
1757
1758 #[tokio::test]
1759 async fn test_http_producer_uri_override() {
1760 use tower::ServiceExt;
1761
1762 let (url, _handle) = start_test_server().await;
1763 let ctx = test_producer_ctx();
1764
1765 let component = HttpComponent::new();
1766 let endpoint_ctx = NoOpComponentContext;
1767 let endpoint = component
1768 .create_endpoint(
1769 "http://localhost:1/does-not-exist?allowPrivateIps=true",
1770 &endpoint_ctx,
1771 )
1772 .unwrap();
1773 let producer = endpoint.create_producer(&ctx).unwrap();
1774
1775 let mut exchange = Exchange::new(Message::default());
1776 exchange.input.set_header(
1777 "CamelHttpUri",
1778 serde_json::Value::String(format!("{url}/api")),
1779 );
1780
1781 let result = producer.oneshot(exchange).await.unwrap();
1782 let status = result
1783 .input
1784 .header("CamelHttpResponseCode")
1785 .and_then(|v| v.as_u64())
1786 .unwrap();
1787 assert_eq!(status, 200);
1788 }
1789
1790 #[tokio::test]
1791 async fn test_http_producer_response_headers_mapped() {
1792 use tower::ServiceExt;
1793
1794 let (url, _handle) = start_test_server().await;
1795 let ctx = test_producer_ctx();
1796
1797 let component = HttpComponent::new();
1798 let endpoint_ctx = NoOpComponentContext;
1799 let endpoint = component
1800 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
1801 .unwrap();
1802 let producer = endpoint.create_producer(&ctx).unwrap();
1803
1804 let exchange = Exchange::new(Message::default());
1805 let result = producer.oneshot(exchange).await.unwrap();
1806
1807 assert!(
1808 result.input.header("content-type").is_some()
1809 || result.input.header("Content-Type").is_some()
1810 );
1811 assert!(result.input.header("CamelHttpResponseText").is_some());
1812 }
1813
1814 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1819 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1820 let addr = listener.local_addr().unwrap();
1821 let url = format!("http://127.0.0.1:{}", addr.port());
1822
1823 let handle = tokio::spawn(async move {
1824 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1825 loop {
1826 if let Ok((mut stream, _)) = listener.accept().await {
1827 tokio::spawn(async move {
1828 let mut buf = vec![0u8; 4096];
1829 let n = stream.read(&mut buf).await.unwrap_or(0);
1830 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1831
1832 if request.contains("GET /final") {
1834 let body = r#"{"status":"final"}"#;
1835 let response = format!(
1836 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1837 body.len(),
1838 body
1839 );
1840 let _ = stream.write_all(response.as_bytes()).await;
1841 } else {
1842 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1844 let _ = stream.write_all(response.as_bytes()).await;
1845 }
1846 });
1847 }
1848 }
1849 });
1850
1851 (url, handle)
1852 }
1853
1854 #[tokio::test]
1855 async fn test_follow_redirects_false_does_not_follow() {
1856 use tower::ServiceExt;
1857
1858 let (url, _handle) = start_redirect_server().await;
1859 let ctx = test_producer_ctx();
1860
1861 let component =
1862 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1863 let endpoint_ctx = NoOpComponentContext;
1864 let endpoint = component
1865 .create_endpoint(
1866 &format!("{url}?throwExceptionOnFailure=false&allowPrivateIps=true"),
1867 &endpoint_ctx,
1868 )
1869 .unwrap();
1870 let producer = endpoint.create_producer(&ctx).unwrap();
1871
1872 let exchange = Exchange::new(Message::default());
1873 let result = producer.oneshot(exchange).await.unwrap();
1874
1875 let status = result
1877 .input
1878 .header("CamelHttpResponseCode")
1879 .and_then(|v| v.as_u64())
1880 .unwrap();
1881 assert_eq!(
1882 status, 302,
1883 "Should NOT follow redirect when followRedirects=false"
1884 );
1885 }
1886
1887 #[tokio::test]
1888 async fn test_follow_redirects_true_follows_redirect() {
1889 use tower::ServiceExt;
1890
1891 let (url, _handle) = start_redirect_server().await;
1892 let ctx = test_producer_ctx();
1893
1894 let component =
1895 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1896 let endpoint_ctx = NoOpComponentContext;
1897 let endpoint = component
1898 .create_endpoint(&format!("{url}?allowPrivateIps=true"), &endpoint_ctx)
1899 .unwrap();
1900 let producer = endpoint.create_producer(&ctx).unwrap();
1901
1902 let exchange = Exchange::new(Message::default());
1903 let result = producer.oneshot(exchange).await.unwrap();
1904
1905 let status = result
1907 .input
1908 .header("CamelHttpResponseCode")
1909 .and_then(|v| v.as_u64())
1910 .unwrap();
1911 assert_eq!(
1912 status, 200,
1913 "Should follow redirect when followRedirects=true"
1914 );
1915 }
1916
1917 #[tokio::test]
1918 async fn test_query_params_forwarded_to_http_request() {
1919 use tower::ServiceExt;
1920
1921 let (url, _handle) = start_test_server().await;
1922 let ctx = test_producer_ctx();
1923
1924 let component = HttpComponent::new();
1925 let endpoint_ctx = NoOpComponentContext;
1926 let endpoint = component
1928 .create_endpoint(
1929 &format!("{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"),
1930 &endpoint_ctx,
1931 )
1932 .unwrap();
1933 let producer = endpoint.create_producer(&ctx).unwrap();
1934
1935 let exchange = Exchange::new(Message::default());
1936 let result = producer.oneshot(exchange).await.unwrap();
1937
1938 let status = result
1941 .input
1942 .header("CamelHttpResponseCode")
1943 .and_then(|v| v.as_u64())
1944 .unwrap();
1945 assert_eq!(status, 200);
1946 }
1947
1948 #[tokio::test]
1949 async fn test_non_camel_query_params_are_forwarded() {
1950 let config = HttpEndpointConfig::from_uri(
1953 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1954 )
1955 .unwrap();
1956
1957 assert!(
1959 config.query_params.contains_key("apiKey"),
1960 "apiKey should be preserved"
1961 );
1962 assert!(
1963 config.query_params.contains_key("token"),
1964 "token should be preserved"
1965 );
1966 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1967 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1968
1969 assert!(
1971 !config.query_params.contains_key("httpMethod"),
1972 "httpMethod should not be forwarded"
1973 );
1974 }
1975
1976 #[tokio::test]
1981 async fn test_http_producer_blocks_metadata_endpoint() {
1982 use tower::ServiceExt;
1983
1984 let ctx = test_producer_ctx();
1985 let component = HttpComponent::new();
1986 let endpoint_ctx = NoOpComponentContext;
1987 let endpoint = component
1988 .create_endpoint(
1989 "http://example.com/api?allowPrivateIps=false",
1990 &endpoint_ctx,
1991 )
1992 .unwrap();
1993 let producer = endpoint.create_producer(&ctx).unwrap();
1994
1995 let mut exchange = Exchange::new(Message::default());
1996 exchange.input.set_header(
1997 "CamelHttpUri",
1998 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1999 );
2000
2001 let result = producer.oneshot(exchange).await;
2002 assert!(result.is_err(), "Should block AWS metadata endpoint");
2003
2004 let err = result.unwrap_err();
2005 assert!(
2006 err.to_string().contains("Private IP"),
2007 "Error should mention private IP blocking, got: {}",
2008 err
2009 );
2010 }
2011
2012 #[test]
2013 fn test_ssrf_config_defaults() {
2014 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
2015 assert!(
2016 !config.allow_private_ips,
2017 "Private IPs should be blocked by default"
2018 );
2019 assert!(
2020 config.blocked_hosts.is_empty(),
2021 "Blocked hosts should be empty by default"
2022 );
2023 }
2024
2025 #[test]
2026 fn test_ssrf_config_allow_private_ips() {
2027 let config =
2028 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
2029 assert!(
2030 config.allow_private_ips,
2031 "Private IPs should be allowed when explicitly set"
2032 );
2033 }
2034
2035 #[test]
2036 fn test_ssrf_config_blocked_hosts() {
2037 let config = HttpEndpointConfig::from_uri(
2038 "http://example.com/api?blockedHosts=evil.com,malware.net",
2039 )
2040 .unwrap();
2041 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
2042 }
2043
2044 #[tokio::test]
2045 async fn test_http_producer_blocks_localhost() {
2046 use tower::ServiceExt;
2047
2048 let ctx = test_producer_ctx();
2049 let component = HttpComponent::new();
2050 let endpoint_ctx = NoOpComponentContext;
2051 let endpoint = component
2052 .create_endpoint("http://example.com/api", &endpoint_ctx)
2053 .unwrap();
2054 let producer = endpoint.create_producer(&ctx).unwrap();
2055
2056 let mut exchange = Exchange::new(Message::default());
2057 exchange.input.set_header(
2058 "CamelHttpUri",
2059 serde_json::Value::String("http://localhost:8080/internal".to_string()),
2060 );
2061
2062 let result = producer.oneshot(exchange).await;
2063 assert!(result.is_err(), "Should block localhost");
2064 }
2065
2066 #[tokio::test]
2067 async fn test_http_producer_blocks_loopback_ip() {
2068 use tower::ServiceExt;
2069
2070 let ctx = test_producer_ctx();
2071 let component = HttpComponent::new();
2072 let endpoint_ctx = NoOpComponentContext;
2073 let endpoint = component
2074 .create_endpoint("http://example.com/api", &endpoint_ctx)
2075 .unwrap();
2076 let producer = endpoint.create_producer(&ctx).unwrap();
2077
2078 let mut exchange = Exchange::new(Message::default());
2079 exchange.input.set_header(
2080 "CamelHttpUri",
2081 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
2082 );
2083
2084 let result = producer.oneshot(exchange).await;
2085 assert!(result.is_err(), "Should block loopback IP");
2086 }
2087
2088 #[tokio::test]
2089 async fn test_http_producer_allows_private_ip_when_enabled() {
2090 use tower::ServiceExt;
2091
2092 let ctx = test_producer_ctx();
2093 let component = HttpComponent::new();
2094 let endpoint_ctx = NoOpComponentContext;
2095 let endpoint = component
2098 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true", &endpoint_ctx)
2099 .unwrap();
2100 let producer = endpoint.create_producer(&ctx).unwrap();
2101
2102 let exchange = Exchange::new(Message::default());
2103
2104 let result = producer.oneshot(exchange).await;
2107 if let Err(ref e) = result {
2109 let err_str = e.to_string();
2110 assert!(
2111 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
2112 "Should not be SSRF error, got: {}",
2113 err_str
2114 );
2115 }
2116 }
2117
2118 #[test]
2123 fn test_http_server_config_parse() {
2124 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
2125 assert_eq!(cfg.host, "0.0.0.0");
2126 assert_eq!(cfg.port, 8080);
2127 assert_eq!(cfg.path, "/orders");
2128 assert_eq!(cfg.max_inflight_requests, 1024);
2129 }
2130
2131 #[test]
2132 fn test_http_server_config_scheme() {
2133 assert_eq!(HttpServerConfig::scheme(), "http");
2135 }
2136
2137 #[test]
2138 fn test_http_server_config_from_components() {
2139 let components = camel_component_api::UriComponents {
2141 scheme: "https".to_string(),
2142 path: "//0.0.0.0:8443/api".to_string(),
2143 params: std::collections::HashMap::from([
2144 ("maxRequestBody".to_string(), "5242880".to_string()),
2145 ("maxInflightRequests".to_string(), "7".to_string()),
2146 ]),
2147 };
2148 let cfg = HttpServerConfig::from_components(components).unwrap();
2149 assert_eq!(cfg.host, "0.0.0.0");
2150 assert_eq!(cfg.port, 8443);
2151 assert_eq!(cfg.path, "/api");
2152 assert_eq!(cfg.max_request_body, 5242880);
2153 assert_eq!(cfg.max_inflight_requests, 7);
2154 }
2155
2156 #[test]
2157 fn test_http_server_config_default_path() {
2158 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2159 assert_eq!(cfg.path, "/");
2160 }
2161
2162 #[test]
2163 fn test_http_server_config_wrong_scheme() {
2164 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2165 }
2166
2167 #[test]
2168 fn test_http_server_config_invalid_port() {
2169 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2170 }
2171
2172 #[test]
2173 fn test_http_server_config_default_port_by_scheme() {
2174 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2176 assert_eq!(cfg_http.port, 80);
2177
2178 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2180 assert_eq!(cfg_https.port, 443);
2181 }
2182
2183 #[test]
2184 fn test_request_envelope_and_reply_are_send() {
2185 fn assert_send<T: Send>() {}
2186 assert_send::<RequestEnvelope>();
2187 assert_send::<HttpReply>();
2188 }
2189
2190 #[test]
2195 fn test_server_registry_global_is_singleton() {
2196 let r1 = ServerRegistry::global();
2197 let r2 = ServerRegistry::global();
2198 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2199 }
2200
2201 #[tokio::test]
2202 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2203 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2204 let port = listener.local_addr().unwrap().port();
2205 drop(listener);
2206
2207 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2208 Arc::new(std::sync::Mutex::new(Vec::new()));
2209
2210 let mut handles = Vec::new();
2211 for _ in 0..4 {
2212 let results = results.clone();
2213 handles.push(tokio::spawn(async move {
2214 let dispatch = ServerRegistry::global()
2215 .get_or_spawn(
2216 "127.0.0.1",
2217 port,
2218 2 * 1024 * 1024,
2219 10 * 1024 * 1024,
2220 1024,
2221 )
2222 .await
2223 .unwrap();
2224 results.lock().unwrap().push(dispatch);
2225 }));
2226 }
2227
2228 for h in handles {
2229 h.await.unwrap();
2230 }
2231
2232 let dispatches = results.lock().unwrap();
2233 assert_eq!(dispatches.len(), 4);
2234 for i in 1..dispatches.len() {
2235 assert!(
2236 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2237 "all concurrent callers should get the same dispatch table"
2238 );
2239 }
2240 }
2241
2242 #[test]
2243 fn test_server_registry_distinguishes_host_and_port() {
2244 let rt = tokio::runtime::Runtime::new().expect("runtime");
2245 rt.block_on(async {
2246 let registry = ServerRegistry::global();
2247 let d1 = registry
2251 .get_or_spawn("127.0.0.1", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2252 .await;
2253 let d2 = registry
2254 .get_or_spawn("0.0.0.0", 0, 1024 * 1024, 10 * 1024 * 1024, 1024)
2255 .await;
2256 assert!(d1.is_ok());
2257 assert!(d2.is_ok());
2258 assert!(!Arc::ptr_eq(&d1.unwrap(), &d2.unwrap()));
2259 });
2260 }
2261
2262 #[tokio::test]
2263 async fn test_shared_server_max_request_body_policy_is_deterministic() {
2264 let registry = ServerRegistry::global();
2265 let d1 = registry
2267 .get_or_spawn("127.0.0.1", 9991, 1024 * 1024, 10 * 1024 * 1024, 1024)
2268 .await;
2269 assert!(d1.is_ok());
2270
2271 let d2 = registry
2274 .get_or_spawn("127.0.0.1", 9991, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
2275 .await;
2276 assert!(d2.is_err());
2277 let err = d2.unwrap_err();
2278 assert!(
2279 err.to_string().contains("maxRequestBody") || err.to_string().contains("incompatible"),
2280 "Expected incompatible maxRequestBody error, got: {}",
2281 err
2282 );
2283 }
2284
2285 #[tokio::test]
2290 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2291 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2292 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2294 let port = listener.local_addr().unwrap().port();
2295 tokio::spawn(run_axum_server(
2296 listener,
2297 dispatch,
2298 2 * 1024 * 1024,
2299 10 * 1024 * 1024,
2300 Arc::new(tokio::sync::Semaphore::new(1024)),
2301 ));
2302
2303 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2305
2306 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2307 .await
2308 .unwrap();
2309 assert_eq!(resp.status().as_u16(), 404);
2310 }
2311
2312 #[tokio::test]
2317 async fn test_http_consumer_start_registers_path() {
2318 use camel_component_api::ConsumerContext;
2319
2320 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2322 let port = listener.local_addr().unwrap().port();
2323 drop(listener); let consumer_cfg = HttpServerConfig {
2326 host: "127.0.0.1".to_string(),
2327 port,
2328 path: "/ping".to_string(),
2329 max_request_body: 2 * 1024 * 1024,
2330 max_response_body: 10 * 1024 * 1024,
2331 max_inflight_requests: 1024,
2332 };
2333 let mut consumer = HttpConsumer::new(consumer_cfg);
2334
2335 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2336 let token = tokio_util::sync::CancellationToken::new();
2337 let ctx = ConsumerContext::new(tx, token.clone());
2338
2339 tokio::spawn(async move {
2340 consumer.start(ctx).await.unwrap();
2341 });
2342
2343 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2344
2345 let client = reqwest::Client::new();
2346 let resp_future = client
2347 .post(format!("http://127.0.0.1:{port}/ping"))
2348 .body("hello world")
2349 .send();
2350
2351 let (http_result, _) = tokio::join!(resp_future, async {
2352 if let Some(mut envelope) = rx.recv().await {
2353 envelope.exchange.input.set_header(
2355 "CamelHttpResponseCode",
2356 serde_json::Value::Number(201.into()),
2357 );
2358 if let Some(reply_tx) = envelope.reply_tx {
2359 let _ = reply_tx.send(Ok(envelope.exchange));
2360 }
2361 }
2362 });
2363
2364 let resp = http_result.unwrap();
2365 assert_eq!(resp.status().as_u16(), 201);
2366
2367 token.cancel();
2368 }
2369
2370 #[tokio::test]
2371 async fn test_http_consumer_returns_503_when_inflight_limit_reached() {
2372 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2373
2374 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2375 let port = listener.local_addr().unwrap().port();
2376 drop(listener);
2377
2378 let consumer_cfg = HttpServerConfig {
2379 host: "127.0.0.1".to_string(),
2380 port,
2381 path: "/saturation".to_string(),
2382 max_request_body: 2 * 1024 * 1024,
2383 max_response_body: 10 * 1024 * 1024,
2384 max_inflight_requests: 1,
2385 };
2386 let mut consumer = HttpConsumer::new(consumer_cfg);
2387
2388 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2389 let token = tokio_util::sync::CancellationToken::new();
2390 let ctx = ConsumerContext::new(tx, token.clone());
2391 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2392 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2393
2394 let (first_seen_tx, first_seen_rx) = tokio::sync::oneshot::channel::<()>();
2395 let (unblock_first_tx, unblock_first_rx) = tokio::sync::oneshot::channel::<()>();
2396
2397 tokio::spawn(async move {
2398 let mut first_seen_tx = Some(first_seen_tx);
2399 let mut unblock_first_rx = Some(unblock_first_rx);
2400
2401 while let Some(envelope) = rx.recv().await {
2402 if let Some(tx) = first_seen_tx.take() {
2403 let _ = tx.send(());
2404 if let Some(rx_unblock) = unblock_first_rx.take() {
2405 let _ = rx_unblock.await;
2406 }
2407 }
2408
2409 if let Some(reply_tx) = envelope.reply_tx {
2410 let _ = reply_tx.send(Ok(envelope.exchange));
2411 }
2412 }
2413 });
2414
2415 let client = reqwest::Client::new();
2416 let first_req = {
2417 let client = client.clone();
2418 async move {
2419 client
2420 .get(format!("http://127.0.0.1:{port}/saturation"))
2421 .send()
2422 .await
2423 .unwrap()
2424 }
2425 };
2426
2427 let first_handle = tokio::spawn(first_req);
2428 first_seen_rx.await.unwrap();
2429
2430 let second_resp = client
2431 .get(format!("http://127.0.0.1:{port}/saturation"))
2432 .send()
2433 .await
2434 .unwrap();
2435
2436 assert_eq!(second_resp.status().as_u16(), 503);
2437
2438 let _ = unblock_first_tx.send(());
2439 let first_resp = first_handle.await.unwrap();
2440 assert_eq!(first_resp.status().as_u16(), 200);
2441
2442 token.cancel();
2443 }
2444
2445 #[tokio::test]
2446 async fn test_http_consumer_enforces_max_response_body_for_bytes() {
2447 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2448
2449 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2450 let port = listener.local_addr().unwrap().port();
2451 drop(listener);
2452
2453 let consumer_cfg = HttpServerConfig {
2454 host: "127.0.0.1".to_string(),
2455 port,
2456 path: "/limit-bytes".to_string(),
2457 max_request_body: 2 * 1024 * 1024,
2458 max_response_body: 16,
2459 max_inflight_requests: 1024,
2460 };
2461 let mut consumer = HttpConsumer::new(consumer_cfg);
2462
2463 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2464 let token = tokio_util::sync::CancellationToken::new();
2465 let ctx = ConsumerContext::new(tx, token.clone());
2466 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2467 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2468
2469 let client = reqwest::Client::new();
2470 let send_fut = client
2471 .get(format!("http://127.0.0.1:{port}/limit-bytes"))
2472 .send();
2473
2474 let (http_result, _) = tokio::join!(send_fut, async {
2475 if let Some(mut envelope) = rx.recv().await {
2476 envelope.exchange.input.body =
2477 camel_component_api::Body::Bytes(bytes::Bytes::from(vec![b'x'; 32]));
2478 if let Some(reply_tx) = envelope.reply_tx {
2479 let _ = reply_tx.send(Ok(envelope.exchange));
2480 }
2481 }
2482 });
2483
2484 let resp = http_result.unwrap();
2485 assert_eq!(resp.status().as_u16(), 500);
2486 let body = resp.text().await.unwrap();
2487 assert_eq!(body, "Response body exceeds configured limit");
2488 token.cancel();
2489 }
2490
2491 #[tokio::test]
2492 async fn test_http_consumer_enforces_max_response_body_for_json() {
2493 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2494
2495 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2496 let port = listener.local_addr().unwrap().port();
2497 drop(listener);
2498
2499 let consumer_cfg = HttpServerConfig {
2500 host: "127.0.0.1".to_string(),
2501 port,
2502 path: "/limit-json".to_string(),
2503 max_request_body: 2 * 1024 * 1024,
2504 max_response_body: 16,
2505 max_inflight_requests: 1024,
2506 };
2507 let mut consumer = HttpConsumer::new(consumer_cfg);
2508
2509 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2510 let token = tokio_util::sync::CancellationToken::new();
2511 let ctx = ConsumerContext::new(tx, token.clone());
2512 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2513 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2514
2515 let client = reqwest::Client::new();
2516 let send_fut = client.get(format!("http://127.0.0.1:{port}/limit-json")).send();
2517
2518 let (http_result, _) = tokio::join!(send_fut, async {
2519 if let Some(mut envelope) = rx.recv().await {
2520 envelope.exchange.input.body = camel_component_api::Body::Json(
2521 serde_json::json!({"message":"this response is bigger than sixteen"}),
2522 );
2523 if let Some(reply_tx) = envelope.reply_tx {
2524 let _ = reply_tx.send(Ok(envelope.exchange));
2525 }
2526 }
2527 });
2528
2529 let resp = http_result.unwrap();
2530 assert_eq!(resp.status().as_u16(), 500);
2531 let body = resp.text().await.unwrap();
2532 assert_eq!(body, "Response body exceeds configured limit");
2533 token.cancel();
2534 }
2535
2536 #[tokio::test]
2537 async fn test_http_consumer_enforces_max_response_body_for_xml() {
2538 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2539
2540 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2541 let port = listener.local_addr().unwrap().port();
2542 drop(listener);
2543
2544 let consumer_cfg = HttpServerConfig {
2545 host: "127.0.0.1".to_string(),
2546 port,
2547 path: "/limit-xml".to_string(),
2548 max_request_body: 2 * 1024 * 1024,
2549 max_response_body: 16,
2550 max_inflight_requests: 1024,
2551 };
2552 let mut consumer = HttpConsumer::new(consumer_cfg);
2553
2554 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2555 let token = tokio_util::sync::CancellationToken::new();
2556 let ctx = ConsumerContext::new(tx, token.clone());
2557 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2558 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2559
2560 let client = reqwest::Client::new();
2561 let send_fut = client.get(format!("http://127.0.0.1:{port}/limit-xml")).send();
2562
2563 let (http_result, _) = tokio::join!(send_fut, async {
2564 if let Some(mut envelope) = rx.recv().await {
2565 envelope.exchange.input.body =
2566 camel_component_api::Body::Xml("<root><value>way-too-large</value></root>".into());
2567 if let Some(reply_tx) = envelope.reply_tx {
2568 let _ = reply_tx.send(Ok(envelope.exchange));
2569 }
2570 }
2571 });
2572
2573 let resp = http_result.unwrap();
2574 assert_eq!(resp.status().as_u16(), 500);
2575 let body = resp.text().await.unwrap();
2576 assert_eq!(body, "Response body exceeds configured limit");
2577 token.cancel();
2578 }
2579
2580 #[tokio::test]
2581 async fn test_http_consumer_does_not_enforce_max_response_body_for_stream() {
2582 use camel_component_api::{CamelError, ConsumerContext, ExchangeEnvelope, StreamBody, StreamMetadata};
2583 use futures::stream;
2584
2585 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2586 let port = listener.local_addr().unwrap().port();
2587 drop(listener);
2588
2589 let consumer_cfg = HttpServerConfig {
2590 host: "127.0.0.1".to_string(),
2591 port,
2592 path: "/limit-stream".to_string(),
2593 max_request_body: 2 * 1024 * 1024,
2594 max_response_body: 16,
2595 max_inflight_requests: 1024,
2596 };
2597 let mut consumer = HttpConsumer::new(consumer_cfg);
2598
2599 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2600 let token = tokio_util::sync::CancellationToken::new();
2601 let ctx = ConsumerContext::new(tx, token.clone());
2602 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2603 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2604
2605 let client = reqwest::Client::new();
2606 let send_fut = client
2607 .get(format!("http://127.0.0.1:{port}/limit-stream"))
2608 .send();
2609
2610 let (http_result, _) = tokio::join!(send_fut, async {
2611 if let Some(mut envelope) = rx.recv().await {
2612 let chunks: Vec<Result<bytes::Bytes, CamelError>> =
2613 vec![Ok(bytes::Bytes::from(vec![b'x'; 32]))];
2614 let stream = Box::pin(stream::iter(chunks));
2615 envelope.exchange.input.body = camel_component_api::Body::Stream(StreamBody {
2616 stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
2617 metadata: StreamMetadata {
2618 size_hint: Some(32),
2619 content_type: Some("application/octet-stream".into()),
2620 origin: None,
2621 },
2622 });
2623 if let Some(reply_tx) = envelope.reply_tx {
2624 let _ = reply_tx.send(Ok(envelope.exchange));
2625 }
2626 }
2627 });
2628
2629 let resp = http_result.unwrap();
2630 assert_eq!(resp.status().as_u16(), 200);
2631 let body = resp.bytes().await.unwrap();
2632 assert_eq!(body.len(), 32);
2633 token.cancel();
2634 }
2635
2636 #[tokio::test]
2641 async fn test_integration_single_consumer_round_trip() {
2642 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2643
2644 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646 let port = listener.local_addr().unwrap().port();
2647 drop(listener); let component = HttpComponent::new();
2650 let endpoint_ctx = NoOpComponentContext;
2651 let endpoint = component
2652 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"), &endpoint_ctx)
2653 .unwrap();
2654 let mut consumer = endpoint.create_consumer().unwrap();
2655
2656 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2657 let token = tokio_util::sync::CancellationToken::new();
2658 let ctx = ConsumerContext::new(tx, token.clone());
2659
2660 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2661 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2662
2663 let client = reqwest::Client::new();
2664 let send_fut = client
2665 .post(format!("http://127.0.0.1:{port}/echo"))
2666 .header("Content-Type", "text/plain")
2667 .body("ping")
2668 .send();
2669
2670 let (http_result, _) = tokio::join!(send_fut, async {
2671 if let Some(mut envelope) = rx.recv().await {
2672 assert_eq!(
2673 envelope.exchange.input.header("CamelHttpMethod"),
2674 Some(&serde_json::Value::String("POST".into()))
2675 );
2676 assert_eq!(
2677 envelope.exchange.input.header("CamelHttpPath"),
2678 Some(&serde_json::Value::String("/echo".into()))
2679 );
2680 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2681 if let Some(reply_tx) = envelope.reply_tx {
2682 let _ = reply_tx.send(Ok(envelope.exchange));
2683 }
2684 }
2685 });
2686
2687 let resp = http_result.unwrap();
2688 assert_eq!(resp.status().as_u16(), 200);
2689 let body = resp.text().await.unwrap();
2690 assert_eq!(body, "pong");
2691
2692 token.cancel();
2693 }
2694
2695 #[tokio::test]
2696 async fn test_integration_two_consumers_shared_port() {
2697 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2698
2699 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2701 let port = listener.local_addr().unwrap().port();
2702 drop(listener);
2703
2704 let component = HttpComponent::new();
2705 let endpoint_ctx = NoOpComponentContext;
2706
2707 let endpoint_a = component
2709 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"), &endpoint_ctx)
2710 .unwrap();
2711 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2712
2713 let endpoint_b = component
2715 .create_endpoint(&format!("http://127.0.0.1:{port}/world"), &endpoint_ctx)
2716 .unwrap();
2717 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2718
2719 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2720 let token_a = tokio_util::sync::CancellationToken::new();
2721 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2722
2723 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2724 let token_b = tokio_util::sync::CancellationToken::new();
2725 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2726
2727 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2728 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2729 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2730
2731 let client = reqwest::Client::new();
2732
2733 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2735 let (resp_hello, _) = tokio::join!(fut_hello, async {
2736 if let Some(mut envelope) = rx_a.recv().await {
2737 envelope.exchange.input.body =
2738 camel_component_api::Body::Text("hello-response".to_string());
2739 if let Some(reply_tx) = envelope.reply_tx {
2740 let _ = reply_tx.send(Ok(envelope.exchange));
2741 }
2742 }
2743 });
2744
2745 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2747 let (resp_world, _) = tokio::join!(fut_world, async {
2748 if let Some(mut envelope) = rx_b.recv().await {
2749 envelope.exchange.input.body =
2750 camel_component_api::Body::Text("world-response".to_string());
2751 if let Some(reply_tx) = envelope.reply_tx {
2752 let _ = reply_tx.send(Ok(envelope.exchange));
2753 }
2754 }
2755 });
2756
2757 let body_a = resp_hello.unwrap().text().await.unwrap();
2758 let body_b = resp_world.unwrap().text().await.unwrap();
2759
2760 assert_eq!(body_a, "hello-response");
2761 assert_eq!(body_b, "world-response");
2762
2763 token_a.cancel();
2764 token_b.cancel();
2765 }
2766
2767 #[tokio::test]
2768 async fn test_integration_unregistered_path_returns_404() {
2769 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2770
2771 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2773 let port = listener.local_addr().unwrap().port();
2774 drop(listener);
2775
2776 let component = HttpComponent::new();
2777 let endpoint_ctx = NoOpComponentContext;
2778 let endpoint = component
2779 .create_endpoint(
2780 &format!("http://127.0.0.1:{port}/registered"),
2781 &endpoint_ctx,
2782 )
2783 .unwrap();
2784 let mut consumer = endpoint.create_consumer().unwrap();
2785
2786 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2787 let token = tokio_util::sync::CancellationToken::new();
2788 let ctx = ConsumerContext::new(tx, token.clone());
2789
2790 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2791 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2792
2793 let client = reqwest::Client::new();
2794 let resp = client
2795 .get(format!("http://127.0.0.1:{port}/not-there"))
2796 .send()
2797 .await
2798 .unwrap();
2799 assert_eq!(resp.status().as_u16(), 404);
2800
2801 token.cancel();
2802 }
2803
2804 #[test]
2805 fn test_http_consumer_declares_concurrent() {
2806 use camel_component_api::ConcurrencyModel;
2807
2808 let config = HttpServerConfig {
2809 host: "127.0.0.1".to_string(),
2810 port: 19999,
2811 path: "/test".to_string(),
2812 max_request_body: 2 * 1024 * 1024,
2813 max_response_body: 10 * 1024 * 1024,
2814 max_inflight_requests: 1024,
2815 };
2816 let consumer = HttpConsumer::new(config);
2817 assert_eq!(
2818 consumer.concurrency_model(),
2819 ConcurrencyModel::Concurrent { max: None }
2820 );
2821 }
2822
2823 #[tokio::test]
2828 async fn test_http_reply_body_stream_variant_exists() {
2829 use bytes::Bytes;
2830 use camel_component_api::CamelError;
2831 use futures::stream;
2832
2833 let chunks: Vec<Result<Bytes, CamelError>> =
2834 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2835 let stream = Box::pin(stream::iter(chunks));
2836 let reply_body = HttpReplyBody::Stream(stream);
2837 match reply_body {
2839 HttpReplyBody::Stream(_) => {}
2840 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2841 }
2842 }
2843
2844 #[cfg(feature = "otel")]
2849 mod otel_tests {
2850 use super::*;
2851 use camel_component_api::Message;
2852 use tower::ServiceExt;
2853
2854 #[tokio::test]
2855 async fn test_producer_injects_traceparent_header() {
2856 let (url, _handle) = start_test_server_with_header_capture().await;
2857 let ctx = test_producer_ctx();
2858
2859 let component = HttpComponent::new();
2860 let endpoint_ctx = NoOpComponentContext;
2861 let endpoint = component
2862 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
2863 .unwrap();
2864 let producer = endpoint.create_producer(&ctx).unwrap();
2865
2866 let mut exchange = Exchange::new(Message::default());
2868 let mut headers = std::collections::HashMap::new();
2869 headers.insert(
2870 "traceparent".to_string(),
2871 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2872 );
2873 camel_otel::extract_into_exchange(&mut exchange, &headers);
2874
2875 let result = producer.oneshot(exchange).await.unwrap();
2876
2877 let status = result
2879 .input
2880 .header("CamelHttpResponseCode")
2881 .and_then(|v| v.as_u64())
2882 .unwrap();
2883 assert_eq!(status, 200);
2884
2885 let traceparent = result.input.header("x-received-traceparent");
2887 assert!(
2888 traceparent.is_some(),
2889 "traceparent header should have been sent"
2890 );
2891
2892 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2893 let parts: Vec<&str> = traceparent_str.split('-').collect();
2895 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2896 assert_eq!(parts[0], "00", "version should be 00");
2897 assert_eq!(
2898 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2899 "trace-id should match"
2900 );
2901 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2902 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2903 }
2904
2905 #[tokio::test]
2906 async fn test_consumer_extracts_traceparent_header() {
2907 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2908
2909 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2911 let port = listener.local_addr().unwrap().port();
2912 drop(listener);
2913
2914 let component = HttpComponent::new();
2915 let endpoint_ctx = NoOpComponentContext;
2916 let endpoint = component
2917 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2918 .unwrap();
2919 let mut consumer = endpoint.create_consumer().unwrap();
2920
2921 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2922 let token = tokio_util::sync::CancellationToken::new();
2923 let ctx = ConsumerContext::new(tx, token.clone());
2924
2925 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2926 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2927
2928 let client = reqwest::Client::new();
2930 let send_fut = client
2931 .post(format!("http://127.0.0.1:{port}/trace"))
2932 .header(
2933 "traceparent",
2934 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2935 )
2936 .body("test")
2937 .send();
2938
2939 let (http_result, _) = tokio::join!(send_fut, async {
2940 if let Some(envelope) = rx.recv().await {
2941 let mut injected_headers = std::collections::HashMap::new();
2944 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2945
2946 assert!(
2947 injected_headers.contains_key("traceparent"),
2948 "Exchange should have traceparent after extraction"
2949 );
2950
2951 let traceparent = injected_headers.get("traceparent").unwrap();
2952 let parts: Vec<&str> = traceparent.split('-').collect();
2953 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2954 assert_eq!(
2955 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2956 "Trace ID should match the original traceparent header"
2957 );
2958
2959 if let Some(reply_tx) = envelope.reply_tx {
2960 let _ = reply_tx.send(Ok(envelope.exchange));
2961 }
2962 }
2963 });
2964
2965 let resp = http_result.unwrap();
2966 assert_eq!(resp.status().as_u16(), 200);
2967
2968 token.cancel();
2969 }
2970
2971 #[tokio::test]
2972 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2973 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2974
2975 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2977 let port = listener.local_addr().unwrap().port();
2978 drop(listener);
2979
2980 let component = HttpComponent::new();
2981 let endpoint_ctx = NoOpComponentContext;
2982 let endpoint = component
2983 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"), &endpoint_ctx)
2984 .unwrap();
2985 let mut consumer = endpoint.create_consumer().unwrap();
2986
2987 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2988 let token = tokio_util::sync::CancellationToken::new();
2989 let ctx = ConsumerContext::new(tx, token.clone());
2990
2991 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2992 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2993
2994 let client = reqwest::Client::new();
2996 let send_fut = client
2997 .post(format!("http://127.0.0.1:{port}/trace"))
2998 .header(
2999 "TraceParent",
3000 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
3001 )
3002 .body("test")
3003 .send();
3004
3005 let (http_result, _) = tokio::join!(send_fut, async {
3006 if let Some(envelope) = rx.recv().await {
3007 let mut injected_headers = HashMap::new();
3010 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
3011
3012 assert!(
3013 injected_headers.contains_key("traceparent"),
3014 "Exchange should have traceparent after extraction from mixed-case header"
3015 );
3016
3017 let traceparent = injected_headers.get("traceparent").unwrap();
3018 let parts: Vec<&str> = traceparent.split('-').collect();
3019 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
3020 assert_eq!(
3021 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
3022 "Trace ID should match the original mixed-case TraceParent header"
3023 );
3024
3025 if let Some(reply_tx) = envelope.reply_tx {
3026 let _ = reply_tx.send(Ok(envelope.exchange));
3027 }
3028 }
3029 });
3030
3031 let resp = http_result.unwrap();
3032 assert_eq!(resp.status().as_u16(), 200);
3033
3034 token.cancel();
3035 }
3036
3037 #[tokio::test]
3038 async fn test_producer_no_trace_context_no_crash() {
3039 let (url, _handle) = start_test_server().await;
3040 let ctx = test_producer_ctx();
3041
3042 let component = HttpComponent::new();
3043 let endpoint_ctx = NoOpComponentContext;
3044 let endpoint = component
3045 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"), &endpoint_ctx)
3046 .unwrap();
3047 let producer = endpoint.create_producer(&ctx).unwrap();
3048
3049 let exchange = Exchange::new(Message::default());
3051
3052 let result = producer.oneshot(exchange).await.unwrap();
3054
3055 let status = result
3057 .input
3058 .header("CamelHttpResponseCode")
3059 .and_then(|v| v.as_u64())
3060 .unwrap();
3061 assert_eq!(status, 200);
3062 }
3063
3064 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
3066 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3067 let addr = listener.local_addr().unwrap();
3068 let url = format!("http://127.0.0.1:{}", addr.port());
3069
3070 let handle = tokio::spawn(async move {
3071 loop {
3072 if let Ok((mut stream, _)) = listener.accept().await {
3073 tokio::spawn(async move {
3074 use tokio::io::{AsyncReadExt, AsyncWriteExt};
3075 let mut buf = vec![0u8; 8192];
3076 let n = stream.read(&mut buf).await.unwrap_or(0);
3077 let request = String::from_utf8_lossy(&buf[..n]).to_string();
3078
3079 let traceparent = request
3081 .lines()
3082 .find(|line| line.to_lowercase().starts_with("traceparent:"))
3083 .map(|line| {
3084 line.split(':')
3085 .nth(1)
3086 .map(|s| s.trim().to_string())
3087 .unwrap_or_default()
3088 })
3089 .unwrap_or_default();
3090
3091 let body =
3092 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
3093 let response = format!(
3094 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
3095 body.len(),
3096 traceparent,
3097 body
3098 );
3099 let _ = stream.write_all(response.as_bytes()).await;
3100 });
3101 }
3102 }
3103 });
3104
3105 (url, handle)
3106 }
3107 }
3108
3109 #[tokio::test]
3118 async fn test_request_body_arrives_as_stream() {
3119 use camel_component_api::Body;
3120 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3121
3122 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3123 let port = listener.local_addr().unwrap().port();
3124 drop(listener);
3125
3126 let component = HttpComponent::new();
3127 let endpoint_ctx = NoOpComponentContext;
3128 let endpoint = component
3129 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"), &endpoint_ctx)
3130 .unwrap();
3131 let mut consumer = endpoint.create_consumer().unwrap();
3132
3133 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3134 let token = tokio_util::sync::CancellationToken::new();
3135 let ctx = ConsumerContext::new(tx, token.clone());
3136
3137 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3138 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3139
3140 let client = reqwest::Client::new();
3141 let send_fut = client
3142 .post(format!("http://127.0.0.1:{port}/upload"))
3143 .body("hello streaming world")
3144 .send();
3145
3146 let (http_result, _) = tokio::join!(send_fut, async {
3147 if let Some(mut envelope) = rx.recv().await {
3148 assert!(
3150 matches!(envelope.exchange.input.body, Body::Stream(_)),
3151 "expected Body::Stream, got discriminant {:?}",
3152 std::mem::discriminant(&envelope.exchange.input.body)
3153 );
3154 let bytes = envelope
3156 .exchange
3157 .input
3158 .body
3159 .into_bytes(1024 * 1024)
3160 .await
3161 .unwrap();
3162 assert_eq!(&bytes[..], b"hello streaming world");
3163
3164 envelope.exchange.input.body = camel_component_api::Body::Empty;
3165 if let Some(reply_tx) = envelope.reply_tx {
3166 let _ = reply_tx.send(Ok(envelope.exchange));
3167 }
3168 }
3169 });
3170
3171 let resp = http_result.unwrap();
3172 assert_eq!(resp.status().as_u16(), 200);
3173
3174 token.cancel();
3175 }
3176
3177 #[tokio::test]
3182 async fn test_streaming_response_chunked() {
3183 use bytes::Bytes;
3184 use camel_component_api::Body;
3185 use camel_component_api::CamelError;
3186 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
3187 use camel_component_api::{StreamBody, StreamMetadata};
3188 use futures::stream;
3189 use std::sync::Arc;
3190 use tokio::sync::Mutex;
3191
3192 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3193 let port = listener.local_addr().unwrap().port();
3194 drop(listener);
3195
3196 let component = HttpComponent::new();
3197 let endpoint_ctx = NoOpComponentContext;
3198 let endpoint = component
3199 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"), &endpoint_ctx)
3200 .unwrap();
3201 let mut consumer = endpoint.create_consumer().unwrap();
3202
3203 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
3204 let token = tokio_util::sync::CancellationToken::new();
3205 let ctx = ConsumerContext::new(tx, token.clone());
3206
3207 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3208 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3209
3210 let client = reqwest::Client::new();
3211 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
3212
3213 let (http_result, _) = tokio::join!(send_fut, async {
3214 if let Some(mut envelope) = rx.recv().await {
3215 let chunks: Vec<Result<Bytes, CamelError>> =
3217 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
3218 let stream = Box::pin(stream::iter(chunks));
3219 envelope.exchange.input.body = Body::Stream(StreamBody {
3220 stream: Arc::new(Mutex::new(Some(stream))),
3221 metadata: StreamMetadata::default(),
3222 });
3223 if let Some(reply_tx) = envelope.reply_tx {
3224 let _ = reply_tx.send(Ok(envelope.exchange));
3225 }
3226 }
3227 });
3228
3229 let resp = http_result.unwrap();
3230 assert_eq!(resp.status().as_u16(), 200);
3231 let body = resp.text().await.unwrap();
3232 assert_eq!(body, "chunk1chunk2");
3233
3234 token.cancel();
3235 }
3236
3237 #[tokio::test]
3242 async fn test_413_when_content_length_exceeds_limit() {
3243 use camel_component_api::ConsumerContext;
3244
3245 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3246 let port = listener.local_addr().unwrap().port();
3247 drop(listener);
3248
3249 let component = HttpComponent::new();
3251 let endpoint_ctx = NoOpComponentContext;
3252 let endpoint = component
3253 .create_endpoint(
3254 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=100"),
3255 &endpoint_ctx,
3256 )
3257 .unwrap();
3258 let mut consumer = endpoint.create_consumer().unwrap();
3259
3260 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3261 let token = tokio_util::sync::CancellationToken::new();
3262 let ctx = ConsumerContext::new(tx, token.clone());
3263
3264 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3265 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3266
3267 let client = reqwest::Client::new();
3268 let resp = client
3269 .post(format!("http://127.0.0.1:{port}/upload"))
3270 .header("Content-Length", "1000") .body("x".repeat(1000))
3272 .send()
3273 .await
3274 .unwrap();
3275
3276 assert_eq!(resp.status().as_u16(), 413);
3277
3278 token.cancel();
3279 }
3280
3281 #[tokio::test]
3285 async fn test_chunked_upload_without_content_length_bypasses_limit() {
3286 use bytes::Bytes;
3287 use camel_component_api::Body;
3288 use camel_component_api::ConsumerContext;
3289 use futures::stream;
3290
3291 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3292 let port = listener.local_addr().unwrap().port();
3293 drop(listener);
3294
3295 let component = HttpComponent::new();
3297 let endpoint_ctx = NoOpComponentContext;
3298 let endpoint = component
3299 .create_endpoint(
3300 &format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"),
3301 &endpoint_ctx,
3302 )
3303 .unwrap();
3304 let mut consumer = endpoint.create_consumer().unwrap();
3305
3306 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
3307 let token = tokio_util::sync::CancellationToken::new();
3308 let ctx = ConsumerContext::new(tx, token.clone());
3309
3310 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
3311 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3312
3313 let client = reqwest::Client::new();
3314
3315 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
3319 Ok(Bytes::from("y".repeat(50))),
3320 Ok(Bytes::from("y".repeat(50))),
3321 ];
3322 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
3323 let send_fut = client
3324 .post(format!("http://127.0.0.1:{port}/upload"))
3325 .body(stream_body)
3326 .send();
3327
3328 let consumer_fut = async {
3329 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
3331 Ok(Some(mut envelope)) => {
3332 assert!(
3333 matches!(envelope.exchange.input.body, Body::Stream(_)),
3334 "expected Body::Stream"
3335 );
3336 envelope.exchange.input.body = camel_component_api::Body::Empty;
3337 if let Some(reply_tx) = envelope.reply_tx {
3338 let _ = reply_tx.send(Ok(envelope.exchange));
3339 }
3340 }
3341 Ok(None) => panic!("consumer channel closed unexpectedly"),
3342 Err(_) => {
3343 }
3346 }
3347 };
3348
3349 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
3350
3351 let resp = http_result.unwrap();
3352 assert_ne!(
3354 resp.status().as_u16(),
3355 413,
3356 "chunked upload must not be rejected by maxRequestBody"
3357 );
3358 assert_eq!(resp.status().as_u16(), 200);
3359
3360 token.cancel();
3361 }
3362}