1use crate::config::{
2 HttpClientConfig, RedirectConfig, RetryConfig, TlsRootConfig, TransportSecurity,
3};
4use crate::error::HttpError;
5use crate::layers::{OtelLayer, RetryLayer, SecureRedirectPolicy, UserAgentLayer};
6use crate::response::ResponseBody;
7use crate::tls;
8use bytes::Bytes;
9use http::Response;
10use http_body_util::{BodyExt, Full};
11use hyper_rustls::HttpsConnector;
12use hyper_util::client::legacy::Client;
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::rt::{TokioExecutor, TokioTimer};
15use std::time::Duration;
16use tower::buffer::Buffer;
17use tower::limit::ConcurrencyLimitLayer;
18use tower::load_shed::LoadShedLayer;
19use tower::timeout::TimeoutLayer;
20use tower::util::BoxCloneService;
21use tower::{ServiceBuilder, ServiceExt};
22use tower_http::decompression::DecompressionLayer;
23use tower_http::follow_redirect::FollowRedirectLayer;
24
/// Type-erased, cloneable service used for the inner request stack.
///
/// It accepts a request with a `Full<Bytes>` body and yields a response whose
/// body is a [`ResponseBody`], failing with [`HttpError`].
type InnerService =
    BoxCloneService<http::Request<Full<Bytes>>, http::Response<ResponseBody>, HttpError>;
28
/// Builder that assembles a [`crate::HttpClient`] from an [`HttpClientConfig`].
pub struct HttpClientBuilder {
    /// Configuration mutated by the setter methods and consumed by `build()`.
    config: HttpClientConfig,
    /// Optional one-shot wrapper applied to the boxed inner service in `build()`.
    auth_layer: Option<Box<dyn FnOnce(InnerService) -> InnerService + Send>>,
}
34
35impl HttpClientBuilder {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 config: HttpClientConfig::default(),
41 auth_layer: None,
42 }
43 }
44
45 #[must_use]
47 pub fn with_config(config: HttpClientConfig) -> Self {
48 Self {
49 config,
50 auth_layer: None,
51 }
52 }
53
54 #[must_use]
59 pub fn timeout(mut self, timeout: Duration) -> Self {
60 self.config.request_timeout = timeout;
61 self
62 }
63
64 #[must_use]
70 pub fn total_timeout(mut self, timeout: Duration) -> Self {
71 self.config.total_timeout = Some(timeout);
72 self
73 }
74
75 #[must_use]
77 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
78 self.config.user_agent = user_agent.into();
79 self
80 }
81
82 #[must_use]
84 pub fn retry(mut self, retry: Option<RetryConfig>) -> Self {
85 self.config.retry = retry;
86 self
87 }
88
89 #[must_use]
91 pub fn max_body_size(mut self, size: usize) -> Self {
92 self.config.max_body_size = size;
93 self
94 }
95
96 #[must_use]
100 pub fn transport(mut self, transport: TransportSecurity) -> Self {
101 self.config.transport = transport;
102 self
103 }
104
105 #[must_use]
111 pub fn deny_insecure_http(mut self) -> Self {
112 tracing::debug!(
113 target: "modkit_http::security",
114 "deny_insecure_http() called - enforcing TLS for all connections"
115 );
116 self.config.transport = TransportSecurity::TlsOnly;
117 self
118 }
119
120 #[must_use]
125 pub fn with_otel(mut self) -> Self {
126 self.config.otel = true;
127 self
128 }
129
130 #[must_use]
138 pub fn with_auth_layer(
139 mut self,
140 wrap: impl FnOnce(InnerService) -> InnerService + Send + 'static,
141 ) -> Self {
142 self.auth_layer = Some(Box::new(wrap));
143 self
144 }
145
146 #[must_use]
155 pub fn buffer_capacity(mut self, capacity: usize) -> Self {
156 self.config.buffer_capacity = capacity.max(1);
158 self
159 }
160
161 #[must_use]
166 pub fn max_redirects(mut self, max_redirects: usize) -> Self {
167 self.config.redirect.max_redirects = max_redirects;
168 self
169 }
170
171 #[must_use]
176 pub fn no_redirects(mut self) -> Self {
177 self.config.redirect = RedirectConfig::disabled();
178 self
179 }
180
181 #[must_use]
196 pub fn redirect(mut self, config: RedirectConfig) -> Self {
197 self.config.redirect = config;
198 self
199 }
200
201 #[must_use]
208 pub fn pool_idle_timeout(mut self, timeout: Option<Duration>) -> Self {
209 self.config.pool_idle_timeout = timeout;
210 self
211 }
212
213 #[must_use]
221 pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
222 self.config.pool_max_idle_per_host = max;
223 self
224 }
225
226 pub fn build(self) -> Result<crate::HttpClient, HttpError> {
231 let timeout = self.config.request_timeout;
232 let total_timeout = self.config.total_timeout;
233
234 let https = build_https_connector(self.config.tls_roots, self.config.transport)?;
236
237 let mut client_builder = Client::builder(TokioExecutor::new());
239
240 client_builder
243 .pool_timer(TokioTimer::new())
244 .pool_max_idle_per_host(self.config.pool_max_idle_per_host)
245 .http2_only(false); if let Some(idle_timeout) = self.config.pool_idle_timeout {
249 client_builder.pool_idle_timeout(idle_timeout);
250 }
251
252 let hyper_client = client_builder.build::<_, Full<Bytes>>(https);
253
254 let ua_layer = UserAgentLayer::try_new(&self.config.user_agent)?;
256
257 let redirect_policy = SecureRedirectPolicy::new(self.config.redirect.clone());
289
290 let service = ServiceBuilder::new()
292 .layer(TimeoutLayer::new(timeout))
293 .layer(ua_layer)
294 .layer(DecompressionLayer::new())
295 .layer(FollowRedirectLayer::with_policy(redirect_policy))
296 .service(hyper_client);
297
298 let service = service.map_response(map_decompression_response);
304
305 let service = service.map_err(move |e: tower::BoxError| map_tower_error(e, timeout));
307
308 let mut boxed_service = service.boxed_clone();
310
311 if let Some(wrap) = self.auth_layer {
314 boxed_service = wrap(boxed_service);
315 }
316
317 if let Some(ref retry_config) = self.config.retry {
329 let retry_layer = RetryLayer::with_total_timeout(retry_config.clone(), total_timeout);
330 let retry_service = ServiceBuilder::new()
331 .layer(retry_layer)
332 .service(boxed_service);
333 boxed_service = retry_service.boxed_clone();
334 }
335
336 if let Some(rate_limit) = self.config.rate_limit
340 && rate_limit.max_concurrent_requests < usize::MAX
341 {
342 let limited_service = ServiceBuilder::new()
343 .layer(LoadShedLayer::new())
344 .layer(ConcurrencyLimitLayer::new(
345 rate_limit.max_concurrent_requests,
346 ))
347 .service(boxed_service);
348 let limited_service = limited_service.map_err(map_load_shed_error);
350 boxed_service = limited_service.boxed_clone();
351 }
352
353 if self.config.otel {
357 let otel_service = ServiceBuilder::new()
358 .layer(OtelLayer::new())
359 .service(boxed_service);
360 boxed_service = otel_service.boxed_clone();
361 }
362
363 let buffer_capacity = self.config.buffer_capacity.max(1);
367 let buffered_service: crate::client::BufferedService =
368 Buffer::new(boxed_service, buffer_capacity);
369
370 Ok(crate::HttpClient {
371 service: buffered_service,
372 max_body_size: self.config.max_body_size,
373 transport_security: self.config.transport,
374 })
375 }
376}
377
378#[cfg(test)]
379impl HttpClientBuilder {
380 fn build_with_inner_service(self, inner: InnerService) -> crate::HttpClient {
388 let mut boxed_service = inner;
389
390 if let Some(ref retry_config) = self.config.retry {
391 let retry_layer =
392 RetryLayer::with_total_timeout(retry_config.clone(), self.config.total_timeout);
393 let retry_service = ServiceBuilder::new()
394 .layer(retry_layer)
395 .service(boxed_service);
396 boxed_service = retry_service.boxed_clone();
397 }
398
399 if let Some(rate_limit) = self.config.rate_limit
400 && rate_limit.max_concurrent_requests < usize::MAX
401 {
402 let limited_service = ServiceBuilder::new()
403 .layer(LoadShedLayer::new())
404 .layer(ConcurrencyLimitLayer::new(
405 rate_limit.max_concurrent_requests,
406 ))
407 .service(boxed_service);
408 let limited_service = limited_service.map_err(map_load_shed_error);
409 boxed_service = limited_service.boxed_clone();
410 }
411
412 let buffer_capacity = self.config.buffer_capacity.max(1);
413 let buffered_service: crate::client::BufferedService =
414 Buffer::new(boxed_service, buffer_capacity);
415
416 crate::HttpClient {
417 service: buffered_service,
418 max_body_size: self.config.max_body_size,
419 transport_security: self.config.transport,
420 }
421 }
422}
423
424impl Default for HttpClientBuilder {
425 fn default() -> Self {
426 Self::new()
427 }
428}
429
430fn map_tower_error(err: tower::BoxError, timeout: Duration) -> HttpError {
436 if err.is::<tower::timeout::error::Elapsed>() {
437 return HttpError::Timeout(timeout);
438 }
439
440 match err.downcast::<HttpError>() {
442 Ok(http_err) => *http_err,
443 Err(other) => HttpError::Transport(other),
444 }
445}
446
447fn map_load_shed_error(err: tower::BoxError) -> HttpError {
449 if err.is::<tower::load_shed::error::Overloaded>() {
450 HttpError::Overloaded
451 } else {
452 match err.downcast::<HttpError>() {
454 Ok(http_err) => *http_err,
455 Err(err) => HttpError::Transport(err),
456 }
457 }
458}
459
460fn map_decompression_response<B>(response: Response<B>) -> Response<ResponseBody>
465where
466 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
467 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
468{
469 let (parts, body) = response.into_parts();
470 let boxed_body: ResponseBody = body.map_err(Into::into).boxed();
474 Response::from_parts(parts, boxed_body)
475}
476
477fn build_https_connector(
491 tls_roots: TlsRootConfig,
492 transport: TransportSecurity,
493) -> Result<HttpsConnector<HttpConnector>, HttpError> {
494 let allow_http = transport == TransportSecurity::AllowInsecureHttp;
495
496 match tls_roots {
497 TlsRootConfig::WebPki => {
498 let provider = tls::get_crypto_provider();
499 let builder = hyper_rustls::HttpsConnectorBuilder::new()
500 .with_provider_and_webpki_roots(provider)
501 .map_err(|e| HttpError::Tls(Box::new(e)))?;
504 let connector = if allow_http {
505 builder.https_or_http().enable_all_versions().build()
506 } else {
507 builder.https_only().enable_all_versions().build()
508 };
509 Ok(connector)
510 }
511 TlsRootConfig::Native => {
512 let client_config = tls::native_roots_client_config()
513 .map_err(|e| HttpError::Tls(e.into()))?;
515 let builder = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(client_config);
516 let connector = if allow_http {
517 builder.https_or_http().enable_all_versions().build()
518 } else {
519 builder.https_only().enable_all_versions().build()
520 };
521 Ok(connector)
522 }
523 }
524}
525
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::config::DEFAULT_USER_AGENT;

    /// `new()` must expose the default configuration values.
    #[test]
    fn test_builder_default() {
        let builder = HttpClientBuilder::new();
        assert_eq!(builder.config.request_timeout, Duration::from_secs(30));
        assert_eq!(builder.config.user_agent, DEFAULT_USER_AGENT);
        assert!(builder.config.retry.is_some());
        assert_eq!(builder.config.buffer_capacity, 1024);
    }

    /// `with_config()` must keep the supplied configuration.
    #[test]
    fn test_builder_with_config() {
        let config = HttpClientConfig::minimal();
        let builder = HttpClientBuilder::with_config(config);
        assert_eq!(builder.config.request_timeout, Duration::from_secs(10));
    }

    /// `timeout()` overrides the request timeout.
    #[test]
    fn test_builder_timeout() {
        let builder = HttpClientBuilder::new().timeout(Duration::from_secs(60));
        assert_eq!(builder.config.request_timeout, Duration::from_secs(60));
    }

    /// `user_agent()` overrides the user agent string.
    #[test]
    fn test_builder_user_agent() {
        let builder = HttpClientBuilder::new().user_agent("custom/1.0");
        assert_eq!(builder.config.user_agent, "custom/1.0");
    }

    /// `retry(None)` clears the retry configuration.
    #[test]
    fn test_builder_retry() {
        let builder = HttpClientBuilder::new().retry(None);
        assert!(builder.config.retry.is_none());
    }

    /// `max_body_size()` overrides the body size limit.
    #[test]
    fn test_builder_max_body_size() {
        let builder = HttpClientBuilder::new().max_body_size(1024);
        assert_eq!(builder.config.max_body_size, 1024);
    }

    /// Both `transport()` and `deny_insecure_http()` select `TlsOnly`; the default
    /// is `AllowInsecureHttp`.
    #[test]
    fn test_builder_transport_security() {
        let builder = HttpClientBuilder::new().transport(TransportSecurity::TlsOnly);
        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);

        let builder = HttpClientBuilder::new().deny_insecure_http();
        assert_eq!(builder.config.transport, TransportSecurity::TlsOnly);

        let builder = HttpClientBuilder::new();
        assert_eq!(
            builder.config.transport,
            TransportSecurity::AllowInsecureHttp
        );
    }

    /// `with_otel()` turns the otel flag on.
    #[test]
    fn test_builder_otel() {
        let builder = HttpClientBuilder::new().with_otel();
        assert!(builder.config.otel);
    }

    /// `buffer_capacity()` stores a non-zero value unchanged.
    #[test]
    fn test_builder_buffer_capacity() {
        let builder = HttpClientBuilder::new().buffer_capacity(512);
        assert_eq!(builder.config.buffer_capacity, 512);
    }

    /// A zero capacity passed to the setter is raised to 1.
    #[test]
    fn test_builder_buffer_capacity_zero_clamped() {
        let builder = HttpClientBuilder::new().buffer_capacity(0);
        assert_eq!(
            builder.config.buffer_capacity, 1,
            "buffer_capacity=0 should be clamped to 1"
        );
    }

    /// A zero capacity that bypasses the setter (set directly in the config) must
    /// still let `build()` succeed.
    #[tokio::test]
    async fn test_builder_buffer_capacity_zero_in_config_clamped() {
        let config = HttpClientConfig {
            buffer_capacity: 0,
            ..Default::default()
        };
        let result = HttpClientBuilder::with_config(config).build();
        assert!(
            result.is_ok(),
            "build() should succeed with capacity clamped to 1"
        );
    }

    /// `build()` succeeds with the otel layer enabled.
    #[tokio::test]
    async fn test_builder_build_with_otel() {
        let client = HttpClientBuilder::new().with_otel().build();
        assert!(client.is_ok());
    }

    /// `build()` succeeds with an identity auth wrapper registered.
    #[tokio::test]
    async fn test_builder_with_auth_layer() {
        let client = HttpClientBuilder::new()
            .with_auth_layer(|svc| svc)
            .build();
        assert!(client.is_ok());
    }

    /// `build()` succeeds with the default configuration.
    #[tokio::test]
    async fn test_builder_build() {
        let client = HttpClientBuilder::new().build();
        assert!(client.is_ok());
    }

    /// `build()` succeeds in TLS-only mode.
    #[tokio::test]
    async fn test_builder_build_with_deny_insecure_http() {
        let client = HttpClientBuilder::new().deny_insecure_http().build();
        assert!(client.is_ok());
    }

    /// `build()` succeeds with the SSE configuration preset.
    #[tokio::test]
    async fn test_builder_build_with_sse_config() {
        use crate::config::HttpClientConfig;
        let config = HttpClientConfig::sse();
        let client = HttpClientBuilder::with_config(config).build();
        assert!(client.is_ok(), "SSE config should build successfully");
    }

    /// A user agent containing a NUL byte must make `build()` fail.
    #[tokio::test]
    async fn test_builder_build_invalid_user_agent() {
        let client = HttpClientBuilder::new()
            .user_agent("invalid\x00agent")
            .build();
        assert!(client.is_err());
    }

    /// The default root store is `WebPki`, and a client builds with it.
    #[tokio::test]
    async fn test_builder_default_uses_webpki_roots() {
        let builder = HttpClientBuilder::new();
        assert_eq!(builder.config.tls_roots, TlsRootConfig::WebPki);
        let client = builder.build();
        assert!(client.is_ok());
    }

    /// Native roots either build successfully or fail with a TLS error that
    /// mentions certificates (presumably for hosts without a usable native root
    /// store — confirm). Any other error type fails the test.
    #[tokio::test]
    async fn test_builder_native_roots() {
        let config = HttpClientConfig {
            tls_roots: TlsRootConfig::Native,
            ..Default::default()
        };
        let result = HttpClientBuilder::with_config(config).build();

        match &result {
            // Success is acceptable; nothing further to check.
            Ok(_) => {}
            Err(HttpError::Tls(err)) => {
                let msg = err.to_string();
                assert!(
                    msg.contains("native root") || msg.contains("certificate"),
                    "TLS error should mention certificates: {msg}"
                );
            }
            Err(other) => {
                panic!("Unexpected error type: {other:?}");
            }
        }
    }

    /// WebPki roots combined with TLS-only transport must build.
    #[tokio::test]
    async fn test_builder_webpki_roots_https_only() {
        let config = HttpClientConfig {
            tls_roots: TlsRootConfig::WebPki,
            transport: TransportSecurity::TlsOnly,
            ..Default::default()
        };
        let client = HttpClientBuilder::with_config(config).build();
        assert!(client.is_ok());
    }

    /// Builds a client for each root-store / transport combination.
    ///
    /// Only build success is asserted; the negotiated protocol is not inspected.
    #[tokio::test]
    async fn test_http2_enabled_for_all_configurations() {
        // WebPki + AllowInsecureHttp (the defaults).
        let client = HttpClientBuilder::new().build();
        assert!(
            client.is_ok(),
            "WebPki + AllowInsecureHttp should build with HTTP/2 enabled"
        );

        // WebPki + TlsOnly.
        let client = HttpClientBuilder::new()
            .transport(TransportSecurity::TlsOnly)
            .build();
        assert!(
            client.is_ok(),
            "WebPki + TlsOnly should build with HTTP/2 enabled"
        );

        // Native + AllowInsecureHttp.
        let config = HttpClientConfig {
            tls_roots: TlsRootConfig::Native,
            transport: TransportSecurity::AllowInsecureHttp,
            ..Default::default()
        };
        let client = HttpClientBuilder::with_config(config).build();
        assert!(
            client.is_ok(),
            "Native + AllowInsecureHttp should build with HTTP/2 enabled"
        );

        // Native + TlsOnly.
        let config = HttpClientConfig {
            tls_roots: TlsRootConfig::Native,
            transport: TransportSecurity::TlsOnly,
            ..Default::default()
        };
        let client = HttpClientBuilder::with_config(config).build();
        assert!(
            client.is_ok(),
            "Native + TlsOnly should build with HTTP/2 enabled"
        );
    }

    /// With a concurrency limit of 1 and one call in flight, a second call must
    /// fail promptly with `HttpError::Overloaded` instead of waiting.
    #[tokio::test]
    async fn test_load_shedding_returns_overloaded_error() {
        use bytes::Bytes;
        use http::{Request, Response};
        use http_body_util::Full;
        use std::future::Future;
        use std::pin::Pin;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::task::{Context, Poll};
        use tower::Service;
        use tower::ServiceExt;

        /// Service whose calls never complete; it counts how many were started.
        #[derive(Clone)]
        struct SlotHoldingService {
            active: Arc<AtomicUsize>,
        }

        impl Service<Request<Full<Bytes>>> for SlotHoldingService {
            type Response = Response<Full<Bytes>>;
            type Error = HttpError;
            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, _: Request<Full<Bytes>>) -> Self::Future {
                self.active.fetch_add(1, Ordering::SeqCst);
                // Never resolves.
                Box::pin(std::future::pending())
            }
        }

        let active = Arc::new(AtomicUsize::new(0));

        // Same load-shed + concurrency-limit pairing that `build()` uses.
        let service = tower::ServiceBuilder::new()
            .layer(LoadShedLayer::new())
            .layer(ConcurrencyLimitLayer::new(1))
            .service(SlotHoldingService {
                active: active.clone(),
            });

        let service = service.map_err(map_load_shed_error);

        // First request: starts a call that never finishes.
        let req1 = Request::builder()
            .uri("http://test")
            .body(Full::new(Bytes::new()))
            .unwrap();
        let mut svc1 = service.clone();

        let svc1_ready = svc1.ready().await.unwrap();
        // Bound to a name so the first call's future stays alive for the rest of the test.
        let _pending_fut = svc1_ready.call(req1);

        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(
            active.load(Ordering::SeqCst),
            1,
            "First request should be active"
        );

        // Second request: expected to be rejected rather than queued.
        let req2 = Request::builder()
            .uri("http://test")
            .body(Full::new(Bytes::new()))
            .unwrap();

        let mut svc2 = service.clone();

        // The outer timeout turns a hang into a test failure.
        let result = tokio::time::timeout(Duration::from_millis(100), async {
            match svc2.ready().await {
                Ok(ready_svc) => ready_svc.call(req2).await,
                Err(e) => Err(e),
            }
        })
        .await;

        assert!(result.is_ok(), "Request should not hang");
        let err = result.unwrap().unwrap_err();
        assert!(
            matches!(err, HttpError::Overloaded),
            "Expected Overloaded error, got: {err:?}"
        );
    }

    /// A boxed `HttpError::Overloaded` passes through `map_tower_error` unchanged.
    #[test]
    fn test_map_tower_error_preserves_overloaded() {
        let http_err = HttpError::Overloaded;
        let boxed: tower::BoxError = Box::new(http_err);
        let result = map_tower_error(boxed, Duration::from_secs(30));

        assert!(
            matches!(result, HttpError::Overloaded),
            "Should preserve HttpError::Overloaded, got: {result:?}"
        );
    }

    /// A boxed `HttpError::ServiceClosed` passes through `map_tower_error` unchanged.
    #[test]
    fn test_map_tower_error_preserves_service_closed() {
        let http_err = HttpError::ServiceClosed;
        let boxed: tower::BoxError = Box::new(http_err);
        let result = map_tower_error(boxed, Duration::from_secs(30));

        assert!(
            matches!(result, HttpError::ServiceClosed),
            "Should preserve HttpError::ServiceClosed, got: {result:?}"
        );
    }

    /// A boxed `HttpError::Timeout` keeps its own duration; the `timeout`
    /// argument of `map_tower_error` must not overwrite it.
    #[test]
    fn test_map_tower_error_preserves_timeout_attempt() {
        let original_duration = Duration::from_secs(5);
        let http_err = HttpError::Timeout(original_duration);
        let boxed: tower::BoxError = Box::new(http_err);
        let result = map_tower_error(boxed, Duration::from_secs(30));

        match result {
            HttpError::Timeout(d) => {
                assert_eq!(
                    d, original_duration,
                    "Should preserve original timeout duration"
                );
            }
            other => panic!("Should preserve HttpError::Timeout, got: {other:?}"),
        }
    }

    /// An error that is neither `Elapsed` nor `HttpError` becomes `Transport`.
    #[test]
    fn test_map_tower_error_wraps_unknown_as_transport() {
        let other_err: tower::BoxError = Box::new(std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            "connection refused",
        ));
        let result = map_tower_error(other_err, Duration::from_secs(30));

        assert!(
            matches!(result, HttpError::Transport(_)),
            "Should wrap unknown errors as Transport, got: {result:?}"
        );
    }

    /// Aborting the task that awaits `send()` must drop the inner service's
    /// in-flight future, i.e. cancellation reaches the innermost service through
    /// the client built by `build_with_inner_service`.
    #[tokio::test]
    async fn test_cancellation_propagates_through_full_stack() {
        use crate::response::ResponseBody;
        use std::future::Future;
        use std::pin::Pin;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::task::{Context, Poll};
        use tower::Service;

        /// Inner service whose call signals that it started and then never completes.
        #[derive(Clone)]
        struct PendingService {
            completed: Arc<AtomicBool>,
            drop_notifier: Arc<tokio::sync::Notify>,
            started_notifier: Arc<tokio::sync::Notify>,
        }

        /// Lives inside the call future; signals when it is dropped before completion.
        struct FutureGuard {
            completed: Arc<AtomicBool>,
            drop_notifier: Arc<tokio::sync::Notify>,
        }

        impl Drop for FutureGuard {
            fn drop(&mut self) {
                // Only a drop without completion counts as a cancellation.
                if !self.completed.load(Ordering::SeqCst) {
                    self.drop_notifier.notify_one();
                }
            }
        }

        impl Service<http::Request<Full<Bytes>>> for PendingService {
            type Response = http::Response<ResponseBody>;
            type Error = HttpError;
            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

            fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, _: http::Request<Full<Bytes>>) -> Self::Future {
                let completed = self.completed.clone();
                let drop_notifier = self.drop_notifier.clone();
                let started_notifier = self.started_notifier.clone();
                Box::pin(async move {
                    let _guard = FutureGuard {
                        completed: completed.clone(),
                        drop_notifier,
                    };
                    // Tell the test that the future is now being polled.
                    started_notifier.notify_one();
                    // Park forever; the lines below are never reached.
                    std::future::pending::<()>().await;
                    completed.store(true, Ordering::SeqCst);
                    unreachable!()
                })
            }
        }

        let inner_completed = Arc::new(AtomicBool::new(false));
        let drop_notifier = Arc::new(tokio::sync::Notify::new());
        let started_notifier = Arc::new(tokio::sync::Notify::new());

        let inner = PendingService {
            completed: inner_completed.clone(),
            drop_notifier: drop_notifier.clone(),
            started_notifier: started_notifier.clone(),
        };

        // Retry is disabled so the request reaches the inner service exactly once.
        let client = HttpClientBuilder::new()
            .timeout(Duration::from_secs(30))
            .retry(None)
            .build_with_inner_service(inner.boxed_clone());

        let send_handle = tokio::spawn({
            let client = client.clone();
            async move { client.get("http://fake/slow").send().await }
        });

        // Wait until the inner future has actually started.
        started_notifier.notified().await;

        // Cancel the caller.
        send_handle.abort();

        // The guard's drop must be observed within the deadline.
        tokio::time::timeout(Duration::from_secs(5), drop_notifier.notified())
            .await
            .expect(
                "Inner service future should have been dropped within 5s - \
                 the full modkit-http stack must propagate cancellation",
            );

        assert!(
            !inner_completed.load(Ordering::SeqCst),
            "Inner service future should NOT have completed"
        );
    }
}