use crate::frame::frame_errors::{FrameError, ParseError};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::value::SerializeValuesError;
use crate::types::serialize::SerializationError;
use crate::Consistency;
use bytes::Bytes;
use std::io::ErrorKind;
use std::sync::Arc;
use thiserror::Error;
#[derive(Error, Debug, Clone)]
pub enum QueryError {
#[error("Database returned an error: {0}, Error message: {1}")]
DbError(DbError, String),
#[error(transparent)]
BadQuery(#[from] BadQuery),
#[error("IO Error: {0}")]
IoError(Arc<std::io::Error>),
#[error("Protocol Error: {0}")]
ProtocolError(&'static str),
#[error("Invalid message: {0}")]
InvalidMessage(String),
#[error("Timeout Error")]
TimeoutError,
#[error("Too many orphaned stream ids: {0}")]
TooManyOrphanedStreamIds(u16),
#[error("Unable to allocate stream id")]
UnableToAllocStreamId,
#[error("Request timeout: {0}")]
RequestTimeout(String),
#[error("Address translation failed: {0}")]
TranslationError(#[from] TranslationError),
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum DbError {
#[error("The submitted query has a syntax error")]
SyntaxError,
#[error("The query is syntactically correct but invalid")]
Invalid,
#[error(
"Attempted to create a keyspace or a table that was already existing \
(keyspace: {keyspace}, table: {table})"
)]
AlreadyExists {
keyspace: String,
table: String,
},
#[error(
"User defined function failed during execution \
(keyspace: {keyspace}, function: {function}, arg_types: {arg_types:?})"
)]
FunctionFailure {
keyspace: String,
function: String,
arg_types: Vec<String>,
},
#[error("Authentication failed - bad credentials")]
AuthenticationError,
#[error("The logged user doesn't have the right to perform the query")]
Unauthorized,
#[error("The query is invalid because of some configuration issue")]
ConfigError,
#[error(
"Not enough nodes are alive to satisfy required consistency level \
(consistency: {consistency}, required: {required}, alive: {alive})"
)]
Unavailable {
consistency: Consistency,
required: i32,
alive: i32,
},
#[error("The request cannot be processed because the coordinator node is overloaded")]
Overloaded,
#[error("The coordinator node is still bootstrapping")]
IsBootstrapping,
#[error("Error during truncate operation")]
TruncateError,
#[error("Not enough nodes responded to the read request in time to satisfy required consistency level \
(consistency: {consistency}, received: {received}, required: {required}, data_present: {data_present})")]
ReadTimeout {
consistency: Consistency,
received: i32,
required: i32,
data_present: bool,
},
#[error("Not enough nodes responded to the write request in time to satisfy required consistency level \
(consistency: {consistency}, received: {received}, required: {required}, write_type: {write_type})")]
WriteTimeout {
consistency: Consistency,
received: i32,
required: i32,
write_type: WriteType,
},
#[error(
"A non-timeout error during a read request \
(consistency: {consistency}, received: {received}, required: {required}, \
numfailures: {numfailures}, data_present: {data_present})"
)]
ReadFailure {
consistency: Consistency,
received: i32,
required: i32,
numfailures: i32,
data_present: bool,
},
#[error(
"A non-timeout error during a write request \
(consistency: {consistency}, received: {received}, required: {required}, \
numfailures: {numfailures}, write_type: {write_type}"
)]
WriteFailure {
consistency: Consistency,
received: i32,
required: i32,
numfailures: i32,
write_type: WriteType,
},
#[error(
"Tried to execute a prepared statement that is not prepared. Driver should prepare it again"
)]
Unprepared {
statement_id: Bytes,
},
#[error("Internal server error. This indicates a server-side bug")]
ServerError,
#[error("Invalid protocol message received from the driver")]
ProtocolError,
#[error("Rate limit was exceeded for a partition affected by the request")]
RateLimitReached {
op_type: OperationType,
rejected_by_coordinator: bool,
},
#[error("Other error not specified in the specification. Error code: {0}")]
Other(i32),
}
impl DbError {
pub fn code(&self, protocol_features: &ProtocolFeatures) -> i32 {
match self {
DbError::ServerError => 0x0000,
DbError::ProtocolError => 0x000A,
DbError::AuthenticationError => 0x0100,
DbError::Unavailable {
consistency: _,
required: _,
alive: _,
} => 0x1000,
DbError::Overloaded => 0x1001,
DbError::IsBootstrapping => 0x1002,
DbError::TruncateError => 0x1003,
DbError::WriteTimeout {
consistency: _,
received: _,
required: _,
write_type: _,
} => 0x1100,
DbError::ReadTimeout {
consistency: _,
received: _,
required: _,
data_present: _,
} => 0x1200,
DbError::ReadFailure {
consistency: _,
received: _,
required: _,
numfailures: _,
data_present: _,
} => 0x1300,
DbError::FunctionFailure {
keyspace: _,
function: _,
arg_types: _,
} => 0x1400,
DbError::WriteFailure {
consistency: _,
received: _,
required: _,
numfailures: _,
write_type: _,
} => 0x1500,
DbError::SyntaxError => 0x2000,
DbError::Unauthorized => 0x2100,
DbError::Invalid => 0x2200,
DbError::ConfigError => 0x2300,
DbError::AlreadyExists {
keyspace: _,
table: _,
} => 0x2400,
DbError::Unprepared { statement_id: _ } => 0x2500,
DbError::Other(code) => *code,
DbError::RateLimitReached {
op_type: _,
rejected_by_coordinator: _,
} => protocol_features.rate_limit_error.unwrap(),
}
}
}
#[derive(Debug, Copy, Clone, Error)]
pub enum TranslationError {
#[error("No rule for address")]
NoRuleForAddress,
#[error("Invalid address in rule")]
InvalidAddressInRule,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OperationType {
Read,
Write,
Other(u8),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteType {
Simple,
Batch,
UnloggedBatch,
Counter,
BatchLog,
Cas,
View,
Cdc,
Other(String),
}
#[derive(Error, Debug, Clone)]
#[error("Invalid query passed to Session")]
pub enum BadQuery {
#[error("Serializing values failed: {0} ")]
SerializeValuesError(#[from] SerializeValuesError),
#[error("Serializing values failed: {0} ")]
SerializationError(#[from] SerializationError),
#[error("Serialized values are too long to compute partition key! Length: {0}, Max allowed length: {1}")]
ValuesTooLongForKey(usize, usize),
#[error("Passed invalid keyspace name to use: {0}")]
BadKeyspaceName(#[from] BadKeyspaceName),
#[error("Number of Queries in Batch Statement supplied is {0} which has exceeded the max value of 65,535")]
TooManyQueriesInBatchStatement(usize),
#[error("{0}")]
Other(String),
}
#[derive(Error, Debug, Clone)]
pub enum NewSessionError {
#[error("Couldn't resolve any hostname: {0:?}")]
FailedToResolveAnyHostname(Vec<String>),
#[error("Empty known nodes list")]
EmptyKnownNodesList,
#[error("Database returned an error: {0}, Error message: {1}")]
DbError(DbError, String),
#[error(transparent)]
BadQuery(#[from] BadQuery),
#[error("IO Error: {0}")]
IoError(Arc<std::io::Error>),
#[error("Protocol Error: {0}")]
ProtocolError(&'static str),
#[error("Invalid message: {0}")]
InvalidMessage(String),
#[error("Timeout Error")]
TimeoutError,
#[error("Too many orphaned stream ids: {0}")]
TooManyOrphanedStreamIds(u16),
#[error("Unable to allocate stream id")]
UnableToAllocStreamId,
#[error("Client timeout: {0}")]
RequestTimeout(String),
#[error("Address translation failed: {0}")]
TranslationError(#[from] TranslationError),
}
#[derive(Debug, Error, Clone)]
pub enum BadKeyspaceName {
#[error("Keyspace name is empty")]
Empty,
#[error("Keyspace name too long, must be up to 48 characters, found {1} characters. Bad keyspace name: '{0}'")]
TooLong(String, usize),
#[error("Illegal character found: '{1}', only alphanumeric and underscores allowed. Bad keyspace name: '{0}'")]
IllegalCharacter(String, char),
}
impl std::fmt::Display for WriteType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<std::io::Error> for QueryError {
fn from(io_error: std::io::Error) -> QueryError {
QueryError::IoError(Arc::new(io_error))
}
}
impl From<SerializeValuesError> for QueryError {
fn from(serialized_err: SerializeValuesError) -> QueryError {
QueryError::BadQuery(BadQuery::SerializeValuesError(serialized_err))
}
}
impl From<SerializationError> for QueryError {
fn from(serialized_err: SerializationError) -> QueryError {
QueryError::BadQuery(BadQuery::SerializationError(serialized_err))
}
}
impl From<ParseError> for QueryError {
fn from(parse_error: ParseError) -> QueryError {
QueryError::InvalidMessage(format!("Error parsing message: {}", parse_error))
}
}
impl From<FrameError> for QueryError {
fn from(frame_error: FrameError) -> QueryError {
QueryError::InvalidMessage(format!("Frame error: {}", frame_error))
}
}
impl From<tokio::time::error::Elapsed> for QueryError {
fn from(timer_error: tokio::time::error::Elapsed) -> QueryError {
QueryError::RequestTimeout(format!("{}", timer_error))
}
}
impl From<std::io::Error> for NewSessionError {
fn from(io_error: std::io::Error) -> NewSessionError {
NewSessionError::IoError(Arc::new(io_error))
}
}
impl From<QueryError> for NewSessionError {
fn from(query_error: QueryError) -> NewSessionError {
match query_error {
QueryError::DbError(e, msg) => NewSessionError::DbError(e, msg),
QueryError::BadQuery(e) => NewSessionError::BadQuery(e),
QueryError::IoError(e) => NewSessionError::IoError(e),
QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m),
QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m),
QueryError::TimeoutError => NewSessionError::TimeoutError,
QueryError::TooManyOrphanedStreamIds(ids) => {
NewSessionError::TooManyOrphanedStreamIds(ids)
}
QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId,
QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg),
QueryError::TranslationError(e) => NewSessionError::TranslationError(e),
}
}
}
impl From<BadKeyspaceName> for QueryError {
fn from(keyspace_err: BadKeyspaceName) -> QueryError {
QueryError::BadQuery(BadQuery::BadKeyspaceName(keyspace_err))
}
}
impl QueryError {
pub fn is_address_unavailable_for_use(&self) -> bool {
if let QueryError::IoError(io_error) = self {
match io_error.kind() {
ErrorKind::AddrInUse | ErrorKind::PermissionDenied => return true,
_ => {}
}
}
false
}
}
impl From<u8> for OperationType {
fn from(operation_type: u8) -> OperationType {
match operation_type {
0 => OperationType::Read,
1 => OperationType::Write,
other => OperationType::Other(other),
}
}
}
impl From<&str> for WriteType {
fn from(write_type_str: &str) -> WriteType {
match write_type_str {
"SIMPLE" => WriteType::Simple,
"BATCH" => WriteType::Batch,
"UNLOGGED_BATCH" => WriteType::UnloggedBatch,
"COUNTER" => WriteType::Counter,
"BATCH_LOG" => WriteType::BatchLog,
"CAS" => WriteType::Cas,
"VIEW" => WriteType::View,
"CDC" => WriteType::Cdc,
_ => WriteType::Other(write_type_str.to_string()),
}
}
}
impl WriteType {
pub fn as_str(&self) -> &str {
match self {
WriteType::Simple => "SIMPLE",
WriteType::Batch => "BATCH",
WriteType::UnloggedBatch => "UNLOGGED_BATCH",
WriteType::Counter => "COUNTER",
WriteType::BatchLog => "BATCH_LOG",
WriteType::Cas => "CAS",
WriteType::View => "VIEW",
WriteType::Cdc => "CDC",
WriteType::Other(write_type) => write_type.as_str(),
}
}
}
#[cfg(test)]
mod tests {
use super::{DbError, QueryError, WriteType};
use crate::frame::types::Consistency;
#[test]
fn write_type_from_str() {
let test_cases: [(&str, WriteType); 9] = [
("SIMPLE", WriteType::Simple),
("BATCH", WriteType::Batch),
("UNLOGGED_BATCH", WriteType::UnloggedBatch),
("COUNTER", WriteType::Counter),
("BATCH_LOG", WriteType::BatchLog),
("CAS", WriteType::Cas),
("VIEW", WriteType::View),
("CDC", WriteType::Cdc),
("SOMEOTHER", WriteType::Other("SOMEOTHER".to_string())),
];
for (write_type_str, expected_write_type) in &test_cases {
let write_type = WriteType::from(*write_type_str);
assert_eq!(write_type, *expected_write_type);
}
}
#[test]
fn dberror_full_info() {
let db_error = DbError::Unavailable {
consistency: Consistency::Three,
required: 3,
alive: 2,
};
let db_error_displayed: String = format!("{}", db_error);
let mut expected_dberr_msg =
"Not enough nodes are alive to satisfy required consistency level ".to_string();
expected_dberr_msg += "(consistency: Three, required: 3, alive: 2)";
assert_eq!(db_error_displayed, expected_dberr_msg);
let query_error =
QueryError::DbError(db_error, "a message about unavailable error".to_string());
let query_error_displayed: String = format!("{}", query_error);
let mut expected_querr_msg = "Database returned an error: ".to_string();
expected_querr_msg += &expected_dberr_msg;
expected_querr_msg += ", Error message: a message about unavailable error";
assert_eq!(query_error_displayed, expected_querr_msg);
}
}