google_cloud_gax/
polling_error_policy.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the types for polling error policies.
16//!
17//! # Example
18//! ```
19//! # use google_cloud_gax::polling_error_policy::*;
20//! use std::time::Duration;
21//! // Poll for at most 15 minutes or at most 50 attempts: whichever limit is
22//! // reached first stops the polling loop.
23//! let policy = Aip194Strict
24//!     .with_time_limit(Duration::from_secs(15 * 60))
25//!     .with_attempt_limit(50);
26//! ```
27//!
28//! The client libraries automatically poll long-running operations (LROs) and
29//! need to (1) distinguish between transient and permanent errors, and (2)
30//! provide a mechanism to limit the polling loop duration.
31//!
32//! We provide a trait that applications may implement to customize the behavior
33//! of the polling loop, and some common implementations that should meet most
34//! needs.
35//!
36//! To configure the default polling error policy for a client, use
37//! [ClientBuilder::with_polling_error_policy]. To configure the polling error
38//! policy used for a specific request, use
39//! [RequestOptionsBuilder::with_polling_error_policy].
40//!
41//! [ClientBuilder::with_polling_error_policy]: crate::client_builder::ClientBuilder::with_polling_error_policy
42//! [RequestOptionsBuilder::with_polling_error_policy]: crate::options::RequestOptionsBuilder::with_polling_error_policy
43
44use crate::error::Error;
45use crate::polling_state::PollingState;
46use crate::retry_result::RetryResult;
47use std::sync::Arc;
48
/// Determines how errors are handled in the polling loop.
///
/// Implementations of this trait determine if polling errors may resolve in
/// future attempts, and for how long the polling loop may continue.
///
/// Implementations must be `Send + Sync + Debug`, as required by the trait
/// bounds.
pub trait PollingErrorPolicy: Send + Sync + std::fmt::Debug {
    /// Query the polling policy after an error.
    ///
    /// # Parameters
    /// * `state` - the current state of the polling loop.
    /// * `error` - the last error when attempting the request.
    ///
    /// # Returns
    /// A [RetryResult] that hands the error back to the caller. The policies
    /// in this module return [Continue][RetryResult::Continue] when the
    /// polling loop may continue, [Permanent][RetryResult::Permanent] when the
    /// error is not expected to resolve, and
    /// [Exhausted][RetryResult::Exhausted] when a configured limit is reached.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_error(&self, state: &PollingState, error: Error) -> RetryResult;

    /// Called when the LRO is successfully polled, but the LRO is still in
    /// progress.
    ///
    /// The default implementation accepts every in-progress poll and returns
    /// `Ok(())`. Returning an error indicates the polling loop should not
    /// continue; the limiting decorators in this module use this to enforce
    /// their limits.
    ///
    /// # Parameters
    /// * `state` - the current state of the polling loop.
    /// * `operation_name` - the name of the LRO being polled. The limiting
    ///   decorators in this module include it in the error they return.
    #[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
    fn on_in_progress(&self, _state: &PollingState, _operation_name: &str) -> Result<(), Error> {
        Ok(())
    }
}
72
73/// A helper type to use [PollingErrorPolicy] in client and request options.
74#[derive(Clone)]
75pub struct PollingErrorPolicyArg(pub(crate) Arc<dyn PollingErrorPolicy>);
76
77impl<T> std::convert::From<T> for PollingErrorPolicyArg
78where
79    T: PollingErrorPolicy + 'static,
80{
81    fn from(value: T) -> Self {
82        Self(Arc::new(value))
83    }
84}
85
86impl std::convert::From<Arc<dyn PollingErrorPolicy>> for PollingErrorPolicyArg {
87    fn from(value: Arc<dyn PollingErrorPolicy>) -> Self {
88        Self(value)
89    }
90}
91
/// Extension trait for [PollingErrorPolicy]
///
/// Provides builder-style methods to decorate a policy with limits. The trait
/// is implemented for every type that implements [PollingErrorPolicy], so the
/// methods are available on any sized policy once this trait is in scope.
pub trait PollingErrorPolicyExt: PollingErrorPolicy + Sized {
    /// Decorate a [PollingErrorPolicy] to limit the total elapsed time in the
    /// polling loop.
    ///
    /// While the time spent in the polling loop (including time in backoff) is
    /// less than the prescribed duration the `on_error()` method returns the
    /// results of the inner policy. After that time it returns
    /// [Exhausted][RetryResult::Exhausted] if the inner policy returns
    /// [Continue][RetryResult::Continue].
    ///
    /// # Parameters
    /// * `maximum_duration` - the maximum time allowed in the polling loop,
    ///   measured from the start of the loop.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_gax::*;
    /// # use google_cloud_gax::polling_state::PollingState;
    /// use polling_error_policy::*;
    /// use std::time::{Duration, Instant};
    /// let policy = Aip194Strict.with_time_limit(Duration::from_secs(10));
    /// // A polling loop that started 20 seconds ago is past the 10 second limit.
    /// let state = PollingState::default().set_start(Instant::now() - Duration::from_secs(20));
    /// assert!(policy.on_error(&state, transient_error()).is_exhausted());
    ///
    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
    /// ```
    fn with_time_limit(self, maximum_duration: std::time::Duration) -> LimitedElapsedTime<Self> {
        LimitedElapsedTime::custom(self, maximum_duration)
    }

    /// Decorate a [PollingErrorPolicy] to limit the number of poll attempts.
    ///
    /// This policy decorates an inner policy and limits the total number of
    /// attempts. Note that `on_error()` is called only after a polling attempt.
    /// Therefore, setting the maximum number of attempts to 0 or 1 results in
    /// no polling after the LRO starts.
    ///
    /// The policy passes through the results from the inner policy as long as
    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
    /// is reached, the policy returns [Exhausted][RetryResult::Exhausted] if the
    /// inner policy returns [Continue][RetryResult::Continue], and passes the
    /// inner policy result otherwise.
    ///
    /// # Parameters
    /// * `maximum_attempts` - the attempt count at which the polling loop is
    ///   considered exhausted.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_gax::*;
    /// # use google_cloud_gax::polling_state::PollingState;
    /// use polling_error_policy::*;
    /// let policy = Aip194Strict.with_attempt_limit(3);
    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(0_u32), transient_error()).is_continue());
    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(1_u32), transient_error()).is_continue());
    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(2_u32), transient_error()).is_continue());
    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(3_u32), transient_error()).is_exhausted());
    ///
    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
    /// ```
    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
        LimitedAttemptCount::custom(self, maximum_attempts)
    }
}

// Blanket implementation: every polling error policy gets the decorators.
impl<T: PollingErrorPolicy> PollingErrorPolicyExt for T {}
153
154/// A polling policy that strictly follows [AIP-194].
155///
156/// This policy must be decorated to limit the number of polling attempts or the
157/// duration of the polling loop.
158///
159/// The policy interprets AIP-194 **strictly**. It examines the status code to
160/// determine if the polling loop may continue.
161///
162/// # Example
163/// ```
164/// # use google_cloud_gax::*;
165/// # use google_cloud_gax::polling_error_policy::*;
166/// # use google_cloud_gax::polling_state::PollingState;
167/// let policy = Aip194Strict.with_attempt_limit(3);
168/// let state = PollingState::default().set_attempt_count(4_u32);
169/// assert!(policy.on_error(&state, transient_error()).is_exhausted());
170///
171/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
172/// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
173/// ```
174///
175/// [AIP-194]: https://google.aip.dev/194
176#[derive(Clone, Debug)]
177pub struct Aip194Strict;
178
179impl PollingErrorPolicy for Aip194Strict {
180    fn on_error(&self, _state: &PollingState, error: Error) -> RetryResult {
181        if error.is_transient_and_before_rpc() {
182            return RetryResult::Continue(error);
183        }
184        if error.is_io() {
185            return RetryResult::Continue(error);
186        }
187        if let Some(status) = error.status() {
188            return if status.code == crate::error::rpc::Code::Unavailable {
189                RetryResult::Continue(error)
190            } else {
191                RetryResult::Permanent(error)
192            };
193        }
194
195        match error.http_status_code() {
196            Some(code) if code == http::StatusCode::SERVICE_UNAVAILABLE.as_u16() => {
197                RetryResult::Continue(error)
198            }
199            _ => RetryResult::Permanent(error),
200        }
201    }
202}
203
/// A polling policy that continues on any error.
///
/// This policy must be decorated to limit the number of polling attempts or the
/// duration of the polling loop.
///
/// The policy continues regardless of the error type or contents. In
/// particular, it does not apply the error classification described in
/// [AIP-194]: errors that [Aip194Strict] treats as permanent are still
/// reported as [Continue][RetryResult::Continue].
///
/// # Example
/// ```
/// # use google_cloud_gax::*;
/// # use google_cloud_gax::polling_error_policy::*;
/// # use google_cloud_gax::polling_state::PollingState;
/// let policy = AlwaysContinue;
/// assert!(policy.on_error(&PollingState::default().set_attempt_count(1_u32), permanent_error()).is_continue());
///
/// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
/// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
/// ```
///
/// [AIP-194]: https://google.aip.dev/194
#[derive(Clone, Debug)]
pub struct AlwaysContinue;

impl PollingErrorPolicy for AlwaysContinue {
    /// Always returns [Continue][RetryResult::Continue]; neither the state nor
    /// the error contents are examined.
    fn on_error(&self, _state: &PollingState, error: Error) -> RetryResult {
        RetryResult::Continue(error)
    }
}
232
/// A polling policy decorator that limits the total time in the polling loop.
///
/// This policy decorates an inner policy and limits the duration of polling
/// loops. While the time spent in the polling loop (including time in backoff)
/// is less than the prescribed duration the `on_error()` method returns the
/// results of the inner policy. After that time it returns
/// [Exhausted][RetryResult::Exhausted] if the inner policy returns
/// [Continue][RetryResult::Continue].
///
/// The `on_in_progress()` method enforces the same limit: it first consults
/// the inner policy and, if the inner policy accepts the poll, returns an
/// error wrapping [Exhausted] once the time limit is reached.
///
/// # Parameters
/// * `P` - the inner polling policy, defaults to [Aip194Strict].
#[derive(Debug)]
pub struct LimitedElapsedTime<P = Aip194Strict>
where
    P: PollingErrorPolicy,
{
    // The decorated policy; consulted before the time limit is applied.
    inner: P,
    // Maximum time allowed in the polling loop, measured from its start.
    maximum_duration: std::time::Duration,
}
256
257impl LimitedElapsedTime {
258    /// Creates a new instance, with the default inner policy.
259    ///
260    /// # Example
261    /// ```
262    /// # use google_cloud_gax::*;
263    /// # use google_cloud_gax::polling_error_policy::*;
264    /// # use google_cloud_gax::polling_state::PollingState;
265    /// use std::time::{Duration, Instant};
266    /// let policy = LimitedElapsedTime::new(Duration::from_secs(10));
267    /// let state = PollingState::default().set_start(Instant::now() - Duration::from_secs(20));
268    /// assert!(policy.on_error(&state, transient_error()).is_exhausted());
269    ///
270    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
271    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
272    /// ```
273    pub fn new(maximum_duration: std::time::Duration) -> Self {
274        Self {
275            inner: Aip194Strict,
276            maximum_duration,
277        }
278    }
279}
280
281impl<P> LimitedElapsedTime<P>
282where
283    P: PollingErrorPolicy,
284{
285    /// Creates a new instance with a custom inner policy.
286    ///
287    /// # Example
288    /// ```
289    /// # use google_cloud_gax::*;
290    /// # use google_cloud_gax::polling_error_policy::*;
291    /// # use google_cloud_gax::polling_state::PollingState;
292    /// use std::time::{Duration, Instant};
293    /// let policy = LimitedElapsedTime::custom(AlwaysContinue, Duration::from_secs(10));
294    /// let state = PollingState::default().set_start(Instant::now() - Duration::from_secs(20));
295    /// assert!(policy.on_error(&state, permanent_error()).is_exhausted());
296    ///
297    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
298    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
299    /// ```
300    pub fn custom(inner: P, maximum_duration: std::time::Duration) -> Self {
301        Self {
302            inner,
303            maximum_duration,
304        }
305    }
306
307    fn in_progress_impl(
308        &self,
309        start: std::time::Instant,
310        operation_name: &str,
311    ) -> Result<(), Error> {
312        let now = std::time::Instant::now();
313        if now < start + self.maximum_duration {
314            return Ok(());
315        }
316        Err(Error::exhausted(Exhausted::new(
317            operation_name,
318            "elapsed time",
319            format!("{:?}", now.checked_duration_since(start).unwrap()),
320            format!("{:?}", self.maximum_duration),
321        )))
322    }
323}
324
325impl<P> PollingErrorPolicy for LimitedElapsedTime<P>
326where
327    P: PollingErrorPolicy + 'static,
328{
329    fn on_error(&self, state: &PollingState, error: Error) -> RetryResult {
330        match self.inner.on_error(state, error) {
331            RetryResult::Permanent(e) => RetryResult::Permanent(e),
332            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
333            RetryResult::Continue(e) => {
334                if std::time::Instant::now() >= state.start + self.maximum_duration {
335                    RetryResult::Exhausted(e)
336                } else {
337                    RetryResult::Continue(e)
338                }
339            }
340        }
341    }
342
343    fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error> {
344        self.inner
345            .on_in_progress(state, operation_name)
346            .and_then(|_| self.in_progress_impl(state.start, operation_name))
347    }
348}
349
/// A polling policy decorator that limits the number of attempts.
///
/// This policy decorates an inner policy and limits the total number of
/// polling attempts. With a maximum of 0 attempts the limit is reached the
/// first time the policy is consulted, so the polling loop does not continue
/// past that point.
///
/// The policy passes through the results from the inner policy as long as
/// `attempt_count < maximum_attempts`. However, once the maximum number of
/// attempts is reached, the policy replaces any [Continue][RetryResult::Continue]
/// result with [Exhausted][RetryResult::Exhausted].
///
/// The `on_in_progress()` method enforces the same limit: it first consults
/// the inner policy and, if the inner policy accepts the poll, returns an
/// error wrapping [Exhausted] once the maximum number of attempts is reached.
///
/// # Parameters
/// * `P` - the inner polling policy, defaults to [Aip194Strict].
#[derive(Debug)]
pub struct LimitedAttemptCount<P = Aip194Strict>
where
    P: PollingErrorPolicy,
{
    // The decorated policy; consulted before the attempt limit is applied.
    inner: P,
    // Attempt count at which the polling loop is considered exhausted.
    maximum_attempts: u32,
}
371
372impl LimitedAttemptCount {
373    /// Creates a new instance, with the default inner policy.
374    ///
375    /// # Example
376    /// ```
377    /// # use google_cloud_gax::*;
378    /// # use google_cloud_gax::polling_error_policy::*;
379    /// # use google_cloud_gax::polling_state::PollingState;
380    /// use std::time::Instant;
381    /// let policy = LimitedAttemptCount::new(5);
382    /// let state = PollingState::default().set_attempt_count(10_u32);
383    /// assert!(policy.on_error(&state, transient_error()).is_exhausted());
384    ///
385    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
386    /// fn transient_error() -> Error { Error::service(Status::default().set_code(Code::Unavailable)) }
387    /// ```
388    pub fn new(maximum_attempts: u32) -> Self {
389        Self {
390            inner: Aip194Strict,
391            maximum_attempts,
392        }
393    }
394}
395
396impl<P> LimitedAttemptCount<P>
397where
398    P: PollingErrorPolicy,
399{
400    /// Creates a new instance with a custom inner policy.
401    ///
402    /// # Example
403    /// ```
404    /// # use google_cloud_gax::polling_error_policy::*;
405    /// # use google_cloud_gax::polling_state::PollingState;
406    /// # use google_cloud_gax::*;
407    /// let policy = LimitedAttemptCount::custom(AlwaysContinue, 2);
408    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(1_u32), permanent_error()).is_continue());
409    /// assert!(policy.on_error(&PollingState::default().set_attempt_count(2_u32), permanent_error()).is_exhausted());
410    ///
411    /// use google_cloud_gax::error::{Error, rpc::Code, rpc::Status};
412    /// fn permanent_error() -> Error { Error::service(Status::default().set_code(Code::Aborted)) }
413    /// ```
414    pub fn custom(inner: P, maximum_attempts: u32) -> Self {
415        Self {
416            inner,
417            maximum_attempts,
418        }
419    }
420
421    fn in_progress_impl(&self, count: u32, operation_name: &str) -> Result<(), Error> {
422        if count < self.maximum_attempts {
423            return Ok(());
424        }
425        Err(Error::exhausted(Exhausted::new(
426            operation_name,
427            "attempt count",
428            count.to_string(),
429            self.maximum_attempts.to_string(),
430        )))
431    }
432}
433
434impl<P> PollingErrorPolicy for LimitedAttemptCount<P>
435where
436    P: PollingErrorPolicy,
437{
438    fn on_error(&self, state: &PollingState, error: Error) -> RetryResult {
439        match self.inner.on_error(state, error) {
440            RetryResult::Permanent(e) => RetryResult::Permanent(e),
441            RetryResult::Exhausted(e) => RetryResult::Exhausted(e),
442            RetryResult::Continue(e) => {
443                if state.attempt_count >= self.maximum_attempts {
444                    RetryResult::Exhausted(e)
445                } else {
446                    RetryResult::Continue(e)
447                }
448            }
449        }
450    }
451
452    fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error> {
453        self.inner
454            .on_in_progress(state, operation_name)
455            .and_then(|_| self.in_progress_impl(state.attempt_count, operation_name))
456    }
457}
458
/// Indicates that a retry or polling loop has been exhausted.
///
/// Records which limit stopped the loop, the value observed when the limit
/// was checked, and the configured limit. The [Display][std::fmt::Display]
/// output combines all of them in a single message.
#[derive(Debug)]
pub struct Exhausted {
    // Name of the operation whose loop was stopped.
    operation_name: String,
    // Human-readable name of the limit, e.g. "attempt count".
    limit_name: &'static str,
    // Observed value, already formatted for display.
    value: String,
    // Configured limit, already formatted for display.
    limit: String,
}

impl Exhausted {
    /// Creates a new instance.
    ///
    /// # Parameters
    /// * `operation_name` - the name of the operation; copied into the value.
    /// * `limit_name` - a description of the limit that was reached.
    /// * `value` - the observed value, formatted for display.
    /// * `limit` - the configured limit, formatted for display.
    pub fn new(
        operation_name: &str,
        limit_name: &'static str,
        value: String,
        limit: String,
    ) -> Self {
        let operation_name = String::from(operation_name);
        Self {
            operation_name,
            limit_name,
            value,
            limit,
        }
    }
}

impl std::fmt::Display for Exhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            operation_name,
            limit_name,
            value,
            limit,
        } = self;
        write!(
            f,
            "polling loop for {} exhausted, {} value ({}) exceeds limit ({})",
            operation_name, limit_name, value, limit
        )
    }
}

impl std::error::Error for Exhausted {}
495
496#[cfg(test)]
497mod tests {
498    use super::*;
499    use crate::error::{CredentialsError, Error};
500    use http::HeaderMap;
501    use std::error::Error as _;
502    use std::time::{Duration, Instant};
503
    // Generates `MockPolicy`, a mock implementation of `PollingErrorPolicy`.
    // The tests use it as the inner policy of the decorators to verify how
    // calls are forwarded and how the inner results are combined with limits.
    mockall::mock! {
        #[derive(Debug)]
        Policy {}
        impl PollingErrorPolicy for Policy {
            fn on_error(&self, state: &PollingState, error: Error) -> RetryResult;
            fn on_in_progress(&self, state: &PollingState, operation_name: &str) -> Result<(), Error>;
        }
    }
512
513    // Verify `PollingPolicyArg` can be converted from the desired types.
514    #[test]
515    fn polling_policy_arg() {
516        let policy = LimitedAttemptCount::new(3);
517        let _ = PollingErrorPolicyArg::from(policy);
518
519        let policy: Arc<dyn PollingErrorPolicy> = Arc::new(LimitedAttemptCount::new(3));
520        let _ = PollingErrorPolicyArg::from(policy);
521    }
522
523    #[test]
524    fn aip194_strict() {
525        let p = Aip194Strict;
526
527        assert!(p.on_in_progress(&PollingState::default(), "unused").is_ok());
528        assert!(
529            p.on_error(&PollingState::default(), unavailable())
530                .is_continue()
531        );
532        assert!(
533            p.on_error(&PollingState::default(), permission_denied())
534                .is_permanent()
535        );
536        assert!(
537            p.on_error(&PollingState::default(), http_unavailable())
538                .is_continue()
539        );
540        assert!(
541            p.on_error(&PollingState::default(), http_permission_denied())
542                .is_permanent()
543        );
544
545        assert!(
546            p.on_error(&PollingState::default(), Error::io("err".to_string()))
547                .is_continue()
548        );
549
550        assert!(
551            p.on_error(
552                &PollingState::default(),
553                Error::authentication(CredentialsError::from_msg(true, "err"))
554            )
555            .is_continue()
556        );
557
558        assert!(
559            p.on_error(&PollingState::default(), Error::ser("err".to_string()))
560                .is_permanent()
561        );
562    }
563
564    #[test]
565    fn always_continue() {
566        let p = AlwaysContinue;
567
568        assert!(p.on_in_progress(&PollingState::default(), "unused").is_ok());
569        assert!(
570            p.on_error(&PollingState::default(), http_unavailable())
571                .is_continue()
572        );
573        assert!(
574            p.on_error(&PollingState::default(), unavailable())
575                .is_continue()
576        );
577    }
578
579    #[test_case::test_case(Error::io("err"))]
580    #[test_case::test_case(Error::authentication(CredentialsError::from_msg(true, "err")))]
581    #[test_case::test_case(Error::ser("err"))]
582    fn always_continue_error_kind(error: Error) {
583        let p = AlwaysContinue;
584        assert!(p.on_error(&PollingState::default(), error).is_continue());
585    }
586
587    #[test]
588    fn with_time_limit() {
589        let policy = AlwaysContinue.with_time_limit(Duration::from_secs(10));
590        assert!(
591            policy
592                .on_error(
593                    &PollingState::default()
594                        .set_start(Instant::now() - Duration::from_secs(1))
595                        .set_attempt_count(1_u32),
596                    permission_denied()
597                )
598                .is_continue(),
599            "{policy:?}"
600        );
601        assert!(
602            policy
603                .on_error(
604                    &PollingState::default()
605                        .set_start(Instant::now() - Duration::from_secs(20))
606                        .set_attempt_count(1_u32),
607                    permission_denied()
608                )
609                .is_exhausted(),
610            "{policy:?}"
611        );
612    }
613
614    #[test]
615    fn with_attempt_limit() {
616        let policy = AlwaysContinue.with_attempt_limit(3);
617        assert!(
618            policy
619                .on_error(
620                    &PollingState::default().set_attempt_count(1_u32),
621                    permission_denied()
622                )
623                .is_continue(),
624            "{policy:?}"
625        );
626        assert!(
627            policy
628                .on_error(
629                    &PollingState::default().set_attempt_count(5_u32),
630                    permission_denied()
631                )
632                .is_exhausted(),
633            "{policy:?}"
634        );
635    }
636
637    fn http_error(code: u16, message: &str) -> Error {
638        let error = serde_json::json!({"error": {
639            "code": code,
640            "message": message,
641        }});
642        let payload = bytes::Bytes::from_owner(serde_json::to_string(&error).unwrap());
643        Error::http(code, HeaderMap::new(), payload)
644    }
645
    // An HTTP 503 error; `Aip194Strict` treats this status code as transient.
    fn http_unavailable() -> Error {
        http_error(503, "SERVICE UNAVAILABLE")
    }
649
    // An HTTP 403 error; `Aip194Strict` treats this status code as permanent.
    fn http_permission_denied() -> Error {
        http_error(403, "PERMISSION DENIED")
    }
653
654    fn unavailable() -> Error {
655        use crate::error::rpc::Code;
656        let status = crate::error::rpc::Status::default()
657            .set_code(Code::Unavailable)
658            .set_message("UNAVAILABLE");
659        Error::service(status)
660    }
661
662    fn permission_denied() -> Error {
663        use crate::error::rpc::Code;
664        let status = crate::error::rpc::Status::default()
665            .set_code(Code::PermissionDenied)
666            .set_message("PERMISSION_DENIED");
667        Error::service(status)
668    }
669
670    #[test]
671    fn test_limited_elapsed_time_on_error() {
672        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
673        assert!(
674            policy
675                .on_error(
676                    &PollingState::default()
677                        .set_start(Instant::now() - Duration::from_secs(10))
678                        .set_attempt_count(1_u32),
679                    unavailable()
680                )
681                .is_continue(),
682            "{policy:?}"
683        );
684        assert!(
685            policy
686                .on_error(
687                    &PollingState::default()
688                        .set_start(Instant::now() - Duration::from_secs(30))
689                        .set_attempt_count(1_u32),
690                    unavailable()
691                )
692                .is_exhausted(),
693            "{policy:?}"
694        );
695    }
696
697    #[test]
698    fn test_limited_elapsed_time_in_progress() {
699        let policy = LimitedElapsedTime::new(Duration::from_secs(20));
700        let result = policy.on_in_progress(
701            &PollingState::default()
702                .set_start(Instant::now() - Duration::from_secs(10))
703                .set_attempt_count(1_u32),
704            "unused",
705        );
706        assert!(result.is_ok(), "{result:?}");
707        let err = policy
708            .on_in_progress(
709                &PollingState::default()
710                    .set_start(Instant::now() - Duration::from_secs(30))
711                    .set_attempt_count(1_u32),
712                "test-operation-name",
713            )
714            .unwrap_err();
715        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
716        assert!(exhausted.is_some());
717    }
718
719    #[test]
720    fn test_limited_time_forwards_on_error() {
721        let mut mock = MockPolicy::new();
722        mock.expect_on_error()
723            .times(1..)
724            .returning(|_, e| RetryResult::Continue(e));
725
726        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
727        let rf = policy.on_error(&PollingState::default(), transient_error());
728        assert!(rf.is_continue());
729    }
730
731    #[test]
732    fn test_limited_time_forwards_in_progress() {
733        let mut mock = MockPolicy::new();
734        mock.expect_on_in_progress()
735            .times(3)
736            .returning(|_, _| Ok(()));
737
738        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
739        assert!(
740            policy
741                .on_in_progress(
742                    &PollingState::default().set_attempt_count(1_u32),
743                    "test-op-name"
744                )
745                .is_ok()
746        );
747        assert!(
748            policy
749                .on_in_progress(
750                    &PollingState::default().set_attempt_count(2_u32),
751                    "test-op-name"
752                )
753                .is_ok()
754        );
755        assert!(
756            policy
757                .on_in_progress(
758                    &PollingState::default().set_attempt_count(3_u32),
759                    "test-op-name"
760                )
761                .is_ok()
762        );
763    }
764
765    #[test]
766    fn test_limited_time_in_progress_returns_inner() {
767        let mut mock = MockPolicy::new();
768        mock.expect_on_in_progress()
769            .times(1)
770            .returning(|_, _| Err(transient_error()));
771
772        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
773        assert!(
774            policy
775                .on_in_progress(
776                    &PollingState::default().set_attempt_count(1_u32),
777                    "test-op-name"
778                )
779                .is_err()
780        );
781    }
782
783    #[test]
784    fn test_limited_time_inner_continues() {
785        let mut mock = MockPolicy::new();
786        mock.expect_on_error()
787            .times(1..)
788            .returning(|_, e| RetryResult::Continue(e));
789
790        let now = std::time::Instant::now();
791        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
792        let rf = policy.on_error(
793            &PollingState::default()
794                .set_start(now - Duration::from_secs(10))
795                .set_attempt_count(1_u32),
796            transient_error(),
797        );
798        assert!(rf.is_continue());
799
800        let rf = policy.on_error(
801            &PollingState::default()
802                .set_start(now - Duration::from_secs(70))
803                .set_attempt_count(1_u32),
804            transient_error(),
805        );
806        assert!(rf.is_exhausted());
807    }
808
809    #[test]
810    fn test_limited_time_inner_permanent() {
811        let mut mock = MockPolicy::new();
812        mock.expect_on_error()
813            .times(2)
814            .returning(|_, e| RetryResult::Permanent(e));
815
816        let now = std::time::Instant::now();
817        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
818
819        let rf = policy.on_error(
820            &PollingState::default()
821                .set_start(now - Duration::from_secs(10))
822                .set_attempt_count(1_u32),
823            transient_error(),
824        );
825        assert!(rf.is_permanent());
826
827        let rf = policy.on_error(
828            &PollingState::default()
829                .set_start(now + Duration::from_secs(10))
830                .set_attempt_count(1_u32),
831            transient_error(),
832        );
833        assert!(rf.is_permanent());
834    }
835
836    #[test]
837    fn test_limited_time_inner_exhausted() {
838        let mut mock = MockPolicy::new();
839        mock.expect_on_error()
840            .times(2)
841            .returning(|_, e| RetryResult::Exhausted(e));
842
843        let now = std::time::Instant::now();
844        let policy = LimitedElapsedTime::custom(mock, Duration::from_secs(60));
845
846        let rf = policy.on_error(
847            &PollingState::default()
848                .set_start(now - Duration::from_secs(10))
849                .set_attempt_count(1_u32),
850            transient_error(),
851        );
852        assert!(rf.is_exhausted());
853
854        let rf = policy.on_error(
855            &PollingState::default()
856                .set_start(now + Duration::from_secs(10))
857                .set_attempt_count(1_u32),
858            transient_error(),
859        );
860        assert!(rf.is_exhausted());
861    }
862
863    #[test]
864    fn test_limited_attempt_count_on_error() {
865        let policy = LimitedAttemptCount::new(20);
866        assert!(
867            policy
868                .on_error(
869                    &PollingState::default().set_attempt_count(10_u32),
870                    unavailable()
871                )
872                .is_continue(),
873            "{policy:?}"
874        );
875        assert!(
876            policy
877                .on_error(
878                    &PollingState::default().set_attempt_count(30_u32),
879                    unavailable()
880                )
881                .is_exhausted(),
882            "{policy:?}"
883        );
884    }
885
886    #[test]
887    fn test_limited_attempt_count_in_progress() {
888        let policy = LimitedAttemptCount::new(20);
889        let result =
890            policy.on_in_progress(&PollingState::default().set_attempt_count(10_u32), "unused");
891        assert!(result.is_ok(), "{result:?}");
892        let err = policy
893            .on_in_progress(
894                &PollingState::default().set_attempt_count(30_u32),
895                "test-operation-name",
896            )
897            .unwrap_err();
898        let exhausted = err.source().and_then(|e| e.downcast_ref::<Exhausted>());
899        assert!(exhausted.is_some());
900    }
901
902    #[test]
903    fn test_limited_attempt_count_forwards_on_error() {
904        let mut mock = MockPolicy::new();
905        mock.expect_on_error()
906            .times(1..)
907            .returning(|_, e| RetryResult::Continue(e));
908
909        let policy = LimitedAttemptCount::custom(mock, 3);
910        assert!(
911            policy
912                .on_error(
913                    &PollingState::default().set_attempt_count(1_u32),
914                    transient_error()
915                )
916                .is_continue()
917        );
918        assert!(
919            policy
920                .on_error(
921                    &PollingState::default().set_attempt_count(2_u32),
922                    transient_error()
923                )
924                .is_continue()
925        );
926        assert!(
927            policy
928                .on_error(
929                    &PollingState::default().set_attempt_count(3_u32),
930                    transient_error()
931                )
932                .is_exhausted()
933        );
934    }
935
936    #[test]
937    fn test_limited_attempt_count_forwards_in_progress() {
938        let mut mock = MockPolicy::new();
939        mock.expect_on_in_progress()
940            .times(3)
941            .returning(|_, _| Ok(()));
942
943        let policy = LimitedAttemptCount::custom(mock, 5);
944        assert!(
945            policy
946                .on_in_progress(
947                    &PollingState::default().set_attempt_count(1_u32),
948                    "test-op-name"
949                )
950                .is_ok()
951        );
952        assert!(
953            policy
954                .on_in_progress(
955                    &PollingState::default().set_attempt_count(2_u32),
956                    "test-op-name"
957                )
958                .is_ok()
959        );
960        assert!(
961            policy
962                .on_in_progress(
963                    &PollingState::default().set_attempt_count(3_u32),
964                    "test-op-name"
965                )
966                .is_ok()
967        );
968    }
969
970    #[test]
971    fn test_limited_attempt_count_in_progress_returns_inner() {
972        let mut mock = MockPolicy::new();
973        mock.expect_on_in_progress()
974            .times(1)
975            .returning(|_, _| Err(unavailable()));
976
977        let policy = LimitedAttemptCount::custom(mock, 5);
978        assert!(
979            policy
980                .on_in_progress(
981                    &PollingState::default().set_attempt_count(1_u32),
982                    "test-op-name"
983                )
984                .is_err()
985        );
986    }
987
988    #[test]
989    fn test_limited_attempt_count_inner_permanent() {
990        let mut mock = MockPolicy::new();
991        mock.expect_on_error()
992            .times(2)
993            .returning(|_, e| RetryResult::Permanent(e));
994        let policy = LimitedAttemptCount::custom(mock, 2);
995        let rf = policy.on_error(
996            &PollingState::default().set_attempt_count(1_u32),
997            Error::ser("err"),
998        );
999        assert!(rf.is_permanent());
1000
1001        let rf = policy.on_error(
1002            &PollingState::default().set_attempt_count(1_u32),
1003            Error::ser("err"),
1004        );
1005        assert!(rf.is_permanent());
1006    }
1007
1008    #[test]
1009    fn test_limited_attempt_count_inner_exhausted() {
1010        let mut mock = MockPolicy::new();
1011        mock.expect_on_error()
1012            .times(2)
1013            .returning(|_, e| RetryResult::Exhausted(e));
1014        let policy = LimitedAttemptCount::custom(mock, 2);
1015
1016        let rf = policy.on_error(
1017            &PollingState::default().set_attempt_count(1_u32),
1018            transient_error(),
1019        );
1020        assert!(rf.is_exhausted());
1021
1022        let rf = policy.on_error(
1023            &PollingState::default().set_attempt_count(1_u32),
1024            transient_error(),
1025        );
1026        assert!(rf.is_exhausted());
1027    }
1028
1029    #[test]
1030    fn test_exhausted_fmt() {
1031        let exhausted = Exhausted::new(
1032            "op-name",
1033            "limit-name",
1034            "test-value".to_string(),
1035            "test-limit".to_string(),
1036        );
1037        let fmt = format!("{exhausted}");
1038        assert!(fmt.contains("op-name"), "{fmt}");
1039        assert!(fmt.contains("limit-name"), "{fmt}");
1040        assert!(fmt.contains("test-value"), "{fmt}");
1041        assert!(fmt.contains("test-limit"), "{fmt}");
1042    }
1043
1044    fn transient_error() -> Error {
1045        use crate::error::rpc::{Code, Status};
1046        Error::service(
1047            Status::default()
1048                .set_code(Code::Unavailable)
1049                .set_message("try-again"),
1050        )
1051    }
1052}