use derive_more::Display;
use cassandra_protocol::error::Error;
use cassandra_protocol::frame::message_error::{
ErrorBody, ErrorType, ReadTimeoutError, WriteTimeoutError, WriteType,
};
/// Verdict produced by [`RetrySession::decide`] for a failed query.
///
/// How each variant is acted upon is up to the code that calls `decide`,
/// which is not part of this section of the file.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Clone, Display)]
pub enum RetryDecision {
/// Asks for the query to be retried on the same node.
RetrySameNode,
/// Asks for the query to be retried on the next node.
RetryNextNode,
/// Asks for the query not to be retried.
DontRetry,
}
/// Information about a failed query, passed to [`RetrySession::decide`].
pub struct QueryInfo<'a> {
/// The error the retry decision is being made for.
pub error: &'a Error,
/// Whether the query is flagged as idempotent. `DefaultRetrySession`
/// consults this flag before retrying several error kinds.
pub is_idempotent: bool,
}
/// A retry decision maker obtained from [`RetryPolicy::new_session`].
///
/// `decide` takes `&mut self`, so an implementation may keep state between
/// calls (as `DefaultRetrySession` does with its "already retried" flags).
pub trait RetrySession {
/// Returns the [`RetryDecision`] for the failure described by `query_info`.
fn decide(&mut self, query_info: QueryInfo) -> RetryDecision;
}
/// Factory of [`RetrySession`]s.
///
/// NOTE(review): presumably one session is created per query execution so
/// that per-session state is not shared between queries - confirm with the caller.
pub trait RetryPolicy {
/// Creates a fresh, boxed session that is `Send + Sync`.
fn new_session(&self) -> Box<dyn RetrySession + Send + Sync>;
}
/// Retry policy whose sessions ([`FallthroughRetrySession`]) never retry.
#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct FallthroughRetryPolicy;
impl RetryPolicy for FallthroughRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession + Send + Sync> {
Box::<FallthroughRetrySession>::default()
}
}
/// Stateless session whose `decide` always answers `RetryDecision::DontRetry`.
#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct FallthroughRetrySession;
impl RetrySession for FallthroughRetrySession {
    /// Always answers [`RetryDecision::DontRetry`]; the query information is
    /// not inspected at all.
    fn decide(&mut self, _: QueryInfo) -> RetryDecision {
        RetryDecision::DontRetry
    }
}
/// Retry policy whose sessions are [`DefaultRetrySession`]s.
#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct DefaultRetryPolicy;
impl RetryPolicy for DefaultRetryPolicy {
/// Creates a boxed, default-initialized [`DefaultRetrySession`], i.e. one
/// with all of its "already retried" flags set to `false`.
fn new_session(&self) -> Box<dyn RetrySession + Send + Sync> {
Box::<DefaultRetrySession>::default()
}
}
/// Session state for [`DefaultRetryPolicy`].
///
/// Each flag records that a retry of the corresponding kind has already been
/// granted by this session; `decide` grants each kind at most once.
#[derive(Default, Debug, Clone, Copy)]
pub struct DefaultRetrySession {
// Set once a retry has been granted for an `Unavailable` error.
was_unavailable_retry: bool,
// Set once a retry has been granted for a `ReadTimeout` error.
was_read_timeout_retry: bool,
// Set once a retry has been granted for a `WriteTimeout` error.
was_write_timeout_retry: bool,
}
impl RetrySession for DefaultRetrySession {
    /// Decides whether the failed query described by `query_info` should be
    /// retried, and on which node.
    ///
    /// Rules, in match order:
    ///
    /// * I/O errors, general errors and `Overloaded` / `Server` / `Truncate`
    ///   server errors: retry on the next node if the query is idempotent,
    ///   otherwise do not retry.
    /// * `Unavailable`: retry on the next node, at most once per session
    ///   (idempotency is not consulted).
    /// * `ReadTimeout`: retry on the same node, at most once per session, and
    ///   only when enough replicas answered (`received >= block_for`) but the
    ///   data itself was not retrieved.
    /// * `WriteTimeout`: retry on the same node, at most once per session,
    ///   and only for idempotent queries whose write type is `BatchLog`.
    /// * `IsBootstrapping`: always retry on the next node.
    /// * Anything else: do not retry.
    ///
    /// A "retried once" flag is only set when the retry is actually granted.
    fn decide(&mut self, query_info: QueryInfo) -> RetryDecision {
        match query_info.error {
            Error::Io(_)
            | Error::General(_)
            | Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::Overloaded,
                        ..
                    },
                ..
            }
            | Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::Server,
                        ..
                    },
                ..
            }
            | Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::Truncate,
                        ..
                    },
                ..
            } => {
                // The query may or may not have been applied, so only an
                // idempotent query can be safely sent again.
                if query_info.is_idempotent {
                    RetryDecision::RetryNextNode
                } else {
                    RetryDecision::DontRetry
                }
            }
            Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::Unavailable(_),
                        ..
                    },
                ..
            } => {
                if !self.was_unavailable_retry {
                    self.was_unavailable_retry = true;
                    RetryDecision::RetryNextNode
                } else {
                    RetryDecision::DontRetry
                }
            }
            Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::ReadTimeout(error @ ReadTimeoutError { .. }),
                        ..
                    },
                ..
            } => {
                // Retry only when enough replicas acknowledged the read but
                // the data was NOT retrieved: in that case the data replica
                // was merely slow and a second attempt is likely to succeed.
                // If the data was already present, retrying is pointless.
                if !self.was_read_timeout_retry
                    && error.received >= error.block_for
                    && !error.replica_has_responded()
                {
                    self.was_read_timeout_retry = true;
                    RetryDecision::RetrySameNode
                } else {
                    RetryDecision::DontRetry
                }
            }
            Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::WriteTimeout(error @ WriteTimeoutError { .. }),
                        ..
                    },
                ..
            } => {
                if !self.was_write_timeout_retry
                    && query_info.is_idempotent
                    && error.write_type == WriteType::BatchLog
                {
                    self.was_write_timeout_retry = true;
                    RetryDecision::RetrySameNode
                } else {
                    RetryDecision::DontRetry
                }
            }
            Error::Server {
                body:
                    ErrorBody {
                        ty: ErrorType::IsBootstrapping,
                        ..
                    },
                ..
            } => RetryDecision::RetryNextNode,
            _ => RetryDecision::DontRetry,
        }
    }
}