1use crate::error::Error;
57use crate::retry_result::RetryResult;
58use crate::retry_state::RetryState;
59use crate::throttle_result::ThrottleResult;
60use std::sync::Arc;
61use std::time::Duration;
62
63pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
68 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
74 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
75
76 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
88 fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
89 ThrottleResult::Continue(error)
90 }
91
92 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
103 fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
104 None
105 }
106}
107
108#[derive(Clone, Debug)]
110pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
111
112impl<T> std::convert::From<T> for RetryPolicyArg
113where
114 T: RetryPolicy + 'static,
115{
116 fn from(value: T) -> Self {
117 Self(Arc::new(value))
118 }
119}
120
121impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
122 fn from(value: Arc<dyn RetryPolicy>) -> Self {
123 Self(value)
124 }
125}
126
127impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
128 fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
129 value.0
130 }
131}
132
133pub trait RetryPolicyExt: RetryPolicy + Sized {
135 fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
156 LimitedElapsedTime::custom(self, maximum_duration)
157 }
158
159 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
186 LimitedAttemptCount::custom(self, maximum_attempts)
187 }
188}
189
190impl<T: RetryPolicy> RetryPolicyExt for T {}
191
/// A stateless retry policy that only continues on a narrow set of errors.
///
/// Its `on_error` implementation retries transient failures that happened
/// before the RPC was sent and, for idempotent requests only, I/O errors and
/// "unavailable" responses. It is the default inner policy of
/// [`LimitedElapsedTime`] and [`LimitedAttemptCount`].
#[derive(Debug, Clone)]
pub struct Aip194Strict;
217
218impl RetryPolicy for Aip194Strict {
219 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
220 if error.is_transient_and_before_rpc() {
221 return RetryResult::Continue(error);
222 }
223 if !state.idempotent {
224 return RetryResult::Permanent(error);
225 }
226 if error.is_io() {
227 return RetryResult::Continue(error);
228 }
229 if let Some(status) = error.status() {
230 return if status.code == crate::error::rpc::Code::Unavailable {
231 RetryResult::Continue(error)
232 } else {
233 RetryResult::Permanent(error)
234 };
235 }
236
237 match error.http_status_code() {
238 Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
239 RetryResult::Continue(error)
240 }
241 _ => RetryResult::Permanent(error),
242 }
243 }
244}
245
/// A stateless retry policy that treats every error as retryable.
///
/// On its own it never stops the retry loop.
#[derive(Debug, Clone)]
pub struct AlwaysRetry;
268
269impl RetryPolicy for AlwaysRetry {
270 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
271 RetryResult::Continue(error)
272 }
273}
274
/// A stateless retry policy that gives up on the first error.
#[derive(Debug, Clone)]
pub struct NeverRetry;
294
295impl RetryPolicy for NeverRetry {
296 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
297 RetryResult::Exhausted(error)
298 }
299}
300
301#[derive(thiserror::Error, Debug)]
302pub struct LimitedElapsedTimeError {
303 maximum_duration: Duration,
304 #[source]
305 source: Error,
306}
307
308impl LimitedElapsedTimeError {
309 pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
310 Self {
311 maximum_duration,
312 source,
313 }
314 }
315
316 pub fn maximum_duration(&self) -> Duration {
318 self.maximum_duration
319 }
320}
321
322impl std::fmt::Display for LimitedElapsedTimeError {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 write!(
325 f,
326 "retry policy is exhausted after {}s, the last retry attempt was throttled",
327 self.maximum_duration.as_secs_f64()
328 )
329 }
330}
331
332#[derive(Debug)]
347pub struct LimitedElapsedTime<P = Aip194Strict>
348where
349 P: RetryPolicy,
350{
351 inner: P,
352 maximum_duration: Duration,
353}
354
355impl LimitedElapsedTime {
356 pub fn new(maximum_duration: Duration) -> Self {
367 Self {
368 inner: Aip194Strict,
369 maximum_duration,
370 }
371 }
372}
373
374impl<P> LimitedElapsedTime<P>
375where
376 P: RetryPolicy,
377{
378 pub fn custom(inner: P, maximum_duration: Duration) -> Self {
396 Self {
397 inner,
398 maximum_duration,
399 }
400 }
401
402 fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
403 let deadline = state.start + self.maximum_duration;
404 let now = tokio::time::Instant::now().into_std();
405 if now < deadline {
406 ThrottleResult::Continue(error)
407 } else {
408 ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
409 self.maximum_duration,
410 error,
411 )))
412 }
413 }
414}
415
416impl<P> RetryPolicy for LimitedElapsedTime<P>
417where
418 P: RetryPolicy + 'static,
419{
420 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
421 match self.inner.on_error(state, error) {
422 RetryResult::Permanent(e) => RetryResult::Permanent(e),
423 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
424 RetryResult::Continue(e) => {
425 if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
426 RetryResult::Exhausted(e)
427 } else {
428 RetryResult::Continue(e)
429 }
430 }
431 }
432 }
433
434 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
435 match self.inner.on_throttle(state, error) {
436 ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
437 ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
438 }
439 }
440
441 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
442 let deadline = state.start + self.maximum_duration;
443 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
444 if let Some(inner) = self.inner.remaining_time(state) {
445 return Some(std::cmp::min(remaining, inner));
446 }
447 Some(remaining)
448 }
449}
450
451#[derive(Debug)]
466pub struct LimitedAttemptCount<P = Aip194Strict>
467where
468 P: RetryPolicy,
469{
470 inner: P,
471 maximum_attempts: u32,
472}
473
474impl LimitedAttemptCount {
475 pub fn new(maximum_attempts: u32) -> Self {
483 Self {
484 inner: Aip194Strict,
485 maximum_attempts,
486 }
487 }
488}
489
490impl<P> LimitedAttemptCount<P>
491where
492 P: RetryPolicy,
493{
494 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
508 Self {
509 inner,
510 maximum_attempts,
511 }
512 }
513}
514
515impl<P> RetryPolicy for LimitedAttemptCount<P>
516where
517 P: RetryPolicy,
518{
519 fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
520 match self.inner.on_error(state, error) {
521 RetryResult::Permanent(e) => RetryResult::Permanent(e),
522 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
523 RetryResult::Continue(e) => {
524 if state.attempt_count >= self.maximum_attempts {
525 RetryResult::Exhausted(e)
526 } else {
527 RetryResult::Continue(e)
528 }
529 }
530 }
531 }
532
533 fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
534 assert!(state.attempt_count < self.maximum_attempts);
537 self.inner.on_throttle(state, error)
538 }
539
540 fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
541 self.inner.remaining_time(state)
542 }
543}
544
/// Unit tests for the retry policies and decorators defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use http::HeaderMap;
    use std::error::Error as StdError;
    use std::time::Instant;

    // `RetryPolicyArg` can be built from a concrete policy and from an
    // already shared `Arc<dyn RetryPolicy>`.
    #[test]
    fn retry_policy_arg() {
        let policy = LimitedAttemptCount::new(3);
        let _ = RetryPolicyArg::from(policy);

        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
        let _ = RetryPolicyArg::from(policy);
    }

    // Exercises every branch of `Aip194Strict::on_error` for both idempotent
    // and non-idempotent requests.
    #[test]
    fn aip194_strict() {
        let p = Aip194Strict;

        let now = Instant::now();
        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_permanent()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), permission_denied())
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), permission_denied())
                .is_permanent()
        );

        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_permanent()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), http_permission_denied())
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_permission_denied())
                .is_permanent()
        );

        assert!(
            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
                .is_permanent()
        );

        // Pre-RPC transient errors are retried even when not idempotent.
        assert!(
            p.on_error(&idempotent_state(now), pre_rpc_transient())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
                .is_continue()
        );

        assert!(
            p.on_error(&idempotent_state(now), Error::ser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::ser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&idempotent_state(now), Error::deser("err"))
                .is_permanent()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), Error::deser("err"))
                .is_permanent()
        );

        assert!(p.remaining_time(&idempotent_state(now)).is_none());
    }

    // `AlwaysRetry` continues on service and HTTP errors, idempotent or not.
    #[test]
    fn always_retry() {
        let p = AlwaysRetry;

        let now = Instant::now();
        assert!(p.remaining_time(&idempotent_state(now)).is_none());
        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_continue()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_continue()
        );
    }

    // `AlwaysRetry` continues regardless of the error kind.
    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn always_retry_error_kind(idempotent: bool, error: Error) {
        let p = AlwaysRetry;
        let now = Instant::now();
        let state = if idempotent {
            idempotent_state(now)
        } else {
            non_idempotent_state(now)
        };
        assert!(p.on_error(&state, error).is_continue());
    }

    // `NeverRetry` reports exhaustion on service and HTTP errors.
    #[test]
    fn never_retry() {
        let p = NeverRetry;

        let now = Instant::now();
        assert!(p.remaining_time(&idempotent_state(now)).is_none());
        assert!(
            p.on_error(&idempotent_state(now), http_unavailable())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_unavailable())
                .is_exhausted()
        );
        assert!(matches!(
            p.on_throttle(&idempotent_state(now), http_unavailable()),
            ThrottleResult::Continue(_)
        ));

        assert!(
            p.on_error(&idempotent_state(now), unavailable())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), unavailable())
                .is_exhausted()
        );

        assert!(
            p.on_error(&idempotent_state(now), http_permission_denied())
                .is_exhausted()
        );
        assert!(
            p.on_error(&non_idempotent_state(now), http_permission_denied())
                .is_exhausted()
        );
    }

    // `NeverRetry` reports exhaustion regardless of the error kind.
    #[test_case::test_case(true, Error::io("err"))]
    #[test_case::test_case(true, pre_rpc_transient())]
    #[test_case::test_case(true, Error::ser("err"))]
    #[test_case::test_case(false, Error::io("err"))]
    #[test_case::test_case(false, pre_rpc_transient())]
    #[test_case::test_case(false, Error::ser("err"))]
    fn never_retry_error_kind(idempotent: bool, error: Error) {
        let p = NeverRetry;
        let now = Instant::now();
        let state = if idempotent {
            idempotent_state(now)
        } else {
            non_idempotent_state(now)
        };
        assert!(p.on_error(&state, error).is_exhausted());
    }

    // An authentication error flagged as transient; `Aip194Strict` treats it
    // as a failure that happened before the RPC was sent.
    fn pre_rpc_transient() -> Error {
        use crate::error::CredentialsError;
        Error::authentication(CredentialsError::from_msg(true, "err"))
    }

    // An HTTP error with status 503.
    fn http_unavailable() -> Error {
        Error::http(
            503_u16,
            HeaderMap::new(),
            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
        )
    }

    // An HTTP error with status 403.
    fn http_permission_denied() -> Error {
        Error::http(
            403_u16,
            HeaderMap::new(),
            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
        )
    }

    // A service error with the `Unavailable` status code.
    fn unavailable() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::Unavailable)
            .set_message("UNAVAILABLE");
        Error::service(status)
    }

    // A service error with the `PermissionDenied` status code.
    fn permission_denied() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::PermissionDenied)
            .set_message("PERMISSION_DENIED");
        Error::service(status)
    }

    // Mock inner policy used to observe what the decorators forward.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl RetryPolicy for Policy {
            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
        }
    }

    // The error reports its limit, formats it in seconds, and exposes a source.
    #[test]
    fn limited_elapsed_time_error() {
        let limit = Duration::from_secs(123) + Duration::from_millis(567);
        let err = LimitedElapsedTimeError::new(limit, unavailable());
        assert_eq!(err.maximum_duration(), limit);
        let fmt = err.to_string();
        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
        assert!(err.source().is_some(), "{err:?}");
    }

    // Every trait method reaches the inner policy.
    #[test]
    fn test_limited_time_forwards() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));
        mock.expect_remaining_time().times(1).returning(|_| None);

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(&idempotent_state(now), transient_error());
        assert!(rf.is_continue());

        let rt = policy.remaining_time(&idempotent_state(now));
        assert!(rt.is_some());

        let e = policy.on_throttle(&idempotent_state(now), transient_error());
        assert!(matches!(e, ThrottleResult::Continue(_)));
    }

    // With a continuing inner policy, throttling is only exhausted after the
    // deadline.
    #[test]
    fn test_limited_time_on_throttle_continue() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        // Started 50s ago with a 60s limit: still within budget.
        let rf = policy.on_throttle(
            &idempotent_state(now - Duration::from_secs(50)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");

        // Started 70s ago with a 60s limit: the budget is spent.
        let rf = policy.on_throttle(
            &idempotent_state(now - Duration::from_secs(70)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    // An exhausted inner policy wins even when time remains.
    #[test]
    fn test_limited_time_on_throttle_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_throttle(
            &idempotent_state(now - Duration::from_secs(50)),
            unavailable(),
        );
        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
    }

    // With a continuing inner policy, `on_error` is exhausted only after the
    // deadline.
    #[test]
    fn test_limited_time_inner_continues() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(
            &idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_continue());

        let rf = policy.on_error(
            &idempotent_state(now - Duration::from_secs(70)),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // A permanent verdict from the inner policy passes through unchanged.
    #[test]
    fn test_limited_time_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &non_idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_permanent());

        let rf = policy.on_error(
            &non_idempotent_state(now + Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_permanent());
    }

    // An exhausted verdict from the inner policy passes through unchanged.
    #[test]
    fn test_limited_time_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &non_idempotent_state(now - Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_exhausted());

        let rf = policy.on_error(
            &non_idempotent_state(now + Duration::from_secs(10)),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // The inner policy reports 30s but only about 5s remain in this policy's
    // budget, so the smaller value must win.
    #[test]
    fn test_limited_time_remaining_inner_longer() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(30)));

        let now = Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
        // `None <= Some(_)` is always true, so require `Some` explicitly.
        assert!(
            matches!(remaining, Some(d) if d <= Duration::from_secs(5)),
            "{remaining:?}"
        );
    }

    // The inner policy reports 5s while about 55s remain in this policy's
    // budget, so the inner value must win.
    #[test]
    fn test_limited_time_remaining_inner_shorter() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(5)));
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
        // `None <= Some(_)` is always true, so require `Some` explicitly.
        assert!(
            matches!(remaining, Some(d) if d <= Duration::from_secs(10)),
            "{remaining:?}"
        );
    }

    // The inner policy reports no limit, so this policy's own budget is
    // returned.
    #[test]
    fn test_limited_time_remaining_inner_is_none() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let now = Instant::now();
        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
        // `None <= Some(_)` is always true, so require `Some` explicitly.
        assert!(
            matches!(remaining, Some(d) if d <= Duration::from_secs(10)),
            "{remaining:?}"
        );
    }

    // With a limit of 3, attempts 1 and 2 continue and attempt 3 is exhausted.
    #[test]
    fn test_limited_attempt_count_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(1_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(2_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &idempotent_state(now).set_attempt_count(3_u32),
                    transient_error()
                )
                .is_exhausted()
        );
    }

    // Below the attempt limit, throttling decisions come from the inner policy.
    #[test]
    fn test_limited_attempt_count_on_throttle_continue() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Continue(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(matches!(
            policy.on_throttle(
                &idempotent_state(now).set_attempt_count(2_u32),
                unavailable()
            ),
            ThrottleResult::Continue(_)
        ));
    }

    // An exhausted throttling verdict from the inner policy passes through.
    #[test]
    fn test_limited_attempt_count_on_throttle_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_throttle()
            .times(1..)
            .returning(|_, e| ThrottleResult::Exhausted(e));

        let now = Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(matches!(
            policy.on_throttle(&idempotent_state(now), unavailable()),
            ThrottleResult::Exhausted(_)
        ));
    }

    // `remaining_time` forwards the inner policy's `None`.
    #[test]
    fn test_limited_attempt_count_remaining_none() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time().times(1).returning(|_| None);
        let policy = LimitedAttemptCount::custom(mock, 3);

        let now = Instant::now();
        assert!(policy.remaining_time(&idempotent_state(now)).is_none());
    }

    // `remaining_time` forwards the inner policy's value unchanged.
    #[test]
    fn test_limited_attempt_count_remaining_some() {
        let mut mock = MockPolicy::new();
        mock.expect_remaining_time()
            .times(1)
            .returning(|_| Some(Duration::from_secs(123)));
        let policy = LimitedAttemptCount::custom(mock, 3);

        let now = Instant::now();
        assert_eq!(
            policy.remaining_time(&idempotent_state(now)),
            Some(Duration::from_secs(123))
        );
    }

    // A permanent verdict from the inner policy passes through unchanged.
    #[test]
    fn test_limited_attempt_count_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = Instant::now();

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_permanent());

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_permanent());
    }

    // An exhausted verdict from the inner policy passes through unchanged.
    #[test]
    fn test_limited_attempt_count_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = Instant::now();

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_exhausted());

        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
        assert!(rf.is_exhausted());
    }

    // A service error with the `Unavailable` status code.
    fn transient_error() -> Error {
        use crate::error::rpc::{Code, Status};
        Error::service(
            Status::default()
                .set_code(Code::Unavailable)
                .set_message("try-again"),
        )
    }

    // Retry state for an idempotent request whose retry loop started at `now`.
    fn idempotent_state(now: Instant) -> RetryState {
        RetryState::new(true).set_start(now)
    }

    // Retry state for a non-idempotent request whose retry loop started at
    // `now`.
    fn non_idempotent_state(now: Instant) -> RetryState {
        RetryState::new(false).set_start(now)
    }
}