use crate::Error;
use google_cloud_gax::retry_policy::RetryPolicy;
use google_cloud_gax::retry_result::RetryResult;
use google_cloud_gax::retry_state::RetryState;
/// Stateless retry policy marker type.
///
/// The type carries no data; all of its behavior comes from its
/// [`RetryPolicy`] implementation, which decides per error whether another
/// attempt should be made.
#[derive(Debug, Clone)]
pub struct RetryableErrors;
impl RetryPolicy for RetryableErrors {
fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
if error.is_transient_and_before_rpc() {
return RetryResult::Continue(error);
}
if error.is_io() || error.is_timeout() {
return RetryResult::Continue(error);
}
if error.is_transport() && error.http_status_code().is_none() {
return RetryResult::Continue(error);
}
if let Some(429 | 500 | 502 | 503 | 504) = error.http_status_code() {
return RetryResult::Continue(error);
}
if let Some(status) = error.status() {
use google_cloud_gax::error::rpc::Code;
return match status.code {
Code::Aborted
| Code::DeadlineExceeded
| Code::Internal
| Code::ResourceExhausted
| Code::Unavailable
| Code::Unknown => RetryResult::Continue(error),
_ => RetryResult::Permanent(error),
};
}
RetryResult::Permanent(error)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_gax::error::rpc::{Code, Status};
    use google_cloud_gax::retry_state::RetryState;
    use http::HeaderMap;
    use test_case::test_case;

    /// Feeds `error` to the policy with a default retry state and reports
    /// whether the policy asked to continue retrying.
    fn is_retried(error: Error) -> bool {
        RetryableErrors
            .on_error(&RetryState::default(), error)
            .is_continue()
    }

    /// Feeds `error` to the policy with a default retry state and reports
    /// whether the policy classified it as permanent.
    fn is_rejected(error: Error) -> bool {
        RetryableErrors
            .on_error(&RetryState::default(), error)
            .is_permanent()
    }

    // A transport error built without any HTTP details must be retried.
    #[test]
    fn transport_reset() {
        assert!(is_retried(transport_err()));
    }

    // HTTP status codes the policy retries.
    #[test_case(429)]
    #[test_case(500)]
    #[test_case(502)]
    #[test_case(503)]
    #[test_case(504)]
    fn retryable_http(code: u16) {
        assert!(is_retried(http_error(code)));
    }

    // HTTP status codes the policy treats as final, including some 4xx/5xx
    // values adjacent to the retryable ones.
    #[test_case(400)]
    #[test_case(404)]
    #[test_case(408)]
    #[test_case(409)]
    #[test_case(499)]
    #[test_case(501)]
    #[test_case(505)]
    fn permanent_http(code: u16) {
        assert!(is_rejected(http_error(code)));
    }

    // Service status codes the policy retries.
    #[test_case(Code::Unavailable)]
    #[test_case(Code::Internal)]
    #[test_case(Code::Aborted)]
    #[test_case(Code::ResourceExhausted)]
    #[test_case(Code::DeadlineExceeded)]
    #[test_case(Code::Unknown)]
    fn retryable_grpc(code: Code) {
        assert!(is_retried(grpc_error(code)));
    }

    // Service status codes the policy treats as final.
    #[test_case(Code::NotFound)]
    #[test_case(Code::PermissionDenied)]
    #[test_case(Code::InvalidArgument)]
    #[test_case(Code::Cancelled)]
    fn permanent_grpc(code: Code) {
        assert!(is_rejected(grpc_error(code)));
    }

    // Errors created through `Error::io` are retried.
    #[test]
    fn io() {
        assert!(is_retried(io_error()));
    }

    // An authentication error created with the flag set to `false` is final.
    #[test]
    fn permanent_auth() {
        assert!(is_rejected(auth_error(false, "permanent auth error")));
    }

    // An authentication error created with the flag set to `true` is retried.
    #[test]
    fn transient_auth() {
        assert!(is_retried(auth_error(true, "transient auth error")));
    }

    /// Builds an authentication error from a `CredentialsError`; `flag` is
    /// forwarded as the first argument of `CredentialsError::from_msg`.
    fn auth_error(flag: bool, message: &'static str) -> Error {
        let credentials = google_cloud_gax::error::CredentialsError::from_msg(flag, message);
        Error::authentication(credentials)
    }

    /// Transport error with empty headers.
    fn transport_err() -> Error {
        let headers = HeaderMap::new();
        Error::transport(headers, "connection closed")
    }

    /// HTTP error with the given status code, empty headers and empty body.
    fn http_error(code: u16) -> Error {
        let headers = HeaderMap::new();
        let payload = bytes::Bytes::new();
        Error::http(code, headers, payload)
    }

    /// Service error whose status carries `code`.
    fn grpc_error(code: Code) -> Error {
        Error::service(Status::default().set_code(code).set_message("try again"))
    }

    /// I/O error wrapping a tonic `Status` as its source.
    fn io_error() -> Error {
        let source = gaxi::grpc::tonic::Status::unavailable("try again");
        Error::io(source)
    }
}