google_cloud_storage/read_resume_policy.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the read resume policies for Google Cloud Storage.
16//!
//! Even if a read request starts successfully, it may fail after it starts.
//! For example, the read may be interrupted or become too slow and "stall". The
//! client library can automatically recover from such errors. The application
//! may want to control what errors are treated as recoverable, and how many
//! failures are tolerated before abandoning the read request.
22//!
23//! The traits and types defined in this module allow for such customization.
24//!
25//! # Example
26//! ```
27//! # use google_cloud_storage::read_resume_policy::*;
28//! let policy = Recommended.with_attempt_limit(3);
29//! assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
30//! assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Continue(_)));
31//! assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Continue(_)));
32//! assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Exhausted(_)));
33//!
34//! use gax::error::{Error, rpc::Code, rpc::Status};
35//! fn io_error() -> Error {
36//! // ... details omitted ...
37//! # Error::io("something failed in the read request")
38//! }
39//! ```
40
41use crate::Error;
42
43pub use gax::retry_result::RetryResult as ResumeResult;
44
45/// Defines the interface to resume policies.
46pub trait ReadResumePolicy: Send + Sync + std::fmt::Debug {
47 /// Determines if the read should continue after an error.
48 fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult;
49}
50
51/// Extension trait for [ReadResumePolicy].
52pub trait ReadResumePolicyExt: Sized {
53 /// Decorates a [ReadResumePolicy] to limit the number of resume attempts.
54 ///
55 /// This policy decorates an inner policy and limits the total number of
56 /// attempts. Note that `on_error()` is not called before the initial
57 /// (non-retry) attempt. Therefore, setting the maximum number of attempts
58 /// to 0 or 1 results in no retry attempts.
59 ///
60 /// The policy passes through the results from the inner policy as long as
61 /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
62 /// is reached, the policy returns [Exhausted][ResumeResult::Exhausted] if the
63 /// inner policy returns [Continue][ResumeResult::Continue].
64 ///
65 /// # Example
66 /// ```
67 /// # use google_cloud_storage::read_resume_policy::*;
68 /// let policy = Recommended.with_attempt_limit(3);
69 /// assert!(matches!(policy.on_error(&ResumeQuery::new(0), transient_error()), ResumeResult::Continue(_)));
70 /// assert!(matches!(policy.on_error(&ResumeQuery::new(1), transient_error()), ResumeResult::Continue(_)));
71 /// assert!(matches!(policy.on_error(&ResumeQuery::new(2), transient_error()), ResumeResult::Continue(_)));
72 /// assert!(matches!(policy.on_error(&ResumeQuery::new(3), transient_error()), ResumeResult::Exhausted(_)));
73 ///
74 /// use gax::error::{Error, rpc::Code, rpc::Status};
75 /// fn transient_error() -> Error {
76 /// // ... details omitted ...
77 /// # Error::io("something failed in the read request")
78 /// }
79 /// ```
80 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
81 LimitedAttemptCount::new(self, maximum_attempts)
82 }
83}
84impl<T: ReadResumePolicy> ReadResumePolicyExt for T {}
85
/// The information handed to a resume policy when a read fails.
///
/// After an error, the client library asks the resume policy whether a new
/// read request should be attempted, and passes an instance of this type as
/// the input to that question.
///
/// This is a struct (marked `#[non_exhaustive]`) so more information can be
/// added later without breaking existing resume policies.
#[derive(Debug)]
#[non_exhaustive]
pub struct ResumeQuery {
    /// How many times the read request has already been interrupted.
    pub attempt_count: u32,
}

impl ResumeQuery {
    /// Builds a query reporting `attempt_count` interruptions.
    pub fn new(attempt_count: u32) -> Self {
        ResumeQuery { attempt_count }
    }
}
107
108/// The recommended policy when reading objects from Cloud Storage.
109///
110/// This policy resumes any read that fails due to I/O errors, and stops on any
111/// other error kind.
112///
113/// # Example
114/// ```
115/// # use google_cloud_storage::read_resume_policy::*;
116/// let policy = Recommended;
117/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
118/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), other_error()), ResumeResult::Permanent(_)));
119///
120/// use gax::error::{Error, rpc::Code, rpc::Status};
121/// fn io_error() -> Error {
122/// // ... details omitted ...
123/// # Error::io("something failed in the read request")
124/// }
125/// fn other_error() -> Error {
126/// // ... details omitted ...
127/// # Error::deser("something failed in the read request")
128/// }
129/// ```
130#[derive(Debug)]
131pub struct Recommended;
132
133impl ReadResumePolicy for Recommended {
134 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
135 if error.is_io() {
136 ResumeResult::Continue(error)
137 } else {
138 ResumeResult::Permanent(error)
139 }
140 }
141}
142
143/// A resume policy that resumes regardless of the error type.
144///
145/// This may be useful in tests, or if used with a very low limit on the number
146/// of allowed failures.
147///
148/// # Example
149/// ```
150/// # use google_cloud_storage::read_resume_policy::*;
151/// let policy = AlwaysResume.with_attempt_limit(3);
152/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
153/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
154/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
155/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
156///
157/// use gax::error::{Error, rpc::Code, rpc::Status};
158/// fn scary_error() -> Error {
159/// // ... details omitted ...
160/// # Error::deser("something failed in the read request")
161/// }
162/// ```
163#[derive(Debug)]
164pub struct AlwaysResume;
165
166impl ReadResumePolicy for AlwaysResume {
167 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
168 ResumeResult::Continue(error)
169 }
170}
171
172/// A resume policy that never resumes, regardless of the error type.
173///
174/// This is useful to disable the default resume policy.
175///
176/// # Example
177/// ```
178/// # use google_cloud_storage::read_resume_policy::*;
179/// let policy = NeverResume.with_attempt_limit(3);
180/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Permanent(_)));
181/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Permanent(_)));
182/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Permanent(_)));
183/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Permanent(_)));
184///
185/// use gax::error::{Error, rpc::Code, rpc::Status};
186/// fn io_error() -> Error {
187/// // ... details omitted ...
188/// # Error::io("something failed in the read request")
189/// }
190/// ```
191#[derive(Debug)]
192pub struct NeverResume;
193impl ReadResumePolicy for NeverResume {
194 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
195 ResumeResult::Permanent(error)
196 }
197}
198
199/// Decorates a resume policy to stop resuming after a certain number of attempts.
200///
201/// # Example
202/// ```
203/// # use google_cloud_storage::read_resume_policy::*;
204/// let policy = LimitedAttemptCount::new(AlwaysResume, 3);
205/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
206/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
207/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
208/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
209///
210/// use gax::error::{Error, rpc::Code, rpc::Status};
211/// fn scary_error() -> Error {
212/// // ... details omitted ...
213/// # Error::deser("something failed in the read request")
214/// }
215/// ```
216#[derive(Debug)]
217pub struct LimitedAttemptCount<P> {
218 inner: P,
219 maximum_attempts: u32,
220}
221
222impl<P> LimitedAttemptCount<P> {
223 /// Create a new instance.
224 pub fn new(inner: P, maximum_attempts: u32) -> Self {
225 Self {
226 inner,
227 maximum_attempts,
228 }
229 }
230}
231
232impl<P> ReadResumePolicy for LimitedAttemptCount<P>
233where
234 P: ReadResumePolicy,
235{
236 fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult {
237 match self.inner.on_error(status, error) {
238 ResumeResult::Continue(e) if status.attempt_count >= self.maximum_attempts => {
239 ResumeResult::Exhausted(e)
240 }
241 result => result,
242 }
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// An I/O error: the kind `Recommended` is willing to resume.
    fn transient() -> Error {
        Error::io("test only")
    }

    /// A deserialization error: the kind `Recommended` gives up on.
    fn permanent() -> Error {
        Error::deser("bad data")
    }

    #[test]
    fn recommended() {
        let policy = Recommended;
        let query = ResumeQuery::new(0);

        let r = policy.on_error(&query, transient());
        assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");

        let r = policy.on_error(&query, permanent());
        assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
    }

    #[test]
    fn always_resume() {
        let policy = AlwaysResume;
        let query = ResumeQuery::new(0);

        // Both kinds of error are resumable under this policy.
        for make_error in [transient as fn() -> Error, permanent] {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }
    }

    #[test]
    fn never_resume() {
        let policy = NeverResume;
        let query = ResumeQuery::new(0);

        // Neither kind of error is resumable under this policy.
        for make_error in [transient as fn() -> Error, permanent] {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    #[test]
    fn attempt_limit() {
        let policy = Recommended.with_attempt_limit(3);

        // Below the limit, transient errors keep the read going.
        for attempt in 0..3 {
            let r = policy.on_error(&ResumeQuery::new(attempt), transient());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }

        // At the limit, the same error exhausts the policy.
        let r = policy.on_error(&ResumeQuery::new(3), transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");

        // Permanent errors stay permanent both below and at the limit.
        for attempt in [0, 3] {
            let r = policy.on_error(&ResumeQuery::new(attempt), permanent());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    #[test]
    fn attempt_limit_inner_exhausted() {
        // The inner limit (3) trips first; the outer limit (5) must pass the
        // `Exhausted` result through untouched.
        let inner = AlwaysResume.with_attempt_limit(3);
        let policy = inner.with_attempt_limit(5);

        let r = policy.on_error(&ResumeQuery::new(3), transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");
    }
}