Skip to main content

google_cloud_gax/
retry_policy.rs

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines traits for retry policies and some common implementations.
16//!
17//! The client libraries automatically retry RPCs when (1) they fail due to
18//! transient errors **and** the RPC is [idempotent], (2) or failed before an
19//! RPC was started. That is, when it is safe to attempt the RPC more than once.
20//!
21//! Applications may override the default behavior, increasing the retry
22//! attempts, or changing what errors are considered safe to retry.
23//!
24//! This module defines the traits for retry policies and some common
25//! implementations.
26//!
//! To configure the default retry policy for a client, use
//! [ClientBuilder::with_retry_policy]. To configure the retry policy used for
//! a specific request, use [RequestOptionsBuilder::with_retry_policy].
30//!
31//! [ClientBuilder::with_retry_policy]: crate::client_builder::ClientBuilder::with_retry_policy
32//! [RequestOptionsBuilder::with_retry_policy]: crate::options::RequestOptionsBuilder::with_retry_policy
33//!
34//! # Examples
35//!
36//! Create a policy that only retries transient errors, and retries for at
37//! most 10 seconds or at most 5 attempts: whichever limit is reached first
38//! stops the retry loop.
39//! ```
40//! # use google_cloud_gax::retry_policy::*;
41//! use std::time::Duration;
42//! let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
43//! ```
44//!
//! Create a policy that retries on any error (even when unsafe to do so),
//! and stops retrying after 5 attempts or 10 seconds: whichever limit is
//! reached first stops the retry loop.
48//! ```
49//! # use google_cloud_gax::retry_policy::*;
50//! use std::time::Duration;
51//! let policy = AlwaysRetry.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
52//! ```
53//!
54//! [idempotent]: https://en.wikipedia.org/wiki/Idempotence
55
56mod too_many_requests;
57
58use crate::error::Error;
59use crate::retry_result::RetryResult;
60use crate::retry_state::RetryState;
61use crate::throttle_result::ThrottleResult;
62use std::sync::Arc;
63use std::time::Duration;
64
65pub use too_many_requests::TooManyRequests;
66
/// Determines how errors are handled in the retry loop.
///
/// Implementations of this trait determine if errors are retryable, and for how
/// long the retry loop may continue.
///
/// Only `on_error()` must be implemented. The default `on_throttle()` keeps
/// the retry loop going and the default `remaining_time()` reports no time
/// limit.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
    /// Query the retry policy after an error.
    ///
    /// # Parameters
    /// * `state` - the state of the retry loop.
    /// * `error` - the last error when attempting the request.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;

    /// Query the retry policy after a retry attempt is throttled.
    ///
    /// Retry attempts may be throttled before they are even sent out. The retry
    /// policy may choose to treat these as normal errors, consuming attempts,
    /// or may prefer to ignore them and always return
    /// [ThrottleResult::Continue]. The default implementation does the latter.
    ///
    /// # Parameters
    /// * `_state` - the state of the retry loop.
    /// * `error` - the previous error that caused the retry attempt. Throttling
    ///   only applies to retry attempts, and a retry attempt implies that a
    ///   previous attempt failed. The retry policy should preserve this error.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
        ThrottleResult::Continue(error)
    }

    /// The remaining time in the retry policy.
    ///
    /// For policies based on time, this returns the remaining time in the
    /// policy. The retry loop can use this value to adjust the next RPC
    /// timeout. For policies that are not time based this returns `None`,
    /// which is also what the default implementation returns.
    ///
    /// # Parameters
    /// * `_state` - the state of the retry loop.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
        None
    }
}
111
112/// A helper type to use [RetryPolicy] in client and request options.
113#[derive(Clone, Debug)]
114pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
115
116impl<T> std::convert::From<T> for RetryPolicyArg
117where
118    T: RetryPolicy + 'static,
119{
120    fn from(value: T) -> Self {
121        Self(Arc::new(value))
122    }
123}
124
125impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
126    fn from(value: Arc<dyn RetryPolicy>) -> Self {
127        Self(value)
128    }
129}
130
131impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
132    fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
133        value.0
134    }
135}
136
137/// Extension trait for [`RetryPolicy`]
138pub trait RetryPolicyExt: RetryPolicy + Sized {
139    /// Decorate a [RetryPolicy] to limit the total elapsed time in the retry loop.
140    ///
141    /// While the time spent in the retry loop (including time in backoff) is
142    /// less than the prescribed duration the `on_error()` method returns the
143    /// results of the inner policy. After that time it returns
144    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
145    /// [Continue][RetryResult::Continue].
146    ///
147    /// The `remaining_time()` function returns the remaining time. This is
148    /// always [Duration::ZERO] once or after the policy's expiration time is
149    /// reached.
150    ///
151    /// # Example
152    /// ```
153    /// # use google_cloud_gax::retry_policy::*;
154    /// # use google_cloud_gax::retry_state::RetryState;
155    /// let d = std::time::Duration::from_secs(10);
156    /// let policy = Aip194Strict.with_time_limit(d);
157    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
158    /// ```
159    fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
160        LimitedElapsedTime::custom(self, maximum_duration)
161    }
162
163    /// Decorate a [RetryPolicy] to limit the number of retry attempts.
164    ///
165    /// This policy decorates an inner policy and limits the total number of
166    /// attempts. Note that `on_error()` is not called before the initial
167    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
168    /// to 0 or 1 results in no retry attempts.
169    ///
170    /// The policy passes through the results from the inner policy as long as
171    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
172    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
173    /// inner policy returns [Continue][RetryResult::Continue].
174    ///
175    /// # Example
176    /// ```
177    /// # use google_cloud_gax::retry_policy::*;
178    /// # use google_cloud_gax::retry_state::RetryState;
179    /// let policy = Aip194Strict.with_attempt_limit(3);
180    /// assert_eq!(policy.remaining_time(&RetryState::new(true)), None);
181    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), transient_error()).is_continue());
182    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(1_u32), transient_error()).is_continue());
183    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(2_u32), transient_error()).is_continue());
184    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(3_u32), transient_error()).is_exhausted());
185    ///
186    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
187    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
188    /// ```
189    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
190        LimitedAttemptCount::custom(self, maximum_attempts)
191    }
192
193    /// Decorate a [RetryPolicy] to continue on certain status codes.
194    ///
195    /// This policy decorates an inner policy and retries any errors with HTTP
196    /// status code "429 - TOO_MANY_REQUESTS" **or** where the service returns
197    /// an error with code [ResourceExhausted].
198    ///
199    /// For other errors it returns the same value as the inner policy.
200    ///
201    /// Note that [ResourceExhausted] is ambiguous and may cause problems with
202    /// some services. The code is used for both "too many requests"  and for
203    /// "quota exceeded" problems. If the quota in question is some kind of rate
204    /// limit, then using this policy may be helpful. If the quota is not a rate
205    /// limit, then this retry policy may needlessly send the same RPC multiple
206    /// times.
207    ///
208    /// You should consult the documentation for the service and RPC in question
209    /// before using this policy.
210    ///
211    /// # Example
212    /// ```
213    /// use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicy, RetryPolicyExt};
214    /// use google_cloud_gax::retry_state::RetryState;
215    /// let policy = Aip194Strict;
216    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_permanent());
217    /// let policy = Aip194Strict.continue_on_too_many_requests();
218    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), too_many_requests()).is_continue());
219    ///
220    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
221    /// fn too_many_requests() -> Error { Error::service(Status::default().set_code(Code::ResourceExhausted)) }
222    /// ```
223    ///
224    /// [ResourceExhausted]: crate::error::rpc::Code::ResourceExhausted
225    fn continue_on_too_many_requests(self) -> TooManyRequests<Self> {
226        TooManyRequests::new(self)
227    }
228}
229
230impl<T: RetryPolicy> RetryPolicyExt for T {}
231
232/// A retry policy that strictly follows [AIP-194].
233///
234/// This policy must be decorated to limit the number of retry attempts or the
235/// duration of the retry loop.
236///
237/// The policy interprets AIP-194 **strictly**, the retry decision for
238/// server-side errors are based only on the status code, and the only retryable
239/// status code is "UNAVAILABLE".
240///
241/// # Example
242/// ```
243/// # use google_cloud_gax::retry_policy::*;
244/// # use google_cloud_gax::retry_state::RetryState;
245/// let policy = Aip194Strict;
246/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
247/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_permanent());
248///
249/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
250/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
251/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
252/// ```
253///
254/// [AIP-194]: https://google.aip.dev/194
255#[derive(Clone, Debug)]
256pub struct Aip194Strict;
257
258impl RetryPolicy for Aip194Strict {
259    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
260        if error.is_transient_and_before_rpc() {
261            return RetryResult::Continue(error);
262        }
263        if !state.idempotent {
264            return RetryResult::Permanent(error);
265        }
266        if error.is_io() {
267            return RetryResult::Continue(error);
268        }
269        if let Some(status) = error.status() {
270            return if status.code == crate::error::rpc::Code::Unavailable {
271                RetryResult::Continue(error)
272            } else {
273                RetryResult::Permanent(error)
274            };
275        }
276
277        match error.http_status_code() {
278            Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
279                RetryResult::Continue(error)
280            }
281            _ => RetryResult::Permanent(error),
282        }
283    }
284}
285
286/// A retry policy that retries all errors.
287///
288/// This policy must be decorated to limit the number of retry attempts or the
289/// duration of the retry loop.
290///
291/// The policy retries all errors. This may be useful if the service guarantees
292/// idempotency, maybe through the use of request ids.
293///
294/// # Example
295/// ```
296/// # use google_cloud_gax::retry_policy::*;
297/// # use google_cloud_gax::retry_state::RetryState;
298/// let policy = AlwaysRetry;
299/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
300/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_continue());
301///
302/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
303/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
304/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
305/// ```
306#[derive(Clone, Debug)]
307pub struct AlwaysRetry;
308
309impl RetryPolicy for AlwaysRetry {
310    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
311        RetryResult::Continue(error)
312    }
313}
314
315/// A retry policy that never retries.
316///
317/// This policy is useful when the client already has (or may already have) a
318/// retry policy configured, and you want to avoid retrying a particular method.
319///
320/// # Example
321/// ```
322/// # use google_cloud_gax::retry_policy::*;
323/// # use google_cloud_gax::retry_state::RetryState;
324/// let policy = NeverRetry;
325/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_exhausted());
326/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_exhausted());
327///
328/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
329/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
330/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
331/// ```
332#[derive(Clone, Debug)]
333pub struct NeverRetry;
334
335impl RetryPolicy for NeverRetry {
336    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
337        RetryResult::Exhausted(error)
338    }
339}
340
341#[derive(thiserror::Error, Debug)]
342pub struct LimitedElapsedTimeError {
343    maximum_duration: Duration,
344    #[source]
345    source: Error,
346}
347
348impl LimitedElapsedTimeError {
349    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
350        Self {
351            maximum_duration,
352            source,
353        }
354    }
355
356    /// Returns the maximum number of attempts in the exhausted policy.
357    pub fn maximum_duration(&self) -> Duration {
358        self.maximum_duration
359    }
360}
361
362impl std::fmt::Display for LimitedElapsedTimeError {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        write!(
365            f,
366            "retry policy is exhausted after {}s, the last retry attempt was throttled",
367            self.maximum_duration.as_secs_f64()
368        )
369    }
370}
371
372/// A retry policy decorator that limits the total time in the retry loop.
373///
374/// This policy decorates an inner policy and limits the duration of retry
375/// loops. While the time spent in the retry loop (including time in backoff)
376/// is less than the prescribed duration the `on_error()` method returns the
377/// results of the inner policy. After that time it returns
378/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
379/// [Continue][RetryResult::Continue].
380///
381/// The `remaining_time()` function returns the remaining time. This is always
382/// [Duration::ZERO] once or after the policy's deadline is reached.
383///
384/// # Parameters
385/// * `P` - the inner retry policy, defaults to [Aip194Strict].
386#[derive(Debug)]
387pub struct LimitedElapsedTime<P = Aip194Strict>
388where
389    P: RetryPolicy,
390{
391    inner: P,
392    maximum_duration: Duration,
393}
394
395impl LimitedElapsedTime {
396    /// Creates a new instance, with the default inner policy.
397    ///
398    /// # Example
399    /// ```
400    /// # use google_cloud_gax::retry_policy::*;
401    /// # use google_cloud_gax::retry_state::RetryState;
402    /// let d = std::time::Duration::from_secs(10);
403    /// let policy = LimitedElapsedTime::new(d);
404    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
405    /// ```
406    pub fn new(maximum_duration: Duration) -> Self {
407        Self {
408            inner: Aip194Strict,
409            maximum_duration,
410        }
411    }
412}
413
414impl<P> LimitedElapsedTime<P>
415where
416    P: RetryPolicy,
417{
418    /// Creates a new instance with a custom inner policy.
419    ///
420    /// # Example
421    /// ```
422    /// # use google_cloud_gax::retry_policy::*;
423    /// # use google_cloud_gax::retry_state::RetryState;
424    /// # use google_cloud_gax::error;
425    /// use std::time::{Duration, Instant};
426    /// let d = Duration::from_secs(10);
427    /// let policy = AlwaysRetry.with_time_limit(d);
428    /// assert!(policy.remaining_time(&RetryState::new(false)) <= Some(d));
429    /// assert!(policy.on_error(&RetryState::new(false), permanent_error()).is_continue());
430    ///
431    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
432    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
433    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
434    /// ```
435    pub fn custom(inner: P, maximum_duration: Duration) -> Self {
436        Self {
437            inner,
438            maximum_duration,
439        }
440    }
441
442    fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
443        let deadline = state.start + self.maximum_duration;
444        let now = tokio::time::Instant::now().into_std();
445        if now < deadline {
446            ThrottleResult::Continue(error)
447        } else {
448            ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
449                self.maximum_duration,
450                error,
451            )))
452        }
453    }
454}
455
456impl<P> RetryPolicy for LimitedElapsedTime<P>
457where
458    P: RetryPolicy + 'static,
459{
460    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
461        match self.inner.on_error(state, error) {
462            RetryResult::Permanent(e) => RetryResult::Permanent(e),
463            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
464            RetryResult::Continue(e) => {
465                if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
466                    RetryResult::Exhausted(e)
467                } else {
468                    RetryResult::Continue(e)
469                }
470            }
471        }
472    }
473
474    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
475        match self.inner.on_throttle(state, error) {
476            ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
477            ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
478        }
479    }
480
481    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
482        let deadline = state.start + self.maximum_duration;
483        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
484        if let Some(inner) = self.inner.remaining_time(state) {
485            return Some(std::cmp::min(remaining, inner));
486        }
487        Some(remaining)
488    }
489}
490
491/// A retry policy decorator that limits the number of attempts.
492///
493/// This policy decorates an inner policy and limits the total number of
494/// attempts. Note that `on_error()` is not called before the initial
495/// (non-retry) attempt. Therefore, setting the maximum number of attempts to 0
496/// or 1 results in no retry attempts.
497///
498/// The policy passes through the results from the inner policy as long as
499/// `attempt_count < maximum_attempts`. However, once the maximum number of
500/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
501/// result with [Exhausted][RetryResult::Exhausted].
502///
503/// # Parameters
504/// * `P` - the inner retry policy.
505#[derive(Debug)]
506pub struct LimitedAttemptCount<P = Aip194Strict>
507where
508    P: RetryPolicy,
509{
510    inner: P,
511    maximum_attempts: u32,
512}
513
514impl LimitedAttemptCount {
515    /// Creates a new instance, with the default inner policy.
516    ///
517    /// # Example
518    /// ```
519    /// # use google_cloud_gax::retry_policy::*;
520    /// let policy = LimitedAttemptCount::new(5);
521    /// ```
522    pub fn new(maximum_attempts: u32) -> Self {
523        Self {
524            inner: Aip194Strict,
525            maximum_attempts,
526        }
527    }
528}
529
530impl<P> LimitedAttemptCount<P>
531where
532    P: RetryPolicy,
533{
534    /// Creates a new instance with a custom inner policy.
535    ///
536    /// # Example
537    /// ```
538    /// # use google_cloud_gax::retry_policy::*;
539    /// # use google_cloud_gax::retry_state::RetryState;
540    /// let policy = LimitedAttemptCount::custom(AlwaysRetry, 2);
541    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(1_u32), permanent_error()).is_continue());
542    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(2_u32), permanent_error()).is_exhausted());
543    ///
544    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
545    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
546    /// ```
547    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
548        Self {
549            inner,
550            maximum_attempts,
551        }
552    }
553}
554
555impl<P> RetryPolicy for LimitedAttemptCount<P>
556where
557    P: RetryPolicy,
558{
559    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
560        match self.inner.on_error(state, error) {
561            RetryResult::Permanent(e) => RetryResult::Permanent(e),
562            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
563            RetryResult::Continue(e) => {
564                if state.attempt_count >= self.maximum_attempts {
565                    RetryResult::Exhausted(e)
566                } else {
567                    RetryResult::Continue(e)
568                }
569            }
570        }
571    }
572
573    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
574        // The retry loop only calls `on_throttle()` if the policy has not
575        // been exhausted.
576        assert!(state.attempt_count < self.maximum_attempts);
577        self.inner.on_throttle(state, error)
578    }
579
580    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
581        self.inner.remaining_time(state)
582    }
583}
584
585#[cfg(test)]
586pub mod tests {
587    use super::*;
588    use http::HeaderMap;
589    use std::error::Error as StdError;
590    use std::time::Instant;
591
592    // Verify `RetryPolicyArg` can be converted from the desired types.
593    #[test]
594    fn retry_policy_arg() {
595        let policy = LimitedAttemptCount::new(3);
596        let _ = RetryPolicyArg::from(policy);
597
598        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
599        let _ = RetryPolicyArg::from(policy);
600    }
601
602    #[test]
603    fn aip194_strict() {
604        let p = Aip194Strict;
605
606        let now = Instant::now();
607        assert!(
608            p.on_error(&idempotent_state(now), unavailable())
609                .is_continue()
610        );
611        assert!(
612            p.on_error(&non_idempotent_state(now), unavailable())
613                .is_permanent()
614        );
615        assert!(matches!(
616            p.on_throttle(&idempotent_state(now), unavailable()),
617            ThrottleResult::Continue(_)
618        ));
619
620        assert!(
621            p.on_error(&idempotent_state(now), permission_denied())
622                .is_permanent()
623        );
624        assert!(
625            p.on_error(&non_idempotent_state(now), permission_denied())
626                .is_permanent()
627        );
628
629        assert!(
630            p.on_error(&idempotent_state(now), http_unavailable())
631                .is_continue()
632        );
633        assert!(
634            p.on_error(&non_idempotent_state(now), http_unavailable())
635                .is_permanent()
636        );
637        assert!(matches!(
638            p.on_throttle(&idempotent_state(now), http_unavailable()),
639            ThrottleResult::Continue(_)
640        ));
641
642        assert!(
643            p.on_error(&idempotent_state(now), http_permission_denied())
644                .is_permanent()
645        );
646        assert!(
647            p.on_error(&non_idempotent_state(now), http_permission_denied())
648                .is_permanent()
649        );
650
651        assert!(
652            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
653                .is_continue()
654        );
655        assert!(
656            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
657                .is_permanent()
658        );
659
660        assert!(
661            p.on_error(&idempotent_state(now), pre_rpc_transient())
662                .is_continue()
663        );
664        assert!(
665            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
666                .is_continue()
667        );
668
669        assert!(
670            p.on_error(&idempotent_state(now), Error::ser("err"))
671                .is_permanent()
672        );
673        assert!(
674            p.on_error(&non_idempotent_state(now), Error::ser("err"))
675                .is_permanent()
676        );
677        assert!(
678            p.on_error(&idempotent_state(now), Error::deser("err"))
679                .is_permanent()
680        );
681        assert!(
682            p.on_error(&non_idempotent_state(now), Error::deser("err"))
683                .is_permanent()
684        );
685
686        assert!(
687            p.remaining_time(&idempotent_state(now)).is_none(),
688            "p={p:?}, now={now:?}"
689        );
690    }
691
692    #[test]
693    fn always_retry() {
694        let p = AlwaysRetry;
695
696        let now = Instant::now();
697        assert!(
698            p.remaining_time(&idempotent_state(now)).is_none(),
699            "p={p:?}, now={now:?}"
700        );
701        assert!(
702            p.on_error(&idempotent_state(now), http_unavailable())
703                .is_continue()
704        );
705        assert!(
706            p.on_error(&non_idempotent_state(now), http_unavailable())
707                .is_continue()
708        );
709        assert!(matches!(
710            p.on_throttle(&idempotent_state(now), http_unavailable()),
711            ThrottleResult::Continue(_)
712        ));
713
714        assert!(
715            p.on_error(&idempotent_state(now), unavailable())
716                .is_continue()
717        );
718        assert!(
719            p.on_error(&non_idempotent_state(now), unavailable())
720                .is_continue()
721        );
722    }
723
724    #[test_case::test_case(true, Error::io("err"))]
725    #[test_case::test_case(true, pre_rpc_transient())]
726    #[test_case::test_case(true, Error::ser("err"))]
727    #[test_case::test_case(false, Error::io("err"))]
728    #[test_case::test_case(false, pre_rpc_transient())]
729    #[test_case::test_case(false, Error::ser("err"))]
730    fn always_retry_error_kind(idempotent: bool, error: Error) {
731        let p = AlwaysRetry;
732        let now = Instant::now();
733        let state = if idempotent {
734            idempotent_state(now)
735        } else {
736            non_idempotent_state(now)
737        };
738        assert!(p.on_error(&state, error).is_continue());
739    }
740
741    #[test]
742    fn never_retry() {
743        let p = NeverRetry;
744
745        let now = Instant::now();
746        assert!(
747            p.remaining_time(&idempotent_state(now)).is_none(),
748            "p={p:?}, now={now:?}"
749        );
750        assert!(
751            p.on_error(&idempotent_state(now), http_unavailable())
752                .is_exhausted()
753        );
754        assert!(
755            p.on_error(&non_idempotent_state(now), http_unavailable())
756                .is_exhausted()
757        );
758        assert!(matches!(
759            p.on_throttle(&idempotent_state(now), http_unavailable()),
760            ThrottleResult::Continue(_)
761        ));
762
763        assert!(
764            p.on_error(&idempotent_state(now), unavailable())
765                .is_exhausted()
766        );
767        assert!(
768            p.on_error(&non_idempotent_state(now), unavailable())
769                .is_exhausted()
770        );
771
772        assert!(
773            p.on_error(&idempotent_state(now), http_permission_denied())
774                .is_exhausted()
775        );
776        assert!(
777            p.on_error(&non_idempotent_state(now), http_permission_denied())
778                .is_exhausted()
779        );
780    }
781
782    #[test_case::test_case(true, Error::io("err"))]
783    #[test_case::test_case(true, pre_rpc_transient())]
784    #[test_case::test_case(true, Error::ser("err"))]
785    #[test_case::test_case(false, Error::io("err"))]
786    #[test_case::test_case(false, pre_rpc_transient())]
787    #[test_case::test_case(false, Error::ser("err"))]
788    fn never_retry_error_kind(idempotent: bool, error: Error) {
789        let p = NeverRetry;
790        let now = Instant::now();
791        let state = if idempotent {
792            idempotent_state(now)
793        } else {
794            non_idempotent_state(now)
795        };
796        assert!(p.on_error(&state, error).is_exhausted());
797    }
798
799    fn pre_rpc_transient() -> Error {
800        use crate::error::CredentialsError;
801        Error::authentication(CredentialsError::from_msg(true, "err"))
802    }
803
804    fn http_unavailable() -> Error {
805        Error::http(
806            503_u16,
807            HeaderMap::new(),
808            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
809        )
810    }
811
812    fn http_permission_denied() -> Error {
813        Error::http(
814            403_u16,
815            HeaderMap::new(),
816            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
817        )
818    }
819
820    fn unavailable() -> Error {
821        use crate::error::rpc::Code;
822        let status = crate::error::rpc::Status::default()
823            .set_code(Code::Unavailable)
824            .set_message("UNAVAILABLE");
825        Error::service(status)
826    }
827
828    fn permission_denied() -> Error {
829        use crate::error::rpc::Code;
830        let status = crate::error::rpc::Status::default()
831            .set_code(Code::PermissionDenied)
832            .set_message("PERMISSION_DENIED");
833        Error::service(status)
834    }
835
836    mockall::mock! {
837        #[derive(Debug)]
838        pub(crate) Policy {}
839        impl RetryPolicy for Policy {
840            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
841            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
842            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
843        }
844    }
845
846    #[test]
847    fn limited_elapsed_time_error() {
848        let limit = Duration::from_secs(123) + Duration::from_millis(567);
849        let err = LimitedElapsedTimeError::new(limit, unavailable());
850        assert_eq!(err.maximum_duration(), limit);
851        let fmt = err.to_string();
852        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
853        assert!(err.source().is_some(), "{err:?}");
854    }
855
856    #[test]
857    fn test_limited_time_forwards() {
858        let mut mock = MockPolicy::new();
859        mock.expect_on_error()
860            .times(1..)
861            .returning(|_, e| RetryResult::Continue(e));
862        mock.expect_on_throttle()
863            .times(1..)
864            .returning(|_, e| ThrottleResult::Continue(e));
865        mock.expect_remaining_time().times(1).returning(|_| None);
866
867        let now = Instant::now();
868        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
869        let rf = policy.on_error(&idempotent_state(now), transient_error());
870        assert!(rf.is_continue());
871
872        let rt = policy.remaining_time(&idempotent_state(now));
873        assert!(rt.is_some(), "policy={policy:?}, now={now:?}");
874
875        let e = policy.on_throttle(&idempotent_state(now), transient_error());
876        assert!(matches!(e, ThrottleResult::Continue(_)));
877    }
878
879    #[test]
880    fn test_limited_time_on_throttle_continue() {
881        let mut mock = MockPolicy::new();
882        mock.expect_on_throttle()
883            .times(1..)
884            .returning(|_, e| ThrottleResult::Continue(e));
885
886        let now = Instant::now();
887        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
888
889        // Before the policy expires the inner result is returned verbatim.
890        let rf = policy.on_throttle(
891            &idempotent_state(now - Duration::from_secs(50)),
892            unavailable(),
893        );
894        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
895
896        // After the policy expires the innter result is always "exhausted".
897        let rf = policy.on_throttle(
898            &idempotent_state(now - Duration::from_secs(70)),
899            unavailable(),
900        );
901        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
902    }
903
904    #[test]
905    fn test_limited_time_on_throttle_exhausted() {
906        let mut mock = MockPolicy::new();
907        mock.expect_on_throttle()
908            .times(1..)
909            .returning(|_, e| ThrottleResult::Exhausted(e));
910
911        let now = Instant::now();
912        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
913
914        // Before the policy expires the inner result is returned verbatim.
915        let rf = policy.on_throttle(
916            &idempotent_state(now - Duration::from_secs(50)),
917            unavailable(),
918        );
919        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
920    }
921
922    #[test]
923    fn test_limited_time_inner_continues() {
924        let mut mock = MockPolicy::new();
925        mock.expect_on_error()
926            .times(1..)
927            .returning(|_, e| RetryResult::Continue(e));
928
929        let now = Instant::now();
930        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
931        let rf = policy.on_error(
932            &idempotent_state(now - Duration::from_secs(10)),
933            transient_error(),
934        );
935        assert!(rf.is_continue());
936
937        let rf = policy.on_error(
938            &idempotent_state(now - Duration::from_secs(70)),
939            transient_error(),
940        );
941        assert!(rf.is_exhausted());
942    }
943
944    #[test]
945    fn test_limited_time_inner_permanent() {
946        let mut mock = MockPolicy::new();
947        mock.expect_on_error()
948            .times(2)
949            .returning(|_, e| RetryResult::Permanent(e));
950
951        let now = Instant::now();
952        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
953
954        let rf = policy.on_error(
955            &non_idempotent_state(now - Duration::from_secs(10)),
956            transient_error(),
957        );
958        assert!(rf.is_permanent());
959
960        let rf = policy.on_error(
961            &non_idempotent_state(now + Duration::from_secs(10)),
962            transient_error(),
963        );
964        assert!(rf.is_permanent());
965    }
966
967    #[test]
968    fn test_limited_time_inner_exhausted() {
969        let mut mock = MockPolicy::new();
970        mock.expect_on_error()
971            .times(2)
972            .returning(|_, e| RetryResult::Exhausted(e));
973
974        let now = Instant::now();
975        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
976
977        let rf = policy.on_error(
978            &non_idempotent_state(now - Duration::from_secs(10)),
979            transient_error(),
980        );
981        assert!(rf.is_exhausted());
982
983        let rf = policy.on_error(
984            &non_idempotent_state(now + Duration::from_secs(10)),
985            transient_error(),
986        );
987        assert!(rf.is_exhausted());
988    }
989
990    #[test]
991    fn test_limited_time_remaining_inner_longer() {
992        let mut mock = MockPolicy::new();
993        mock.expect_remaining_time()
994            .times(1)
995            .returning(|_| Some(Duration::from_secs(30)));
996
997        let now = Instant::now();
998        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
999
1000        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
1001        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
1002    }
1003
1004    #[test]
1005    fn test_limited_time_remaining_inner_shorter() {
1006        let mut mock = MockPolicy::new();
1007        mock.expect_remaining_time()
1008            .times(1)
1009            .returning(|_| Some(Duration::from_secs(5)));
1010        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1011
1012        let now = Instant::now();
1013        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
1014        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1015    }
1016
1017    #[test]
1018    fn test_limited_time_remaining_inner_is_none() {
1019        let mut mock = MockPolicy::new();
1020        mock.expect_remaining_time().times(1).returning(|_| None);
1021        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
1022
1023        let now = Instant::now();
1024        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
1025        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
1026    }
1027
1028    #[test]
1029    fn test_limited_attempt_count_on_error() {
1030        let mut mock = MockPolicy::new();
1031        mock.expect_on_error()
1032            .times(1..)
1033            .returning(|_, e| RetryResult::Continue(e));
1034
1035        let now = Instant::now();
1036        let policy = LimitedAttemptCount::custom(mock, 3);
1037        assert!(
1038            policy
1039                .on_error(
1040                    &idempotent_state(now).set_attempt_count(1_u32),
1041                    transient_error()
1042                )
1043                .is_continue()
1044        );
1045        assert!(
1046            policy
1047                .on_error(
1048                    &idempotent_state(now).set_attempt_count(2_u32),
1049                    transient_error()
1050                )
1051                .is_continue()
1052        );
1053        assert!(
1054            policy
1055                .on_error(
1056                    &idempotent_state(now).set_attempt_count(3_u32),
1057                    transient_error()
1058                )
1059                .is_exhausted()
1060        );
1061    }
1062
1063    #[test]
1064    fn test_limited_attempt_count_on_throttle_continue() {
1065        let mut mock = MockPolicy::new();
1066        mock.expect_on_throttle()
1067            .times(1..)
1068            .returning(|_, e| ThrottleResult::Continue(e));
1069
1070        let now = Instant::now();
1071        let policy = LimitedAttemptCount::custom(mock, 3);
1072        assert!(matches!(
1073            policy.on_throttle(
1074                &idempotent_state(now).set_attempt_count(2_u32),
1075                unavailable()
1076            ),
1077            ThrottleResult::Continue(_)
1078        ));
1079    }
1080
1081    #[test]
1082    fn test_limited_attempt_count_on_throttle_error() {
1083        let mut mock = MockPolicy::new();
1084        mock.expect_on_throttle()
1085            .times(1..)
1086            .returning(|_, e| ThrottleResult::Exhausted(e));
1087
1088        let now = Instant::now();
1089        let policy = LimitedAttemptCount::custom(mock, 3);
1090        assert!(matches!(
1091            policy.on_throttle(&idempotent_state(now), unavailable()),
1092            ThrottleResult::Exhausted(_)
1093        ));
1094    }
1095
1096    #[test]
1097    fn test_limited_attempt_count_remaining_none() {
1098        let mut mock = MockPolicy::new();
1099        mock.expect_remaining_time().times(1).returning(|_| None);
1100        let policy = LimitedAttemptCount::custom(mock, 3);
1101
1102        let now = Instant::now();
1103        assert!(
1104            policy.remaining_time(&idempotent_state(now)).is_none(),
1105            "policy={policy:?} now={now:?}"
1106        );
1107    }
1108
1109    #[test]
1110    fn test_limited_attempt_count_remaining_some() {
1111        let mut mock = MockPolicy::new();
1112        mock.expect_remaining_time()
1113            .times(1)
1114            .returning(|_| Some(Duration::from_secs(123)));
1115        let policy = LimitedAttemptCount::custom(mock, 3);
1116
1117        let now = Instant::now();
1118        assert_eq!(
1119            policy.remaining_time(&idempotent_state(now)),
1120            Some(Duration::from_secs(123))
1121        );
1122    }
1123
1124    #[test]
1125    fn test_limited_attempt_count_inner_permanent() {
1126        let mut mock = MockPolicy::new();
1127        mock.expect_on_error()
1128            .times(2)
1129            .returning(|_, e| RetryResult::Permanent(e));
1130        let policy = LimitedAttemptCount::custom(mock, 2);
1131        let now = Instant::now();
1132
1133        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1134        assert!(rf.is_permanent());
1135
1136        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1137        assert!(rf.is_permanent());
1138    }
1139
1140    #[test]
1141    fn test_limited_attempt_count_inner_exhausted() {
1142        let mut mock = MockPolicy::new();
1143        mock.expect_on_error()
1144            .times(2)
1145            .returning(|_, e| RetryResult::Exhausted(e));
1146        let policy = LimitedAttemptCount::custom(mock, 2);
1147        let now = Instant::now();
1148
1149        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1150        assert!(rf.is_exhausted());
1151
1152        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1153        assert!(rf.is_exhausted());
1154    }
1155
1156    fn transient_error() -> Error {
1157        use crate::error::rpc::{Code, Status};
1158        Error::service(
1159            Status::default()
1160                .set_code(Code::Unavailable)
1161                .set_message("try-again"),
1162        )
1163    }
1164
1165    pub(crate) fn idempotent_state(now: Instant) -> RetryState {
1166        RetryState::new(true).set_start(now)
1167    }
1168
1169    pub(crate) fn non_idempotent_state(now: Instant) -> RetryState {
1170        RetryState::new(false).set_start(now)
1171    }
1172}