1mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
67pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
72 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
78 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
79
80 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
92 fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
93 ThrottleResult::Continue(error)
94 }
95
96 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
107 fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
108 None
109 }
110}
111
112#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118 T: RetryPolicy + 'static,
119{
120 fn from(value: T) -> Self {
121 Self(Arc::new(value))
122 }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126 fn from(value: Arc<dyn RetryPolicy>) -> Self {
127 Self(value)
128 }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132 fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133 value.0
134 }
135}
136
137pub trait RetryPolicyExt: RetryPolicy + Sized {
139 fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160 LimitedElapsedTime::custom(self, maximum_duration)
161 }
162
163 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190 LimitedAttemptCount::custom(self, maximum_attempts)
191 }
192
193 fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226 TooManyRequests::new(self)
227 }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
/// A stateless retry policy; the name refers to AIP-194.
///
/// It retries transient pre-RPC failures for any request and, for idempotent
/// requests only, I/O errors and "unavailable" service responses. Everything
/// else is permanent.
#[derive(Debug, Clone)]
pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260 use crate::error::rpc::Code;
261 use http::StatusCode;
262
263 if error.is_transient_and_before_rpc() {
264 return RetryResult::Continue(error);
265 }
266 if !state.idempotent {
267 return RetryResult::Permanent(error);
268 }
269 if error.is_io() {
270 return RetryResult::Continue(error);
271 }
272 if error.status().is_some_and(|s| s.code == Code::Unavailable) {
273 return RetryResult::Continue(error);
274 }
275 if error
279 .http_status_code()
280 .is_some_and(|code| code == StatusCode::SERVICE_UNAVAILABLE.as_u16())
281 {
282 return RetryResult::Continue(error);
283 }
284 RetryResult::Permanent(error)
285 }
286}
287
/// A retry policy that asks to retry every error, whatever its kind and
/// whether or not the request is idempotent.
#[derive(Debug, Clone)]
pub struct AlwaysRetry;
310
311impl RetryPolicy for AlwaysRetry {
312 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
313 RetryResult::Continue(error)
314 }
315}
316
/// A retry policy that never asks for a retry: every error is reported as
/// exhausting the policy.
#[derive(Debug, Clone)]
pub struct NeverRetry;
336
337impl RetryPolicy for NeverRetry {
338 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
339 RetryResult::Exhausted(error)
340 }
341}
342
343#[derive(thiserror::Error, Debug)]
344pub struct LimitedElapsedTimeError {
345 maximum_duration: Duration,
346 #[source]
347 source: Error,
348}
349
350impl LimitedElapsedTimeError {
351 pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
352 Self {
353 maximum_duration,
354 source,
355 }
356 }
357
358 pub fn maximum_duration(&self) -> Duration {
360 self.maximum_duration
361 }
362}
363
364impl std::fmt::Display for LimitedElapsedTimeError {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 write!(
367 f,
368 "retry policy is exhausted after {}s, the last retry attempt was throttled",
369 self.maximum_duration.as_secs_f64()
370 )
371 }
372}
373
374#[derive(Debug)]
389pub struct LimitedElapsedTime<P = Aip194Strict>
390where
391 P: RetryPolicy,
392{
393 inner: P,
394 maximum_duration: Duration,
395}
396
397impl LimitedElapsedTime {
398 pub fn new(maximum_duration: Duration) -> Self {
409 Self {
410 inner: Aip194Strict,
411 maximum_duration,
412 }
413 }
414}
415
416impl<P> LimitedElapsedTime<P>
417where
418 P: RetryPolicy,
419{
420 pub fn custom(inner: P, maximum_duration: Duration) -> Self {
438 Self {
439 inner,
440 maximum_duration,
441 }
442 }
443
444 fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
445 let deadline = state.start + self.maximum_duration;
446 let now = tokio::time::Instant::now().into_std();
447 if now < deadline {
448 ThrottleResult::Continue(error)
449 } else {
450 ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
451 self.maximum_duration,
452 error,
453 )))
454 }
455 }
456}
457
458impl<P> RetryPolicy for LimitedElapsedTime<P>
459where
460 P: RetryPolicy + 'static,
461{
462 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
463 match self.inner.on_error(state, error) {
464 RetryResult::Permanent(e) => RetryResult::Permanent(e),
465 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
466 RetryResult::Continue(e) => {
467 if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
468 RetryResult::Exhausted(e)
469 } else {
470 RetryResult::Continue(e)
471 }
472 }
473 }
474 }
475
476 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
477 match self.inner.on_throttle(state, error) {
478 ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
479 ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
480 }
481 }
482
483 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
484 let deadline = state.start + self.maximum_duration;
485 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
486 if let Some(inner) = self.inner.remaining_time(state) {
487 return Some(std::cmp::min(remaining, inner));
488 }
489 Some(remaining)
490 }
491}
492
493#[derive(Debug)]
508pub struct LimitedAttemptCount<P = Aip194Strict>
509where
510 P: RetryPolicy,
511{
512 inner: P,
513 maximum_attempts: u32,
514}
515
516impl LimitedAttemptCount {
517 pub fn new(maximum_attempts: u32) -> Self {
525 Self {
526 inner: Aip194Strict,
527 maximum_attempts,
528 }
529 }
530}
531
532impl<P> LimitedAttemptCount<P>
533where
534 P: RetryPolicy,
535{
536 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
550 Self {
551 inner,
552 maximum_attempts,
553 }
554 }
555}
556
557impl<P> RetryPolicy for LimitedAttemptCount<P>
558where
559 P: RetryPolicy,
560{
561 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
562 match self.inner.on_error(state, error) {
563 RetryResult::Permanent(e) => RetryResult::Permanent(e),
564 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
565 RetryResult::Continue(e) => {
566 if state.attempt_count >= self.maximum_attempts {
567 RetryResult::Exhausted(e)
568 } else {
569 RetryResult::Continue(e)
570 }
571 }
572 }
573 }
574
575 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
576 assert!(state.attempt_count < self.maximum_attempts);
579 self.inner.on_throttle(state, error)
580 }
581
582 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
583 self.inner.remaining_time(state)
584 }
585}
586
587#[cfg(test)]
588pub mod tests {
589 use super::*;
590 use http::HeaderMap;
591 use std::error::Error as StdError;
592 use std::time::Instant;
593
// `RetryPolicyArg` can be built from a concrete policy and from a shared,
// type-erased one.
#[test]
fn retry_policy_arg() {
    let by_value = LimitedAttemptCount::new(3);
    let _: RetryPolicyArg = by_value.into();

    let shared: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
    let _: RetryPolicyArg = shared.into();
}
603
// Exercises `Aip194Strict` against every error kind, for idempotent and
// non-idempotent requests.
#[test]
fn aip194_strict() {
    let p = Aip194Strict;

    let now = Instant::now();
    let idempotent = idempotent_state(now);
    let non_idempotent = non_idempotent_state(now);

    // Service status UNAVAILABLE: retryable only when idempotent.
    assert!(p.on_error(&idempotent, unavailable()).is_continue());
    assert!(p.on_error(&non_idempotent, unavailable()).is_permanent());
    assert!(matches!(
        p.on_throttle(&idempotent, unavailable()),
        ThrottleResult::Continue(_)
    ));

    // Unknown status code carried by an HTTP 503.
    assert!(p.on_error(&idempotent, unknown_and_503()).is_continue());
    assert!(p.on_error(&non_idempotent, unknown_and_503()).is_permanent());
    assert!(matches!(
        p.on_throttle(&idempotent, unknown_and_503()),
        ThrottleResult::Continue(_)
    ));

    // PERMISSION_DENIED is never retried.
    assert!(p.on_error(&idempotent, permission_denied()).is_permanent());
    assert!(
        p.on_error(&non_idempotent, permission_denied())
            .is_permanent()
    );

    // Plain HTTP 503.
    assert!(p.on_error(&idempotent, http_unavailable()).is_continue());
    assert!(
        p.on_error(&non_idempotent, http_unavailable())
            .is_permanent()
    );
    assert!(matches!(
        p.on_throttle(&idempotent, http_unavailable()),
        ThrottleResult::Continue(_)
    ));

    // Plain HTTP 403.
    assert!(
        p.on_error(&idempotent, http_permission_denied())
            .is_permanent()
    );
    assert!(
        p.on_error(&non_idempotent, http_permission_denied())
            .is_permanent()
    );

    // I/O errors.
    assert!(
        p.on_error(&idempotent, Error::io("err".to_string()))
            .is_continue()
    );
    assert!(
        p.on_error(&non_idempotent, Error::io("err".to_string()))
            .is_permanent()
    );

    // Transient errors before the RPC starts are retried regardless of
    // idempotency.
    assert!(p.on_error(&idempotent, pre_rpc_transient()).is_continue());
    assert!(
        p.on_error(&non_idempotent, pre_rpc_transient())
            .is_continue()
    );

    // Serialization and deserialization errors are never retried.
    assert!(p.on_error(&idempotent, Error::ser("err")).is_permanent());
    assert!(
        p.on_error(&non_idempotent, Error::ser("err"))
            .is_permanent()
    );
    assert!(p.on_error(&idempotent, Error::deser("err")).is_permanent());
    assert!(
        p.on_error(&non_idempotent, Error::deser("err"))
            .is_permanent()
    );

    // The policy has no time limit.
    assert!(
        p.remaining_time(&idempotent).is_none(),
        "p={p:?}, now={now:?}"
    );
}
706
// `AlwaysRetry` continues on every error and has no time limit.
#[test]
fn always_retry() {
    let p = AlwaysRetry;

    let now = Instant::now();
    let idempotent = idempotent_state(now);
    let non_idempotent = non_idempotent_state(now);

    assert!(
        p.remaining_time(&idempotent).is_none(),
        "p={p:?}, now={now:?}"
    );

    assert!(p.on_error(&idempotent, http_unavailable()).is_continue());
    assert!(
        p.on_error(&non_idempotent, http_unavailable())
            .is_continue()
    );
    assert!(matches!(
        p.on_throttle(&idempotent, http_unavailable()),
        ThrottleResult::Continue(_)
    ));

    assert!(p.on_error(&idempotent, unavailable()).is_continue());
    assert!(p.on_error(&non_idempotent, unavailable()).is_continue());
}
738
739 #[test_case::test_case(true, Error::io("err"))]
740 #[test_case::test_case(true, pre_rpc_transient())]
741 #[test_case::test_case(true, Error::ser("err"))]
742 #[test_case::test_case(false, Error::io("err"))]
743 #[test_case::test_case(false, pre_rpc_transient())]
744 #[test_case::test_case(false, Error::ser("err"))]
745 fn always_retry_error_kind(idempotent: bool, error: Error) {
746 let p = AlwaysRetry;
747 let now = Instant::now();
748 let state = if idempotent {
749 idempotent_state(now)
750 } else {
751 non_idempotent_state(now)
752 };
753 assert!(p.on_error(&state, error).is_continue());
754 }
755
// `NeverRetry` reports every error as exhausted, keeps the default throttle
// behavior, and has no time limit.
#[test]
fn never_retry() {
    let p = NeverRetry;

    let now = Instant::now();
    let idempotent = idempotent_state(now);
    let non_idempotent = non_idempotent_state(now);

    assert!(
        p.remaining_time(&idempotent).is_none(),
        "p={p:?}, now={now:?}"
    );

    assert!(p.on_error(&idempotent, http_unavailable()).is_exhausted());
    assert!(
        p.on_error(&non_idempotent, http_unavailable())
            .is_exhausted()
    );
    assert!(matches!(
        p.on_throttle(&idempotent, http_unavailable()),
        ThrottleResult::Continue(_)
    ));

    assert!(p.on_error(&idempotent, unavailable()).is_exhausted());
    assert!(p.on_error(&non_idempotent, unavailable()).is_exhausted());

    assert!(
        p.on_error(&idempotent, http_permission_denied())
            .is_exhausted()
    );
    assert!(
        p.on_error(&non_idempotent, http_permission_denied())
            .is_exhausted()
    );
}
796
797 #[test_case::test_case(true, Error::io("err"))]
798 #[test_case::test_case(true, pre_rpc_transient())]
799 #[test_case::test_case(true, Error::ser("err"))]
800 #[test_case::test_case(false, Error::io("err"))]
801 #[test_case::test_case(false, pre_rpc_transient())]
802 #[test_case::test_case(false, Error::ser("err"))]
803 fn never_retry_error_kind(idempotent: bool, error: Error) {
804 let p = NeverRetry;
805 let now = Instant::now();
806 let state = if idempotent {
807 idempotent_state(now)
808 } else {
809 non_idempotent_state(now)
810 };
811 assert!(p.on_error(&state, error).is_exhausted());
812 }
813
814 fn pre_rpc_transient() -> Error {
815 use crate::error::CredentialsError;
816 Error::authentication(CredentialsError::from_msg(true, "err"))
817 }
818
819 fn http_unavailable() -> Error {
820 Error::http(
821 503_u16,
822 HeaderMap::new(),
823 bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
824 )
825 }
826
827 fn http_permission_denied() -> Error {
828 Error::http(
829 403_u16,
830 HeaderMap::new(),
831 bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
832 )
833 }
834
835 fn unavailable() -> Error {
836 use crate::error::rpc::Code;
837 let status = crate::error::rpc::Status::default()
838 .set_code(Code::Unavailable)
839 .set_message("UNAVAILABLE");
840 Error::service(status)
841 }
842
843 fn unknown_and_503() -> Error {
844 use crate::error::rpc::Code;
845 let status = crate::error::rpc::Status::default()
846 .set_code(Code::Unknown)
847 .set_message("UNAVAILABLE");
848 Error::service_full(status, Some(503), None, Some("source error".into()))
849 }
850
851 fn permission_denied() -> Error {
852 use crate::error::rpc::Code;
853 let status = crate::error::rpc::Status::default()
854 .set_code(Code::PermissionDenied)
855 .set_message("PERMISSION_DENIED");
856 Error::service(status)
857 }
858
// Generates `MockPolicy`, a mock `RetryPolicy` whose three methods are
// driven by the expectations each test sets up (`expect_on_error()`,
// `expect_on_throttle()`, `expect_remaining_time()`).
mockall::mock! {
    #[derive(Debug)]
    pub(crate) Policy {}
    impl RetryPolicy for Policy {
        fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
        fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
        fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
    }
}
868
// The error exposes its limit, prints it in fractional seconds, and keeps
// the wrapped error as its source.
#[test]
fn limited_elapsed_time_error() {
    let limit = Duration::from_millis(123_567);
    let err = LimitedElapsedTimeError::new(limit, unavailable());

    assert_eq!(err.maximum_duration(), limit);

    let fmt = format!("{err}");
    assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");

    assert!(err.source().is_some(), "{err:?}");
}
878
// With plenty of budget left, every query is forwarded to the inner policy.
#[test]
fn test_limited_time_forwards() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(1..)
        .returning(|_, e| RetryResult::Continue(e));
    mock.expect_on_throttle()
        .times(1..)
        .returning(|_, e| ThrottleResult::Continue(e));
    mock.expect_remaining_time().times(1).returning(|_| None);

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    assert!(
        policy
            .on_error(&idempotent_state(now), transient_error())
            .is_continue()
    );

    assert!(
        policy.remaining_time(&idempotent_state(now)).is_some(),
        "policy={policy:?}, now={now:?}"
    );

    assert!(matches!(
        policy.on_throttle(&idempotent_state(now), transient_error()),
        ThrottleResult::Continue(_)
    ));
}
901
// When the inner policy continues on throttle, the time budget decides.
#[test]
fn test_limited_time_on_throttle_continue() {
    let mut mock = MockPolicy::new();
    mock.expect_on_throttle()
        .times(1..)
        .returning(|_, e| ThrottleResult::Continue(e));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    // Started 50s ago: still inside the 60s budget.
    let within_budget = idempotent_state(now - Duration::from_secs(50));
    let rf = policy.on_throttle(&within_budget, unavailable());
    assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");

    // Started 70s ago: the budget is spent.
    let past_budget = idempotent_state(now - Duration::from_secs(70));
    let rf = policy.on_throttle(&past_budget, unavailable());
    assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
}
926
// An inner `Exhausted` throttle decision passes through even when the time
// budget is not spent.
#[test]
fn test_limited_time_on_throttle_exhausted() {
    let mut mock = MockPolicy::new();
    mock.expect_on_throttle()
        .times(1..)
        .returning(|_, e| ThrottleResult::Exhausted(e));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let within_budget = idempotent_state(now - Duration::from_secs(50));
    let rf = policy.on_throttle(&within_budget, unavailable());
    assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
}
944
// When the inner policy continues on error, the time budget decides.
#[test]
fn test_limited_time_inner_continues() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(1..)
        .returning(|_, e| RetryResult::Continue(e));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    // Started 10s ago: still inside the 60s budget.
    let within_budget = idempotent_state(now - Duration::from_secs(10));
    assert!(
        policy
            .on_error(&within_budget, transient_error())
            .is_continue()
    );

    // Started 70s ago: the budget is spent.
    let past_budget = idempotent_state(now - Duration::from_secs(70));
    assert!(
        policy
            .on_error(&past_budget, transient_error())
            .is_exhausted()
    );
}
966
// An inner `Permanent` decision passes through regardless of the start time.
#[test]
fn test_limited_time_inner_permanent() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(2)
        .returning(|_, e| RetryResult::Permanent(e));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let starts = [
        now - Duration::from_secs(10),
        now + Duration::from_secs(10),
    ];
    for start in starts {
        let rf = policy.on_error(&non_idempotent_state(start), transient_error());
        assert!(rf.is_permanent());
    }
}
989
// An inner `Exhausted` decision passes through regardless of the start time.
#[test]
fn test_limited_time_inner_exhausted() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(2)
        .returning(|_, e| RetryResult::Exhausted(e));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let starts = [
        now - Duration::from_secs(10),
        now + Duration::from_secs(10),
    ];
    for start in starts {
        let rf = policy.on_error(&non_idempotent_state(start), transient_error());
        assert!(rf.is_exhausted());
    }
}
1012
// The inner policy reports 30s, but only about 5s of the 60s budget is left:
// the smaller value wins.
#[test]
fn test_limited_time_remaining_inner_longer() {
    let mut mock = MockPolicy::new();
    mock.expect_remaining_time()
        .times(1)
        .returning(|_| Some(Duration::from_secs(30)));

    let now = Instant::now();
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
    // `None <= Some(_)` is true, so a bare `remaining <= Some(..)` would
    // also accept a missing limit. Require a value explicitly.
    assert!(
        remaining.is_some_and(|left| left <= Duration::from_secs(5)),
        "{remaining:?}"
    );
}
1026
// The inner policy reports 5s while about 55s of the 60s budget is left:
// the smaller value wins.
#[test]
fn test_limited_time_remaining_inner_shorter() {
    let mut mock = MockPolicy::new();
    mock.expect_remaining_time()
        .times(1)
        .returning(|_| Some(Duration::from_secs(5)));
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let now = Instant::now();
    let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
    // `None <= Some(_)` is true, so a bare `remaining <= Some(..)` would
    // also accept a missing limit. Require a value explicitly.
    assert!(
        remaining.is_some_and(|left| left <= Duration::from_secs(10)),
        "{remaining:?}"
    );
}
1039
// The inner policy has no time limit, so the remaining part of the 60s
// budget (about 10s) is reported.
#[test]
fn test_limited_time_remaining_inner_is_none() {
    let mut mock = MockPolicy::new();
    mock.expect_remaining_time().times(1).returning(|_| None);
    let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

    let now = Instant::now();
    let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
    // `None <= Some(_)` is true, so a bare `remaining <= Some(..)` would
    // also accept a missing limit. Require a value explicitly.
    assert!(
        remaining.is_some_and(|left| left <= Duration::from_secs(10)),
        "{remaining:?}"
    );
}
1050
// With a limit of 3, attempts 1 and 2 continue and attempt 3 is exhausted.
#[test]
fn test_limited_attempt_count_on_error() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(1..)
        .returning(|_, e| RetryResult::Continue(e));

    let now = Instant::now();
    let policy = LimitedAttemptCount::custom(mock, 3);

    for attempt in [1_u32, 2_u32] {
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(attempt),
                    transient_error()
                )
                .is_continue()
        );
    }

    assert!(
        policy
            .on_error(
                &idempotent_state(now).set_attempt_count(3_u32),
                transient_error()
            )
            .is_exhausted()
    );
}
1085
// Below the attempt limit, the inner `Continue` throttle decision is
// forwarded.
#[test]
fn test_limited_attempt_count_on_throttle_continue() {
    let mut mock = MockPolicy::new();
    mock.expect_on_throttle()
        .times(1..)
        .returning(|_, e| ThrottleResult::Continue(e));

    let now = Instant::now();
    let policy = LimitedAttemptCount::custom(mock, 3);

    let outcome = policy.on_throttle(
        &idempotent_state(now).set_attempt_count(2_u32),
        unavailable(),
    );
    assert!(matches!(outcome, ThrottleResult::Continue(_)));
}
1103
// The inner `Exhausted` throttle decision is forwarded.
#[test]
fn test_limited_attempt_count_on_throttle_error() {
    let mut mock = MockPolicy::new();
    mock.expect_on_throttle()
        .times(1..)
        .returning(|_, e| ThrottleResult::Exhausted(e));

    let now = Instant::now();
    let policy = LimitedAttemptCount::custom(mock, 3);

    let outcome = policy.on_throttle(&idempotent_state(now), unavailable());
    assert!(matches!(outcome, ThrottleResult::Exhausted(_)));
}
1118
// The policy adds no time limit: an inner `None` stays `None`.
#[test]
fn test_limited_attempt_count_remaining_none() {
    let mut mock = MockPolicy::new();
    mock.expect_remaining_time().times(1).returning(|_| None);
    let policy = LimitedAttemptCount::custom(mock, 3);

    let now = Instant::now();
    let remaining = policy.remaining_time(&idempotent_state(now));
    assert!(remaining.is_none(), "policy={policy:?} now={now:?}");
}
1131
// The inner policy's remaining time is reported unchanged.
#[test]
fn test_limited_attempt_count_remaining_some() {
    let mut mock = MockPolicy::new();
    mock.expect_remaining_time()
        .times(1)
        .returning(|_| Some(Duration::from_secs(123)));
    let policy = LimitedAttemptCount::custom(mock, 3);

    let now = Instant::now();
    let remaining = policy.remaining_time(&idempotent_state(now));
    assert_eq!(remaining, Some(Duration::from_secs(123)));
}
1146
// An inner `Permanent` decision passes through on every call.
#[test]
fn test_limited_attempt_count_inner_permanent() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(2)
        .returning(|_, e| RetryResult::Permanent(e));
    let policy = LimitedAttemptCount::custom(mock, 2);
    let now = Instant::now();

    for _ in 0..2 {
        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_permanent());
    }
}
1162
// An inner `Exhausted` decision passes through on every call.
#[test]
fn test_limited_attempt_count_inner_exhausted() {
    let mut mock = MockPolicy::new();
    mock.expect_on_error()
        .times(2)
        .returning(|_, e| RetryResult::Exhausted(e));
    let policy = LimitedAttemptCount::custom(mock, 2);
    let now = Instant::now();

    for _ in 0..2 {
        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_exhausted());
    }
}
1178
1179 fn transient_error() -> Error {
1180 use crate::error::rpc::{Code, Status};
1181 Error::service(
1182 Status::default()
1183 .set_code(Code::Unavailable)
1184 .set_message("try-again"),
1185 )
1186 }
1187
1188 pub(crate) fn idempotent_state(now: Instant) -> RetryState {
1189 RetryState::new(true).set_start(now)
1190 }
1191
1192 pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
1193 RetryState::new(false).set_start(now)
1194 }
1195}