google_cloud_gax/
retry_policy.rs

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines traits for retry policies and some common implementations.
16//!
//! The client libraries automatically retry RPCs when (1) they fail due to
//! transient errors **and** the RPC is [idempotent], or (2) they fail before
//! the RPC was started. That is, when it is safe to attempt the RPC more than once.
20//!
21//! Applications may override the default behavior, increasing the retry
22//! attempts, or changing what errors are considered safe to retry.
23//!
24//! This module defines the traits for retry policies and some common
25//! implementations.
26//!
//! To configure the default retry policy for a client, use
//! [ClientBuilder::with_retry_policy]. To configure the retry policy used for
//! a specific request, use [RequestOptionsBuilder::with_retry_policy].
30//!
31//! [ClientBuilder::with_retry_policy]: crate::client_builder::ClientBuilder::with_retry_policy
32//! [RequestOptionsBuilder::with_retry_policy]: crate::options::RequestOptionsBuilder::with_retry_policy
33//!
34//! # Examples
35//!
36//! Create a policy that only retries transient errors, and retries for at
37//! most 10 seconds or at most 5 attempts: whichever limit is reached first
38//! stops the retry loop.
39//! ```
40//! # use google_cloud_gax::retry_policy::*;
41//! use std::time::Duration;
42//! let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
43//! ```
44//!
//! Create a policy that retries on any error (even when unsafe to do so),
//! and stops retrying after 5 attempts or 10 seconds: whichever limit is
//! reached first stops the retry loop.
48//! ```
49//! # use google_cloud_gax::retry_policy::*;
50//! use std::time::Duration;
51//! let policy = AlwaysRetry.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
52//! ```
53//!
54//! [idempotent]: https://en.wikipedia.org/wiki/Idempotence
55
56use crate::error::Error;
57use crate::retry_result::RetryResult;
58use crate::retry_state::RetryState;
59use crate::throttle_result::ThrottleResult;
60use std::sync::Arc;
61use std::time::Duration;
62
63/// Determines how errors are handled in the retry loop.
64///
65/// Implementations of this trait determine if errors are retryable, and for how
66/// long the retry loop may continue.
67pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
68    /// Query the retry policy after an error.
69    ///
70    /// # Parameters
71    /// * `state` - the state of the retry loop.
72    /// * `error` - the last error when attempting the request.
73    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
74    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
75
76    /// Query the retry policy after a retry attempt is throttled.
77    ///
78    /// Retry attempts may be throttled before they are even sent out. The retry
79    /// policy may choose to treat these as normal errors, consuming attempts,
80    /// or may prefer to ignore them and always return [RetryResult::Continue].
81    ///
82    /// # Parameters
83    /// * `_state` - the state of the retry loop.
84    /// * `error` - the previous error that caused the retry attempt. Throttling
85    ///   only applies to retry attempts, and a retry attempt implies that a
86    ///   previous attempt failed. The retry policy should preserve this error.
87    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
88    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
89        ThrottleResult::Continue(error)
90    }
91
92    /// The remaining time in the retry policy.
93    ///
94    /// For policies based on time, this returns the remaining time in the
95    /// policy. The retry loop can use this value to adjust the next RPC
96    /// timeout. For policies that are not time based this returns `None`.
97    ///
98    /// # Parameters
99    /// * `_state` - the state of the retry loop.
100    /// * `attempt_count` - the number of attempts. This method is called before
101    ///   the first attempt, so the first value is zero.
102    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
103    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
104        None
105    }
106}
107
108/// A helper type to use [RetryPolicy] in client and request options.
109#[derive(Clone, Debug)]
110pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
111
112impl<T> std::convert::From<T> for RetryPolicyArg
113where
114    T: RetryPolicy + 'static,
115{
116    fn from(value: T) -> Self {
117        Self(Arc::new(value))
118    }
119}
120
121impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
122    fn from(value: Arc<dyn RetryPolicy>) -> Self {
123        Self(value)
124    }
125}
126
127impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
128    fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
129        value.0
130    }
131}
132
133/// Extension trait for [`RetryPolicy`]
134pub trait RetryPolicyExt: RetryPolicy + Sized {
135    /// Decorate a [RetryPolicy] to limit the total elapsed time in the retry loop.
136    ///
137    /// While the time spent in the retry loop (including time in backoff) is
138    /// less than the prescribed duration the `on_error()` method returns the
139    /// results of the inner policy. After that time it returns
140    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
141    /// [Continue][RetryResult::Continue].
142    ///
143    /// The `remaining_time()` function returns the remaining time. This is
144    /// always [Duration::ZERO] once or after the policy's expiration time is
145    /// reached.
146    ///
147    /// # Example
148    /// ```
149    /// # use google_cloud_gax::retry_policy::*;
150    /// # use google_cloud_gax::retry_state::RetryState;
151    /// let d = std::time::Duration::from_secs(10);
152    /// let policy = Aip194Strict.with_time_limit(d);
153    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
154    /// ```
155    fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
156        LimitedElapsedTime::custom(self, maximum_duration)
157    }
158
159    /// Decorate a [RetryPolicy] to limit the number of retry attempts.
160    ///
161    /// This policy decorates an inner policy and limits the total number of
162    /// attempts. Note that `on_error()` is not called before the initial
163    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
164    /// to 0 or 1 results in no retry attempts.
165    ///
166    /// The policy passes through the results from the inner policy as long as
167    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
168    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
169    /// inner policy returns [Continue][RetryResult::Continue].
170    ///
171    /// # Example
172    /// ```
173    /// # use google_cloud_gax::retry_policy::*;
174    /// # use google_cloud_gax::retry_state::RetryState;
175    /// let policy = Aip194Strict.with_attempt_limit(3);
176    /// assert_eq!(policy.remaining_time(&RetryState::new(true)), None);
177    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), transient_error()).is_continue());
178    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(1_u32), transient_error()).is_continue());
179    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(2_u32), transient_error()).is_continue());
180    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(3_u32), transient_error()).is_exhausted());
181    ///
182    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
183    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
184    /// ```
185    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
186        LimitedAttemptCount::custom(self, maximum_attempts)
187    }
188}
189
190impl<T: RetryPolicy> RetryPolicyExt for T {}
191
192/// A retry policy that strictly follows [AIP-194].
193///
194/// This policy must be decorated to limit the number of retry attempts or the
195/// duration of the retry loop.
196///
197/// The policy interprets AIP-194 **strictly**, the retry decision for
198/// server-side errors are based only on the status code, and the only retryable
199/// status code is "UNAVAILABLE".
200///
201/// # Example
202/// ```
203/// # use google_cloud_gax::retry_policy::*;
204/// # use google_cloud_gax::retry_state::RetryState;
205/// let policy = Aip194Strict;
206/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
207/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_permanent());
208///
209/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
210/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
211/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
212/// ```
213///
214/// [AIP-194]: https://google.aip.dev/194
215#[derive(Clone, Debug)]
216pub struct Aip194Strict;
217
218impl RetryPolicy for Aip194Strict {
219    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
220        if error.is_transient_and_before_rpc() {
221            return RetryResult::Continue(error);
222        }
223        if !state.idempotent {
224            return RetryResult::Permanent(error);
225        }
226        if error.is_io() {
227            return RetryResult::Continue(error);
228        }
229        if let Some(status) = error.status() {
230            return if status.code == crate::error::rpc::Code::Unavailable {
231                RetryResult::Continue(error)
232            } else {
233                RetryResult::Permanent(error)
234            };
235        }
236
237        match error.http_status_code() {
238            Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
239                RetryResult::Continue(error)
240            }
241            _ => RetryResult::Permanent(error),
242        }
243    }
244}
245
246/// A retry policy that retries all errors.
247///
248/// This policy must be decorated to limit the number of retry attempts or the
249/// duration of the retry loop.
250///
251/// The policy retries all errors. This may be useful if the service guarantees
252/// idempotency, maybe through the use of request ids.
253///
254/// # Example
255/// ```
256/// # use google_cloud_gax::retry_policy::*;
257/// # use google_cloud_gax::retry_state::RetryState;
258/// let policy = AlwaysRetry;
259/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
260/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_continue());
261///
262/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
263/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
264/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
265/// ```
266#[derive(Clone, Debug)]
267pub struct AlwaysRetry;
268
269impl RetryPolicy for AlwaysRetry {
270    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
271        RetryResult::Continue(error)
272    }
273}
274
275/// A retry policy that never retries.
276///
277/// This policy is useful when the client already has (or may already have) a
278/// retry policy configured, and you want to avoid retrying a particular method.
279///
280/// # Example
281/// ```
282/// # use google_cloud_gax::retry_policy::*;
283/// # use google_cloud_gax::retry_state::RetryState;
284/// let policy = NeverRetry;
285/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_exhausted());
286/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_exhausted());
287///
288/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
289/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
290/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
291/// ```
292#[derive(Clone, Debug)]
293pub struct NeverRetry;
294
295impl RetryPolicy for NeverRetry {
296    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
297        RetryResult::Exhausted(error)
298    }
299}
300
301#[derive(thiserror::Error, Debug)]
302pub struct LimitedElapsedTimeError {
303    maximum_duration: Duration,
304    #[source]
305    source: Error,
306}
307
308impl LimitedElapsedTimeError {
309    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
310        Self {
311            maximum_duration,
312            source,
313        }
314    }
315
316    /// Returns the maximum number of attempts in the exhausted policy.
317    pub fn maximum_duration(&self) -> Duration {
318        self.maximum_duration
319    }
320}
321
322impl std::fmt::Display for LimitedElapsedTimeError {
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        write!(
325            f,
326            "retry policy is exhausted after {}s, the last retry attempt was throttled",
327            self.maximum_duration.as_secs_f64()
328        )
329    }
330}
331
332/// A retry policy decorator that limits the total time in the retry loop.
333///
334/// This policy decorates an inner policy and limits the duration of retry
335/// loops. While the time spent in the retry loop (including time in backoff)
336/// is less than the prescribed duration the `on_error()` method returns the
337/// results of the inner policy. After that time it returns
338/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
339/// [Continue][RetryResult::Continue].
340///
341/// The `remaining_time()` function returns the remaining time. This is always
342/// [Duration::ZERO] once or after the policy's deadline is reached.
343///
344/// # Parameters
345/// * `P` - the inner retry policy, defaults to [Aip194Strict].
346#[derive(Debug)]
347pub struct LimitedElapsedTime<P = Aip194Strict>
348where
349    P: RetryPolicy,
350{
351    inner: P,
352    maximum_duration: Duration,
353}
354
355impl LimitedElapsedTime {
356    /// Creates a new instance, with the default inner policy.
357    ///
358    /// # Example
359    /// ```
360    /// # use google_cloud_gax::retry_policy::*;
361    /// # use google_cloud_gax::retry_state::RetryState;
362    /// let d = std::time::Duration::from_secs(10);
363    /// let policy = LimitedElapsedTime::new(d);
364    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
365    /// ```
366    pub fn new(maximum_duration: Duration) -> Self {
367        Self {
368            inner: Aip194Strict,
369            maximum_duration,
370        }
371    }
372}
373
374impl<P> LimitedElapsedTime<P>
375where
376    P: RetryPolicy,
377{
378    /// Creates a new instance with a custom inner policy.
379    ///
380    /// # Example
381    /// ```
382    /// # use google_cloud_gax::retry_policy::*;
383    /// # use google_cloud_gax::retry_state::RetryState;
384    /// # use google_cloud_gax::error;
385    /// use std::time::{Duration, Instant};
386    /// let d = Duration::from_secs(10);
387    /// let policy = AlwaysRetry.with_time_limit(d);
388    /// assert!(policy.remaining_time(&RetryState::new(false)) <= Some(d));
389    /// assert!(policy.on_error(&RetryState::new(false), permanent_error()).is_continue());
390    ///
391    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
392    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
393    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
394    /// ```
395    pub fn custom(inner: P, maximum_duration: Duration) -> Self {
396        Self {
397            inner,
398            maximum_duration,
399        }
400    }
401
402    fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
403        let deadline = state.start + self.maximum_duration;
404        let now = tokio::time::Instant::now().into_std();
405        if now < deadline {
406            ThrottleResult::Continue(error)
407        } else {
408            ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
409                self.maximum_duration,
410                error,
411            )))
412        }
413    }
414}
415
416impl<P> RetryPolicy for LimitedElapsedTime<P>
417where
418    P: RetryPolicy + 'static,
419{
420    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
421        match self.inner.on_error(state, error) {
422            RetryResult::Permanent(e) => RetryResult::Permanent(e),
423            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
424            RetryResult::Continue(e) => {
425                if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
426                    RetryResult::Exhausted(e)
427                } else {
428                    RetryResult::Continue(e)
429                }
430            }
431        }
432    }
433
434    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
435        match self.inner.on_throttle(state, error) {
436            ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
437            ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
438        }
439    }
440
441    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
442        let deadline = state.start + self.maximum_duration;
443        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
444        if let Some(inner) = self.inner.remaining_time(state) {
445            return Some(std::cmp::min(remaining, inner));
446        }
447        Some(remaining)
448    }
449}
450
451/// A retry policy decorator that limits the number of attempts.
452///
453/// This policy decorates an inner policy and limits the total number of
454/// attempts. Note that `on_error()` is not called before the initial
455/// (non-retry) attempt. Therefore, setting the maximum number of attempts to 0
456/// or 1 results in no retry attempts.
457///
458/// The policy passes through the results from the inner policy as long as
459/// `attempt_count < maximum_attempts`. However, once the maximum number of
460/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
461/// result with [Exhausted][RetryResult::Exhausted].
462///
463/// # Parameters
464/// * `P` - the inner retry policy.
465#[derive(Debug)]
466pub struct LimitedAttemptCount<P = Aip194Strict>
467where
468    P: RetryPolicy,
469{
470    inner: P,
471    maximum_attempts: u32,
472}
473
474impl LimitedAttemptCount {
475    /// Creates a new instance, with the default inner policy.
476    ///
477    /// # Example
478    /// ```
479    /// # use google_cloud_gax::retry_policy::*;
480    /// let policy = LimitedAttemptCount::new(5);
481    /// ```
482    pub fn new(maximum_attempts: u32) -> Self {
483        Self {
484            inner: Aip194Strict,
485            maximum_attempts,
486        }
487    }
488}
489
490impl<P> LimitedAttemptCount<P>
491where
492    P: RetryPolicy,
493{
494    /// Creates a new instance with a custom inner policy.
495    ///
496    /// # Example
497    /// ```
498    /// # use google_cloud_gax::retry_policy::*;
499    /// # use google_cloud_gax::retry_state::RetryState;
500    /// let policy = LimitedAttemptCount::custom(AlwaysRetry, 2);
501    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(1_u32), permanent_error()).is_continue());
502    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(2_u32), permanent_error()).is_exhausted());
503    ///
504    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
505    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
506    /// ```
507    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
508        Self {
509            inner,
510            maximum_attempts,
511        }
512    }
513}
514
515impl<P> RetryPolicy for LimitedAttemptCount<P>
516where
517    P: RetryPolicy,
518{
519    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
520        match self.inner.on_error(state, error) {
521            RetryResult::Permanent(e) => RetryResult::Permanent(e),
522            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
523            RetryResult::Continue(e) => {
524                if state.attempt_count >= self.maximum_attempts {
525                    RetryResult::Exhausted(e)
526                } else {
527                    RetryResult::Continue(e)
528                }
529            }
530        }
531    }
532
533    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
534        // The retry loop only calls `on_throttle()` if the policy has not
535        // been exhausted.
536        assert!(state.attempt_count < self.maximum_attempts);
537        self.inner.on_throttle(state, error)
538    }
539
540    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
541        self.inner.remaining_time(state)
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use http::HeaderMap;
549    use std::error::Error as StdError;
550    use std::time::Instant;
551
552    // Verify `RetryPolicyArg` can be converted from the desired types.
553    #[test]
554    fn retry_policy_arg() {
555        let policy = LimitedAttemptCount::new(3);
556        let _ = RetryPolicyArg::from(policy);
557
558        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
559        let _ = RetryPolicyArg::from(policy);
560    }
561
562    #[test]
563    fn aip194_strict() {
564        let p = Aip194Strict;
565
566        let now = Instant::now();
567        assert!(
568            p.on_error(&idempotent_state(now), unavailable())
569                .is_continue()
570        );
571        assert!(
572            p.on_error(&non_idempotent_state(now), unavailable())
573                .is_permanent()
574        );
575        assert!(matches!(
576            p.on_throttle(&idempotent_state(now), unavailable()),
577            ThrottleResult::Continue(_)
578        ));
579
580        assert!(
581            p.on_error(&idempotent_state(now), permission_denied())
582                .is_permanent()
583        );
584        assert!(
585            p.on_error(&non_idempotent_state(now), permission_denied())
586                .is_permanent()
587        );
588
589        assert!(
590            p.on_error(&idempotent_state(now), http_unavailable())
591                .is_continue()
592        );
593        assert!(
594            p.on_error(&non_idempotent_state(now), http_unavailable())
595                .is_permanent()
596        );
597        assert!(matches!(
598            p.on_throttle(&idempotent_state(now), http_unavailable()),
599            ThrottleResult::Continue(_)
600        ));
601
602        assert!(
603            p.on_error(&idempotent_state(now), http_permission_denied())
604                .is_permanent()
605        );
606        assert!(
607            p.on_error(&non_idempotent_state(now), http_permission_denied())
608                .is_permanent()
609        );
610
611        assert!(
612            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
613                .is_continue()
614        );
615        assert!(
616            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
617                .is_permanent()
618        );
619
620        assert!(
621            p.on_error(&idempotent_state(now), pre_rpc_transient())
622                .is_continue()
623        );
624        assert!(
625            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
626                .is_continue()
627        );
628
629        assert!(
630            p.on_error(&idempotent_state(now), Error::ser("err"))
631                .is_permanent()
632        );
633        assert!(
634            p.on_error(&non_idempotent_state(now), Error::ser("err"))
635                .is_permanent()
636        );
637        assert!(
638            p.on_error(&idempotent_state(now), Error::deser("err"))
639                .is_permanent()
640        );
641        assert!(
642            p.on_error(&non_idempotent_state(now), Error::deser("err"))
643                .is_permanent()
644        );
645
646        assert!(p.remaining_time(&idempotent_state(now)).is_none());
647    }
648
649    #[test]
650    fn always_retry() {
651        let p = AlwaysRetry;
652
653        let now = Instant::now();
654        assert!(p.remaining_time(&idempotent_state(now)).is_none());
655        assert!(
656            p.on_error(&idempotent_state(now), http_unavailable())
657                .is_continue()
658        );
659        assert!(
660            p.on_error(&non_idempotent_state(now), http_unavailable())
661                .is_continue()
662        );
663        assert!(matches!(
664            p.on_throttle(&idempotent_state(now), http_unavailable()),
665            ThrottleResult::Continue(_)
666        ));
667
668        assert!(
669            p.on_error(&idempotent_state(now), unavailable())
670                .is_continue()
671        );
672        assert!(
673            p.on_error(&non_idempotent_state(now), unavailable())
674                .is_continue()
675        );
676    }
677
678    #[test_case::test_case(true, Error::io("err"))]
679    #[test_case::test_case(true, pre_rpc_transient())]
680    #[test_case::test_case(true, Error::ser("err"))]
681    #[test_case::test_case(false, Error::io("err"))]
682    #[test_case::test_case(false, pre_rpc_transient())]
683    #[test_case::test_case(false, Error::ser("err"))]
684    fn always_retry_error_kind(idempotent: bool, error: Error) {
685        let p = AlwaysRetry;
686        let now = Instant::now();
687        let state = if idempotent {
688            idempotent_state(now)
689        } else {
690            non_idempotent_state(now)
691        };
692        assert!(p.on_error(&state, error).is_continue());
693    }
694
695    #[test]
696    fn never_retry() {
697        let p = NeverRetry;
698
699        let now = Instant::now();
700        assert!(p.remaining_time(&idempotent_state(now)).is_none());
701        assert!(
702            p.on_error(&idempotent_state(now), http_unavailable())
703                .is_exhausted()
704        );
705        assert!(
706            p.on_error(&non_idempotent_state(now), http_unavailable())
707                .is_exhausted()
708        );
709        assert!(matches!(
710            p.on_throttle(&idempotent_state(now), http_unavailable()),
711            ThrottleResult::Continue(_)
712        ));
713
714        assert!(
715            p.on_error(&idempotent_state(now), unavailable())
716                .is_exhausted()
717        );
718        assert!(
719            p.on_error(&non_idempotent_state(now), unavailable())
720                .is_exhausted()
721        );
722
723        assert!(
724            p.on_error(&idempotent_state(now), http_permission_denied())
725                .is_exhausted()
726        );
727        assert!(
728            p.on_error(&non_idempotent_state(now), http_permission_denied())
729                .is_exhausted()
730        );
731    }
732
733    #[test_case::test_case(true, Error::io("err"))]
734    #[test_case::test_case(true, pre_rpc_transient())]
735    #[test_case::test_case(true, Error::ser("err"))]
736    #[test_case::test_case(false, Error::io("err"))]
737    #[test_case::test_case(false, pre_rpc_transient())]
738    #[test_case::test_case(false, Error::ser("err"))]
739    fn never_retry_error_kind(idempotent: bool, error: Error) {
740        let p = NeverRetry;
741        let now = Instant::now();
742        let state = if idempotent {
743            idempotent_state(now)
744        } else {
745            non_idempotent_state(now)
746        };
747        assert!(p.on_error(&state, error).is_exhausted());
748    }
749
750    fn pre_rpc_transient() -> Error {
751        use crate::error::CredentialsError;
752        Error::authentication(CredentialsError::from_msg(true, "err"))
753    }
754
755    fn http_unavailable() -> Error {
756        Error::http(
757            503_u16,
758            HeaderMap::new(),
759            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
760        )
761    }
762
763    fn http_permission_denied() -> Error {
764        Error::http(
765            403_u16,
766            HeaderMap::new(),
767            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
768        )
769    }
770
771    fn unavailable() -> Error {
772        use crate::error::rpc::Code;
773        let status = crate::error::rpc::Status::default()
774            .set_code(Code::Unavailable)
775            .set_message("UNAVAILABLE");
776        Error::service(status)
777    }
778
779    fn permission_denied() -> Error {
780        use crate::error::rpc::Code;
781        let status = crate::error::rpc::Status::default()
782            .set_code(Code::PermissionDenied)
783            .set_message("PERMISSION_DENIED");
784        Error::service(status)
785    }
786
787    mockall::mock! {
788        #[derive(Debug)]
789        Policy {}
790        impl RetryPolicy for Policy {
791            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
792            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
793            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
794        }
795    }
796
797    #[test]
798    fn limited_elapsed_time_error() {
799        let limit = Duration::from_secs(123) + Duration::from_millis(567);
800        let err = LimitedElapsedTimeError::new(limit, unavailable());
801        assert_eq!(err.maximum_duration(), limit);
802        let fmt = err.to_string();
803        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
804        assert!(err.source().is_some(), "{err:?}");
805    }
806
807    #[test]
808    fn test_limited_time_forwards() {
809        let mut mock = MockPolicy::new();
810        mock.expect_on_error()
811            .times(1..)
812            .returning(|_, e| RetryResult::Continue(e));
813        mock.expect_on_throttle()
814            .times(1..)
815            .returning(|_, e| ThrottleResult::Continue(e));
816        mock.expect_remaining_time().times(1).returning(|_| None);
817
818        let now = Instant::now();
819        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
820        let rf = policy.on_error(&idempotent_state(now), transient_error());
821        assert!(rf.is_continue());
822
823        let rt = policy.remaining_time(&idempotent_state(now));
824        assert!(rt.is_some());
825
826        let e = policy.on_throttle(&idempotent_state(now), transient_error());
827        assert!(matches!(e, ThrottleResult::Continue(_)));
828    }
829
830    #[test]
831    fn test_limited_time_on_throttle_continue() {
832        let mut mock = MockPolicy::new();
833        mock.expect_on_throttle()
834            .times(1..)
835            .returning(|_, e| ThrottleResult::Continue(e));
836
837        let now = Instant::now();
838        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
839
840        // Before the policy expires the inner result is returned verbatim.
841        let rf = policy.on_throttle(
842            &idempotent_state(now - Duration::from_secs(50)),
843            unavailable(),
844        );
845        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
846
847        // After the policy expires the innter result is always "exhausted".
848        let rf = policy.on_throttle(
849            &idempotent_state(now - Duration::from_secs(70)),
850            unavailable(),
851        );
852        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
853    }
854
855    #[test]
856    fn test_limited_time_on_throttle_exhausted() {
857        let mut mock = MockPolicy::new();
858        mock.expect_on_throttle()
859            .times(1..)
860            .returning(|_, e| ThrottleResult::Exhausted(e));
861
862        let now = Instant::now();
863        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
864
865        // Before the policy expires the inner result is returned verbatim.
866        let rf = policy.on_throttle(
867            &idempotent_state(now - Duration::from_secs(50)),
868            unavailable(),
869        );
870        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
871    }
872
873    #[test]
874    fn test_limited_time_inner_continues() {
875        let mut mock = MockPolicy::new();
876        mock.expect_on_error()
877            .times(1..)
878            .returning(|_, e| RetryResult::Continue(e));
879
880        let now = Instant::now();
881        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
882        let rf = policy.on_error(
883            &idempotent_state(now - Duration::from_secs(10)),
884            transient_error(),
885        );
886        assert!(rf.is_continue());
887
888        let rf = policy.on_error(
889            &idempotent_state(now - Duration::from_secs(70)),
890            transient_error(),
891        );
892        assert!(rf.is_exhausted());
893    }
894
895    #[test]
896    fn test_limited_time_inner_permanent() {
897        let mut mock = MockPolicy::new();
898        mock.expect_on_error()
899            .times(2)
900            .returning(|_, e| RetryResult::Permanent(e));
901
902        let now = Instant::now();
903        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
904
905        let rf = policy.on_error(
906            &non_idempotent_state(now - Duration::from_secs(10)),
907            transient_error(),
908        );
909        assert!(rf.is_permanent());
910
911        let rf = policy.on_error(
912            &non_idempotent_state(now + Duration::from_secs(10)),
913            transient_error(),
914        );
915        assert!(rf.is_permanent());
916    }
917
918    #[test]
919    fn test_limited_time_inner_exhausted() {
920        let mut mock = MockPolicy::new();
921        mock.expect_on_error()
922            .times(2)
923            .returning(|_, e| RetryResult::Exhausted(e));
924
925        let now = Instant::now();
926        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
927
928        let rf = policy.on_error(
929            &non_idempotent_state(now - Duration::from_secs(10)),
930            transient_error(),
931        );
932        assert!(rf.is_exhausted());
933
934        let rf = policy.on_error(
935            &non_idempotent_state(now + Duration::from_secs(10)),
936            transient_error(),
937        );
938        assert!(rf.is_exhausted());
939    }
940
941    #[test]
942    fn test_limited_time_remaining_inner_longer() {
943        let mut mock = MockPolicy::new();
944        mock.expect_remaining_time()
945            .times(1)
946            .returning(|_| Some(Duration::from_secs(30)));
947
948        let now = Instant::now();
949        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
950
951        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
952        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
953    }
954
955    #[test]
956    fn test_limited_time_remaining_inner_shorter() {
957        let mut mock = MockPolicy::new();
958        mock.expect_remaining_time()
959            .times(1)
960            .returning(|_| Some(Duration::from_secs(5)));
961        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
962
963        let now = Instant::now();
964        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
965        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
966    }
967
968    #[test]
969    fn test_limited_time_remaining_inner_is_none() {
970        let mut mock = MockPolicy::new();
971        mock.expect_remaining_time().times(1).returning(|_| None);
972        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
973
974        let now = Instant::now();
975        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
976        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
977    }
978
979    #[test]
980    fn test_limited_attempt_count_on_error() {
981        let mut mock = MockPolicy::new();
982        mock.expect_on_error()
983            .times(1..)
984            .returning(|_, e| RetryResult::Continue(e));
985
986        let now = Instant::now();
987        let policy = LimitedAttemptCount::custom(mock, 3);
988        assert!(
989            policy
990                .on_error(
991                    &idempotent_state(now).set_attempt_count(1_u32),
992                    transient_error()
993                )
994                .is_continue()
995        );
996        assert!(
997            policy
998                .on_error(
999                    &idempotent_state(now).set_attempt_count(2_u32),
1000                    transient_error()
1001                )
1002                .is_continue()
1003        );
1004        assert!(
1005            policy
1006                .on_error(
1007                    &idempotent_state(now).set_attempt_count(3_u32),
1008                    transient_error()
1009                )
1010                .is_exhausted()
1011        );
1012    }
1013
1014    #[test]
1015    fn test_limited_attempt_count_on_throttle_continue() {
1016        let mut mock = MockPolicy::new();
1017        mock.expect_on_throttle()
1018            .times(1..)
1019            .returning(|_, e| ThrottleResult::Continue(e));
1020
1021        let now = Instant::now();
1022        let policy = LimitedAttemptCount::custom(mock, 3);
1023        assert!(matches!(
1024            policy.on_throttle(
1025                &idempotent_state(now).set_attempt_count(2_u32),
1026                unavailable()
1027            ),
1028            ThrottleResult::Continue(_)
1029        ));
1030    }
1031
1032    #[test]
1033    fn test_limited_attempt_count_on_throttle_error() {
1034        let mut mock = MockPolicy::new();
1035        mock.expect_on_throttle()
1036            .times(1..)
1037            .returning(|_, e| ThrottleResult::Exhausted(e));
1038
1039        let now = Instant::now();
1040        let policy = LimitedAttemptCount::custom(mock, 3);
1041        assert!(matches!(
1042            policy.on_throttle(&idempotent_state(now), unavailable()),
1043            ThrottleResult::Exhausted(_)
1044        ));
1045    }
1046
1047    #[test]
1048    fn test_limited_attempt_count_remaining_none() {
1049        let mut mock = MockPolicy::new();
1050        mock.expect_remaining_time().times(1).returning(|_| None);
1051        let policy = LimitedAttemptCount::custom(mock, 3);
1052
1053        let now = Instant::now();
1054        assert!(policy.remaining_time(&idempotent_state(now)).is_none());
1055    }
1056
1057    #[test]
1058    fn test_limited_attempt_count_remaining_some() {
1059        let mut mock = MockPolicy::new();
1060        mock.expect_remaining_time()
1061            .times(1)
1062            .returning(|_| Some(Duration::from_secs(123)));
1063        let policy = LimitedAttemptCount::custom(mock, 3);
1064
1065        let now = Instant::now();
1066        assert_eq!(
1067            policy.remaining_time(&idempotent_state(now)),
1068            Some(Duration::from_secs(123))
1069        );
1070    }
1071
1072    #[test]
1073    fn test_limited_attempt_count_inner_permanent() {
1074        let mut mock = MockPolicy::new();
1075        mock.expect_on_error()
1076            .times(2)
1077            .returning(|_, e| RetryResult::Permanent(e));
1078        let policy = LimitedAttemptCount::custom(mock, 2);
1079        let now = Instant::now();
1080
1081        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1082        assert!(rf.is_permanent());
1083
1084        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1085        assert!(rf.is_permanent());
1086    }
1087
1088    #[test]
1089    fn test_limited_attempt_count_inner_exhausted() {
1090        let mut mock = MockPolicy::new();
1091        mock.expect_on_error()
1092            .times(2)
1093            .returning(|_, e| RetryResult::Exhausted(e));
1094        let policy = LimitedAttemptCount::custom(mock, 2);
1095        let now = Instant::now();
1096
1097        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1098        assert!(rf.is_exhausted());
1099
1100        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1101        assert!(rf.is_exhausted());
1102    }
1103
1104    fn transient_error() -> Error {
1105        use crate::error::rpc::{Code, Status};
1106        Error::service(
1107            Status::default()
1108                .set_code(Code::Unavailable)
1109                .set_message("try-again"),
1110        )
1111    }
1112
1113    fn idempotent_state(now: Instant) -> RetryState {
1114        RetryState::new(true).set_start(now)
1115    }
1116
1117    fn non_idempotent_state(now: Instant) -> RetryState {
1118        RetryState::new(false).set_start(now)
1119    }
1120}