google_cloud_storage/read_resume_policy.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the read resume policies for Google Cloud Storage.
16//!
//! Even if a read request starts successfully, it may fail after it starts.
18//! For example, the read may be interrupted or become too slow and "stall". The
19//! client library can automatically recover from such errors. The application
20//! may want to control what errors are treated as recoverable, and how many
21//! failures are tolerated before abandoning the read request.
22//!
23//! The traits and types defined in this module allow for such customization.
24//!
25//! # Example
26//! ```
27//! # use google_cloud_storage::read_resume_policy::*;
28//! let policy = Recommended.with_attempt_limit(3);
29//! assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
30//! assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Continue(_)));
31//! assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Continue(_)));
32//! assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Exhausted(_)));
33//!
34//! use gax::error::{Error, rpc::Code, rpc::Status};
35//! fn io_error() -> Error {
36//! // ... details omitted ...
37//! # Error::io("something failed in the read request")
38//! }
39//! ```
40
41use crate::Error;
42use gax::error::rpc::Code;
43
44pub use gax::retry_result::RetryResult as ResumeResult;
45
46/// Defines the interface to resume policies.
47pub trait ReadResumePolicy: Send + Sync + std::fmt::Debug {
48 /// Determines if the read should continue after an error.
49 fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult;
50}
51
52/// Extension trait for [ReadResumePolicy].
53pub trait ReadResumePolicyExt: Sized {
54 /// Decorates a [ReadResumePolicy] to limit the number of resume attempts.
55 ///
56 /// This policy decorates an inner policy and limits the total number of
57 /// attempts. Note that `on_error()` is not called before the initial
58 /// (non-retry) attempt. Therefore, setting the maximum number of attempts
59 /// to 0 or 1 results in no retry attempts.
60 ///
61 /// The policy passes through the results from the inner policy as long as
62 /// `attempt_count < maximum_attempts`. Once the maximum number of attempts
63 /// is reached, the policy returns [Exhausted][ResumeResult::Exhausted] if the
64 /// inner policy returns [Continue][ResumeResult::Continue].
65 ///
66 /// # Example
67 /// ```
68 /// # use google_cloud_storage::read_resume_policy::*;
69 /// let policy = Recommended.with_attempt_limit(3);
70 /// assert!(matches!(policy.on_error(&ResumeQuery::new(0), transient_error()), ResumeResult::Continue(_)));
71 /// assert!(matches!(policy.on_error(&ResumeQuery::new(1), transient_error()), ResumeResult::Continue(_)));
72 /// assert!(matches!(policy.on_error(&ResumeQuery::new(2), transient_error()), ResumeResult::Continue(_)));
73 /// assert!(matches!(policy.on_error(&ResumeQuery::new(3), transient_error()), ResumeResult::Exhausted(_)));
74 ///
75 /// use gax::error::{Error, rpc::Code, rpc::Status};
76 /// fn transient_error() -> Error {
77 /// // ... details omitted ...
78 /// # Error::io("something failed in the read request")
79 /// }
80 /// ```
81 fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
82 LimitedAttemptCount::new(self, maximum_attempts)
83 }
84}
85impl<T: ReadResumePolicy> ReadResumePolicyExt for T {}
86
/// The information given to a resume policy when it is queried.
///
/// Each time a read fails, the client library asks the resume policy whether
/// a new read request should be attempted, passing an instance of this type
/// to describe the read so far.
///
/// This is a struct, rather than a list of function parameters, so more
/// information can be added later without breaking existing resume policies.
#[non_exhaustive]
#[derive(Debug)]
pub struct ResumeQuery {
    /// How many times the read request has already been interrupted.
    pub attempt_count: u32,
}

impl ResumeQuery {
    /// Creates a query for a read with the given `attempt_count`.
    pub fn new(attempt_count: u32) -> Self {
        ResumeQuery { attempt_count }
    }
}
108
109/// The recommended policy when reading objects from Cloud Storage.
110///
111/// This policy resumes any read that fails due to I/O errors, and stops on any
112/// other error kind.
113///
114/// # Example
115/// ```
116/// # use google_cloud_storage::read_resume_policy::*;
117/// let policy = Recommended;
118/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Continue(_)));
119/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), other_error()), ResumeResult::Permanent(_)));
120///
121/// use gax::error::{Error, rpc::Code, rpc::Status};
122/// fn io_error() -> Error {
123/// // ... details omitted ...
124/// # Error::io("something failed in the read request")
125/// }
126/// fn other_error() -> Error {
127/// // ... details omitted ...
128/// # Error::deser("something failed in the read request")
129/// }
130/// ```
131#[derive(Debug)]
132pub struct Recommended;
133
134impl ReadResumePolicy for Recommended {
135 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
136 match error {
137 e if self::is_transient(&e) => ResumeResult::Continue(e),
138 e => ResumeResult::Permanent(e),
139 }
140 }
141}
142
143fn is_transient(error: &Error) -> bool {
144 match error {
145 // When using HTTP the only error after the read starts are I/O errors.
146 e if e.is_io() => true,
147 // When using gRPC the errors may include more information.
148 e if e.is_transport() => true,
149 e if e.is_timeout() => true,
150 e => e.status().is_some_and(|s| is_transient_code(s.code)),
151 }
152}
153
154fn is_transient_code(code: Code) -> bool {
155 // DeadlineExceeded is safe in this context because local deadline errors are not reported via e.status()
156 matches!(
157 code,
158 Code::Unavailable | Code::ResourceExhausted | Code::Internal | Code::DeadlineExceeded
159 )
160}
161
162/// A resume policy that resumes regardless of the error type.
163///
164/// This may be useful in tests, or if used with a very low limit on the number
165/// of allowed failures.
166///
167/// # Example
168/// ```
169/// # use google_cloud_storage::read_resume_policy::*;
170/// let policy = AlwaysResume.with_attempt_limit(3);
171/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
172/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
173/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
174/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
175///
176/// use gax::error::{Error, rpc::Code, rpc::Status};
177/// fn scary_error() -> Error {
178/// // ... details omitted ...
179/// # Error::deser("something failed in the read request")
180/// }
181/// ```
182#[derive(Debug)]
183pub struct AlwaysResume;
184
185impl ReadResumePolicy for AlwaysResume {
186 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
187 ResumeResult::Continue(error)
188 }
189}
190
191/// A resume policy that never resumes, regardless of the error type.
192///
193/// This is useful to disable the default resume policy.
194///
195/// # Example
196/// ```
197/// # use google_cloud_storage::read_resume_policy::*;
198/// let policy = NeverResume.with_attempt_limit(3);
199/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), io_error()), ResumeResult::Permanent(_)));
200/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), io_error()), ResumeResult::Permanent(_)));
201/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), io_error()), ResumeResult::Permanent(_)));
202/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), io_error()), ResumeResult::Permanent(_)));
203///
204/// use gax::error::{Error, rpc::Code, rpc::Status};
205/// fn io_error() -> Error {
206/// // ... details omitted ...
207/// # Error::io("something failed in the read request")
208/// }
209/// ```
210#[derive(Debug)]
211pub struct NeverResume;
212impl ReadResumePolicy for NeverResume {
213 fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
214 ResumeResult::Permanent(error)
215 }
216}
217
218/// Decorates a resume policy to stop resuming after a certain number of attempts.
219///
220/// # Example
221/// ```
222/// # use google_cloud_storage::read_resume_policy::*;
223/// let policy = LimitedAttemptCount::new(AlwaysResume, 3);
224/// assert!(matches!(policy.on_error(&ResumeQuery::new(0), scary_error()), ResumeResult::Continue(_)));
225/// assert!(matches!(policy.on_error(&ResumeQuery::new(1), scary_error()), ResumeResult::Continue(_)));
226/// assert!(matches!(policy.on_error(&ResumeQuery::new(2), scary_error()), ResumeResult::Continue(_)));
227/// assert!(matches!(policy.on_error(&ResumeQuery::new(3), scary_error()), ResumeResult::Exhausted(_)));
228///
229/// use gax::error::{Error, rpc::Code, rpc::Status};
230/// fn scary_error() -> Error {
231/// // ... details omitted ...
232/// # Error::deser("something failed in the read request")
233/// }
234/// ```
235#[derive(Debug)]
236pub struct LimitedAttemptCount<P> {
237 inner: P,
238 maximum_attempts: u32,
239}
240
241impl<P> LimitedAttemptCount<P> {
242 /// Create a new instance.
243 pub fn new(inner: P, maximum_attempts: u32) -> Self {
244 Self {
245 inner,
246 maximum_attempts,
247 }
248 }
249}
250
251impl<P> ReadResumePolicy for LimitedAttemptCount<P>
252where
253 P: ReadResumePolicy,
254{
255 fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult {
256 match self.inner.on_error(status, error) {
257 ResumeResult::Continue(e) if status.attempt_count >= self.maximum_attempts => {
258 ResumeResult::Exhausted(e)
259 }
260 result => result,
261 }
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks `policy` whether to resume after `error` on the given attempt.
    fn query<P: ReadResumePolicy>(policy: &P, attempt: u32, error: Error) -> ResumeResult {
        policy.on_error(&ResumeQuery::new(attempt), error)
    }

    #[test]
    fn recommended() {
        let policy = Recommended;
        let transient = [
            common_transient(),
            common_timeout(),
            http_transient(),
            grpc_deadline_exceeded(),
            grpc_internal(),
            grpc_resource_exhausted(),
            grpc_unavailable(),
        ];
        for error in transient {
            let r = query(&policy, 0, error);
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }

        for error in [http_permanent(), grpc_permanent()] {
            let r = query(&policy, 0, error);
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    #[test]
    fn always_resume() {
        for error in [http_transient(), http_permanent()] {
            let r = query(&AlwaysResume, 0, error);
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }
    }

    #[test]
    fn never_resume() {
        for error in [http_transient(), http_permanent()] {
            let r = query(&NeverResume, 0, error);
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    #[test]
    fn attempt_limit() {
        let policy = Recommended.with_attempt_limit(3);
        for attempt in 0..3 {
            let r = query(&policy, attempt, http_transient());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }
        let r = query(&policy, 3, http_transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");

        for attempt in [0, 3] {
            let r = query(&policy, attempt, http_permanent());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    #[test]
    fn attempt_limit_inner_exhausted() {
        // The outer limit (5) must not hide the inner limit (3) being reached.
        let policy = AlwaysResume.with_attempt_limit(3).with_attempt_limit(5);
        let r = query(&policy, 3, http_transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");
    }

    fn http_transient() -> Error {
        Error::io("test only")
    }

    fn http_permanent() -> Error {
        Error::deser("bad data")
    }

    fn common_transient() -> Error {
        Error::transport(http::HeaderMap::new(), "test-only")
    }

    fn common_timeout() -> Error {
        Error::timeout("simulated timeout")
    }

    fn grpc_deadline_exceeded() -> Error {
        grpc_error(Code::DeadlineExceeded)
    }

    fn grpc_internal() -> Error {
        grpc_error(Code::Internal)
    }

    fn grpc_resource_exhausted() -> Error {
        grpc_error(Code::ResourceExhausted)
    }

    fn grpc_unavailable() -> Error {
        grpc_error(Code::Unavailable)
    }

    fn grpc_permanent() -> Error {
        grpc_error(Code::PermissionDenied)
    }

    fn grpc_error(code: Code) -> Error {
        Error::service(gax::error::rpc::Status::default().set_code(code))
    }
}