#[cfg(feature = "scylla")]
use scylla_driver::transport::errors::QueryError;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError as MpscSendError;
use tokio::sync::oneshot::error::RecvError as OneShotRecvError;
#[derive(Debug, Clone, Error)]
pub enum ShardError {
#[error("shard is at capacity, and is unable to evict any records")]
ShardAtCapacity,
#[error("data returned from upstream is immediately expired.")]
DataImmediatelyExpired,
#[error("upstream error: {error:?}")]
UpstreamError { error: UpstreamError },
#[error("the shard is no longer running")]
ShardGone,
}
impl From<OneShotRecvError> for ShardError {
fn from(_error: OneShotRecvError) -> Self {
ShardError::ShardGone
}
}
impl<T> From<MpscSendError<T>> for ShardError {
fn from(_error: MpscSendError<T>) -> Self {
ShardError::ShardGone
}
}
#[derive(Debug, Error, Clone)]
pub enum UpstreamError {
#[error("operation aborted")]
OperationAborted,
#[error("key not found")]
KeyNotFound,
#[error("driver error: {error:?}")]
DriverError { error: Arc<anyhow::Error> },
#[error("serialization error: {error:?}")]
SerializationError { error: Arc<anyhow::Error> },
}
impl PartialEq for UpstreamError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(UpstreamError::OperationAborted, UpstreamError::OperationAborted)
| (UpstreamError::KeyNotFound, UpstreamError::KeyNotFound) => true,
(
UpstreamError::DriverError { error: self_error },
UpstreamError::DriverError { error: other_error },
)
| (
UpstreamError::SerializationError { error: self_error },
UpstreamError::SerializationError { error: other_error },
) => Arc::ptr_eq(self_error, other_error),
_ => false,
}
}
}
impl UpstreamError {
pub fn is_not_found(&self) -> bool {
matches!(self, UpstreamError::KeyNotFound)
}
pub fn serialization_error<E>(error: E) -> Self
where
E: std::error::Error + Send + 'static,
{
UpstreamError::SerializationError {
error: Arc::new(SyncError::new(error).into()),
}
}
}
impl From<anyhow::Error> for UpstreamError {
fn from(error: anyhow::Error) -> Self {
UpstreamError::DriverError {
error: Arc::new(error),
}
}
}
/// A `QueryError` is reported as [`UpstreamError::DriverError`].
///
/// Only compiled when the `scylla` feature is enabled.
#[cfg(feature = "scylla")]
impl From<QueryError> for UpstreamError {
    fn from(source: QueryError) -> Self {
        let shared = Arc::new(anyhow::Error::from(source));
        Self::DriverError { error: shared }
    }
}
impl From<UpstreamError> for ShardError {
fn from(error: UpstreamError) -> Self {
ShardError::UpstreamError { error }
}
}
/// Wrapper that keeps an error behind a [`Mutex`].
///
/// `Mutex<E>` is `Sync` whenever `E: Send`, so the wrapper can be used where
/// a `Send + Sync` error is required even if `E` itself is only `Send`.
///
/// `Display` and `Debug` are forwarded to the wrapped error. The wrapped
/// error's `source()` is not forwarded: the `Error` impl below relies on the
/// trait's default methods.
pub struct SyncError<E> {
    inner: Mutex<E>,
}

impl<E> SyncError<E> {
    /// Locks the wrapped error, recovering the guard if the mutex is poisoned.
    ///
    /// Within this type the lock is only taken for read-only formatting, so a
    /// poisoned mutex merely records that an earlier formatting call panicked;
    /// the wrapped value is still intact. Recovering here keeps `Display` and
    /// `Debug` from panicking on every later use of the error.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, E> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl<E: std::error::Error + Send + 'static> SyncError<E> {
    /// Wraps `err` in a new `SyncError`.
    pub fn new(err: E) -> Self {
        SyncError {
            inner: Mutex::new(err),
        }
    }
}

/// Forwards to the wrapped error's `Display` implementation.
impl<T> std::fmt::Display for SyncError<T>
where
    T: std::fmt::Display,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Deref the guard explicitly so the wrapped value's impl is called.
        std::fmt::Display::fmt(&*self.lock_inner(), f)
    }
}

/// Forwards to the wrapped error's `Debug` implementation.
impl<T> std::fmt::Debug for SyncError<T>
where
    T: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Deref the guard explicitly so the wrapped value's impl is called.
        std::fmt::Debug::fmt(&*self.lock_inner(), f)
    }
}

impl<E: std::error::Error + Send + 'static> std::error::Error for SyncError<E> {}