1pub fn classify_error(err: &anyhow::Error) -> (bool, bool, u64) {
8 if let Some(pg) = err.downcast_ref::<postgres::Error>() {
10 if let Some(db) = pg.as_db_error() {
11 return classify_pg_sqlstate(db.code());
12 }
13 if pg.is_closed() {
15 return (true, true, 0);
16 }
17 }
18
19 if let Some(result) = err
21 .downcast_ref::<mysql::Error>()
22 .and_then(classify_mysql_error)
23 {
24 return result;
25 }
26
27 let msg = format!("{:#}", err).to_lowercase();
29
30 if msg.contains("loading credential")
32 || msg.contains("loadcredential")
33 || msg.contains("metadata.google.internal")
34 || msg.contains("permission denied")
35 || msg.contains("access denied")
36 || msg.contains("invalid_grant")
37 || msg.contains("token has been expired or revoked")
38 {
39 return (false, false, 0);
40 }
41
42 if msg.contains("connection reset")
44 || msg.contains("broken pipe")
45 || msg.contains("connection refused")
46 || msg.contains("no route to host")
47 || msg.contains("network is unreachable")
48 || msg.contains("name resolution")
49 || msg.contains("dns")
50 || msg.contains("ssl handshake")
51 || msg.contains("i/o timeout")
52 || msg.contains("unexpected eof")
53 || msg.contains("closed the connection unexpectedly")
54 || msg.contains("got an error reading communication packets")
55 {
56 return (true, true, 0);
57 }
58
59 if msg.contains("gone away")
61 || msg.contains("lost connection")
62 || msg.contains("the server closed the connection")
63 || msg.contains("can't connect to mysql server")
64 {
65 return (true, true, 0);
66 }
67
68 if msg.contains("timed out")
70 || msg.contains("timeout")
71 || msg.contains("canceling statement")
72 || msg.contains("lock wait timeout")
73 || msg.contains("execution time exceeded")
74 {
75 return (true, false, 0);
76 }
77
78 if msg.contains("too many connections")
80 || msg.contains("the database system is starting up")
81 || msg.contains("the database system is shutting down")
82 {
83 return (true, true, 15_000);
84 }
85
86 if msg.contains("deadlock") || msg.contains("could not serialize access") {
88 return (true, false, 1_000);
89 }
90
91 (false, false, 0)
93}
94
95fn classify_pg_sqlstate(code: &postgres::error::SqlState) -> (bool, bool, u64) {
98 use postgres::error::SqlState;
99
100 if *code == SqlState::CONNECTION_EXCEPTION
102 || *code == SqlState::CONNECTION_DOES_NOT_EXIST
103 || *code == SqlState::CONNECTION_FAILURE
104 || *code == SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
105 || *code == SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION
106 || code.code().starts_with("08")
107 {
108 return (true, true, 0);
109 }
110
111 if *code == SqlState::ADMIN_SHUTDOWN
113 || *code == SqlState::CRASH_SHUTDOWN
114 || *code == SqlState::CANNOT_CONNECT_NOW
115 {
116 return (true, true, 15_000);
117 }
118
119 if *code == SqlState::TOO_MANY_CONNECTIONS {
121 return (true, true, 15_000);
122 }
123
124 if *code == SqlState::T_R_SERIALIZATION_FAILURE {
126 return (true, false, 1_000);
127 }
128 if *code == SqlState::T_R_DEADLOCK_DETECTED {
129 return (true, false, 1_000);
130 }
131
132 if *code == SqlState::QUERY_CANCELED {
134 return (true, false, 0);
135 }
136
137 if code.code().starts_with("53") {
139 return (true, false, 5_000);
140 }
141
142 if code.code().starts_with("28") {
144 return (false, false, 0);
145 }
146
147 if code.code().starts_with("42") {
149 return (false, false, 0);
150 }
151
152 (false, false, 0)
154}
155
156fn classify_mysql_error(err: &mysql::Error) -> Option<(bool, bool, u64)> {
159 match err {
160 mysql::Error::MySqlError(me) => {
161 match me.code {
162 1213 => Some((true, false, 1_000)),
164 1205 => Some((true, false, 0)),
166 1040 => Some((true, true, 15_000)),
168 1053 => Some((true, true, 15_000)),
170 1045 | 1044 => Some((false, false, 0)),
172 1049 | 1146 | 1064 => Some((false, false, 0)),
174 _ => None,
175 }
176 }
177 mysql::Error::IoError(_) => Some((true, true, 0)),
178 _ => None,
179 }
180}
181
/// Test-only shortcut: the `transient` component of [`classify_error`].
#[cfg(test)]
pub(crate) fn is_transient(err: &anyhow::Error) -> bool {
    let (transient, _, _) = classify_error(err);
    transient
}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Classifies an ad-hoc error whose only content is `msg`.
    fn classify(msg: &str) -> (bool, bool, u64) {
        classify_error(&anyhow::anyhow!("{}", msg))
    }

    /// `is_transient` applied to an ad-hoc error whose only content is `msg`.
    fn transient(msg: &str) -> bool {
        is_transient(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn test_is_transient_matches() {
        for msg in ["statement timed out", "connection reset"] {
            assert!(transient(msg));
        }
    }

    #[test]
    fn test_is_transient_rejects() {
        for msg in ["syntax error", "permission denied", "table not found"] {
            assert!(!transient(msg));
        }
    }

    #[test]
    fn test_classify_network_errors_need_reconnect() {
        let cases = [
            "connection refused",
            "no route to host",
            "network is unreachable",
            "broken pipe",
            "unexpected eof",
            "MySQL server has gone away",
            "lost connection to server",
            "can't connect to mysql server",
            "the server closed the connection",
            "got an error reading communication packets",
            "ssl handshake failed",
        ];
        cases.iter().for_each(|msg| {
            let (transient, reconnect, _) = classify(msg);
            assert!(transient, "should be transient: {}", msg);
            assert!(reconnect, "should need reconnect: {}", msg);
        });
    }

    #[test]
    fn test_classify_timeout_no_reconnect() {
        let (retryable, reconnect, _) = classify("statement timed out");
        assert!(retryable);
        assert!(!reconnect, "timeout should not require reconnect");

        let (retryable, reconnect, _) = classify("lock wait timeout exceeded");
        assert!(retryable);
        assert!(!reconnect);
    }

    #[test]
    fn test_classify_capacity_errors_extra_delay() {
        let (retryable, reconnect, delay) = classify("too many connections");
        assert!(retryable);
        assert!(reconnect);
        assert!(
            delay >= 10_000,
            "capacity errors should have extra delay, got: {}ms",
            delay
        );

        let (retryable, _, delay) = classify("the database system is starting up");
        assert!(retryable);
        assert!(delay >= 10_000);
    }

    #[test]
    fn test_classify_deadlock_retryable() {
        let (retryable, reconnect, delay) = classify("deadlock detected");
        assert!(retryable);
        assert!(!reconnect, "deadlock should not require reconnect");
        assert!(delay >= 1_000, "deadlock should have small extra delay");
    }

    #[test]
    fn test_classify_permanent_errors() {
        let cases = [
            "syntax error",
            "permission denied",
            "relation does not exist",
            "column not found",
        ];
        cases.iter().for_each(|msg| {
            let (transient, _, _) = classify(msg);
            assert!(!transient, "should NOT be transient: {}", msg);
        });
    }

    #[test]
    fn test_classify_credential_errors_not_transient() {
        let cases = [
            "loading credential to sign http request",
            "error sending request for url (http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token): dns error",
            "invalid_grant: Token has been expired or revoked",
            "Access Denied: no permission",
        ];
        cases.iter().for_each(|msg| {
            let (transient, _, _) = classify(msg);
            assert!(
                !transient,
                "credential error should NOT be transient: {}",
                msg
            );
        });
    }
}