Skip to main content

google_cloud_gax/
retry_policy.rs

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines traits for retry policies and some common implementations.
16//!
//! The client libraries automatically retry RPCs when (1) they fail due to
//! transient errors **and** the RPC is [idempotent], or (2) they fail before
//! the RPC was started. That is, when it is safe to attempt the RPC more than
//! once.
20//!
21//! Applications may override the default behavior, increasing the retry
22//! attempts, or changing what errors are considered safe to retry.
23//!
24//! This module defines the traits for retry policies and some common
25//! implementations.
26//!
//! To configure the default retry policy for a client, use
28//! [ClientBuilder::with_retry_policy]. To configure the retry policy used for
29//! a specific request, use [RequestOptionsBuilder::with_retry_policy].
30//!
31//! [ClientBuilder::with_retry_policy]: crate::client_builder::ClientBuilder::with_retry_policy
32//! [RequestOptionsBuilder::with_retry_policy]: crate::options::RequestOptionsBuilder::with_retry_policy
33//!
34//! # Examples
35//!
36//! Create a policy that only retries transient errors, and retries for at
37//! most 10 seconds or at most 5 attempts: whichever limit is reached first
38//! stops the retry loop.
39//! ```
40//! # use google_cloud_gax::retry_policy::*;
41//! use std::time::Duration;
42//! let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
43//! ```
44//!
//! Create a policy that retries on any error (even when unsafe to do so),
//! and retries for at most 10 seconds or at most 5 attempts: whichever limit
//! is reached first stops the retry loop.
48//! ```
49//! # use google_cloud_gax::retry_policy::*;
50//! use std::time::Duration;
51//! let policy = AlwaysRetry.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
52//! ```
53//!
54//! [idempotent]: https://en.wikipedia.org/wiki/Idempotence
55
56mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
67/// Determines how errors are handled in the retry loop.
68///
69/// Implementations of this trait determine if errors are retryable, and for how
70/// long the retry loop may continue.
71pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
72    /// Query the retry policy after an error.
73    ///
74    /// # Parameters
75    /// * `state` - the state of the retry loop.
76    /// * `error` - the last error when attempting the request.
77    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
78    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
79
80    /// Query the retry policy after a retry attempt is throttled.
81    ///
82    /// Retry attempts may be throttled before they are even sent out. The retry
83    /// policy may choose to treat these as normal errors, consuming attempts,
84    /// or may prefer to ignore them and always return [RetryResult::Continue].
85    ///
86    /// # Parameters
87    /// * `_state` - the state of the retry loop.
88    /// * `error` - the previous error that caused the retry attempt. Throttling
89    ///   only applies to retry attempts, and a retry attempt implies that a
90    ///   previous attempt failed. The retry policy should preserve this error.
91    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
92    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
93        ThrottleResult::Continue(error)
94    }
95
96    /// The remaining time in the retry policy.
97    ///
98    /// For policies based on time, this returns the remaining time in the
99    /// policy. The retry loop can use this value to adjust the next RPC
100    /// timeout. For policies that are not time based this returns `None`.
101    ///
102    /// # Parameters
103    /// * `_state` - the state of the retry loop.
104    /// * `attempt_count` - the number of attempts. This method is called before
105    ///   the first attempt, so the first value is zero.
106    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
107    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
108        None
109    }
110}
111
112/// A helper type to use [RetryPolicy] in client and request options.
113#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118    T: RetryPolicy + 'static,
119{
120    fn from(value: T) -> Self {
121        Self(Arc::new(value))
122    }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126    fn from(value: Arc<dyn RetryPolicy>) -> Self {
127        Self(value)
128    }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132    fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133        value.0
134    }
135}
136
137/// Extension trait for [`RetryPolicy`]
138pub trait RetryPolicyExt: RetryPolicy + Sized {
139    /// Decorate a [RetryPolicy] to limit the total elapsed time in the retry loop.
140    ///
141    /// While the time spent in the retry loop (including time in backoff) is
142    /// less than the prescribed duration the `on_error()` method returns the
143    /// results of the inner policy. After that time it returns
144    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
145    /// [Continue][RetryResult::Continue].
146    ///
147    /// The `remaining_time()` function returns the remaining time. This is
148    /// always [Duration::ZERO] once or after the policy's expiration time is
149    /// reached.
150    ///
151    /// # Example
152    /// ```
153    /// # use google_cloud_gax::retry_policy::*;
154    /// # use google_cloud_gax::retry_state::RetryState;
155    /// let d = std::time::Duration::from_secs(10);
156    /// let policy = Aip194Strict.with_time_limit(d);
157    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
158    /// ```
159    fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160        LimitedElapsedTime::custom(self, maximum_duration)
161    }
162
163    /// Decorate a [RetryPolicy] to limit the number of retry attempts.
164    ///
165    /// This policy decorates an inner policy and limits the total number of
166    /// attempts. Note that `on_error()` is not called before the initial
167    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
168    /// to 0 or 1 results in no retry attempts.
169    ///
170    /// The policy passes through the results from the inner policy as long as
171    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
172    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
173    /// inner policy returns [Continue][RetryResult::Continue].
174    ///
175    /// # Example
176    /// ```
177    /// # use google_cloud_gax::retry_policy::*;
178    /// # use google_cloud_gax::retry_state::RetryState;
179    /// let policy = Aip194Strict.with_attempt_limit(3);
180    /// assert_eq!(policy.remaining_time(&RetryState::new(true)), None);
181    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), transient_error()).is_continue());
182    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(1_u32), transient_error()).is_continue());
183    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(2_u32), transient_error()).is_continue());
184    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(3_u32), transient_error()).is_exhausted());
185    ///
186    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
187    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
188    /// ```
189    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190        LimitedAttemptCount::custom(self, maximum_attempts)
191    }
192
193    /// Decorate a [RetryPolicy] to continue on certain status codes.
194    ///
195    /// This policy decorates an inner policy and retries any errors with HTTP
196    /// status code "429 - TOO_MANY_REQUESTS" **or** where the service returns
197    /// an error with code [ResourceExhausted].
198    ///
199    /// For other errors it returns the same value as the inner policy.
200    ///
201    /// Note that [ResourceExhausted] is ambiguous and may cause problems with
202    /// some services. The code is used for both "too many requests"  and for
203    /// "quota exceeded" problems. If the quota in question is some kind of rate
204    /// limit, then using this policy may be helpful. If the quota is not a rate
205    /// limit, then this retry policy may needlessly send the same RPC multiple
206    /// times.
207    ///
208    /// You should consult the documentation for the service and RPC in question
209    /// before using this policy.
210    ///
211    /// # Example
212    /// ```
213    /// use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicy, RetryPolicyExt};
214    /// use google_cloud_gax::retry_state::RetryState;
215    /// let policy = Aip194Strict;
216    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_permanent());
217    /// let policy = Aip194Strict.continue_on_too_many_requests();
218    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_continue());
219    ///
220    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
221    /// fn too_many_requests() -> Error { Error::service(Status::default().set_code(Code::ResourceExhausted)) }
222    /// ```
223    ///
224    /// [ResourceExhausted]: crate::error::rpc::Code::ResourceExhausted
225    fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226        TooManyRequests::new(self)
227    }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
232/// A retry policy that strictly follows [AIP-194].
233///
234/// This policy must be decorated to limit the number of retry attempts or the
235/// duration of the retry loop.
236///
237/// The policy interprets AIP-194 **strictly**, the retry decision for
238/// server-side errors are based only on the status code, and the only retryable
239/// status code is "UNAVAILABLE".
240///
241/// # Example
242/// ```
243/// # use google_cloud_gax::retry_policy::*;
244/// # use google_cloud_gax::retry_state::RetryState;
245/// let policy = Aip194Strict;
246/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
247/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_permanent());
248///
249/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
250/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
251/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
252/// ```
253///
254/// [AIP-194]: https://google.aip.dev/194
255#[derive(Clone, Debug)]
256pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260        use crate::error::rpc::Code;
261        use http::StatusCode;
262
263        if error.is_transient_and_before_rpc() {
264            return RetryResult::Continue(error);
265        }
266        if !state.idempotent {
267            return RetryResult::Permanent(error);
268        }
269        if error.is_io() {
270            return RetryResult::Continue(error);
271        }
272        if error.status().is_some_and(|s| s.code == Code::Unavailable) {
273            return RetryResult::Continue(error);
274        }
275        // Some services return a status of "Unknown" and a http status code of 503
276        // (SERVICE_UNAVAILABLE). That is not how gRPC status codes are supposed to work, but the
277        // intent is clear: we need to retry.
278        if error
279            .http_status_code()
280            .is_some_and(|code| code == StatusCode::SERVICE_UNAVAILABLE.as_u16())
281        {
282            return RetryResult::Continue(error);
283        }
284        RetryResult::Permanent(error)
285    }
286}
287
288/// A retry policy that retries all errors.
289///
290/// This policy must be decorated to limit the number of retry attempts or the
291/// duration of the retry loop.
292///
293/// The policy retries all errors. This may be useful if the service guarantees
294/// idempotency, maybe through the use of request ids.
295///
296/// # Example
297/// ```
298/// # use google_cloud_gax::retry_policy::*;
299/// # use google_cloud_gax::retry_state::RetryState;
300/// let policy = AlwaysRetry;
301/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
302/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_continue());
303///
304/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
305/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
306/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
307/// ```
308#[derive(Clone, Debug)]
309pub struct AlwaysRetry;
310
311impl RetryPolicy for AlwaysRetry {
312    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
313        RetryResult::Continue(error)
314    }
315}
316
317/// A retry policy that never retries.
318///
319/// This policy is useful when the client already has (or may already have) a
320/// retry policy configured, and you want to avoid retrying a particular method.
321///
322/// # Example
323/// ```
324/// # use google_cloud_gax::retry_policy::*;
325/// # use google_cloud_gax::retry_state::RetryState;
326/// let policy = NeverRetry;
327/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_exhausted());
328/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_exhausted());
329///
330/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
331/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
332/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
333/// ```
334#[derive(Clone, Debug)]
335pub struct NeverRetry;
336
337impl RetryPolicy for NeverRetry {
338    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
339        RetryResult::Exhausted(error)
340    }
341}
342
343#[derive(thiserror::Error, Debug)]
344pub struct LimitedElapsedTimeError {
345    maximum_duration: Duration,
346    #[source]
347    source: Error,
348}
349
350impl LimitedElapsedTimeError {
351    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
352        Self {
353            maximum_duration,
354            source,
355        }
356    }
357
358    /// Returns the maximum number of attempts in the exhausted policy.
359    pub fn maximum_duration(&self) -> Duration {
360        self.maximum_duration
361    }
362}
363
364impl std::fmt::Display for LimitedElapsedTimeError {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        write!(
367            f,
368            "retry policy is exhausted after {}s, the last retry attempt was throttled",
369            self.maximum_duration.as_secs_f64()
370        )
371    }
372}
373
374/// A retry policy decorator that limits the total time in the retry loop.
375///
376/// This policy decorates an inner policy and limits the duration of retry
377/// loops. While the time spent in the retry loop (including time in backoff)
378/// is less than the prescribed duration the `on_error()` method returns the
379/// results of the inner policy. After that time it returns
380/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
381/// [Continue][RetryResult::Continue].
382///
383/// The `remaining_time()` function returns the remaining time. This is always
384/// [Duration::ZERO] once or after the policy's deadline is reached.
385///
386/// # Parameters
387/// * `P` - the inner retry policy, defaults to [Aip194Strict].
388#[derive(Debug)]
389pub struct LimitedElapsedTime<P = Aip194Strict>
390where
391    P: RetryPolicy,
392{
393    inner: P,
394    maximum_duration: Duration,
395}
396
397impl LimitedElapsedTime {
398    /// Creates a new instance, with the default inner policy.
399    ///
400    /// # Example
401    /// ```
402    /// # use google_cloud_gax::retry_policy::*;
403    /// # use google_cloud_gax::retry_state::RetryState;
404    /// let d = std::time::Duration::from_secs(10);
405    /// let policy = LimitedElapsedTime::new(d);
406    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
407    /// ```
408    pub fn new(maximum_duration: Duration) -> Self {
409        Self {
410            inner: Aip194Strict,
411            maximum_duration,
412        }
413    }
414}
415
416impl<P> LimitedElapsedTime<P>
417where
418    P: RetryPolicy,
419{
420    /// Creates a new instance with a custom inner policy.
421    ///
422    /// # Example
423    /// ```
424    /// # use google_cloud_gax::retry_policy::*;
425    /// # use google_cloud_gax::retry_state::RetryState;
426    /// # use google_cloud_gax::error;
427    /// use std::time::{Duration, Instant};
428    /// let d = Duration::from_secs(10);
429    /// let policy = AlwaysRetry.with_time_limit(d);
430    /// assert!(policy.remaining_time(&RetryState::new(false)) <= Some(d));
431    /// assert!(policy.on_error(&RetryState::new(false), permanent_error()).is_continue());
432    ///
433    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
434    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
435    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
436    /// ```
437    pub fn custom(inner: P, maximum_duration: Duration) -> Self {
438        Self {
439            inner,
440            maximum_duration,
441        }
442    }
443
444    fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
445        let deadline = state.start + self.maximum_duration;
446        let now = tokio::time::Instant::now().into_std();
447        if now < deadline {
448            ThrottleResult::Continue(error)
449        } else {
450            ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
451                self.maximum_duration,
452                error,
453            )))
454        }
455    }
456}
457
458impl<P> RetryPolicy for LimitedElapsedTime<P>
459where
460    P: RetryPolicy + 'static,
461{
462    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
463        match self.inner.on_error(state, error) {
464            RetryResult::Permanent(e) => RetryResult::Permanent(e),
465            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
466            RetryResult::Continue(e) => {
467                if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
468                    RetryResult::Exhausted(e)
469                } else {
470                    RetryResult::Continue(e)
471                }
472            }
473        }
474    }
475
476    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
477        match self.inner.on_throttle(state, error) {
478            ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
479            ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
480        }
481    }
482
483    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
484        let deadline = state.start + self.maximum_duration;
485        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
486        if let Some(inner) = self.inner.remaining_time(state) {
487            return Some(std::cmp::min(remaining, inner));
488        }
489        Some(remaining)
490    }
491}
492
493/// A retry policy decorator that limits the number of attempts.
494///
495/// This policy decorates an inner policy and limits the total number of
496/// attempts. Note that `on_error()` is not called before the initial
497/// (non-retry) attempt. Therefore, setting the maximum number of attempts to 0
498/// or 1 results in no retry attempts.
499///
500/// The policy passes through the results from the inner policy as long as
501/// `attempt_count < maximum_attempts`. However, once the maximum number of
502/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
503/// result with [Exhausted][RetryResult::Exhausted].
504///
505/// # Parameters
506/// * `P` - the inner retry policy.
507#[derive(Debug)]
508pub struct LimitedAttemptCount<P = Aip194Strict>
509where
510    P: RetryPolicy,
511{
512    inner: P,
513    maximum_attempts: u32,
514}
515
516impl LimitedAttemptCount {
517    /// Creates a new instance, with the default inner policy.
518    ///
519    /// # Example
520    /// ```
521    /// # use google_cloud_gax::retry_policy::*;
522    /// let policy = LimitedAttemptCount::new(5);
523    /// ```
524    pub fn new(maximum_attempts: u32) -> Self {
525        Self {
526            inner: Aip194Strict,
527            maximum_attempts,
528        }
529    }
530}
531
532impl<P> LimitedAttemptCount<P>
533where
534    P: RetryPolicy,
535{
536    /// Creates a new instance with a custom inner policy.
537    ///
538    /// # Example
539    /// ```
540    /// # use google_cloud_gax::retry_policy::*;
541    /// # use google_cloud_gax::retry_state::RetryState;
542    /// let policy = LimitedAttemptCount::custom(AlwaysRetry, 2);
543    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(1_u32), permanent_error()).is_continue());
544    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(2_u32), permanent_error()).is_exhausted());
545    ///
546    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
547    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
548    /// ```
549    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
550        Self {
551            inner,
552            maximum_attempts,
553        }
554    }
555}
556
557impl<P> RetryPolicy for LimitedAttemptCount<P>
558where
559    P: RetryPolicy,
560{
561    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
562        match self.inner.on_error(state, error) {
563            RetryResult::Permanent(e) => RetryResult::Permanent(e),
564            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
565            RetryResult::Continue(e) => {
566                if state.attempt_count >= self.maximum_attempts {
567                    RetryResult::Exhausted(e)
568                } else {
569                    RetryResult::Continue(e)
570                }
571            }
572        }
573    }
574
575    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
576        // The retry loop only calls `on_throttle()` if the policy has not
577        // been exhausted.
578        assert!(state.attempt_count < self.maximum_attempts);
579        self.inner.on_throttle(state, error)
580    }
581
582    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
583        self.inner.remaining_time(state)
584    }
585}
586
587#[cfg(test)]
588pub mod tests {
589    use super::*;
590    use http::HeaderMap;
591    use std::error::Error as StdError;
592    use std::time::Instant;
593
594    // Verify `RetryPolicyArg` can be converted from the desired types.
595    #[test]
596    fn retry_policy_arg() {
597        let policy = LimitedAttemptCount::new(3);
598        let _ = RetryPolicyArg::from(policy);
599
600        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
601        let _ = RetryPolicyArg::from(policy);
602    }
603
604    #[test]
605    fn aip194_strict() {
606        let p = Aip194Strict;
607
608        let now = Instant::now();
609        assert!(
610            p.on_error(&idempotent_state(now), unavailable())
611                .is_continue()
612        );
613        assert!(
614            p.on_error(&non_idempotent_state(now), unavailable())
615                .is_permanent()
616        );
617        assert!(matches!(
618            p.on_throttle(&idempotent_state(now), unavailable()),
619            ThrottleResult::Continue(_)
620        ));
621
622        assert!(
623            p.on_error(&idempotent_state(now), unknown_and_503())
624                .is_continue()
625        );
626        assert!(
627            p.on_error(&non_idempotent_state(now), unknown_and_503())
628                .is_permanent()
629        );
630        assert!(matches!(
631            p.on_throttle(&idempotent_state(now), unknown_and_503()),
632            ThrottleResult::Continue(_)
633        ));
634
635        assert!(
636            p.on_error(&idempotent_state(now), permission_denied())
637                .is_permanent()
638        );
639        assert!(
640            p.on_error(&non_idempotent_state(now), permission_denied())
641                .is_permanent()
642        );
643
644        assert!(
645            p.on_error(&idempotent_state(now), http_unavailable())
646                .is_continue()
647        );
648        assert!(
649            p.on_error(&non_idempotent_state(now), http_unavailable())
650                .is_permanent()
651        );
652        assert!(matches!(
653            p.on_throttle(&idempotent_state(now), http_unavailable()),
654            ThrottleResult::Continue(_)
655        ));
656
657        assert!(
658            p.on_error(&idempotent_state(now), http_permission_denied())
659                .is_permanent()
660        );
661        assert!(
662            p.on_error(&non_idempotent_state(now), http_permission_denied())
663                .is_permanent()
664        );
665
666        assert!(
667            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
668                .is_continue()
669        );
670        assert!(
671            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
672                .is_permanent()
673        );
674
675        assert!(
676            p.on_error(&idempotent_state(now), pre_rpc_transient())
677                .is_continue()
678        );
679        assert!(
680            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
681                .is_continue()
682        );
683
684        assert!(
685            p.on_error(&idempotent_state(now), Error::ser("err"))
686                .is_permanent()
687        );
688        assert!(
689            p.on_error(&non_idempotent_state(now), Error::ser("err"))
690                .is_permanent()
691        );
692        assert!(
693            p.on_error(&idempotent_state(now), Error::deser("err"))
694                .is_permanent()
695        );
696        assert!(
697            p.on_error(&non_idempotent_state(now), Error::deser("err"))
698                .is_permanent()
699        );
700
701        assert!(
702            p.remaining_time(&idempotent_state(now)).is_none(),
703            "p={p:?}, now={now:?}"
704        );
705    }
706
707    #[test]
708    fn always_retry() {
709        let p = AlwaysRetry;
710
711        let now = Instant::now();
712        assert!(
713            p.remaining_time(&idempotent_state(now)).is_none(),
714            "p={p:?}, now={now:?}"
715        );
716        assert!(
717            p.on_error(&idempotent_state(now), http_unavailable())
718                .is_continue()
719        );
720        assert!(
721            p.on_error(&non_idempotent_state(now), http_unavailable())
722                .is_continue()
723        );
724        assert!(matches!(
725            p.on_throttle(&idempotent_state(now), http_unavailable()),
726            ThrottleResult::Continue(_)
727        ));
728
729        assert!(
730            p.on_error(&idempotent_state(now), unavailable())
731                .is_continue()
732        );
733        assert!(
734            p.on_error(&non_idempotent_state(now), unavailable())
735                .is_continue()
736        );
737    }
738
739    #[test_case::test_case(true, Error::io("err"))]
740    #[test_case::test_case(true, pre_rpc_transient())]
741    #[test_case::test_case(true, Error::ser("err"))]
742    #[test_case::test_case(false, Error::io("err"))]
743    #[test_case::test_case(false, pre_rpc_transient())]
744    #[test_case::test_case(false, Error::ser("err"))]
745    fn always_retry_error_kind(idempotent: bool, error: Error) {
746        let p = AlwaysRetry;
747        let now = Instant::now();
748        let state = if idempotent {
749            idempotent_state(now)
750        } else {
751            non_idempotent_state(now)
752        };
753        assert!(p.on_error(&state, error).is_continue());
754    }
755
756    #[test]
757    fn never_retry() {
758        let p = NeverRetry;
759
760        let now = Instant::now();
761        assert!(
762            p.remaining_time(&idempotent_state(now)).is_none(),
763            "p={p:?}, now={now:?}"
764        );
765        assert!(
766            p.on_error(&idempotent_state(now), http_unavailable())
767                .is_exhausted()
768        );
769        assert!(
770            p.on_error(&non_idempotent_state(now), http_unavailable())
771                .is_exhausted()
772        );
773        assert!(matches!(
774            p.on_throttle(&idempotent_state(now), http_unavailable()),
775            ThrottleResult::Continue(_)
776        ));
777
778        assert!(
779            p.on_error(&idempotent_state(now), unavailable())
780                .is_exhausted()
781        );
782        assert!(
783            p.on_error(&non_idempotent_state(now), unavailable())
784                .is_exhausted()
785        );
786
787        assert!(
788            p.on_error(&idempotent_state(now), http_permission_denied())
789                .is_exhausted()
790        );
791        assert!(
792            p.on_error(&non_idempotent_state(now), http_permission_denied())
793                .is_exhausted()
794        );
795    }
796
797    #[test_case::test_case(true, Error::io("err"))]
798    #[test_case::test_case(true, pre_rpc_transient())]
799    #[test_case::test_case(true, Error::ser("err"))]
800    #[test_case::test_case(false, Error::io("err"))]
801    #[test_case::test_case(false, pre_rpc_transient())]
802    #[test_case::test_case(false, Error::ser("err"))]
803    fn never_retry_error_kind(idempotent: bool, error: Error) {
804        let p = NeverRetry;
805        let now = Instant::now();
806        let state = if idempotent {
807            idempotent_state(now)
808        } else {
809            non_idempotent_state(now)
810        };
811        assert!(p.on_error(&state, error).is_exhausted());
812    }
813
814    fn pre_rpc_transient() -> Error {
815        use crate::error::CredentialsError;
816        Error::authentication(CredentialsError::from_msg(true, "err"))
817    }
818
819    fn http_unavailable() -> Error {
820        Error::http(
821            503_u16,
822            HeaderMap::new(),
823            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
824        )
825    }
826
827    fn http_permission_denied() -> Error {
828        Error::http(
829            403_u16,
830            HeaderMap::new(),
831            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
832        )
833    }
834
835    fn unavailable() -> Error {
836        use crate::error::rpc::Code;
837        let status = crate::error::rpc::Status::default()
838            .set_code(Code::Unavailable)
839            .set_message("UNAVAILABLE");
840        Error::service(status)
841    }
842
843    fn unknown_and_503() -> Error {
844        use crate::error::rpc::Code;
845        let status = crate::error::rpc::Status::default()
846            .set_code(Code::Unknown)
847            .set_message("UNAVAILABLE");
848        Error::service_full(status, Some(503), None, Some("source error".into()))
849    }
850
851    fn permission_denied() -> Error {
852        use crate::error::rpc::Code;
853        let status = crate::error::rpc::Status::default()
854            .set_code(Code::PermissionDenied)
855            .set_message("PERMISSION_DENIED");
856        Error::service(status)
857    }
858
    // Generates `MockPolicy`, a mockall test double implementing `RetryPolicy`.
    // The tests below script its `on_error` / `on_throttle` / `remaining_time`
    // results to exercise the wrapping policies (`LimitedElapsedTime`,
    // `LimitedAttemptCount`) in isolation.
    mockall::mock! {
        #[derive(Debug)]
        pub(crate) Policy {}
        impl RetryPolicy for Policy {
            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
        }
    }
868
869    #[test]
870    fn limited_elapsed_time_error() {
871        let limit = Duration::from_secs(123) + Duration::from_millis(567);
872        let err = LimitedElapsedTimeError::new(limit, unavailable());
873        assert_eq!(err.maximum_duration(), limit);
874        let fmt = err.to_string();
875        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
876        assert!(err.source().is_some(), "{err:?}");
877    }
878
879    #[test]
880    fn test_limited_time_forwards() {
881        let mut mock = MockPolicy::new();
882        mock.expect_on_error()
883            .times(1..)
884            .returning(|_, e| RetryResult::Continue(e));
885        mock.expect_on_throttle()
886            .times(1..)
887            .returning(|_, e| ThrottleResult::Continue(e));
888        mock.expect_remaining_time().times(1).returning(|_| None);
889
890        let now = Instant::now();
891        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
892        let rf = policy.on_error(&idempotent_state(now), transient_error());
893        assert!(rf.is_continue());
894
895        let rt = policy.remaining_time(&idempotent_state(now));
896        assert!(rt.is_some(), "policy={policy:?}, now={now:?}");
897
898        let e = policy.on_throttle(&idempotent_state(now), transient_error());
899        assert!(matches!(e, ThrottleResult::Continue(_)));
900    }
901
902    #[test]
903    fn test_limited_time_on_throttle_continue() {
904        let mut mock = MockPolicy::new();
905        mock.expect_on_throttle()
906            .times(1..)
907            .returning(|_, e| ThrottleResult::Continue(e));
908
909        let now = Instant::now();
910        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
911
912        // Before the policy expires the inner result is returned verbatim.
913        let rf = policy.on_throttle(
914            &idempotent_state(now - Duration::from_secs(50)),
915            unavailable(),
916        );
917        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
918
919        // After the policy expires the innter result is always "exhausted".
920        let rf = policy.on_throttle(
921            &idempotent_state(now - Duration::from_secs(70)),
922            unavailable(),
923        );
924        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
925    }
926
927    #[test]
928    fn test_limited_time_on_throttle_exhausted() {
929        let mut mock = MockPolicy::new();
930        mock.expect_on_throttle()
931            .times(1..)
932            .returning(|_, e| ThrottleResult::Exhausted(e));
933
934        let now = Instant::now();
935        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
936
937        // Before the policy expires the inner result is returned verbatim.
938        let rf = policy.on_throttle(
939            &idempotent_state(now - Duration::from_secs(50)),
940            unavailable(),
941        );
942        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
943    }
944
945    #[test]
946    fn test_limited_time_inner_continues() {
947        let mut mock = MockPolicy::new();
948        mock.expect_on_error()
949            .times(1..)
950            .returning(|_, e| RetryResult::Continue(e));
951
952        let now = Instant::now();
953        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
954        let rf = policy.on_error(
955            &idempotent_state(now - Duration::from_secs(10)),
956            transient_error(),
957        );
958        assert!(rf.is_continue());
959
960        let rf = policy.on_error(
961            &idempotent_state(now - Duration::from_secs(70)),
962            transient_error(),
963        );
964        assert!(rf.is_exhausted());
965    }
966
967    #[test]
968    fn test_limited_time_inner_permanent() {
969        let mut mock = MockPolicy::new();
970        mock.expect_on_error()
971            .times(2)
972            .returning(|_, e| RetryResult::Permanent(e));
973
974        let now = Instant::now();
975        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
976
977        let rf = policy.on_error(
978            &non_idempotent_state(now - Duration::from_secs(10)),
979            transient_error(),
980        );
981        assert!(rf.is_permanent());
982
983        let rf = policy.on_error(
984            &non_idempotent_state(now + Duration::from_secs(10)),
985            transient_error(),
986        );
987        assert!(rf.is_permanent());
988    }
989
990    #[test]
991    fn test_limited_time_inner_exhausted() {
992        let mut mock = MockPolicy::new();
993        mock.expect_on_error()
994            .times(2)
995            .returning(|_, e| RetryResult::Exhausted(e));
996
997        let now = Instant::now();
998        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
999
1000        let rf = policy.on_error(
1001            &non_idempotent_state(now - Duration::from_secs(10)),
1002            transient_error(),
1003        );
1004        assert!(rf.is_exhausted());
1005
1006        let rf = policy.on_error(
1007            &non_idempotent_state(now + Duration::from_secs(10)),
1008            transient_error(),
1009        );
1010        assert!(rf.is_exhausted());
1011    }
1012
1013    #[test]
1014    fn test_limited_time_remaining_inner_longer() {
1015        let mut mock = MockPolicy::new();
1016        mock.expect_remaining_time()
1017            .times(1)
1018            .returning(|_| Some(Duration::from_secs(30)));
1019
1020        let now = Instant::now();
1021        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1022
1023        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
1024        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
1025    }
1026
1027    #[test]
1028    fn test_limited_time_remaining_inner_shorter() {
1029        let mut mock = MockPolicy::new();
1030        mock.expect_remaining_time()
1031            .times(1)
1032            .returning(|_| Some(Duration::from_secs(5)));
1033        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1034
1035        let now = Instant::now();
1036        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
1037        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1038    }
1039
1040    #[test]
1041    fn test_limited_time_remaining_inner_is_none() {
1042        let mut mock = MockPolicy::new();
1043        mock.expect_remaining_time().times(1).returning(|_| None);
1044        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1045
1046        let now = Instant::now();
1047        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
1048        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1049    }
1050
1051    #[test]
1052    fn test_limited_attempt_count_on_error() {
1053        let mut mock = MockPolicy::new();
1054        mock.expect_on_error()
1055            .times(1..)
1056            .returning(|_, e| RetryResult::Continue(e));
1057
1058        let now = Instant::now();
1059        let policy = LimitedAttemptCount::custom(mock, 3);
1060        assert!(
1061            policy
1062                .on_error(
1063                    &idempotent_state(now).set_attempt_count(1_u32),
1064                    transient_error()
1065                )
1066                .is_continue()
1067        );
1068        assert!(
1069            policy
1070                .on_error(
1071                    &idempotent_state(now).set_attempt_count(2_u32),
1072                    transient_error()
1073                )
1074                .is_continue()
1075        );
1076        assert!(
1077            policy
1078                .on_error(
1079                    &idempotent_state(now).set_attempt_count(3_u32),
1080                    transient_error()
1081                )
1082                .is_exhausted()
1083        );
1084    }
1085
1086    #[test]
1087    fn test_limited_attempt_count_on_throttle_continue() {
1088        let mut mock = MockPolicy::new();
1089        mock.expect_on_throttle()
1090            .times(1..)
1091            .returning(|_, e| ThrottleResult::Continue(e));
1092
1093        let now = Instant::now();
1094        let policy = LimitedAttemptCount::custom(mock, 3);
1095        assert!(matches!(
1096            policy.on_throttle(
1097                &idempotent_state(now).set_attempt_count(2_u32),
1098                unavailable()
1099            ),
1100            ThrottleResult::Continue(_)
1101        ));
1102    }
1103
1104    #[test]
1105    fn test_limited_attempt_count_on_throttle_error() {
1106        let mut mock = MockPolicy::new();
1107        mock.expect_on_throttle()
1108            .times(1..)
1109            .returning(|_, e| ThrottleResult::Exhausted(e));
1110
1111        let now = Instant::now();
1112        let policy = LimitedAttemptCount::custom(mock, 3);
1113        assert!(matches!(
1114            policy.on_throttle(&idempotent_state(now), unavailable()),
1115            ThrottleResult::Exhausted(_)
1116        ));
1117    }
1118
1119    #[test]
1120    fn test_limited_attempt_count_remaining_none() {
1121        let mut mock = MockPolicy::new();
1122        mock.expect_remaining_time().times(1).returning(|_| None);
1123        let policy = LimitedAttemptCount::custom(mock, 3);
1124
1125        let now = Instant::now();
1126        assert!(
1127            policy.remaining_time(&idempotent_state(now)).is_none(),
1128            "policy={policy:?} now={now:?}"
1129        );
1130    }
1131
1132    #[test]
1133    fn test_limited_attempt_count_remaining_some() {
1134        let mut mock = MockPolicy::new();
1135        mock.expect_remaining_time()
1136            .times(1)
1137            .returning(|_| Some(Duration::from_secs(123)));
1138        let policy = LimitedAttemptCount::custom(mock, 3);
1139
1140        let now = Instant::now();
1141        assert_eq!(
1142            policy.remaining_time(&idempotent_state(now)),
1143            Some(Duration::from_secs(123))
1144        );
1145    }
1146
1147    #[test]
1148    fn test_limited_attempt_count_inner_permanent() {
1149        let mut mock = MockPolicy::new();
1150        mock.expect_on_error()
1151            .times(2)
1152            .returning(|_, e| RetryResult::Permanent(e));
1153        let policy = LimitedAttemptCount::custom(mock, 2);
1154        let now = Instant::now();
1155
1156        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1157        assert!(rf.is_permanent());
1158
1159        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1160        assert!(rf.is_permanent());
1161    }
1162
1163    #[test]
1164    fn test_limited_attempt_count_inner_exhausted() {
1165        let mut mock = MockPolicy::new();
1166        mock.expect_on_error()
1167            .times(2)
1168            .returning(|_, e| RetryResult::Exhausted(e));
1169        let policy = LimitedAttemptCount::custom(mock, 2);
1170        let now = Instant::now();
1171
1172        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1173        assert!(rf.is_exhausted());
1174
1175        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1176        assert!(rf.is_exhausted());
1177    }
1178
1179    fn transient_error() -> Error {
1180        use crate::error::rpc::{Code, Status};
1181        Error::service(
1182            Status::default()
1183                .set_code(Code::Unavailable)
1184                .set_message("try-again"),
1185        )
1186    }
1187
1188    pub(crate) fn idempotent_state(now: Instant) -> RetryState {
1189        RetryState::new(true).set_start(now)
1190    }
1191
1192    pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
1193        RetryState::new(false).set_start(now)
1194    }
1195}