google_cloud_gax/
polling_error_policy.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the types for polling error policies.
16//!
17//! # Example
18//! ```
19//! # use google_cloud_gax::polling_error_policy::*;
20//! # use google_cloud_gax::options;
21//! use std::time::Duration;
22//! // Poll for at most 15 minutes or at most 50 attempts: whichever limit is
23//! // reached first stops the polling loop.
24//! let policy = Aip194Strict
25//!     .with_time_limit(Duration::from_secs(15 * 60))
26//!     .with_attempt_limit(50);
27//! ```
28//!
29//! The client libraries automatically poll long-running operations (LROs) and
30//! need to (1) distinguish between transient and permanent errors, and (2)
31//! provide a mechanism to limit the polling loop duration.
32//!
33//! We provide a trait that applications may implement to customize the behavior
34//! of the polling loop, and some common implementations that should meet most
35//! needs.
36//!
37//! To configure the default polling error policy for a client, use
38//! [ClientBuilder::with_polling_error_policy]. To configure the polling error
39//! policy used for a specific request, use
40//! [RequestOptionsBuilder::with_polling_error_policy].
41//!
42//! [ClientBuilder::with_polling_error_policy]: crate::client_builder::ClientBuilder::with_polling_error_policy
43//! [RequestOptionsBuilder::with_polling_error_policy]: crate::options::RequestOptionsBuilder::with_polling_error_policy
44
45use crate::error::Error;
46use crate::retry_result::RetryResult;
47use std::sync::Arc;
48
49/// Determines how errors are handled in the polling loop.
50///
51/// Implementations of this trait determine if polling errors may resolve in
52/// future attempts, and for how long the polling loop may continue.
53pub trait PollingErrorPolicy: Send + Sync + std::fmt::Debug {
54    /// Query the polling policy after an error.
55    ///
56    /// # Parameters
57    /// * `loop_start` - when the polling loop started.
58    /// * `attempt_count` - the number of attempts. This includes the initial
59    ///   attempt. This method called after LRO successfully starts, it is
60    ///   always non-zero.
61    /// * `error` - the last error when attempting the request.
62    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
63    fn on_error(
64        &self,
65        loop_start: std::time::Instant,
66        attempt_count: u32,
67        error: Error,
68    ) -> RetryResult;
69
70    /// Called when the LRO is successfully polled, but the LRO is still in
71    /// progress.
72    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
73    fn on_in_progress(
74        &self,
75        _loop_start: std::time::Instant,
76        _attempt_count: u32,
77        _operation_name: &str,
78    ) -> Result<(), Error> {
79        Ok(())
80    }
81}
82
83/// A helper type to use [PollingErrorPolicy] in client and request options.
84#[derive(Clone)]
85pub struct PollingErrorPolicyArg(pub(crate) Arc<dyn PollingErrorPolicy>);
86
87impl<T> std::convert::From<T> for PollingErrorPolicyArg
88where
89    T: PollingErrorPolicy + 'static,
90{
91    fn from(value: T) -> Self {
92        Self(Arc::new(value))
93    }
94}
95
96impl std::convert::From<Arc<dyn PollingErrorPolicy>> for PollingErrorPolicyArg {
97    fn from(value: Arc<dyn PollingErrorPolicy>) -> Self {
98        Self(value)
99    }
100}
101
102/// Extension trait for [PollingErrorPolicy]
103pub trait PollingErrorPolicyExt: PollingErrorPolicy + Sized {
104    /// Decorate a [PollingErrorPolicy] to limit the total elapsed time in the
105    /// polling loop.
106    ///
107    /// While the time spent in the polling loop (including time in backoff) is
108    /// less than the prescribed duration the `on_error()` method returns the
109    /// results of the inner policy. After that time it returns
110    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
111    /// [Continue][RetryResult::Continue].
112    ///
113    /// # Example
114    /// ```
115    /// # use google_cloud_gax::*;
116    /// use polling_error_policy::*;
117    /// use std::time::{Duration, Instant};
118    /// let policy = Aip194Strict.with_time_limit(Duration::from_secs(10)).with_attempt_limit(3);
119    /// let attempt_count = 4;
120    /// assert!(policy.on_error(Instant::now(), attempt_count, transient_error()).is_exhausted());
121    ///
122    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
123    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
124    /// ```
125    fn with_time_limit(self, maximum_duration: std::time::Duration) -> LimitedElapsedTime<Self> {
126        LimitedElapsedTime::custom(self, maximum_duration)
127    }
128
129    /// Decorate a [PollingErrorPolicy] to limit the number of poll attempts.
130    ///
131    /// This policy decorates an inner policy and limits the total number of
132    /// attempts. Note that `on_error()` is called only after a polling attempt.
133    /// Therefore, setting the maximum number of attempts to 0 or 1 results in
134    /// no polling after the LRO starts.
135    ///
136    /// The policy passes through the results from the inner policy as long as
137    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
138    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
139    /// inner policy returns [Continue][RetryResult::Continue], and passes the
140    /// inner policy result otherwise.
141    ///
142    /// # Example
143    /// ```
144    /// # use google_cloud_gax::*;
145    /// use polling_error_policy::*;
146    /// use std::time::Instant;
147    /// let policy = Aip194Strict.with_attempt_limit(3);
148    /// assert!(policy.on_error(Instant::now(), 0, transient_error()).is_continue());
149    /// assert!(policy.on_error(Instant::now(), 1, transient_error()).is_continue());
150    /// assert!(policy.on_error(Instant::now(), 2, transient_error()).is_continue());
151    /// assert!(policy.on_error(Instant::now(), 3, transient_error()).is_exhausted());
152    ///
153    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
154    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
155    /// ```
156    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
157        LimitedAttemptCount::custom(self, maximum_attempts)
158    }
159}
160
161impl<T: PollingErrorPolicy> PollingErrorPolicyExt for T {}
162
163/// A polling policy that strictly follows [AIP-194].
164///
165/// This policy must be decorated to limit the number of polling attempts or the
166/// duration of the polling loop.
167///
168/// The policy interprets AIP-194 **strictly**. It examines the status code to
169/// determine if the polling loop may continue.
170///
171/// # Example
172/// ```
173/// # use google_cloud_gax::*;
174/// # use google_cloud_gax::polling_error_policy::*;
175/// use std::time::Instant;
176/// let policy = Aip194Strict.with_attempt_limit(3);
177/// let attempt_count = 4;
178/// assert!(policy.on_error(Instant::now(), attempt_count, transient_error()).is_exhausted());
179///
180/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
181/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
182/// ```
183///
184/// [AIP-194]: https://google.aip.dev/194
185#[derive(Clone, Debug)]
186pub struct Aip194Strict;
187
188impl PollingErrorPolicy for Aip194Strict {
189    fn on_error(
190        &self,
191        _loop_start: std::time::Instant,
192        _attempt_count: u32,
193        error: Error,
194    ) -> RetryResult {
195        if error.is_transient_and_before_rpc() {
196            return RetryResult::Continue(error);
197        }
198        if error.is_io() {
199            return RetryResult::Continue(error);
200        }
201        if let Some(status) = error.status() {
202            return if status.code == crate::error::rpc::Code::Unavailable {
203                RetryResult::Continue(error)
204            } else {
205                RetryResult::Permanent(error)
206            };
207        }
208
209        match error.http_status_code() {
210            Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
211                RetryResult::Continue(error)
212            }
213            _ => RetryResult::Permanent(error),
214        }
215    }
216}
217
218/// A polling policy that continues on any error.
219///
220/// This policy must be decorated to limit the number of polling attempts or the
221/// duration of the polling loop.
222///
223/// The policy continues regardless of the error type or contents.
224///
225/// # Example
226/// ```
227/// # use google_cloud_gax::*;
228/// # use google_cloud_gax::polling_error_policy::*;
229/// use std::time::Instant;
230/// let policy = AlwaysContinue;
231/// assert!(policy.on_error(Instant::now(), 1, permanent_error()).is_continue());
232///
233/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
234/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
235/// ```
236///
237/// [AIP-194]: https://google.aip.dev/194
238#[derive(Clone, Debug)]
239pub struct AlwaysContinue;
240
241impl PollingErrorPolicy for AlwaysContinue {
242    fn on_error(
243        &self,
244        _loop_start: std::time::Instant,
245        _attempt_count: u32,
246        error: Error,
247    ) -> RetryResult {
248        RetryResult::Continue(error)
249    }
250}
251
252/// A polling policy decorator that limits the total time in the polling loop.
253///
254/// This policy decorates an inner policy and limits the duration of polling
255/// loops. While the time spent in the polling loop (including time in backoff)
256/// is less than the prescribed duration the `on_error()` method returns the
257/// results of the inner policy. After that time it returns
258/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
259/// [Continue][RetryResult::Continue].
260///
261/// The `remaining_time()` function returns the remaining time. This is always
262/// [Duration::ZERO][std::time::Duration::ZERO] once or after the policy's
263/// deadline is reached.
264///
265/// # Parameters
266/// * `P` - the inner polling policy, defaults to [Aip194Strict].
267#[derive(Debug)]
268pub struct LimitedElapsedTime<P = Aip194Strict>
269where
270    P: PollingErrorPolicy,
271{
272    inner: P,
273    maximum_duration: std::time::Duration,
274}
275
276impl LimitedElapsedTime {
277    /// Creates a new instance, with the default inner policy.
278    ///
279    /// # Example
280    /// ```
281    /// # use google_cloud_gax::*;
282    /// # use google_cloud_gax::polling_error_policy::*;
283    /// use std::time::{Duration, Instant};
284    /// let policy = LimitedElapsedTime::new(Duration::from_secs(10));
285    /// let start = Instant::now() - Duration::from_secs(20);
286    /// assert!(policy.on_error(start, 1, transient_error()).is_exhausted());
287    ///
288    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
289    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
290    /// ```
291    pub fn new(maximum_duration: std::time::Duration) -> Self {
292        Self {
293            inner: Aip194Strict,
294            maximum_duration,
295        }
296    }
297}
298
299impl<P> LimitedElapsedTime<P>
300where
301    P: PollingErrorPolicy,
302{
303    /// Creates a new instance with a custom inner policy.
304    ///
305    /// # Example
306    /// ```
307    /// # use google_cloud_gax::*;
308    /// # use google_cloud_gax::polling_error_policy::*;
309    /// use std::time::{Duration, Instant};
310    /// let policy = LimitedElapsedTime::custom(AlwaysContinue, Duration::from_secs(10));
311    /// let start = Instant::now() - Duration::from_secs(20);
312    /// assert!(policy.on_error(start, 1, permanent_error()).is_exhausted());
313    ///
314    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
315    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
316    /// ```
317    pub fn custom(inner: P, maximum_duration: std::time::Duration) -> Self {
318        Self {
319            inner,
320            maximum_duration,
321        }
322    }
323
324    fn in_progress_impl(
325        &self,
326        start: std::time::Instant,
327        operation_name: &str,
328    ) -> Result<(), Error> {
329        let now = std::time::Instant::now();
330        if now < start + self.maximum_duration {
331            return Ok(());
332        }
333        Err(Error::exhausted(Exhausted::new(
334            operation_name,
335            "elapsed time",
336            format!("{:?}", now.checked_duration_since(start).unwrap()),
337            format!("{:?}", self.maximum_duration),
338        )))
339    }
340}
341
342impl<P> PollingErrorPolicy for LimitedElapsedTime<P>
343where
344    P: PollingErrorPolicy + 'static,
345{
346    fn on_error(&self, start: std::time::Instant, count: u32, error: Error) -> RetryResult {
347        match self.inner.on_error(start, count, error) {
348            RetryResult::Permanent(e) => RetryResult::Permanent(e),
349            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
350            RetryResult::Continue(e) => {
351                if std::time::Instant::now() >= start + self.maximum_duration {
352                    RetryResult::Exhausted(e)
353                } else {
354                    RetryResult::Continue(e)
355                }
356            }
357        }
358    }
359
360    fn on_in_progress(
361        &self,
362        start: std::time::Instant,
363        count: u32,
364        operation_name: &str,
365    ) -> Result<(), Error> {
366        self.inner
367            .on_in_progress(start, count, operation_name)
368            .and_then(|_| self.in_progress_impl(start, operation_name))
369    }
370}
371
372/// A polling policy decorator that limits the number of attempts.
373///
374/// This policy decorates an inner policy and limits polling total number of
375/// attempts. Setting the maximum number of attempts to 0 results in no polling
376/// attempts before the initial one.
377///
378/// The policy passes through the results from the inner policy as long as
379/// `attempt_count < maximum_attempts`. However, once the maximum number of
380/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
381/// result with [Exhausted][RetryResult::Exhausted].
382///
383/// # Parameters
384/// * `P` - the inner polling policy.
385#[derive(Debug)]
386pub struct LimitedAttemptCount<P = Aip194Strict>
387where
388    P: PollingErrorPolicy,
389{
390    inner: P,
391    maximum_attempts: u32,
392}
393
394impl LimitedAttemptCount {
395    /// Creates a new instance, with the default inner policy.
396    ///
397    /// # Example
398    /// ```
399    /// # use google_cloud_gax::*;
400    /// # use google_cloud_gax::polling_error_policy::*;
401    /// use std::time::Instant;
402    /// let policy = LimitedAttemptCount::new(5);
403    /// let attempt_count = 10;
404    /// assert!(policy.on_error(Instant::now(), attempt_count, transient_error()).is_exhausted());
405    ///
406    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
407    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
408    /// ```
409    pub fn new(maximum_attempts: u32) -> Self {
410        Self {
411            inner: Aip194Strict,
412            maximum_attempts,
413        }
414    }
415}
416
417impl<P> LimitedAttemptCount<P>
418where
419    P: PollingErrorPolicy,
420{
421    /// Creates a new instance with a custom inner policy.
422    ///
423    /// # Example
424    /// ```
425    /// # use google_cloud_gax::polling_error_policy::*;
426    /// # use google_cloud_gax::*;
427    /// use std::time::Instant;
428    /// let policy = LimitedAttemptCount::custom(AlwaysContinue, 2);
429    /// assert!(policy.on_error(Instant::now(), 1, permanent_error()).is_continue());
430    /// assert!(policy.on_error(Instant::now(), 2, permanent_error()).is_exhausted());
431    ///
432    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
433    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
434    /// ```
435    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
436        Self {
437            inner,
438            maximum_attempts,
439        }
440    }
441
442    fn in_progress_impl(&self, count: u32, operation_name: &str) -> Result<(), Error> {
443        if count < self.maximum_attempts {
444            return Ok(());
445        }
446        Err(Error::exhausted(Exhausted::new(
447            operation_name,
448            "attempt count",
449            count.to_string(),
450            self.maximum_attempts.to_string(),
451        )))
452    }
453}
454
455impl<P> PollingErrorPolicy for LimitedAttemptCount<P>
456where
457    P: PollingErrorPolicy,
458{
459    fn on_error(&self, start: std::time::Instant, count: u32, error: Error) -> RetryResult {
460        match self.inner.on_error(start, count, error) {
461            RetryResult::Permanent(e) => RetryResult::Permanent(e),
462            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
463            RetryResult::Continue(e) => {
464                if count >= self.maximum_attempts {
465                    RetryResult::Exhausted(e)
466                } else {
467                    RetryResult::Continue(e)
468                }
469            }
470        }
471    }
472
473    fn on_in_progress(
474        &self,
475        start: std::time::Instant,
476        count: u32,
477        operation_name: &str,
478    ) -> Result<(), Error> {
479        self.inner
480            .on_in_progress(start, count, operation_name)
481            .and_then(|_| self.in_progress_impl(count, operation_name))
482    }
483}
484
485/// Indicates that a retry or polling loop has been exhausted.
486#[derive(Debug)]
487pub struct Exhausted {
488    operation_name: String,
489    limit_name: &'static str,
490    value: String,
491    limit: String,
492}
493
494impl Exhausted {
495    pub fn new(
496        operation_name: &str,
497        limit_name: &'static str,
498        value: String,
499        limit: String,
500    ) -> Self {
501        Self {
502            operation_name: operation_name.to_string(),
503            limit_name,
504            value,
505            limit,
506        }
507    }
508}
509
510impl std::fmt::Display for Exhausted {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        write!(
513            f,
514            "polling loop for {} exhausted, {} value ({}) exceeds limit ({})",
515            self.operation_name, self.limit_name, self.value, self.limit
516        )
517    }
518}
519
520impl std::error::Error for Exhausted {}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use crate::error::{CredentialsError, Error};
526    use http::HeaderMap;
527    use std::error::Error as _;
528    use std::time::{Duration, Instant};
529
530    mockall::mock! {
531        #[derive(Debug)]
532        Policy {}
533        impl PollingErrorPolicy for Policy {
534            fn on_error(&self, loop_start: std::time::Instant, attempt_count: u32, error: Error) -> RetryResult;
535            fn on_in_progress(&self, loop_start: std::time::Instant, attempt_count: u32, operation_name: &str) -> Result<(), Error>;
536        }
537    }
538
539    // Verify `PollingPolicyArg` can be converted from the desired types.
540    #[test]
541    fn polling_policy_arg() {
542        let policy = LimitedAttemptCount::new(3);
543        let _ = PollingErrorPolicyArg::from(policy);
544
545        let policy: Arc<dyn PollingErrorPolicy> = Arc::new(LimitedAttemptCount::new(3));
546        let _ = PollingErrorPolicyArg::from(policy);
547    }
548
549    #[test]
550    fn aip194_strict() {
551        let p = Aip194Strict;
552
553        let now = std::time::Instant::now();
554        assert!(p.on_in_progress(now, 0, "unused").is_ok());
555        assert!(p.on_error(now, 0, unavailable()).is_continue());
556        assert!(p.on_error(now, 0, permission_denied()).is_permanent());
557        assert!(p.on_error(now, 0, http_unavailable()).is_continue());
558        assert!(p.on_error(now, 0, http_permission_denied()).is_permanent());
559
560        assert!(
561            p.on_error(now, 0, Error::io("err".to_string()))
562                .is_continue()
563        );
564
565        assert!(
566            p.on_error(
567                now,
568                0,
569                Error::authentication(CredentialsError::from_msg(true, "err"))
570            )
571            .is_continue()
572        );
573
574        assert!(
575            p.on_error(now, 0, Error::ser("err".to_string()))
576                .is_permanent()
577        );
578    }
579
580    #[test]
581    fn always_continue() {
582        let p = AlwaysContinue;
583
584        let now = std::time::Instant::now();
585        assert!(p.on_in_progress(now, 0, "unused").is_ok());
586        assert!(p.on_error(now, 0, http_unavailable()).is_continue());
587        assert!(p.on_error(now, 0, unavailable()).is_continue());
588    }
589
590    #[test_case::test_case(Error::io("err"))]
591    #[test_case::test_case(Error::authentication(CredentialsError::from_msg(true, "err")))]
592    #[test_case::test_case(Error::ser("err"))]
593    fn always_continue_error_kind(error: Error) {
594        let p = AlwaysContinue;
595        let now = std::time::Instant::now();
596        assert!(p.on_error(now, 0, error).is_continue());
597    }
598
599    #[test]
600    fn with_time_limit() {
601        let policy = AlwaysContinue.with_time_limit(Duration::from_secs(10));
602        assert!(
603            policy
604                .on_error(
605                    Instant::now() - Duration::from_secs(1),
606                    1,
607                    permission_denied()
608                )
609                .is_continue(),
610            "{policy:?}"
611        );
612        assert!(
613            policy
614                .on_error(
615                    Instant::now() - Duration::from_secs(20),
616                    1,
617                    permission_denied()
618                )
619                .is_exhausted(),
620            "{policy:?}"
621        );
622    }
623
624    #[test]
625    fn with_attempt_limit() {
626        let policy = AlwaysContinue.with_attempt_limit(3);
627        assert!(
628            policy
629                .on_error(Instant::now(), 1, permission_denied())
630                .is_continue(),
631            "{policy:?}"
632        );
633        assert!(
634            policy
635                .on_error(Instant::now(), 5, permission_denied())
636                .is_exhausted(),
637            "{policy:?}"
638        );
639    }
640
641    fn http_error(code: u16, message: &str) -> Error {
642        let error = serde_json::json!({"error": {
643            "code": code,
644            "message": message,
645        }});
646        let payload = bytes::Bytes::from_owner(serde_json::to_string(&error).unwrap());
647        Error::http(code, HeaderMap::new(), payload)
648    }
649
650    fn http_unavailable() -> Error {
651        http_error(503, "SERVICE UNAVAILABLE")
652    }
653
654    fn http_permission_denied() -> Error {
655        http_error(403, "PERMISSION DENIED")
656    }
657
658    fn unavailable() -> Error {
659        use crate::error::rpc::Code;
660        let status = crate::error::rpc::Status::default()
661            .set_code(Code::Unavailable)
662            .set_message("UNAVAILABLE");
663        Error::service(status)
664    }
665
666    fn permission_denied() -> Error {
667        use crate::error::rpc::Code;
668        let status = crate::error::rpc::Status::default()
669            .set_code(Code::PermissionDenied)
670            .set_message("PERMISSION_DENIED");
671        Error::service(status)
672    }
673
674    #[test]
675    fn test_limited_elapsed_time_on_error() {
676        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
677        assert!(
678            policy
679                .on_error(Instant::now() - Duration::from_secs(10), 1, unavailable())
680                .is_continue(),
681            "{policy:?}"
682        );
683        assert!(
684            policy
685                .on_error(Instant::now() - Duration::from_secs(30), 1, unavailable())
686                .is_exhausted(),
687            "{policy:?}"
688        );
689    }
690
691    #[test]
692    fn test_limited_elapsed_time_in_progress() {
693        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
694        let result = policy.on_in_progress(Instant::now() - Duration::from_secs(10), 1, "unused");
695        assert!(result.is_ok(), "{result:?}");
696        let err = policy
697            .on_in_progress(
698                Instant::now() - Duration::from_secs(30),
699                1,
700                "test-operation-name",
701            )
702            .unwrap_err();
703        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
704        assert!(exhausted.is_some());
705    }
706
707    #[test]
708    fn test_limited_time_forwards_on_error() {
709        let mut mock = MockPolicy::new();
710        mock.expect_on_error()
711            .times(1..)
712            .returning(|_, _, e| RetryResult::Continue(e));
713
714        let now = std::time::Instant::now();
715        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
716        let rf = policy.on_error(now, 0, transient_error());
717        assert!(rf.is_continue());
718    }
719
720    #[test]
721    fn test_limited_time_forwards_in_progress() {
722        let mut mock = MockPolicy::new();
723        mock.expect_on_in_progress()
724            .times(3)
725            .returning(|_, _, _| Ok(()));
726
727        let now = std::time::Instant::now();
728        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
729        assert!(policy.on_in_progress(now, 1, "test-op-name").is_ok());
730        assert!(policy.on_in_progress(now, 2, "test-op-name").is_ok());
731        assert!(policy.on_in_progress(now, 3, "test-op-name").is_ok());
732    }
733
734    #[test]
735    fn test_limited_time_in_progress_returns_inner() {
736        let mut mock = MockPolicy::new();
737        mock.expect_on_in_progress()
738            .times(1)
739            .returning(|_, _, _| Err(transient_error()));
740
741        let now = std::time::Instant::now();
742        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
743        assert!(policy.on_in_progress(now, 1, "test-op-name").is_err());
744    }
745
746    #[test]
747    fn test_limited_time_inner_continues() {
748        let mut mock = MockPolicy::new();
749        mock.expect_on_error()
750            .times(1..)
751            .returning(|_, _, e| RetryResult::Continue(e));
752
753        let now = std::time::Instant::now();
754        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
755        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
756        assert!(rf.is_continue());
757
758        let rf = policy.on_error(now - Duration::from_secs(70), 1, transient_error());
759        assert!(rf.is_exhausted());
760    }
761
762    #[test]
763    fn test_limited_time_inner_permanent() {
764        let mut mock = MockPolicy::new();
765        mock.expect_on_error()
766            .times(2)
767            .returning(|_, _, e| RetryResult::Permanent(e));
768
769        let now = std::time::Instant::now();
770        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
771
772        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
773        assert!(rf.is_permanent());
774
775        let rf = policy.on_error(now + Duration::from_secs(10), 1, transient_error());
776        assert!(rf.is_permanent());
777    }
778
779    #[test]
780    fn test_limited_time_inner_exhausted() {
781        let mut mock = MockPolicy::new();
782        mock.expect_on_error()
783            .times(2)
784            .returning(|_, _, e| RetryResult::Exhausted(e));
785
786        let now = std::time::Instant::now();
787        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
788
789        let rf = policy.on_error(now - Duration::from_secs(10), 1, transient_error());
790        assert!(rf.is_exhausted());
791
792        let rf = policy.on_error(now + Duration::from_secs(10), 1, transient_error());
793        assert!(rf.is_exhausted());
794    }
795
796    #[test]
797    fn test_limited_attempt_count_on_error() {
798        let policy = LimitedAttemptCount::new(20);
799        assert!(
800            policy
801                .on_error(Instant::now(), 10, unavailable())
802                .is_continue(),
803            "{policy:?}"
804        );
805        assert!(
806            policy
807                .on_error(Instant::now(), 30, unavailable())
808                .is_exhausted(),
809            "{policy:?}"
810        );
811    }
812
813    #[test]
814    fn test_limited_attempt_count_in_progress() {
815        let policy = LimitedAttemptCount::new(20);
816        let result = policy.on_in_progress(Instant::now(), 10, "unused");
817        assert!(result.is_ok(), "{result:?}");
818        let err = policy
819            .on_in_progress(Instant::now(), 30, "test-operation-name")
820            .unwrap_err();
821        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
822        assert!(exhausted.is_some());
823    }
824
825    #[test]
826    fn test_limited_attempt_count_forwards_on_error() {
827        let mut mock = MockPolicy::new();
828        mock.expect_on_error()
829            .times(1..)
830            .returning(|_, _, e| RetryResult::Continue(e));
831
832        let now = std::time::Instant::now();
833        let policy = LimitedAttemptCount::custom(mock, 3);
834        assert!(policy.on_error(now, 1, transient_error()).is_continue());
835        assert!(policy.on_error(now, 2, transient_error()).is_continue());
836        assert!(policy.on_error(now, 3, transient_error()).is_exhausted());
837    }
838
839    #[test]
840    fn test_limited_attempt_count_forwards_in_progress() {
841        let mut mock = MockPolicy::new();
842        mock.expect_on_in_progress()
843            .times(3)
844            .returning(|_, _, _| Ok(()));
845
846        let now = std::time::Instant::now();
847        let policy = LimitedAttemptCount::custom(mock, 5);
848        assert!(policy.on_in_progress(now, 1, "test-op-name").is_ok());
849        assert!(policy.on_in_progress(now, 2, "test-op-name").is_ok());
850        assert!(policy.on_in_progress(now, 3, "test-op-name").is_ok());
851    }
852
853    #[test]
854    fn test_limited_attempt_count_in_progress_returns_inner() {
855        let mut mock = MockPolicy::new();
856        mock.expect_on_in_progress()
857            .times(1)
858            .returning(|_, _, _| Err(unavailable()));
859
860        let now = std::time::Instant::now();
861        let policy = LimitedAttemptCount::custom(mock, 5);
862        assert!(policy.on_in_progress(now, 1, "test-op-name").is_err());
863    }
864
865    #[test]
866    fn test_limited_attempt_count_inner_permanent() {
867        let mut mock = MockPolicy::new();
868        mock.expect_on_error()
869            .times(2)
870            .returning(|_, _, e| RetryResult::Permanent(e));
871        let policy = LimitedAttemptCount::custom(mock, 2);
872        let now = std::time::Instant::now();
873        let rf = policy.on_error(now, 1, Error::ser("err"));
874        assert!(rf.is_permanent());
875
876        let rf = policy.on_error(now, 1, Error::ser("err"));
877        assert!(rf.is_permanent());
878    }
879
880    #[test]
881    fn test_limited_attempt_count_inner_exhausted() {
882        let mut mock = MockPolicy::new();
883        mock.expect_on_error()
884            .times(2)
885            .returning(|_, _, e| RetryResult::Exhausted(e));
886        let policy = LimitedAttemptCount::custom(mock, 2);
887        let now = std::time::Instant::now();
888
889        let rf = policy.on_error(now, 1, transient_error());
890        assert!(rf.is_exhausted());
891
892        let rf = policy.on_error(now, 1, transient_error());
893        assert!(rf.is_exhausted());
894    }
895
896    #[test]
897    fn test_exhausted_fmt() {
898        let exhausted = Exhausted::new(
899            "op-name",
900            "limit-name",
901            "test-value".to_string(),
902            "test-limit".to_string(),
903        );
904        let fmt = format!("{exhausted}");
905        assert!(fmt.contains("op-name"), "{fmt}");
906        assert!(fmt.contains("limit-name"), "{fmt}");
907        assert!(fmt.contains("test-value"), "{fmt}");
908        assert!(fmt.contains("test-limit"), "{fmt}");
909    }
910
911    fn transient_error() -> Error {
912        use crate::error::rpc::{Code, Status};
913        Error::service(
914            Status::default()
915                .set_code(Code::Unavailable)
916                .set_message("try-again"),
917        )
918    }
919}