google_cloud_storage/
read_resume_policy.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the read resume policies for Google Cloud Storage.
16//!
//! Even if a read request starts successfully, it may fail after it starts.
18//! For example, the read may be interrupted or become too slow and "stall". The
19//! client library can automatically recover from such errors. The application
20//! may want to control what errors are treated as recoverable, and how many
21//! failures are tolerated before abandoning the read request.
22//!
23//! The traits and types defined in this module allow for such customization.
24//!
25//! # Example
26//! ```
27//! # use google_cloud_storage::read_resume_policy::*;
28//! let policy = Recommended.with_attempt_limit(3);
29//! assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
30//! assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Continue(_)));
31//! assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Continue(_)));
32//! assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Exhausted(_)));
33//!
34//! use gax::error::{Error, rpc::Code, rpc::Status};
35//! fn io_error() -> Error {
36//!    // ... details omitted ...
37//!    # Error::io("something failed in the read request")
38//! }
39//! ```
40
41use crate::Error;
42
43pub use gax::retry_result::RetryResult as ResumeResult;
44
45/// Defines the interface to resume policies.
46pub trait ReadResumePolicy: Send + Sync + std::fmt::Debug {
47    /// Determines if the read should continue after an error.
48    fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult;
49}
50
51/// Extension trait for [ReadResumePolicy].
52pub trait ReadResumePolicyExt: Sized {
53    /// Decorates a [ReadResumePolicy] to limit the number of resume attempts.
54    ///
55    /// This policy decorates an inner policy and limits the total number of
56    /// attempts. Note that `on_error()` is not called before the initial
57    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
58    /// to 0 or 1 results in no retry attempts.
59    ///
60    /// The policy passes through the results from the inner policy as long as
61    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
62    /// is reached, the policy returns [Exhausted][ResumeResult::Exhausted] if the
63    /// inner policy returns [Continue][ResumeResult::Continue].
64    ///
65    /// # Example
66    /// ```
67    /// # use google_cloud_storage::read_resume_policy::*;
68    /// let policy = Recommended.with_attempt_limit(3);
69    /// assert!(matches!(policy.on_error(&ResumeQuery::new(0), transient_error()), ResumeResult::Continue(_)));
70    /// assert!(matches!(policy.on_error(&ResumeQuery::new(1), transient_error()), ResumeResult::Continue(_)));
71    /// assert!(matches!(policy.on_error(&ResumeQuery::new(2), transient_error()), ResumeResult::Continue(_)));
72    /// assert!(matches!(policy.on_error(&ResumeQuery::new(3), transient_error()), ResumeResult::Exhausted(_)));
73    ///
74    /// use gax::error::{Error, rpc::Code, rpc::Status};
75    /// fn transient_error() -> Error {
76    ///    // ... details omitted ...
77    ///    # Error::io("something failed in the read request")
78    /// }
79    /// ```
80    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
81        LimitedAttemptCount::new(self, maximum_attempts)
82    }
83}
84impl<T: ReadResumePolicy> ReadResumePolicyExt for T {}
85
/// The information handed to a resume policy when a read fails.
///
/// When a read hits an error, the client library asks the resume policy
/// whether a new read request should be attempted, and passes an instance of
/// this type along with the error.
///
/// This is a `#[non_exhaustive]` struct so that more information can be added
/// later without breaking existing resume policies.
#[non_exhaustive]
#[derive(Debug)]
pub struct ResumeQuery {
    /// How many times the read request has already been interrupted.
    pub attempt_count: u32,
}

impl ResumeQuery {
    /// Builds a query reporting `attempt_count` previous interruptions.
    pub fn new(attempt_count: u32) -> Self {
        ResumeQuery { attempt_count }
    }
}
107
108/// The recommended policy when reading objects from Cloud Storage.
109///
110/// This policy resumes any read that fails due to I/O errors, and stops on any
111/// other error kind.
112///
113/// # Example
114/// ```
115/// # use google_cloud_storage::read_resume_policy::*;
116/// let policy = Recommended;
117/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
118/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), other_error()), ResumeResult::Permanent(_)));
119///
120/// use gax::error::{Error, rpc::Code, rpc::Status};
121/// fn io_error() -> Error {
122///    // ... details omitted ...
123///    # Error::io("something failed in the read request")
124/// }
125/// fn other_error() -> Error {
126///    // ... details omitted ...
127///    # Error::deser("something failed in the read request")
128/// }
129/// ```
130#[derive(Debug)]
131pub struct Recommended;
132
133impl ReadResumePolicy for Recommended {
134    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
135        if error.is_io() {
136            ResumeResult::Continue(error)
137        } else {
138            ResumeResult::Permanent(error)
139        }
140    }
141}
142
143/// A resume policy that resumes regardless of the error type.
144///
145/// This may be useful in tests, or if used with a very low limit on the number
146/// of allowed failures.
147///
148/// # Example
149/// ```
150/// # use google_cloud_storage::read_resume_policy::*;
151/// let policy = AlwaysResume.with_attempt_limit(3);
152/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
153/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
154/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
155/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
156///
157/// use gax::error::{Error, rpc::Code, rpc::Status};
158/// fn scary_error() -> Error {
159///    // ... details omitted ...
160///    # Error::deser("something failed in the read request")
161/// }
162/// ```
163#[derive(Debug)]
164pub struct AlwaysResume;
165
166impl ReadResumePolicy for AlwaysResume {
167    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
168        ResumeResult::Continue(error)
169    }
170}
171
172/// A resume policy that never resumes, regardless of the error type.
173///
174/// This is useful to disable the default resume policy.
175///
176/// # Example
177/// ```
178/// # use google_cloud_storage::read_resume_policy::*;
179/// let policy = NeverResume.with_attempt_limit(3);
180/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Permanent(_)));
181/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Permanent(_)));
182/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Permanent(_)));
183/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Permanent(_)));
184///
185/// use gax::error::{Error, rpc::Code, rpc::Status};
186/// fn io_error() -> Error {
187///    // ... details omitted ...
188///    # Error::io("something failed in the read request")
189/// }
190/// ```
191#[derive(Debug)]
192pub struct NeverResume;
193impl ReadResumePolicy for NeverResume {
194    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
195        ResumeResult::Permanent(error)
196    }
197}
198
/// Wraps a resume policy and stops resuming after a number of attempts.
///
/// # Example
/// ```
/// # use google_cloud_storage::read_resume_policy::*;
/// let policy = LimitedAttemptCount::new(AlwaysResume, 3);
/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
///
/// use gax::error::{Error, rpc::Code, rpc::Status};
/// fn scary_error() -> Error {
///    // ... details omitted ...
///    # Error::deser("something failed in the read request")
/// }
/// ```
#[derive(Debug)]
pub struct LimitedAttemptCount<P> {
    // The decorated policy; it is consulted first on every error.
    inner: P,
    // The `attempt_count` at which `Continue` answers become `Exhausted`.
    maximum_attempts: u32,
}

impl<P> LimitedAttemptCount<P> {
    /// Wraps `inner`, limiting it to `maximum_attempts` attempts.
    pub fn new(inner: P, maximum_attempts: u32) -> Self {
        LimitedAttemptCount {
            inner,
            maximum_attempts,
        }
    }
}
231
232impl<P> ReadResumePolicy for LimitedAttemptCount<P>
233where
234    P: ReadResumePolicy,
235{
236    fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult {
237        match self.inner.on_error(status, error) {
238            ResumeResult::Continue(e) if status.attempt_count >= self.maximum_attempts => {
239                ResumeResult::Exhausted(e)
240            }
241            result => result,
242        }
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// An I/O error, the kind `Recommended` treats as resumable.
    fn transient() -> Error {
        Error::io("test only")
    }

    /// A deserialization error, the kind `Recommended` treats as permanent.
    fn permanent() -> Error {
        Error::deser("bad data")
    }

    #[test]
    fn recommended() {
        let query = ResumeQuery::new(0);

        let got = Recommended.on_error(&query, transient());
        assert!(matches!(got, ResumeResult::Continue(_)), "{got:?}");

        let got = Recommended.on_error(&query, permanent());
        assert!(matches!(got, ResumeResult::Permanent(_)), "{got:?}");
    }

    #[test]
    fn always_resume() {
        let query = ResumeQuery::new(0);

        // Both error kinds must be resumed.
        for error in [transient(), permanent()] {
            let got = AlwaysResume.on_error(&query, error);
            assert!(matches!(got, ResumeResult::Continue(_)), "{got:?}");
        }
    }

    #[test]
    fn never_resume() {
        let query = ResumeQuery::new(0);

        // Neither error kind may be resumed.
        for error in [transient(), permanent()] {
            let got = NeverResume.on_error(&query, error);
            assert!(matches!(got, ResumeResult::Permanent(_)), "{got:?}");
        }
    }

    #[test]
    fn attempt_limit() {
        let policy = Recommended.with_attempt_limit(3);

        // Below the limit the inner policy's `Continue` passes through.
        for count in 0..3 {
            let got = policy.on_error(&ResumeQuery::new(count), transient());
            assert!(matches!(got, ResumeResult::Continue(_)), "{got:?}");
        }

        // At the limit `Continue` turns into `Exhausted`.
        let got = policy.on_error(&ResumeQuery::new(3), transient());
        assert!(matches!(got, ResumeResult::Exhausted(_)), "{got:?}");

        // Permanent errors are unaffected by the limit.
        for count in [0, 3] {
            let got = policy.on_error(&ResumeQuery::new(count), permanent());
            assert!(matches!(got, ResumeResult::Permanent(_)), "{got:?}");
        }
    }

    #[test]
    fn attempt_limit_inner_exhausted() {
        // The inner limit (3) triggers first; the outer one must not mask it.
        let policy = AlwaysResume.with_attempt_limit(3).with_attempt_limit(5);
        let got = policy.on_error(&ResumeQuery::new(3), transient());
        assert!(matches!(got, ResumeResult::Exhausted(_)), "{got:?}");
    }
}