1mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
67pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
72 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
78 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
79
80 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
92 fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
93 ThrottleResult::Continue(error)
94 }
95
96 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
107 fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
108 None
109 }
110}
111
112#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118 T: RetryPolicy + 'static,
119{
120 fn from(value: T) -> Self {
121 Self(Arc::new(value))
122 }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126 fn from(value: Arc<dyn RetryPolicy>) -> Self {
127 Self(value)
128 }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132 fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133 value.0
134 }
135}
136
137pub trait RetryPolicyExt: RetryPolicy + Sized {
139 fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160 LimitedElapsedTime::custom(self, maximum_duration)
161 }
162
163 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190 LimitedAttemptCount::custom(self, maximum_attempts)
191 }
192
193 fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226 TooManyRequests::new(self)
227 }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
232#[derive(Clone, Debug)]
256pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260 if error.is_transient_and_before_rpc() {
261 return RetryResult::Continue(error);
262 }
263 if !state.idempotent {
264 return RetryResult::Permanent(error);
265 }
266 if error.is_io() {
267 return RetryResult::Continue(error);
268 }
269 if let Some(status) = error.status() {
270 return if status.code == crate::error::rpc::Code::Unavailable {
271 RetryResult::Continue(error)
272 } else {
273 RetryResult::Permanent(error)
274 };
275 }
276
277 match error.http_status_code() {
278 Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
279 RetryResult::Continue(error)
280 }
281 _ => RetryResult::Permanent(error),
282 }
283 }
284}
285
286#[derive(Clone, Debug)]
307pub struct AlwaysRetry;
308
309impl RetryPolicy for AlwaysRetry {
310 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
311 RetryResult::Continue(error)
312 }
313}
314
315#[derive(Clone, Debug)]
333pub struct NeverRetry;
334
335impl RetryPolicy for NeverRetry {
336 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
337 RetryResult::Exhausted(error)
338 }
339}
340
341#[derive(thiserror::Error, Debug)]
342pub struct LimitedElapsedTimeError {
343 maximum_duration: Duration,
344 #[source]
345 source: Error,
346}
347
348impl LimitedElapsedTimeError {
349 pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
350 Self {
351 maximum_duration,
352 source,
353 }
354 }
355
356 pub fn maximum_duration(&self) -> Duration {
358 self.maximum_duration
359 }
360}
361
362impl std::fmt::Display for LimitedElapsedTimeError {
363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364 write!(
365 f,
366 "retry policy is exhausted after {}s, the last retry attempt was throttled",
367 self.maximum_duration.as_secs_f64()
368 )
369 }
370}
371
372#[derive(Debug)]
387pub struct LimitedElapsedTime<P = Aip194Strict>
388where
389 P: RetryPolicy,
390{
391 inner: P,
392 maximum_duration: Duration,
393}
394
395impl LimitedElapsedTime {
396 pub fn new(maximum_duration: Duration) -> Self {
407 Self {
408 inner: Aip194Strict,
409 maximum_duration,
410 }
411 }
412}
413
414impl<P> LimitedElapsedTime<P>
415where
416 P: RetryPolicy,
417{
418 pub fn custom(inner: P, maximum_duration: Duration) -> Self {
436 Self {
437 inner,
438 maximum_duration,
439 }
440 }
441
442 fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
443 let deadline = state.start + self.maximum_duration;
444 let now = tokio::time::Instant::now().into_std();
445 if now < deadline {
446 ThrottleResult::Continue(error)
447 } else {
448 ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
449 self.maximum_duration,
450 error,
451 )))
452 }
453 }
454}
455
456impl<P> RetryPolicy for LimitedElapsedTime<P>
457where
458 P: RetryPolicy + 'static,
459{
460 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
461 match self.inner.on_error(state, error) {
462 RetryResult::Permanent(e) => RetryResult::Permanent(e),
463 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
464 RetryResult::Continue(e) => {
465 if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
466 RetryResult::Exhausted(e)
467 } else {
468 RetryResult::Continue(e)
469 }
470 }
471 }
472 }
473
474 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
475 match self.inner.on_throttle(state, error) {
476 ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
477 ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
478 }
479 }
480
481 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
482 let deadline = state.start + self.maximum_duration;
483 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
484 if let Some(inner) = self.inner.remaining_time(state) {
485 return Some(std::cmp::min(remaining, inner));
486 }
487 Some(remaining)
488 }
489}
490
491#[derive(Debug)]
506pub struct LimitedAttemptCount<P = Aip194Strict>
507where
508 P: RetryPolicy,
509{
510 inner: P,
511 maximum_attempts: u32,
512}
513
514impl LimitedAttemptCount {
515 pub fn new(maximum_attempts: u32) -> Self {
523 Self {
524 inner: Aip194Strict,
525 maximum_attempts,
526 }
527 }
528}
529
530impl<P> LimitedAttemptCount<P>
531where
532 P: RetryPolicy,
533{
534 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
548 Self {
549 inner,
550 maximum_attempts,
551 }
552 }
553}
554
555impl<P> RetryPolicy for LimitedAttemptCount<P>
556where
557 P: RetryPolicy,
558{
559 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
560 match self.inner.on_error(state, error) {
561 RetryResult::Permanent(e) => RetryResult::Permanent(e),
562 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
563 RetryResult::Continue(e) => {
564 if state.attempt_count >= self.maximum_attempts {
565 RetryResult::Exhausted(e)
566 } else {
567 RetryResult::Continue(e)
568 }
569 }
570 }
571 }
572
573 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
574 assert!(state.attempt_count < self.maximum_attempts);
577 self.inner.on_throttle(state, error)
578 }
579
580 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
581 self.inner.remaining_time(state)
582 }
583}
584
585#[cfg(test)]
586pub mod tests {
587 use super::*;
588 use http::HeaderMap;
589 use std::error::Error as StdError;
590 use std::time::Instant;
591
592 #[test]
594 fn retry_policy_arg() {
595 let policy = LimitedAttemptCount::new(3);
596 let _ = RetryPolicyArg::from(policy);
597
598 let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
599 let _ = RetryPolicyArg::from(policy);
600 }
601
602 #[test]
603 fn aip194_strict() {
604 let p = Aip194Strict;
605
606 let now = Instant::now();
607 assert!(
608 p.on_error(&idempotent_state(now), unavailable())
609 .is_continue()
610 );
611 assert!(
612 p.on_error(&non_idempotent_state(now), unavailable())
613 .is_permanent()
614 );
615 assert!(matches!(
616 p.on_throttle(&idempotent_state(now), unavailable()),
617 ThrottleResult::Continue(_)
618 ));
619
620 assert!(
621 p.on_error(&idempotent_state(now), permission_denied())
622 .is_permanent()
623 );
624 assert!(
625 p.on_error(&non_idempotent_state(now), permission_denied())
626 .is_permanent()
627 );
628
629 assert!(
630 p.on_error(&idempotent_state(now), http_unavailable())
631 .is_continue()
632 );
633 assert!(
634 p.on_error(&non_idempotent_state(now), http_unavailable())
635 .is_permanent()
636 );
637 assert!(matches!(
638 p.on_throttle(&idempotent_state(now), http_unavailable()),
639 ThrottleResult::Continue(_)
640 ));
641
642 assert!(
643 p.on_error(&idempotent_state(now), http_permission_denied())
644 .is_permanent()
645 );
646 assert!(
647 p.on_error(&non_idempotent_state(now), http_permission_denied())
648 .is_permanent()
649 );
650
651 assert!(
652 p.on_error(&idempotent_state(now), Error::io("err".to_string()))
653 .is_continue()
654 );
655 assert!(
656 p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
657 .is_permanent()
658 );
659
660 assert!(
661 p.on_error(&idempotent_state(now), pre_rpc_transient())
662 .is_continue()
663 );
664 assert!(
665 p.on_error(&non_idempotent_state(now), pre_rpc_transient())
666 .is_continue()
667 );
668
669 assert!(
670 p.on_error(&idempotent_state(now), Error::ser("err"))
671 .is_permanent()
672 );
673 assert!(
674 p.on_error(&non_idempotent_state(now), Error::ser("err"))
675 .is_permanent()
676 );
677 assert!(
678 p.on_error(&idempotent_state(now), Error::deser("err"))
679 .is_permanent()
680 );
681 assert!(
682 p.on_error(&non_idempotent_state(now), Error::deser("err"))
683 .is_permanent()
684 );
685
686 assert!(
687 p.remaining_time(&idempotent_state(now)).is_none(),
688 "p={p:?}, now={now:?}"
689 );
690 }
691
692 #[test]
693 fn always_retry() {
694 let p = AlwaysRetry;
695
696 let now = Instant::now();
697 assert!(
698 p.remaining_time(&idempotent_state(now)).is_none(),
699 "p={p:?}, now={now:?}"
700 );
701 assert!(
702 p.on_error(&idempotent_state(now), http_unavailable())
703 .is_continue()
704 );
705 assert!(
706 p.on_error(&non_idempotent_state(now), http_unavailable())
707 .is_continue()
708 );
709 assert!(matches!(
710 p.on_throttle(&idempotent_state(now), http_unavailable()),
711 ThrottleResult::Continue(_)
712 ));
713
714 assert!(
715 p.on_error(&idempotent_state(now), unavailable())
716 .is_continue()
717 );
718 assert!(
719 p.on_error(&non_idempotent_state(now), unavailable())
720 .is_continue()
721 );
722 }
723
724 #[test_case::test_case(true, Error::io("err"))]
725 #[test_case::test_case(true, pre_rpc_transient())]
726 #[test_case::test_case(true, Error::ser("err"))]
727 #[test_case::test_case(false, Error::io("err"))]
728 #[test_case::test_case(false, pre_rpc_transient())]
729 #[test_case::test_case(false, Error::ser("err"))]
730 fn always_retry_error_kind(idempotent: bool, error: Error) {
731 let p = AlwaysRetry;
732 let now = Instant::now();
733 let state = if idempotent {
734 idempotent_state(now)
735 } else {
736 non_idempotent_state(now)
737 };
738 assert!(p.on_error(&state, error).is_continue());
739 }
740
741 #[test]
742 fn never_retry() {
743 let p = NeverRetry;
744
745 let now = Instant::now();
746 assert!(
747 p.remaining_time(&idempotent_state(now)).is_none(),
748 "p={p:?}, now={now:?}"
749 );
750 assert!(
751 p.on_error(&idempotent_state(now), http_unavailable())
752 .is_exhausted()
753 );
754 assert!(
755 p.on_error(&non_idempotent_state(now), http_unavailable())
756 .is_exhausted()
757 );
758 assert!(matches!(
759 p.on_throttle(&idempotent_state(now), http_unavailable()),
760 ThrottleResult::Continue(_)
761 ));
762
763 assert!(
764 p.on_error(&idempotent_state(now), unavailable())
765 .is_exhausted()
766 );
767 assert!(
768 p.on_error(&non_idempotent_state(now), unavailable())
769 .is_exhausted()
770 );
771
772 assert!(
773 p.on_error(&idempotent_state(now), http_permission_denied())
774 .is_exhausted()
775 );
776 assert!(
777 p.on_error(&non_idempotent_state(now), http_permission_denied())
778 .is_exhausted()
779 );
780 }
781
782 #[test_case::test_case(true, Error::io("err"))]
783 #[test_case::test_case(true, pre_rpc_transient())]
784 #[test_case::test_case(true, Error::ser("err"))]
785 #[test_case::test_case(false, Error::io("err"))]
786 #[test_case::test_case(false, pre_rpc_transient())]
787 #[test_case::test_case(false, Error::ser("err"))]
788 fn never_retry_error_kind(idempotent: bool, error: Error) {
789 let p = NeverRetry;
790 let now = Instant::now();
791 let state = if idempotent {
792 idempotent_state(now)
793 } else {
794 non_idempotent_state(now)
795 };
796 assert!(p.on_error(&state, error).is_exhausted());
797 }
798
799 fn pre_rpc_transient() -> Error {
800 use crate::error::CredentialsError;
801 Error::authentication(CredentialsError::from_msg(true, "err"))
802 }
803
804 fn http_unavailable() -> Error {
805 Error::http(
806 503_u16,
807 HeaderMap::new(),
808 bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
809 )
810 }
811
812 fn http_permission_denied() -> Error {
813 Error::http(
814 403_u16,
815 HeaderMap::new(),
816 bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
817 )
818 }
819
820 fn unavailable() -> Error {
821 use crate::error::rpc::Code;
822 let status = crate::error::rpc::Status::default()
823 .set_code(Code::Unavailable)
824 .set_message("UNAVAILABLE");
825 Error::service(status)
826 }
827
828 fn permission_denied() -> Error {
829 use crate::error::rpc::Code;
830 let status = crate::error::rpc::Status::default()
831 .set_code(Code::PermissionDenied)
832 .set_message("PERMISSION_DENIED");
833 Error::service(status)
834 }
835
836 mockall::mock! {
837 #[derive(Debug)]
838 pub(crate) Policy {}
839 impl RetryPolicy for Policy {
840 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
841 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
842 fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
843 }
844 }
845
846 #[test]
847 fn limited_elapsed_time_error() {
848 let limit = Duration::from_secs(123) + Duration::from_millis(567);
849 let err = LimitedElapsedTimeError::new(limit, unavailable());
850 assert_eq!(err.maximum_duration(), limit);
851 let fmt = err.to_string();
852 assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
853 assert!(err.source().is_some(), "{err:?}");
854 }
855
856 #[test]
857 fn test_limited_time_forwards() {
858 let mut mock = MockPolicy::new();
859 mock.expect_on_error()
860 .times(1..)
861 .returning(|_, e| RetryResult::Continue(e));
862 mock.expect_on_throttle()
863 .times(1..)
864 .returning(|_, e| ThrottleResult::Continue(e));
865 mock.expect_remaining_time().times(1).returning(|_| None);
866
867 let now = Instant::now();
868 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
869 let rf = policy.on_error(&idempotent_state(now), transient_error());
870 assert!(rf.is_continue());
871
872 let rt = policy.remaining_time(&idempotent_state(now));
873 assert!(rt.is_some(), "policy={policy:?}, now={now:?}");
874
875 let e = policy.on_throttle(&idempotent_state(now), transient_error());
876 assert!(matches!(e, ThrottleResult::Continue(_)));
877 }
878
879 #[test]
880 fn test_limited_time_on_throttle_continue() {
881 let mut mock = MockPolicy::new();
882 mock.expect_on_throttle()
883 .times(1..)
884 .returning(|_, e| ThrottleResult::Continue(e));
885
886 let now = Instant::now();
887 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
888
889 let rf = policy.on_throttle(
891 &idempotent_state(now - Duration::from_secs(50)),
892 unavailable(),
893 );
894 assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
895
896 let rf = policy.on_throttle(
898 &idempotent_state(now - Duration::from_secs(70)),
899 unavailable(),
900 );
901 assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
902 }
903
904 #[test]
905 fn test_limited_time_on_throttle_exhausted() {
906 let mut mock = MockPolicy::new();
907 mock.expect_on_throttle()
908 .times(1..)
909 .returning(|_, e| ThrottleResult::Exhausted(e));
910
911 let now = Instant::now();
912 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
913
914 let rf = policy.on_throttle(
916 &idempotent_state(now - Duration::from_secs(50)),
917 unavailable(),
918 );
919 assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
920 }
921
922 #[test]
923 fn test_limited_time_inner_continues() {
924 let mut mock = MockPolicy::new();
925 mock.expect_on_error()
926 .times(1..)
927 .returning(|_, e| RetryResult::Continue(e));
928
929 let now = Instant::now();
930 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
931 let rf = policy.on_error(
932 &idempotent_state(now - Duration::from_secs(10)),
933 transient_error(),
934 );
935 assert!(rf.is_continue());
936
937 let rf = policy.on_error(
938 &idempotent_state(now - Duration::from_secs(70)),
939 transient_error(),
940 );
941 assert!(rf.is_exhausted());
942 }
943
944 #[test]
945 fn test_limited_time_inner_permanent() {
946 let mut mock = MockPolicy::new();
947 mock.expect_on_error()
948 .times(2)
949 .returning(|_, e| RetryResult::Permanent(e));
950
951 let now = Instant::now();
952 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
953
954 let rf = policy.on_error(
955 &non_idempotent_state(now - Duration::from_secs(10)),
956 transient_error(),
957 );
958 assert!(rf.is_permanent());
959
960 let rf = policy.on_error(
961 &non_idempotent_state(now + Duration::from_secs(10)),
962 transient_error(),
963 );
964 assert!(rf.is_permanent());
965 }
966
967 #[test]
968 fn test_limited_time_inner_exhausted() {
969 let mut mock = MockPolicy::new();
970 mock.expect_on_error()
971 .times(2)
972 .returning(|_, e| RetryResult::Exhausted(e));
973
974 let now = Instant::now();
975 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
976
977 let rf = policy.on_error(
978 &non_idempotent_state(now - Duration::from_secs(10)),
979 transient_error(),
980 );
981 assert!(rf.is_exhausted());
982
983 let rf = policy.on_error(
984 &non_idempotent_state(now + Duration::from_secs(10)),
985 transient_error(),
986 );
987 assert!(rf.is_exhausted());
988 }
989
990 #[test]
991 fn test_limited_time_remaining_inner_longer() {
992 let mut mock = MockPolicy::new();
993 mock.expect_remaining_time()
994 .times(1)
995 .returning(|_| Some(Duration::from_secs(30)));
996
997 let now = Instant::now();
998 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
999
1000 let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
1001 assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
1002 }
1003
1004 #[test]
1005 fn test_limited_time_remaining_inner_shorter() {
1006 let mut mock = MockPolicy::new();
1007 mock.expect_remaining_time()
1008 .times(1)
1009 .returning(|_| Some(Duration::from_secs(5)));
1010 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1011
1012 let now = Instant::now();
1013 let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
1014 assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1015 }
1016
1017 #[test]
1018 fn test_limited_time_remaining_inner_is_none() {
1019 let mut mock = MockPolicy::new();
1020 mock.expect_remaining_time().times(1).returning(|_| None);
1021 let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1022
1023 let now = Instant::now();
1024 let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
1025 assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1026 }
1027
1028 #[test]
1029 fn test_limited_attempt_count_on_error() {
1030 let mut mock = MockPolicy::new();
1031 mock.expect_on_error()
1032 .times(1..)
1033 .returning(|_, e| RetryResult::Continue(e));
1034
1035 let now = Instant::now();
1036 let policy = LimitedAttemptCount::custom(mock, 3);
1037 assert!(
1038 policy
1039 .on_error(
1040 &idempotent_state(now).set_attempt_count(1_u32),
1041 transient_error()
1042 )
1043 .is_continue()
1044 );
1045 assert!(
1046 policy
1047 .on_error(
1048 &idempotent_state(now).set_attempt_count(2_u32),
1049 transient_error()
1050 )
1051 .is_continue()
1052 );
1053 assert!(
1054 policy
1055 .on_error(
1056 &idempotent_state(now).set_attempt_count(3_u32),
1057 transient_error()
1058 )
1059 .is_exhausted()
1060 );
1061 }
1062
1063 #[test]
1064 fn test_limited_attempt_count_on_throttle_continue() {
1065 let mut mock = MockPolicy::new();
1066 mock.expect_on_throttle()
1067 .times(1..)
1068 .returning(|_, e| ThrottleResult::Continue(e));
1069
1070 let now = Instant::now();
1071 let policy = LimitedAttemptCount::custom(mock, 3);
1072 assert!(matches!(
1073 policy.on_throttle(
1074 &idempotent_state(now).set_attempt_count(2_u32),
1075 unavailable()
1076 ),
1077 ThrottleResult::Continue(_)
1078 ));
1079 }
1080
1081 #[test]
1082 fn test_limited_attempt_count_on_throttle_error() {
1083 let mut mock = MockPolicy::new();
1084 mock.expect_on_throttle()
1085 .times(1..)
1086 .returning(|_, e| ThrottleResult::Exhausted(e));
1087
1088 let now = Instant::now();
1089 let policy = LimitedAttemptCount::custom(mock, 3);
1090 assert!(matches!(
1091 policy.on_throttle(&idempotent_state(now), unavailable()),
1092 ThrottleResult::Exhausted(_)
1093 ));
1094 }
1095
1096 #[test]
1097 fn test_limited_attempt_count_remaining_none() {
1098 let mut mock = MockPolicy::new();
1099 mock.expect_remaining_time().times(1).returning(|_| None);
1100 let policy = LimitedAttemptCount::custom(mock, 3);
1101
1102 let now = Instant::now();
1103 assert!(
1104 policy.remaining_time(&idempotent_state(now)).is_none(),
1105 "policy={policy:?} now={now:?}"
1106 );
1107 }
1108
1109 #[test]
1110 fn test_limited_attempt_count_remaining_some() {
1111 let mut mock = MockPolicy::new();
1112 mock.expect_remaining_time()
1113 .times(1)
1114 .returning(|_| Some(Duration::from_secs(123)));
1115 let policy = LimitedAttemptCount::custom(mock, 3);
1116
1117 let now = Instant::now();
1118 assert_eq!(
1119 policy.remaining_time(&idempotent_state(now)),
1120 Some(Duration::from_secs(123))
1121 );
1122 }
1123
1124 #[test]
1125 fn test_limited_attempt_count_inner_permanent() {
1126 let mut mock = MockPolicy::new();
1127 mock.expect_on_error()
1128 .times(2)
1129 .returning(|_, e| RetryResult::Permanent(e));
1130 let policy = LimitedAttemptCount::custom(mock, 2);
1131 let now = Instant::now();
1132
1133 let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1134 assert!(rf.is_permanent());
1135
1136 let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1137 assert!(rf.is_permanent());
1138 }
1139
1140 #[test]
1141 fn test_limited_attempt_count_inner_exhausted() {
1142 let mut mock = MockPolicy::new();
1143 mock.expect_on_error()
1144 .times(2)
1145 .returning(|_, e| RetryResult::Exhausted(e));
1146 let policy = LimitedAttemptCount::custom(mock, 2);
1147 let now = Instant::now();
1148
1149 let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1150 assert!(rf.is_exhausted());
1151
1152 let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1153 assert!(rf.is_exhausted());
1154 }
1155
1156 fn transient_error() -> Error {
1157 use crate::error::rpc::{Code, Status};
1158 Error::service(
1159 Status::default()
1160 .set_code(Code::Unavailable)
1161 .set_message("try-again"),
1162 )
1163 }
1164
1165 pub(crate) fn idempotent_state(now: Instant) -> RetryState {
1166 RetryState::new(true).set_start(now)
1167 }
1168
1169 pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
1170 RetryState::new(false).set_start(now)
1171 }
1172}