use google_cloud_gax::error::Error;
use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicy};
use google_cloud_gax::retry_result::RetryResult;
use google_cloud_gax::retry_state::RetryState;
use google_cloud_gax::throttle_result::ThrottleResult;
use std::time::Duration;
/// Retry policy that wraps [`Aip194Strict`] and adjusts its verdicts.
///
/// The wrapped policy makes the initial decision; the [`RetryPolicy`]
/// implementation for this type then upgrades some permanent failures of
/// idempotent requests to retryable ones.
#[derive(Debug, Clone)]
pub(crate) struct SpannerRetryPolicy {
    /// Policy consulted first for every decision.
    inner: Aip194Strict,
}
impl SpannerRetryPolicy {
    /// Creates a policy backed by a fresh [`Aip194Strict`] instance.
    pub(crate) fn new() -> Self {
        let inner = Aip194Strict;
        Self { inner }
    }
}
impl Default for SpannerRetryPolicy {
fn default() -> Self {
Self::new()
}
}
impl RetryPolicy for SpannerRetryPolicy {
    /// Decides whether a failed attempt should be retried.
    ///
    /// The wrapped policy is asked first. Its verdict is returned unchanged
    /// unless the request is idempotent and the verdict is a permanent
    /// failure caused by a transport or I/O error; in that case the error is
    /// reported as retryable instead.
    fn on_error(&self, state: &RetryState, error: Error) -> RetryResult {
        let verdict = self.inner.on_error(state, error);

        // Non-idempotent requests never get the extra leniency below.
        if !state.idempotent {
            return verdict;
        }

        match verdict {
            RetryResult::Permanent(cause) if cause.is_transport() || cause.is_io() => {
                RetryResult::Continue(cause)
            }
            other => other,
        }
    }

    /// Forwards throttling decisions to the wrapped policy unchanged.
    fn on_throttle(&self, state: &RetryState, error: Error) -> ThrottleResult {
        let inner = &self.inner;
        inner.on_throttle(state, error)
    }

    /// Forwards the remaining-time query to the wrapped policy unchanged.
    fn remaining_time(&self, state: &RetryState) -> Option<Duration> {
        let inner = &self.inner;
        inner.remaining_time(state)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_gax::error::Error as GaxError;
    use google_cloud_gax::error::rpc::{Code, Status};
    use http::HeaderMap;

    /// Builds a service error carrying the given status code and message.
    fn service_error(code: Code, message: &str) -> GaxError {
        let status = Status::default().set_code(code).set_message(message);
        GaxError::service(status)
    }

    /// Builds a transport error whose source is a connection-reset I/O error.
    fn connection_reset_error() -> GaxError {
        GaxError::transport(
            HeaderMap::new(),
            std::io::Error::new(std::io::ErrorKind::ConnectionReset, "connection closed"),
        )
    }

    /// Idempotent requests: UNAVAILABLE and transport failures are retried,
    /// PERMISSION_DENIED is not.
    #[test]
    fn test_spanner_retry_policy_idempotent() {
        let policy = SpannerRetryPolicy::new();
        let state = RetryState::new(true);

        let unavailable = service_error(Code::Unavailable, "Service Unavailable");
        let outcome = policy.on_error(&state, unavailable);
        assert!(
            outcome.is_continue(),
            "Expected UNAVAILABLE to be retried when idempotent"
        );

        let denied = service_error(Code::PermissionDenied, "Denied");
        let outcome = policy.on_error(&state, denied);
        assert!(
            outcome.is_permanent(),
            "Expected PERMISSION_DENIED to not be retried"
        );

        let outcome = policy.on_error(&state, connection_reset_error());
        assert!(
            outcome.is_continue(),
            "Expected transport connection reset to be retried when idempotent"
        );
    }

    /// Non-idempotent requests: neither UNAVAILABLE nor transport failures
    /// are retried.
    #[test]
    fn test_spanner_retry_policy_non_idempotent() {
        let policy = SpannerRetryPolicy::new();
        let state = RetryState::new(false);

        let unavailable = service_error(Code::Unavailable, "Service Unavailable");
        let outcome = policy.on_error(&state, unavailable);
        assert!(
            outcome.is_permanent(),
            "Expected UNAVAILABLE to be permanent when non-idempotent"
        );

        let outcome = policy.on_error(&state, connection_reset_error());
        assert!(
            outcome.is_permanent(),
            "Expected transport connection reset to not be retried when non-idempotent"
        );
    }
}