google_cloud_storage/
read_resume_policy.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the read resume policies for Google Cloud Storage.
16//!
17//! Even if a read request starts successfully, it may be fail after it starts.
18//! For example, the read may be interrupted or become too slow and "stall". The
19//! client library can automatically recover from such errors. The application
20//! may want to control what errors are treated as recoverable, and how many
21//! failures are tolerated before abandoning the read request.
22//!
23//! The traits and types defined in this module allow for such customization.
24//!
25//! # Example
26//! ```
27//! # use google_cloud_storage::read_resume_policy::*;
28//! let policy = Recommended.with_attempt_limit(3);
29//! assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
30//! assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Continue(_)));
31//! assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Continue(_)));
32//! assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Exhausted(_)));
33//!
34//! use gax::error::{Error, rpc::Code, rpc::Status};
35//! fn io_error() -> Error {
36//!    // ... details omitted ...
37//!    # Error::io("something failed in the read request")
38//! }
39//! ```
40
41use crate::Error;
42use gax::error::rpc::Code;
43
44pub use gax::retry_result::RetryResult as ResumeResult;
45
46/// Defines the interface to resume policies.
47pub trait ReadResumePolicy: Send + Sync + std::fmt::Debug {
48    /// Determines if the read should continue after an error.
49    fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult;
50}
51
52/// Extension trait for [ReadResumePolicy].
53pub trait ReadResumePolicyExt: Sized {
54    /// Decorates a [ReadResumePolicy] to limit the number of resume attempts.
55    ///
56    /// This policy decorates an inner policy and limits the total number of
57    /// attempts. Note that `on_error()` is not called before the initial
58    /// (non-retry) attempt. Therefore, setting the maximum number of attempts
59    /// to 0 or 1 results in no retry attempts.
60    ///
61    /// The policy passes through the results from the inner policy as long as
62    /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
63    /// is reached, the policy returns [Exhausted][ResumeResult::Exhausted] if the
64    /// inner policy returns [Continue][ResumeResult::Continue].
65    ///
66    /// # Example
67    /// ```
68    /// # use google_cloud_storage::read_resume_policy::*;
69    /// let policy = Recommended.with_attempt_limit(3);
70    /// assert!(matches!(policy.on_error(&ResumeQuery::new(0), transient_error()), ResumeResult::Continue(_)));
71    /// assert!(matches!(policy.on_error(&ResumeQuery::new(1), transient_error()), ResumeResult::Continue(_)));
72    /// assert!(matches!(policy.on_error(&ResumeQuery::new(2), transient_error()), ResumeResult::Continue(_)));
73    /// assert!(matches!(policy.on_error(&ResumeQuery::new(3), transient_error()), ResumeResult::Exhausted(_)));
74    ///
75    /// use gax::error::{Error, rpc::Code, rpc::Status};
76    /// fn transient_error() -> Error {
77    ///    // ... details omitted ...
78    ///    # Error::io("something failed in the read request")
79    /// }
80    /// ```
81    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
82        LimitedAttemptCount::new(self, maximum_attempts)
83    }
84}
85impl<T: ReadResumePolicy> ReadResumePolicyExt for T {}
86
87/// The inputs into a resume policy query.
88///
89/// On an error, the client library queries the resume policy as to whether it
90/// should attempt a new read request or not. The client library provides an
91/// instance of this type to the resume policy.
92///
93/// We use a struct so we can grow the amount of information without breaking
94/// existing resume policies.
95#[derive(Debug)]
96#[non_exhaustive]
97pub struct ResumeQuery {
98    /// The number of times the read request has been interrupted already.
99    pub attempt_count: u32,
100}
101
102impl ResumeQuery {
103    /// Create a new instance.
104    pub fn new(attempt_count: u32) -> Self {
105        Self { attempt_count }
106    }
107}
108
109/// The recommended policy when reading objects from Cloud Storage.
110///
111/// This policy resumes any read that fails due to I/O errors, and stops on any
112/// other error kind.
113///
114/// # Example
115/// ```
116/// # use google_cloud_storage::read_resume_policy::*;
117/// let policy = Recommended;
118/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
119/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), other_error()), ResumeResult::Permanent(_)));
120///
121/// use gax::error::{Error, rpc::Code, rpc::Status};
122/// fn io_error() -> Error {
123///    // ... details omitted ...
124///    # Error::io("something failed in the read request")
125/// }
126/// fn other_error() -> Error {
127///    // ... details omitted ...
128///    # Error::deser("something failed in the read request")
129/// }
130/// ```
131#[derive(Debug)]
132pub struct Recommended;
133
134impl ReadResumePolicy for Recommended {
135    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
136        match error {
137            e if self::is_transient(&e) => ResumeResult::Continue(e),
138            e => ResumeResult::Permanent(e),
139        }
140    }
141}
142
143fn is_transient(error: &Error) -> bool {
144    match error {
145        // When using HTTP the only error after the read starts are I/O errors.
146        e if e.is_io() => true,
147        // When using gRPC the errors may include more information.
148        e if e.is_transport() => true,
149        e => e.status().is_some_and(|s| s.code == Code::Unavailable),
150    }
151}
152
153/// A resume policy that resumes regardless of the error type.
154///
155/// This may be useful in tests, or if used with a very low limit on the number
156/// of allowed failures.
157///
158/// # Example
159/// ```
160/// # use google_cloud_storage::read_resume_policy::*;
161/// let policy = AlwaysResume.with_attempt_limit(3);
162/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
163/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
164/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
165/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
166///
167/// use gax::error::{Error, rpc::Code, rpc::Status};
168/// fn scary_error() -> Error {
169///    // ... details omitted ...
170///    # Error::deser("something failed in the read request")
171/// }
172/// ```
173#[derive(Debug)]
174pub struct AlwaysResume;
175
176impl ReadResumePolicy for AlwaysResume {
177    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
178        ResumeResult::Continue(error)
179    }
180}
181
182/// A resume policy that never resumes, regardless of the error type.
183///
184/// This is useful to disable the default resume policy.
185///
186/// # Example
187/// ```
188/// # use google_cloud_storage::read_resume_policy::*;
189/// let policy = NeverResume.with_attempt_limit(3);
190/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Permanent(_)));
191/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Permanent(_)));
192/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Permanent(_)));
193/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Permanent(_)));
194///
195/// use gax::error::{Error, rpc::Code, rpc::Status};
196/// fn io_error() -> Error {
197///    // ... details omitted ...
198///    # Error::io("something failed in the read request")
199/// }
200/// ```
201#[derive(Debug)]
202pub struct NeverResume;
203impl ReadResumePolicy for NeverResume {
204    fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
205        ResumeResult::Permanent(error)
206    }
207}
208
209/// Decorates a resume policy to stop resuming after a certain number of attempts.
210///
211/// # Example
212/// ```
213/// # use google_cloud_storage::read_resume_policy::*;
214/// let policy = LimitedAttemptCount::new(AlwaysResume, 3);
215/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
216/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
217/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
218/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
219///
220/// use gax::error::{Error, rpc::Code, rpc::Status};
221/// fn scary_error() -> Error {
222///    // ... details omitted ...
223///    # Error::deser("something failed in the read request")
224/// }
225/// ```
226#[derive(Debug)]
227pub struct LimitedAttemptCount<P> {
228    inner: P,
229    maximum_attempts: u32,
230}
231
232impl<P> LimitedAttemptCount<P> {
233    /// Create a new instance.
234    pub fn new(inner: P, maximum_attempts: u32) -> Self {
235        Self {
236            inner,
237            maximum_attempts,
238        }
239    }
240}
241
242impl<P> ReadResumePolicy for LimitedAttemptCount<P>
243where
244    P: ReadResumePolicy,
245{
246    fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult {
247        match self.inner.on_error(status, error) {
248            ResumeResult::Continue(e) if status.attempt_count >= self.maximum_attempts => {
249                ResumeResult::Exhausted(e)
250            }
251            result => result,
252        }
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259
260    #[test]
261    fn recommended() {
262        let policy = Recommended;
263        let r = policy.on_error(&ResumeQuery::new(0), common_transient());
264        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
265        let r = policy.on_error(&ResumeQuery::new(0), http_transient());
266        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
267        let r = policy.on_error(&ResumeQuery::new(0), grpc_transient());
268        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
269
270        let r = policy.on_error(&ResumeQuery::new(0), http_permanent());
271        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
272        let r = policy.on_error(&ResumeQuery::new(0), grpc_permanent());
273        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
274    }
275
276    #[test]
277    fn always_resume() {
278        let policy = AlwaysResume;
279        let r = policy.on_error(&ResumeQuery::new(0), http_transient());
280        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
281        let r = policy.on_error(&ResumeQuery::new(0), http_permanent());
282        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
283    }
284
285    #[test]
286    fn never_resume() {
287        let policy = NeverResume;
288        let r = policy.on_error(&ResumeQuery::new(0), http_transient());
289        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
290        let r = policy.on_error(&ResumeQuery::new(0), http_permanent());
291        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
292    }
293
294    #[test]
295    fn attempt_limit() {
296        let policy = Recommended.with_attempt_limit(3);
297        let r = policy.on_error(&ResumeQuery::new(0), http_transient());
298        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
299        let r = policy.on_error(&ResumeQuery::new(1), http_transient());
300        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
301        let r = policy.on_error(&ResumeQuery::new(2), http_transient());
302        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
303        let r = policy.on_error(&ResumeQuery::new(3), http_transient());
304        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");
305
306        let r = policy.on_error(&ResumeQuery::new(0), http_permanent());
307        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
308        let r = policy.on_error(&ResumeQuery::new(3), http_permanent());
309        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
310    }
311
312    #[test]
313    fn attempt_limit_inner_exhausted() {
314        let policy = AlwaysResume.with_attempt_limit(3).with_attempt_limit(5);
315        let r = policy.on_error(&ResumeQuery::new(3), http_transient());
316        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");
317    }
318
319    fn http_transient() -> Error {
320        Error::io("test only")
321    }
322
323    fn http_permanent() -> Error {
324        Error::deser("bad data")
325    }
326
327    fn common_transient() -> Error {
328        Error::transport(http::HeaderMap::new(), "test-only")
329    }
330
331    fn grpc_transient() -> Error {
332        grpc_error(Code::Unavailable)
333    }
334
335    fn grpc_permanent() -> Error {
336        grpc_error(Code::PermissionDenied)
337    }
338
339    fn grpc_error(code: Code) -> Error {
340        let status = gax::error::rpc::Status::default().set_code(code);
341        Error::service(status)
342    }
343}