google_cloud_storage/
retry_policy.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines the retry policies for Google Cloud Storage.
16//!
//! The storage service [recommends] retrying the 408 and 429 HTTP status codes,
//! as well as all 5xx HTTP status codes. This is confirmed in the description
//! of each status code:
19//!
20//! - [408 - Request Timeout][408]
21//! - [429 - Too Many Requests][429]
22//! - [500 - Internal Server Error][500]
23//! - [502 - Bad Gateway][502]
24//! - [503 - Service Unavailable][503]
25//! - [504 - Gateway Timeout][504]
26//!
27//! In addition, resumable uploads return [308 - Resume Incomplete][308]. This
28//! is not handled by the [RetryableErrors] retry policy.
29//!
30//! [recommends]: https://cloud.google.com/storage/docs/retry-strategy
31//! [308]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#308_Resume_Incomplete
32//! [408]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#408_Request_Timeout
33//! [429]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#429_Too_Many_Requests
34//! [500]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#500_Internal_Server_Error
35//! [502]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#502_Bad_Gateway
36//! [503]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#503_Service_Unavailable
37//! [504]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#504_Gateway_Timeout
38
39use gax::error::Error;
40use gax::{
41    retry_policy::{RetryPolicy, RetryPolicyExt},
42    retry_result::RetryResult,
43    retry_state::RetryState,
44};
45use std::sync::Arc;
46use std::time::Duration;
47
48/// The default retry policy for the Storage client.
49///
50/// The client will retry all the errors shown as retryable in the service
51/// documentation, and stop retrying after 10 seconds.
52pub(crate) fn storage_default() -> impl RetryPolicy {
53    RetryableErrors.with_time_limit(Duration::from_secs(300))
54}
55
/// A retry policy that applies the Cloud Storage [retry strategy].
///
/// This policy only decides *which* errors are worth retrying; it places no
/// bound on the number of attempts or on the elapsed time. Decorate it to cap
/// the attempts and/or the total duration of the retry loop.
///
/// # Example
/// ```
/// # use google_cloud_storage::retry_policy::RetryableErrors;
/// use gax::retry_policy::RetryPolicyExt;
/// use google_cloud_storage::client::Storage;
/// use std::time::Duration;
/// let builder = Storage::builder().with_retry_policy(
///     RetryableErrors
///         .with_time_limit(Duration::from_secs(60))
///         .with_attempt_limit(10),
/// );
/// ```
///
/// [retry strategy]: https://cloud.google.com/storage/docs/retry-strategy
#[derive(Debug, Clone)]
pub struct RetryableErrors;
77
78impl RetryPolicy for RetryableErrors {
79    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
80        if error.is_transient_and_before_rpc() {
81            return RetryResult::Continue(error);
82        }
83        if !state.idempotent {
84            return RetryResult::Permanent(error);
85        }
86        if error.is_io() || error.is_timeout() {
87            return RetryResult::Continue(error);
88        }
89        if error.is_transport() && error.http_status_code().is_none() {
90            // Sometimes gRPC returns a transport error without an HTTP status
91            // code. We treat all of these as I/O errors and therefore
92            // retryable.
93            return RetryResult::Continue(error);
94        }
95        if let Some(code) = error.http_status_code() {
96            return match code {
97                408 | 429 | 500..600 => RetryResult::Continue(error),
98                _ => RetryResult::Permanent(error),
99            };
100        }
101        if let Some(code) = error.status().map(|s| s.code) {
102            use gax::error::rpc::Code;
103            return match code {
104                Code::Internal | Code::ResourceExhausted | Code::Unavailable => {
105                    RetryResult::Continue(error)
106                }
107                // Over gRPC, the service returns DeadlineExceeded for some
108                // "Internal Error; please retry" conditions.
109                Code::DeadlineExceeded => RetryResult::Continue(error),
110                _ => RetryResult::Permanent(error),
111            };
112        }
113        RetryResult::Permanent(error)
114    }
115}
116
/// Wraps another retry policy so that HTTP 308 responses keep the loop going.
///
/// Resumable uploads answer with `308 - Resume Incomplete`; this decorator
/// treats that status as "continue" and hands every other error to the
/// wrapped policy. Used internally by the resumable upload loop.
#[derive(Debug, Clone)]
pub(crate) struct ContinueOn308<T> {
    /// The decorated policy, consulted for any error other than a 308.
    inner: T,
}

impl<T> ContinueOn308<T> {
    /// Builds a decorator around `inner`.
    pub fn new(inner: T) -> Self {
        ContinueOn308 { inner }
    }
}
130
131impl RetryPolicy for ContinueOn308<Arc<dyn RetryPolicy + 'static>> {
132    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
133        if error.http_status_code() == Some(308) {
134            return RetryResult::Continue(error);
135        }
136        self.inner.on_error(state, error)
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use gax::error::rpc::Code;
    use gax::throttle_result::ThrottleResult;
    use http::HeaderMap;
    use test_case::test_case;

    /// Checks that `RetryableErrors` retries the error built by `make` only
    /// for idempotent requests, and that throttling lets the loop continue.
    fn assert_retry_only_if_idempotent(make: impl Fn() -> Error) {
        let policy = RetryableErrors;
        assert!(policy.on_error(&RetryState::new(true), make()).is_continue());
        assert!(policy.on_error(&RetryState::new(false), make()).is_permanent());
        assert_throttle_continues(&policy, make());
    }

    /// Checks that `RetryableErrors` never retries the error built by `make`,
    /// while throttling still lets the loop continue.
    fn assert_never_retried(make: impl Fn() -> Error) {
        let policy = RetryableErrors;
        for idempotent in [true, false] {
            let verdict = policy.on_error(&RetryState::new(idempotent), make());
            assert!(verdict.is_permanent(), "idempotent={idempotent}");
        }
        assert_throttle_continues(&policy, make());
    }

    /// Checks that `policy` lets an idempotent loop continue after being
    /// throttled with `error`.
    fn assert_throttle_continues<P: RetryPolicy>(policy: &P, error: Error) {
        let outcome = policy.on_throttle(&RetryState::new(true), error);
        assert!(matches!(outcome, ThrottleResult::Continue(_)), "{outcome:?}");
    }

    #[test_case(408)]
    #[test_case(429)]
    #[test_case(500)]
    #[test_case(502)]
    #[test_case(503)]
    #[test_case(504)]
    fn retryable_http(code: u16) {
        assert_retry_only_if_idempotent(|| http_error(code));
    }

    #[test_case(401)]
    #[test_case(403)]
    fn not_recommended_http(code: u16) {
        assert_never_retried(|| http_error(code));
    }

    #[test_case(Code::Unavailable)]
    #[test_case(Code::Internal)]
    #[test_case(Code::ResourceExhausted)]
    #[test_case(Code::DeadlineExceeded)]
    fn retryable_grpc(code: Code) {
        assert_retry_only_if_idempotent(|| grpc_error(code));
    }

    #[test_case(Code::Unauthenticated)]
    #[test_case(Code::PermissionDenied)]
    fn not_recommended_grpc(code: Code) {
        assert_never_retried(|| grpc_error(code));
    }

    #[test]
    fn io() {
        assert_retry_only_if_idempotent(io_error);
    }

    #[test]
    fn timeout() {
        assert_retry_only_if_idempotent(timeout_error);
    }

    #[test]
    fn continue_on_308() {
        let inner: Arc<dyn RetryPolicy + 'static> = Arc::new(RetryableErrors);
        let policy = ContinueOn308::new(inner);

        // 308 keeps the loop going even for non-idempotent requests.
        for idempotent in [true, false] {
            let verdict = policy.on_error(&RetryState::new(idempotent), http_error(308));
            assert!(verdict.is_continue(), "idempotent={idempotent}");
        }

        // Anything else is delegated to the inner policy.
        assert!(
            policy
                .on_error(&RetryState::new(true), http_error(429))
                .is_continue()
        );
        assert!(
            policy
                .on_error(&RetryState::new(false), http_error(429))
                .is_permanent()
        );

        assert_throttle_continues(&policy, http_error(308));
        assert_throttle_continues(&policy, http_error(429));
    }

    fn http_error(code: u16) -> Error {
        let headers = HeaderMap::new();
        let payload = bytes::Bytes::new();
        Error::http(code, headers, payload)
    }

    fn grpc_error(code: Code) -> Error {
        Error::service(gax::error::rpc::Status::default().set_code(code))
    }

    fn timeout_error() -> Error {
        let source = tonic::Status::deadline_exceeded("try again");
        Error::timeout(source)
    }

    fn io_error() -> Error {
        let source = tonic::Status::unavailable("try again");
        Error::io(source)
    }
}