1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::{OnceCell, RwLock};
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
17use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
18use camel_component_api::{UriComponents, UriConfig, parse_uri};
19use futures::TryStreamExt;
20use futures::stream::BoxStream;
21
/// Producer-side settings of an `http:`/`https:` endpoint.
///
/// Values are populated from the endpoint URI by the `UriConfig`
/// implementation; `from_uri_with_defaults` additionally fills gaps from the
/// component-level `HttpConfig`.
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
    /// Target URL, assembled as `<scheme>:<path>` from the endpoint URI.
    pub base_url: String,
    /// Value of the `httpMethod` URI option, if present.
    pub http_method: Option<String>,
    /// Whether a response status outside `ok_status_code_range` is turned
    /// into an error (`throwExceptionOnFailure`, on unless set to `false`).
    pub throw_exception_on_failure: bool,
    /// Inclusive `(low, high)` range of status codes treated as success.
    pub ok_status_code_range: (u16, u16),
    /// Per-request timeout (`responseTimeout`, given in milliseconds).
    pub response_timeout: Option<Duration>,
    /// URI parameters that are not recognised component options; the
    /// producer appends them to the request URL as a query string.
    pub query_params: HashMap<String, String>,
    /// When `false`, requests to private/loopback addresses are rejected.
    pub allow_private_ips: bool,
    /// Host names the producer refuses to contact.
    pub blocked_hosts: Vec<String>,
    /// Upper bound, in bytes, used when buffering a request body.
    pub max_body_size: usize,
}
94
/// URI parameter names that configure the component itself.
///
/// Any endpoint URI parameter whose name appears here is excluded from
/// `HttpEndpointConfig::query_params`; every other parameter is kept and
/// later sent as part of the request's query string.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
    "httpMethod",
    "throwExceptionOnFailure",
    "okStatusCodeRange",
    "followRedirects",
    "connectTimeout",
    "responseTimeout",
    "allowPrivateIps",
    "blockedHosts",
    "maxBodySize",
];
107
108impl UriConfig for HttpEndpointConfig {
109 fn scheme() -> &'static str {
111 "http"
112 }
113
114 fn from_uri(uri: &str) -> Result<Self, CamelError> {
115 let parts = parse_uri(uri)?;
116 Self::from_components(parts)
117 }
118
119 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
120 if parts.scheme != "http" && parts.scheme != "https" {
122 return Err(CamelError::InvalidUri(format!(
123 "expected scheme 'http' or 'https', got '{}'",
124 parts.scheme
125 )));
126 }
127
128 let base_url = format!("{}:{}", parts.scheme, parts.path);
131
132 let http_method = parts.params.get("httpMethod").cloned();
133
134 let throw_exception_on_failure = parts
135 .params
136 .get("throwExceptionOnFailure")
137 .map(|v| v != "false")
138 .unwrap_or(true);
139
140 let ok_status_code_range = parts
142 .params
143 .get("okStatusCodeRange")
144 .and_then(|v| {
145 let (start, end) = v.split_once('-')?;
146 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
147 })
148 .unwrap_or((200, 299));
149
150 let response_timeout = parts
151 .params
152 .get("responseTimeout")
153 .and_then(|v| v.parse::<u64>().ok())
154 .map(Duration::from_millis);
155
156 let allow_private_ips = parts
158 .params
159 .get("allowPrivateIps")
160 .map(|v| v == "true")
161 .unwrap_or(false); let blocked_hosts = parts
165 .params
166 .get("blockedHosts")
167 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
168 .unwrap_or_default();
169
170 let max_body_size = parts
171 .params
172 .get("maxBodySize")
173 .and_then(|v| v.parse::<usize>().ok())
174 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
178 .params
179 .into_iter()
180 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
181 .collect();
182
183 Ok(Self {
184 base_url,
185 http_method,
186 throw_exception_on_failure,
187 ok_status_code_range,
188 response_timeout,
189 query_params,
190 allow_private_ips,
191 blocked_hosts,
192 max_body_size,
193 })
194 }
195}
196
197impl HttpEndpointConfig {
198 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
199 let parts = parse_uri(uri)?;
200 let mut endpoint = Self::from_components(parts.clone())?;
201 if endpoint.response_timeout.is_none() {
202 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
203 }
204 if !parts.params.contains_key("allowPrivateIps") {
205 endpoint.allow_private_ips = config.allow_private_ips;
206 }
207 if !parts.params.contains_key("blockedHosts") {
208 endpoint.blocked_hosts = config.blocked_hosts.clone();
209 }
210 if !parts.params.contains_key("maxBodySize") {
211 endpoint.max_body_size = config.max_body_size;
212 }
213 Ok(endpoint)
214 }
215}
216
/// Consumer-side (server) settings derived from an endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host part of the URI authority; used as the bind address.
    pub host: String,
    /// Port to listen on (80 for `http`, 443 for `https` when the URI has none).
    pub port: u16,
    /// Request path this consumer is registered under (`/` when the URI has none).
    pub path: String,
    /// Limit, in bytes, applied to incoming request bodies (`maxRequestBody`).
    pub max_request_body: usize,
    /// Configured limit, in bytes, for response bodies (`maxResponseBody`).
    pub max_response_body: usize,
}
235
236impl UriConfig for HttpServerConfig {
237 fn scheme() -> &'static str {
239 "http"
240 }
241
242 fn from_uri(uri: &str) -> Result<Self, CamelError> {
243 let parts = parse_uri(uri)?;
244 Self::from_components(parts)
245 }
246
247 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
248 if parts.scheme != "http" && parts.scheme != "https" {
250 return Err(CamelError::InvalidUri(format!(
251 "expected scheme 'http' or 'https', got '{}'",
252 parts.scheme
253 )));
254 }
255
256 let authority_and_path = parts.path.trim_start_matches('/');
259
260 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
262 (&authority_and_path[..idx], &authority_and_path[idx..])
263 } else {
264 (authority_and_path, "/")
265 };
266
267 let path = if path_suffix.is_empty() {
268 "/"
269 } else {
270 path_suffix
271 }
272 .to_string();
273
274 let (host, port) = if let Some(colon) = authority.rfind(':') {
276 let port_str = &authority[colon + 1..];
277 match port_str.parse::<u16>() {
278 Ok(p) => (authority[..colon].to_string(), p),
279 Err(_) => {
280 return Err(CamelError::InvalidUri(format!(
281 "invalid port '{}' in authority",
282 port_str
283 )));
284 }
285 }
286 } else {
287 let default_port = if parts.scheme == "https" { 443 } else { 80 };
289 (authority.to_string(), default_port)
290 };
291
292 let max_request_body = parts
293 .params
294 .get("maxRequestBody")
295 .and_then(|v| v.parse::<usize>().ok())
296 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
299 .params
300 .get("maxResponseBody")
301 .and_then(|v| v.parse::<usize>().ok())
302 .unwrap_or(10 * 1024 * 1024); Ok(Self {
305 host,
306 port,
307 path,
308 max_request_body,
309 max_response_body,
310 })
311 }
312}
313
314impl HttpServerConfig {
315 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
316 let parts = parse_uri(uri)?;
317 let mut server = Self::from_components(parts.clone())?;
318 if !parts.params.contains_key("maxRequestBody") {
319 server.max_request_body = config.max_request_body;
320 }
321 if !parts.params.contains_key("maxResponseBody") {
322 server.max_response_body = config.max_body_size;
323 }
324 Ok(server)
325 }
326}
327
328pub(crate) enum HttpReplyBody {
334 Bytes(bytes::Bytes),
335 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
336}
337
338pub(crate) struct RequestEnvelope {
341 pub(crate) method: String,
342 pub(crate) path: String,
343 pub(crate) query: String,
344 pub(crate) headers: http::HeaderMap,
345 pub(crate) body: StreamBody,
346 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
347}
348
349pub(crate) struct HttpReply {
351 pub(crate) status: u16,
352 pub(crate) headers: Vec<(String, String)>,
353 pub(crate) body: HttpReplyBody,
354}
355
356pub(crate) type DispatchTable =
362 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
363
364struct ServerHandle {
366 dispatch: DispatchTable,
367 _task: tokio::task::JoinHandle<()>,
369}
370
371pub struct ServerRegistry {
373 inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
374}
375
376impl ServerRegistry {
377 pub fn global() -> &'static Self {
379 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
380 INSTANCE.get_or_init(|| ServerRegistry {
381 inner: Mutex::new(HashMap::new()),
382 })
383 }
384
385 pub(crate) async fn get_or_spawn(
388 &'static self,
389 host: &str,
390 port: u16,
391 max_request_body: usize,
392 ) -> Result<DispatchTable, CamelError> {
393 let host_owned = host.to_string();
394
395 let cell = {
396 let mut guard = self.inner.lock().map_err(|_| {
397 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
398 })?;
399 guard
400 .entry(port)
401 .or_insert_with(|| Arc::new(OnceCell::new()))
402 .clone()
403 };
404
405 let handle = cell
406 .get_or_try_init(|| async {
407 let addr = format!("{host_owned}:{port}");
408 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
409 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
410 })?;
411 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
412 let task = tokio::spawn(run_axum_server(
413 listener,
414 Arc::clone(&dispatch),
415 max_request_body,
416 ));
417 Ok::<ServerHandle, CamelError>(ServerHandle {
418 dispatch,
419 _task: task,
420 })
421 })
422 .await?;
423
424 Ok(Arc::clone(&handle.dispatch))
425 }
426}
427
428use axum::{
433 Router,
434 body::Body as AxumBody,
435 extract::{Request, State},
436 http::{Response, StatusCode},
437 response::IntoResponse,
438};
439
440#[derive(Clone)]
441struct AppState {
442 dispatch: DispatchTable,
443 max_request_body: usize,
444}
445
446async fn run_axum_server(
447 listener: tokio::net::TcpListener,
448 dispatch: DispatchTable,
449 max_request_body: usize,
450) {
451 let state = AppState {
452 dispatch,
453 max_request_body,
454 };
455 let app = Router::new().fallback(dispatch_handler).with_state(state);
456
457 axum::serve(listener, app).await.unwrap_or_else(|e| {
458 tracing::error!(error = %e, "Axum server error");
459 });
460}
461
462async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
463 let method = req.method().to_string();
464 let path = req.uri().path().to_string();
465 let query = req.uri().query().unwrap_or("").to_string();
466 let headers = req.headers().clone();
467
468 let content_length: Option<u64> = headers
470 .get(http::header::CONTENT_LENGTH)
471 .and_then(|v| v.to_str().ok())
472 .and_then(|s| s.parse().ok());
473
474 if let Some(len) = content_length
475 && len > state.max_request_body as u64
476 {
477 return Response::builder()
478 .status(StatusCode::PAYLOAD_TOO_LARGE)
479 .body(AxumBody::from("Request body exceeds configured limit"))
480 .expect("infallible");
481 }
482
483 let content_type = headers
485 .get(http::header::CONTENT_TYPE)
486 .and_then(|v| v.to_str().ok())
487 .map(|s| s.to_string());
488
489 let data_stream: BodyDataStream = req.into_body().into_data_stream();
490 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
491 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
492
493 let stream_body = StreamBody {
494 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
495 metadata: StreamMetadata {
496 size_hint: content_length,
497 content_type,
498 origin: None,
499 },
500 };
501
502 let sender = {
504 let table = state.dispatch.read().await;
505 table.get(&path).cloned()
506 };
507 let Some(sender) = sender else {
508 return Response::builder()
509 .status(StatusCode::NOT_FOUND)
510 .body(AxumBody::from("No consumer registered for this path"))
511 .expect("infallible");
512 };
513
514 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
515 let envelope = RequestEnvelope {
516 method,
517 path,
518 query,
519 headers,
520 body: stream_body,
521 reply_tx,
522 };
523
524 if sender.send(envelope).await.is_err() {
525 return Response::builder()
526 .status(StatusCode::SERVICE_UNAVAILABLE)
527 .body(AxumBody::from("Consumer unavailable"))
528 .expect("infallible");
529 }
530
531 match reply_rx.await {
532 Ok(reply) => {
533 let status =
534 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
535 let mut builder = Response::builder().status(status);
536 for (k, v) in &reply.headers {
537 builder = builder.header(k.as_str(), v.as_str());
538 }
539 match reply.body {
540 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
541 Response::builder()
542 .status(StatusCode::INTERNAL_SERVER_ERROR)
543 .body(AxumBody::from("Invalid response headers from consumer"))
544 .expect("infallible")
545 }),
546 HttpReplyBody::Stream(stream) => builder
547 .body(AxumBody::from_stream(stream))
548 .unwrap_or_else(|_| {
549 Response::builder()
550 .status(StatusCode::INTERNAL_SERVER_ERROR)
551 .body(AxumBody::from("Invalid response headers from consumer"))
552 .expect("infallible")
553 }),
554 }
555 }
556 Err(_) => Response::builder()
557 .status(StatusCode::INTERNAL_SERVER_ERROR)
558 .body(AxumBody::from("Pipeline error"))
559 .expect("Response::builder() with a known-valid status code and body is infallible"),
560 }
561}
562
563pub struct HttpConsumer {
568 config: HttpServerConfig,
569}
570
571impl HttpConsumer {
572 pub fn new(config: HttpServerConfig) -> Self {
573 Self { config }
574 }
575}
576
577#[async_trait::async_trait]
578impl Consumer for HttpConsumer {
579 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
580 use camel_component_api::{Body, Exchange, Message};
581
582 let dispatch = ServerRegistry::global()
583 .get_or_spawn(
584 &self.config.host,
585 self.config.port,
586 self.config.max_request_body,
587 )
588 .await?;
589
590 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
592 {
593 let mut table = dispatch.write().await;
594 table.insert(self.config.path.clone(), env_tx);
595 }
596
597 let path = self.config.path.clone();
598 let cancel_token = ctx.cancel_token();
599 let _max_response_body = self.config.max_response_body;
600
601 loop {
602 tokio::select! {
603 _ = ctx.cancelled() => {
604 break;
605 }
606 envelope = env_rx.recv() => {
607 let Some(envelope) = envelope else { break; };
608
609 let mut msg = Message::default();
611
612 msg.set_header("CamelHttpMethod",
614 serde_json::Value::String(envelope.method.clone()));
615 msg.set_header("CamelHttpPath",
616 serde_json::Value::String(envelope.path.clone()));
617 msg.set_header("CamelHttpQuery",
618 serde_json::Value::String(envelope.query.clone()));
619
620 for (k, v) in &envelope.headers {
622 if let Ok(val_str) = v.to_str() {
623 msg.set_header(
624 k.as_str(),
625 serde_json::Value::String(val_str.to_string()),
626 );
627 }
628 }
629
630 msg.body = Body::Stream(envelope.body);
633
634 #[allow(unused_mut)]
635 let mut exchange = Exchange::new(msg);
636
637 #[cfg(feature = "otel")]
639 {
640 let headers: HashMap<String, String> = envelope
641 .headers
642 .iter()
643 .filter_map(|(k, v)| {
644 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
645 })
646 .collect();
647 camel_otel::extract_into_exchange(&mut exchange, &headers);
648 }
649
650 let reply_tx = envelope.reply_tx;
651 let sender = ctx.sender().clone();
652 let path_clone = path.clone();
653 let cancel = cancel_token.clone();
654
655 tokio::spawn(async move {
675 if cancel.is_cancelled() {
683 let _ = reply_tx.send(HttpReply {
684 status: 503,
685 headers: vec![],
686 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
687 });
688 return;
689 }
690
691 let (tx, rx) = tokio::sync::oneshot::channel();
693 let envelope = camel_component_api::consumer::ExchangeEnvelope {
694 exchange,
695 reply_tx: Some(tx),
696 };
697
698 let result = match sender.send(envelope).await {
699 Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
700 Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
701 }
702 .and_then(|r| r);
703
704 let reply = match result {
705 Ok(out) => {
706 let status = out
707 .input
708 .header("CamelHttpResponseCode")
709 .and_then(|v| v.as_u64())
710 .map(|s| s as u16)
711 .unwrap_or(200);
712
713 let reply_body: HttpReplyBody = match out.input.body {
714 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
715 Body::Bytes(b) => HttpReplyBody::Bytes(b),
716 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
717 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
718 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
719 v.to_string().into_bytes(),
720 )),
721 Body::Stream(s) => {
722 match s.stream.lock().await.take() {
723 Some(stream) => HttpReplyBody::Stream(stream),
724 None => {
725 tracing::error!(
726 "Body::Stream already consumed before HTTP reply — returning 500"
727 );
728 let error_reply = HttpReply {
729 status: 500,
730 headers: vec![],
731 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
732 };
733 if reply_tx.send(error_reply).is_err() {
734 debug!("reply_tx dropped before error reply could be sent");
735 }
736 return;
737 }
738 }
739 }
740 };
741
742 let resp_headers: Vec<(String, String)> = out
743 .input
744 .headers
745 .iter()
746 .filter(|(k, _)| !k.starts_with("Camel"))
748 .filter(|(k, _)| {
751 !matches!(
752 k.to_lowercase().as_str(),
753 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
788 })
789 .filter_map(|(k, v)| {
790 v.as_str().map(|s| (k.clone(), s.to_string()))
791 })
792 .collect();
793
794 HttpReply {
795 status,
796 headers: resp_headers,
797 body: reply_body,
798 }
799 }
800 Err(e) => {
801 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
802 HttpReply {
803 status: 500,
804 headers: vec![],
805 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
806 }
807 }
808 };
809
810 let _ = reply_tx.send(reply);
812 });
813 }
814 }
815 }
816
817 {
819 let mut table = dispatch.write().await;
820 table.remove(&path);
821 }
822
823 Ok(())
824 }
825
826 async fn stop(&mut self) -> Result<(), CamelError> {
827 Ok(())
828 }
829
830 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
831 camel_component_api::ConcurrencyModel::Concurrent { max: None }
832 }
833}
834
835pub struct HttpComponent {
840 client: reqwest::Client,
841 config: HttpConfig,
842}
843
844fn build_client(config: &HttpConfig) -> reqwest::Client {
845 let mut builder = reqwest::Client::builder()
846 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
847 .pool_max_idle_per_host(config.pool_max_idle_per_host)
848 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
849
850 if !config.follow_redirects {
851 builder = builder.redirect(reqwest::redirect::Policy::none());
852 }
853
854 builder
855 .build()
856 .expect("reqwest::Client::build() with valid config should not fail")
857}
858
859impl HttpComponent {
860 pub fn new() -> Self {
861 let config = HttpConfig::default();
862 let client = build_client(&config);
863 Self { client, config }
864 }
865
866 pub fn with_config(config: HttpConfig) -> Self {
867 let client = build_client(&config);
868 Self { client, config }
869 }
870
871 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
872 match config {
873 Some(cfg) => Self::with_config(cfg),
874 None => Self::new(),
875 }
876 }
877}
878
879impl Default for HttpComponent {
880 fn default() -> Self {
881 Self::new()
882 }
883}
884
885impl Component for HttpComponent {
886 fn scheme(&self) -> &str {
887 "http"
888 }
889
890 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
891 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
892 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
893 Ok(Box::new(HttpEndpoint {
894 uri: uri.to_string(),
895 config,
896 server_config,
897 client: self.client.clone(),
898 }))
899 }
900}
901
902pub struct HttpsComponent {
903 client: reqwest::Client,
904 config: HttpConfig,
905}
906
907impl HttpsComponent {
908 pub fn new() -> Self {
909 let config = HttpConfig::default();
910 let client = build_client(&config);
911 Self { client, config }
912 }
913
914 pub fn with_config(config: HttpConfig) -> Self {
915 let client = build_client(&config);
916 Self { client, config }
917 }
918
919 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
920 match config {
921 Some(cfg) => Self::with_config(cfg),
922 None => Self::new(),
923 }
924 }
925}
926
927impl Default for HttpsComponent {
928 fn default() -> Self {
929 Self::new()
930 }
931}
932
933impl Component for HttpsComponent {
934 fn scheme(&self) -> &str {
935 "https"
936 }
937
938 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
939 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
940 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
941 Ok(Box::new(HttpEndpoint {
942 uri: uri.to_string(),
943 config,
944 server_config,
945 client: self.client.clone(),
946 }))
947 }
948}
949
950struct HttpEndpoint {
955 uri: String,
956 config: HttpEndpointConfig,
957 server_config: HttpServerConfig,
958 client: reqwest::Client,
959}
960
961impl Endpoint for HttpEndpoint {
962 fn uri(&self) -> &str {
963 &self.uri
964 }
965
966 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
967 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
968 }
969
970 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
971 Ok(BoxProcessor::new(HttpProducer {
972 config: Arc::new(self.config.clone()),
973 client: self.client.clone(),
974 }))
975 }
976}
977
978fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
983 let parsed = url::Url::parse(url)
984 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
985
986 if let Some(host) = parsed.host_str()
988 && config.blocked_hosts.iter().any(|blocked| host == blocked)
989 {
990 return Err(CamelError::ProcessorError(format!(
991 "Host '{}' is blocked",
992 host
993 )));
994 }
995
996 if !config.allow_private_ips
998 && let Some(host) = parsed.host()
999 {
1000 match host {
1001 url::Host::Ipv4(ip) => {
1002 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1003 return Err(CamelError::ProcessorError(format!(
1004 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1005 ip
1006 )));
1007 }
1008 }
1009 url::Host::Ipv6(ip) => {
1010 if ip.is_loopback() {
1011 return Err(CamelError::ProcessorError(format!(
1012 "Loopback IP '{}' not allowed",
1013 ip
1014 )));
1015 }
1016 }
1017 url::Host::Domain(domain) => {
1018 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1020 if blocked_domains.contains(&domain) {
1021 return Err(CamelError::ProcessorError(format!(
1022 "Domain '{}' is not allowed",
1023 domain
1024 )));
1025 }
1026 }
1027 }
1028 }
1029
1030 Ok(())
1031}
1032
1033#[derive(Clone)]
1038struct HttpProducer {
1039 config: Arc<HttpEndpointConfig>,
1040 client: reqwest::Client,
1041}
1042
1043impl HttpProducer {
1044 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1045 if let Some(ref method) = config.http_method {
1046 return method.to_uppercase();
1047 }
1048 if let Some(method) = exchange
1049 .input
1050 .header("CamelHttpMethod")
1051 .and_then(|v| v.as_str())
1052 {
1053 return method.to_uppercase();
1054 }
1055 if !exchange.input.body.is_empty() {
1056 return "POST".to_string();
1057 }
1058 "GET".to_string()
1059 }
1060
1061 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1062 if let Some(uri) = exchange
1063 .input
1064 .header("CamelHttpUri")
1065 .and_then(|v| v.as_str())
1066 {
1067 let mut url = uri.to_string();
1068 if let Some(path) = exchange
1069 .input
1070 .header("CamelHttpPath")
1071 .and_then(|v| v.as_str())
1072 {
1073 if !url.ends_with('/') && !path.starts_with('/') {
1074 url.push('/');
1075 }
1076 url.push_str(path);
1077 }
1078 if let Some(query) = exchange
1079 .input
1080 .header("CamelHttpQuery")
1081 .and_then(|v| v.as_str())
1082 {
1083 url.push('?');
1084 url.push_str(query);
1085 }
1086 return url;
1087 }
1088
1089 let mut url = config.base_url.clone();
1090
1091 if let Some(path) = exchange
1092 .input
1093 .header("CamelHttpPath")
1094 .and_then(|v| v.as_str())
1095 {
1096 if !url.ends_with('/') && !path.starts_with('/') {
1097 url.push('/');
1098 }
1099 url.push_str(path);
1100 }
1101
1102 if let Some(query) = exchange
1103 .input
1104 .header("CamelHttpQuery")
1105 .and_then(|v| v.as_str())
1106 {
1107 url.push('?');
1108 url.push_str(query);
1109 } else if !config.query_params.is_empty() {
1110 url.push('?');
1112 let query_string: String = config
1113 .query_params
1114 .iter()
1115 .map(|(k, v)| format!("{k}={v}"))
1116 .collect::<Vec<_>>()
1117 .join("&");
1118 url.push_str(&query_string);
1119 }
1120
1121 url
1122 }
1123
1124 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1125 status >= range.0 && status <= range.1
1126 }
1127}
1128
1129impl Service<Exchange> for HttpProducer {
1130 type Response = Exchange;
1131 type Error = CamelError;
1132 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1133
1134 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1135 Poll::Ready(Ok(()))
1136 }
1137
1138 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1139 let config = self.config.clone();
1140 let client = self.client.clone();
1141
1142 Box::pin(async move {
1143 let method_str = HttpProducer::resolve_method(&exchange, &config);
1144 let url = HttpProducer::resolve_url(&exchange, &config);
1145
1146 validate_url_for_ssrf(&url, &config)?;
1148
1149 debug!(
1150 correlation_id = %exchange.correlation_id(),
1151 method = %method_str,
1152 url = %url,
1153 "HTTP request"
1154 );
1155
1156 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1157 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1158 })?;
1159
1160 let mut request = client.request(method, &url);
1161
1162 if let Some(timeout) = config.response_timeout {
1163 request = request.timeout(timeout);
1164 }
1165
1166 #[cfg(feature = "otel")]
1168 {
1169 let mut otel_headers = HashMap::new();
1170 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1171 for (k, v) in otel_headers {
1172 if let (Ok(name), Ok(val)) = (
1173 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1174 reqwest::header::HeaderValue::from_str(&v),
1175 ) {
1176 request = request.header(name, val);
1177 }
1178 }
1179 }
1180
1181 for (key, value) in &exchange.input.headers {
1182 if !key.starts_with("Camel")
1183 && let Some(val_str) = value.as_str()
1184 && let (Ok(name), Ok(val)) = (
1185 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1186 reqwest::header::HeaderValue::from_str(val_str),
1187 )
1188 {
1189 request = request.header(name, val);
1190 }
1191 }
1192
1193 match exchange.input.body {
1194 Body::Stream(ref s) => {
1195 let mut stream_lock = s.stream.lock().await;
1196 if let Some(stream) = stream_lock.take() {
1197 request = request.body(reqwest::Body::wrap_stream(stream));
1198 } else {
1199 return Err(CamelError::AlreadyConsumed);
1200 }
1201 }
1202 _ => {
1203 let body = std::mem::take(&mut exchange.input.body);
1205 let bytes = body.into_bytes(config.max_body_size).await?;
1206 if !bytes.is_empty() {
1207 request = request.body(bytes);
1208 }
1209 }
1210 }
1211
1212 let response = request
1213 .send()
1214 .await
1215 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1216
1217 let status_code = response.status().as_u16();
1218 let status_text = response
1219 .status()
1220 .canonical_reason()
1221 .unwrap_or("Unknown")
1222 .to_string();
1223
1224 for (key, value) in response.headers() {
1225 if let Ok(val_str) = value.to_str() {
1226 exchange
1227 .input
1228 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1229 }
1230 }
1231
1232 exchange.input.set_header(
1233 "CamelHttpResponseCode",
1234 serde_json::Value::Number(status_code.into()),
1235 );
1236 exchange.input.set_header(
1237 "CamelHttpResponseText",
1238 serde_json::Value::String(status_text.clone()),
1239 );
1240
1241 let response_body = response.bytes().await.map_err(|e| {
1242 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1243 })?;
1244
1245 if config.throw_exception_on_failure
1246 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1247 {
1248 return Err(CamelError::HttpOperationFailed {
1249 method: method_str,
1250 url,
1251 status_code,
1252 status_text,
1253 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1254 });
1255 }
1256
1257 if !response_body.is_empty() {
1258 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1259 }
1260
1261 debug!(
1262 correlation_id = %exchange.correlation_id(),
1263 status = status_code,
1264 url = %url,
1265 "HTTP response"
1266 );
1267 Ok(exchange)
1268 })
1269 }
1270}
1271
1272#[cfg(test)]
1273mod tests {
1274 use super::*;
1275 use camel_component_api::Message;
1276 use std::sync::Arc;
1277 use std::time::Duration;
1278
1279 fn test_producer_ctx() -> ProducerContext {
1280 ProducerContext::new()
1281 }
1282
1283 #[test]
1284 fn test_http_config_defaults() {
1285 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1286 assert_eq!(config.base_url, "http://localhost:8080/api");
1287 assert!(config.http_method.is_none());
1288 assert!(config.throw_exception_on_failure);
1289 assert_eq!(config.ok_status_code_range, (200, 299));
1290 assert!(config.response_timeout.is_none());
1291 }
1292
1293 #[test]
1294 fn test_http_config_scheme() {
1295 assert_eq!(HttpEndpointConfig::scheme(), "http");
1297 }
1298
1299 #[test]
1300 fn test_http_config_from_components() {
1301 let components = camel_component_api::UriComponents {
1303 scheme: "https".to_string(),
1304 path: "//api.example.com/v1".to_string(),
1305 params: std::collections::HashMap::from([(
1306 "httpMethod".to_string(),
1307 "POST".to_string(),
1308 )]),
1309 };
1310 let config = HttpEndpointConfig::from_components(components).unwrap();
1311 assert_eq!(config.base_url, "https://api.example.com/v1");
1312 assert_eq!(config.http_method, Some("POST".to_string()));
1313 }
1314
1315 #[test]
1316 fn test_http_config_with_options() {
1317 let config = HttpEndpointConfig::from_uri(
1318 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1319 ).unwrap();
1320 assert_eq!(config.base_url, "https://api.example.com/v1");
1321 assert_eq!(config.http_method, Some("PUT".to_string()));
1322 assert!(!config.throw_exception_on_failure);
1323 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1324 }
1325
1326 #[test]
1327 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1328 let config = HttpConfig::default()
1329 .with_response_timeout_ms(999)
1330 .with_allow_private_ips(true)
1331 .with_blocked_hosts(vec!["evil.com".to_string()])
1332 .with_max_body_size(12345);
1333 let endpoint =
1334 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1335 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1336 assert!(endpoint.allow_private_ips);
1337 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1338 assert_eq!(endpoint.max_body_size, 12345);
1339 }
1340
1341 #[test]
1342 fn test_from_uri_with_defaults_uri_overrides_config() {
1343 let config = HttpConfig::default()
1344 .with_response_timeout_ms(999)
1345 .with_allow_private_ips(true)
1346 .with_blocked_hosts(vec!["evil.com".to_string()])
1347 .with_max_body_size(12345);
1348 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1349 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1350 &config,
1351 )
1352 .unwrap();
1353 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1354 assert!(!endpoint.allow_private_ips);
1355 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1356 assert_eq!(endpoint.max_body_size, 99);
1357 }
1358
1359 #[test]
1360 fn test_http_config_ok_status_range() {
1361 let config =
1362 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1363 assert_eq!(config.ok_status_code_range, (200, 204));
1364 }
1365
1366 #[test]
1367 fn test_http_config_wrong_scheme() {
1368 let result = HttpEndpointConfig::from_uri("file:/tmp");
1369 assert!(result.is_err());
1370 }
1371
1372 #[test]
1373 fn test_http_component_scheme() {
1374 let component = HttpComponent::new();
1375 assert_eq!(component.scheme(), "http");
1376 }
1377
1378 #[test]
1379 fn test_https_component_scheme() {
1380 let component = HttpsComponent::new();
1381 assert_eq!(component.scheme(), "https");
1382 }
1383
1384 #[test]
1385 fn test_http_endpoint_creates_consumer() {
1386 let component = HttpComponent::new();
1387 let endpoint = component
1388 .create_endpoint("http://0.0.0.0:19100/test")
1389 .unwrap();
1390 assert!(endpoint.create_consumer().is_ok());
1391 }
1392
1393 #[test]
1394 fn test_https_endpoint_creates_consumer() {
1395 let component = HttpsComponent::new();
1396 let endpoint = component
1397 .create_endpoint("https://0.0.0.0:8443/test")
1398 .unwrap();
1399 assert!(endpoint.create_consumer().is_ok());
1400 }
1401
1402 #[test]
1403 fn test_http_endpoint_creates_producer() {
1404 let ctx = test_producer_ctx();
1405 let component = HttpComponent::new();
1406 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1407 assert!(endpoint.create_producer(&ctx).is_ok());
1408 }
1409
1410 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1415 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1416 let addr = listener.local_addr().unwrap();
1417 let url = format!("http://127.0.0.1:{}", addr.port());
1418
1419 let handle = tokio::spawn(async move {
1420 loop {
1421 if let Ok((mut stream, _)) = listener.accept().await {
1422 tokio::spawn(async move {
1423 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1424 let mut buf = vec![0u8; 4096];
1425 let n = stream.read(&mut buf).await.unwrap_or(0);
1426 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1427
1428 let method = request.split_whitespace().next().unwrap_or("GET");
1429
1430 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1431 let response = format!(
1432 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1433 body.len(),
1434 body
1435 );
1436 let _ = stream.write_all(response.as_bytes()).await;
1437 });
1438 }
1439 }
1440 });
1441
1442 (url, handle)
1443 }
1444
1445 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1446 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1447 let addr = listener.local_addr().unwrap();
1448 let url = format!("http://127.0.0.1:{}", addr.port());
1449
1450 let handle = tokio::spawn(async move {
1451 loop {
1452 if let Ok((mut stream, _)) = listener.accept().await {
1453 let status = status;
1454 tokio::spawn(async move {
1455 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1456 let mut buf = vec![0u8; 4096];
1457 let _ = stream.read(&mut buf).await;
1458
1459 let status_text = match status {
1460 404 => "Not Found",
1461 500 => "Internal Server Error",
1462 _ => "Error",
1463 };
1464 let body = "error body";
1465 let response = format!(
1466 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1467 status,
1468 status_text,
1469 body.len(),
1470 body
1471 );
1472 let _ = stream.write_all(response.as_bytes()).await;
1473 });
1474 }
1475 }
1476 });
1477
1478 (url, handle)
1479 }
1480
1481 #[tokio::test]
1482 async fn test_http_producer_get_request() {
1483 use tower::ServiceExt;
1484
1485 let (url, _handle) = start_test_server().await;
1486 let ctx = test_producer_ctx();
1487
1488 let component = HttpComponent::new();
1489 let endpoint = component
1490 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1491 .unwrap();
1492 let producer = endpoint.create_producer(&ctx).unwrap();
1493
1494 let exchange = Exchange::new(Message::default());
1495 let result = producer.oneshot(exchange).await.unwrap();
1496
1497 let status = result
1498 .input
1499 .header("CamelHttpResponseCode")
1500 .and_then(|v| v.as_u64())
1501 .unwrap();
1502 assert_eq!(status, 200);
1503
1504 assert!(!result.input.body.is_empty());
1505 }
1506
1507 #[tokio::test]
1508 async fn test_http_producer_post_with_body() {
1509 use tower::ServiceExt;
1510
1511 let (url, _handle) = start_test_server().await;
1512 let ctx = test_producer_ctx();
1513
1514 let component = HttpComponent::new();
1515 let endpoint = component
1516 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1517 .unwrap();
1518 let producer = endpoint.create_producer(&ctx).unwrap();
1519
1520 let exchange = Exchange::new(Message::new("request body"));
1521 let result = producer.oneshot(exchange).await.unwrap();
1522
1523 let status = result
1524 .input
1525 .header("CamelHttpResponseCode")
1526 .and_then(|v| v.as_u64())
1527 .unwrap();
1528 assert_eq!(status, 200);
1529 }
1530
1531 #[tokio::test]
1532 async fn test_http_producer_method_from_header() {
1533 use tower::ServiceExt;
1534
1535 let (url, _handle) = start_test_server().await;
1536 let ctx = test_producer_ctx();
1537
1538 let component = HttpComponent::new();
1539 let endpoint = component
1540 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1541 .unwrap();
1542 let producer = endpoint.create_producer(&ctx).unwrap();
1543
1544 let mut exchange = Exchange::new(Message::default());
1545 exchange.input.set_header(
1546 "CamelHttpMethod",
1547 serde_json::Value::String("DELETE".to_string()),
1548 );
1549
1550 let result = producer.oneshot(exchange).await.unwrap();
1551 let status = result
1552 .input
1553 .header("CamelHttpResponseCode")
1554 .and_then(|v| v.as_u64())
1555 .unwrap();
1556 assert_eq!(status, 200);
1557 }
1558
1559 #[tokio::test]
1560 async fn test_http_producer_forced_method() {
1561 use tower::ServiceExt;
1562
1563 let (url, _handle) = start_test_server().await;
1564 let ctx = test_producer_ctx();
1565
1566 let component = HttpComponent::new();
1567 let endpoint = component
1568 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1569 .unwrap();
1570 let producer = endpoint.create_producer(&ctx).unwrap();
1571
1572 let exchange = Exchange::new(Message::default());
1573 let result = producer.oneshot(exchange).await.unwrap();
1574
1575 let status = result
1576 .input
1577 .header("CamelHttpResponseCode")
1578 .and_then(|v| v.as_u64())
1579 .unwrap();
1580 assert_eq!(status, 200);
1581 }
1582
1583 #[tokio::test]
1584 async fn test_http_producer_throw_exception_on_failure() {
1585 use tower::ServiceExt;
1586
1587 let (url, _handle) = start_status_server(404).await;
1588 let ctx = test_producer_ctx();
1589
1590 let component = HttpComponent::new();
1591 let endpoint = component
1592 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1593 .unwrap();
1594 let producer = endpoint.create_producer(&ctx).unwrap();
1595
1596 let exchange = Exchange::new(Message::default());
1597 let result = producer.oneshot(exchange).await;
1598 assert!(result.is_err());
1599
1600 match result.unwrap_err() {
1601 CamelError::HttpOperationFailed { status_code, .. } => {
1602 assert_eq!(status_code, 404);
1603 }
1604 e => panic!("Expected HttpOperationFailed, got: {e}"),
1605 }
1606 }
1607
1608 #[tokio::test]
1609 async fn test_http_producer_no_throw_on_failure() {
1610 use tower::ServiceExt;
1611
1612 let (url, _handle) = start_status_server(500).await;
1613 let ctx = test_producer_ctx();
1614
1615 let component = HttpComponent::new();
1616 let endpoint = component
1617 .create_endpoint(&format!(
1618 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1619 ))
1620 .unwrap();
1621 let producer = endpoint.create_producer(&ctx).unwrap();
1622
1623 let exchange = Exchange::new(Message::default());
1624 let result = producer.oneshot(exchange).await.unwrap();
1625
1626 let status = result
1627 .input
1628 .header("CamelHttpResponseCode")
1629 .and_then(|v| v.as_u64())
1630 .unwrap();
1631 assert_eq!(status, 500);
1632 }
1633
1634 #[tokio::test]
1635 async fn test_http_producer_uri_override() {
1636 use tower::ServiceExt;
1637
1638 let (url, _handle) = start_test_server().await;
1639 let ctx = test_producer_ctx();
1640
1641 let component = HttpComponent::new();
1642 let endpoint = component
1643 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1644 .unwrap();
1645 let producer = endpoint.create_producer(&ctx).unwrap();
1646
1647 let mut exchange = Exchange::new(Message::default());
1648 exchange.input.set_header(
1649 "CamelHttpUri",
1650 serde_json::Value::String(format!("{url}/api")),
1651 );
1652
1653 let result = producer.oneshot(exchange).await.unwrap();
1654 let status = result
1655 .input
1656 .header("CamelHttpResponseCode")
1657 .and_then(|v| v.as_u64())
1658 .unwrap();
1659 assert_eq!(status, 200);
1660 }
1661
1662 #[tokio::test]
1663 async fn test_http_producer_response_headers_mapped() {
1664 use tower::ServiceExt;
1665
1666 let (url, _handle) = start_test_server().await;
1667 let ctx = test_producer_ctx();
1668
1669 let component = HttpComponent::new();
1670 let endpoint = component
1671 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1672 .unwrap();
1673 let producer = endpoint.create_producer(&ctx).unwrap();
1674
1675 let exchange = Exchange::new(Message::default());
1676 let result = producer.oneshot(exchange).await.unwrap();
1677
1678 assert!(
1679 result.input.header("content-type").is_some()
1680 || result.input.header("Content-Type").is_some()
1681 );
1682 assert!(result.input.header("CamelHttpResponseText").is_some());
1683 }
1684
1685 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1690 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1691 let addr = listener.local_addr().unwrap();
1692 let url = format!("http://127.0.0.1:{}", addr.port());
1693
1694 let handle = tokio::spawn(async move {
1695 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1696 loop {
1697 if let Ok((mut stream, _)) = listener.accept().await {
1698 tokio::spawn(async move {
1699 let mut buf = vec![0u8; 4096];
1700 let n = stream.read(&mut buf).await.unwrap_or(0);
1701 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1702
1703 if request.contains("GET /final") {
1705 let body = r#"{"status":"final"}"#;
1706 let response = format!(
1707 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1708 body.len(),
1709 body
1710 );
1711 let _ = stream.write_all(response.as_bytes()).await;
1712 } else {
1713 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1715 let _ = stream.write_all(response.as_bytes()).await;
1716 }
1717 });
1718 }
1719 }
1720 });
1721
1722 (url, handle)
1723 }
1724
1725 #[tokio::test]
1726 async fn test_follow_redirects_false_does_not_follow() {
1727 use tower::ServiceExt;
1728
1729 let (url, _handle) = start_redirect_server().await;
1730 let ctx = test_producer_ctx();
1731
1732 let component =
1733 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1734 let endpoint = component
1735 .create_endpoint(&format!(
1736 "{url}?throwExceptionOnFailure=false&allowPrivateIps=true"
1737 ))
1738 .unwrap();
1739 let producer = endpoint.create_producer(&ctx).unwrap();
1740
1741 let exchange = Exchange::new(Message::default());
1742 let result = producer.oneshot(exchange).await.unwrap();
1743
1744 let status = result
1746 .input
1747 .header("CamelHttpResponseCode")
1748 .and_then(|v| v.as_u64())
1749 .unwrap();
1750 assert_eq!(
1751 status, 302,
1752 "Should NOT follow redirect when followRedirects=false"
1753 );
1754 }
1755
1756 #[tokio::test]
1757 async fn test_follow_redirects_true_follows_redirect() {
1758 use tower::ServiceExt;
1759
1760 let (url, _handle) = start_redirect_server().await;
1761 let ctx = test_producer_ctx();
1762
1763 let component =
1764 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1765 let endpoint = component
1766 .create_endpoint(&format!("{url}?allowPrivateIps=true"))
1767 .unwrap();
1768 let producer = endpoint.create_producer(&ctx).unwrap();
1769
1770 let exchange = Exchange::new(Message::default());
1771 let result = producer.oneshot(exchange).await.unwrap();
1772
1773 let status = result
1775 .input
1776 .header("CamelHttpResponseCode")
1777 .and_then(|v| v.as_u64())
1778 .unwrap();
1779 assert_eq!(
1780 status, 200,
1781 "Should follow redirect when followRedirects=true"
1782 );
1783 }
1784
1785 #[tokio::test]
1786 async fn test_query_params_forwarded_to_http_request() {
1787 use tower::ServiceExt;
1788
1789 let (url, _handle) = start_test_server().await;
1790 let ctx = test_producer_ctx();
1791
1792 let component = HttpComponent::new();
1793 let endpoint = component
1795 .create_endpoint(&format!(
1796 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1797 ))
1798 .unwrap();
1799 let producer = endpoint.create_producer(&ctx).unwrap();
1800
1801 let exchange = Exchange::new(Message::default());
1802 let result = producer.oneshot(exchange).await.unwrap();
1803
1804 let status = result
1807 .input
1808 .header("CamelHttpResponseCode")
1809 .and_then(|v| v.as_u64())
1810 .unwrap();
1811 assert_eq!(status, 200);
1812 }
1813
1814 #[tokio::test]
1815 async fn test_non_camel_query_params_are_forwarded() {
1816 let config = HttpEndpointConfig::from_uri(
1819 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1820 )
1821 .unwrap();
1822
1823 assert!(
1825 config.query_params.contains_key("apiKey"),
1826 "apiKey should be preserved"
1827 );
1828 assert!(
1829 config.query_params.contains_key("token"),
1830 "token should be preserved"
1831 );
1832 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1833 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1834
1835 assert!(
1837 !config.query_params.contains_key("httpMethod"),
1838 "httpMethod should not be forwarded"
1839 );
1840 }
1841
1842 #[tokio::test]
1847 async fn test_http_producer_blocks_metadata_endpoint() {
1848 use tower::ServiceExt;
1849
1850 let ctx = test_producer_ctx();
1851 let component = HttpComponent::new();
1852 let endpoint = component
1853 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1854 .unwrap();
1855 let producer = endpoint.create_producer(&ctx).unwrap();
1856
1857 let mut exchange = Exchange::new(Message::default());
1858 exchange.input.set_header(
1859 "CamelHttpUri",
1860 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1861 );
1862
1863 let result = producer.oneshot(exchange).await;
1864 assert!(result.is_err(), "Should block AWS metadata endpoint");
1865
1866 let err = result.unwrap_err();
1867 assert!(
1868 err.to_string().contains("Private IP"),
1869 "Error should mention private IP blocking, got: {}",
1870 err
1871 );
1872 }
1873
1874 #[test]
1875 fn test_ssrf_config_defaults() {
1876 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
1877 assert!(
1878 !config.allow_private_ips,
1879 "Private IPs should be blocked by default"
1880 );
1881 assert!(
1882 config.blocked_hosts.is_empty(),
1883 "Blocked hosts should be empty by default"
1884 );
1885 }
1886
1887 #[test]
1888 fn test_ssrf_config_allow_private_ips() {
1889 let config =
1890 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1891 assert!(
1892 config.allow_private_ips,
1893 "Private IPs should be allowed when explicitly set"
1894 );
1895 }
1896
1897 #[test]
1898 fn test_ssrf_config_blocked_hosts() {
1899 let config = HttpEndpointConfig::from_uri(
1900 "http://example.com/api?blockedHosts=evil.com,malware.net",
1901 )
1902 .unwrap();
1903 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1904 }
1905
1906 #[tokio::test]
1907 async fn test_http_producer_blocks_localhost() {
1908 use tower::ServiceExt;
1909
1910 let ctx = test_producer_ctx();
1911 let component = HttpComponent::new();
1912 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1913 let producer = endpoint.create_producer(&ctx).unwrap();
1914
1915 let mut exchange = Exchange::new(Message::default());
1916 exchange.input.set_header(
1917 "CamelHttpUri",
1918 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1919 );
1920
1921 let result = producer.oneshot(exchange).await;
1922 assert!(result.is_err(), "Should block localhost");
1923 }
1924
1925 #[tokio::test]
1926 async fn test_http_producer_blocks_loopback_ip() {
1927 use tower::ServiceExt;
1928
1929 let ctx = test_producer_ctx();
1930 let component = HttpComponent::new();
1931 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1932 let producer = endpoint.create_producer(&ctx).unwrap();
1933
1934 let mut exchange = Exchange::new(Message::default());
1935 exchange.input.set_header(
1936 "CamelHttpUri",
1937 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1938 );
1939
1940 let result = producer.oneshot(exchange).await;
1941 assert!(result.is_err(), "Should block loopback IP");
1942 }
1943
1944 #[tokio::test]
1945 async fn test_http_producer_allows_private_ip_when_enabled() {
1946 use tower::ServiceExt;
1947
1948 let ctx = test_producer_ctx();
1949 let component = HttpComponent::new();
1950 let endpoint = component
1953 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1954 .unwrap();
1955 let producer = endpoint.create_producer(&ctx).unwrap();
1956
1957 let exchange = Exchange::new(Message::default());
1958
1959 let result = producer.oneshot(exchange).await;
1962 if let Err(ref e) = result {
1964 let err_str = e.to_string();
1965 assert!(
1966 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1967 "Should not be SSRF error, got: {}",
1968 err_str
1969 );
1970 }
1971 }
1972
1973 #[test]
1978 fn test_http_server_config_parse() {
1979 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1980 assert_eq!(cfg.host, "0.0.0.0");
1981 assert_eq!(cfg.port, 8080);
1982 assert_eq!(cfg.path, "/orders");
1983 }
1984
1985 #[test]
1986 fn test_http_server_config_scheme() {
1987 assert_eq!(HttpServerConfig::scheme(), "http");
1989 }
1990
1991 #[test]
1992 fn test_http_server_config_from_components() {
1993 let components = camel_component_api::UriComponents {
1995 scheme: "https".to_string(),
1996 path: "//0.0.0.0:8443/api".to_string(),
1997 params: std::collections::HashMap::from([(
1998 "maxRequestBody".to_string(),
1999 "5242880".to_string(),
2000 )]),
2001 };
2002 let cfg = HttpServerConfig::from_components(components).unwrap();
2003 assert_eq!(cfg.host, "0.0.0.0");
2004 assert_eq!(cfg.port, 8443);
2005 assert_eq!(cfg.path, "/api");
2006 assert_eq!(cfg.max_request_body, 5242880);
2007 }
2008
2009 #[test]
2010 fn test_http_server_config_default_path() {
2011 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2012 assert_eq!(cfg.path, "/");
2013 }
2014
2015 #[test]
2016 fn test_http_server_config_wrong_scheme() {
2017 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2018 }
2019
2020 #[test]
2021 fn test_http_server_config_invalid_port() {
2022 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2023 }
2024
2025 #[test]
2026 fn test_http_server_config_default_port_by_scheme() {
2027 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2029 assert_eq!(cfg_http.port, 80);
2030
2031 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2033 assert_eq!(cfg_https.port, 443);
2034 }
2035
2036 #[test]
2037 fn test_request_envelope_and_reply_are_send() {
2038 fn assert_send<T: Send>() {}
2039 assert_send::<RequestEnvelope>();
2040 assert_send::<HttpReply>();
2041 }
2042
2043 #[test]
2048 fn test_server_registry_global_is_singleton() {
2049 let r1 = ServerRegistry::global();
2050 let r2 = ServerRegistry::global();
2051 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2052 }
2053
2054 #[tokio::test]
2055 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2056 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2057 let port = listener.local_addr().unwrap().port();
2058 drop(listener);
2059
2060 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2061 Arc::new(std::sync::Mutex::new(Vec::new()));
2062
2063 let mut handles = Vec::new();
2064 for _ in 0..4 {
2065 let results = results.clone();
2066 handles.push(tokio::spawn(async move {
2067 let dispatch = ServerRegistry::global()
2068 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2069 .await
2070 .unwrap();
2071 results.lock().unwrap().push(dispatch);
2072 }));
2073 }
2074
2075 for h in handles {
2076 h.await.unwrap();
2077 }
2078
2079 let dispatches = results.lock().unwrap();
2080 assert_eq!(dispatches.len(), 4);
2081 for i in 1..dispatches.len() {
2082 assert!(
2083 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2084 "all concurrent callers should get the same dispatch table"
2085 );
2086 }
2087 }
2088
2089 #[tokio::test]
2094 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2095 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2096 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2098 let port = listener.local_addr().unwrap().port();
2099 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2100
2101 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2103
2104 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2105 .await
2106 .unwrap();
2107 assert_eq!(resp.status().as_u16(), 404);
2108 }
2109
2110 #[tokio::test]
2115 async fn test_http_consumer_start_registers_path() {
2116 use camel_component_api::ConsumerContext;
2117
2118 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2120 let port = listener.local_addr().unwrap().port();
2121 drop(listener); let consumer_cfg = HttpServerConfig {
2124 host: "127.0.0.1".to_string(),
2125 port,
2126 path: "/ping".to_string(),
2127 max_request_body: 2 * 1024 * 1024,
2128 max_response_body: 10 * 1024 * 1024,
2129 };
2130 let mut consumer = HttpConsumer::new(consumer_cfg);
2131
2132 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2133 let token = tokio_util::sync::CancellationToken::new();
2134 let ctx = ConsumerContext::new(tx, token.clone());
2135
2136 tokio::spawn(async move {
2137 consumer.start(ctx).await.unwrap();
2138 });
2139
2140 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2141
2142 let client = reqwest::Client::new();
2143 let resp_future = client
2144 .post(format!("http://127.0.0.1:{port}/ping"))
2145 .body("hello world")
2146 .send();
2147
2148 let (http_result, _) = tokio::join!(resp_future, async {
2149 if let Some(mut envelope) = rx.recv().await {
2150 envelope.exchange.input.set_header(
2152 "CamelHttpResponseCode",
2153 serde_json::Value::Number(201.into()),
2154 );
2155 if let Some(reply_tx) = envelope.reply_tx {
2156 let _ = reply_tx.send(Ok(envelope.exchange));
2157 }
2158 }
2159 });
2160
2161 let resp = http_result.unwrap();
2162 assert_eq!(resp.status().as_u16(), 201);
2163
2164 token.cancel();
2165 }
2166
2167 #[tokio::test]
2172 async fn test_integration_single_consumer_round_trip() {
2173 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2174
2175 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2177 let port = listener.local_addr().unwrap().port();
2178 drop(listener); let component = HttpComponent::new();
2181 let endpoint = component
2182 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2183 .unwrap();
2184 let mut consumer = endpoint.create_consumer().unwrap();
2185
2186 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2187 let token = tokio_util::sync::CancellationToken::new();
2188 let ctx = ConsumerContext::new(tx, token.clone());
2189
2190 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2191 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2192
2193 let client = reqwest::Client::new();
2194 let send_fut = client
2195 .post(format!("http://127.0.0.1:{port}/echo"))
2196 .header("Content-Type", "text/plain")
2197 .body("ping")
2198 .send();
2199
2200 let (http_result, _) = tokio::join!(send_fut, async {
2201 if let Some(mut envelope) = rx.recv().await {
2202 assert_eq!(
2203 envelope.exchange.input.header("CamelHttpMethod"),
2204 Some(&serde_json::Value::String("POST".into()))
2205 );
2206 assert_eq!(
2207 envelope.exchange.input.header("CamelHttpPath"),
2208 Some(&serde_json::Value::String("/echo".into()))
2209 );
2210 envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
2211 if let Some(reply_tx) = envelope.reply_tx {
2212 let _ = reply_tx.send(Ok(envelope.exchange));
2213 }
2214 }
2215 });
2216
2217 let resp = http_result.unwrap();
2218 assert_eq!(resp.status().as_u16(), 200);
2219 let body = resp.text().await.unwrap();
2220 assert_eq!(body, "pong");
2221
2222 token.cancel();
2223 }
2224
2225 #[tokio::test]
2226 async fn test_integration_two_consumers_shared_port() {
2227 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2228
2229 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2231 let port = listener.local_addr().unwrap().port();
2232 drop(listener);
2233
2234 let component = HttpComponent::new();
2235
2236 let endpoint_a = component
2238 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2239 .unwrap();
2240 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2241
2242 let endpoint_b = component
2244 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2245 .unwrap();
2246 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2247
2248 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2249 let token_a = tokio_util::sync::CancellationToken::new();
2250 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2251
2252 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2253 let token_b = tokio_util::sync::CancellationToken::new();
2254 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2255
2256 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2257 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2258 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2259
2260 let client = reqwest::Client::new();
2261
2262 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2264 let (resp_hello, _) = tokio::join!(fut_hello, async {
2265 if let Some(mut envelope) = rx_a.recv().await {
2266 envelope.exchange.input.body =
2267 camel_component_api::Body::Text("hello-response".to_string());
2268 if let Some(reply_tx) = envelope.reply_tx {
2269 let _ = reply_tx.send(Ok(envelope.exchange));
2270 }
2271 }
2272 });
2273
2274 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2276 let (resp_world, _) = tokio::join!(fut_world, async {
2277 if let Some(mut envelope) = rx_b.recv().await {
2278 envelope.exchange.input.body =
2279 camel_component_api::Body::Text("world-response".to_string());
2280 if let Some(reply_tx) = envelope.reply_tx {
2281 let _ = reply_tx.send(Ok(envelope.exchange));
2282 }
2283 }
2284 });
2285
2286 let body_a = resp_hello.unwrap().text().await.unwrap();
2287 let body_b = resp_world.unwrap().text().await.unwrap();
2288
2289 assert_eq!(body_a, "hello-response");
2290 assert_eq!(body_b, "world-response");
2291
2292 token_a.cancel();
2293 token_b.cancel();
2294 }
2295
2296 #[tokio::test]
2297 async fn test_integration_unregistered_path_returns_404() {
2298 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2299
2300 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2302 let port = listener.local_addr().unwrap().port();
2303 drop(listener);
2304
2305 let component = HttpComponent::new();
2306 let endpoint = component
2307 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2308 .unwrap();
2309 let mut consumer = endpoint.create_consumer().unwrap();
2310
2311 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2312 let token = tokio_util::sync::CancellationToken::new();
2313 let ctx = ConsumerContext::new(tx, token.clone());
2314
2315 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2316 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2317
2318 let client = reqwest::Client::new();
2319 let resp = client
2320 .get(format!("http://127.0.0.1:{port}/not-there"))
2321 .send()
2322 .await
2323 .unwrap();
2324 assert_eq!(resp.status().as_u16(), 404);
2325
2326 token.cancel();
2327 }
2328
2329 #[test]
2330 fn test_http_consumer_declares_concurrent() {
2331 use camel_component_api::ConcurrencyModel;
2332
2333 let config = HttpServerConfig {
2334 host: "127.0.0.1".to_string(),
2335 port: 19999,
2336 path: "/test".to_string(),
2337 max_request_body: 2 * 1024 * 1024,
2338 max_response_body: 10 * 1024 * 1024,
2339 };
2340 let consumer = HttpConsumer::new(config);
2341 assert_eq!(
2342 consumer.concurrency_model(),
2343 ConcurrencyModel::Concurrent { max: None }
2344 );
2345 }
2346
2347 #[tokio::test]
2352 async fn test_http_reply_body_stream_variant_exists() {
2353 use bytes::Bytes;
2354 use camel_component_api::CamelError;
2355 use futures::stream;
2356
2357 let chunks: Vec<Result<Bytes, CamelError>> =
2358 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2359 let stream = Box::pin(stream::iter(chunks));
2360 let reply_body = HttpReplyBody::Stream(stream);
2361 match reply_body {
2363 HttpReplyBody::Stream(_) => {}
2364 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2365 }
2366 }
2367
2368 #[cfg(feature = "otel")]
2373 mod otel_tests {
2374 use super::*;
2375 use camel_component_api::Message;
2376 use tower::ServiceExt;
2377
2378 #[tokio::test]
2379 async fn test_producer_injects_traceparent_header() {
2380 let (url, _handle) = start_test_server_with_header_capture().await;
2381 let ctx = test_producer_ctx();
2382
2383 let component = HttpComponent::new();
2384 let endpoint = component
2385 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2386 .unwrap();
2387 let producer = endpoint.create_producer(&ctx).unwrap();
2388
2389 let mut exchange = Exchange::new(Message::default());
2391 let mut headers = std::collections::HashMap::new();
2392 headers.insert(
2393 "traceparent".to_string(),
2394 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2395 );
2396 camel_otel::extract_into_exchange(&mut exchange, &headers);
2397
2398 let result = producer.oneshot(exchange).await.unwrap();
2399
2400 let status = result
2402 .input
2403 .header("CamelHttpResponseCode")
2404 .and_then(|v| v.as_u64())
2405 .unwrap();
2406 assert_eq!(status, 200);
2407
2408 let traceparent = result.input.header("x-received-traceparent");
2410 assert!(
2411 traceparent.is_some(),
2412 "traceparent header should have been sent"
2413 );
2414
2415 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2416 let parts: Vec<&str> = traceparent_str.split('-').collect();
2418 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2419 assert_eq!(parts[0], "00", "version should be 00");
2420 assert_eq!(
2421 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2422 "trace-id should match"
2423 );
2424 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2425 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2426 }
2427
2428 #[tokio::test]
2429 async fn test_consumer_extracts_traceparent_header() {
2430 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2431
2432 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2434 let port = listener.local_addr().unwrap().port();
2435 drop(listener);
2436
2437 let component = HttpComponent::new();
2438 let endpoint = component
2439 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2440 .unwrap();
2441 let mut consumer = endpoint.create_consumer().unwrap();
2442
2443 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2444 let token = tokio_util::sync::CancellationToken::new();
2445 let ctx = ConsumerContext::new(tx, token.clone());
2446
2447 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2448 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2449
2450 let client = reqwest::Client::new();
2452 let send_fut = client
2453 .post(format!("http://127.0.0.1:{port}/trace"))
2454 .header(
2455 "traceparent",
2456 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2457 )
2458 .body("test")
2459 .send();
2460
2461 let (http_result, _) = tokio::join!(send_fut, async {
2462 if let Some(envelope) = rx.recv().await {
2463 let mut injected_headers = std::collections::HashMap::new();
2466 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2467
2468 assert!(
2469 injected_headers.contains_key("traceparent"),
2470 "Exchange should have traceparent after extraction"
2471 );
2472
2473 let traceparent = injected_headers.get("traceparent").unwrap();
2474 let parts: Vec<&str> = traceparent.split('-').collect();
2475 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2476 assert_eq!(
2477 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2478 "Trace ID should match the original traceparent header"
2479 );
2480
2481 if let Some(reply_tx) = envelope.reply_tx {
2482 let _ = reply_tx.send(Ok(envelope.exchange));
2483 }
2484 }
2485 });
2486
2487 let resp = http_result.unwrap();
2488 assert_eq!(resp.status().as_u16(), 200);
2489
2490 token.cancel();
2491 }
2492
2493 #[tokio::test]
2494 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2495 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2496
2497 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2499 let port = listener.local_addr().unwrap().port();
2500 drop(listener);
2501
2502 let component = HttpComponent::new();
2503 let endpoint = component
2504 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2505 .unwrap();
2506 let mut consumer = endpoint.create_consumer().unwrap();
2507
2508 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2509 let token = tokio_util::sync::CancellationToken::new();
2510 let ctx = ConsumerContext::new(tx, token.clone());
2511
2512 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2513 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2514
2515 let client = reqwest::Client::new();
2517 let send_fut = client
2518 .post(format!("http://127.0.0.1:{port}/trace"))
2519 .header(
2520 "TraceParent",
2521 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2522 )
2523 .body("test")
2524 .send();
2525
2526 let (http_result, _) = tokio::join!(send_fut, async {
2527 if let Some(envelope) = rx.recv().await {
2528 let mut injected_headers = HashMap::new();
2531 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2532
2533 assert!(
2534 injected_headers.contains_key("traceparent"),
2535 "Exchange should have traceparent after extraction from mixed-case header"
2536 );
2537
2538 let traceparent = injected_headers.get("traceparent").unwrap();
2539 let parts: Vec<&str> = traceparent.split('-').collect();
2540 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2541 assert_eq!(
2542 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2543 "Trace ID should match the original mixed-case TraceParent header"
2544 );
2545
2546 if let Some(reply_tx) = envelope.reply_tx {
2547 let _ = reply_tx.send(Ok(envelope.exchange));
2548 }
2549 }
2550 });
2551
2552 let resp = http_result.unwrap();
2553 assert_eq!(resp.status().as_u16(), 200);
2554
2555 token.cancel();
2556 }
2557
2558 #[tokio::test]
2559 async fn test_producer_no_trace_context_no_crash() {
2560 let (url, _handle) = start_test_server().await;
2561 let ctx = test_producer_ctx();
2562
2563 let component = HttpComponent::new();
2564 let endpoint = component
2565 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2566 .unwrap();
2567 let producer = endpoint.create_producer(&ctx).unwrap();
2568
2569 let exchange = Exchange::new(Message::default());
2571
2572 let result = producer.oneshot(exchange).await.unwrap();
2574
2575 let status = result
2577 .input
2578 .header("CamelHttpResponseCode")
2579 .and_then(|v| v.as_u64())
2580 .unwrap();
2581 assert_eq!(status, 200);
2582 }
2583
2584 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2586 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2587 let addr = listener.local_addr().unwrap();
2588 let url = format!("http://127.0.0.1:{}", addr.port());
2589
2590 let handle = tokio::spawn(async move {
2591 loop {
2592 if let Ok((mut stream, _)) = listener.accept().await {
2593 tokio::spawn(async move {
2594 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2595 let mut buf = vec![0u8; 8192];
2596 let n = stream.read(&mut buf).await.unwrap_or(0);
2597 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2598
2599 let traceparent = request
2601 .lines()
2602 .find(|line| line.to_lowercase().starts_with("traceparent:"))
2603 .map(|line| {
2604 line.split(':')
2605 .nth(1)
2606 .map(|s| s.trim().to_string())
2607 .unwrap_or_default()
2608 })
2609 .unwrap_or_default();
2610
2611 let body =
2612 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2613 let response = format!(
2614 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2615 body.len(),
2616 traceparent,
2617 body
2618 );
2619 let _ = stream.write_all(response.as_bytes()).await;
2620 });
2621 }
2622 }
2623 });
2624
2625 (url, handle)
2626 }
2627 }
2628
2629 #[tokio::test]
2638 async fn test_request_body_arrives_as_stream() {
2639 use camel_component_api::Body;
2640 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2641
2642 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2643 let port = listener.local_addr().unwrap().port();
2644 drop(listener);
2645
2646 let component = HttpComponent::new();
2647 let endpoint = component
2648 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2649 .unwrap();
2650 let mut consumer = endpoint.create_consumer().unwrap();
2651
2652 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2653 let token = tokio_util::sync::CancellationToken::new();
2654 let ctx = ConsumerContext::new(tx, token.clone());
2655
2656 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2657 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2658
2659 let client = reqwest::Client::new();
2660 let send_fut = client
2661 .post(format!("http://127.0.0.1:{port}/upload"))
2662 .body("hello streaming world")
2663 .send();
2664
2665 let (http_result, _) = tokio::join!(send_fut, async {
2666 if let Some(mut envelope) = rx.recv().await {
2667 assert!(
2669 matches!(envelope.exchange.input.body, Body::Stream(_)),
2670 "expected Body::Stream, got discriminant {:?}",
2671 std::mem::discriminant(&envelope.exchange.input.body)
2672 );
2673 let bytes = envelope
2675 .exchange
2676 .input
2677 .body
2678 .into_bytes(1024 * 1024)
2679 .await
2680 .unwrap();
2681 assert_eq!(&bytes[..], b"hello streaming world");
2682
2683 envelope.exchange.input.body = camel_component_api::Body::Empty;
2684 if let Some(reply_tx) = envelope.reply_tx {
2685 let _ = reply_tx.send(Ok(envelope.exchange));
2686 }
2687 }
2688 });
2689
2690 let resp = http_result.unwrap();
2691 assert_eq!(resp.status().as_u16(), 200);
2692
2693 token.cancel();
2694 }
2695
2696 #[tokio::test]
2701 async fn test_streaming_response_chunked() {
2702 use bytes::Bytes;
2703 use camel_component_api::Body;
2704 use camel_component_api::CamelError;
2705 use camel_component_api::{ConsumerContext, ExchangeEnvelope};
2706 use camel_component_api::{StreamBody, StreamMetadata};
2707 use futures::stream;
2708 use std::sync::Arc;
2709 use tokio::sync::Mutex;
2710
2711 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2712 let port = listener.local_addr().unwrap().port();
2713 drop(listener);
2714
2715 let component = HttpComponent::new();
2716 let endpoint = component
2717 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2718 .unwrap();
2719 let mut consumer = endpoint.create_consumer().unwrap();
2720
2721 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2722 let token = tokio_util::sync::CancellationToken::new();
2723 let ctx = ConsumerContext::new(tx, token.clone());
2724
2725 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2726 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2727
2728 let client = reqwest::Client::new();
2729 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2730
2731 let (http_result, _) = tokio::join!(send_fut, async {
2732 if let Some(mut envelope) = rx.recv().await {
2733 let chunks: Vec<Result<Bytes, CamelError>> =
2735 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2736 let stream = Box::pin(stream::iter(chunks));
2737 envelope.exchange.input.body = Body::Stream(StreamBody {
2738 stream: Arc::new(Mutex::new(Some(stream))),
2739 metadata: StreamMetadata::default(),
2740 });
2741 if let Some(reply_tx) = envelope.reply_tx {
2742 let _ = reply_tx.send(Ok(envelope.exchange));
2743 }
2744 }
2745 });
2746
2747 let resp = http_result.unwrap();
2748 assert_eq!(resp.status().as_u16(), 200);
2749 let body = resp.text().await.unwrap();
2750 assert_eq!(body, "chunk1chunk2");
2751
2752 token.cancel();
2753 }
2754
2755 #[tokio::test]
2760 async fn test_413_when_content_length_exceeds_limit() {
2761 use camel_component_api::ConsumerContext;
2762
2763 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2764 let port = listener.local_addr().unwrap().port();
2765 drop(listener);
2766
2767 let component = HttpComponent::new();
2769 let endpoint = component
2770 .create_endpoint(&format!(
2771 "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2772 ))
2773 .unwrap();
2774 let mut consumer = endpoint.create_consumer().unwrap();
2775
2776 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2777 let token = tokio_util::sync::CancellationToken::new();
2778 let ctx = ConsumerContext::new(tx, token.clone());
2779
2780 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2781 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2782
2783 let client = reqwest::Client::new();
2784 let resp = client
2785 .post(format!("http://127.0.0.1:{port}/upload"))
2786 .header("Content-Length", "1000") .body("x".repeat(1000))
2788 .send()
2789 .await
2790 .unwrap();
2791
2792 assert_eq!(resp.status().as_u16(), 413);
2793
2794 token.cancel();
2795 }
2796
2797 #[tokio::test]
2801 async fn test_chunked_upload_without_content_length_bypasses_limit() {
2802 use bytes::Bytes;
2803 use camel_component_api::Body;
2804 use camel_component_api::ConsumerContext;
2805 use futures::stream;
2806
2807 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2808 let port = listener.local_addr().unwrap().port();
2809 drop(listener);
2810
2811 let component = HttpComponent::new();
2813 let endpoint = component
2814 .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2815 .unwrap();
2816 let mut consumer = endpoint.create_consumer().unwrap();
2817
2818 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
2819 let token = tokio_util::sync::CancellationToken::new();
2820 let ctx = ConsumerContext::new(tx, token.clone());
2821
2822 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2823 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2824
2825 let client = reqwest::Client::new();
2826
2827 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2831 Ok(Bytes::from("y".repeat(50))),
2832 Ok(Bytes::from("y".repeat(50))),
2833 ];
2834 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2835 let send_fut = client
2836 .post(format!("http://127.0.0.1:{port}/upload"))
2837 .body(stream_body)
2838 .send();
2839
2840 let consumer_fut = async {
2841 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2843 Ok(Some(mut envelope)) => {
2844 assert!(
2845 matches!(envelope.exchange.input.body, Body::Stream(_)),
2846 "expected Body::Stream"
2847 );
2848 envelope.exchange.input.body = camel_component_api::Body::Empty;
2849 if let Some(reply_tx) = envelope.reply_tx {
2850 let _ = reply_tx.send(Ok(envelope.exchange));
2851 }
2852 }
2853 Ok(None) => panic!("consumer channel closed unexpectedly"),
2854 Err(_) => {
2855 }
2858 }
2859 };
2860
2861 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2862
2863 let resp = http_result.unwrap();
2864 assert_ne!(
2866 resp.status().as_u16(),
2867 413,
2868 "chunked upload must not be rejected by maxRequestBody"
2869 );
2870 assert_eq!(resp.status().as_u16(), 200);
2871
2872 token.cancel();
2873 }
2874}