google_cloud_pubsub/
retry_policy.rs1use crate::Error;
18use google_cloud_gax::retry_policy::RetryPolicy;
19use google_cloud_gax::retry_result::RetryResult;
20use google_cloud_gax::retry_state::RetryState;
21
/// A stateless retry policy that decides, per error, whether an operation
/// should be attempted again.
///
/// The type carries no configuration: every decision is made from the error
/// passed to the policy, so a single value can be cloned and shared freely.
#[derive(Debug, Clone)]
pub struct RetryableErrors;
44
45impl RetryPolicy for RetryableErrors {
46 fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
47 if error.is_transient_and_before_rpc() {
48 return RetryResult::Continue(error);
49 }
50
51 if error.is_io() || error.is_timeout() {
52 return RetryResult::Continue(error);
53 }
54
55 if error.is_transport() && error.http_status_code().is_none() {
56 return RetryResult::Continue(error);
60 }
61
62 if let Some(429 | 500 | 502 | 503 | 504) = error.http_status_code() {
69 return RetryResult::Continue(error);
70 }
71
72 if let Some(status) = error.status() {
73 use google_cloud_gax::error::rpc::Code;
74 return match status.code {
75 Code::Aborted
76 | Code::DeadlineExceeded
77 | Code::Internal
78 | Code::ResourceExhausted
79 | Code::Unavailable
80 | Code::Unknown => RetryResult::Continue(error),
81 _ => RetryResult::Permanent(error),
82 };
83 }
84
85 RetryResult::Permanent(error)
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_gax::error::rpc::{Code, Status};
    use google_cloud_gax::retry_state::RetryState;
    use http::HeaderMap;
    use test_case::test_case;

    /// Feeds `error` to a `RetryableErrors` policy using a default retry state
    /// and returns the policy's decision.
    fn classify(error: Error) -> RetryResult {
        RetryableErrors.on_error(&RetryState::default(), error)
    }

    /// Builds a transport error with no headers.
    fn connection_closed() -> Error {
        Error::transport(HeaderMap::new(), "connection closed")
    }

    /// Builds an HTTP error with the given status code, no headers and an
    /// empty payload.
    fn http_failure(code: u16) -> Error {
        Error::http(code, HeaderMap::new(), bytes::Bytes::new())
    }

    /// Builds a service error whose status carries `code`.
    fn service_failure(code: Code) -> Error {
        Error::service(Status::default().set_code(code).set_message("try again"))
    }

    /// Builds an I/O error wrapping a tonic status.
    fn io_failure() -> Error {
        Error::io(gaxi::grpc::tonic::Status::unavailable("try again"))
    }

    /// Builds an authentication error; `transient` is forwarded to the
    /// credentials error constructor.
    fn auth_failure(transient: bool, message: &str) -> Error {
        let source = google_cloud_gax::error::CredentialsError::from_msg(transient, message);
        Error::authentication(source)
    }

    #[test]
    fn transport_reset() {
        let decision = classify(connection_closed());
        assert!(decision.is_continue());
    }

    #[test_case(429)]
    #[test_case(500)]
    #[test_case(502)]
    #[test_case(503)]
    #[test_case(504)]
    fn retryable_http(code: u16) {
        let decision = classify(http_failure(code));
        assert!(decision.is_continue());
    }

    #[test_case(400)]
    #[test_case(404)]
    #[test_case(408)]
    #[test_case(409)]
    #[test_case(499)]
    #[test_case(501)]
    #[test_case(505)]
    fn permanent_http(code: u16) {
        let decision = classify(http_failure(code));
        assert!(decision.is_permanent());
    }

    #[test_case(Code::Unavailable)]
    #[test_case(Code::Internal)]
    #[test_case(Code::Aborted)]
    #[test_case(Code::ResourceExhausted)]
    #[test_case(Code::DeadlineExceeded)]
    #[test_case(Code::Unknown)]
    fn retryable_grpc(code: Code) {
        let decision = classify(service_failure(code));
        assert!(decision.is_continue());
    }

    #[test_case(Code::NotFound)]
    #[test_case(Code::PermissionDenied)]
    #[test_case(Code::InvalidArgument)]
    #[test_case(Code::Cancelled)]
    fn permanent_grpc(code: Code) {
        let decision = classify(service_failure(code));
        assert!(decision.is_permanent());
    }

    #[test]
    fn io() {
        let decision = classify(io_failure());
        assert!(decision.is_continue());
    }

    #[test]
    fn permanent_auth() {
        let decision = classify(auth_failure(false, "permanent auth error"));
        assert!(decision.is_permanent());
    }

    #[test]
    fn transient_auth() {
        let decision = classify(auth_failure(true, "transient auth error"));
        assert!(decision.is_continue());
    }
}