Skip to main content

google_cloud_gax/
retry_policy.rs

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines traits for retry policies and some common implementations.
16//!
17//! The client libraries automatically retry RPCs when (1) they fail due to
18//! transient errors **and** the RPC is [idempotent], (2) or failed before an
19//! RPC was started. That is, when it is safe to attempt the RPC more than once.
20//!
21//! Applications may override the default behavior, increasing the retry
22//! attempts, or changing what errors are considered safe to retry.
23//!
24//! This module defines the traits for retry policies and some common
25//! implementations.
26//!
//! To configure the default retry policy for a client, use
28//! [ClientBuilder::with_retry_policy]. To configure the retry policy used for
29//! a specific request, use [RequestOptionsBuilder::with_retry_policy].
30//!
31//! [ClientBuilder::with_retry_policy]: crate::client_builder::ClientBuilder::with_retry_policy
32//! [RequestOptionsBuilder::with_retry_policy]: crate::options::RequestOptionsBuilder::with_retry_policy
33//!
34//! # Examples
35//!
36//! Create a policy that only retries transient errors, and retries for at
37//! most 10 seconds or at most 5 attempts: whichever limit is reached first
38//! stops the retry loop.
39//! ```
40//! # use google_cloud_gax::retry_policy::*;
41//! use std::time::Duration;
42//! let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
43//! ```
44//!
//! Create a policy that retries on any error (even when unsafe to do so),
//! and stops retrying after 5 attempts or 10 seconds: whichever limit is
//! reached first stops the retry loop.
48//! ```
49//! # use google_cloud_gax::retry_policy::*;
50//! use std::time::Duration;
51//! let policy = AlwaysRetry.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
52//! ```
53//!
54//! [idempotent]: https://en.wikipedia.org/wiki/Idempotence
55
56mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
67/// Determines how errors are handled in the retry loop.
68///
69/// Implementations of this trait determine if errors are retryable, and for how
70/// long the retry loop may continue.
71pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
72    /// Query the retry policy after an error.
73    ///
74    /// # Parameters
75    /// * `state` - the state of the retry loop.
76    /// * `error` - the last error when attempting the request.
77    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
78    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
79
80    /// Query the retry policy after a retry attempt is throttled.
81    ///
82    /// Retry attempts may be throttled before they are even sent out. The retry
83    /// policy may choose to treat these as normal errors, consuming attempts,
84    /// or may prefer to ignore them and always return [RetryResult::Continue].
85    ///
86    /// # Parameters
87    /// * `_state` - the state of the retry loop.
88    /// * `error` - the previous error that caused the retry attempt. Throttling
89    ///   only applies to retry attempts, and a retry attempt implies that a
90    ///   previous attempt failed. The retry policy should preserve this error.
91    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
92    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
93        ThrottleResult::Continue(error)
94    }
95
96    /// The remaining time in the retry policy.
97    ///
98    /// For policies based on time, this returns the remaining time in the
99    /// policy. The retry loop can use this value to adjust the next RPC
100    /// timeout. For policies that are not time based this returns `None`.
101    ///
102    /// # Parameters
103    /// * `_state` - the state of the retry loop.
104    /// * `attempt_count` - the number of attempts. This method is called before
105    ///   the first attempt, so the first value is zero.
106    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
107    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
108        None
109    }
110}
111
112/// A helper type to use [RetryPolicy] in client and request options.
113#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118    T: RetryPolicy + 'static,
119{
120    fn from(value: T) -> Self {
121        Self(Arc::new(value))
122    }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126    fn from(value: Arc<dyn RetryPolicy>) -> Self {
127        Self(value)
128    }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132    fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133        value.0
134    }
135}
136
137/// Extension trait for [`RetryPolicy`]
138pub trait RetryPolicyExt: RetryPolicy + Sized {
139    /// Decorate a [RetryPolicy] to limit the total elapsed time in the retry loop.
140    ///
141    /// While the time spent in the retry loop (including time in backoff) is
142    /// less than the prescribed duration the `on_error()` method returns the
143    /// results of the inner policy. After that time it returns
144    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
145    /// [Continue][RetryResult::Continue].
146    ///
147    /// The `remaining_time()` function returns the remaining time. This is
148    /// always [Duration::ZERO] once or after the policy's expiration time is
149    /// reached.
150    ///
151    /// # Example
152    /// ```
153    /// # use google_cloud_gax::retry_policy::*;
154    /// # use google_cloud_gax::retry_state::RetryState;
155    /// let d = std::time::Duration::from_secs(10);
156    /// let policy = Aip194Strict.with_time_limit(d);
157    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
158    /// ```
159    fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160        LimitedElapsedTime::custom(self, maximum_duration)
161    }
162
163    /// Decorate a [RetryPolicy] to limit the number of retry attempts.
164    ///
165    /// This policy decorates an inner policy and limits the total number of
166    /// attempts. Note that `on_error()` is not called before the initial
167    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
168    /// to 0 or 1 results in no retry attempts.
169    ///
170    /// The policy passes through the results from the inner policy as long as
171    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
172    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
173    /// inner policy returns [Continue][RetryResult::Continue].
174    ///
175    /// # Example
176    /// ```
177    /// # use google_cloud_gax::retry_policy::*;
178    /// # use google_cloud_gax::retry_state::RetryState;
179    /// let policy = Aip194Strict.with_attempt_limit(3);
180    /// assert_eq!(policy.remaining_time(&RetryState::new(true)), None);
181    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), transient_error()).is_continue());
182    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(1_u32), transient_error()).is_continue());
183    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(2_u32), transient_error()).is_continue());
184    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(3_u32), transient_error()).is_exhausted());
185    ///
186    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
187    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
188    /// ```
189    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190        LimitedAttemptCount::custom(self, maximum_attempts)
191    }
192
193    /// Decorate a [RetryPolicy] to continue on certain status codes.
194    ///
195    /// This policy decorates an inner policy and retries any errors with HTTP
196    /// status code "429 - TOO_MANY_REQUESTS" **or** where the service returns
197    /// an error with code [ResourceExhausted].
198    ///
199    /// For other errors it returns the same value as the inner policy.
200    ///
201    /// Note that [ResourceExhausted] is ambiguous and may cause problems with
202    /// some services. The code is used for both "too many requests"  and for
203    /// "quota exceeded" problems. If the quota in question is some kind of rate
204    /// limit, then using this policy may be helpful. If the quota is not a rate
205    /// limit, then this retry policy may needlessly send the same RPC multiple
206    /// times.
207    ///
208    /// You should consult the documentation for the service and RPC in question
209    /// before using this policy.
210    ///
211    /// # Example
212    /// ```
213    /// use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicy, RetryPolicyExt};
214    /// use google_cloud_gax::retry_state::RetryState;
215    /// let policy = Aip194Strict;
216    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_permanent());
217    /// let policy = Aip194Strict.continue_on_too_many_requests();
218    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_continue());
219    ///
220    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
221    /// fn too_many_requests() -> Error { Error::service(Status::default().set_code(Code::ResourceExhausted)) }
222    /// ```
223    ///
224    /// [ResourceExhausted]: crate::error::rpc::Code::ResourceExhausted
225    fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226        TooManyRequests::new(self)
227    }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
232/// A retry policy that strictly follows [AIP-194].
233///
234/// This policy must be decorated to limit the number of retry attempts or the
235/// duration of the retry loop.
236///
237/// The policy interprets AIP-194 **strictly**, the retry decision for
238/// server-side errors are based only on the status code, and the only retryable
239/// status code is "UNAVAILABLE".
240///
241/// # Example
242/// ```
243/// # use google_cloud_gax::retry_policy::*;
244/// # use google_cloud_gax::retry_state::RetryState;
245/// let policy = Aip194Strict;
246/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
247/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_permanent());
248///
249/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
250/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
251/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
252/// ```
253///
254/// [AIP-194]: https://google.aip.dev/194
255#[derive(Clone, Debug)]
256pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260        use crate::error::rpc::Code;
261        use http::StatusCode;
262
263        if error.is_transient_and_before_rpc() {
264            return RetryResult::Continue(error);
265        }
266        if !state.idempotent {
267            return RetryResult::Permanent(error);
268        }
269        if error.is_io() {
270            return RetryResult::Continue(error);
271        }
272        if error.status().is_some_and(|s| s.code == Code::Unavailable) {
273            return RetryResult::Continue(error);
274        }
275        // Some services return a status of "Unknown" and a http status code of 503
276        // (SERVICE_UNAVAILABLE). That is not how gRPC status codes are supposed to work, but the
277        // intent is clear: we need to retry.
278        if error
279            .http_status_code()
280            .is_some_and(|code| code == StatusCode::SERVICE_UNAVAILABLE.as_u16())
281        {
282            return RetryResult::Continue(error);
283        }
284        RetryResult::Permanent(error)
285    }
286}
287
288/// A retry policy that retries all errors.
289///
290/// This policy must be decorated to limit the number of retry attempts or the
291/// duration of the retry loop.
292///
293/// The policy retries all errors. This may be useful if the service guarantees
294/// idempotency, maybe through the use of request ids.
295///
296/// # Example
297/// ```
298/// # use google_cloud_gax::retry_policy::*;
299/// # use google_cloud_gax::retry_state::RetryState;
300/// let policy = AlwaysRetry;
301/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
302/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_continue());
303///
304/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
305/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
306/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
307/// ```
308#[derive(Clone, Debug)]
309pub struct AlwaysRetry;
310
311impl RetryPolicy for AlwaysRetry {
312    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
313        RetryResult::Continue(error)
314    }
315}
316
317/// A retry policy that never retries.
318///
319/// This policy is useful when the client already has (or may already have) a
320/// retry policy configured, and you want to avoid retrying a particular method.
321///
322/// # Example
323/// ```
324/// # use google_cloud_gax::retry_policy::*;
325/// # use google_cloud_gax::retry_state::RetryState;
326/// let policy = NeverRetry;
327/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_exhausted());
328/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_exhausted());
329///
330/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
331/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
332/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
333/// ```
334#[derive(Clone, Debug)]
335pub struct NeverRetry;
336
337impl RetryPolicy for NeverRetry {
338    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
339        RetryResult::Exhausted(error)
340    }
341}
342
343/// Error indicating that the maximum elapsed time for retries has been exceeded.
344#[derive(thiserror::Error, Debug)]
345pub struct LimitedElapsedTimeError {
346    maximum_duration: Duration,
347    #[source]
348    source: Error,
349}
350
351impl LimitedElapsedTimeError {
352    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
353        Self {
354            maximum_duration,
355            source,
356        }
357    }
358
359    /// Returns the maximum number of attempts in the exhausted policy.
360    pub fn maximum_duration(&self) -> Duration {
361        self.maximum_duration
362    }
363}
364
365impl std::fmt::Display for LimitedElapsedTimeError {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        write!(
368            f,
369            "retry policy is exhausted after {}s, the last retry attempt was throttled",
370            self.maximum_duration.as_secs_f64()
371        )
372    }
373}
374
375/// A retry policy decorator that limits the total time in the retry loop.
376///
377/// This policy decorates an inner policy and limits the duration of retry
378/// loops. While the time spent in the retry loop (including time in backoff)
379/// is less than the prescribed duration the `on_error()` method returns the
380/// results of the inner policy. After that time it returns
381/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
382/// [Continue][RetryResult::Continue].
383///
384/// The `remaining_time()` function returns the remaining time. This is always
385/// [Duration::ZERO] once or after the policy's deadline is reached.
386///
387/// # Parameters
388/// * `P` - the inner retry policy, defaults to [Aip194Strict].
389#[derive(Debug)]
390pub struct LimitedElapsedTime<P = Aip194Strict>
391where
392    P: RetryPolicy,
393{
394    inner: P,
395    maximum_duration: Duration,
396}
397
398impl LimitedElapsedTime {
399    /// Creates a new instance, with the default inner policy.
400    ///
401    /// # Example
402    /// ```
403    /// # use google_cloud_gax::retry_policy::*;
404    /// # use google_cloud_gax::retry_state::RetryState;
405    /// let d = std::time::Duration::from_secs(10);
406    /// let policy = LimitedElapsedTime::new(d);
407    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
408    /// ```
409    pub fn new(maximum_duration: Duration) -> Self {
410        Self {
411            inner: Aip194Strict,
412            maximum_duration,
413        }
414    }
415}
416
417impl<P> LimitedElapsedTime<P>
418where
419    P: RetryPolicy,
420{
421    /// Creates a new instance with a custom inner policy.
422    ///
423    /// # Example
424    /// ```
425    /// # use google_cloud_gax::retry_policy::*;
426    /// # use google_cloud_gax::retry_state::RetryState;
427    /// # use google_cloud_gax::error;
428    /// use std::time::{Duration, Instant};
429    /// let d = Duration::from_secs(10);
430    /// let policy = AlwaysRetry.with_time_limit(d);
431    /// assert!(policy.remaining_time(&RetryState::new(false)) <= Some(d));
432    /// assert!(policy.on_error(&RetryState::new(false), permanent_error()).is_continue());
433    ///
434    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
435    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
436    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
437    /// ```
438    pub fn custom(inner: P, maximum_duration: Duration) -> Self {
439        Self {
440            inner,
441            maximum_duration,
442        }
443    }
444
445    fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
446        let deadline = state.start + self.maximum_duration;
447        let now = tokio::time::Instant::now().into_std();
448        if now < deadline {
449            ThrottleResult::Continue(error)
450        } else {
451            ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
452                self.maximum_duration,
453                error,
454            )))
455        }
456    }
457}
458
459impl<P> RetryPolicy for LimitedElapsedTime<P>
460where
461    P: RetryPolicy + 'static,
462{
463    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
464        match self.inner.on_error(state, error) {
465            RetryResult::Permanent(e) => RetryResult::Permanent(e),
466            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
467            RetryResult::Continue(e) => {
468                if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
469                    RetryResult::Exhausted(e)
470                } else {
471                    RetryResult::Continue(e)
472                }
473            }
474        }
475    }
476
477    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
478        match self.inner.on_throttle(state, error) {
479            ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
480            ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
481        }
482    }
483
484    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
485        let deadline = state.start + self.maximum_duration;
486        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
487        if let Some(inner) = self.inner.remaining_time(state) {
488            return Some(std::cmp::min(remaining, inner));
489        }
490        Some(remaining)
491    }
492}
493
494/// A retry policy decorator that limits the number of attempts.
495///
496/// This policy decorates an inner policy and limits the total number of
497/// attempts. Note that `on_error()` is not called before the initial
498/// (non-retry) attempt. Therefore, setting the maximum number of attempts to 0
499/// or 1 results in no retry attempts.
500///
501/// The policy passes through the results from the inner policy as long as
502/// `attempt_count < maximum_attempts`. However, once the maximum number of
503/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
504/// result with [Exhausted][RetryResult::Exhausted].
505///
506/// # Parameters
507/// * `P` - the inner retry policy.
508#[derive(Debug)]
509pub struct LimitedAttemptCount<P = Aip194Strict>
510where
511    P: RetryPolicy,
512{
513    inner: P,
514    maximum_attempts: u32,
515}
516
517impl LimitedAttemptCount {
518    /// Creates a new instance, with the default inner policy.
519    ///
520    /// # Example
521    /// ```
522    /// # use google_cloud_gax::retry_policy::*;
523    /// let policy = LimitedAttemptCount::new(5);
524    /// ```
525    pub fn new(maximum_attempts: u32) -> Self {
526        Self {
527            inner: Aip194Strict,
528            maximum_attempts,
529        }
530    }
531}
532
533impl<P> LimitedAttemptCount<P>
534where
535    P: RetryPolicy,
536{
537    /// Creates a new instance with a custom inner policy.
538    ///
539    /// # Example
540    /// ```
541    /// # use google_cloud_gax::retry_policy::*;
542    /// # use google_cloud_gax::retry_state::RetryState;
543    /// let policy = LimitedAttemptCount::custom(AlwaysRetry, 2);
544    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(1_u32), permanent_error()).is_continue());
545    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(2_u32), permanent_error()).is_exhausted());
546    ///
547    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
548    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
549    /// ```
550    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
551        Self {
552            inner,
553            maximum_attempts,
554        }
555    }
556}
557
558impl<P> RetryPolicy for LimitedAttemptCount<P>
559where
560    P: RetryPolicy,
561{
562    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
563        match self.inner.on_error(state, error) {
564            RetryResult::Permanent(e) => RetryResult::Permanent(e),
565            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
566            RetryResult::Continue(e) => {
567                if state.attempt_count >= self.maximum_attempts {
568                    RetryResult::Exhausted(e)
569                } else {
570                    RetryResult::Continue(e)
571                }
572            }
573        }
574    }
575
576    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
577        // The retry loop only calls `on_throttle()` if the policy has not
578        // been exhausted.
579        assert!(state.attempt_count < self.maximum_attempts);
580        self.inner.on_throttle(state, error)
581    }
582
583    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
584        self.inner.remaining_time(state)
585    }
586}
587
588#[cfg(test)]
589pub(crate) mod tests {
590    use super::*;
591    use http::HeaderMap;
592    use std::error::Error as StdError;
593    use std::time::Instant;
594
595    // Verify `RetryPolicyArg` can be converted from the desired types.
596    #[test]
597    fn retry_policy_arg() {
598        let policy = LimitedAttemptCount::new(3);
599        let _ = RetryPolicyArg::from(policy);
600
601        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
602        let _ = RetryPolicyArg::from(policy);
603    }
604
605    #[test]
606    fn aip194_strict() {
607        let p = Aip194Strict;
608
609        let now = Instant::now();
610        assert!(
611            p.on_error(&idempotent_state(now), unavailable())
612                .is_continue()
613        );
614        assert!(
615            p.on_error(&non_idempotent_state(now), unavailable())
616                .is_permanent()
617        );
618        assert!(matches!(
619            p.on_throttle(&idempotent_state(now), unavailable()),
620            ThrottleResult::Continue(_)
621        ));
622
623        assert!(
624            p.on_error(&idempotent_state(now), unknown_and_503())
625                .is_continue()
626        );
627        assert!(
628            p.on_error(&non_idempotent_state(now), unknown_and_503())
629                .is_permanent()
630        );
631        assert!(matches!(
632            p.on_throttle(&idempotent_state(now), unknown_and_503()),
633            ThrottleResult::Continue(_)
634        ));
635
636        assert!(
637            p.on_error(&idempotent_state(now), permission_denied())
638                .is_permanent()
639        );
640        assert!(
641            p.on_error(&non_idempotent_state(now), permission_denied())
642                .is_permanent()
643        );
644
645        assert!(
646            p.on_error(&idempotent_state(now), http_unavailable())
647                .is_continue()
648        );
649        assert!(
650            p.on_error(&non_idempotent_state(now), http_unavailable())
651                .is_permanent()
652        );
653        assert!(matches!(
654            p.on_throttle(&idempotent_state(now), http_unavailable()),
655            ThrottleResult::Continue(_)
656        ));
657
658        assert!(
659            p.on_error(&idempotent_state(now), http_permission_denied())
660                .is_permanent()
661        );
662        assert!(
663            p.on_error(&non_idempotent_state(now), http_permission_denied())
664                .is_permanent()
665        );
666
667        assert!(
668            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
669                .is_continue()
670        );
671        assert!(
672            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
673                .is_permanent()
674        );
675
676        assert!(
677            p.on_error(&idempotent_state(now), pre_rpc_transient())
678                .is_continue()
679        );
680        assert!(
681            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
682                .is_continue()
683        );
684
685        assert!(
686            p.on_error(&idempotent_state(now), Error::ser("err"))
687                .is_permanent()
688        );
689        assert!(
690            p.on_error(&non_idempotent_state(now), Error::ser("err"))
691                .is_permanent()
692        );
693        assert!(
694            p.on_error(&idempotent_state(now), Error::deser("err"))
695                .is_permanent()
696        );
697        assert!(
698            p.on_error(&non_idempotent_state(now), Error::deser("err"))
699                .is_permanent()
700        );
701
702        assert!(
703            p.remaining_time(&idempotent_state(now)).is_none(),
704            "p={p:?}, now={now:?}"
705        );
706    }
707
708    #[test]
709    fn always_retry() {
710        let p = AlwaysRetry;
711
712        let now = Instant::now();
713        assert!(
714            p.remaining_time(&idempotent_state(now)).is_none(),
715            "p={p:?}, now={now:?}"
716        );
717        assert!(
718            p.on_error(&idempotent_state(now), http_unavailable())
719                .is_continue()
720        );
721        assert!(
722            p.on_error(&non_idempotent_state(now), http_unavailable())
723                .is_continue()
724        );
725        assert!(matches!(
726            p.on_throttle(&idempotent_state(now), http_unavailable()),
727            ThrottleResult::Continue(_)
728        ));
729
730        assert!(
731            p.on_error(&idempotent_state(now), unavailable())
732                .is_continue()
733        );
734        assert!(
735            p.on_error(&non_idempotent_state(now), unavailable())
736                .is_continue()
737        );
738    }
739
740    #[test_case::test_case(true, Error::io("err"))]
741    #[test_case::test_case(true, pre_rpc_transient())]
742    #[test_case::test_case(true, Error::ser("err"))]
743    #[test_case::test_case(false, Error::io("err"))]
744    #[test_case::test_case(false, pre_rpc_transient())]
745    #[test_case::test_case(false, Error::ser("err"))]
746    fn always_retry_error_kind(idempotent: bool, error: Error) {
747        let p = AlwaysRetry;
748        let now = Instant::now();
749        let state = if idempotent {
750            idempotent_state(now)
751        } else {
752            non_idempotent_state(now)
753        };
754        assert!(p.on_error(&state, error).is_continue());
755    }
756
757    #[test]
758    fn never_retry() {
759        let p = NeverRetry;
760
761        let now = Instant::now();
762        assert!(
763            p.remaining_time(&idempotent_state(now)).is_none(),
764            "p={p:?}, now={now:?}"
765        );
766        assert!(
767            p.on_error(&idempotent_state(now), http_unavailable())
768                .is_exhausted()
769        );
770        assert!(
771            p.on_error(&non_idempotent_state(now), http_unavailable())
772                .is_exhausted()
773        );
774        assert!(matches!(
775            p.on_throttle(&idempotent_state(now), http_unavailable()),
776            ThrottleResult::Continue(_)
777        ));
778
779        assert!(
780            p.on_error(&idempotent_state(now), unavailable())
781                .is_exhausted()
782        );
783        assert!(
784            p.on_error(&non_idempotent_state(now), unavailable())
785                .is_exhausted()
786        );
787
788        assert!(
789            p.on_error(&idempotent_state(now), http_permission_denied())
790                .is_exhausted()
791        );
792        assert!(
793            p.on_error(&non_idempotent_state(now), http_permission_denied())
794                .is_exhausted()
795        );
796    }
797
798    #[test_case::test_case(true, Error::io("err"))]
799    #[test_case::test_case(true, pre_rpc_transient())]
800    #[test_case::test_case(true, Error::ser("err"))]
801    #[test_case::test_case(false, Error::io("err"))]
802    #[test_case::test_case(false, pre_rpc_transient())]
803    #[test_case::test_case(false, Error::ser("err"))]
804    fn never_retry_error_kind(idempotent: bool, error: Error) {
805        let p = NeverRetry;
806        let now = Instant::now();
807        let state = if idempotent {
808            idempotent_state(now)
809        } else {
810            non_idempotent_state(now)
811        };
812        assert!(p.on_error(&state, error).is_exhausted());
813    }
814
815    fn pre_rpc_transient() -> Error {
816        use crate::error::CredentialsError;
817        Error::authentication(CredentialsError::from_msg(true, "err"))
818    }
819
820    fn http_unavailable() -> Error {
821        Error::http(
822            503_u16,
823            HeaderMap::new(),
824            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
825        )
826    }
827
828    fn http_permission_denied() -> Error {
829        Error::http(
830            403_u16,
831            HeaderMap::new(),
832            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
833        )
834    }
835
836    fn unavailable() -> Error {
837        use crate::error::rpc::Code;
838        let status = crate::error::rpc::Status::default()
839            .set_code(Code::Unavailable)
840            .set_message("UNAVAILABLE");
841        Error::service(status)
842    }
843
844    fn unknown_and_503() -> Error {
845        use crate::error::rpc::Code;
846        let status = crate::error::rpc::Status::default()
847            .set_code(Code::Unknown)
848            .set_message("UNAVAILABLE");
849        Error::service_full(status, Some(503), None, Some("source error".into()))
850    }
851
852    fn permission_denied() -> Error {
853        use crate::error::rpc::Code;
854        let status = crate::error::rpc::Status::default()
855            .set_code(Code::PermissionDenied)
856            .set_message("PERMISSION_DENIED");
857        Error::service(status)
858    }
859
860    mockall::mock! {
861        #[derive(Debug)]
862        pub(crate) Policy {}
863        impl RetryPolicy for Policy {
864            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
865            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
866            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
867        }
868    }
869
870    #[test]
871    fn limited_elapsed_time_error() {
872        let limit = Duration::from_secs(123) + Duration::from_millis(567);
873        let err = LimitedElapsedTimeError::new(limit, unavailable());
874        assert_eq!(err.maximum_duration(), limit);
875        let fmt = err.to_string();
876        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
877        assert!(err.source().is_some(), "{err:?}");
878    }
879
880    #[test]
881    fn test_limited_time_forwards() {
882        let mut mock = MockPolicy::new();
883        mock.expect_on_error()
884            .times(1..)
885            .returning(|_, e| RetryResult::Continue(e));
886        mock.expect_on_throttle()
887            .times(1..)
888            .returning(|_, e| ThrottleResult::Continue(e));
889        mock.expect_remaining_time().times(1).returning(|_| None);
890
891        let now = Instant::now();
892        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
893        let rf = policy.on_error(&idempotent_state(now), transient_error());
894        assert!(rf.is_continue());
895
896        let rt = policy.remaining_time(&idempotent_state(now));
897        assert!(rt.is_some(), "policy={policy:?}, now={now:?}");
898
899        let e = policy.on_throttle(&idempotent_state(now), transient_error());
900        assert!(matches!(e, ThrottleResult::Continue(_)));
901    }
902
903    #[test]
904    fn test_limited_time_on_throttle_continue() {
905        let mut mock = MockPolicy::new();
906        mock.expect_on_throttle()
907            .times(1..)
908            .returning(|_, e| ThrottleResult::Continue(e));
909
910        let now = Instant::now();
911        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
912
913        // Before the policy expires the inner result is returned verbatim.
914        let rf = policy.on_throttle(
915            &idempotent_state(now - Duration::from_secs(50)),
916            unavailable(),
917        );
918        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
919
920        // After the policy expires the innter result is always "exhausted".
921        let rf = policy.on_throttle(
922            &idempotent_state(now - Duration::from_secs(70)),
923            unavailable(),
924        );
925        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
926    }
927
928    #[test]
929    fn test_limited_time_on_throttle_exhausted() {
930        let mut mock = MockPolicy::new();
931        mock.expect_on_throttle()
932            .times(1..)
933            .returning(|_, e| ThrottleResult::Exhausted(e));
934
935        let now = Instant::now();
936        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
937
938        // Before the policy expires the inner result is returned verbatim.
939        let rf = policy.on_throttle(
940            &idempotent_state(now - Duration::from_secs(50)),
941            unavailable(),
942        );
943        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
944    }
945
946    #[test]
947    fn test_limited_time_inner_continues() {
948        let mut mock = MockPolicy::new();
949        mock.expect_on_error()
950            .times(1..)
951            .returning(|_, e| RetryResult::Continue(e));
952
953        let now = Instant::now();
954        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
955        let rf = policy.on_error(
956            &idempotent_state(now - Duration::from_secs(10)),
957            transient_error(),
958        );
959        assert!(rf.is_continue());
960
961        let rf = policy.on_error(
962            &idempotent_state(now - Duration::from_secs(70)),
963            transient_error(),
964        );
965        assert!(rf.is_exhausted());
966    }
967
968    #[test]
969    fn test_limited_time_inner_permanent() {
970        let mut mock = MockPolicy::new();
971        mock.expect_on_error()
972            .times(2)
973            .returning(|_, e| RetryResult::Permanent(e));
974
975        let now = Instant::now();
976        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
977
978        let rf = policy.on_error(
979            &non_idempotent_state(now - Duration::from_secs(10)),
980            transient_error(),
981        );
982        assert!(rf.is_permanent());
983
984        let rf = policy.on_error(
985            &non_idempotent_state(now + Duration::from_secs(10)),
986            transient_error(),
987        );
988        assert!(rf.is_permanent());
989    }
990
991    #[test]
992    fn test_limited_time_inner_exhausted() {
993        let mut mock = MockPolicy::new();
994        mock.expect_on_error()
995            .times(2)
996            .returning(|_, e| RetryResult::Exhausted(e));
997
998        let now = Instant::now();
999        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1000
1001        let rf = policy.on_error(
1002            &non_idempotent_state(now - Duration::from_secs(10)),
1003            transient_error(),
1004        );
1005        assert!(rf.is_exhausted());
1006
1007        let rf = policy.on_error(
1008            &non_idempotent_state(now + Duration::from_secs(10)),
1009            transient_error(),
1010        );
1011        assert!(rf.is_exhausted());
1012    }
1013
1014    #[test]
1015    fn test_limited_time_remaining_inner_longer() {
1016        let mut mock = MockPolicy::new();
1017        mock.expect_remaining_time()
1018            .times(1)
1019            .returning(|_| Some(Duration::from_secs(30)));
1020
1021        let now = Instant::now();
1022        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1023
1024        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
1025        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
1026    }
1027
1028    #[test]
1029    fn test_limited_time_remaining_inner_shorter() {
1030        let mut mock = MockPolicy::new();
1031        mock.expect_remaining_time()
1032            .times(1)
1033            .returning(|_| Some(Duration::from_secs(5)));
1034        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1035
1036        let now = Instant::now();
1037        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
1038        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1039    }
1040
1041    #[test]
1042    fn test_limited_time_remaining_inner_is_none() {
1043        let mut mock = MockPolicy::new();
1044        mock.expect_remaining_time().times(1).returning(|_| None);
1045        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1046
1047        let now = Instant::now();
1048        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
1049        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1050    }
1051
1052    #[test]
1053    fn test_limited_attempt_count_on_error() {
1054        let mut mock = MockPolicy::new();
1055        mock.expect_on_error()
1056            .times(1..)
1057            .returning(|_, e| RetryResult::Continue(e));
1058
1059        let now = Instant::now();
1060        let policy = LimitedAttemptCount::custom(mock, 3);
1061        assert!(
1062            policy
1063                .on_error(
1064                    &idempotent_state(now).set_attempt_count(1_u32),
1065                    transient_error()
1066                )
1067                .is_continue()
1068        );
1069        assert!(
1070            policy
1071                .on_error(
1072                    &idempotent_state(now).set_attempt_count(2_u32),
1073                    transient_error()
1074                )
1075                .is_continue()
1076        );
1077        assert!(
1078            policy
1079                .on_error(
1080                    &idempotent_state(now).set_attempt_count(3_u32),
1081                    transient_error()
1082                )
1083                .is_exhausted()
1084        );
1085    }
1086
1087    #[test]
1088    fn test_limited_attempt_count_on_throttle_continue() {
1089        let mut mock = MockPolicy::new();
1090        mock.expect_on_throttle()
1091            .times(1..)
1092            .returning(|_, e| ThrottleResult::Continue(e));
1093
1094        let now = Instant::now();
1095        let policy = LimitedAttemptCount::custom(mock, 3);
1096        assert!(matches!(
1097            policy.on_throttle(
1098                &idempotent_state(now).set_attempt_count(2_u32),
1099                unavailable()
1100            ),
1101            ThrottleResult::Continue(_)
1102        ));
1103    }
1104
1105    #[test]
1106    fn test_limited_attempt_count_on_throttle_error() {
1107        let mut mock = MockPolicy::new();
1108        mock.expect_on_throttle()
1109            .times(1..)
1110            .returning(|_, e| ThrottleResult::Exhausted(e));
1111
1112        let now = Instant::now();
1113        let policy = LimitedAttemptCount::custom(mock, 3);
1114        assert!(matches!(
1115            policy.on_throttle(&idempotent_state(now), unavailable()),
1116            ThrottleResult::Exhausted(_)
1117        ));
1118    }
1119
1120    #[test]
1121    fn test_limited_attempt_count_remaining_none() {
1122        let mut mock = MockPolicy::new();
1123        mock.expect_remaining_time().times(1).returning(|_| None);
1124        let policy = LimitedAttemptCount::custom(mock, 3);
1125
1126        let now = Instant::now();
1127        assert!(
1128            policy.remaining_time(&idempotent_state(now)).is_none(),
1129            "policy={policy:?} now={now:?}"
1130        );
1131    }
1132
1133    #[test]
1134    fn test_limited_attempt_count_remaining_some() {
1135        let mut mock = MockPolicy::new();
1136        mock.expect_remaining_time()
1137            .times(1)
1138            .returning(|_| Some(Duration::from_secs(123)));
1139        let policy = LimitedAttemptCount::custom(mock, 3);
1140
1141        let now = Instant::now();
1142        assert_eq!(
1143            policy.remaining_time(&idempotent_state(now)),
1144            Some(Duration::from_secs(123))
1145        );
1146    }
1147
1148    #[test]
1149    fn test_limited_attempt_count_inner_permanent() {
1150        let mut mock = MockPolicy::new();
1151        mock.expect_on_error()
1152            .times(2)
1153            .returning(|_, e| RetryResult::Permanent(e));
1154        let policy = LimitedAttemptCount::custom(mock, 2);
1155        let now = Instant::now();
1156
1157        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1158        assert!(rf.is_permanent());
1159
1160        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1161        assert!(rf.is_permanent());
1162    }
1163
1164    #[test]
1165    fn test_limited_attempt_count_inner_exhausted() {
1166        let mut mock = MockPolicy::new();
1167        mock.expect_on_error()
1168            .times(2)
1169            .returning(|_, e| RetryResult::Exhausted(e));
1170        let policy = LimitedAttemptCount::custom(mock, 2);
1171        let now = Instant::now();
1172
1173        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1174        assert!(rf.is_exhausted());
1175
1176        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1177        assert!(rf.is_exhausted());
1178    }
1179
1180    fn transient_error() -> Error {
1181        use crate::error::rpc::{Code, Status};
1182        Error::service(
1183            Status::default()
1184                .set_code(Code::Unavailable)
1185                .set_message("try-again"),
1186        )
1187    }
1188
1189    pub(crate) fn idempotent_state(now: Instant) -> RetryState {
1190        RetryState::new(true).set_start(now)
1191    }
1192
1193    pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
1194        RetryState::new(false).set_start(now)
1195    }
1196}