// google_cloud_storage/src/retry_policy.rs
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines the retry policies for Google Cloud Storage.
//!
//! The storage service [recommends] retrying several 408, 429, and all 5xx HTTP
//! status codes. This is confirmed in the description of each status code:
//!
//! - [408 - Request Timeout][408]
//! - [429 - Too Many Requests][429]
//! - [500 - Internal Server Error][500]
//! - [502 - Bad Gateway][502]
//! - [503 - Service Unavailable][503]
//! - [504 - Gateway Timeout][504]
//!
//! In addition, resumable uploads return [308 - Resume Incomplete][308]. This
//! is not handled by the [RetryableErrors] retry policy.
//!
//! [recommends]: https://cloud.google.com/storage/docs/retry-strategy
//! [308]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#308_Resume_Incomplete
//! [408]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#408_Request_Timeout
//! [429]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#429_Too_Many_Requests
//! [500]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#500_Internal_Server_Error
//! [502]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#502_Bad_Gateway
//! [503]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#503_Service_Unavailable
//! [504]: https://cloud.google.com/storage/docs/json_api/v1/status-codes#504_Gateway_Timeout
39use google_cloud_gax::error::Error;
40use google_cloud_gax::{
41    retry_policy::{RetryPolicy, RetryPolicyExt},
42    retry_result::RetryResult,
43    retry_state::RetryState,
44};
45use std::sync::Arc;
46use std::time::Duration;
48/// The default retry policy for the Storage client.
49///
50/// The client will retry all the errors shown as retryable in the service
51/// documentation, and stop retrying after 10 seconds.
52pub(crate) fn storage_default() -> impl RetryPolicy {
53    RetryableErrors.with_time_limit(Duration::from_secs(300))
54}
/// Implements the [retry strategy] recommended in the Cloud Storage service
/// guides.
///
/// On its own this policy never gives up; decorate it with an attempt limit
/// and/or a time limit before installing it on a client.
///
/// # Example
/// ```
/// # use google_cloud_storage::retry_policy::RetryableErrors;
/// use google_cloud_gax::retry_policy::RetryPolicyExt;
/// use google_cloud_storage::client::Storage;
/// use std::time::Duration;
/// let builder = Storage::builder().with_retry_policy(
///     RetryableErrors
///         .with_time_limit(Duration::from_secs(60))
///         .with_attempt_limit(10),
/// );
/// ```
///
/// [retry strategy]: https://cloud.google.com/storage/docs/retry-strategy
#[derive(Clone, Debug)]
pub struct RetryableErrors;
78impl RetryPolicy for RetryableErrors {
79    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
80        if error.is_transient_and_before_rpc() {
81            return RetryResult::Continue(error);
82        }
83        if !state.idempotent {
84            return RetryResult::Permanent(error);
85        }
86        if error.is_io() || error.is_timeout() {
87            return RetryResult::Continue(error);
88        }
89        if error.is_transport() && error.http_status_code().is_none() {
90            // Sometimes gRPC returns a transport error without an HTTP status
91            // code. We treat all of these as I/O errors and therefore
92            // retryable.
93            return RetryResult::Continue(error);
94        }
95        if let Some(code) = error.http_status_code() {
96            return match code {
97                408 | 429 | 500..600 => RetryResult::Continue(error),
98                _ => RetryResult::Permanent(error),
99            };
100        }
101        if let Some(code) = error.status().map(|s| s.code) {
102            use google_cloud_gax::error::rpc::Code;
103            return match code {
104                Code::Internal | Code::ResourceExhausted | Code::Unavailable => {
105                    RetryResult::Continue(error)
106                }
107                // Over gRPC, the service returns DeadlineExceeded for some
108                // "Internal Error; please retry" conditions.
109                Code::DeadlineExceeded => RetryResult::Continue(error),
110                _ => RetryResult::Permanent(error),
111            };
112        }
113        RetryResult::Permanent(error)
114    }
115}
/// Decorates another retry policy so HTTP 308 responses keep the loop going.
///
/// Used internally to drive the resumable upload loop, where the service
/// returns 308 (Resume Incomplete) until the upload finishes.
#[derive(Clone, Debug)]
pub(crate) struct ContinueOn308<T> {
    // The policy consulted for every error other than HTTP 308.
    inner: T,
}
125impl<T> ContinueOn308<T> {
126    pub fn new(inner: T) -> Self {
127        Self { inner }
128    }
129}
131impl RetryPolicy for ContinueOn308<Arc<dyn RetryPolicy + 'static>> {
132    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
133        if error.http_status_code() == Some(308) {
134            return RetryResult::Continue(error);
135        }
136        self.inner.on_error(state, error)
137    }
138}
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::grpc::tonic::Status;
    use google_cloud_gax::error::rpc::Code;
    use google_cloud_gax::throttle_result::ThrottleResult;
    use http::HeaderMap;
    use test_case::test_case;

    // Builds an HTTP error with the given status code and an empty payload.
    fn http_error(code: u16) -> Error {
        Error::http(code, HeaderMap::new(), bytes::Bytes::new())
    }

    // Builds a gRPC service error with the given canonical status code.
    fn grpc_error(code: Code) -> Error {
        let status = google_cloud_gax::error::rpc::Status::default().set_code(code);
        Error::service(status)
    }

    fn timeout_error() -> Error {
        Error::timeout(Status::deadline_exceeded("try again"))
    }

    fn io_error() -> Error {
        Error::io(Status::unavailable("try again"))
    }

    // Every policy in this module throttle-continues regardless of the error.
    fn expect_throttle_continue<P: RetryPolicy>(policy: &P, error: Error) {
        let t = policy.on_throttle(&RetryState::new(true), error);
        assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
    }

    #[test_case(408)]
    #[test_case(429)]
    #[test_case(500)]
    #[test_case(502)]
    #[test_case(503)]
    #[test_case(504)]
    fn retryable_http(code: u16) {
        let policy = RetryableErrors;
        let when_idempotent = policy.on_error(&RetryState::new(true), http_error(code));
        assert!(when_idempotent.is_continue());
        let when_not_idempotent = policy.on_error(&RetryState::new(false), http_error(code));
        assert!(when_not_idempotent.is_permanent());
        expect_throttle_continue(&policy, http_error(code));
    }

    #[test_case(401)]
    #[test_case(403)]
    fn not_recommended_http(code: u16) {
        let policy = RetryableErrors;
        let when_idempotent = policy.on_error(&RetryState::new(true), http_error(code));
        assert!(when_idempotent.is_permanent());
        let when_not_idempotent = policy.on_error(&RetryState::new(false), http_error(code));
        assert!(when_not_idempotent.is_permanent());
        expect_throttle_continue(&policy, http_error(code));
    }

    #[test_case(Code::Unavailable)]
    #[test_case(Code::Internal)]
    #[test_case(Code::ResourceExhausted)]
    #[test_case(Code::DeadlineExceeded)]
    fn retryable_grpc(code: Code) {
        let policy = RetryableErrors;
        let when_idempotent = policy.on_error(&RetryState::new(true), grpc_error(code));
        assert!(when_idempotent.is_continue());
        let when_not_idempotent = policy.on_error(&RetryState::new(false), grpc_error(code));
        assert!(when_not_idempotent.is_permanent());
        expect_throttle_continue(&policy, grpc_error(code));
    }

    #[test_case(Code::Unauthenticated)]
    #[test_case(Code::PermissionDenied)]
    fn not_recommended_grpc(code: Code) {
        let policy = RetryableErrors;
        let when_idempotent = policy.on_error(&RetryState::new(true), grpc_error(code));
        assert!(when_idempotent.is_permanent());
        let when_not_idempotent = policy.on_error(&RetryState::new(false), grpc_error(code));
        assert!(when_not_idempotent.is_permanent());
        expect_throttle_continue(&policy, grpc_error(code));
    }

    #[test]
    fn io() {
        let policy = RetryableErrors;
        assert!(policy.on_error(&RetryState::new(true), io_error()).is_continue());
        assert!(policy.on_error(&RetryState::new(false), io_error()).is_permanent());
        expect_throttle_continue(&policy, io_error());
    }

    #[test]
    fn timeout() {
        let policy = RetryableErrors;
        assert!(policy.on_error(&RetryState::new(true), timeout_error()).is_continue());
        assert!(policy.on_error(&RetryState::new(false), timeout_error()).is_permanent());
        expect_throttle_continue(&policy, timeout_error());
    }

    #[test]
    fn continue_on_308() {
        let inner: Arc<dyn RetryPolicy + 'static> = Arc::new(RetryableErrors);
        let policy = ContinueOn308::new(inner);

        // 308 continues regardless of idempotency.
        assert!(policy.on_error(&RetryState::new(true), http_error(308)).is_continue());
        assert!(policy.on_error(&RetryState::new(false), http_error(308)).is_continue());

        // Other codes are delegated to the inner policy.
        assert!(policy.on_error(&RetryState::new(true), http_error(429)).is_continue());
        assert!(policy.on_error(&RetryState::new(false), http_error(429)).is_permanent());

        expect_throttle_continue(&policy, http_error(308));
        expect_throttle_continue(&policy, http_error(429));
    }
}