1use crate::error::Error;
46use crate::retry_result::RetryResult;
47use std::sync::Arc;
48
/// Decides how a polling loop reacts to failed polling attempts and to
/// operations that are still in progress.
///
/// Implementations must be `Send + Sync` and implement `Debug`.
pub trait PollingErrorPolicy: Send + Sync + std::fmt::Debug {
    /// Classifies `error`, returned by a polling attempt, into a [`RetryResult`].
    ///
    /// # Parameters
    /// * `loop_start` - reference instant for the polling loop; the
    ///   time-limited policy in this file measures elapsed time from it.
    /// * `attempt_count` - attempt counter; the attempt-limited policy in this
    ///   file compares it against its maximum.
    /// * `error` - the error to classify; ownership moves into the result.
    ///
    /// The item is hidden from the generated documentation unless the
    /// `_internal-semver` feature is enabled.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_error(
        &self,
        loop_start: std::time::Instant,
        attempt_count: u32,
        error: Error,
    ) -> RetryResult;

    /// Called when a polling attempt reports that the operation has not
    /// completed yet.
    ///
    /// Returning `Err` lets a policy reject further polling; the limiting
    /// policies in this file return an exhausted error once their limit is
    /// reached. The default implementation ignores its arguments and always
    /// returns `Ok(())`.
    ///
    /// The item is hidden from the generated documentation unless the
    /// `_internal-semver` feature is enabled.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_in_progress(
        &self,
        _loop_start: std::time::Instant,
        _attempt_count: u32,
        _operation_name: &str,
    ) -> Result<(), Error> {
        Ok(())
    }
}
82
/// Argument wrapper holding a shared, type-erased [`PollingErrorPolicy`].
///
/// It is built through the `From` conversions in this file, either from a
/// concrete policy value or from an existing `Arc<dyn PollingErrorPolicy>`.
/// Cloning copies the `Arc`, so clones share the same policy instance.
#[derive(Clone)]
pub struct PollingErrorPolicyArg(pub(crate) Arc<dyn PollingErrorPolicy>);
86
87impl<T> std::convert::From<T> for PollingErrorPolicyArg
88where
89 T: PollingErrorPolicy + 'static,
90{
91 fn from(value: T) -> Self {
92 Self(Arc::new(value))
93 }
94}
95
96impl std::convert::From<Arc<dyn PollingErrorPolicy>> for PollingErrorPolicyArg {
97 fn from(value: Arc<dyn PollingErrorPolicy>) -> Self {
98 Self(value)
99 }
100}
101
102pub trait PollingErrorPolicyExt: PollingErrorPolicy + Sized {
104 fn with_time_limit(self, maximum_duration: std::time::Duration) -> LimitedElapsedTime<Self> {
126 LimitedElapsedTime::custom(self, maximum_duration)
127 }
128
129 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
157 LimitedAttemptCount::custom(self, maximum_attempts)
158 }
159}
160
161impl<T: PollingErrorPolicy> PollingErrorPolicyExt for T {}
162
/// A stateless polling error policy with a strict notion of "retryable".
///
/// Its `on_error` implementation in this file continues polling only for
/// errors reported as transient-and-before-RPC, I/O errors, service errors
/// with the `Unavailable` status code, and HTTP errors with status 503.
/// Everything else is treated as permanent.
///
/// NOTE(review): the name suggests the rules follow AIP-194; confirm against
/// the upstream documentation.
#[derive(Clone, Debug)]
pub struct Aip194Strict;
187
188impl PollingErrorPolicy for Aip194Strict {
189 fn on_error(
190 &self,
191 _loop_start: std::time::Instant,
192 _attempt_count: u32,
193 error: Error,
194 ) -> RetryResult {
195 if error.is_transient_and_before_rpc() {
196 return RetryResult::Continue(error);
197 }
198 if error.is_io() {
199 return RetryResult::Continue(error);
200 }
201 if let Some(status) = error.status() {
202 return if status.code == crate::error::rpc::Code::Unavailable {
203 RetryResult::Continue(error)
204 } else {
205 RetryResult::Permanent(error)
206 };
207 }
208
209 match error.http_status_code() {
210 Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
211 RetryResult::Continue(error)
212 }
213 _ => RetryResult::Permanent(error),
214 }
215 }
216}
217
/// A stateless polling error policy that treats every error as retryable.
///
/// Its `on_error` implementation in this file returns `RetryResult::Continue`
/// for any error. It can be combined with the limiting decorators in this
/// file, which turn a `Continue` into `Exhausted` once their limit is reached.
#[derive(Clone, Debug)]
pub struct AlwaysContinue;
240
241impl PollingErrorPolicy for AlwaysContinue {
242 fn on_error(
243 &self,
244 _loop_start: std::time::Instant,
245 _attempt_count: u32,
246 error: Error,
247 ) -> RetryResult {
248 RetryResult::Continue(error)
249 }
250}
251
/// A polling error policy decorator that limits the total polling time.
///
/// Decisions are delegated to the `inner` policy first. Once the time elapsed
/// since the loop start reaches `maximum_duration`, a `Continue` decision
/// becomes `Exhausted` and in-progress checks fail with an exhausted error.
///
/// The inner policy type defaults to [`Aip194Strict`].
#[derive(Debug)]
pub struct LimitedElapsedTime<P = Aip194Strict>
where
    P: PollingErrorPolicy,
{
    /// The decorated policy, consulted before the time limit is applied.
    inner: P,
    /// The maximum time allowed since the start of the polling loop.
    maximum_duration: std::time::Duration,
}
275
276impl LimitedElapsedTime {
277 pub fn new(maximum_duration: std::time::Duration) -> Self {
292 Self {
293 inner: Aip194Strict,
294 maximum_duration,
295 }
296 }
297}
298
299impl<P> LimitedElapsedTime<P>
300where
301 P: PollingErrorPolicy,
302{
303 pub fn custom(inner: P, maximum_duration: std::time::Duration) -> Self {
318 Self {
319 inner,
320 maximum_duration,
321 }
322 }
323
324 fn in_progress_impl(
325 &self,
326 start: std::time::Instant,
327 operation_name: &str,
328 ) -> Result<(), Error> {
329 let now = std::time::Instant::now();
330 if now < start + self.maximum_duration {
331 return Ok(());
332 }
333 Err(Error::exhausted(Exhausted::new(
334 operation_name,
335 "elapsed time",
336 format!("{:?}", now.checked_duration_since(start).unwrap()),
337 format!("{:?}", self.maximum_duration),
338 )))
339 }
340}
341
342impl<P> PollingErrorPolicy for LimitedElapsedTime<P>
343where
344 P: PollingErrorPolicy + 'static,
345{
346 fn on_error(&self, start: std::time::Instant, count: u32, error: Error) -> RetryResult {
347 match self.inner.on_error(start, count, error) {
348 RetryResult::Permanent(e) => RetryResult::Permanent(e),
349 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
350 RetryResult::Continue(e) => {
351 if std::time::Instant::now() >= start + self.maximum_duration {
352 RetryResult::Exhausted(e)
353 } else {
354 RetryResult::Continue(e)
355 }
356 }
357 }
358 }
359
360 fn on_in_progress(
361 &self,
362 start: std::time::Instant,
363 count: u32,
364 operation_name: &str,
365 ) -> Result<(), Error> {
366 self.inner
367 .on_in_progress(start, count, operation_name)
368 .and_then(|_| self.in_progress_impl(start, operation_name))
369 }
370}
371
/// A polling error policy decorator that limits the number of attempts.
///
/// Decisions are delegated to the `inner` policy first. Once the attempt count
/// reaches `maximum_attempts`, a `Continue` decision becomes `Exhausted` and
/// in-progress checks fail with an exhausted error.
///
/// The inner policy type defaults to [`Aip194Strict`].
#[derive(Debug)]
pub struct LimitedAttemptCount<P = Aip194Strict>
where
    P: PollingErrorPolicy,
{
    /// The decorated policy, consulted before the attempt limit is applied.
    inner: P,
    /// The attempt count at which the polling loop is considered exhausted.
    maximum_attempts: u32,
}
393
394impl LimitedAttemptCount {
395 pub fn new(maximum_attempts: u32) -> Self {
410 Self {
411 inner: Aip194Strict,
412 maximum_attempts,
413 }
414 }
415}
416
417impl<P> LimitedAttemptCount<P>
418where
419 P: PollingErrorPolicy,
420{
421 pub fn custom(inner: P, maximum_attempts: u32) -> Self {
436 Self {
437 inner,
438 maximum_attempts,
439 }
440 }
441
442 fn in_progress_impl(&self, count: u32, operation_name: &str) -> Result<(), Error> {
443 if count < self.maximum_attempts {
444 return Ok(());
445 }
446 Err(Error::exhausted(Exhausted::new(
447 operation_name,
448 "attempt count",
449 count.to_string(),
450 self.maximum_attempts.to_string(),
451 )))
452 }
453}
454
455impl<P> PollingErrorPolicy for LimitedAttemptCount<P>
456where
457 P: PollingErrorPolicy,
458{
459 fn on_error(&self, start: std::time::Instant, count: u32, error: Error) -> RetryResult {
460 match self.inner.on_error(start, count, error) {
461 RetryResult::Permanent(e) => RetryResult::Permanent(e),
462 RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
463 RetryResult::Continue(e) => {
464 if count >= self.maximum_attempts {
465 RetryResult::Exhausted(e)
466 } else {
467 RetryResult::Continue(e)
468 }
469 }
470 }
471 }
472
473 fn on_in_progress(
474 &self,
475 start: std::time::Instant,
476 count: u32,
477 operation_name: &str,
478 ) -> Result<(), Error> {
479 self.inner
480 .on_in_progress(start, count, operation_name)
481 .and_then(|_| self.in_progress_impl(count, operation_name))
482 }
483}
484
/// Describes which limit stopped a polling loop.
///
/// The limiting policies in this file wrap it in an exhausted error. The
/// `Display` output names the operation, the limit, the observed value and
/// the configured limit.
#[derive(Debug)]
pub struct Exhausted {
    /// Name of the operation that was being polled.
    operation_name: String,
    /// Human-readable name of the limit, e.g. `"elapsed time"`.
    limit_name: &'static str,
    /// The observed value, already formatted for display.
    value: String,
    /// The configured limit, already formatted for display.
    limit: String,
}

impl Exhausted {
    /// Builds the description, copying `operation_name` into an owned string
    /// and taking ownership of the pre-formatted `value` and `limit`.
    pub fn new(
        operation_name: &str,
        limit_name: &'static str,
        value: String,
        limit: String,
    ) -> Self {
        let operation_name = String::from(operation_name);
        Exhausted {
            operation_name,
            limit_name,
            value,
            limit,
        }
    }
}

impl std::fmt::Display for Exhausted {
    /// Writes a one-line, human-readable description of the exhausted limit.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            operation_name,
            limit_name,
            value,
            limit,
        } = self;
        write!(
            f,
            "polling loop for {} exhausted, {} value ({}) exceeds limit ({})",
            operation_name, limit_name, value, limit
        )
    }
}

/// `Exhausted` is a leaf error: it relies on the default `source()`.
impl std::error::Error for Exhausted {}
521
// Unit tests for the polling error policies defined in this file: the two
// base policies, the time and attempt limiting decorators (against both real
// and mocked inner policies), the argument conversions, and `Exhausted`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::{CredentialsError, Error};
    use http::HeaderMap;
    use std::error::Error as _;
    use std::time::{Duration, Instant};

    // Generates `MockPolicy`, a mock of `PollingErrorPolicy` used to observe
    // how the decorators forward calls to their inner policy.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl PollingErrorPolicy for Policy {
            fn on_error(&self, loop_start: std::time::Instant, attempt_count: u32, error: Error) -> RetryResult;
            fn on_in_progress(&self, loop_start: std::time::Instant, attempt_count: u32, operation_name: &str) -> Result<(), Error>;
        }
    }

    // Both conversions into `PollingErrorPolicyArg` must compile and run: from
    // a concrete policy and from an `Arc<dyn PollingErrorPolicy>`.
    #[test]
    fn polling_policy_arg() {
        let policy = LimitedAttemptCount::new(3);
        let _ = PollingErrorPolicyArg::from(policy);

        let policy: Arc<dyn PollingErrorPolicy> = Arc::new(LimitedAttemptCount::new(3));
        let _ = PollingErrorPolicyArg::from(policy);
    }

    // `Aip194Strict` continues on unavailable (service status and HTTP 503),
    // I/O and authentication errors, and treats permission-denied and
    // serialization errors as permanent.
    #[test]
    fn aip194_strict() {
        let p = Aip194Strict;

        let now = std::time::Instant::now();
        assert!(p.on_in_progress(now, 0, "unused").is_ok());
        assert!(p.on_error(now, 0, unavailable()).is_continue());
        assert!(p.on_error(now, 0, permission_denied()).is_permanent());
        assert!(p.on_error(now, 0, http_unavailable()).is_continue());
        assert!(p.on_error(now, 0, http_permission_denied()).is_permanent());

        assert!(
            p.on_error(now, 0, Error::io("err".to_string()))
                .is_continue()
        );

        // NOTE(review): presumably the `true` flag marks the credentials error
        // as transient; confirm against `CredentialsError::from_msg`.
        assert!(
            p.on_error(
                now,
                0,
                Error::authentication(CredentialsError::from_msg(true, "err"))
            )
            .is_continue()
        );

        assert!(
            p.on_error(now, 0, Error::ser("err".to_string()))
                .is_permanent()
        );
    }

    // `AlwaysContinue` accepts in-progress operations and continues on
    // unavailable errors of both flavors.
    #[test]
    fn always_continue() {
        let p = AlwaysContinue;

        let now = std::time::Instant::now();
        assert!(p.on_in_progress(now, 0, "unused").is_ok());
        assert!(p.on_error(now, 0, http_unavailable()).is_continue());
        assert!(p.on_error(now, 0, unavailable()).is_continue());
    }

    // `AlwaysContinue` continues regardless of the kind of error.
    #[test_case::test_case(Error::io("err"))]
    #[test_case::test_case(Error::authentication(CredentialsError::from_msg(true, "err")))]
    #[test_case::test_case(Error::ser("err"))]
    fn always_continue_error_kind(error: Error) {
        let p = AlwaysContinue;
        let now = std::time::Instant::now();
        assert!(p.on_error(now, 0, error).is_continue());
    }

    // `with_time_limit` continues while inside the limit (1s elapsed of 10s)
    // and reports exhaustion once past it (20s elapsed of 10s).
    #[test]
    fn with_time_limit() {
        let policy = AlwaysContinue.with_time_limit(Duration::from_secs(10));
        assert!(
            policy
                .on_error(
                    Instant::now() - Duration::from_secs(1),
                    1,
                    permission_denied()
                )
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(
                    Instant::now() - Duration::from_secs(20),
                    1,
                    permission_denied()
                )
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // `with_attempt_limit` continues below the limit (attempt 1 of 3) and
    // reports exhaustion above it (attempt 5 of 3).
    #[test]
    fn with_attempt_limit() {
        let policy = AlwaysContinue.with_attempt_limit(3);
        assert!(
            policy
                .on_error(Instant::now(), 1, permission_denied())
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(Instant::now(), 5, permission_denied())
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // Builds an HTTP error with the given status code, no headers, and a JSON
    // payload of the form `{"error": {"code": ..., "message": ...}}`.
    fn http_error(code: u16, message: &str) -> Error {
        let error = serde_json::json!({"error": {
            "code": code,
            "message": message,
        }});
        let payload = bytes::Bytes::from_owner(serde_json::to_string(&error).unwrap());
        Error::http(code, HeaderMap::new(), payload)
    }

    // An HTTP 503 error.
    fn http_unavailable() -> Error {
        http_error(503, "SERVICE UNAVAILABLE")
    }

    // An HTTP 403 error.
    fn http_permission_denied() -> Error {
        http_error(403, "PERMISSION DENIED")
    }

    // A service error carrying the `Unavailable` status code.
    fn unavailable() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::Unavailable)
            .set_message("UNAVAILABLE");
        Error::service(status)
    }

    // A service error carrying the `PermissionDenied` status code.
    fn permission_denied() -> Error {
        use crate::error::rpc::Code;
        let status = crate::error::rpc::Status::default()
            .set_code(Code::PermissionDenied)
            .set_message("PERMISSION_DENIED");
        Error::service(status)
    }

    // With the default inner policy, a retryable error continues inside the
    // time limit and is exhausted past it.
    #[test]
    fn test_limited_elapsed_time_on_error() {
        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
        assert!(
            policy
                .on_error(Instant::now() - Duration::from_secs(10), 1, unavailable())
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(Instant::now() - Duration::from_secs(30), 1, unavailable())
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // In-progress checks succeed inside the time limit; past it they fail with
    // an error whose source is an `Exhausted`.
    #[test]
    fn test_limited_elapsed_time_in_progress() {
        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
        let result = policy.on_in_progress(Instant::now() - Duration::from_secs(10), 1, "unused");
        assert!(result.is_ok(), "{result:?}");
        let err = policy
            .on_in_progress(
                Instant::now() - Duration::from_secs(30),
                1,
                "test-operation-name",
            )
            .unwrap_err();
        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
        assert!(exhausted.is_some());
    }

    // The time-limited decorator calls the inner policy's `on_error` and keeps
    // its `Continue` decision while inside the limit.
    #[test]
    fn test_limited_time_forwards_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, _, e| RetryResult::Continue(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(now, 0, transient_error());
        assert!(rf.is_continue());
    }

    // Every in-progress check is forwarded to the inner policy: three calls
    // produce exactly three inner calls.
    #[test]
    fn test_limited_time_forwards_in_progress() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(3)
            .returning(|_, _, _| Ok(()));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        assert!(policy.on_in_progress(now, 1, "test-op-name").is_ok());
        assert!(policy.on_in_progress(now, 2, "test-op-name").is_ok());
        assert!(policy.on_in_progress(now, 3, "test-op-name").is_ok());
    }

    // An error from the inner policy's in-progress check is surfaced even
    // though the time limit has not been reached.
    #[test]
    fn test_limited_time_in_progress_returns_inner() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(1)
            .returning(|_, _, _| Err(transient_error()));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        assert!(policy.on_in_progress(now, 1, "test-op-name").is_err());
    }

    // When the inner policy continues, the time limit decides: continue at 10s
    // of 60s, exhausted at 70s of 60s.
    #[test]
    fn test_limited_time_inner_continues() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, _, e| RetryResult::Continue(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
        assert!(rf.is_continue());

        let rf = policy.on_error(now - Duration::from_secs(70), 1, transient_error());
        assert!(rf.is_exhausted());
    }

    // A `Permanent` decision from the inner policy is returned unchanged,
    // whether the loop start is in the past or in the future.
    #[test]
    fn test_limited_time_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, _, e| RetryResult::Permanent(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
        assert!(rf.is_permanent());

        let rf = policy.on_error(now + Duration::from_secs(10), 1, transient_error());
        assert!(rf.is_permanent());
    }

    // An `Exhausted` decision from the inner policy is returned unchanged,
    // whether the loop start is in the past or in the future.
    #[test]
    fn test_limited_time_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, _, e| RetryResult::Exhausted(e));

        let now = std::time::Instant::now();
        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));

        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
        assert!(rf.is_exhausted());

        let rf = policy.on_error(now + Duration::from_secs(10), 1, transient_error());
        assert!(rf.is_exhausted());
    }

    // With the default inner policy, a retryable error continues below the
    // attempt limit (10 of 20) and is exhausted above it (30 of 20).
    #[test]
    fn test_limited_attempt_count_on_error() {
        let policy = LimitedAttemptCount::new(20);
        assert!(
            policy
                .on_error(Instant::now(), 10, unavailable())
                .is_continue(),
            "{policy:?}"
        );
        assert!(
            policy
                .on_error(Instant::now(), 30, unavailable())
                .is_exhausted(),
            "{policy:?}"
        );
    }

    // In-progress checks succeed below the attempt limit; above it they fail
    // with an error whose source is an `Exhausted`.
    #[test]
    fn test_limited_attempt_count_in_progress() {
        let policy = LimitedAttemptCount::new(20);
        let result = policy.on_in_progress(Instant::now(), 10, "unused");
        assert!(result.is_ok(), "{result:?}");
        let err = policy
            .on_in_progress(Instant::now(), 30, "test-operation-name")
            .unwrap_err();
        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
        assert!(exhausted.is_some());
    }

    // The limit is inclusive: with a maximum of 3, attempts 1 and 2 continue
    // and attempt 3 is exhausted.
    #[test]
    fn test_limited_attempt_count_forwards_on_error() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(1..)
            .returning(|_, _, e| RetryResult::Continue(e));

        let now = std::time::Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 3);
        assert!(policy.on_error(now, 1, transient_error()).is_continue());
        assert!(policy.on_error(now, 2, transient_error()).is_continue());
        assert!(policy.on_error(now, 3, transient_error()).is_exhausted());
    }

    // Every in-progress check is forwarded to the inner policy: three calls
    // produce exactly three inner calls.
    #[test]
    fn test_limited_attempt_count_forwards_in_progress() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(3)
            .returning(|_, _, _| Ok(()));

        let now = std::time::Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 5);
        assert!(policy.on_in_progress(now, 1, "test-op-name").is_ok());
        assert!(policy.on_in_progress(now, 2, "test-op-name").is_ok());
        assert!(policy.on_in_progress(now, 3, "test-op-name").is_ok());
    }

    // An error from the inner policy's in-progress check is surfaced even
    // though the attempt limit has not been reached.
    #[test]
    fn test_limited_attempt_count_in_progress_returns_inner() {
        let mut mock = MockPolicy::new();
        mock.expect_on_in_progress()
            .times(1)
            .returning(|_, _, _| Err(unavailable()));

        let now = std::time::Instant::now();
        let policy = LimitedAttemptCount::custom(mock, 5);
        assert!(policy.on_in_progress(now, 1, "test-op-name").is_err());
    }

    // A `Permanent` decision from the inner policy is returned unchanged.
    #[test]
    fn test_limited_attempt_count_inner_permanent() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, _, e| RetryResult::Permanent(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = std::time::Instant::now();
        let rf = policy.on_error(now, 1, Error::ser("err"));
        assert!(rf.is_permanent());

        let rf = policy.on_error(now, 1, Error::ser("err"));
        assert!(rf.is_permanent());
    }

    // An `Exhausted` decision from the inner policy is returned unchanged,
    // even though the attempt count is below the limit.
    #[test]
    fn test_limited_attempt_count_inner_exhausted() {
        let mut mock = MockPolicy::new();
        mock.expect_on_error()
            .times(2)
            .returning(|_, _, e| RetryResult::Exhausted(e));
        let policy = LimitedAttemptCount::custom(mock, 2);
        let now = std::time::Instant::now();

        let rf = policy.on_error(now, 1, transient_error());
        assert!(rf.is_exhausted());

        let rf = policy.on_error(now, 1, transient_error());
        assert!(rf.is_exhausted());
    }

    // The `Display` output of `Exhausted` mentions all four of its fields.
    #[test]
    fn test_exhausted_fmt() {
        let exhausted = Exhausted::new(
            "op-name",
            "limit-name",
            "test-value".to_string(),
            "test-limit".to_string(),
        );
        let fmt = format!("{exhausted}");
        assert!(fmt.contains("op-name"), "{fmt}");
        assert!(fmt.contains("limit-name"), "{fmt}");
        assert!(fmt.contains("test-value"), "{fmt}");
        assert!(fmt.contains("test-limit"), "{fmt}");
    }

    // A service error with the `Unavailable` code, used as a generic
    // retryable error by the mock-based tests.
    fn transient_error() -> Error {
        use crate::error::rpc::{Code, Status};
        Error::service(
            Status::default()
                .set_code(Code::Unavailable)
                .set_message("try-again"),
        )
    }
}