1use crate::error::Error;
45use crate::polling_state::PollingState;
46use crate::retry_result::RetryResult;
47use std::sync::Arc;
48
49pub trait PollingErrorPolicy: Send + Sync + std::fmt::Debug {
54 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
60 fn on_error(&self, state: &PollingState, error: Error) -> RetryResult;
61
62 #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
68 fn on_in_progress(&self, _state: &PollingState, _operation_name: &str) -> Result<(), Error> {
69 Ok(())
70 }
71}
72
73#[derive(Clone)]
75pub struct PollingErrorPolicyArg(pub(crate) Arc<dyn PollingErrorPolicy>);
76
77impl<T> std::convert::From<T> for PollingErrorPolicyArg
78where
79 T: PollingErrorPolicy + 'static,
80{
81 fn from(value: T) -> Self {
82 Self(Arc::new(value))
83 }
84}
85
86impl std::convert::From<Arc<dyn PollingErrorPolicy>> for PollingErrorPolicyArg {
87 fn from(value: Arc<dyn PollingErrorPolicy>) -> Self {
88 Self(value)
89 }
90}
91
92pub trait PollingErrorPolicyExt: PollingErrorPolicy + Sized {
94 fn with_time_limit(self, maximum_duration: std::time::Duration) -> LimitedElapsedTime<Self> {
117 LimitedElapsedTime::custom(self, maximum_duration)
118 }
119
120 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
148 LimitedAttemptCount::custom(self, maximum_attempts)
149 }
150}
151
152impl<T: PollingErrorPolicy> PollingErrorPolicyExt for T {}
153
154#[derive(Clone, Debug)]
177pub struct Aip194Strict;
178
179impl PollingErrorPolicy for Aip194Strict {
180 fn on_error(&self, _state: &PollingState, error: Error) -> RetryResult {
181 if error.is_transient_and_before_rpc() {
182 return RetryResult::Continue(error);
183 }
184 if error.is_io() {
185 return RetryResult::Continue(error);
186 }
187 if let Some(status) = error.status() {
188 return if status.code == crate::error::rpc::Code::Unavailable {
189 RetryResult::Continue(error)
190 } else {
191 RetryResult::Permanent(error)
192 };
193 }
194
195 match error.http_status_code() {
196 Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
197 RetryResult::Continue(error)
198 }
199 _ => RetryResult::Permanent(error),
200 }
201 }
202}
203
204#[derive(Clone, Debug)]
225pub struct AlwaysContinue;
226
227impl PollingErrorPolicy for AlwaysContinue {
228 fn on_error(&self, _state: &PollingState, error: Error) -> RetryResult {
229 RetryResult::Continue(error)
230 }
231}
232
233#[derive(Debug)]
249pub struct LimitedElapsedTime<P = Aip194Strict>
250where
251 P: PollingErrorPolicy,
252{
253 inner: P,
254 maximum_duration: std::time::Duration,
255}
256
257impl LimitedElapsedTime {
258 pub fn new(maximum_duration: std::time::Duration) -> Self {
274 Self {
275 inner: Aip194Strict,
276 maximum_duration,
277 }
278 }
279}
280
281impl<P> LimitedElapsedTime<P>
282where
283 P: PollingErrorPolicy,
284{
285 pub fn custom(inner: P, maximum_duration: std::time::Duration) -> Self {
301 Self {
302 inner,
303 maximum_duration,
304 }
305 }
306
307 fn in_progress_impl(
308 &self,
309 start: std::time::Instant,
310 operation_name: &str,
311 ) -> Result<(), Error> {
312 let now = std::time::Instant::now();
313 if now < start + self.maximum_duration {
314 return Ok(());
315 }
316 Err(Error::exhausted(Exhausted::new(
317 operation_name,
318 "elapsed time",
319 format!("{:?}", now.checked_duration_since(start).unwrap()),
320 format!("{:?}", self.maximum_duration),
321 )))
322 }
323}
324
325impl<P> PollingErrorPolicy for LimitedElapsedTime<P>
326where
327 P: PollingErrorPolicy + 'static,
328{
329 fn on_error(&self, state: &PollingState, error: Error) -> RetryResult {
330 match self.inner.on_error(state, error) {
331 RetryResult::Permanent(e) => RetryResult::Permanent(e),
332 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
333 RetryResult::Continue(e) => {
334 if std::time::Instant::now() >= state.start + self.maximum_duration {
335 RetryResult::Exhausted(e)
336 } else {
337 RetryResult::Continue(e)
338 }
339 }
340 }
341 }
342
343 fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error> {
344 self.inner
345 .on_in_progress(state, operation_name)
346 .and_then(|_| self.in_progress_impl(state.start, operation_name))
347 }
348}
349
350#[derive(Debug)]
364pub struct LimitedAttemptCount<P = Aip194Strict>
365where
366 P: PollingErrorPolicy,
367{
368 inner: P,
369 maximum_attempts: u32,
370}
371
372impl LimitedAttemptCount {
373 pub fn new(maximum_attempts: u32) -> Self {
389 Self {
390 inner: Aip194Strict,
391 maximum_attempts,
392 }
393 }
394}
395
396impl<P> LimitedAttemptCount<P>
397where
398 P: PollingErrorPolicy,
399{
400 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
415 Self {
416 inner,
417 maximum_attempts,
418 }
419 }
420
421 fn in_progress_impl(&self, count: u32, operation_name: &str) -> Result<(), Error> {
422 if count < self.maximum_attempts {
423 return Ok(());
424 }
425 Err(Error::exhausted(Exhausted::new(
426 operation_name,
427 "attempt count",
428 count.to_string(),
429 self.maximum_attempts.to_string(),
430 )))
431 }
432}
433
434impl<P> PollingErrorPolicy for LimitedAttemptCount<P>
435where
436 P: PollingErrorPolicy,
437{
438 fn on_error(&self, state: &PollingState, error: Error) -> RetryResult {
439 match self.inner.on_error(state, error) {
440 RetryResult::Permanent(e) => RetryResult::Permanent(e),
441 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
442 RetryResult::Continue(e) => {
443 if state.attempt_count >= self.maximum_attempts {
444 RetryResult::Exhausted(e)
445 } else {
446 RetryResult::Continue(e)
447 }
448 }
449 }
450 }
451
452 fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error> {
453 self.inner
454 .on_in_progress(state, operation_name)
455 .and_then(|_| self.in_progress_impl(state.attempt_count, operation_name))
456 }
457}
458
/// Describes which limit ended a polling loop.
///
/// The limiting policies in this file wrap it with `Error::exhausted` to
/// build the error they return.
#[derive(Debug)]
pub struct Exhausted {
    /// Name of the operation that was being polled.
    operation_name: String,
    /// Human-readable name of the limit, e.g. `"elapsed time"`.
    limit_name: &'static str,
    /// The observed value, already formatted.
    value: String,
    /// The configured limit, already formatted.
    limit: String,
}

impl Exhausted {
    /// Builds the description from the operation name, the limit's name, and
    /// the pre-formatted observed value and limit.
    pub fn new(
        operation_name: &str,
        limit_name: &'static str,
        value: String,
        limit: String,
    ) -> Self {
        let operation_name = String::from(operation_name);
        Exhausted {
            operation_name,
            limit_name,
            value,
            limit,
        }
    }
}

impl std::fmt::Display for Exhausted {
    /// Renders a single-line message naming the operation, the limit, the
    /// observed value and the limit value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            operation_name,
            limit_name,
            value,
            limit,
        } = self;
        write!(
            f,
            "polling loop for {} exhausted, {} value ({}) exceeds limit ({})",
            operation_name, limit_name, value, limit
        )
    }
}

/// `Exhausted` is a leaf error: it has no underlying source.
impl std::error::Error for Exhausted {}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::{CredentialsError, Error};
    use http::HeaderMap;
    use std::error::Error as _;
    use std::time::{Duration, Instant};

    // A mock policy used to verify that the limiting decorators forward to,
    // and respect the decisions of, their inner policy.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl PollingErrorPolicy for Policy {
            fn on_error(&self, state: &PollingState, error: Error) -> RetryResult;
            fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error>;
        }
    }

    // Both `From` conversions into `PollingErrorPolicyArg` must compile.
    #[test]
    fn polling_policy_arg() {
        let policy = LimitedAttemptCount::new(3);
        let _ = PollingErrorPolicyArg::from(policy);

        let policy: Arc<dyn PollingErrorPolicy> = Arc::new(LimitedAttemptCount::new(3));
        let _ = PollingErrorPolicyArg::from(policy);
    }

    // `Aip194Strict` continues on unavailable / I/O / transient
    // authentication errors and treats everything else as permanent.
    #[test]
    fn aip194_strict() {
        let p = Aip194Strict;

        assert!(p.on_in_progress(&PollingState::default(), "unused").is_ok());
        assert!(
            p.on_error(&PollingState::default(), unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&PollingState::default(), permission_denied())
                .is_permanent()
        );
        assert!(
            p.on_error(&PollingState::default(), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&PollingState::default(), http_permission_denied())
                .is_permanent()
        );

        assert!(
            p.on_error(&PollingState::default(), Error::io("err".to_string()))
                .is_continue()
        );

        assert!(
            p.on_error(
                &PollingState::default(),
                Error::authentication(CredentialsError::from_msg(true, "err"))
            )
            .is_continue()
        );

        assert!(
            p.on_error(&PollingState::default(), Error::ser("err".to_string()))
                .is_permanent()
        );
    }

    // `AlwaysContinue` accepts in-progress polls and continues on errors.
    #[test]
    fn always_continue() {
        let p = AlwaysContinue;

        assert!(p.on_in_progress(&PollingState::default(), "unused").is_ok());
        assert!(
            p.on_error(&PollingState::default(), http_unavailable())
                .is_continue()
        );
        assert!(
            p.on_error(&PollingState::default(), unavailable())
                .is_continue()
        );
    }

    // `AlwaysContinue` continues regardless of the kind of error.
    #[test_case::test_case(Error::io("err"))]
    #[test_case::test_case(Error::authentication(CredentialsError::from_msg(true, "err")))]
    #[test_case::test_case(Error::ser("err"))]
    fn always_continue_error_kind(error: Error) {
        let p = AlwaysContinue;
        assert!(p.on_error(&PollingState::default(), error).is_continue());
    }

    // The `with_time_limit` adapter continues before the limit and reports
    // exhaustion after it.
    #[test]
    fn with_time_limit() {
        let policy = AlwaysContinue.with_time_limit(Duration::from_secs(10));
        assert!(
            policy
                .on_error(
                    &PollingState::default()
                        .set_start(Instant::now() - Duration::from_secs(1))
                        .set_attempt_count(1_u32),
                    permission_denied()
                )
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default()
                        .set_start(Instant::now() - Duration::from_secs(20))
                        .set_attempt_count(1_u32),
                    permission_denied()
                )
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // The `with_attempt_limit` adapter continues below the limit and reports
    // exhaustion above it.
    #[test]
    fn with_attempt_limit() {
        let policy = AlwaysContinue.with_attempt_limit(3);
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(1_u32),
                    permission_denied()
                )
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(5_u32),
                    permission_denied()
                )
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // Builds an HTTP error with the given status code and a JSON payload
    // carrying the same code and `message`.
    fn http_error(code: u16, message: &str) -> Error {
        let error = serde_json::json!({"error": {
            "code": code,
            "message": message,
        }});
        let payload = bytes::Bytes::from_owner(serde_json::to_string(&error).unwrap());
        Error::http(code, HeaderMap::new(), payload)
    }

    // An HTTP 503 error.
    fn http_unavailable() -> Error {
        http_error(503, "SERVICE UNAVAILABLE")
    }

    // An HTTP 403 error.
    fn http_permission_denied() -> Error {
        http_error(403, "PERMISSION DENIED")
    }

    // A service error with status code `Unavailable`.
    fn unavailable() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::Unavailable)
            .set_message("UNAVAILABLE");
        Error::service(status)
    }

    // A service error with status code `PermissionDenied`.
    fn permission_denied() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::PermissionDenied)
            .set_message("PERMISSION_DENIED");
        Error::service(status)
    }

    // With the default inner policy, a retryable error continues before the
    // time limit and is exhausted after it.
    #[test]
    fn test_limited_elapsed_time_on_error() {
        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
        assert!(
            policy
                .on_error(
                    &PollingState::default()
                        .set_start(Instant::now() - Duration::from_secs(10))
                        .set_attempt_count(1_u32),
                    unavailable()
                )
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default()
                        .set_start(Instant::now() - Duration::from_secs(30))
                        .set_attempt_count(1_u32),
                    unavailable()
                )
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // In-progress polls succeed before the time limit and fail with an
    // `Exhausted` source after it.
    #[test]
    fn test_limited_elapsed_time_in_progress() {
        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
        let result = policy.on_in_progress(
            &PollingState::default()
                .set_start(Instant::now() - Duration::from_secs(10))
                .set_attempt_count(1_u32),
            "unused",
        );
        assert!(result.is_ok(), "{result:?}");
        let err = policy
            .on_in_progress(
                &PollingState::default()
                    .set_start(Instant::now() - Duration::from_secs(30))
                    .set_attempt_count(1_u32),
                "test-operation-name",
            )
            .unwrap_err();
        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
        assert!(exhausted.is_some(), "{err:?}");
        // The message must identify both the operation and the limit hit.
        let msg = exhausted.map(|e| e.to_string()).unwrap_or_default();
        assert!(msg.contains("test-operation-name"), "{msg}");
        assert!(msg.contains("elapsed time"), "{msg}");
    }

    // `on_error` is forwarded to the inner policy.
    #[test]
    fn test_limited_time_forwards_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(&PollingState::default(), transient_error());
        assert!(rf.is_continue());
    }

    // `on_in_progress` is forwarded to the inner policy on every call.
    #[test]
    fn test_limited_time_forwards_in_progress() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(3)
            .returning(|_, _| Ok(()));

        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(1_u32),
                    "test-op-name"
                )
                .is_ok()
        );
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(2_u32),
                    "test-op-name"
                )
                .is_ok()
        );
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(3_u32),
                    "test-op-name"
                )
                .is_ok()
        );
    }

    // An error from the inner policy's `on_in_progress` is returned as-is.
    #[test]
    fn test_limited_time_in_progress_returns_inner() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(1)
            .returning(|_, _| Err(transient_error()));

        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(1_u32),
                    "test-op-name"
                )
                .is_err()
        );
    }

    // When the inner policy continues, the time limit decides the outcome.
    #[test]
    fn test_limited_time_inner_continues() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now - Duration::from_secs(10))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_continue());

        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now - Duration::from_secs(70))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // A permanent decision from the inner policy is never overridden.
    #[test]
    fn test_limited_time_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now - Duration::from_secs(10))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_permanent());

        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now + Duration::from_secs(10))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_permanent());
    }

    // An exhausted decision from the inner policy is never overridden.
    #[test]
    fn test_limited_time_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now - Duration::from_secs(10))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_exhausted());

        let rf = policy.on_error(
            &PollingState::default()
                .set_start(now + Duration::from_secs(10))
                .set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // With the default inner policy, a retryable error continues below the
    // attempt limit and is exhausted above it.
    #[test]
    fn test_limited_attempt_count_on_error() {
        let policy = LimitedAttemptCount::new(20);
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(10_u32),
                    unavailable()
                )
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(30_u32),
                    unavailable()
                )
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // In-progress polls succeed below the attempt limit and fail with an
    // `Exhausted` source above it.
    #[test]
    fn test_limited_attempt_count_in_progress() {
        let policy = LimitedAttemptCount::new(20);
        let result =
            policy.on_in_progress(&PollingState::default().set_attempt_count(10_u32), "unused");
        assert!(result.is_ok(), "{result:?}");
        let err = policy
            .on_in_progress(
                &PollingState::default().set_attempt_count(30_u32),
                "test-operation-name",
            )
            .unwrap_err();
        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
        assert!(exhausted.is_some(), "{err:?}");
        // The message must identify both the operation and the limit hit.
        let msg = exhausted.map(|e| e.to_string()).unwrap_or_default();
        assert!(msg.contains("test-operation-name"), "{msg}");
        assert!(msg.contains("attempt count"), "{msg}");
    }

    // When the inner policy continues, the attempt limit decides the outcome.
    #[test]
    fn test_limited_attempt_count_forwards_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, e| RetryResult::Continue(e));

        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(1_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(2_u32),
                    transient_error()
                )
                .is_continue()
        );
        assert!(
            policy
                .on_error(
                    &PollingState::default().set_attempt_count(3_u32),
                    transient_error()
                )
                .is_exhausted()
        );
    }

    // `on_in_progress` is forwarded to the inner policy on every call.
    #[test]
    fn test_limited_attempt_count_forwards_in_progress() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(3)
            .returning(|_, _| Ok(()));

        let policy = LimitedAttemptCount::custom(mock, 5);
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(1_u32),
                    "test-op-name"
                )
                .is_ok()
        );
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(2_u32),
                    "test-op-name"
                )
                .is_ok()
        );
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(3_u32),
                    "test-op-name"
                )
                .is_ok()
        );
    }

    // An error from the inner policy's `on_in_progress` is returned as-is.
    #[test]
    fn test_limited_attempt_count_in_progress_returns_inner() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(1)
            .returning(|_, _| Err(unavailable()));

        let policy = LimitedAttemptCount::custom(mock, 5);
        assert!(
            policy
                .on_in_progress(
                    &PollingState::default().set_attempt_count(1_u32),
                    "test-op-name"
                )
                .is_err()
        );
    }

    // A permanent decision from the inner policy is preserved both below and
    // above the attempt limit.
    #[test]
    fn test_limited_attempt_count_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Permanent(e));
        let policy = LimitedAttemptCount::custom(mock, 2);

        // Below the limit.
        let rf = policy.on_error(
            &PollingState::default().set_attempt_count(1_u32),
            Error::ser("err"),
        );
        assert!(rf.is_permanent());

        // Past the limit: the result must stay permanent, not become
        // exhausted.
        let rf = policy.on_error(
            &PollingState::default().set_attempt_count(3_u32),
            Error::ser("err"),
        );
        assert!(rf.is_permanent());
    }

    // An exhausted decision from the inner policy is preserved both below and
    // above the attempt limit.
    #[test]
    fn test_limited_attempt_count_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, e| RetryResult::Exhausted(e));
        let policy = LimitedAttemptCount::custom(mock, 2);

        // Below the limit.
        let rf = policy.on_error(
            &PollingState::default().set_attempt_count(1_u32),
            transient_error(),
        );
        assert!(rf.is_exhausted());

        // Past the limit.
        let rf = policy.on_error(
            &PollingState::default().set_attempt_count(3_u32),
            transient_error(),
        );
        assert!(rf.is_exhausted());
    }

    // The `Display` output mentions every field of `Exhausted`.
    #[test]
    fn test_exhausted_fmt() {
        let exhausted = Exhausted::new(
            "op-name",
            "limit-name",
            "test-value".to_string(),
            "test-limit".to_string(),
        );
        let fmt = format!("{exhausted}");
        assert!(fmt.contains("op-name"), "{fmt}");
        assert!(fmt.contains("limit-name"), "{fmt}");
        assert!(fmt.contains("test-value"), "{fmt}");
        assert!(fmt.contains("test-limit"), "{fmt}");
    }

    // A service error with status code `Unavailable`, which `Aip194Strict`
    // treats as retryable.
    fn transient_error() -> Error {
        use crate::error::rpc::{Code, Status};
        Error::service(
            Status::default()
                .set_code(Code::Unavailable)
                .set_message("try-again"),
        )
    }
}