1pub mod config;
2pub use config::HttpConfig;
3
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, OnceLock};
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use tokio::sync::{OnceCell, RwLock};
12use tower::Service;
13use tracing::debug;
14
15use axum::body::BodyDataStream;
16use camel_api::{
17 BoxProcessor, CamelError, Exchange,
18 body::{Body, StreamBody, StreamMetadata},
19};
20use camel_component::{Component, Consumer, Endpoint, ProducerContext};
21use camel_endpoint::{UriComponents, UriConfig, parse_uri};
22use futures::TryStreamExt;
23use futures::stream::BoxStream;
24
25#[derive(Debug, Clone)]
86pub struct HttpEndpointConfig {
87 pub base_url: String,
88 pub http_method: Option<String>,
89 pub throw_exception_on_failure: bool,
90 pub ok_status_code_range: (u16, u16),
91 pub response_timeout: Option<Duration>,
92 pub query_params: HashMap<String, String>,
93 pub allow_private_ips: bool,
94 pub blocked_hosts: Vec<String>,
95 pub max_body_size: usize,
96}
97
98const HTTP_CAMEL_OPTIONS: &[&str] = &[
100 "httpMethod",
101 "throwExceptionOnFailure",
102 "okStatusCodeRange",
103 "followRedirects",
104 "connectTimeout",
105 "responseTimeout",
106 "allowPrivateIps",
107 "blockedHosts",
108 "maxBodySize",
109];
110
111impl UriConfig for HttpEndpointConfig {
112 fn scheme() -> &'static str {
114 "http"
115 }
116
117 fn from_uri(uri: &str) -> Result<Self, CamelError> {
118 let parts = parse_uri(uri)?;
119 Self::from_components(parts)
120 }
121
122 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
123 if parts.scheme != "http" && parts.scheme != "https" {
125 return Err(CamelError::InvalidUri(format!(
126 "expected scheme 'http' or 'https', got '{}'",
127 parts.scheme
128 )));
129 }
130
131 let base_url = format!("{}:{}", parts.scheme, parts.path);
134
135 let http_method = parts.params.get("httpMethod").cloned();
136
137 let throw_exception_on_failure = parts
138 .params
139 .get("throwExceptionOnFailure")
140 .map(|v| v != "false")
141 .unwrap_or(true);
142
143 let ok_status_code_range = parts
145 .params
146 .get("okStatusCodeRange")
147 .and_then(|v| {
148 let (start, end) = v.split_once('-')?;
149 Some((start.parse::<u16>().ok()?, end.parse::<u16>().ok()?))
150 })
151 .unwrap_or((200, 299));
152
153 let response_timeout = parts
154 .params
155 .get("responseTimeout")
156 .and_then(|v| v.parse::<u64>().ok())
157 .map(Duration::from_millis);
158
159 let allow_private_ips = parts
161 .params
162 .get("allowPrivateIps")
163 .map(|v| v == "true")
164 .unwrap_or(false); let blocked_hosts = parts
168 .params
169 .get("blockedHosts")
170 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
171 .unwrap_or_default();
172
173 let max_body_size = parts
174 .params
175 .get("maxBodySize")
176 .and_then(|v| v.parse::<usize>().ok())
177 .unwrap_or(10 * 1024 * 1024); let query_params: HashMap<String, String> = parts
181 .params
182 .into_iter()
183 .filter(|(k, _)| !HTTP_CAMEL_OPTIONS.contains(&k.as_str()))
184 .collect();
185
186 Ok(Self {
187 base_url,
188 http_method,
189 throw_exception_on_failure,
190 ok_status_code_range,
191 response_timeout,
192 query_params,
193 allow_private_ips,
194 blocked_hosts,
195 max_body_size,
196 })
197 }
198}
199
200impl HttpEndpointConfig {
201 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
202 let parts = parse_uri(uri)?;
203 let mut endpoint = Self::from_components(parts.clone())?;
204 if endpoint.response_timeout.is_none() {
205 endpoint.response_timeout = Some(Duration::from_millis(config.response_timeout_ms));
206 }
207 if !parts.params.contains_key("allowPrivateIps") {
208 endpoint.allow_private_ips = config.allow_private_ips;
209 }
210 if !parts.params.contains_key("blockedHosts") {
211 endpoint.blocked_hosts = config.blocked_hosts.clone();
212 }
213 if !parts.params.contains_key("maxBodySize") {
214 endpoint.max_body_size = config.max_body_size;
215 }
216 Ok(endpoint)
217 }
218}
219
220#[derive(Debug, Clone)]
226pub struct HttpServerConfig {
227 pub host: String,
229 pub port: u16,
231 pub path: String,
233 pub max_request_body: usize,
235 pub max_response_body: usize,
237}
238
239impl UriConfig for HttpServerConfig {
240 fn scheme() -> &'static str {
242 "http"
243 }
244
245 fn from_uri(uri: &str) -> Result<Self, CamelError> {
246 let parts = parse_uri(uri)?;
247 Self::from_components(parts)
248 }
249
250 fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
251 if parts.scheme != "http" && parts.scheme != "https" {
253 return Err(CamelError::InvalidUri(format!(
254 "expected scheme 'http' or 'https', got '{}'",
255 parts.scheme
256 )));
257 }
258
259 let authority_and_path = parts.path.trim_start_matches('/');
262
263 let (authority, path_suffix) = if let Some(idx) = authority_and_path.find('/') {
265 (&authority_and_path[..idx], &authority_and_path[idx..])
266 } else {
267 (authority_and_path, "/")
268 };
269
270 let path = if path_suffix.is_empty() {
271 "/"
272 } else {
273 path_suffix
274 }
275 .to_string();
276
277 let (host, port) = if let Some(colon) = authority.rfind(':') {
279 let port_str = &authority[colon + 1..];
280 match port_str.parse::<u16>() {
281 Ok(p) => (authority[..colon].to_string(), p),
282 Err(_) => {
283 return Err(CamelError::InvalidUri(format!(
284 "invalid port '{}' in authority",
285 port_str
286 )));
287 }
288 }
289 } else {
290 let default_port = if parts.scheme == "https" { 443 } else { 80 };
292 (authority.to_string(), default_port)
293 };
294
295 let max_request_body = parts
296 .params
297 .get("maxRequestBody")
298 .and_then(|v| v.parse::<usize>().ok())
299 .unwrap_or(2 * 1024 * 1024); let max_response_body = parts
302 .params
303 .get("maxResponseBody")
304 .and_then(|v| v.parse::<usize>().ok())
305 .unwrap_or(10 * 1024 * 1024); Ok(Self {
308 host,
309 port,
310 path,
311 max_request_body,
312 max_response_body,
313 })
314 }
315}
316
317impl HttpServerConfig {
318 pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
319 let parts = parse_uri(uri)?;
320 let mut server = Self::from_components(parts.clone())?;
321 if !parts.params.contains_key("maxRequestBody") {
322 server.max_request_body = config.max_request_body;
323 }
324 if !parts.params.contains_key("maxResponseBody") {
325 server.max_response_body = config.max_body_size;
326 }
327 Ok(server)
328 }
329}
330
331pub(crate) enum HttpReplyBody {
337 Bytes(bytes::Bytes),
338 Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
339}
340
341pub(crate) struct RequestEnvelope {
344 pub(crate) method: String,
345 pub(crate) path: String,
346 pub(crate) query: String,
347 pub(crate) headers: http::HeaderMap,
348 pub(crate) body: StreamBody,
349 pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
350}
351
352pub(crate) struct HttpReply {
354 pub(crate) status: u16,
355 pub(crate) headers: Vec<(String, String)>,
356 pub(crate) body: HttpReplyBody,
357}
358
359pub(crate) type DispatchTable =
365 Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
366
367struct ServerHandle {
369 dispatch: DispatchTable,
370 _task: tokio::task::JoinHandle<()>,
372}
373
374pub struct ServerRegistry {
376 inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
377}
378
379impl ServerRegistry {
380 pub fn global() -> &'static Self {
382 static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
383 INSTANCE.get_or_init(|| ServerRegistry {
384 inner: Mutex::new(HashMap::new()),
385 })
386 }
387
388 pub(crate) async fn get_or_spawn(
391 &'static self,
392 host: &str,
393 port: u16,
394 max_request_body: usize,
395 ) -> Result<DispatchTable, CamelError> {
396 let host_owned = host.to_string();
397
398 let cell = {
399 let mut guard = self.inner.lock().map_err(|_| {
400 CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
401 })?;
402 guard
403 .entry(port)
404 .or_insert_with(|| Arc::new(OnceCell::new()))
405 .clone()
406 };
407
408 let handle = cell
409 .get_or_try_init(|| async {
410 let addr = format!("{host_owned}:{port}");
411 let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
412 CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
413 })?;
414 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
415 let task = tokio::spawn(run_axum_server(
416 listener,
417 Arc::clone(&dispatch),
418 max_request_body,
419 ));
420 Ok::<ServerHandle, CamelError>(ServerHandle {
421 dispatch,
422 _task: task,
423 })
424 })
425 .await?;
426
427 Ok(Arc::clone(&handle.dispatch))
428 }
429}
430
431use axum::{
436 Router,
437 body::Body as AxumBody,
438 extract::{Request, State},
439 http::{Response, StatusCode},
440 response::IntoResponse,
441};
442
443#[derive(Clone)]
444struct AppState {
445 dispatch: DispatchTable,
446 max_request_body: usize,
447}
448
449async fn run_axum_server(
450 listener: tokio::net::TcpListener,
451 dispatch: DispatchTable,
452 max_request_body: usize,
453) {
454 let state = AppState {
455 dispatch,
456 max_request_body,
457 };
458 let app = Router::new().fallback(dispatch_handler).with_state(state);
459
460 axum::serve(listener, app).await.unwrap_or_else(|e| {
461 tracing::error!(error = %e, "Axum server error");
462 });
463}
464
465async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
466 let method = req.method().to_string();
467 let path = req.uri().path().to_string();
468 let query = req.uri().query().unwrap_or("").to_string();
469 let headers = req.headers().clone();
470
471 let content_length: Option<u64> = headers
473 .get(http::header::CONTENT_LENGTH)
474 .and_then(|v| v.to_str().ok())
475 .and_then(|s| s.parse().ok());
476
477 if let Some(len) = content_length
478 && len > state.max_request_body as u64
479 {
480 return Response::builder()
481 .status(StatusCode::PAYLOAD_TOO_LARGE)
482 .body(AxumBody::from("Request body exceeds configured limit"))
483 .expect("infallible");
484 }
485
486 let content_type = headers
488 .get(http::header::CONTENT_TYPE)
489 .and_then(|v| v.to_str().ok())
490 .map(|s| s.to_string());
491
492 let data_stream: BodyDataStream = req.into_body().into_data_stream();
493 let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
494 let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
495
496 let stream_body = StreamBody {
497 stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
498 metadata: StreamMetadata {
499 size_hint: content_length,
500 content_type,
501 origin: None,
502 },
503 };
504
505 let sender = {
507 let table = state.dispatch.read().await;
508 table.get(&path).cloned()
509 };
510 let Some(sender) = sender else {
511 return Response::builder()
512 .status(StatusCode::NOT_FOUND)
513 .body(AxumBody::from("No consumer registered for this path"))
514 .expect("infallible");
515 };
516
517 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
518 let envelope = RequestEnvelope {
519 method,
520 path,
521 query,
522 headers,
523 body: stream_body,
524 reply_tx,
525 };
526
527 if sender.send(envelope).await.is_err() {
528 return Response::builder()
529 .status(StatusCode::SERVICE_UNAVAILABLE)
530 .body(AxumBody::from("Consumer unavailable"))
531 .expect("infallible");
532 }
533
534 match reply_rx.await {
535 Ok(reply) => {
536 let status =
537 StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
538 let mut builder = Response::builder().status(status);
539 for (k, v) in &reply.headers {
540 builder = builder.header(k.as_str(), v.as_str());
541 }
542 match reply.body {
543 HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
544 Response::builder()
545 .status(StatusCode::INTERNAL_SERVER_ERROR)
546 .body(AxumBody::from("Invalid response headers from consumer"))
547 .expect("infallible")
548 }),
549 HttpReplyBody::Stream(stream) => builder
550 .body(AxumBody::from_stream(stream))
551 .unwrap_or_else(|_| {
552 Response::builder()
553 .status(StatusCode::INTERNAL_SERVER_ERROR)
554 .body(AxumBody::from("Invalid response headers from consumer"))
555 .expect("infallible")
556 }),
557 }
558 }
559 Err(_) => Response::builder()
560 .status(StatusCode::INTERNAL_SERVER_ERROR)
561 .body(AxumBody::from("Pipeline error"))
562 .expect("Response::builder() with a known-valid status code and body is infallible"),
563 }
564}
565
566pub struct HttpConsumer {
571 config: HttpServerConfig,
572}
573
574impl HttpConsumer {
575 pub fn new(config: HttpServerConfig) -> Self {
576 Self { config }
577 }
578}
579
580#[async_trait::async_trait]
581impl Consumer for HttpConsumer {
582 async fn start(&mut self, ctx: camel_component::ConsumerContext) -> Result<(), CamelError> {
583 use camel_api::{Exchange, Message, body::Body};
584
585 let dispatch = ServerRegistry::global()
586 .get_or_spawn(
587 &self.config.host,
588 self.config.port,
589 self.config.max_request_body,
590 )
591 .await?;
592
593 let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
595 {
596 let mut table = dispatch.write().await;
597 table.insert(self.config.path.clone(), env_tx);
598 }
599
600 let path = self.config.path.clone();
601 let cancel_token = ctx.cancel_token();
602 let _max_response_body = self.config.max_response_body;
603
604 loop {
605 tokio::select! {
606 _ = ctx.cancelled() => {
607 break;
608 }
609 envelope = env_rx.recv() => {
610 let Some(envelope) = envelope else { break; };
611
612 let mut msg = Message::default();
614
615 msg.set_header("CamelHttpMethod",
617 serde_json::Value::String(envelope.method.clone()));
618 msg.set_header("CamelHttpPath",
619 serde_json::Value::String(envelope.path.clone()));
620 msg.set_header("CamelHttpQuery",
621 serde_json::Value::String(envelope.query.clone()));
622
623 for (k, v) in &envelope.headers {
625 if let Ok(val_str) = v.to_str() {
626 msg.set_header(
627 k.as_str(),
628 serde_json::Value::String(val_str.to_string()),
629 );
630 }
631 }
632
633 msg.body = Body::Stream(envelope.body);
636
637 #[allow(unused_mut)]
638 let mut exchange = Exchange::new(msg);
639
640 #[cfg(feature = "otel")]
642 {
643 let headers: HashMap<String, String> = envelope
644 .headers
645 .iter()
646 .filter_map(|(k, v)| {
647 Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
648 })
649 .collect();
650 camel_otel::extract_into_exchange(&mut exchange, &headers);
651 }
652
653 let reply_tx = envelope.reply_tx;
654 let sender = ctx.sender().clone();
655 let path_clone = path.clone();
656 let cancel = cancel_token.clone();
657
658 tokio::spawn(async move {
678 if cancel.is_cancelled() {
686 let _ = reply_tx.send(HttpReply {
687 status: 503,
688 headers: vec![],
689 body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
690 });
691 return;
692 }
693
694 let (tx, rx) = tokio::sync::oneshot::channel();
696 let envelope = camel_component::consumer::ExchangeEnvelope {
697 exchange,
698 reply_tx: Some(tx),
699 };
700
701 let result = match sender.send(envelope).await {
702 Ok(()) => rx.await.map_err(|_| camel_api::CamelError::ChannelClosed),
703 Err(_) => Err(camel_api::CamelError::ChannelClosed),
704 }
705 .and_then(|r| r);
706
707 let reply = match result {
708 Ok(out) => {
709 let status = out
710 .input
711 .header("CamelHttpResponseCode")
712 .and_then(|v| v.as_u64())
713 .map(|s| s as u16)
714 .unwrap_or(200);
715
716 let reply_body: HttpReplyBody = match out.input.body {
717 Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
718 Body::Bytes(b) => HttpReplyBody::Bytes(b),
719 Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
720 Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
721 Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
722 v.to_string().into_bytes(),
723 )),
724 Body::Stream(s) => {
725 match s.stream.lock().await.take() {
726 Some(stream) => HttpReplyBody::Stream(stream),
727 None => {
728 tracing::error!(
729 "Body::Stream already consumed before HTTP reply — returning 500"
730 );
731 let error_reply = HttpReply {
732 status: 500,
733 headers: vec![],
734 body: HttpReplyBody::Bytes(bytes::Bytes::new()),
735 };
736 if reply_tx.send(error_reply).is_err() {
737 debug!("reply_tx dropped before error reply could be sent");
738 }
739 return;
740 }
741 }
742 }
743 };
744
745 let resp_headers: Vec<(String, String)> = out
746 .input
747 .headers
748 .iter()
749 .filter(|(k, _)| !k.starts_with("Camel"))
751 .filter(|(k, _)| {
754 !matches!(
755 k.to_lowercase().as_str(),
756 "content-length" | "content-type" | "transfer-encoding" | "connection" | "cache-control" | "date" | "pragma" | "trailer" | "upgrade" | "via" | "warning" | "host" | "user-agent" | "accept" | "accept-encoding" | "accept-language" | "accept-charset" | "authorization" | "proxy-authorization" | "cookie" | "expect" | "from" | "if-match" | "if-modified-since" | "if-none-match" | "if-range" | "if-unmodified-since" | "max-forwards" | "proxy-connection" | "range" | "referer" | "te" )
791 })
792 .filter_map(|(k, v)| {
793 v.as_str().map(|s| (k.clone(), s.to_string()))
794 })
795 .collect();
796
797 HttpReply {
798 status,
799 headers: resp_headers,
800 body: reply_body,
801 }
802 }
803 Err(e) => {
804 tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
805 HttpReply {
806 status: 500,
807 headers: vec![],
808 body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
809 }
810 }
811 };
812
813 let _ = reply_tx.send(reply);
815 });
816 }
817 }
818 }
819
820 {
822 let mut table = dispatch.write().await;
823 table.remove(&path);
824 }
825
826 Ok(())
827 }
828
829 async fn stop(&mut self) -> Result<(), CamelError> {
830 Ok(())
831 }
832
833 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
834 camel_component::ConcurrencyModel::Concurrent { max: None }
835 }
836}
837
838pub struct HttpComponent {
843 client: reqwest::Client,
844 config: HttpConfig,
845}
846
847fn build_client(config: &HttpConfig) -> reqwest::Client {
848 let mut builder = reqwest::Client::builder()
849 .connect_timeout(Duration::from_millis(config.connect_timeout_ms))
850 .pool_max_idle_per_host(config.pool_max_idle_per_host)
851 .pool_idle_timeout(Duration::from_millis(config.pool_idle_timeout_ms));
852
853 if !config.follow_redirects {
854 builder = builder.redirect(reqwest::redirect::Policy::none());
855 }
856
857 builder
858 .build()
859 .expect("reqwest::Client::build() with valid config should not fail")
860}
861
862impl HttpComponent {
863 pub fn new() -> Self {
864 let config = HttpConfig::default();
865 let client = build_client(&config);
866 Self { client, config }
867 }
868
869 pub fn with_config(config: HttpConfig) -> Self {
870 let client = build_client(&config);
871 Self { client, config }
872 }
873
874 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
875 match config {
876 Some(cfg) => Self::with_config(cfg),
877 None => Self::new(),
878 }
879 }
880}
881
882impl Default for HttpComponent {
883 fn default() -> Self {
884 Self::new()
885 }
886}
887
888impl Component for HttpComponent {
889 fn scheme(&self) -> &str {
890 "http"
891 }
892
893 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
894 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
895 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
896 Ok(Box::new(HttpEndpoint {
897 uri: uri.to_string(),
898 config,
899 server_config,
900 client: self.client.clone(),
901 }))
902 }
903}
904
905pub struct HttpsComponent {
906 client: reqwest::Client,
907 config: HttpConfig,
908}
909
910impl HttpsComponent {
911 pub fn new() -> Self {
912 let config = HttpConfig::default();
913 let client = build_client(&config);
914 Self { client, config }
915 }
916
917 pub fn with_config(config: HttpConfig) -> Self {
918 let client = build_client(&config);
919 Self { client, config }
920 }
921
922 pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
923 match config {
924 Some(cfg) => Self::with_config(cfg),
925 None => Self::new(),
926 }
927 }
928}
929
930impl Default for HttpsComponent {
931 fn default() -> Self {
932 Self::new()
933 }
934}
935
936impl Component for HttpsComponent {
937 fn scheme(&self) -> &str {
938 "https"
939 }
940
941 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
942 let config = HttpEndpointConfig::from_uri_with_defaults(uri, &self.config)?;
943 let server_config = HttpServerConfig::from_uri_with_defaults(uri, &self.config)?;
944 Ok(Box::new(HttpEndpoint {
945 uri: uri.to_string(),
946 config,
947 server_config,
948 client: self.client.clone(),
949 }))
950 }
951}
952
953struct HttpEndpoint {
958 uri: String,
959 config: HttpEndpointConfig,
960 server_config: HttpServerConfig,
961 client: reqwest::Client,
962}
963
964impl Endpoint for HttpEndpoint {
965 fn uri(&self) -> &str {
966 &self.uri
967 }
968
969 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
970 Ok(Box::new(HttpConsumer::new(self.server_config.clone())))
971 }
972
973 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
974 Ok(BoxProcessor::new(HttpProducer {
975 config: Arc::new(self.config.clone()),
976 client: self.client.clone(),
977 }))
978 }
979}
980
981fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
986 let parsed = url::Url::parse(url)
987 .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;
988
989 if let Some(host) = parsed.host_str()
991 && config.blocked_hosts.iter().any(|blocked| host == blocked)
992 {
993 return Err(CamelError::ProcessorError(format!(
994 "Host '{}' is blocked",
995 host
996 )));
997 }
998
999 if !config.allow_private_ips
1001 && let Some(host) = parsed.host()
1002 {
1003 match host {
1004 url::Host::Ipv4(ip) => {
1005 if ip.is_private() || ip.is_loopback() || ip.is_link_local() {
1006 return Err(CamelError::ProcessorError(format!(
1007 "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
1008 ip
1009 )));
1010 }
1011 }
1012 url::Host::Ipv6(ip) => {
1013 if ip.is_loopback() {
1014 return Err(CamelError::ProcessorError(format!(
1015 "Loopback IP '{}' not allowed",
1016 ip
1017 )));
1018 }
1019 }
1020 url::Host::Domain(domain) => {
1021 let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
1023 if blocked_domains.contains(&domain) {
1024 return Err(CamelError::ProcessorError(format!(
1025 "Domain '{}' is not allowed",
1026 domain
1027 )));
1028 }
1029 }
1030 }
1031 }
1032
1033 Ok(())
1034}
1035
1036#[derive(Clone)]
1041struct HttpProducer {
1042 config: Arc<HttpEndpointConfig>,
1043 client: reqwest::Client,
1044}
1045
1046impl HttpProducer {
1047 fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1048 if let Some(ref method) = config.http_method {
1049 return method.to_uppercase();
1050 }
1051 if let Some(method) = exchange
1052 .input
1053 .header("CamelHttpMethod")
1054 .and_then(|v| v.as_str())
1055 {
1056 return method.to_uppercase();
1057 }
1058 if !exchange.input.body.is_empty() {
1059 return "POST".to_string();
1060 }
1061 "GET".to_string()
1062 }
1063
1064 fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
1065 if let Some(uri) = exchange
1066 .input
1067 .header("CamelHttpUri")
1068 .and_then(|v| v.as_str())
1069 {
1070 let mut url = uri.to_string();
1071 if let Some(path) = exchange
1072 .input
1073 .header("CamelHttpPath")
1074 .and_then(|v| v.as_str())
1075 {
1076 if !url.ends_with('/') && !path.starts_with('/') {
1077 url.push('/');
1078 }
1079 url.push_str(path);
1080 }
1081 if let Some(query) = exchange
1082 .input
1083 .header("CamelHttpQuery")
1084 .and_then(|v| v.as_str())
1085 {
1086 url.push('?');
1087 url.push_str(query);
1088 }
1089 return url;
1090 }
1091
1092 let mut url = config.base_url.clone();
1093
1094 if let Some(path) = exchange
1095 .input
1096 .header("CamelHttpPath")
1097 .and_then(|v| v.as_str())
1098 {
1099 if !url.ends_with('/') && !path.starts_with('/') {
1100 url.push('/');
1101 }
1102 url.push_str(path);
1103 }
1104
1105 if let Some(query) = exchange
1106 .input
1107 .header("CamelHttpQuery")
1108 .and_then(|v| v.as_str())
1109 {
1110 url.push('?');
1111 url.push_str(query);
1112 } else if !config.query_params.is_empty() {
1113 url.push('?');
1115 let query_string: String = config
1116 .query_params
1117 .iter()
1118 .map(|(k, v)| format!("{k}={v}"))
1119 .collect::<Vec<_>>()
1120 .join("&");
1121 url.push_str(&query_string);
1122 }
1123
1124 url
1125 }
1126
1127 fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
1128 status >= range.0 && status <= range.1
1129 }
1130}
1131
1132impl Service<Exchange> for HttpProducer {
1133 type Response = Exchange;
1134 type Error = CamelError;
1135 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1136
1137 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1138 Poll::Ready(Ok(()))
1139 }
1140
1141 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1142 let config = self.config.clone();
1143 let client = self.client.clone();
1144
1145 Box::pin(async move {
1146 let method_str = HttpProducer::resolve_method(&exchange, &config);
1147 let url = HttpProducer::resolve_url(&exchange, &config);
1148
1149 validate_url_for_ssrf(&url, &config)?;
1151
1152 debug!(
1153 correlation_id = %exchange.correlation_id(),
1154 method = %method_str,
1155 url = %url,
1156 "HTTP request"
1157 );
1158
1159 let method = method_str.parse::<reqwest::Method>().map_err(|e| {
1160 CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
1161 })?;
1162
1163 let mut request = client.request(method, &url);
1164
1165 if let Some(timeout) = config.response_timeout {
1166 request = request.timeout(timeout);
1167 }
1168
1169 #[cfg(feature = "otel")]
1171 {
1172 let mut otel_headers = HashMap::new();
1173 camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
1174 for (k, v) in otel_headers {
1175 if let (Ok(name), Ok(val)) = (
1176 reqwest::header::HeaderName::from_bytes(k.as_bytes()),
1177 reqwest::header::HeaderValue::from_str(&v),
1178 ) {
1179 request = request.header(name, val);
1180 }
1181 }
1182 }
1183
1184 for (key, value) in &exchange.input.headers {
1185 if !key.starts_with("Camel")
1186 && let Some(val_str) = value.as_str()
1187 && let (Ok(name), Ok(val)) = (
1188 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1189 reqwest::header::HeaderValue::from_str(val_str),
1190 )
1191 {
1192 request = request.header(name, val);
1193 }
1194 }
1195
1196 match exchange.input.body {
1197 Body::Stream(ref s) => {
1198 let mut stream_lock = s.stream.lock().await;
1199 if let Some(stream) = stream_lock.take() {
1200 request = request.body(reqwest::Body::wrap_stream(stream));
1201 } else {
1202 return Err(CamelError::AlreadyConsumed);
1203 }
1204 }
1205 _ => {
1206 let body = std::mem::take(&mut exchange.input.body);
1208 let bytes = body.into_bytes(config.max_body_size).await?;
1209 if !bytes.is_empty() {
1210 request = request.body(bytes);
1211 }
1212 }
1213 }
1214
1215 let response = request
1216 .send()
1217 .await
1218 .map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
1219
1220 let status_code = response.status().as_u16();
1221 let status_text = response
1222 .status()
1223 .canonical_reason()
1224 .unwrap_or("Unknown")
1225 .to_string();
1226
1227 for (key, value) in response.headers() {
1228 if let Ok(val_str) = value.to_str() {
1229 exchange
1230 .input
1231 .set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
1232 }
1233 }
1234
1235 exchange.input.set_header(
1236 "CamelHttpResponseCode",
1237 serde_json::Value::Number(status_code.into()),
1238 );
1239 exchange.input.set_header(
1240 "CamelHttpResponseText",
1241 serde_json::Value::String(status_text.clone()),
1242 );
1243
1244 let response_body = response.bytes().await.map_err(|e| {
1245 CamelError::ProcessorError(format!("Failed to read response body: {e}"))
1246 })?;
1247
1248 if config.throw_exception_on_failure
1249 && !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
1250 {
1251 return Err(CamelError::HttpOperationFailed {
1252 method: method_str,
1253 url,
1254 status_code,
1255 status_text,
1256 response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
1257 });
1258 }
1259
1260 if !response_body.is_empty() {
1261 exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
1262 }
1263
1264 debug!(
1265 correlation_id = %exchange.correlation_id(),
1266 status = status_code,
1267 url = %url,
1268 "HTTP response"
1269 );
1270 Ok(exchange)
1271 })
1272 }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277 use super::*;
1278 use camel_api::Message;
1279 use std::sync::Arc;
1280 use std::time::Duration;
1281
1282 fn test_producer_ctx() -> ProducerContext {
1283 ProducerContext::new()
1284 }
1285
1286 #[test]
1287 fn test_http_config_defaults() {
1288 let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
1289 assert_eq!(config.base_url, "http://localhost:8080/api");
1290 assert!(config.http_method.is_none());
1291 assert!(config.throw_exception_on_failure);
1292 assert_eq!(config.ok_status_code_range, (200, 299));
1293 assert!(config.response_timeout.is_none());
1294 }
1295
1296 #[test]
1297 fn test_http_config_scheme() {
1298 assert_eq!(HttpEndpointConfig::scheme(), "http");
1300 }
1301
1302 #[test]
1303 fn test_http_config_from_components() {
1304 let components = camel_endpoint::UriComponents {
1306 scheme: "https".to_string(),
1307 path: "//api.example.com/v1".to_string(),
1308 params: std::collections::HashMap::from([(
1309 "httpMethod".to_string(),
1310 "POST".to_string(),
1311 )]),
1312 };
1313 let config = HttpEndpointConfig::from_components(components).unwrap();
1314 assert_eq!(config.base_url, "https://api.example.com/v1");
1315 assert_eq!(config.http_method, Some("POST".to_string()));
1316 }
1317
1318 #[test]
1319 fn test_http_config_with_options() {
1320 let config = HttpEndpointConfig::from_uri(
1321 "https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
1322 ).unwrap();
1323 assert_eq!(config.base_url, "https://api.example.com/v1");
1324 assert_eq!(config.http_method, Some("PUT".to_string()));
1325 assert!(!config.throw_exception_on_failure);
1326 assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
1327 }
1328
1329 #[test]
1330 fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
1331 let config = HttpConfig::default()
1332 .with_response_timeout_ms(999)
1333 .with_allow_private_ips(true)
1334 .with_blocked_hosts(vec!["evil.com".to_string()])
1335 .with_max_body_size(12345);
1336 let endpoint =
1337 HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
1338 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
1339 assert!(endpoint.allow_private_ips);
1340 assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
1341 assert_eq!(endpoint.max_body_size, 12345);
1342 }
1343
1344 #[test]
1345 fn test_from_uri_with_defaults_uri_overrides_config() {
1346 let config = HttpConfig::default()
1347 .with_response_timeout_ms(999)
1348 .with_allow_private_ips(true)
1349 .with_blocked_hosts(vec!["evil.com".to_string()])
1350 .with_max_body_size(12345);
1351 let endpoint = HttpEndpointConfig::from_uri_with_defaults(
1352 "http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
1353 &config,
1354 )
1355 .unwrap();
1356 assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
1357 assert!(!endpoint.allow_private_ips);
1358 assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
1359 assert_eq!(endpoint.max_body_size, 99);
1360 }
1361
1362 #[test]
1363 fn test_http_config_ok_status_range() {
1364 let config =
1365 HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
1366 assert_eq!(config.ok_status_code_range, (200, 204));
1367 }
1368
1369 #[test]
1370 fn test_http_config_wrong_scheme() {
1371 let result = HttpEndpointConfig::from_uri("file:/tmp");
1372 assert!(result.is_err());
1373 }
1374
1375 #[test]
1376 fn test_http_component_scheme() {
1377 let component = HttpComponent::new();
1378 assert_eq!(component.scheme(), "http");
1379 }
1380
1381 #[test]
1382 fn test_https_component_scheme() {
1383 let component = HttpsComponent::new();
1384 assert_eq!(component.scheme(), "https");
1385 }
1386
1387 #[test]
1388 fn test_http_endpoint_creates_consumer() {
1389 let component = HttpComponent::new();
1390 let endpoint = component
1391 .create_endpoint("http://0.0.0.0:19100/test")
1392 .unwrap();
1393 assert!(endpoint.create_consumer().is_ok());
1394 }
1395
1396 #[test]
1397 fn test_https_endpoint_creates_consumer() {
1398 let component = HttpsComponent::new();
1399 let endpoint = component
1400 .create_endpoint("https://0.0.0.0:8443/test")
1401 .unwrap();
1402 assert!(endpoint.create_consumer().is_ok());
1403 }
1404
1405 #[test]
1406 fn test_http_endpoint_creates_producer() {
1407 let ctx = test_producer_ctx();
1408 let component = HttpComponent::new();
1409 let endpoint = component.create_endpoint("http://localhost/api").unwrap();
1410 assert!(endpoint.create_producer(&ctx).is_ok());
1411 }
1412
1413 async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
1418 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1419 let addr = listener.local_addr().unwrap();
1420 let url = format!("http://127.0.0.1:{}", addr.port());
1421
1422 let handle = tokio::spawn(async move {
1423 loop {
1424 if let Ok((mut stream, _)) = listener.accept().await {
1425 tokio::spawn(async move {
1426 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1427 let mut buf = vec![0u8; 4096];
1428 let n = stream.read(&mut buf).await.unwrap_or(0);
1429 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1430
1431 let method = request.split_whitespace().next().unwrap_or("GET");
1432
1433 let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
1434 let response = format!(
1435 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
1436 body.len(),
1437 body
1438 );
1439 let _ = stream.write_all(response.as_bytes()).await;
1440 });
1441 }
1442 }
1443 });
1444
1445 (url, handle)
1446 }
1447
1448 async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
1449 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1450 let addr = listener.local_addr().unwrap();
1451 let url = format!("http://127.0.0.1:{}", addr.port());
1452
1453 let handle = tokio::spawn(async move {
1454 loop {
1455 if let Ok((mut stream, _)) = listener.accept().await {
1456 let status = status;
1457 tokio::spawn(async move {
1458 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1459 let mut buf = vec![0u8; 4096];
1460 let _ = stream.read(&mut buf).await;
1461
1462 let status_text = match status {
1463 404 => "Not Found",
1464 500 => "Internal Server Error",
1465 _ => "Error",
1466 };
1467 let body = "error body";
1468 let response = format!(
1469 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
1470 status,
1471 status_text,
1472 body.len(),
1473 body
1474 );
1475 let _ = stream.write_all(response.as_bytes()).await;
1476 });
1477 }
1478 }
1479 });
1480
1481 (url, handle)
1482 }
1483
1484 #[tokio::test]
1485 async fn test_http_producer_get_request() {
1486 use tower::ServiceExt;
1487
1488 let (url, _handle) = start_test_server().await;
1489 let ctx = test_producer_ctx();
1490
1491 let component = HttpComponent::new();
1492 let endpoint = component
1493 .create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
1494 .unwrap();
1495 let producer = endpoint.create_producer(&ctx).unwrap();
1496
1497 let exchange = Exchange::new(Message::default());
1498 let result = producer.oneshot(exchange).await.unwrap();
1499
1500 let status = result
1501 .input
1502 .header("CamelHttpResponseCode")
1503 .and_then(|v| v.as_u64())
1504 .unwrap();
1505 assert_eq!(status, 200);
1506
1507 assert!(!result.input.body.is_empty());
1508 }
1509
1510 #[tokio::test]
1511 async fn test_http_producer_post_with_body() {
1512 use tower::ServiceExt;
1513
1514 let (url, _handle) = start_test_server().await;
1515 let ctx = test_producer_ctx();
1516
1517 let component = HttpComponent::new();
1518 let endpoint = component
1519 .create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
1520 .unwrap();
1521 let producer = endpoint.create_producer(&ctx).unwrap();
1522
1523 let exchange = Exchange::new(Message::new("request body"));
1524 let result = producer.oneshot(exchange).await.unwrap();
1525
1526 let status = result
1527 .input
1528 .header("CamelHttpResponseCode")
1529 .and_then(|v| v.as_u64())
1530 .unwrap();
1531 assert_eq!(status, 200);
1532 }
1533
1534 #[tokio::test]
1535 async fn test_http_producer_method_from_header() {
1536 use tower::ServiceExt;
1537
1538 let (url, _handle) = start_test_server().await;
1539 let ctx = test_producer_ctx();
1540
1541 let component = HttpComponent::new();
1542 let endpoint = component
1543 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1544 .unwrap();
1545 let producer = endpoint.create_producer(&ctx).unwrap();
1546
1547 let mut exchange = Exchange::new(Message::default());
1548 exchange.input.set_header(
1549 "CamelHttpMethod",
1550 serde_json::Value::String("DELETE".to_string()),
1551 );
1552
1553 let result = producer.oneshot(exchange).await.unwrap();
1554 let status = result
1555 .input
1556 .header("CamelHttpResponseCode")
1557 .and_then(|v| v.as_u64())
1558 .unwrap();
1559 assert_eq!(status, 200);
1560 }
1561
1562 #[tokio::test]
1563 async fn test_http_producer_forced_method() {
1564 use tower::ServiceExt;
1565
1566 let (url, _handle) = start_test_server().await;
1567 let ctx = test_producer_ctx();
1568
1569 let component = HttpComponent::new();
1570 let endpoint = component
1571 .create_endpoint(&format!("{url}/api?httpMethod=PUT&allowPrivateIps=true"))
1572 .unwrap();
1573 let producer = endpoint.create_producer(&ctx).unwrap();
1574
1575 let exchange = Exchange::new(Message::default());
1576 let result = producer.oneshot(exchange).await.unwrap();
1577
1578 let status = result
1579 .input
1580 .header("CamelHttpResponseCode")
1581 .and_then(|v| v.as_u64())
1582 .unwrap();
1583 assert_eq!(status, 200);
1584 }
1585
1586 #[tokio::test]
1587 async fn test_http_producer_throw_exception_on_failure() {
1588 use tower::ServiceExt;
1589
1590 let (url, _handle) = start_status_server(404).await;
1591 let ctx = test_producer_ctx();
1592
1593 let component = HttpComponent::new();
1594 let endpoint = component
1595 .create_endpoint(&format!("{url}/not-found?allowPrivateIps=true"))
1596 .unwrap();
1597 let producer = endpoint.create_producer(&ctx).unwrap();
1598
1599 let exchange = Exchange::new(Message::default());
1600 let result = producer.oneshot(exchange).await;
1601 assert!(result.is_err());
1602
1603 match result.unwrap_err() {
1604 CamelError::HttpOperationFailed { status_code, .. } => {
1605 assert_eq!(status_code, 404);
1606 }
1607 e => panic!("Expected HttpOperationFailed, got: {e}"),
1608 }
1609 }
1610
1611 #[tokio::test]
1612 async fn test_http_producer_no_throw_on_failure() {
1613 use tower::ServiceExt;
1614
1615 let (url, _handle) = start_status_server(500).await;
1616 let ctx = test_producer_ctx();
1617
1618 let component = HttpComponent::new();
1619 let endpoint = component
1620 .create_endpoint(&format!(
1621 "{url}/error?throwExceptionOnFailure=false&allowPrivateIps=true"
1622 ))
1623 .unwrap();
1624 let producer = endpoint.create_producer(&ctx).unwrap();
1625
1626 let exchange = Exchange::new(Message::default());
1627 let result = producer.oneshot(exchange).await.unwrap();
1628
1629 let status = result
1630 .input
1631 .header("CamelHttpResponseCode")
1632 .and_then(|v| v.as_u64())
1633 .unwrap();
1634 assert_eq!(status, 500);
1635 }
1636
1637 #[tokio::test]
1638 async fn test_http_producer_uri_override() {
1639 use tower::ServiceExt;
1640
1641 let (url, _handle) = start_test_server().await;
1642 let ctx = test_producer_ctx();
1643
1644 let component = HttpComponent::new();
1645 let endpoint = component
1646 .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
1647 .unwrap();
1648 let producer = endpoint.create_producer(&ctx).unwrap();
1649
1650 let mut exchange = Exchange::new(Message::default());
1651 exchange.input.set_header(
1652 "CamelHttpUri",
1653 serde_json::Value::String(format!("{url}/api")),
1654 );
1655
1656 let result = producer.oneshot(exchange).await.unwrap();
1657 let status = result
1658 .input
1659 .header("CamelHttpResponseCode")
1660 .and_then(|v| v.as_u64())
1661 .unwrap();
1662 assert_eq!(status, 200);
1663 }
1664
1665 #[tokio::test]
1666 async fn test_http_producer_response_headers_mapped() {
1667 use tower::ServiceExt;
1668
1669 let (url, _handle) = start_test_server().await;
1670 let ctx = test_producer_ctx();
1671
1672 let component = HttpComponent::new();
1673 let endpoint = component
1674 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
1675 .unwrap();
1676 let producer = endpoint.create_producer(&ctx).unwrap();
1677
1678 let exchange = Exchange::new(Message::default());
1679 let result = producer.oneshot(exchange).await.unwrap();
1680
1681 assert!(
1682 result.input.header("content-type").is_some()
1683 || result.input.header("Content-Type").is_some()
1684 );
1685 assert!(result.input.header("CamelHttpResponseText").is_some());
1686 }
1687
1688 async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
1693 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1694 let addr = listener.local_addr().unwrap();
1695 let url = format!("http://127.0.0.1:{}", addr.port());
1696
1697 let handle = tokio::spawn(async move {
1698 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1699 loop {
1700 if let Ok((mut stream, _)) = listener.accept().await {
1701 tokio::spawn(async move {
1702 let mut buf = vec![0u8; 4096];
1703 let n = stream.read(&mut buf).await.unwrap_or(0);
1704 let request = String::from_utf8_lossy(&buf[..n]).to_string();
1705
1706 if request.contains("GET /final") {
1708 let body = r#"{"status":"final"}"#;
1709 let response = format!(
1710 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1711 body.len(),
1712 body
1713 );
1714 let _ = stream.write_all(response.as_bytes()).await;
1715 } else {
1716 let response = "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n";
1718 let _ = stream.write_all(response.as_bytes()).await;
1719 }
1720 });
1721 }
1722 }
1723 });
1724
1725 (url, handle)
1726 }
1727
1728 #[tokio::test]
1729 async fn test_follow_redirects_false_does_not_follow() {
1730 use tower::ServiceExt;
1731
1732 let (url, _handle) = start_redirect_server().await;
1733 let ctx = test_producer_ctx();
1734
1735 let component =
1736 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(false));
1737 let endpoint = component
1738 .create_endpoint(&format!(
1739 "{url}?throwExceptionOnFailure=false&allowPrivateIps=true"
1740 ))
1741 .unwrap();
1742 let producer = endpoint.create_producer(&ctx).unwrap();
1743
1744 let exchange = Exchange::new(Message::default());
1745 let result = producer.oneshot(exchange).await.unwrap();
1746
1747 let status = result
1749 .input
1750 .header("CamelHttpResponseCode")
1751 .and_then(|v| v.as_u64())
1752 .unwrap();
1753 assert_eq!(
1754 status, 302,
1755 "Should NOT follow redirect when followRedirects=false"
1756 );
1757 }
1758
1759 #[tokio::test]
1760 async fn test_follow_redirects_true_follows_redirect() {
1761 use tower::ServiceExt;
1762
1763 let (url, _handle) = start_redirect_server().await;
1764 let ctx = test_producer_ctx();
1765
1766 let component =
1767 HttpComponent::with_config(HttpConfig::default().with_follow_redirects(true));
1768 let endpoint = component
1769 .create_endpoint(&format!("{url}?allowPrivateIps=true"))
1770 .unwrap();
1771 let producer = endpoint.create_producer(&ctx).unwrap();
1772
1773 let exchange = Exchange::new(Message::default());
1774 let result = producer.oneshot(exchange).await.unwrap();
1775
1776 let status = result
1778 .input
1779 .header("CamelHttpResponseCode")
1780 .and_then(|v| v.as_u64())
1781 .unwrap();
1782 assert_eq!(
1783 status, 200,
1784 "Should follow redirect when followRedirects=true"
1785 );
1786 }
1787
1788 #[tokio::test]
1789 async fn test_query_params_forwarded_to_http_request() {
1790 use tower::ServiceExt;
1791
1792 let (url, _handle) = start_test_server().await;
1793 let ctx = test_producer_ctx();
1794
1795 let component = HttpComponent::new();
1796 let endpoint = component
1798 .create_endpoint(&format!(
1799 "{url}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true"
1800 ))
1801 .unwrap();
1802 let producer = endpoint.create_producer(&ctx).unwrap();
1803
1804 let exchange = Exchange::new(Message::default());
1805 let result = producer.oneshot(exchange).await.unwrap();
1806
1807 let status = result
1810 .input
1811 .header("CamelHttpResponseCode")
1812 .and_then(|v| v.as_u64())
1813 .unwrap();
1814 assert_eq!(status, 200);
1815 }
1816
1817 #[tokio::test]
1818 async fn test_non_camel_query_params_are_forwarded() {
1819 let config = HttpEndpointConfig::from_uri(
1822 "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
1823 )
1824 .unwrap();
1825
1826 assert!(
1828 config.query_params.contains_key("apiKey"),
1829 "apiKey should be preserved"
1830 );
1831 assert!(
1832 config.query_params.contains_key("token"),
1833 "token should be preserved"
1834 );
1835 assert_eq!(config.query_params.get("apiKey").unwrap(), "secret123");
1836 assert_eq!(config.query_params.get("token").unwrap(), "abc456");
1837
1838 assert!(
1840 !config.query_params.contains_key("httpMethod"),
1841 "httpMethod should not be forwarded"
1842 );
1843 }
1844
1845 #[tokio::test]
1850 async fn test_http_producer_blocks_metadata_endpoint() {
1851 use tower::ServiceExt;
1852
1853 let ctx = test_producer_ctx();
1854 let component = HttpComponent::new();
1855 let endpoint = component
1856 .create_endpoint("http://example.com/api?allowPrivateIps=false")
1857 .unwrap();
1858 let producer = endpoint.create_producer(&ctx).unwrap();
1859
1860 let mut exchange = Exchange::new(Message::default());
1861 exchange.input.set_header(
1862 "CamelHttpUri",
1863 serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string()),
1864 );
1865
1866 let result = producer.oneshot(exchange).await;
1867 assert!(result.is_err(), "Should block AWS metadata endpoint");
1868
1869 let err = result.unwrap_err();
1870 assert!(
1871 err.to_string().contains("Private IP"),
1872 "Error should mention private IP blocking, got: {}",
1873 err
1874 );
1875 }
1876
1877 #[test]
1878 fn test_ssrf_config_defaults() {
1879 let config = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();
1880 assert!(
1881 !config.allow_private_ips,
1882 "Private IPs should be blocked by default"
1883 );
1884 assert!(
1885 config.blocked_hosts.is_empty(),
1886 "Blocked hosts should be empty by default"
1887 );
1888 }
1889
1890 #[test]
1891 fn test_ssrf_config_allow_private_ips() {
1892 let config =
1893 HttpEndpointConfig::from_uri("http://example.com/api?allowPrivateIps=true").unwrap();
1894 assert!(
1895 config.allow_private_ips,
1896 "Private IPs should be allowed when explicitly set"
1897 );
1898 }
1899
1900 #[test]
1901 fn test_ssrf_config_blocked_hosts() {
1902 let config = HttpEndpointConfig::from_uri(
1903 "http://example.com/api?blockedHosts=evil.com,malware.net",
1904 )
1905 .unwrap();
1906 assert_eq!(config.blocked_hosts, vec!["evil.com", "malware.net"]);
1907 }
1908
1909 #[tokio::test]
1910 async fn test_http_producer_blocks_localhost() {
1911 use tower::ServiceExt;
1912
1913 let ctx = test_producer_ctx();
1914 let component = HttpComponent::new();
1915 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1916 let producer = endpoint.create_producer(&ctx).unwrap();
1917
1918 let mut exchange = Exchange::new(Message::default());
1919 exchange.input.set_header(
1920 "CamelHttpUri",
1921 serde_json::Value::String("http://localhost:8080/internal".to_string()),
1922 );
1923
1924 let result = producer.oneshot(exchange).await;
1925 assert!(result.is_err(), "Should block localhost");
1926 }
1927
1928 #[tokio::test]
1929 async fn test_http_producer_blocks_loopback_ip() {
1930 use tower::ServiceExt;
1931
1932 let ctx = test_producer_ctx();
1933 let component = HttpComponent::new();
1934 let endpoint = component.create_endpoint("http://example.com/api").unwrap();
1935 let producer = endpoint.create_producer(&ctx).unwrap();
1936
1937 let mut exchange = Exchange::new(Message::default());
1938 exchange.input.set_header(
1939 "CamelHttpUri",
1940 serde_json::Value::String("http://127.0.0.1:8080/internal".to_string()),
1941 );
1942
1943 let result = producer.oneshot(exchange).await;
1944 assert!(result.is_err(), "Should block loopback IP");
1945 }
1946
1947 #[tokio::test]
1948 async fn test_http_producer_allows_private_ip_when_enabled() {
1949 use tower::ServiceExt;
1950
1951 let ctx = test_producer_ctx();
1952 let component = HttpComponent::new();
1953 let endpoint = component
1956 .create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
1957 .unwrap();
1958 let producer = endpoint.create_producer(&ctx).unwrap();
1959
1960 let exchange = Exchange::new(Message::default());
1961
1962 let result = producer.oneshot(exchange).await;
1965 if let Err(ref e) = result {
1967 let err_str = e.to_string();
1968 assert!(
1969 !err_str.contains("Private IP") && !err_str.contains("not allowed"),
1970 "Should not be SSRF error, got: {}",
1971 err_str
1972 );
1973 }
1974 }
1975
1976 #[test]
1981 fn test_http_server_config_parse() {
1982 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
1983 assert_eq!(cfg.host, "0.0.0.0");
1984 assert_eq!(cfg.port, 8080);
1985 assert_eq!(cfg.path, "/orders");
1986 }
1987
1988 #[test]
1989 fn test_http_server_config_scheme() {
1990 assert_eq!(HttpServerConfig::scheme(), "http");
1992 }
1993
1994 #[test]
1995 fn test_http_server_config_from_components() {
1996 let components = camel_endpoint::UriComponents {
1998 scheme: "https".to_string(),
1999 path: "//0.0.0.0:8443/api".to_string(),
2000 params: std::collections::HashMap::from([(
2001 "maxRequestBody".to_string(),
2002 "5242880".to_string(),
2003 )]),
2004 };
2005 let cfg = HttpServerConfig::from_components(components).unwrap();
2006 assert_eq!(cfg.host, "0.0.0.0");
2007 assert_eq!(cfg.port, 8443);
2008 assert_eq!(cfg.path, "/api");
2009 assert_eq!(cfg.max_request_body, 5242880);
2010 }
2011
2012 #[test]
2013 fn test_http_server_config_default_path() {
2014 let cfg = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
2015 assert_eq!(cfg.path, "/");
2016 }
2017
2018 #[test]
2019 fn test_http_server_config_wrong_scheme() {
2020 assert!(HttpServerConfig::from_uri("file:/tmp").is_err());
2021 }
2022
2023 #[test]
2024 fn test_http_server_config_invalid_port() {
2025 assert!(HttpServerConfig::from_uri("http://localhost:abc/path").is_err());
2026 }
2027
2028 #[test]
2029 fn test_http_server_config_default_port_by_scheme() {
2030 let cfg_http = HttpServerConfig::from_uri("http://0.0.0.0/orders").unwrap();
2032 assert_eq!(cfg_http.port, 80);
2033
2034 let cfg_https = HttpServerConfig::from_uri("https://0.0.0.0/orders").unwrap();
2036 assert_eq!(cfg_https.port, 443);
2037 }
2038
2039 #[test]
2040 fn test_request_envelope_and_reply_are_send() {
2041 fn assert_send<T: Send>() {}
2042 assert_send::<RequestEnvelope>();
2043 assert_send::<HttpReply>();
2044 }
2045
2046 #[test]
2051 fn test_server_registry_global_is_singleton() {
2052 let r1 = ServerRegistry::global();
2053 let r2 = ServerRegistry::global();
2054 assert!(std::ptr::eq(r1 as *const _, r2 as *const _));
2055 }
2056
2057 #[tokio::test]
2058 async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
2059 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2060 let port = listener.local_addr().unwrap().port();
2061 drop(listener);
2062
2063 let results: Arc<std::sync::Mutex<Vec<DispatchTable>>> =
2064 Arc::new(std::sync::Mutex::new(Vec::new()));
2065
2066 let mut handles = Vec::new();
2067 for _ in 0..4 {
2068 let results = results.clone();
2069 handles.push(tokio::spawn(async move {
2070 let dispatch = ServerRegistry::global()
2071 .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
2072 .await
2073 .unwrap();
2074 results.lock().unwrap().push(dispatch);
2075 }));
2076 }
2077
2078 for h in handles {
2079 h.await.unwrap();
2080 }
2081
2082 let dispatches = results.lock().unwrap();
2083 assert_eq!(dispatches.len(), 4);
2084 for i in 1..dispatches.len() {
2085 assert!(
2086 Arc::ptr_eq(&dispatches[0], &dispatches[i]),
2087 "all concurrent callers should get the same dispatch table"
2088 );
2089 }
2090 }
2091
2092 #[tokio::test]
2097 async fn test_dispatch_handler_returns_404_for_unknown_path() {
2098 let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
2099 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2101 let port = listener.local_addr().unwrap().port();
2102 tokio::spawn(run_axum_server(listener, dispatch, 2 * 1024 * 1024));
2103
2104 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2106
2107 let resp = reqwest::get(format!("http://127.0.0.1:{port}/unknown"))
2108 .await
2109 .unwrap();
2110 assert_eq!(resp.status().as_u16(), 404);
2111 }
2112
2113 #[tokio::test]
2118 async fn test_http_consumer_start_registers_path() {
2119 use camel_component::ConsumerContext;
2120
2121 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2123 let port = listener.local_addr().unwrap().port();
2124 drop(listener); let consumer_cfg = HttpServerConfig {
2127 host: "127.0.0.1".to_string(),
2128 port,
2129 path: "/ping".to_string(),
2130 max_request_body: 2 * 1024 * 1024,
2131 max_response_body: 10 * 1024 * 1024,
2132 };
2133 let mut consumer = HttpConsumer::new(consumer_cfg);
2134
2135 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2136 let token = tokio_util::sync::CancellationToken::new();
2137 let ctx = ConsumerContext::new(tx, token.clone());
2138
2139 tokio::spawn(async move {
2140 consumer.start(ctx).await.unwrap();
2141 });
2142
2143 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2144
2145 let client = reqwest::Client::new();
2146 let resp_future = client
2147 .post(format!("http://127.0.0.1:{port}/ping"))
2148 .body("hello world")
2149 .send();
2150
2151 let (http_result, _) = tokio::join!(resp_future, async {
2152 if let Some(mut envelope) = rx.recv().await {
2153 envelope.exchange.input.set_header(
2155 "CamelHttpResponseCode",
2156 serde_json::Value::Number(201.into()),
2157 );
2158 if let Some(reply_tx) = envelope.reply_tx {
2159 let _ = reply_tx.send(Ok(envelope.exchange));
2160 }
2161 }
2162 });
2163
2164 let resp = http_result.unwrap();
2165 assert_eq!(resp.status().as_u16(), 201);
2166
2167 token.cancel();
2168 }
2169
2170 #[tokio::test]
2175 async fn test_integration_single_consumer_round_trip() {
2176 use camel_component::{ConsumerContext, ExchangeEnvelope};
2177
2178 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2180 let port = listener.local_addr().unwrap().port();
2181 drop(listener); let component = HttpComponent::new();
2184 let endpoint = component
2185 .create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
2186 .unwrap();
2187 let mut consumer = endpoint.create_consumer().unwrap();
2188
2189 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2190 let token = tokio_util::sync::CancellationToken::new();
2191 let ctx = ConsumerContext::new(tx, token.clone());
2192
2193 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2194 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2195
2196 let client = reqwest::Client::new();
2197 let send_fut = client
2198 .post(format!("http://127.0.0.1:{port}/echo"))
2199 .header("Content-Type", "text/plain")
2200 .body("ping")
2201 .send();
2202
2203 let (http_result, _) = tokio::join!(send_fut, async {
2204 if let Some(mut envelope) = rx.recv().await {
2205 assert_eq!(
2206 envelope.exchange.input.header("CamelHttpMethod"),
2207 Some(&serde_json::Value::String("POST".into()))
2208 );
2209 assert_eq!(
2210 envelope.exchange.input.header("CamelHttpPath"),
2211 Some(&serde_json::Value::String("/echo".into()))
2212 );
2213 envelope.exchange.input.body = camel_api::body::Body::Text("pong".to_string());
2214 if let Some(reply_tx) = envelope.reply_tx {
2215 let _ = reply_tx.send(Ok(envelope.exchange));
2216 }
2217 }
2218 });
2219
2220 let resp = http_result.unwrap();
2221 assert_eq!(resp.status().as_u16(), 200);
2222 let body = resp.text().await.unwrap();
2223 assert_eq!(body, "pong");
2224
2225 token.cancel();
2226 }
2227
2228 #[tokio::test]
2229 async fn test_integration_two_consumers_shared_port() {
2230 use camel_component::{ConsumerContext, ExchangeEnvelope};
2231
2232 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2234 let port = listener.local_addr().unwrap().port();
2235 drop(listener);
2236
2237 let component = HttpComponent::new();
2238
2239 let endpoint_a = component
2241 .create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
2242 .unwrap();
2243 let mut consumer_a = endpoint_a.create_consumer().unwrap();
2244
2245 let endpoint_b = component
2247 .create_endpoint(&format!("http://127.0.0.1:{port}/world"))
2248 .unwrap();
2249 let mut consumer_b = endpoint_b.create_consumer().unwrap();
2250
2251 let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2252 let token_a = tokio_util::sync::CancellationToken::new();
2253 let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
2254
2255 let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2256 let token_b = tokio_util::sync::CancellationToken::new();
2257 let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
2258
2259 tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
2260 tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
2261 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2262
2263 let client = reqwest::Client::new();
2264
2265 let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
2267 let (resp_hello, _) = tokio::join!(fut_hello, async {
2268 if let Some(mut envelope) = rx_a.recv().await {
2269 envelope.exchange.input.body =
2270 camel_api::body::Body::Text("hello-response".to_string());
2271 if let Some(reply_tx) = envelope.reply_tx {
2272 let _ = reply_tx.send(Ok(envelope.exchange));
2273 }
2274 }
2275 });
2276
2277 let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
2279 let (resp_world, _) = tokio::join!(fut_world, async {
2280 if let Some(mut envelope) = rx_b.recv().await {
2281 envelope.exchange.input.body =
2282 camel_api::body::Body::Text("world-response".to_string());
2283 if let Some(reply_tx) = envelope.reply_tx {
2284 let _ = reply_tx.send(Ok(envelope.exchange));
2285 }
2286 }
2287 });
2288
2289 let body_a = resp_hello.unwrap().text().await.unwrap();
2290 let body_b = resp_world.unwrap().text().await.unwrap();
2291
2292 assert_eq!(body_a, "hello-response");
2293 assert_eq!(body_b, "world-response");
2294
2295 token_a.cancel();
2296 token_b.cancel();
2297 }
2298
2299 #[tokio::test]
2300 async fn test_integration_unregistered_path_returns_404() {
2301 use camel_component::{ConsumerContext, ExchangeEnvelope};
2302
2303 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2305 let port = listener.local_addr().unwrap().port();
2306 drop(listener);
2307
2308 let component = HttpComponent::new();
2309 let endpoint = component
2310 .create_endpoint(&format!("http://127.0.0.1:{port}/registered"))
2311 .unwrap();
2312 let mut consumer = endpoint.create_consumer().unwrap();
2313
2314 let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2315 let token = tokio_util::sync::CancellationToken::new();
2316 let ctx = ConsumerContext::new(tx, token.clone());
2317
2318 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2319 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2320
2321 let client = reqwest::Client::new();
2322 let resp = client
2323 .get(format!("http://127.0.0.1:{port}/not-there"))
2324 .send()
2325 .await
2326 .unwrap();
2327 assert_eq!(resp.status().as_u16(), 404);
2328
2329 token.cancel();
2330 }
2331
2332 #[test]
2333 fn test_http_consumer_declares_concurrent() {
2334 use camel_component::ConcurrencyModel;
2335
2336 let config = HttpServerConfig {
2337 host: "127.0.0.1".to_string(),
2338 port: 19999,
2339 path: "/test".to_string(),
2340 max_request_body: 2 * 1024 * 1024,
2341 max_response_body: 10 * 1024 * 1024,
2342 };
2343 let consumer = HttpConsumer::new(config);
2344 assert_eq!(
2345 consumer.concurrency_model(),
2346 ConcurrencyModel::Concurrent { max: None }
2347 );
2348 }
2349
2350 #[tokio::test]
2355 async fn test_http_reply_body_stream_variant_exists() {
2356 use bytes::Bytes;
2357 use camel_api::CamelError;
2358 use futures::stream;
2359
2360 let chunks: Vec<Result<Bytes, CamelError>> =
2361 vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
2362 let stream = Box::pin(stream::iter(chunks));
2363 let reply_body = HttpReplyBody::Stream(stream);
2364 match reply_body {
2366 HttpReplyBody::Stream(_) => {}
2367 HttpReplyBody::Bytes(_) => panic!("expected Stream variant"),
2368 }
2369 }
2370
2371 #[cfg(feature = "otel")]
2376 mod otel_tests {
2377 use super::*;
2378 use camel_api::Message;
2379 use tower::ServiceExt;
2380
2381 #[tokio::test]
2382 async fn test_producer_injects_traceparent_header() {
2383 let (url, _handle) = start_test_server_with_header_capture().await;
2384 let ctx = test_producer_ctx();
2385
2386 let component = HttpComponent::new();
2387 let endpoint = component
2388 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2389 .unwrap();
2390 let producer = endpoint.create_producer(&ctx).unwrap();
2391
2392 let mut exchange = Exchange::new(Message::default());
2394 let mut headers = std::collections::HashMap::new();
2395 headers.insert(
2396 "traceparent".to_string(),
2397 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
2398 );
2399 camel_otel::extract_into_exchange(&mut exchange, &headers);
2400
2401 let result = producer.oneshot(exchange).await.unwrap();
2402
2403 let status = result
2405 .input
2406 .header("CamelHttpResponseCode")
2407 .and_then(|v| v.as_u64())
2408 .unwrap();
2409 assert_eq!(status, 200);
2410
2411 let traceparent = result.input.header("x-received-traceparent");
2413 assert!(
2414 traceparent.is_some(),
2415 "traceparent header should have been sent"
2416 );
2417
2418 let traceparent_str = traceparent.unwrap().as_str().unwrap();
2419 let parts: Vec<&str> = traceparent_str.split('-').collect();
2421 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2422 assert_eq!(parts[0], "00", "version should be 00");
2423 assert_eq!(
2424 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2425 "trace-id should match"
2426 );
2427 assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
2428 assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
2429 }
2430
2431 #[tokio::test]
2432 async fn test_consumer_extracts_traceparent_header() {
2433 use camel_component::{ConsumerContext, ExchangeEnvelope};
2434
2435 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2437 let port = listener.local_addr().unwrap().port();
2438 drop(listener);
2439
2440 let component = HttpComponent::new();
2441 let endpoint = component
2442 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2443 .unwrap();
2444 let mut consumer = endpoint.create_consumer().unwrap();
2445
2446 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2447 let token = tokio_util::sync::CancellationToken::new();
2448 let ctx = ConsumerContext::new(tx, token.clone());
2449
2450 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2451 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2452
2453 let client = reqwest::Client::new();
2455 let send_fut = client
2456 .post(format!("http://127.0.0.1:{port}/trace"))
2457 .header(
2458 "traceparent",
2459 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2460 )
2461 .body("test")
2462 .send();
2463
2464 let (http_result, _) = tokio::join!(send_fut, async {
2465 if let Some(envelope) = rx.recv().await {
2466 let mut injected_headers = std::collections::HashMap::new();
2469 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2470
2471 assert!(
2472 injected_headers.contains_key("traceparent"),
2473 "Exchange should have traceparent after extraction"
2474 );
2475
2476 let traceparent = injected_headers.get("traceparent").unwrap();
2477 let parts: Vec<&str> = traceparent.split('-').collect();
2478 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2479 assert_eq!(
2480 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2481 "Trace ID should match the original traceparent header"
2482 );
2483
2484 if let Some(reply_tx) = envelope.reply_tx {
2485 let _ = reply_tx.send(Ok(envelope.exchange));
2486 }
2487 }
2488 });
2489
2490 let resp = http_result.unwrap();
2491 assert_eq!(resp.status().as_u16(), 200);
2492
2493 token.cancel();
2494 }
2495
2496 #[tokio::test]
2497 async fn test_consumer_extracts_mixed_case_traceparent_header() {
2498 use camel_component::{ConsumerContext, ExchangeEnvelope};
2499
2500 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2502 let port = listener.local_addr().unwrap().port();
2503 drop(listener);
2504
2505 let component = HttpComponent::new();
2506 let endpoint = component
2507 .create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
2508 .unwrap();
2509 let mut consumer = endpoint.create_consumer().unwrap();
2510
2511 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2512 let token = tokio_util::sync::CancellationToken::new();
2513 let ctx = ConsumerContext::new(tx, token.clone());
2514
2515 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2516 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2517
2518 let client = reqwest::Client::new();
2520 let send_fut = client
2521 .post(format!("http://127.0.0.1:{port}/trace"))
2522 .header(
2523 "TraceParent",
2524 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2525 )
2526 .body("test")
2527 .send();
2528
2529 let (http_result, _) = tokio::join!(send_fut, async {
2530 if let Some(envelope) = rx.recv().await {
2531 let mut injected_headers = HashMap::new();
2534 camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
2535
2536 assert!(
2537 injected_headers.contains_key("traceparent"),
2538 "Exchange should have traceparent after extraction from mixed-case header"
2539 );
2540
2541 let traceparent = injected_headers.get("traceparent").unwrap();
2542 let parts: Vec<&str> = traceparent.split('-').collect();
2543 assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
2544 assert_eq!(
2545 parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
2546 "Trace ID should match the original mixed-case TraceParent header"
2547 );
2548
2549 if let Some(reply_tx) = envelope.reply_tx {
2550 let _ = reply_tx.send(Ok(envelope.exchange));
2551 }
2552 }
2553 });
2554
2555 let resp = http_result.unwrap();
2556 assert_eq!(resp.status().as_u16(), 200);
2557
2558 token.cancel();
2559 }
2560
2561 #[tokio::test]
2562 async fn test_producer_no_trace_context_no_crash() {
2563 let (url, _handle) = start_test_server().await;
2564 let ctx = test_producer_ctx();
2565
2566 let component = HttpComponent::new();
2567 let endpoint = component
2568 .create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
2569 .unwrap();
2570 let producer = endpoint.create_producer(&ctx).unwrap();
2571
2572 let exchange = Exchange::new(Message::default());
2574
2575 let result = producer.oneshot(exchange).await.unwrap();
2577
2578 let status = result
2580 .input
2581 .header("CamelHttpResponseCode")
2582 .and_then(|v| v.as_u64())
2583 .unwrap();
2584 assert_eq!(status, 200);
2585 }
2586
2587 async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
2589 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2590 let addr = listener.local_addr().unwrap();
2591 let url = format!("http://127.0.0.1:{}", addr.port());
2592
2593 let handle = tokio::spawn(async move {
2594 loop {
2595 if let Ok((mut stream, _)) = listener.accept().await {
2596 tokio::spawn(async move {
2597 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2598 let mut buf = vec![0u8; 8192];
2599 let n = stream.read(&mut buf).await.unwrap_or(0);
2600 let request = String::from_utf8_lossy(&buf[..n]).to_string();
2601
2602 let traceparent = request
2604 .lines()
2605 .find(|line| line.to_lowercase().starts_with("traceparent:"))
2606 .map(|line| {
2607 line.split(':')
2608 .nth(1)
2609 .map(|s| s.trim().to_string())
2610 .unwrap_or_default()
2611 })
2612 .unwrap_or_default();
2613
2614 let body =
2615 format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
2616 let response = format!(
2617 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
2618 body.len(),
2619 traceparent,
2620 body
2621 );
2622 let _ = stream.write_all(response.as_bytes()).await;
2623 });
2624 }
2625 }
2626 });
2627
2628 (url, handle)
2629 }
2630 }
2631
2632 #[tokio::test]
2641 async fn test_request_body_arrives_as_stream() {
2642 use camel_api::body::Body;
2643 use camel_component::{ConsumerContext, ExchangeEnvelope};
2644
2645 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646 let port = listener.local_addr().unwrap().port();
2647 drop(listener);
2648
2649 let component = HttpComponent::new();
2650 let endpoint = component
2651 .create_endpoint(&format!("http://127.0.0.1:{port}/upload"))
2652 .unwrap();
2653 let mut consumer = endpoint.create_consumer().unwrap();
2654
2655 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2656 let token = tokio_util::sync::CancellationToken::new();
2657 let ctx = ConsumerContext::new(tx, token.clone());
2658
2659 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2660 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2661
2662 let client = reqwest::Client::new();
2663 let send_fut = client
2664 .post(format!("http://127.0.0.1:{port}/upload"))
2665 .body("hello streaming world")
2666 .send();
2667
2668 let (http_result, _) = tokio::join!(send_fut, async {
2669 if let Some(mut envelope) = rx.recv().await {
2670 assert!(
2672 matches!(envelope.exchange.input.body, Body::Stream(_)),
2673 "expected Body::Stream, got discriminant {:?}",
2674 std::mem::discriminant(&envelope.exchange.input.body)
2675 );
2676 let bytes = envelope
2678 .exchange
2679 .input
2680 .body
2681 .into_bytes(1024 * 1024)
2682 .await
2683 .unwrap();
2684 assert_eq!(&bytes[..], b"hello streaming world");
2685
2686 envelope.exchange.input.body = camel_api::body::Body::Empty;
2687 if let Some(reply_tx) = envelope.reply_tx {
2688 let _ = reply_tx.send(Ok(envelope.exchange));
2689 }
2690 }
2691 });
2692
2693 let resp = http_result.unwrap();
2694 assert_eq!(resp.status().as_u16(), 200);
2695
2696 token.cancel();
2697 }
2698
2699 #[tokio::test]
2704 async fn test_streaming_response_chunked() {
2705 use bytes::Bytes;
2706 use camel_api::CamelError;
2707 use camel_api::body::{Body, StreamBody, StreamMetadata};
2708 use camel_component::{ConsumerContext, ExchangeEnvelope};
2709 use futures::stream;
2710 use std::sync::Arc;
2711 use tokio::sync::Mutex;
2712
2713 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2714 let port = listener.local_addr().unwrap().port();
2715 drop(listener);
2716
2717 let component = HttpComponent::new();
2718 let endpoint = component
2719 .create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
2720 .unwrap();
2721 let mut consumer = endpoint.create_consumer().unwrap();
2722
2723 let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
2724 let token = tokio_util::sync::CancellationToken::new();
2725 let ctx = ConsumerContext::new(tx, token.clone());
2726
2727 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2728 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729
2730 let client = reqwest::Client::new();
2731 let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
2732
2733 let (http_result, _) = tokio::join!(send_fut, async {
2734 if let Some(mut envelope) = rx.recv().await {
2735 let chunks: Vec<Result<Bytes, CamelError>> =
2737 vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
2738 let stream = Box::pin(stream::iter(chunks));
2739 envelope.exchange.input.body = Body::Stream(StreamBody {
2740 stream: Arc::new(Mutex::new(Some(stream))),
2741 metadata: StreamMetadata::default(),
2742 });
2743 if let Some(reply_tx) = envelope.reply_tx {
2744 let _ = reply_tx.send(Ok(envelope.exchange));
2745 }
2746 }
2747 });
2748
2749 let resp = http_result.unwrap();
2750 assert_eq!(resp.status().as_u16(), 200);
2751 let body = resp.text().await.unwrap();
2752 assert_eq!(body, "chunk1chunk2");
2753
2754 token.cancel();
2755 }
2756
2757 #[tokio::test]
2762 async fn test_413_when_content_length_exceeds_limit() {
2763 use camel_component::ConsumerContext;
2764
2765 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2766 let port = listener.local_addr().unwrap().port();
2767 drop(listener);
2768
2769 let component = HttpComponent::new();
2771 let endpoint = component
2772 .create_endpoint(&format!(
2773 "http://127.0.0.1:{port}/upload?maxRequestBody=100"
2774 ))
2775 .unwrap();
2776 let mut consumer = endpoint.create_consumer().unwrap();
2777
2778 let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2779 let token = tokio_util::sync::CancellationToken::new();
2780 let ctx = ConsumerContext::new(tx, token.clone());
2781
2782 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2783 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2784
2785 let client = reqwest::Client::new();
2786 let resp = client
2787 .post(format!("http://127.0.0.1:{port}/upload"))
2788 .header("Content-Length", "1000") .body("x".repeat(1000))
2790 .send()
2791 .await
2792 .unwrap();
2793
2794 assert_eq!(resp.status().as_u16(), 413);
2795
2796 token.cancel();
2797 }
2798
2799 #[tokio::test]
2803 async fn test_chunked_upload_without_content_length_bypasses_limit() {
2804 use bytes::Bytes;
2805 use camel_api::body::Body;
2806 use camel_component::ConsumerContext;
2807 use futures::stream;
2808
2809 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2810 let port = listener.local_addr().unwrap().port();
2811 drop(listener);
2812
2813 let component = HttpComponent::new();
2815 let endpoint = component
2816 .create_endpoint(&format!("http://127.0.0.1:{port}/upload?maxRequestBody=10"))
2817 .unwrap();
2818 let mut consumer = endpoint.create_consumer().unwrap();
2819
2820 let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component::ExchangeEnvelope>(16);
2821 let token = tokio_util::sync::CancellationToken::new();
2822 let ctx = ConsumerContext::new(tx, token.clone());
2823
2824 tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
2825 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2826
2827 let client = reqwest::Client::new();
2828
2829 let chunks: Vec<Result<Bytes, std::io::Error>> = vec![
2833 Ok(Bytes::from("y".repeat(50))),
2834 Ok(Bytes::from("y".repeat(50))),
2835 ];
2836 let stream_body = reqwest::Body::wrap_stream(stream::iter(chunks));
2837 let send_fut = client
2838 .post(format!("http://127.0.0.1:{port}/upload"))
2839 .body(stream_body)
2840 .send();
2841
2842 let consumer_fut = async {
2843 match tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv()).await {
2845 Ok(Some(mut envelope)) => {
2846 assert!(
2847 matches!(envelope.exchange.input.body, Body::Stream(_)),
2848 "expected Body::Stream"
2849 );
2850 envelope.exchange.input.body = camel_api::body::Body::Empty;
2851 if let Some(reply_tx) = envelope.reply_tx {
2852 let _ = reply_tx.send(Ok(envelope.exchange));
2853 }
2854 }
2855 Ok(None) => panic!("consumer channel closed unexpectedly"),
2856 Err(_) => {
2857 }
2860 }
2861 };
2862
2863 let (http_result, _) = tokio::join!(send_fut, consumer_fut);
2864
2865 let resp = http_result.unwrap();
2866 assert_ne!(
2868 resp.status().as_u16(),
2869 413,
2870 "chunked upload must not be rejected by maxRequestBody"
2871 );
2872 assert_eq!(resp.status().as_u16(), 200);
2873
2874 token.cancel();
2875 }
2876}