Skip to main content

google_cloud_pubsub/
retry_policy.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines a retry policy for Cloud Pub/Sub.
16
17use crate::Error;
18use google_cloud_gax::retry_policy::RetryPolicy;
19use google_cloud_gax::retry_result::RetryResult;
20use google_cloud_gax::retry_state::RetryState;
21
/// Implements the retry strategy recommended by the Cloud Pub/Sub guide on
/// [error codes].
///
/// The policy only classifies errors as retryable or permanent. It must be
/// decorated with a limit, such as the time limit shown in the example, so
/// that the retry loop has a bounded duration.
///
/// # Example
/// ```
/// # async fn sample() -> anyhow::Result<()> {
/// # use google_cloud_gax::retry_policy::RetryPolicyExt;
/// # use google_cloud_pubsub::client::Publisher;
/// # use google_cloud_pubsub::retry_policy::RetryableErrors;
/// let policy = RetryableErrors.with_time_limit(std::time::Duration::from_secs(60));
/// let publisher = Publisher::builder("projects/my-project/topics/my-topic")
///     .with_retry_policy(policy)
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// [error codes]: https://docs.cloud.google.com/pubsub/docs/reference/error-codes
#[derive(Clone, Debug)]
pub struct RetryableErrors;
44
45impl RetryPolicy for RetryableErrors {
46    fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
47        if error.is_transient_and_before_rpc() {
48            return RetryResult::Continue(error);
49        }
50
51        if error.is_io() || error.is_timeout() {
52            return RetryResult::Continue(error);
53        }
54
55        if error.is_transport() && error.http_status_code().is_none() {
56            // Sometimes gRPC returns a transport error without an HTTP status
57            // code. We treat all of these as I/O errors and therefore
58            // retryable.
59            return RetryResult::Continue(error);
60        }
61
62        // Catch raw HTTP errors that may not have been mapped to a gRPC status.
63        // - 429: Resource Exhausted
64        // - 500: Unknown
65        // - 502: Bad Gateway
66        // - 503: Service Unavailable
67        // - 504: Gateway Timeout
68        if let Some(429 | 500 | 502 | 503 | 504) = error.http_status_code() {
69            return RetryResult::Continue(error);
70        }
71
72        if let Some(status) = error.status() {
73            use google_cloud_gax::error::rpc::Code;
74            return match status.code {
75                Code::Aborted
76                | Code::DeadlineExceeded
77                | Code::Internal
78                | Code::ResourceExhausted
79                | Code::Unavailable
80                | Code::Unknown => RetryResult::Continue(error),
81                _ => RetryResult::Permanent(error),
82            };
83        }
84
85        RetryResult::Permanent(error)
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_gax::error::rpc::{Code, Status};
    use google_cloud_gax::retry_state::RetryState;
    use http::HeaderMap;
    use test_case::test_case;

    /// Runs `RetryableErrors` against `error` using a default retry state.
    fn classify(error: Error) -> RetryResult {
        RetryableErrors.on_error(&RetryState::default(), error)
    }

    /// Builds an HTTP error with the given status code, no headers, and an
    /// empty payload.
    fn http_error(code: u16) -> Error {
        Error::http(code, HeaderMap::new(), bytes::Bytes::new())
    }

    /// Builds a service error whose status carries `code`.
    fn grpc_error(code: Code) -> Error {
        Error::service(Status::default().set_code(code).set_message("try again"))
    }

    /// Builds an authentication error with the given transient flag.
    fn auth_error(transient: bool, message: &'static str) -> Error {
        let source = google_cloud_gax::error::CredentialsError::from_msg(transient, message);
        Error::authentication(source)
    }

    // A transport error without an HTTP status code is retried.
    #[test]
    fn transport_reset() {
        let error = Error::transport(HeaderMap::new(), "connection closed");
        assert!(classify(error).is_continue());
    }

    #[test_case(429)]
    #[test_case(500)]
    #[test_case(502)]
    #[test_case(503)]
    #[test_case(504)]
    fn retryable_http(code: u16) {
        assert!(classify(http_error(code)).is_continue());
    }

    #[test_case(400)]
    #[test_case(404)]
    #[test_case(408)]
    #[test_case(409)]
    #[test_case(499)]
    #[test_case(501)]
    #[test_case(505)]
    fn permanent_http(code: u16) {
        assert!(classify(http_error(code)).is_permanent());
    }

    #[test_case(Code::Unavailable)]
    #[test_case(Code::Internal)]
    #[test_case(Code::Aborted)]
    #[test_case(Code::ResourceExhausted)]
    #[test_case(Code::DeadlineExceeded)]
    #[test_case(Code::Unknown)]
    fn retryable_grpc(code: Code) {
        assert!(classify(grpc_error(code)).is_continue());
    }

    #[test_case(Code::NotFound)]
    #[test_case(Code::PermissionDenied)]
    #[test_case(Code::InvalidArgument)]
    #[test_case(Code::Cancelled)]
    fn permanent_grpc(code: Code) {
        assert!(classify(grpc_error(code)).is_permanent());
    }

    #[test]
    fn io() {
        let error = Error::io(gaxi::grpc::tonic::Status::unavailable("try again"));
        assert!(classify(error).is_continue());
    }

    #[test]
    fn permanent_auth() {
        assert!(classify(auth_error(false, "permanent auth error")).is_permanent());
    }

    #[test]
    fn transient_auth() {
        assert!(classify(auth_error(true, "transient auth error")).is_continue());
    }
}