1mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
67pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
72 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
78 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
79
80 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
92 fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
93 ThrottleResult::Continue(error)
94 }
95
96 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
107 fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
108 None
109 }
110}
111
112#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118 T: RetryPolicy + 'static,
119{
120 fn from(value: T) -> Self {
121 Self(Arc::new(value))
122 }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126 fn from(value: Arc<dyn RetryPolicy>) -> Self {
127 Self(value)
128 }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132 fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133 value.0
134 }
135}
136
137pub trait RetryPolicyExt: RetryPolicy + Sized {
139 fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160 LimitedElapsedTime::custom(self, maximum_duration)
161 }
162
163 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190 LimitedAttemptCount::custom(self, maximum_attempts)
191 }
192
193 fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226 TooManyRequests::new(self)
227 }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
/// A stateless retry policy; its decisions are implemented in the
/// [`RetryPolicy`] impl for this type.
///
/// This is the default inner policy of [`LimitedElapsedTime`] and
/// [`LimitedAttemptCount`].
#[derive(Debug, Clone)]
pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260 use crate::error::rpc::Code;
261 use http::StatusCode;
262
263 if error.is_transient_and_before_rpc() {
264 return RetryResult::Continue(error);
265 }
266 if !state.idempotent {
267 return RetryResult::Permanent(error);
268 }
269 if error.is_io() {
270 return RetryResult::Continue(error);
271 }
272 if error.status().is_some_and(|s| s.code == Code::Unavailable) {
273 return RetryResult::Continue(error);
274 }
275 if error
279 .http_status_code()
280 .is_some_and(|code| code == StatusCode::SERVICE_UNAVAILABLE.as_u16())
281 {
282 return RetryResult::Continue(error);
283 }
284 RetryResult::Permanent(error)
285 }
286}
287
288#[derive(Clone, Debug)]
309pub struct AlwaysRetry;
310
311impl RetryPolicy for AlwaysRetry {
312 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
313 RetryResult::Continue(error)
314 }
315}
316
317#[derive(Clone, Debug)]
335pub struct NeverRetry;
336
337impl RetryPolicy for NeverRetry {
338 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
339 RetryResult::Exhausted(error)
340 }
341}
342
343#[derive(thiserror::Error, Debug)]
345pub struct LimitedElapsedTimeError {
346 maximum_duration: Duration,
347 #[source]
348 source: Error,
349}
350
351impl LimitedElapsedTimeError {
352 pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
353 Self {
354 maximum_duration,
355 source,
356 }
357 }
358
359 pub fn maximum_duration(&self) -> Duration {
361 self.maximum_duration
362 }
363}
364
365impl std::fmt::Display for LimitedElapsedTimeError {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 write!(
368 f,
369 "retry policy is exhausted after {}s, the last retry attempt was throttled",
370 self.maximum_duration.as_secs_f64()
371 )
372 }
373}
374
375#[derive(Debug)]
390pub struct LimitedElapsedTime<P = Aip194Strict>
391where
392 P: RetryPolicy,
393{
394 inner: P,
395 maximum_duration: Duration,
396}
397
398impl LimitedElapsedTime {
399 pub fn new(maximum_duration: Duration) -> Self {
410 Self {
411 inner: Aip194Strict,
412 maximum_duration,
413 }
414 }
415}
416
417impl<P> LimitedElapsedTime<P>
418where
419 P: RetryPolicy,
420{
421 pub fn custom(inner: P, maximum_duration: Duration) -> Self {
439 Self {
440 inner,
441 maximum_duration,
442 }
443 }
444
445 fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
446 let deadline = state.start + self.maximum_duration;
447 let now = tokio::time::Instant::now().into_std();
448 if now < deadline {
449 ThrottleResult::Continue(error)
450 } else {
451 ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
452 self.maximum_duration,
453 error,
454 )))
455 }
456 }
457}
458
459impl<P> RetryPolicy for LimitedElapsedTime<P>
460where
461 P: RetryPolicy + 'static,
462{
463 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
464 match self.inner.on_error(state, error) {
465 RetryResult::Permanent(e) => RetryResult::Permanent(e),
466 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
467 RetryResult::Continue(e) => {
468 if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
469 RetryResult::Exhausted(e)
470 } else {
471 RetryResult::Continue(e)
472 }
473 }
474 }
475 }
476
477 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
478 match self.inner.on_throttle(state, error) {
479 ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
480 ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
481 }
482 }
483
484 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
485 let deadline = state.start + self.maximum_duration;
486 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
487 if let Some(inner) = self.inner.remaining_time(state) {
488 return Some(std::cmp::min(remaining, inner));
489 }
490 Some(remaining)
491 }
492}
493
494#[derive(Debug)]
509pub struct LimitedAttemptCount<P = Aip194Strict>
510where
511 P: RetryPolicy,
512{
513 inner: P,
514 maximum_attempts: u32,
515}
516
517impl LimitedAttemptCount {
518 pub fn new(maximum_attempts: u32) -> Self {
526 Self {
527 inner: Aip194Strict,
528 maximum_attempts,
529 }
530 }
531}
532
533impl<P> LimitedAttemptCount<P>
534where
535 P: RetryPolicy,
536{
537 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
551 Self {
552 inner,
553 maximum_attempts,
554 }
555 }
556}
557
558impl<P> RetryPolicy for LimitedAttemptCount<P>
559where
560 P: RetryPolicy,
561{
562 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
563 match self.inner.on_error(state, error) {
564 RetryResult::Permanent(e) => RetryResult::Permanent(e),
565 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
566 RetryResult::Continue(e) => {
567 if state.attempt_count >= self.maximum_attempts {
568 RetryResult::Exhausted(e)
569 } else {
570 RetryResult::Continue(e)
571 }
572 }
573 }
574 }
575
576 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
577 assert!(state.attempt_count < self.maximum_attempts);
580 self.inner.on_throttle(state, error)
581 }
582
583 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
584 self.inner.remaining_time(state)
585 }
586}
587
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use http::HeaderMap;
    use std::error::Error as StdError;
    use std::time::Instant;

    /// A function that builds a fresh error for each assertion.
    type MakeError = fn() -> Error;

    /// Shorthand for a duration of `n` whole seconds.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    // Verify both conversions into `RetryPolicyArg` compile and run.
    #[test]
    fn retry_policy_arg() {
        let concrete = LimitedAttemptCount::new(3);
        let _: RetryPolicyArg = concrete.into();

        let shared: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
        let _: RetryPolicyArg = shared.into();
    }

    #[test]
    fn aip194_strict() {
        let p = Aip194Strict;
        let now = Instant::now();
        let idempotent = || idempotent_state(now);
        let non_idempotent = || non_idempotent_state(now);

        // Errors retried only for idempotent requests; throttling continues.
        let retry_if_idempotent: [MakeError; 3] = [unavailable, unknown_and_503, http_unavailable];
        for (i, make) in retry_if_idempotent.iter().enumerate() {
            assert!(p.on_error(&idempotent(), make()).is_continue(), "case #{i}");
            assert!(
                p.on_error(&non_idempotent(), make()).is_permanent(),
                "case #{i}"
            );
            assert!(
                matches!(
                    p.on_throttle(&idempotent(), make()),
                    ThrottleResult::Continue(_)
                ),
                "case #{i}"
            );
        }

        // Errors that are never retried, idempotent or not.
        let always_permanent: [MakeError; 4] = [
            permission_denied,
            http_permission_denied,
            || Error::ser("err"),
            || Error::deser("err"),
        ];
        for (i, make) in always_permanent.iter().enumerate() {
            assert!(p.on_error(&idempotent(), make()).is_permanent(), "case #{i}");
            assert!(
                p.on_error(&non_idempotent(), make()).is_permanent(),
                "case #{i}"
            );
        }

        // I/O errors depend on idempotency.
        let io = || Error::io("err".to_string());
        assert!(p.on_error(&idempotent(), io()).is_continue());
        assert!(p.on_error(&non_idempotent(), io()).is_permanent());

        // Transient errors before the RPC starts are always retried.
        assert!(p.on_error(&idempotent(), pre_rpc_transient()).is_continue());
        assert!(
            p.on_error(&non_idempotent(), pre_rpc_transient())
                .is_continue()
        );

        let remaining = p.remaining_time(&idempotent());
        assert!(remaining.is_none(), "p={p:?}, now={now:?}");
    }

    #[test]
    fn always_retry() {
        let p = AlwaysRetry;
        let now = Instant::now();

        let remaining = p.remaining_time(&idempotent_state(now));
        assert!(remaining.is_none(), "p={p:?}, now={now:?}");

        let errors: [MakeError; 2] = [http_unavailable, unavailable];
        for (i, make) in errors.iter().enumerate() {
            assert!(
                p.on_error(&idempotent_state(now), make()).is_continue(),
                "case #{i}"
            );
            assert!(
                p.on_error(&non_idempotent_state(now), make()).is_continue(),
                "case #{i}"
            );
        }

        let throttled = p.on_throttle(&idempotent_state(now), http_unavailable());
        assert!(matches!(throttled, ThrottleResult::Continue(_)));
    }

    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn always_retry_error_kind(idempotent: bool, error: Error) {
        let state = RetryState::new(idempotent).set_start(Instant::now());
        let verdict = AlwaysRetry.on_error(&state, error);
        assert!(verdict.is_continue());
    }

    #[test]
    fn never_retry() {
        let p = NeverRetry;
        let now = Instant::now();

        let remaining = p.remaining_time(&idempotent_state(now));
        assert!(remaining.is_none(), "p={p:?}, now={now:?}");

        let errors: [MakeError; 3] = [http_unavailable, unavailable, http_permission_denied];
        for (i, make) in errors.iter().enumerate() {
            assert!(
                p.on_error(&idempotent_state(now), make()).is_exhausted(),
                "case #{i}"
            );
            assert!(
                p.on_error(&non_idempotent_state(now), make())
                    .is_exhausted(),
                "case #{i}"
            );
        }

        let throttled = p.on_throttle(&idempotent_state(now), http_unavailable());
        assert!(matches!(throttled, ThrottleResult::Continue(_)));
    }

    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn never_retry_error_kind(idempotent: bool, error: Error) {
        let state = RetryState::new(idempotent).set_start(Instant::now());
        let verdict = NeverRetry.on_error(&state, error);
        assert!(verdict.is_exhausted());
    }

    /// An authentication error flagged as transient.
    fn pre_rpc_transient() -> Error {
        use crate::error::CredentialsError;
        let credentials = CredentialsError::from_msg(true, "err");
        Error::authentication(credentials)
    }

    /// Builds an HTTP error with the given status code, no headers and
    /// `payload` as its body.
    fn http_error(code: u16, payload: &str) -> Error {
        let body = bytes::Bytes::from_owner(payload.to_string());
        Error::http(code, HeaderMap::new(), body)
    }

    fn http_unavailable() -> Error {
        http_error(503_u16, "SERVICE UNAVAILABLE")
    }

    fn http_permission_denied() -> Error {
        http_error(403_u16, "PERMISSION DENIED")
    }

    /// Builds an RPC status with the given code and message.
    fn rpc_status(code: crate::error::rpc::Code, message: &str) -> crate::error::rpc::Status {
        crate::error::rpc::Status::default()
            .set_code(code)
            .set_message(message)
    }

    fn unavailable() -> Error {
        use crate::error::rpc::Code;
        Error::service(rpc_status(Code::Unavailable, "UNAVAILABLE"))
    }

    /// A service error with an `Unknown` RPC code and a 503 HTTP status.
    fn unknown_and_503() -> Error {
        use crate::error::rpc::Code;
        let status = rpc_status(Code::Unknown, "UNAVAILABLE");
        Error::service_full(status, Some(503), None, Some("source error".into()))
    }

    fn permission_denied() -> Error {
        use crate::error::rpc::Code;
        Error::service(rpc_status(Code::PermissionDenied, "PERMISSION_DENIED"))
    }

    // A mock policy, generated as `MockPolicy`, to observe what the
    // decorators forward to their inner policy.
    mockall::mock! {
        #[derive(Debug)]
        pub(crate) Policy {}
        impl RetryPolicy for Policy {
            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
        }
    }

    #[test]
    fn limited_elapsed_time_error() {
        let limit = secs(123) + Duration::from_millis(567);
        let err = LimitedElapsedTimeError::new(limit, unavailable());
        assert_eq!(err.maximum_duration(), limit);

        let fmt = err.to_string();
        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
        assert!(err.source().is_some(), "{err:?}");
    }

    #[test]
    fn test_limited_time_forwards() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));
        inner
            .expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));
        inner.expect_remaining_time().times(1).returning(|_| None);

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        let verdict = policy.on_error(&idempotent_state(now), transient_error());
        assert!(verdict.is_continue());

        let remaining = policy.remaining_time(&idempotent_state(now));
        assert!(remaining.is_some(), "policy={policy:?}, now={now:?}");

        let throttled = policy.on_throttle(&idempotent_state(now), transient_error());
        assert!(matches!(throttled, ThrottleResult::Continue(_)));
    }

    #[test]
    fn test_limited_time_on_throttle_continue() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        // Before the policy expires.
        let started = idempotent_state(now - secs(50));
        let rf = policy.on_throttle(&started, unavailable());
        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");

        // After the policy expires.
        let started = idempotent_state(now - secs(70));
        let rf = policy.on_throttle(&started, unavailable());
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    #[test]
    fn test_limited_time_on_throttle_exhausted() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        // Before the policy expires; the inner verdict must win.
        let started = idempotent_state(now - secs(50));
        let rf = policy.on_throttle(&started, unavailable());
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    #[test]
    fn test_limited_time_inner_continues() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        let within_limit = idempotent_state(now - secs(10));
        assert!(
            policy
                .on_error(&within_limit, transient_error())
                .is_continue()
        );

        let past_limit = idempotent_state(now - secs(70));
        assert!(
            policy
                .on_error(&past_limit, transient_error())
                .is_exhausted()
        );
    }

    #[test]
    fn test_limited_time_inner_permanent() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        for start in [now - secs(10), now + secs(10)] {
            let verdict = policy.on_error(&non_idempotent_state(start), transient_error());
            assert!(verdict.is_permanent());
        }
    }

    #[test]
    fn test_limited_time_inner_exhausted() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        for start in [now - secs(10), now + secs(10)] {
            let verdict = policy.on_error(&non_idempotent_state(start), transient_error());
            assert!(verdict.is_exhausted());
        }
    }

    #[test]
    fn test_limited_time_remaining_inner_longer() {
        let mut inner = MockPolicy::new();
        inner
            .expect_remaining_time()
            .times(1)
            .returning(|_| Some(secs(30)));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        let started = idempotent_state(now - secs(55));
        let remaining = policy.remaining_time(&started);
        assert!(remaining <= Some(secs(5)), "{remaining:?}");
    }

    #[test]
    fn test_limited_time_remaining_inner_shorter() {
        let mut inner = MockPolicy::new();
        inner
            .expect_remaining_time()
            .times(1)
            .returning(|_| Some(secs(5)));
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        let now = Instant::now();
        let started = idempotent_state(now - secs(5));
        let remaining = policy.remaining_time(&started);
        assert!(remaining <= Some(secs(10)), "{remaining:?}");
    }

    #[test]
    fn test_limited_time_remaining_inner_is_none() {
        let mut inner = MockPolicy::new();
        inner.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedElapsedTime::custom(inner, secs(60));

        let now = Instant::now();
        let started = idempotent_state(now - secs(50));
        let remaining = policy.remaining_time(&started);
        assert!(remaining <= Some(secs(10)), "{remaining:?}");
    }

    #[test]
    fn test_limited_attempt_count_on_error() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(inner, 3);

        // (attempt count, whether the policy should be exhausted)
        for (attempt, exhausted) in [(1_u32, false), (2_u32, false), (3_u32, true)] {
            let state = idempotent_state(now).set_attempt_count(attempt);
            let verdict = policy.on_error(&state, transient_error());
            if exhausted {
                assert!(verdict.is_exhausted(), "attempt={attempt}");
            } else {
                assert!(verdict.is_continue(), "attempt={attempt}");
            }
        }
    }

    #[test]
    fn test_limited_attempt_count_on_throttle_continue() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(inner, 3);

        let state = idempotent_state(now).set_attempt_count(2_u32);
        let throttled = policy.on_throttle(&state, unavailable());
        assert!(matches!(throttled, ThrottleResult::Continue(_)));
    }

    #[test]
    fn test_limited_attempt_count_on_throttle_error() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(inner, 3);

        let throttled = policy.on_throttle(&idempotent_state(now), unavailable());
        assert!(matches!(throttled, ThrottleResult::Exhausted(_)));
    }

    #[test]
    fn test_limited_attempt_count_remaining_none() {
        let mut inner = MockPolicy::new();
        inner.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedAttemptCount::custom(inner, 3);

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now));
        assert!(remaining.is_none(), "policy={policy:?} now={now:?}");
    }

    #[test]
    fn test_limited_attempt_count_remaining_some() {
        let mut inner = MockPolicy::new();
        inner
            .expect_remaining_time()
            .times(1)
            .returning(|_| Some(secs(123)));
        let policy = LimitedAttemptCount::custom(inner, 3);

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now));
        assert_eq!(remaining, Some(secs(123)));
    }

    #[test]
    fn test_limited_attempt_count_inner_permanent() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));
        let policy = LimitedAttemptCount::custom(inner, 2);
        let now = Instant::now();

        for _ in 0..2 {
            let verdict = policy.on_error(&non_idempotent_state(now), transient_error());
            assert!(verdict.is_permanent());
        }
    }

    #[test]
    fn test_limited_attempt_count_inner_exhausted() {
        let mut inner = MockPolicy::new();
        inner
            .expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));
        let policy = LimitedAttemptCount::custom(inner, 2);
        let now = Instant::now();

        for _ in 0..2 {
            let verdict = policy.on_error(&non_idempotent_state(now), transient_error());
            assert!(verdict.is_exhausted());
        }
    }

    /// A service error with the `Unavailable` code.
    fn transient_error() -> Error {
        use crate::error::rpc::Code;
        Error::service(rpc_status(Code::Unavailable, "try-again"))
    }

    /// A retry state for an idempotent request whose loop started at `now`.
    pub(crate) fn idempotent_state(now: Instant) -> RetryState {
        let state = RetryState::new(true);
        state.set_start(now)
    }

    /// A retry state for a non-idempotent request whose loop started at
    /// `now`.
    pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
        let state = RetryState::new(false);
        state.set_start(now)
    }
}