1use std::error::Error as StdError;
25use std::fmt;
26use std::io;
27use std::ops::ControlFlow;
28use std::time::Duration;
29
/// Tunable parameters shared by the retry loops in this module.
///
/// Delays start at `base_delay`, double for every further attempt, and are
/// capped at `max_delay`; see [`RetryPolicy::delay_for`].
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    /// Total number of attempts, the first call included.
    pub max_attempts: u32,
    /// Delay used before the second attempt; each later attempt doubles it.
    pub base_delay: Duration,
    /// Upper bound for any delay returned by [`RetryPolicy::delay_for`].
    pub max_delay: Duration,
}

impl RetryPolicy {
    /// Preset for uploads: up to 10 attempts, starting at 50 ms, capped at 30 s.
    pub const UPLOAD: RetryPolicy = RetryPolicy {
        max_attempts: 10,
        base_delay: Duration::from_millis(50),
        max_delay: Duration::from_secs(30),
    };

    /// Returns the delay to wait before attempt number `next_attempt` (1-based).
    ///
    /// The result is `base_delay * 2^(next_attempt - 2)`, capped at
    /// `max_delay`: attempt 2 waits `base_delay`, attempt 3 twice that, and so
    /// on. Values of `next_attempt` below 2 are treated like 2.
    ///
    /// The computation is done in nanoseconds with saturating arithmetic, so a
    /// sub-millisecond `base_delay` keeps its precision instead of being
    /// truncated to zero, and very large attempt numbers simply hit the cap.
    pub fn delay_for(&self, next_attempt: u32) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let exp = next_attempt.saturating_sub(2);
        // 2^exp, saturating once the shift no longer fits in 128 bits.
        let factor = 1u128.checked_shl(exp).unwrap_or(u128::MAX);
        let nanos = self.base_delay.as_nanos().saturating_mul(factor);
        if nanos >= self.max_delay.as_nanos() {
            return self.max_delay;
        }
        // `nanos` is strictly below `max_delay`, so the seconds part fits in
        // a u64 and the remainder is below one second: both casts are lossless.
        Duration::new(
            (nanos / NANOS_PER_SEC) as u64,
            (nanos % NANOS_PER_SEC) as u32,
        )
    }
}
72
73pub fn retry_sync<T, E, F>(policy: &RetryPolicy, mut op: F) -> Result<T, E>
82where
83 F: FnMut(u32) -> Result<T, ControlFlow<E, E>>,
84{
85 let max = policy.max_attempts.max(1);
86 let mut attempt: u32 = 1;
87 loop {
88 if attempt > 1 {
89 std::thread::sleep(policy.delay_for(attempt));
90 }
91 match op(attempt) {
92 Ok(v) => return Ok(v),
93 Err(ControlFlow::Break(e)) => return Err(e),
94 Err(ControlFlow::Continue(e)) => {
95 if attempt >= max {
96 return Err(e);
97 }
98 }
99 }
100 attempt += 1;
101 }
102}
103
104pub async fn retry_async<T, E, F, Fut>(policy: &RetryPolicy, mut op: F) -> Result<T, E>
108where
109 F: FnMut(u32) -> Fut,
110 Fut: std::future::Future<Output = Result<T, ControlFlow<E, E>>>,
111{
112 let max = policy.max_attempts.max(1);
113 let mut attempt: u32 = 1;
114 loop {
115 if attempt > 1 {
116 tokio::time::sleep(policy.delay_for(attempt)).await;
117 }
118 match op(attempt).await {
119 Ok(v) => return Ok(v),
120 Err(ControlFlow::Break(e)) => return Err(e),
121 Err(ControlFlow::Continue(e)) => {
122 if attempt >= max {
123 return Err(e);
124 }
125 }
126 }
127 attempt += 1;
128 }
129}
130
/// Selects which HTTP status classes the `retry_http_*` helpers accept as success.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuccessClass {
    /// Only 2xx statuses count as success.
    Strict,
    /// Both 2xx and 3xx statuses count as success.
    AllowRedirects,
}
144
145pub fn retry_http_blocking<F, M>(
172 label: &str,
173 policy: &RetryPolicy,
174 success_class: SuccessClass,
175 mut send: F,
176 error_msg: M,
177) -> anyhow::Result<(reqwest::StatusCode, String)>
178where
179 F: FnMut(u32) -> Result<reqwest::blocking::Response, reqwest::Error>,
180 M: Fn(reqwest::StatusCode, &str) -> String,
181{
182 use anyhow::Context as _;
183 retry_sync(policy, |attempt| {
184 match send(attempt) {
185 Ok(resp) => {
186 let status = resp.status();
187 let succeeded = match success_class {
188 SuccessClass::Strict => status.is_success(),
189 SuccessClass::AllowRedirects => status.is_success() || status.is_redirection(),
190 };
191 let body = resp
192 .text()
193 .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
194 if succeeded {
195 Ok((status, body))
196 } else {
197 let msg = error_msg(status, &body);
198 let inner = anyhow::anyhow!("{msg}");
199 let wrapped = anyhow::Error::new(HttpError::new(
200 std::io::Error::other(inner.to_string()),
201 status.as_u16(),
202 ))
203 .context(inner);
204 if is_retriable(wrapped.as_ref()) {
210 Err(ControlFlow::Continue(wrapped))
211 } else {
212 Err(ControlFlow::Break(wrapped))
213 }
214 }
215 }
216 Err(e) => {
217 let err = anyhow::Error::new(HttpError::from_response(e, None))
221 .context(format!("{label}: HTTP transport error"));
222 if is_retriable(err.as_ref()) {
223 Err(ControlFlow::Continue(err))
224 } else {
225 Err(ControlFlow::Break(err))
226 }
227 }
228 }
229 })
230 .with_context(|| format!("{label}: exhausted retry attempts"))
231}
232
233pub async fn retry_http_async<F, Fut, M>(
254 label: &str,
255 policy: &RetryPolicy,
256 success_class: SuccessClass,
257 mut send: F,
258 error_msg: M,
259) -> anyhow::Result<reqwest::Response>
260where
261 F: FnMut(u32) -> Fut,
262 Fut: std::future::Future<Output = Result<reqwest::Response, reqwest::Error>>,
263 M: Fn(reqwest::StatusCode, &str) -> String,
264{
265 use anyhow::Context as _;
266 retry_async(policy, |attempt| {
267 let fut = send(attempt);
268 let error_msg = &error_msg;
269 async move {
270 match fut.await {
271 Ok(resp) => {
272 let status = resp.status();
273 let succeeded = match success_class {
274 SuccessClass::Strict => status.is_success(),
275 SuccessClass::AllowRedirects => {
276 status.is_success() || status.is_redirection()
277 }
278 };
279 if succeeded {
280 Ok(resp)
281 } else {
282 let body = resp
283 .text()
284 .await
285 .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
286 let msg = error_msg(status, &body);
287 let inner = anyhow::anyhow!("{msg}");
288 let wrapped = anyhow::Error::new(HttpError::new(
289 std::io::Error::other(inner.to_string()),
290 status.as_u16(),
291 ))
292 .context(inner);
293 if is_retriable(wrapped.as_ref()) {
299 Err(ControlFlow::Continue(wrapped))
300 } else {
301 Err(ControlFlow::Break(wrapped))
302 }
303 }
304 }
305 Err(e) => {
306 let err = anyhow::Error::new(HttpError::from_response(e, None))
310 .context(format!("{label}: HTTP transport error"));
311 if is_retriable(err.as_ref()) {
312 Err(ControlFlow::Continue(err))
313 } else {
314 Err(ControlFlow::Break(err))
315 }
316 }
317 }
318 }
319 })
320 .await
321 .with_context(|| format!("{label}: exhausted retry attempts"))
322}
323
324pub fn classify_http_sync(
332 result: reqwest::Result<reqwest::blocking::Response>,
333) -> Result<reqwest::blocking::Response, ControlFlow<anyhow::Error, anyhow::Error>> {
334 use anyhow::anyhow;
335 match result {
336 Ok(resp) => {
337 let status = resp.status();
338 if status.is_success() || status.is_redirection() {
339 Ok(resp)
340 } else if status.is_server_error() {
341 Err(ControlFlow::Continue(anyhow!(
342 "HTTP {} {}",
343 status.as_u16(),
344 status.canonical_reason().unwrap_or("server error")
345 )))
346 } else {
347 Err(ControlFlow::Break(anyhow!(
349 "HTTP {} {}",
350 status.as_u16(),
351 status.canonical_reason().unwrap_or("client error")
352 )))
353 }
354 }
355 Err(e) => Err(ControlFlow::Continue(anyhow!(e))),
357 }
358}
359
/// An error tagged with the HTTP status it was observed with.
///
/// The wrapped error is exposed through `std::error::Error::source`, and
/// [`is_retriable`] inspects `status` when it finds this type in a chain.
#[derive(Debug)]
pub struct HttpError {
    /// The underlying error; also provides the `Display` text.
    source: Box<dyn StdError + Send + Sync + 'static>,
    /// HTTP status code; `0` when no response was available
    /// (see [`HttpError::from_response`]).
    pub status: u16,
}
383
384impl HttpError {
385 pub fn new<E>(source: E, status: u16) -> Self
388 where
389 E: StdError + Send + Sync + 'static,
390 {
391 Self {
392 source: Box::new(source),
393 status,
394 }
395 }
396
397 pub fn from_response<E>(err: E, resp: Option<&reqwest::Response>) -> Self
401 where
402 E: StdError + Send + Sync + 'static,
403 {
404 Self::new(err, resp.map(|r| r.status().as_u16()).unwrap_or(0))
405 }
406}
407
408impl fmt::Display for HttpError {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 fmt::Display::fmt(&self.source, f)
413 }
414}
415
416impl StdError for HttpError {
417 fn source(&self) -> Option<&(dyn StdError + 'static)> {
418 Some(&*self.source)
419 }
420}
421
/// Marker wrapper: any error chain containing a `Retriable` is classified as
/// retriable by [`is_retriable`], whatever else the chain contains.
#[derive(Debug)]
pub struct Retriable(Box<dyn StdError + Send + Sync + 'static>);

impl Retriable {
    /// Marks `source` as retriable.
    pub fn new<E>(source: E) -> Self
    where
        E: StdError + Send + Sync + 'static,
    {
        let boxed: Box<dyn StdError + Send + Sync + 'static> = Box::new(source);
        Self(boxed)
    }
}

/// Displays exactly what the wrapped error displays.
impl fmt::Display for Retriable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward the formatter itself so width/precision flags still apply.
        let inner = self.0.as_ref();
        fmt::Display::fmt(inner, f)
    }
}

/// Exposes the wrapped error as the next link of the error chain.
impl StdError for Retriable {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        let inner: &(dyn StdError + 'static) = self.0.as_ref();
        Some(inner)
    }
}
455
/// Lowercase fragments that mark an error message as a network failure.
///
/// [`is_network_error`] lowercases each error's `Display` text before looking
/// for these, so matching is case-insensitive.
const NETWORK_ERROR_NEEDLES: &[&str] = &[
    "connection reset",
    "network is unreachable",
    "connection closed",
    "connection refused",
    "tls handshake timeout",
    "i/o timeout",
    "broken pipe",
    "timeout awaiting response headers",
    "context deadline exceeded",
    "operation timed out",
    "the network connection was aborted",
    "an existing connection was forcibly closed",
    "dns error",
    "failed to lookup address",
    "no such host is known",
];

/// Reports whether `err`, or any error reachable through `source()`, looks
/// like a network-level failure.
///
/// A link in the chain matches when it is an `io::Error` with a
/// connection/timeout/EOF kind, an `io::Error` whose message is exactly
/// `"eof"` or `"unexpected eof"` (case-insensitive), or any error whose
/// message contains one of [`NETWORK_ERROR_NEEDLES`] (case-insensitive).
pub fn is_network_error(err: &(dyn StdError + 'static)) -> bool {
    let mut link: Option<&(dyn StdError + 'static)> = Some(err);
    while let Some(current) = link {
        if link_is_network_failure(current) {
            return true;
        }
        link = current.source();
    }
    false
}

/// Checks one error of a chain, without following its `source()`.
fn link_is_network_failure(e: &(dyn StdError + 'static)) -> bool {
    if let Some(io_err) = e.downcast_ref::<io::Error>() {
        let kind_matches = matches!(
            io_err.kind(),
            io::ErrorKind::UnexpectedEof
                | io::ErrorKind::TimedOut
                | io::ErrorKind::ConnectionRefused
                | io::ErrorKind::ConnectionReset
                | io::ErrorKind::ConnectionAborted
                | io::ErrorKind::BrokenPipe
        );
        if kind_matches {
            return true;
        }
        // Some layers report EOF only through the message of an `Other` error.
        let io_text = io_err.to_string().to_lowercase();
        if matches!(io_text.as_str(), "eof" | "unexpected eof") {
            return true;
        }
    }

    let text = e.to_string().to_lowercase();
    NETWORK_ERROR_NEEDLES
        .iter()
        .any(|needle| text.contains(*needle))
}
556
557pub fn is_retriable(err: &(dyn StdError + 'static)) -> bool {
569 let mut cur: Option<&(dyn StdError + 'static)> = Some(err);
571 while let Some(e) = cur {
572 if e.is::<Retriable>() {
573 return true;
574 }
575 if let Some(http) = e.downcast_ref::<HttpError>()
576 && (http.status >= 500 || http.status == 429)
577 {
578 return true;
579 }
580 cur = e.source();
581 }
582
583 is_network_error(err)
585}
586
587pub fn is_retriable_opt(err: Option<&(dyn StdError + 'static)>) -> bool {
590 err.is_some_and(is_retriable)
591}
592
593pub fn jitter_duration(base: Duration) -> Duration {
604 let nanos = base.as_nanos() as u64;
605 let window = nanos / 5;
607 if window == 0 {
608 return base;
609 }
610 let seed = crate::sde::resolve_now().timestamp_subsec_nanos() as u64;
617 let offset = seed % (window * 2);
618 let jittered = nanos.saturating_sub(window).saturating_add(offset);
620 Duration::from_nanos(jittered)
621}
622
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::responder::spawn_oneshot_http_responder;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// URL used by the transport-error tests to make the request itself fail.
    const TRANSPORT_FAIL_URL: &str = "http://nonexistent.invalid/";

    // ---- shared fixtures -------------------------------------------------

    /// Four attempts with millisecond delays so the retry-loop tests stay fast.
    fn fast_policy() -> RetryPolicy {
        RetryPolicy {
            max_attempts: 4,
            base_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(5),
        }
    }

    /// Policy used by the HTTP tests: 1 ms base delay, 2 ms cap.
    fn http_policy(max_attempts: u32) -> RetryPolicy {
        RetryPolicy {
            max_attempts,
            base_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(2),
        }
    }

    /// Blocking client with the given overall request timeout.
    fn blocking_client(timeout: Duration) -> reqwest::blocking::Client {
        reqwest::blocking::Client::builder()
            .timeout(timeout)
            .build()
            .expect("client")
    }

    /// Async client with the given overall request timeout.
    fn async_client(timeout: Duration) -> reqwest::Client {
        reqwest::Client::builder()
            .timeout(timeout)
            .build()
            .expect("client")
    }

    /// Asserts that a bare `io::Error` of `kind` is both a network error and retriable.
    fn assert_io_kind_is_network_and_retriable(kind: io::ErrorKind) {
        let e = io::Error::from(kind);
        assert!(is_network_error(&e));
        assert!(is_retriable(&e));
    }

    /// Error whose message is a static string.
    #[derive(Debug)]
    struct StrErr(&'static str);

    impl fmt::Display for StrErr {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str(self.0)
        }
    }

    impl StdError for StrErr {}

    /// Error whose message is an owned string.
    #[derive(Debug)]
    struct OwnedErr(String);

    impl fmt::Display for OwnedErr {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl StdError for OwnedErr {}

    // ---- RetryPolicy / retry loops --------------------------------------

    #[test]
    fn delay_progression_caps_at_max() {
        let p = RetryPolicy {
            max_attempts: 10,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_millis(500),
        };
        for (attempt, expected_ms) in [(2, 100), (3, 200), (4, 400), (5, 500), (8, 500)] {
            assert_eq!(p.delay_for(attempt), Duration::from_millis(expected_ms));
        }
    }

    #[test]
    fn sync_succeeds_on_first_attempt() {
        let mut calls = 0u32;
        let result: Result<&str, ()> = retry_sync(&fast_policy(), |_| {
            calls += 1;
            Ok("ok")
        });
        assert_eq!(result, Ok("ok"));
        assert_eq!(calls, 1);
    }

    #[test]
    fn sync_retries_until_success() {
        let mut calls = 0u32;
        let result: Result<u32, &str> = retry_sync(&fast_policy(), |attempt| {
            calls += 1;
            if attempt < 3 {
                Err(ControlFlow::Continue("transient"))
            } else {
                Ok(attempt)
            }
        });
        assert_eq!(result, Ok(3));
        assert_eq!(calls, 3);
    }

    #[test]
    fn sync_break_stops_immediately() {
        let mut calls = 0u32;
        let result: Result<(), &str> = retry_sync(&fast_policy(), |_| {
            calls += 1;
            Err(ControlFlow::Break("fatal"))
        });
        assert_eq!(result, Err("fatal"));
        assert_eq!(calls, 1);
    }

    #[test]
    fn sync_returns_last_error_after_exhaustion() {
        let mut calls = 0u32;
        let result: Result<(), String> = retry_sync(&fast_policy(), |attempt| {
            calls += 1;
            Err(ControlFlow::Continue(format!("fail {attempt}")))
        });
        assert_eq!(result, Err("fail 4".to_string()));
        assert_eq!(calls, 4);
    }

    #[tokio::test]
    async fn async_retries_until_success() {
        let calls = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&calls);
        let result: Result<u32, &str> = retry_async(&fast_policy(), move |attempt| {
            let counter = Arc::clone(&counter);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                if attempt < 2 {
                    Err(ControlFlow::Continue("transient"))
                } else {
                    Ok(attempt)
                }
            }
        })
        .await;
        assert_eq!(result, Ok(2));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    // ---- is_network_error ------------------------------------------------

    #[test]
    fn network_error_substrings_match() {
        for s in [
            "connection reset by peer",
            "network is unreachable",
            "connection closed unexpectedly",
            "connection refused",
            "tls handshake timeout",
            "i/o timeout",
            "CONNECTION RESET",
            "TLS Handshake Timeout",
            "write: broken pipe",
            "net/http: timeout awaiting response headers",
            "context deadline exceeded",
            "client error (Connect): dns error: failed to lookup address information: Name or service not known",
            "dns error: nodename nor servname provided, or not known",
            "dns error: No such host is known. (os error 11001)",
        ] {
            let e = OwnedErr(s.to_string());
            assert!(is_network_error(&e), "expected network error: {s:?}");
        }
    }

    #[test]
    fn network_error_io_eof_kinds() {
        let by_kind = io::Error::from(io::ErrorKind::UnexpectedEof);
        assert!(is_network_error(&by_kind));

        let by_message = io::Error::other("EOF");
        assert!(is_network_error(&by_message));
    }

    #[test]
    fn is_network_error_classifies_io_timedout() {
        assert_io_kind_is_network_and_retriable(io::ErrorKind::TimedOut);
    }

    #[test]
    fn is_network_error_classifies_io_connection_refused() {
        assert_io_kind_is_network_and_retriable(io::ErrorKind::ConnectionRefused);
    }

    #[test]
    fn is_network_error_classifies_io_connection_reset() {
        assert_io_kind_is_network_and_retriable(io::ErrorKind::ConnectionReset);
    }

    #[test]
    fn is_network_error_classifies_io_connection_aborted() {
        assert_io_kind_is_network_and_retriable(io::ErrorKind::ConnectionAborted);
    }

    #[test]
    fn is_network_error_classifies_io_broken_pipe() {
        assert_io_kind_is_network_and_retriable(io::ErrorKind::BrokenPipe);
    }

    #[test]
    fn is_network_error_classifies_operation_timed_out_substring() {
        // Matched through the message, since the kind is `Other`.
        let other_kind = io::Error::other("operation timed out");
        assert!(is_network_error(&other_kind));
        assert!(is_retriable(&other_kind));

        // Matched through the kind alone.
        assert_io_kind_is_network_and_retriable(io::ErrorKind::TimedOut);
    }

    #[test]
    fn network_error_wrapped_unexpected_eof() {
        /// Wrapper with an unrelated message; the io error is only in `source()`.
        #[derive(Debug)]
        struct Wrap(io::Error);

        impl fmt::Display for Wrap {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "read failed")
            }
        }

        impl StdError for Wrap {
            fn source(&self) -> Option<&(dyn StdError + 'static)> {
                Some(&self.0)
            }
        }

        let outer = Wrap(io::Error::from(io::ErrorKind::UnexpectedEof));
        assert!(is_network_error(&outer));
    }

    #[test]
    fn network_error_non_network_strings_reject() {
        for s in [
            "file not found",
            "permission denied",
            "dial tcp: lookup example.com: no such host",
            "",
        ] {
            let e = OwnedErr(s.to_string());
            assert!(!is_network_error(&e), "expected NOT network error: {s:?}");
        }
    }

    // ---- is_retriable / HttpError / Retriable ---------------------------

    #[test]
    fn retriable_opt_nil_passthrough() {
        assert!(!is_retriable_opt(None));
    }

    #[test]
    fn http_error_500_retriable() {
        assert!(is_retriable(&HttpError::new(
            StrErr("internal server error"),
            500
        )));
    }

    #[test]
    fn http_error_502_503_retriable() {
        for s in [502u16, 503] {
            let e = HttpError::new(StrErr("bad gateway"), s);
            assert!(is_retriable(&e), "status {s} should be retriable");
        }
    }

    #[test]
    fn http_error_429_retriable() {
        assert!(is_retriable(&HttpError::new(StrErr("rate limited"), 429)));
    }

    #[test]
    fn http_error_4xx_not_retriable() {
        for s in [400u16, 401, 403, 404, 422] {
            let e = HttpError::new(StrErr("client err"), s);
            assert!(!is_retriable(&e), "status {s} should NOT be retriable");
        }
    }

    #[test]
    fn http_error_zero_status_routes_via_message() {
        let net = HttpError::new(StrErr("connection reset"), 0);
        assert!(is_retriable(&net));

        let non_net = HttpError::new(StrErr("dial failed"), 0);
        assert!(!is_retriable(&non_net));
    }

    #[test]
    fn http_error_unwrap_chain_visible() {
        let e = HttpError::new(StrErr("inner"), 503);
        assert!(e.source().is_some());
    }

    #[test]
    fn from_response_nil_resp_yields_status_zero() {
        let e = HttpError::from_response(io::Error::other("connect: dial tcp"), None);
        assert_eq!(e.status, 0);
    }

    #[test]
    fn from_response_unwrap_chain_visible() {
        let e = HttpError::from_response(io::Error::other("connection reset by peer"), None);
        assert!(
            e.source().is_some(),
            "inner error must be reachable via source()"
        );
        assert!(is_retriable(&e));
    }

    #[test]
    fn retriable_wrapper_is_retriable() {
        assert!(is_retriable(&Retriable::new(StrErr("retry me"))));
    }

    #[test]
    fn retriable_wrapper_overrides_4xx() {
        let outer = Retriable::new(HttpError::new(StrErr("exists"), 422));
        assert!(is_retriable(&outer));
    }

    #[test]
    fn retriable_wrapper_unwrap_chain_visible() {
        let e = Retriable::new(StrErr("inner"));
        assert!(e.source().is_some());
    }

    #[test]
    fn plain_error_not_retriable() {
        assert!(!is_retriable(&StrErr("something")));
    }

    #[test]
    fn anyhow_error_threadable() {
        let refused: anyhow::Error = anyhow::anyhow!("connection refused");
        assert!(is_retriable(refused.as_ref()));

        let denied: anyhow::Error = anyhow::anyhow!("permission denied");
        assert!(!is_retriable(denied.as_ref()));
    }

    #[test]
    fn is_retriable_chain_walks_to_http_error() {
        let wrapped: anyhow::Error =
            anyhow::Error::new(HttpError::new(StrErr("bad gateway"), 503))
                .context("publish failed");
        assert!(is_retriable(wrapped.as_ref()));
    }

    #[test]
    fn classifier_5xx_via_anyhow_chain_uses_as_ref() {
        let wrapped: anyhow::Error =
            anyhow::Error::new(HttpError::new(std::io::Error::other("503"), 503))
                .context("publish");
        assert!(
            is_retriable(wrapped.as_ref()),
            "5xx HttpError reached via as_ref() must classify retriable"
        );
    }

    #[test]
    fn classifier_root_cause_walks_past_http_error_drift_guard() {
        let wrapped: anyhow::Error =
            anyhow::Error::new(HttpError::new(std::io::Error::other("503"), 503))
                .context("publish");
        assert!(
            !is_retriable(wrapped.root_cause()),
            "root_cause() walks past HttpError; 5xx must NOT be detected via the leaf"
        );
    }

    #[test]
    fn classifier_429_via_anyhow_chain_uses_as_ref() {
        let wrapped: anyhow::Error =
            anyhow::Error::new(HttpError::new(std::io::Error::other("429"), 429))
                .context("publish");
        assert!(is_retriable(wrapped.as_ref()));
        assert!(!is_retriable(wrapped.root_cause()));
    }

    // ---- retry_http_blocking ---------------------------------------------

    #[test]
    fn retry_http_blocking_success_returns_first_attempt() {
        let (addr, calls) =
            spawn_oneshot_http_responder(vec!["HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok"]);
        let client = blocking_client(Duration::from_secs(2));
        let policy = http_policy(3);

        let (status, body) = retry_http_blocking(
            "test",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |_, _| String::from("should not be called on success"),
        )
        .expect("success");

        assert_eq!(status.as_u16(), 200);
        assert_eq!(body, "ok");
        assert_eq!(calls.load(Ordering::SeqCst), 1, "single attempt");
    }

    #[test]
    fn retry_http_blocking_retries_5xx_then_succeeds() {
        let (addr, calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
        ]);
        let client = blocking_client(Duration::from_secs(2));
        let policy = http_policy(3);

        let (status, _) = retry_http_blocking(
            "test",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |status, body| format!("{status}: {body}"),
        )
        .expect("eventually succeeds");

        assert_eq!(status.as_u16(), 200);
        assert_eq!(calls.load(Ordering::SeqCst), 2, "one retry then success");
    }

    #[test]
    fn retry_http_blocking_4xx_fast_fails_no_retry() {
        let (addr, calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nnot found",
        ]);
        let client = blocking_client(Duration::from_secs(2));
        let policy = http_policy(5);

        let err = retry_http_blocking(
            "myscope",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |status, body| format!("custom error: {status} body={body}"),
        )
        .expect_err("4xx must fast-fail");

        let chain = format!("{err:#}");
        assert!(
            chain.contains("custom error"),
            "error formatter must be invoked on non-success; got: {chain}"
        );
        assert!(chain.contains("404"), "status must be in chain: {chain}");
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "4xx must NOT retry (only one connection accepted)"
        );
    }

    #[test]
    fn retry_http_blocking_redirect_class_alters_success_predicate() {
        let (addr, _calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 307 Temporary Redirect\r\nLocation: /next\r\nContent-Length: 0\r\n\r\n",
        ]);
        // Redirects are not followed, so the 307 itself reaches the helper.
        let client = reqwest::blocking::Client::builder()
            .timeout(Duration::from_secs(2))
            .redirect(reqwest::redirect::Policy::none())
            .build()
            .expect("client");
        let policy = http_policy(3);

        let (status, _) = retry_http_blocking(
            "test",
            &policy,
            SuccessClass::AllowRedirects,
            |_| client.get(format!("http://{addr}/")).send(),
            |_, _| String::from("should not be called on 3xx with AllowRedirects"),
        )
        .expect("3xx is success under AllowRedirects");

        assert_eq!(status.as_u16(), 307);
    }

    // ---- retry_http_async ------------------------------------------------

    #[tokio::test]
    async fn retry_http_async_success_returns_first_attempt() {
        let (addr, calls) =
            spawn_oneshot_http_responder(vec!["HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok"]);
        let client = async_client(Duration::from_secs(2));
        let policy = http_policy(3);

        let resp = retry_http_async(
            "test",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |_, _| String::from("should not be called on success"),
        )
        .await
        .expect("success");

        assert_eq!(resp.status().as_u16(), 200);
        let body = resp.text().await.expect("body");
        assert_eq!(body, "ok");
        assert_eq!(calls.load(Ordering::SeqCst), 1, "single attempt");
    }

    #[tokio::test]
    async fn retry_http_async_retries_5xx_then_succeeds() {
        let (addr, calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
        ]);
        let client = async_client(Duration::from_secs(2));
        let policy = http_policy(3);

        let resp = retry_http_async(
            "test",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |status, body| format!("{status}: {body}"),
        )
        .await
        .expect("eventually succeeds");

        assert_eq!(resp.status().as_u16(), 200);
        assert_eq!(calls.load(Ordering::SeqCst), 2, "one retry then success");
    }

    #[tokio::test]
    async fn retry_http_async_4xx_fast_fails_no_retry() {
        let (addr, calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nnot found",
        ]);
        let client = async_client(Duration::from_secs(2));
        let policy = http_policy(5);

        let err = retry_http_async(
            "myscope",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |status, body| format!("custom error: {status} body={body}"),
        )
        .await
        .expect_err("4xx must fast-fail");

        let chain = format!("{err:#}");
        assert!(
            chain.contains("custom error"),
            "error formatter must be invoked on non-success; got: {chain}"
        );
        assert!(chain.contains("404"), "status must be in chain: {chain}");
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "4xx must NOT retry (only one connection accepted)"
        );
    }

    #[tokio::test]
    async fn retry_http_async_429_retries_then_succeeds() {
        let (addr, calls) = spawn_oneshot_http_responder(vec![
            "HTTP/1.1 429 Too Many Requests\r\nContent-Length: 0\r\n\r\n",
            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
        ]);
        let client = async_client(Duration::from_secs(2));
        let policy = http_policy(3);

        let resp = retry_http_async(
            "test",
            &policy,
            SuccessClass::Strict,
            |_| client.get(format!("http://{addr}/")).send(),
            |status, body| format!("{status}: {body}"),
        )
        .await
        .expect("429 retried then success");

        assert_eq!(resp.status().as_u16(), 200);
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    // ---- transport failures ----------------------------------------------

    #[test]
    fn retry_http_blocking_transport_error_retries_then_fails() {
        let attempts = Arc::new(AtomicU32::new(0));
        let attempts_inner = Arc::clone(&attempts);
        let client = blocking_client(Duration::from_millis(500));
        let policy = http_policy(3);

        let err = retry_http_blocking(
            "test-transport",
            &policy,
            SuccessClass::Strict,
            |_| {
                attempts_inner.fetch_add(1, Ordering::SeqCst);
                client.get(TRANSPORT_FAIL_URL).send()
            },
            |_, _| String::from("non-success branch should not be reached"),
        )
        .expect_err("transport error must surface as Err");

        let chain = format!("{err:#}");
        assert!(
            attempts.load(Ordering::SeqCst) > 1,
            "transport error must be retried; got {} attempts; chain={chain}",
            attempts.load(Ordering::SeqCst)
        );
        assert!(
            chain.contains("test-transport"),
            "label must surface in error chain; got: {chain}"
        );
    }

    #[tokio::test]
    async fn retry_http_async_transport_error_retries_then_fails() {
        let attempts = Arc::new(AtomicU32::new(0));
        let attempts_inner = Arc::clone(&attempts);
        let client = async_client(Duration::from_millis(500));
        let policy = http_policy(3);

        let err = retry_http_async(
            "test-transport-async",
            &policy,
            SuccessClass::Strict,
            |_| {
                attempts_inner.fetch_add(1, Ordering::SeqCst);
                client.get(TRANSPORT_FAIL_URL).send()
            },
            |_, _| String::from("non-success branch should not be reached"),
        )
        .await
        .expect_err("transport error must surface as Err");

        assert!(
            attempts.load(Ordering::SeqCst) > 1,
            "transport error must be retried; got {} attempts",
            attempts.load(Ordering::SeqCst)
        );
        let chain = format!("{err:#}");
        assert!(
            chain.contains("test-transport-async"),
            "label must surface in error chain; got: {chain}"
        );
    }
}