use google_cloud_gax::error::Error;
use google_cloud_gax::{
retry_policy::{RetryPolicy, RetryPolicyExt},
retry_result::RetryResult,
retry_state::RetryState,
};
use std::sync::Arc;
use std::time::Duration;
pub(crate) fn storage_default() -> impl RetryPolicy {
RetryableErrors.with_time_limit(Duration::from_secs(300))
}
#[derive(Clone, Debug)]
pub struct RetryableErrors;
impl RetryPolicy for RetryableErrors {
fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
if error.is_transient_and_before_rpc() {
return RetryResult::Continue(error);
}
if !state.idempotent {
return RetryResult::Permanent(error);
}
if error.is_io() || error.is_timeout() {
return RetryResult::Continue(error);
}
if error.is_transport() && error.http_status_code().is_none() {
return RetryResult::Continue(error);
}
if let Some(code) = error.http_status_code() {
return match code {
408 | 429 | 500..600 => RetryResult::Continue(error),
_ => RetryResult::Permanent(error),
};
}
if let Some(code) = error.status().map(|s| s.code) {
use google_cloud_gax::error::rpc::Code;
return match code {
Code::Internal | Code::ResourceExhausted | Code::Unavailable => {
RetryResult::Continue(error)
}
Code::DeadlineExceeded => RetryResult::Continue(error),
_ => RetryResult::Permanent(error),
};
}
RetryResult::Permanent(error)
}
}
#[derive(Clone, Debug)]
pub(crate) struct ContinueOn308<T> {
inner: T,
}
impl<T> ContinueOn308<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}
}
impl RetryPolicy for ContinueOn308<Arc<dyn RetryPolicy + 'static>> {
fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
if error.http_status_code() == Some(308) {
return RetryResult::Continue(error);
}
self.inner.on_error(state, error)
}
}
#[cfg(test)]
mod tests {
use super::*;
use gaxi::grpc::tonic::Status;
use google_cloud_gax::error::rpc::Code;
use google_cloud_gax::throttle_result::ThrottleResult;
use http::HeaderMap;
use test_case::test_case;
#[test_case(408)]
#[test_case(429)]
#[test_case(500)]
#[test_case(502)]
#[test_case(503)]
#[test_case(504)]
fn retryable_http(code: u16) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::new(true), http_error(code))
.is_continue()
);
assert!(
p.on_error(&RetryState::new(false), http_error(code))
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), http_error(code));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test_case(401)]
#[test_case(403)]
fn not_recommended_http(code: u16) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::new(true), http_error(code))
.is_permanent()
);
assert!(
p.on_error(&RetryState::new(false), http_error(code))
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), http_error(code));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test_case(Code::Unavailable)]
#[test_case(Code::Internal)]
#[test_case(Code::ResourceExhausted)]
#[test_case(Code::DeadlineExceeded)]
fn retryable_grpc(code: Code) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::new(true), grpc_error(code))
.is_continue()
);
assert!(
p.on_error(&RetryState::new(false), grpc_error(code))
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), grpc_error(code));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test_case(Code::Unauthenticated)]
#[test_case(Code::PermissionDenied)]
fn not_recommended_grpc(code: Code) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::new(true), grpc_error(code))
.is_permanent()
);
assert!(
p.on_error(&RetryState::new(false), grpc_error(code))
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), grpc_error(code));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test]
fn io() {
let p = RetryableErrors;
assert!(p.on_error(&RetryState::new(true), io_error()).is_continue());
assert!(
p.on_error(&RetryState::new(false), io_error())
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), io_error());
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test]
fn timeout() {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::new(true), timeout_error())
.is_continue()
);
assert!(
p.on_error(&RetryState::new(false), timeout_error())
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), timeout_error());
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
#[test]
fn continue_on_308() {
let inner: Arc<dyn RetryPolicy + 'static> = Arc::new(RetryableErrors);
let p = ContinueOn308::new(inner);
assert!(
p.on_error(&RetryState::new(true), http_error(308))
.is_continue()
);
assert!(
p.on_error(&RetryState::new(false), http_error(308))
.is_continue()
);
assert!(
p.on_error(&RetryState::new(true), http_error(429))
.is_continue()
);
assert!(
p.on_error(&RetryState::new(false), http_error(429))
.is_permanent()
);
let t = p.on_throttle(&RetryState::new(true), http_error(308));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
let t = p.on_throttle(&RetryState::new(true), http_error(429));
assert!(matches!(t, ThrottleResult::Continue(_)), "{t:?}");
}
fn http_error(code: u16) -> Error {
Error::http(code, HeaderMap::new(), bytes::Bytes::new())
}
fn grpc_error(code: Code) -> Error {
let status = google_cloud_gax::error::rpc::Status::default().set_code(code);
Error::service(status)
}
fn timeout_error() -> Error {
Error::timeout(Status::deadline_exceeded("try again"))
}
fn io_error() -> Error {
Error::io(Status::unavailable("try again"))
}
}