use scylla_cql::frame::response::error::{DbError, WriteType};
use crate::errors::RequestAttemptError;
use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
#[derive(Debug)]
pub struct DefaultRetryPolicy;
impl DefaultRetryPolicy {
pub fn new() -> DefaultRetryPolicy {
DefaultRetryPolicy
}
}
impl Default for DefaultRetryPolicy {
fn default() -> DefaultRetryPolicy {
DefaultRetryPolicy::new()
}
}
impl RetryPolicy for DefaultRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(DefaultRetrySession::new())
}
}
pub struct DefaultRetrySession {
was_unavailable_retry: bool,
was_read_timeout_retry: bool,
was_write_timeout_retry: bool,
}
impl DefaultRetrySession {
pub fn new() -> DefaultRetrySession {
DefaultRetrySession {
was_unavailable_retry: false,
was_read_timeout_retry: false,
was_write_timeout_retry: false,
}
}
}
impl Default for DefaultRetrySession {
fn default() -> DefaultRetrySession {
DefaultRetrySession::new()
}
}
impl RetrySession for DefaultRetrySession {
fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
if request_info.consistency.is_serial() {
return RetryDecision::DontRetry;
};
#[deny(clippy::wildcard_enum_match_arm)]
match request_info.error {
RequestAttemptError::BrokenConnectionError(_) => {
if request_info.is_idempotent {
RetryDecision::RetryNextTarget(None)
} else {
RetryDecision::DontRetry
}
}
RequestAttemptError::DbError(db_error, _) => {
#[deny(clippy::wildcard_enum_match_arm)]
match db_error {
DbError::Overloaded | DbError::ServerError | DbError::TruncateError => {
if request_info.is_idempotent {
RetryDecision::RetryNextTarget(None)
} else {
RetryDecision::DontRetry
}
}
DbError::Unavailable { .. } => {
if !self.was_unavailable_retry {
self.was_unavailable_retry = true;
RetryDecision::RetryNextTarget(None)
} else {
RetryDecision::DontRetry
}
}
DbError::ReadTimeout {
received,
required,
data_present,
..
} => {
if !self.was_read_timeout_retry && received >= required && !*data_present {
self.was_read_timeout_retry = true;
RetryDecision::RetrySameTarget(None)
} else {
RetryDecision::DontRetry
}
}
DbError::WriteTimeout { write_type, .. } => {
if !self.was_write_timeout_retry
&& request_info.is_idempotent
&& *write_type == WriteType::BatchLog
{
self.was_write_timeout_retry = true;
RetryDecision::RetrySameTarget(None)
} else {
RetryDecision::DontRetry
}
}
DbError::IsBootstrapping => RetryDecision::RetryNextTarget(None),
DbError::SyntaxError
| DbError::Invalid
| DbError::AlreadyExists { .. }
| DbError::FunctionFailure { .. }
| DbError::AuthenticationError
| DbError::Unauthorized
| DbError::ConfigError
| DbError::ReadFailure { .. }
| DbError::WriteFailure { .. }
| DbError::Unprepared { .. }
| DbError::ProtocolError
| DbError::RateLimitReached { .. }
| DbError::Other(_)
| _ => RetryDecision::DontRetry,
}
}
RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextTarget(None),
RequestAttemptError::BodyExtensionsParseError(_)
| RequestAttemptError::CqlErrorParseError(_)
| RequestAttemptError::CqlRequestSerialization(_)
| RequestAttemptError::CqlResultParseError(_)
| RequestAttemptError::NonfinishedPagingState
| RequestAttemptError::RepreparedIdChanged { .. }
| RequestAttemptError::RepreparedIdMissingInBatch
| RequestAttemptError::SerializationError(_)
| RequestAttemptError::UnexpectedResponse(_) => RetryDecision::DontRetry,
}
}
fn reset(&mut self) {
*self = DefaultRetrySession::new();
}
}
#[cfg(test)]
mod tests {
use super::{DefaultRetryPolicy, RequestInfo, RetryDecision, RetryPolicy};
use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
use crate::errors::{DbError, WriteType};
use crate::statement::Consistency;
use crate::test_utils::setup_tracing;
use bytes::Bytes;
use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};
fn make_request_info(error: &RequestAttemptError, is_idempotent: bool) -> RequestInfo<'_> {
RequestInfo {
error,
is_idempotent,
consistency: Consistency::One,
}
}
fn default_policy_assert_never_retries(error: RequestAttemptError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, true)),
RetryDecision::DontRetry
);
}
#[test]
fn default_never_retries() {
setup_tracing();
let never_retried_dberrors = vec![
DbError::SyntaxError,
DbError::Invalid,
DbError::AlreadyExists {
keyspace: String::new(),
table: String::new(),
},
DbError::FunctionFailure {
keyspace: String::new(),
function: String::new(),
arg_types: vec![],
},
DbError::AuthenticationError,
DbError::Unauthorized,
DbError::ConfigError,
DbError::ReadFailure {
consistency: Consistency::Two,
received: 2,
required: 1,
numfailures: 1,
data_present: false,
},
DbError::WriteFailure {
consistency: Consistency::Two,
received: 1,
required: 2,
numfailures: 1,
write_type: WriteType::BatchLog,
},
DbError::Unprepared {
statement_id: Bytes::from_static(b"deadbeef"),
},
DbError::ProtocolError,
DbError::Other(0x124816),
];
for dberror in never_retried_dberrors {
default_policy_assert_never_retries(RequestAttemptError::DbError(
dberror,
String::new(),
));
}
default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch);
default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged {
statement: String::new(),
expected_id: vec![],
reprepared_id: vec![],
});
default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization(
CqlRequestSerializationError::BatchSerialization(
BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
),
));
}
fn default_policy_assert_idempotent_next(error: RequestAttemptError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, true)),
RetryDecision::RetryNextTarget(None)
);
}
#[test]
fn default_idempotent_next_retries() {
setup_tracing();
let idempotent_next_errors = vec![
RequestAttemptError::DbError(DbError::Overloaded, String::new()),
RequestAttemptError::DbError(DbError::TruncateError, String::new()),
RequestAttemptError::DbError(DbError::ServerError, String::new()),
RequestAttemptError::BrokenConnectionError(
BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
),
];
for error in idempotent_next_errors {
default_policy_assert_idempotent_next(error);
}
}
#[test]
fn default_bootstrapping() {
setup_tracing();
let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, false)),
RetryDecision::RetryNextTarget(None)
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&error, true)),
RetryDecision::RetryNextTarget(None)
);
}
#[test]
fn default_unavailable() {
setup_tracing();
let error = RequestAttemptError::DbError(
DbError::Unavailable {
consistency: Consistency::Two,
required: 2,
alive: 1,
},
String::new(),
);
let mut policy_not_idempotent = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
RetryDecision::RetryNextTarget(None)
);
assert_eq!(
policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
RetryDecision::DontRetry
);
let mut policy_idempotent = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy_idempotent.decide_should_retry(make_request_info(&error, true)),
RetryDecision::RetryNextTarget(None)
);
assert_eq!(
policy_idempotent.decide_should_retry(make_request_info(&error, true)),
RetryDecision::DontRetry
);
}
#[test]
fn default_read_timeout() {
setup_tracing();
let enough_responses_no_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
required: 2,
data_present: false,
},
String::new(),
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
RetryDecision::RetrySameTarget(None)
);
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
RetryDecision::RetrySameTarget(None)
);
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
RetryDecision::DontRetry
);
let enough_responses_with_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
required: 2,
data_present: true,
},
String::new(),
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_with_data, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&enough_responses_with_data, true)),
RetryDecision::DontRetry
);
let not_enough_responses_with_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 1,
required: 2,
data_present: true,
},
String::new(),
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, true)),
RetryDecision::DontRetry
);
}
#[test]
fn default_write_timeout() {
setup_tracing();
let good_write_type = RequestAttemptError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received: 1,
required: 2,
write_type: WriteType::BatchLog,
},
String::new(),
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&good_write_type, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&good_write_type, true)),
RetryDecision::RetrySameTarget(None)
);
assert_eq!(
policy.decide_should_retry(make_request_info(&good_write_type, true)),
RetryDecision::DontRetry
);
let bad_write_type = RequestAttemptError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received: 4,
required: 2,
write_type: WriteType::Simple,
},
String::new(),
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&bad_write_type, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_request_info(&bad_write_type, true)),
RetryDecision::DontRetry
);
}
}