Skip to main content

google_cloud_gax/
retry_policy.rs

1// Copyright 2024 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines traits for retry policies and some common implementations.
16//!
//! The client libraries automatically retry RPCs when (1) they fail due to
//! transient errors **and** the RPC is [idempotent], or (2) they fail before
//! the RPC was started. That is, when it is safe to attempt the RPC more than
//! once.
20//!
21//! Applications may override the default behavior, increasing the retry
22//! attempts, or changing what errors are considered safe to retry.
23//!
24//! This module defines the traits for retry policies and some common
25//! implementations.
26//!
//! To configure the default retry policy for a client, use
//! [ClientBuilder::with_retry_policy]. To configure the retry policy used for
//! a specific request, use [RequestOptionsBuilder::with_retry_policy].
30//!
31//! [ClientBuilder::with_retry_policy]: crate::client_builder::ClientBuilder::with_retry_policy
32//! [RequestOptionsBuilder::with_retry_policy]: crate::options::RequestOptionsBuilder::with_retry_policy
33//!
34//! # Examples
35//!
36//! Create a policy that only retries transient errors, and retries for at
37//! most 10 seconds or at most 5 attempts: whichever limit is reached first
38//! stops the retry loop.
39//! ```
40//! # use google_cloud_gax::retry_policy::*;
41//! use std::time::Duration;
42//! let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
43//! ```
44//!
//! Create a policy that retries on any error (even when unsafe to do so),
//! and stops retrying after 5 attempts or 10 seconds: whichever limit is
//! reached first stops the retry loop.
48//! ```
49//! # use google_cloud_gax::retry_policy::*;
50//! use std::time::Duration;
51//! let policy = AlwaysRetry.with_time_limit(Duration::from_secs(10)).with_attempt_limit(5);
52//! ```
53//!
54//! [idempotent]: https://en.wikipedia.org/wiki/Idempotence
55
56use crate::error::Error;
57use crate::retry_result::RetryResult;
58use crate::retry_state::RetryState;
59use crate::throttle_result::ThrottleResult;
60use std::sync::Arc;
61use std::time::Duration;
62
/// Determines how errors are handled in the retry loop.
///
/// Implementations of this trait determine if errors are retryable, and for how
/// long the retry loop may continue.
///
/// Only `on_error()` is required. `on_throttle()` and `remaining_time()` have
/// default implementations suitable for policies that ignore throttling and
/// are not time based.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
    /// Query the retry policy after an error.
    ///
    /// # Parameters
    /// * `state` - the state of the retry loop.
    /// * `error` - the last error when attempting the request.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;

    /// Query the retry policy after a retry attempt is throttled.
    ///
    /// Retry attempts may be throttled before they are even sent out. The retry
    /// policy may choose to treat these as normal errors, consuming attempts,
    /// or may prefer to ignore them and always return
    /// [ThrottleResult::Continue].
    ///
    /// The default implementation returns [ThrottleResult::Continue] with the
    /// error unchanged.
    ///
    /// # Parameters
    /// * `_state` - the state of the retry loop.
    /// * `error` - the previous error that caused the retry attempt. Throttling
    ///   only applies to retry attempts, and a retry attempt implies that a
    ///   previous attempt failed. The retry policy should preserve this error.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_throttle(&self, _state: &RetryState, error: Error) -> ThrottleResult {
        ThrottleResult::Continue(error)
    }

    /// The remaining time in the retry policy.
    ///
    /// For policies based on time, this returns the remaining time in the
    /// policy. The retry loop can use this value to adjust the next RPC
    /// timeout. For policies that are not time based this returns `None`.
    ///
    /// The default implementation returns `None`.
    ///
    /// # Parameters
    /// * `_state` - the state of the retry loop.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn remaining_time(&self, _state: &RetryState) -> Option<Duration> {
        None
    }
}
107
108/// A helper type to use [RetryPolicy] in client and request options.
109#[derive(Clone, Debug)]
110pub struct RetryPolicyArg(Arc<dyn RetryPolicy>);
111
112impl<T> std::convert::From<T> for RetryPolicyArg
113where
114    T: RetryPolicy + 'static,
115{
116    fn from(value: T) -> Self {
117        Self(Arc::new(value))
118    }
119}
120
121impl std::convert::From<Arc<dyn RetryPolicy>> for RetryPolicyArg {
122    fn from(value: Arc<dyn RetryPolicy>) -> Self {
123        Self(value)
124    }
125}
126
127impl From<RetryPolicyArg> for Arc<dyn RetryPolicy> {
128    fn from(value: RetryPolicyArg) -> Arc<dyn RetryPolicy> {
129        value.0
130    }
131}
132
133/// Extension trait for [`RetryPolicy`]
134pub trait RetryPolicyExt: RetryPolicy + Sized {
135    /// Decorate a [RetryPolicy] to limit the total elapsed time in the retry loop.
136    ///
137    /// While the time spent in the retry loop (including time in backoff) is
138    /// less than the prescribed duration the `on_error()` method returns the
139    /// results of the inner policy. After that time it returns
140    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
141    /// [Continue][RetryResult::Continue].
142    ///
143    /// The `remaining_time()` function returns the remaining time. This is
144    /// always [Duration::ZERO] once or after the policy's expiration time is
145    /// reached.
146    ///
147    /// # Example
148    /// ```
149    /// # use google_cloud_gax::retry_policy::*;
150    /// # use google_cloud_gax::retry_state::RetryState;
151    /// let d = std::time::Duration::from_secs(10);
152    /// let policy = Aip194Strict.with_time_limit(d);
153    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
154    /// ```
155    fn with_time_limit(self, maximum_duration: Duration) -> LimitedElapsedTime<Self> {
156        LimitedElapsedTime::custom(self, maximum_duration)
157    }
158
159    /// Decorate a [RetryPolicy] to limit the number of retry attempts.
160    ///
161    /// This policy decorates an inner policy and limits the total number of
162    /// attempts. Note that `on_error()` is not called before the initial
163    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
164    /// to 0 or 1 results in no retry attempts.
165    ///
166    /// The policy passes through the results from the inner policy as long as
167    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
168    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
169    /// inner policy returns [Continue][RetryResult::Continue].
170    ///
171    /// # Example
172    /// ```
173    /// # use google_cloud_gax::retry_policy::*;
174    /// # use google_cloud_gax::retry_state::RetryState;
175    /// let policy = Aip194Strict.with_attempt_limit(3);
176    /// assert_eq!(policy.remaining_time(&RetryState::new(true)), None);
177    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(0_u32), transient_error()).is_continue());
178    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(1_u32), transient_error()).is_continue());
179    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(2_u32), transient_error()).is_continue());
180    /// assert!(policy.on_error(&RetryState::new(true).set_attempt_count(3_u32), transient_error()).is_exhausted());
181    ///
182    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
183    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
184    /// ```
185    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
186        LimitedAttemptCount::custom(self, maximum_attempts)
187    }
188}
189
190impl<T: RetryPolicy> RetryPolicyExt for T {}
191
192/// A retry policy that strictly follows [AIP-194].
193///
194/// This policy must be decorated to limit the number of retry attempts or the
195/// duration of the retry loop.
196///
197/// The policy interprets AIP-194 **strictly**, the retry decision for
198/// server-side errors are based only on the status code, and the only retryable
199/// status code is "UNAVAILABLE".
200///
201/// # Example
202/// ```
203/// # use google_cloud_gax::retry_policy::*;
204/// # use google_cloud_gax::retry_state::RetryState;
205/// let policy = Aip194Strict;
206/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
207/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_permanent());
208///
209/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
210/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
211/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
212/// ```
213///
214/// [AIP-194]: https://google.aip.dev/194
215#[derive(Clone, Debug)]
216pub struct Aip194Strict;
217
218impl RetryPolicy for Aip194Strict {
219    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
220        if error.is_transient_and_before_rpc() {
221            return RetryResult::Continue(error);
222        }
223        if !state.idempotent {
224            return RetryResult::Permanent(error);
225        }
226        if error.is_io() {
227            return RetryResult::Continue(error);
228        }
229        if let Some(status) = error.status() {
230            return if status.code == crate::error::rpc::Code::Unavailable {
231                RetryResult::Continue(error)
232            } else {
233                RetryResult::Permanent(error)
234            };
235        }
236
237        match error.http_status_code() {
238            Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
239                RetryResult::Continue(error)
240            }
241            _ => RetryResult::Permanent(error),
242        }
243    }
244}
245
246/// A retry policy that retries all errors.
247///
248/// This policy must be decorated to limit the number of retry attempts or the
249/// duration of the retry loop.
250///
251/// The policy retries all errors. This may be useful if the service guarantees
252/// idempotency, maybe through the use of request ids.
253///
254/// # Example
255/// ```
256/// # use google_cloud_gax::retry_policy::*;
257/// # use google_cloud_gax::retry_state::RetryState;
258/// let policy = AlwaysRetry;
259/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_continue());
260/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_continue());
261///
262/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
263/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
264/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
265/// ```
266#[derive(Clone, Debug)]
267pub struct AlwaysRetry;
268
269impl RetryPolicy for AlwaysRetry {
270    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
271        RetryResult::Continue(error)
272    }
273}
274
275/// A retry policy that never retries.
276///
277/// This policy is useful when the client already has (or may already have) a
278/// retry policy configured, and you want to avoid retrying a particular method.
279///
280/// # Example
281/// ```
282/// # use google_cloud_gax::retry_policy::*;
283/// # use google_cloud_gax::retry_state::RetryState;
284/// let policy = NeverRetry;
285/// assert!(policy.on_error(&RetryState::new(true), transient_error()).is_exhausted());
286/// assert!(policy.on_error(&RetryState::new(true), permanent_error()).is_exhausted());
287///
288/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
289/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
290/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
291/// ```
292#[derive(Clone, Debug)]
293pub struct NeverRetry;
294
295impl RetryPolicy for NeverRetry {
296    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
297        RetryResult::Exhausted(error)
298    }
299}
300
301#[derive(thiserror::Error, Debug)]
302pub struct LimitedElapsedTimeError {
303    maximum_duration: Duration,
304    #[source]
305    source: Error,
306}
307
308impl LimitedElapsedTimeError {
309    pub(crate) fn new(maximum_duration: Duration, source: Error) -> Self {
310        Self {
311            maximum_duration,
312            source,
313        }
314    }
315
316    /// Returns the maximum number of attempts in the exhausted policy.
317    pub fn maximum_duration(&self) -> Duration {
318        self.maximum_duration
319    }
320}
321
322impl std::fmt::Display for LimitedElapsedTimeError {
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        write!(
325            f,
326            "retry policy is exhausted after {}s, the last retry attempt was throttled",
327            self.maximum_duration.as_secs_f64()
328        )
329    }
330}
331
332/// A retry policy decorator that limits the total time in the retry loop.
333///
334/// This policy decorates an inner policy and limits the duration of retry
335/// loops. While the time spent in the retry loop (including time in backoff)
336/// is less than the prescribed duration the `on_error()` method returns the
337/// results of the inner policy. After that time it returns
338/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
339/// [Continue][RetryResult::Continue].
340///
341/// The `remaining_time()` function returns the remaining time. This is always
342/// [Duration::ZERO] once or after the policy's deadline is reached.
343///
344/// # Parameters
345/// * `P` - the inner retry policy, defaults to [Aip194Strict].
346#[derive(Debug)]
347pub struct LimitedElapsedTime<P = Aip194Strict>
348where
349    P: RetryPolicy,
350{
351    inner: P,
352    maximum_duration: Duration,
353}
354
355impl LimitedElapsedTime {
356    /// Creates a new instance, with the default inner policy.
357    ///
358    /// # Example
359    /// ```
360    /// # use google_cloud_gax::retry_policy::*;
361    /// # use google_cloud_gax::retry_state::RetryState;
362    /// let d = std::time::Duration::from_secs(10);
363    /// let policy = LimitedElapsedTime::new(d);
364    /// assert!(policy.remaining_time(&RetryState::new(true)) <= Some(d));
365    /// ```
366    pub fn new(maximum_duration: Duration) -> Self {
367        Self {
368            inner: Aip194Strict,
369            maximum_duration,
370        }
371    }
372}
373
374impl<P> LimitedElapsedTime<P>
375where
376    P: RetryPolicy,
377{
378    /// Creates a new instance with a custom inner policy.
379    ///
380    /// # Example
381    /// ```
382    /// # use google_cloud_gax::retry_policy::*;
383    /// # use google_cloud_gax::retry_state::RetryState;
384    /// # use google_cloud_gax::error;
385    /// use std::time::{Duration, Instant};
386    /// let d = Duration::from_secs(10);
387    /// let policy = AlwaysRetry.with_time_limit(d);
388    /// assert!(policy.remaining_time(&RetryState::new(false)) <= Some(d));
389    /// assert!(policy.on_error(&RetryState::new(false), permanent_error()).is_continue());
390    ///
391    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
392    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
393    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
394    /// ```
395    pub fn custom(inner: P, maximum_duration: Duration) -> Self {
396        Self {
397            inner,
398            maximum_duration,
399        }
400    }
401
402    fn error_if_exhausted(&self, state: &RetryState, error: Error) -> ThrottleResult {
403        let deadline = state.start + self.maximum_duration;
404        let now = tokio::time::Instant::now().into_std();
405        if now < deadline {
406            ThrottleResult::Continue(error)
407        } else {
408            ThrottleResult::Exhausted(Error::exhausted(LimitedElapsedTimeError::new(
409                self.maximum_duration,
410                error,
411            )))
412        }
413    }
414}
415
416impl<P> RetryPolicy for LimitedElapsedTime<P>
417where
418    P: RetryPolicy + 'static,
419{
420    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
421        match self.inner.on_error(state, error) {
422            RetryResult::Permanent(e) => RetryResult::Permanent(e),
423            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
424            RetryResult::Continue(e) => {
425                if tokio::time::Instant::now().into_std() >= state.start + self.maximum_duration {
426                    RetryResult::Exhausted(e)
427                } else {
428                    RetryResult::Continue(e)
429                }
430            }
431        }
432    }
433
434    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
435        match self.inner.on_throttle(state, error) {
436            ThrottleResult::Continue(e) => self.error_if_exhausted(state, e),
437            ThrottleResult::Exhausted(e) => ThrottleResult::Exhausted(e),
438        }
439    }
440
441    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
442        let deadline = state.start + self.maximum_duration;
443        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now().into_std());
444        if let Some(inner) = self.inner.remaining_time(state) {
445            return Some(std::cmp::min(remaining, inner));
446        }
447        Some(remaining)
448    }
449}
450
451/// A retry policy decorator that limits the number of attempts.
452///
453/// This policy decorates an inner policy and limits the total number of
454/// attempts. Note that `on_error()` is not called before the initial
455/// (non-retry) attempt. Therefore, setting the maximum number of attempts to 0
456/// or 1 results in no retry attempts.
457///
458/// The policy passes through the results from the inner policy as long as
459/// `attempt_count < maximum_attempts`. However, once the maximum number of
460/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
461/// result with [Exhausted][RetryResult::Exhausted].
462///
463/// # Parameters
464/// * `P` - the inner retry policy.
465#[derive(Debug)]
466pub struct LimitedAttemptCount<P = Aip194Strict>
467where
468    P: RetryPolicy,
469{
470    inner: P,
471    maximum_attempts: u32,
472}
473
474impl LimitedAttemptCount {
475    /// Creates a new instance, with the default inner policy.
476    ///
477    /// # Example
478    /// ```
479    /// # use google_cloud_gax::retry_policy::*;
480    /// let policy = LimitedAttemptCount::new(5);
481    /// ```
482    pub fn new(maximum_attempts: u32) -> Self {
483        Self {
484            inner: Aip194Strict,
485            maximum_attempts,
486        }
487    }
488}
489
490impl<P> LimitedAttemptCount<P>
491where
492    P: RetryPolicy,
493{
494    /// Creates a new instance with a custom inner policy.
495    ///
496    /// # Example
497    /// ```
498    /// # use google_cloud_gax::retry_policy::*;
499    /// # use google_cloud_gax::retry_state::RetryState;
500    /// let policy = LimitedAttemptCount::custom(AlwaysRetry, 2);
501    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(1_u32), permanent_error()).is_continue());
502    /// assert!(policy.on_error(&RetryState::new(false).set_attempt_count(2_u32), permanent_error()).is_exhausted());
503    ///
504    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
505    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::PermissionDenied)) }
506    /// ```
507    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
508        Self {
509            inner,
510            maximum_attempts,
511        }
512    }
513}
514
515impl<P> RetryPolicy for LimitedAttemptCount<P>
516where
517    P: RetryPolicy,
518{
519    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
520        match self.inner.on_error(state, error) {
521            RetryResult::Permanent(e) => RetryResult::Permanent(e),
522            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
523            RetryResult::Continue(e) => {
524                if state.attempt_count >= self.maximum_attempts {
525                    RetryResult::Exhausted(e)
526                } else {
527                    RetryResult::Continue(e)
528                }
529            }
530        }
531    }
532
533    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
534        // The retry loop only calls `on_throttle()` if the policy has not
535        // been exhausted.
536        assert!(state.attempt_count < self.maximum_attempts);
537        self.inner.on_throttle(state, error)
538    }
539
540    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
541        self.inner.remaining_time(state)
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use http::HeaderMap;
549    use std::error::Error as StdError;
550    use std::time::Instant;
551
552    // Verify `RetryPolicyArg` can be converted from the desired types.
553    #[test]
554    fn retry_policy_arg() {
555        let policy = LimitedAttemptCount::new(3);
556        let _ = RetryPolicyArg::from(policy);
557
558        let policy: Arc<dyn RetryPolicy> = Arc::new(LimitedAttemptCount::new(3));
559        let _ = RetryPolicyArg::from(policy);
560    }
561
562    #[test]
563    fn aip194_strict() {
564        let p = Aip194Strict;
565
566        let now = Instant::now();
567        assert!(
568            p.on_error(&idempotent_state(now), unavailable())
569                .is_continue()
570        );
571        assert!(
572            p.on_error(&non_idempotent_state(now), unavailable())
573                .is_permanent()
574        );
575        assert!(matches!(
576            p.on_throttle(&idempotent_state(now), unavailable()),
577            ThrottleResult::Continue(_)
578        ));
579
580        assert!(
581            p.on_error(&idempotent_state(now), permission_denied())
582                .is_permanent()
583        );
584        assert!(
585            p.on_error(&non_idempotent_state(now), permission_denied())
586                .is_permanent()
587        );
588
589        assert!(
590            p.on_error(&idempotent_state(now), http_unavailable())
591                .is_continue()
592        );
593        assert!(
594            p.on_error(&non_idempotent_state(now), http_unavailable())
595                .is_permanent()
596        );
597        assert!(matches!(
598            p.on_throttle(&idempotent_state(now), http_unavailable()),
599            ThrottleResult::Continue(_)
600        ));
601
602        assert!(
603            p.on_error(&idempotent_state(now), http_permission_denied())
604                .is_permanent()
605        );
606        assert!(
607            p.on_error(&non_idempotent_state(now), http_permission_denied())
608                .is_permanent()
609        );
610
611        assert!(
612            p.on_error(&idempotent_state(now), Error::io("err".to_string()))
613                .is_continue()
614        );
615        assert!(
616            p.on_error(&non_idempotent_state(now), Error::io("err".to_string()))
617                .is_permanent()
618        );
619
620        assert!(
621            p.on_error(&idempotent_state(now), pre_rpc_transient())
622                .is_continue()
623        );
624        assert!(
625            p.on_error(&non_idempotent_state(now), pre_rpc_transient())
626                .is_continue()
627        );
628
629        assert!(
630            p.on_error(&idempotent_state(now), Error::ser("err"))
631                .is_permanent()
632        );
633        assert!(
634            p.on_error(&non_idempotent_state(now), Error::ser("err"))
635                .is_permanent()
636        );
637        assert!(
638            p.on_error(&idempotent_state(now), Error::deser("err"))
639                .is_permanent()
640        );
641        assert!(
642            p.on_error(&non_idempotent_state(now), Error::deser("err"))
643                .is_permanent()
644        );
645
646        assert!(
647            p.remaining_time(&idempotent_state(now)).is_none(),
648            "p={p:?}, now={now:?}"
649        );
650    }
651
652    #[test]
653    fn always_retry() {
654        let p = AlwaysRetry;
655
656        let now = Instant::now();
657        assert!(
658            p.remaining_time(&idempotent_state(now)).is_none(),
659            "p={p:?}, now={now:?}"
660        );
661        assert!(
662            p.on_error(&idempotent_state(now), http_unavailable())
663                .is_continue()
664        );
665        assert!(
666            p.on_error(&non_idempotent_state(now), http_unavailable())
667                .is_continue()
668        );
669        assert!(matches!(
670            p.on_throttle(&idempotent_state(now), http_unavailable()),
671            ThrottleResult::Continue(_)
672        ));
673
674        assert!(
675            p.on_error(&idempotent_state(now), unavailable())
676                .is_continue()
677        );
678        assert!(
679            p.on_error(&non_idempotent_state(now), unavailable())
680                .is_continue()
681        );
682    }
683
684    #[test_case::test_case(true, Error::io("err"))]
685    #[test_case::test_case(true, pre_rpc_transient())]
686    #[test_case::test_case(true, Error::ser("err"))]
687    #[test_case::test_case(false, Error::io("err"))]
688    #[test_case::test_case(false, pre_rpc_transient())]
689    #[test_case::test_case(false, Error::ser("err"))]
690    fn always_retry_error_kind(idempotent: bool, error: Error) {
691        let p = AlwaysRetry;
692        let now = Instant::now();
693        let state = if idempotent {
694            idempotent_state(now)
695        } else {
696            non_idempotent_state(now)
697        };
698        assert!(p.on_error(&state, error).is_continue());
699    }
700
701    #[test]
702    fn never_retry() {
703        let p = NeverRetry;
704
705        let now = Instant::now();
706        assert!(
707            p.remaining_time(&idempotent_state(now)).is_none(),
708            "p={p:?}, now={now:?}"
709        );
710        assert!(
711            p.on_error(&idempotent_state(now), http_unavailable())
712                .is_exhausted()
713        );
714        assert!(
715            p.on_error(&non_idempotent_state(now), http_unavailable())
716                .is_exhausted()
717        );
718        assert!(matches!(
719            p.on_throttle(&idempotent_state(now), http_unavailable()),
720            ThrottleResult::Continue(_)
721        ));
722
723        assert!(
724            p.on_error(&idempotent_state(now), unavailable())
725                .is_exhausted()
726        );
727        assert!(
728            p.on_error(&non_idempotent_state(now), unavailable())
729                .is_exhausted()
730        );
731
732        assert!(
733            p.on_error(&idempotent_state(now), http_permission_denied())
734                .is_exhausted()
735        );
736        assert!(
737            p.on_error(&non_idempotent_state(now), http_permission_denied())
738                .is_exhausted()
739        );
740    }
741
742    #[test_case::test_case(true, Error::io("err"))]
743    #[test_case::test_case(true, pre_rpc_transient())]
744    #[test_case::test_case(true, Error::ser("err"))]
745    #[test_case::test_case(false, Error::io("err"))]
746    #[test_case::test_case(false, pre_rpc_transient())]
747    #[test_case::test_case(false, Error::ser("err"))]
748    fn never_retry_error_kind(idempotent: bool, error: Error) {
749        let p = NeverRetry;
750        let now = Instant::now();
751        let state = if idempotent {
752            idempotent_state(now)
753        } else {
754            non_idempotent_state(now)
755        };
756        assert!(p.on_error(&state, error).is_exhausted());
757    }
758
759    fn pre_rpc_transient() -> Error {
760        use crate::error::CredentialsError;
761        Error::authentication(CredentialsError::from_msg(true, "err"))
762    }
763
764    fn http_unavailable() -> Error {
765        Error::http(
766            503_u16,
767            HeaderMap::new(),
768            bytes::Bytes::from_owner("SERVICE UNAVAILABLE".to_string()),
769        )
770    }
771
772    fn http_permission_denied() -> Error {
773        Error::http(
774            403_u16,
775            HeaderMap::new(),
776            bytes::Bytes::from_owner("PERMISSION DENIED".to_string()),
777        )
778    }
779
780    fn unavailable() -> Error {
781        use crate::error::rpc::Code;
782        let status = crate::error::rpc::Status::default()
783            .set_code(Code::Unavailable)
784            .set_message("UNAVAILABLE");
785        Error::service(status)
786    }
787
788    fn permission_denied() -> Error {
789        use crate::error::rpc::Code;
790        let status = crate::error::rpc::Status::default()
791            .set_code(Code::PermissionDenied)
792            .set_message("PERMISSION_DENIED");
793        Error::service(status)
794    }
795
    // Mock inner policy (`MockPolicy`) used to verify what the decorators
    // forward to, and how they rewrite the results of, the policy they wrap.
    // All three trait methods are mocked, including the ones that have
    // default implementations.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl RetryPolicy for Policy {
            fn on_error(&self, state: &RetryState, error: Error) -> RetryResult;
            fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult;
            fn remaining_time(&self, state: &RetryState) -> Option<Duration>;
        }
    }
805
806    #[test]
807    fn limited_elapsed_time_error() {
808        let limit = Duration::from_secs(123) + Duration::from_millis(567);
809        let err = LimitedElapsedTimeError::new(limit, unavailable());
810        assert_eq!(err.maximum_duration(), limit);
811        let fmt = err.to_string();
812        assert!(fmt.contains("123.567s"), "display={fmt}, debug={err:?}");
813        assert!(err.source().is_some(), "{err:?}");
814    }
815
816    #[test]
817    fn test_limited_time_forwards() {
818        let mut mock = MockPolicy::new();
819        mock.expect_on_error()
820            .times(1..)
821            .returning(|_, e| RetryResult::Continue(e));
822        mock.expect_on_throttle()
823            .times(1..)
824            .returning(|_, e| ThrottleResult::Continue(e));
825        mock.expect_remaining_time().times(1).returning(|_| None);
826
827        let now = Instant::now();
828        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
829        let rf = policy.on_error(&idempotent_state(now), transient_error());
830        assert!(rf.is_continue());
831
832        let rt = policy.remaining_time(&idempotent_state(now));
833        assert!(rt.is_some(), "policy={policy:?}, now={now:?}");
834
835        let e = policy.on_throttle(&idempotent_state(now), transient_error());
836        assert!(matches!(e, ThrottleResult::Continue(_)));
837    }
838
839    #[test]
840    fn test_limited_time_on_throttle_continue() {
841        let mut mock = MockPolicy::new();
842        mock.expect_on_throttle()
843            .times(1..)
844            .returning(|_, e| ThrottleResult::Continue(e));
845
846        let now = Instant::now();
847        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
848
849        // Before the policy expires the inner result is returned verbatim.
850        let rf = policy.on_throttle(
851            &idempotent_state(now - Duration::from_secs(50)),
852            unavailable(),
853        );
854        assert!(matches!(rf, ThrottleResult::Continue(_)), "{rf:?}");
855
856        // After the policy expires the innter result is always "exhausted".
857        let rf = policy.on_throttle(
858            &idempotent_state(now - Duration::from_secs(70)),
859            unavailable(),
860        );
861        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
862    }
863
864    #[test]
865    fn test_limited_time_on_throttle_exhausted() {
866        let mut mock = MockPolicy::new();
867        mock.expect_on_throttle()
868            .times(1..)
869            .returning(|_, e| ThrottleResult::Exhausted(e));
870
871        let now = Instant::now();
872        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
873
874        // Before the policy expires the inner result is returned verbatim.
875        let rf = policy.on_throttle(
876            &idempotent_state(now - Duration::from_secs(50)),
877            unavailable(),
878        );
879        assert!(matches!(rf, ThrottleResult::Exhausted(_)), "{rf:?}");
880    }
881
882    #[test]
883    fn test_limited_time_inner_continues() {
884        let mut mock = MockPolicy::new();
885        mock.expect_on_error()
886            .times(1..)
887            .returning(|_, e| RetryResult::Continue(e));
888
889        let now = Instant::now();
890        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
891        let rf = policy.on_error(
892            &idempotent_state(now - Duration::from_secs(10)),
893            transient_error(),
894        );
895        assert!(rf.is_continue());
896
897        let rf = policy.on_error(
898            &idempotent_state(now - Duration::from_secs(70)),
899            transient_error(),
900        );
901        assert!(rf.is_exhausted());
902    }
903
904    #[test]
905    fn test_limited_time_inner_permanent() {
906        let mut mock = MockPolicy::new();
907        mock.expect_on_error()
908            .times(2)
909            .returning(|_, e| RetryResult::Permanent(e));
910
911        let now = Instant::now();
912        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
913
914        let rf = policy.on_error(
915            &non_idempotent_state(now - Duration::from_secs(10)),
916            transient_error(),
917        );
918        assert!(rf.is_permanent());
919
920        let rf = policy.on_error(
921            &non_idempotent_state(now + Duration::from_secs(10)),
922            transient_error(),
923        );
924        assert!(rf.is_permanent());
925    }
926
927    #[test]
928    fn test_limited_time_inner_exhausted() {
929        let mut mock = MockPolicy::new();
930        mock.expect_on_error()
931            .times(2)
932            .returning(|_, e| RetryResult::Exhausted(e));
933
934        let now = Instant::now();
935        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
936
937        let rf = policy.on_error(
938            &non_idempotent_state(now - Duration::from_secs(10)),
939            transient_error(),
940        );
941        assert!(rf.is_exhausted());
942
943        let rf = policy.on_error(
944            &non_idempotent_state(now + Duration::from_secs(10)),
945            transient_error(),
946        );
947        assert!(rf.is_exhausted());
948    }
949
950    #[test]
951    fn test_limited_time_remaining_inner_longer() {
952        let mut mock = MockPolicy::new();
953        mock.expect_remaining_time()
954            .times(1)
955            .returning(|_| Some(Duration::from_secs(30)));
956
957        let now = Instant::now();
958        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
959
960        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(55)));
961        assert!(remaining <= Some(Duration::from_secs(5)), "{remaining:?}");
962    }
963
964    #[test]
965    fn test_limited_time_remaining_inner_shorter() {
966        let mut mock = MockPolicy::new();
967        mock.expect_remaining_time()
968            .times(1)
969            .returning(|_| Some(Duration::from_secs(5)));
970        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
971
972        let now = Instant::now();
973        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(5)));
974        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
975    }
976
977    #[test]
978    fn test_limited_time_remaining_inner_is_none() {
979        let mut mock = MockPolicy::new();
980        mock.expect_remaining_time().times(1).returning(|_| None);
981        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
982
983        let now = Instant::now();
984        let remaining = policy.remaining_time(&idempotent_state(now - Duration::from_secs(50)));
985        assert!(remaining <= Some(Duration::from_secs(10)), "{remaining:?}");
986    }
987
988    #[test]
989    fn test_limited_attempt_count_on_error() {
990        let mut mock = MockPolicy::new();
991        mock.expect_on_error()
992            .times(1..)
993            .returning(|_, e| RetryResult::Continue(e));
994
995        let now = Instant::now();
996        let policy = LimitedAttemptCount::custom(mock, 3);
997        assert!(
998            policy
999                .on_error(
1000                    &idempotent_state(now).set_attempt_count(1_u32),
1001                    transient_error()
1002                )
1003                .is_continue()
1004        );
1005        assert!(
1006            policy
1007                .on_error(
1008                    &idempotent_state(now).set_attempt_count(2_u32),
1009                    transient_error()
1010                )
1011                .is_continue()
1012        );
1013        assert!(
1014            policy
1015                .on_error(
1016                    &idempotent_state(now).set_attempt_count(3_u32),
1017                    transient_error()
1018                )
1019                .is_exhausted()
1020        );
1021    }
1022
1023    #[test]
1024    fn test_limited_attempt_count_on_throttle_continue() {
1025        let mut mock = MockPolicy::new();
1026        mock.expect_on_throttle()
1027            .times(1..)
1028            .returning(|_, e| ThrottleResult::Continue(e));
1029
1030        let now = Instant::now();
1031        let policy = LimitedAttemptCount::custom(mock, 3);
1032        assert!(matches!(
1033            policy.on_throttle(
1034                &idempotent_state(now).set_attempt_count(2_u32),
1035                unavailable()
1036            ),
1037            ThrottleResult::Continue(_)
1038        ));
1039    }
1040
1041    #[test]
1042    fn test_limited_attempt_count_on_throttle_error() {
1043        let mut mock = MockPolicy::new();
1044        mock.expect_on_throttle()
1045            .times(1..)
1046            .returning(|_, e| ThrottleResult::Exhausted(e));
1047
1048        let now = Instant::now();
1049        let policy = LimitedAttemptCount::custom(mock, 3);
1050        assert!(matches!(
1051            policy.on_throttle(&idempotent_state(now), unavailable()),
1052            ThrottleResult::Exhausted(_)
1053        ));
1054    }
1055
1056    #[test]
1057    fn test_limited_attempt_count_remaining_none() {
1058        let mut mock = MockPolicy::new();
1059        mock.expect_remaining_time().times(1).returning(|_| None);
1060        let policy = LimitedAttemptCount::custom(mock, 3);
1061
1062        let now = Instant::now();
1063        assert!(
1064            policy.remaining_time(&idempotent_state(now)).is_none(),
1065            "policy={policy:?} now={now:?}"
1066        );
1067    }
1068
1069    #[test]
1070    fn test_limited_attempt_count_remaining_some() {
1071        let mut mock = MockPolicy::new();
1072        mock.expect_remaining_time()
1073            .times(1)
1074            .returning(|_| Some(Duration::from_secs(123)));
1075        let policy = LimitedAttemptCount::custom(mock, 3);
1076
1077        let now = Instant::now();
1078        assert_eq!(
1079            policy.remaining_time(&idempotent_state(now)),
1080            Some(Duration::from_secs(123))
1081        );
1082    }
1083
1084    #[test]
1085    fn test_limited_attempt_count_inner_permanent() {
1086        let mut mock = MockPolicy::new();
1087        mock.expect_on_error()
1088            .times(2)
1089            .returning(|_, e| RetryResult::Permanent(e));
1090        let policy = LimitedAttemptCount::custom(mock, 2);
1091        let now = Instant::now();
1092
1093        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1094        assert!(rf.is_permanent());
1095
1096        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1097        assert!(rf.is_permanent());
1098    }
1099
1100    #[test]
1101    fn test_limited_attempt_count_inner_exhausted() {
1102        let mut mock = MockPolicy::new();
1103        mock.expect_on_error()
1104            .times(2)
1105            .returning(|_, e| RetryResult::Exhausted(e));
1106        let policy = LimitedAttemptCount::custom(mock, 2);
1107        let now = Instant::now();
1108
1109        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1110        assert!(rf.is_exhausted());
1111
1112        let rf = policy.on_error(&non_idempotent_state(now), transient_error());
1113        assert!(rf.is_exhausted());
1114    }
1115
1116    fn transient_error() -> Error {
1117        use crate::error::rpc::{Code, Status};
1118        Error::service(
1119            Status::default()
1120                .set_code(Code::Unavailable)
1121                .set_message("try-again"),
1122        )
1123    }
1124
1125    fn idempotent_state(now: Instant) -> RetryState {
1126        RetryState::new(true).set_start(now)
1127    }
1128
1129    fn non_idempotent_state(now: Instant) -> RetryState {
1130        RetryState::new(false).set_start(now)
1131    }
1132}