1use crate::error::Error;
57use crate::retry_result::RetryResult;
58use crate::retry_state::RetryState;
59use crate::throttle_result::ThrottleResult;
60use std::sync::Arc;
61use std::time::Duration;
62
/// Decides how a retry loop reacts to failed attempts.
///
/// Every method receives the current [`RetryState`]; the policies in this
/// module read its `idempotent`, `start` and `attempt_count` fields to reach
/// a decision. Implementations must be `Send + Sync + Debug`.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
    /// Classifies the error returned by the most recent attempt.
    ///
    /// # Parameters
    /// * `state` - the state of the retry loop.
    /// * `error` - the error to classify. Ownership is transferred so the
    ///   error can be returned inside the [`RetryResult`].
    ///
    /// # Returns
    /// A [`RetryResult`] wrapping the error: `Continue`, `Permanent` or
    /// `Exhausted`.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;

    /// Classifies an error produced when an attempt was throttled.
    ///
    /// NOTE(review): presumably invoked by the retry loop when an attempt is
    /// rejected before being sent; confirm against the retry loop.
    ///
    /// The default implementation ignores `_state` and always returns
    /// `ThrottleResult::Continue(error)`.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
        ThrottleResult::Continue(error)
    }

    /// Returns the time left in the policy, if the policy is time based.
    ///
    /// The default implementation returns `None`, i.e. the policy imposes no
    /// time limit.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
        None
    }
}
107
/// A cheaply clonable, type-erased retry policy.
///
/// Wraps an `Arc<dyn RetryPolicy>`. Values are created through the `From`
/// conversions in this module, either from a concrete policy or from an
/// already shared `Arc<dyn RetryPolicy>`.
#[derive(Clone, Debug)]
pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
111
112impl<T> std::convert::From<T> for RetryPolicyArg
113where
114 T: RetryPolicy + 'static,
115{
116 fn from(value: T) -> Self {
117 Self(Arc::new(value))
118 }
119}
120
121impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
122 fn from(value: Arc<dyn RetryPolicy>) -> Self {
123 Self(value)
124 }
125}
126
127impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
128 fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
129 value.0
130 }
131}
132
133pub trait RetryPolicyExt: RetryPolicy + Sized {
135 fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
156 LimitedElapsedTime::custom(self, maximum_duration)
157 }
158
159 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
186 LimitedAttemptCount::custom(self, maximum_attempts)
187 }
188}
189
// Blanket implementation: every sized `RetryPolicy` gets the decorator
// helpers from `RetryPolicyExt` without any extra code.
impl<T: RetryPolicy> RetryPolicyExt for T {}
191
/// A retry policy that interprets AIP-194 strictly.
///
/// Only a small set of errors is retried: transient errors raised before the
/// RPC started, and, for idempotent requests only, I/O errors and errors
/// whose status is `UNAVAILABLE` (gRPC code or HTTP 503).
///
/// The policy itself never returns `Exhausted`; decorate it (for example with
/// [`LimitedElapsedTime`] or [`LimitedAttemptCount`]) to bound the retry loop.
#[derive(Clone, Debug)]
pub struct Aip194Strict;
217
218impl RetryPolicy for Aip194Strict {
219 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
220 if error.is_transient_and_before_rpc() {
221 return RetryResult::Continue(error);
222 }
223 if !state.idempotent {
224 return RetryResult::Permanent(error);
225 }
226 if error.is_io() {
227 return RetryResult::Continue(error);
228 }
229 if let Some(status) = error.status() {
230 return if status.code == crate::error::rpc::Code::Unavailable {
231 RetryResult::Continue(error)
232 } else {
233 RetryResult::Permanent(error)
234 };
235 }
236
237 match error.http_status_code() {
238 Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
239 RetryResult::Continue(error)
240 }
241 _ => RetryResult::Permanent(error),
242 }
243 }
244}
245
/// A retry policy that treats every error as retryable.
///
/// The policy never stops on its own; decorate it (for example with
/// [`LimitedElapsedTime`] or [`LimitedAttemptCount`]) to bound the retry loop.
#[derive(Clone, Debug)]
pub struct AlwaysRetry;

impl RetryPolicy for AlwaysRetry {
    /// Always returns `RetryResult::Continue(error)`; `_state` is ignored, so
    /// idempotency and error kind do not matter.
    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
        RetryResult::Continue(error)
    }
}
274
/// A retry policy that never retries.
#[derive(Clone, Debug)]
pub struct NeverRetry;

impl RetryPolicy for NeverRetry {
    /// Always returns `RetryResult::Exhausted(error)`; `_state` is ignored.
    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
        RetryResult::Exhausted(error)
    }
}
300
/// The error reported when a [`LimitedElapsedTime`] policy runs out of time
/// while handling a throttled attempt.
///
/// The error that triggered the check is kept as the `source()`.
#[derive(thiserror::Error, Debug)]
pub struct LimitedElapsedTimeError {
    // The time limit configured in the exhausted policy.
    maximum_duration: Duration,
    // The last error seen before the policy was exhausted.
    #[source]
    source: Error,
}

impl LimitedElapsedTimeError {
    /// Creates a new error from the configured limit and the last error.
    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
        Self {
            maximum_duration,
            source,
        }
    }

    /// Returns the maximum duration configured in the exhausted policy.
    pub fn maximum_duration(&self) -> Duration {
        self.maximum_duration
    }
}
321
322impl std::fmt::Display for LimitedElapsedTimeError {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 write!(
325 f,
326 "retry policy is exhausted after {}s, the last retry attempt was throttled",
327 self.maximum_duration.as_secs_f64()
328 )
329 }
330}
331
/// A retry policy decorator that limits the total time in the retry loop.
///
/// Decisions are delegated to the `inner` policy. Once `maximum_duration` has
/// elapsed since the `start` recorded in the retry state, a `Continue`
/// decision from the inner policy is turned into `Exhausted`.
///
/// # Parameters
/// * `P` - the inner retry policy, defaults to [`Aip194Strict`].
#[derive(Debug)]
pub struct LimitedElapsedTime<P = Aip194Strict>
where
    P: RetryPolicy,
{
    // The decorated policy.
    inner: P,
    // The time budget for the whole retry loop.
    maximum_duration: Duration,
}
354
355impl LimitedElapsedTime {
356 pub fn new(maximum_duration: Duration) -> Self {
367 Self {
368 inner: Aip194Strict,
369 maximum_duration,
370 }
371 }
372}
373
374impl<P> LimitedElapsedTime<P>
375where
376 P: RetryPolicy,
377{
378 pub fn custom(inner: P, maximum_duration: Duration) -> Self {
396 Self {
397 inner,
398 maximum_duration,
399 }
400 }
401
402 fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
403 let deadline = state.start + self.maximum_duration;
404 let now = tokio::time::Instant::now().into_std();
405 if now < deadline {
406 ThrottleResult::Continue(error)
407 } else {
408 ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
409 self.maximum_duration,
410 error,
411 )))
412 }
413 }
414}
415
416impl<P> RetryPolicy for LimitedElapsedTime<P>
417where
418 P: RetryPolicy + 'static,
419{
420 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
421 match self.inner.on_error(state, error) {
422 RetryResult::Permanent(e) => RetryResult::Permanent(e),
423 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
424 RetryResult::Continue(e) => {
425 if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
426 RetryResult::Exhausted(e)
427 } else {
428 RetryResult::Continue(e)
429 }
430 }
431 }
432 }
433
434 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
435 match self.inner.on_throttle(state, error) {
436 ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
437 ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
438 }
439 }
440
441 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
442 let deadline = state.start + self.maximum_duration;
443 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
444 if let Some(inner) = self.inner.remaining_time(state) {
445 return Some(std::cmp::min(remaining, inner));
446 }
447 Some(remaining)
448 }
449}
450
/// A retry policy decorator that limits the number of attempts.
///
/// Decisions are delegated to the `inner` policy. Once the attempt count in
/// the retry state reaches `maximum_attempts`, a `Continue` decision from the
/// inner policy is turned into `Exhausted`.
///
/// # Parameters
/// * `P` - the inner retry policy, defaults to [`Aip194Strict`].
#[derive(Debug)]
pub struct LimitedAttemptCount<P = Aip194Strict>
where
    P: RetryPolicy,
{
    // The decorated policy.
    inner: P,
    // The attempt count at which retryable errors become exhausted.
    maximum_attempts: u32,
}
473
474impl LimitedAttemptCount {
475 pub fn new(maximum_attempts: u32) -> Self {
483 Self {
484 inner: Aip194Strict,
485 maximum_attempts,
486 }
487 }
488}
489
490impl<P> LimitedAttemptCount<P>
491where
492 P: RetryPolicy,
493{
494 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
508 Self {
509 inner,
510 maximum_attempts,
511 }
512 }
513}
514
515impl<P> RetryPolicy for LimitedAttemptCount<P>
516where
517 P: RetryPolicy,
518{
519 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
520 match self.inner.on_error(state, error) {
521 RetryResult::Permanent(e) => RetryResult::Permanent(e),
522 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
523 RetryResult::Continue(e) => {
524 if state.attempt_count >= self.maximum_attempts {
525 RetryResult::Exhausted(e)
526 } else {
527 RetryResult::Continue(e)
528 }
529 }
530 }
531 }
532
533 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
534 assert!(state.attempt_count < self.maximum_attempts);
537 self.inner.on_throttle(state, error)
538 }
539
540 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
541 self.inner.remaining_time(state)
542 }
543}
544
// Unit tests for the retry policies and their decorators.
#[cfg(test)]
mod tests {
    use super::*;
    use http::HeaderMap;
    use std::error::Error as StdError;
    use std::time::Instant;

    #[test]
    // Verify `RetryPolicyArg` can be created from both supported sources.
    fn retry_policy_arg() {
        let policy = LimitedAttemptCount::new(3);
        let _ = RetryPolicyArg::from(policy);

        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
        let _ = RetryPolicyArg::from(policy);
    }

    // Exercise every branch of `Aip194Strict::on_error()` for idempotent and
    // non-idempotent requests, plus the default trait methods.
    #[test]
    fn aip194_strict() {
        let p = Aip194Strict;

        let now = Instant::now();
        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_permanent()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), permission_denied())
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), permission_denied())
                .is_permanent()
        );

        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_permanent()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), http_permission_denied())
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_permission_denied())
                .is_permanent()
        );

        assert!(
            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
                .is_permanent()
        );

        // Transient errors before the RPC are retried even if the request is
        // not idempotent.
        assert!(
            p.on_error(&idempotent_state(now), pre_rpc_transient())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
                .is_continue()
        );

        assert!(
            p.on_error(&idempotent_state(now), Error::ser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::ser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&idempotent_state(now), Error::deser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::deser("err"))
                .is_permanent()
        );

        assert!(
            p.remaining_time(&idempotent_state(now)).is_none(),
            "p={p:?}, now={now:?}"
        );
    }

    // `AlwaysRetry` continues for service and HTTP errors regardless of
    // idempotency, and has no time limit.
    #[test]
    fn always_retry() {
        let p = AlwaysRetry;

        let now = Instant::now();
        assert!(
            p.remaining_time(&idempotent_state(now)).is_none(),
            "p={p:?}, now={now:?}"
        );
        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_continue()
        );
    }

    // `AlwaysRetry` continues for I/O, pre-RPC and serialization errors too.
    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn always_retry_error_kind(idempotent: bool, error: Error) {
        let p = AlwaysRetry;
        let now = Instant::now();
        let state = if idempotent {
            idempotent_state(now)
        } else {
            non_idempotent_state(now)
        };
        assert!(p.on_error(&state, error).is_continue());
    }

    // `NeverRetry` reports every error as exhausted; throttling still uses
    // the default `on_throttle()` and continues.
    #[test]
    fn never_retry() {
        let p = NeverRetry;

        let now = Instant::now();
        assert!(
            p.remaining_time(&idempotent_state(now)).is_none(),
            "p={p:?}, now={now:?}"
        );
        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_exhausted()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_exhausted()
        );

        assert!(
            p.on_error(&idempotent_state(now), http_permission_denied())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_permission_denied())
                .is_exhausted()
        );
    }

    // `NeverRetry` is exhausted for I/O, pre-RPC and serialization errors too.
    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn never_retry_error_kind(idempotent: bool, error: Error) {
        let p = NeverRetry;
        let now = Instant::now();
        let state = if idempotent {
            idempotent_state(now)
        } else {
            non_idempotent_state(now)
        };
        assert!(p.on_error(&state, error).is_exhausted());
    }

    // An authentication error flagged as transient, i.e. an error raised
    // before the RPC is sent.
    fn pre_rpc_transient() -> Error {
        use crate::error::CredentialsError;
        Error::authentication(CredentialsError::from_msg(true, "err"))
    }

    // An HTTP error with status 503 and no gRPC-style status.
    fn http_unavailable() -> Error {
        Error::http(
            503_u16,
            HeaderMap::new(),
            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
        )
    }

    // An HTTP error with status 403 and no gRPC-style status.
    fn http_permission_denied() -> Error {
        Error::http(
            403_u16,
            HeaderMap::new(),
            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
        )
    }

    // A service error with the `Unavailable` status code.
    fn unavailable() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::Unavailable)
            .set_message("UNAVAILABLE");
        Error::service(status)
    }

    // A service error with the `PermissionDenied` status code.
    fn permission_denied() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::PermissionDenied)
            .set_message("PERMISSION_DENIED");
        Error::service(status)
    }

    // A mock inner policy used to verify how the decorators delegate.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl RetryPolicy for Policy {
            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
        }
    }

    // The error exposes its limit, formats it as fractional seconds, and
    // keeps the original error as its source.
    #[test]
    fn limited_elapsed_time_error() {
        let limit = Duration::from_secs(123) + Duration::from_millis(567);
        let err = LimitedElapsedTimeError::new(limit, unavailable());
        assert_eq!(err.maximum_duration(), limit);
        let fmt = err.to_string();
        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
        assert!(err.source().is_some(), "{err:?}");
    }

    // Within the time limit, all three methods consult the inner policy.
    #[test]
    fn test_limited_time_forwards() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));
        mock.expect_remaining_time().times(1).returning(|_| None);

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(&idempotent_state(now), transient_error());
        assert!(rf.is_continue());

        let rt = policy.remaining_time(&idempotent_state(now));
        assert!(rt.is_some(), "policy={policy:?}, now={now:?}");

        let e = policy.on_throttle(&idempotent_state(now), transient_error());
        assert!(matches!(e, ThrottleResult::Continue(_)));
    }

    // When the inner policy continues on throttle, the result depends on
    // whether the time limit has been reached.
    #[test]
    fn test_limited_time_on_throttle_continue() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_throttle(
            // The loop started 50s ago: still within the 60s limit.
            &idempotent_state(now - Duration::from_secs(50)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");

        let rf = policy.on_throttle(
            // The loop started 70s ago: the 60s limit has been exceeded.
            &idempotent_state(now - Duration::from_secs(70)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    // An exhausted decision from the inner policy passes through unchanged.
    #[test]
    fn test_limited_time_on_throttle_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_throttle(
            // Within the time limit, so only the inner policy can exhaust.
            &idempotent_state(now - Duration::from_secs(50)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    // When the inner policy continues on error, the result depends on
    // whether the time limit has been reached.
    #[test]
    fn test_limited_time_inner_continues() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(
            &idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_continue());

        let rf = policy.on_error(
            &idempotent_state(now - Duration::from_secs(70)),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // A permanent decision from the inner policy passes through unchanged,
    // whether the loop start is in the past or in the future.
    #[test]
    fn test_limited_time_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &non_idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_permanent());

        let rf = policy.on_error(
            &non_idempotent_state(now + Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_permanent());
    }

    // An exhausted decision from the inner policy passes through unchanged,
    // whether the loop start is in the past or in the future.
    #[test]
    fn test_limited_time_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &non_idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_exhausted());

        let rf = policy.on_error(
            &non_idempotent_state(now + Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // The inner policy reports 30s but only about 5s remain in the outer
    // policy: the outer limit wins.
    #[test]
    fn test_limited_time_remaining_inner_longer() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(30)));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
    }

    // The inner policy reports 5s while about 55s remain in the outer
    // policy: the inner limit wins.
    #[test]
    fn test_limited_time_remaining_inner_shorter() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(5)));
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
    }

    // The inner policy has no time limit: only the outer limit applies.
    #[test]
    fn test_limited_time_remaining_inner_is_none() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
    }

    // With a limit of 3, attempts 1 and 2 continue and attempt 3 exhausts.
    #[test]
    fn test_limited_attempt_count_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(1_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(2_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(3_u32),
                    transient_error()
                )
                .is_exhausted()
        );
    }

    // Below the attempt limit, a continue decision on throttle is forwarded.
    #[test]
    fn test_limited_attempt_count_on_throttle_continue() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(matches!(
            policy.on_throttle(
                &idempotent_state(now).set_attempt_count(2_u32),
                unavailable()
            ),
            ThrottleResult::Continue(_)
        ));
    }

    // An exhausted decision on throttle from the inner policy is forwarded.
    #[test]
    fn test_limited_attempt_count_on_throttle_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(matches!(
            policy.on_throttle(&idempotent_state(now), unavailable()),
            ThrottleResult::Exhausted(_)
        ));
    }

    // `remaining_time()` forwards `None` from the inner policy.
    #[test]
    fn test_limited_attempt_count_remaining_none() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedAttemptCount::custom(mock, 3);

        let now = Instant::now();
        assert!(
            policy.remaining_time(&idempotent_state(now)).is_none(),
            "policy={policy:?} now={now:?}"
        );
    }

    // `remaining_time()` forwards the inner policy's value unchanged.
    #[test]
    fn test_limited_attempt_count_remaining_some() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(123)));
        let policy = LimitedAttemptCount::custom(mock, 3);

        let now = Instant::now();
        assert_eq!(
            policy.remaining_time(&idempotent_state(now)),
            Some(Duration::from_secs(123))
        );
    }

    // A permanent decision from the inner policy passes through unchanged.
    #[test]
    fn test_limited_attempt_count_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = Instant::now();

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_permanent());

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_permanent());
    }

    // An exhausted decision from the inner policy passes through unchanged.
    #[test]
    fn test_limited_attempt_count_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = Instant::now();

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_exhausted());

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_exhausted());
    }

    // A service error with the `Unavailable` code, used as a generic
    // retryable error.
    fn transient_error() -> Error {
        use crate::error::rpc::{Code, Status};
        Error::service(
            Status::default()
                .set_code(Code::Unavailable)
                .set_message("try-again"),
        )
    }

    // A retry state for an idempotent request whose loop started at `now`.
    fn idempotent_state(now: Instant) -> RetryState {
        RetryState::new(true).set_start(now)
    }

    // A retry state for a non-idempotent request whose loop started at `now`.
    fn non_idempotent_state(now: Instant) -> RetryState {
        RetryState::new(false).set_start(now)
    }
}