use crate::Error;
use google_cloud_gax::error::rpc::Code;
pub use google_cloud_gax::retry_result::RetryResult as ResumeResult;
/// Decides what to do with an error reported while reading.
///
/// Implementations must be `Send + Sync` and implement `Debug`. The policies
/// in this file answer with `ResumeResult::Continue` to keep going and
/// `ResumeResult::Permanent` to stop; [`LimitedAttemptCount`] may additionally
/// answer with `ResumeResult::Exhausted`.
pub trait ReadResumePolicy: Send + Sync + std::fmt::Debug {
/// Classifies `error` given the current `status` of the read.
///
/// The error is taken by value and handed back inside the returned
/// [`ResumeResult`].
fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult;
}
/// Extension methods available on every [`ReadResumePolicy`].
pub trait ReadResumePolicyExt: Sized {
    /// Wraps this policy in a [`LimitedAttemptCount`] configured with
    /// `maximum_attempts`.
    fn with_attempt_limit(self, maximum_attempts: u32) -> LimitedAttemptCount<Self> {
        let inner = self;
        LimitedAttemptCount::new(inner, maximum_attempts)
    }
}

// Blanket implementation: any type implementing `ReadResumePolicy` gets the
// extension methods for free.
impl<T> ReadResumePolicyExt for T where T: ReadResumePolicy {}
/// The state handed to a resume policy when it is asked about an error.
#[derive(Debug)]
#[non_exhaustive]
pub struct ResumeQuery {
    /// Attempt counter; `LimitedAttemptCount` compares it against its
    /// configured maximum.
    pub attempt_count: u32,
}

impl ResumeQuery {
    /// Creates a query carrying the given `attempt_count`.
    pub fn new(attempt_count: u32) -> Self {
        ResumeQuery { attempt_count }
    }
}
#[derive(Debug)]
pub struct Recommended;
impl ReadResumePolicy for Recommended {
fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
match error {
e if self::is_transient(&e) => ResumeResult::Continue(e),
e => ResumeResult::Permanent(e),
}
}
}
fn is_transient(error: &Error) -> bool {
match error {
e if e.is_io() => true,
e if e.is_transport() => true,
e if e.is_timeout() => true,
e => e.status().is_some_and(|s| is_transient_code(s.code)),
}
}
fn is_transient_code(code: Code) -> bool {
matches!(
code,
Code::Unavailable | Code::ResourceExhausted | Code::Internal | Code::DeadlineExceeded
)
}
#[derive(Debug)]
pub struct AlwaysResume;
impl ReadResumePolicy for AlwaysResume {
fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
ResumeResult::Continue(error)
}
}
#[derive(Debug)]
pub struct NeverResume;
impl ReadResumePolicy for NeverResume {
fn on_error(&self, _status: &ResumeQuery, error: Error) -> ResumeResult {
ResumeResult::Permanent(error)
}
}
/// A decorator that pairs an inner policy with a maximum attempt count.
#[derive(Debug)]
pub struct LimitedAttemptCount<P> {
    /// The wrapped policy.
    inner: P,
    /// The attempt count at which the limit is considered reached.
    maximum_attempts: u32,
}

impl<P> LimitedAttemptCount<P> {
    /// Creates a decorator around `inner` limited to `maximum_attempts`.
    pub fn new(inner: P, maximum_attempts: u32) -> Self {
        LimitedAttemptCount {
            inner,
            maximum_attempts,
        }
    }
}
impl<P> ReadResumePolicy for LimitedAttemptCount<P>
where
    P: ReadResumePolicy,
{
    /// Delegates to the inner policy, then converts a `Continue` answer into
    /// `Exhausted` once `status.attempt_count` has reached the maximum.
    ///
    /// Any other answer from the inner policy is returned unchanged.
    fn on_error(&self, status: &ResumeQuery, error: Error) -> ResumeResult {
        let limit_reached = status.attempt_count >= self.maximum_attempts;
        match self.inner.on_error(status, error) {
            ResumeResult::Continue(e) if limit_reached => ResumeResult::Exhausted(e),
            other => other,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Recommended` continues on each transient error and stops on each
    /// permanent one.
    #[test]
    fn recommended() {
        let policy = Recommended;
        let query = ResumeQuery::new(0);

        let transient: [fn() -> Error; 7] = [
            common_transient,
            common_timeout,
            http_transient,
            grpc_deadline_exceeded,
            grpc_internal,
            grpc_resource_exhausted,
            grpc_unavailable,
        ];
        for make_error in transient {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }

        let permanent: [fn() -> Error; 2] = [http_permanent, grpc_permanent];
        for make_error in permanent {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    /// `AlwaysResume` continues regardless of the error.
    #[test]
    fn always_resume() {
        let policy = AlwaysResume;
        let query = ResumeQuery::new(0);
        let errors: [fn() -> Error; 2] = [http_transient, http_permanent];
        for make_error in errors {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }
    }

    /// `NeverResume` stops regardless of the error.
    #[test]
    fn never_resume() {
        let policy = NeverResume;
        let query = ResumeQuery::new(0);
        let errors: [fn() -> Error; 2] = [http_transient, http_permanent];
        for make_error in errors {
            let r = policy.on_error(&query, make_error());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    /// The attempt limit turns `Continue` into `Exhausted` at the limit and
    /// leaves `Permanent` answers alone.
    #[test]
    fn attempt_limit() {
        let policy = Recommended.with_attempt_limit(3);

        for attempt in 0..3 {
            let r = policy.on_error(&ResumeQuery::new(attempt), http_transient());
            assert!(matches!(r, ResumeResult::Continue(_)), "{r:?}");
        }

        let r = policy.on_error(&ResumeQuery::new(3), http_transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");

        for attempt in [0, 3] {
            let r = policy.on_error(&ResumeQuery::new(attempt), http_permanent());
            assert!(matches!(r, ResumeResult::Permanent(_)), "{r:?}");
        }
    }

    /// An `Exhausted` answer from an inner limit passes through an outer,
    /// larger limit unchanged.
    #[test]
    fn attempt_limit_inner_exhausted() {
        let policy = AlwaysResume.with_attempt_limit(3).with_attempt_limit(5);
        let query = ResumeQuery::new(3);
        let r = policy.on_error(&query, http_transient());
        assert!(matches!(r, ResumeResult::Exhausted(_)), "{r:?}");
    }

    // Error fixtures used by the tests above.

    fn http_transient() -> Error {
        Error::io("test only")
    }

    fn http_permanent() -> Error {
        Error::deser("bad data")
    }

    fn common_transient() -> Error {
        let headers = http::HeaderMap::new();
        Error::transport(headers, "test-only")
    }

    fn common_timeout() -> Error {
        Error::timeout("simulated timeout")
    }

    fn grpc_deadline_exceeded() -> Error {
        grpc_error(Code::DeadlineExceeded)
    }

    fn grpc_internal() -> Error {
        grpc_error(Code::Internal)
    }

    fn grpc_resource_exhausted() -> Error {
        grpc_error(Code::ResourceExhausted)
    }

    fn grpc_unavailable() -> Error {
        grpc_error(Code::Unavailable)
    }

    fn grpc_permanent() -> Error {
        grpc_error(Code::PermissionDenied)
    }

    /// Builds a service error whose status carries `code`.
    fn grpc_error(code: Code) -> Error {
        Error::service(google_cloud_gax::error::rpc::Status::default().set_code(code))
    }
}