use crate::{
ChannelState, ConnectionState, notifier::Notifier, protocol::AMQPError, types::ChannelId,
};
use amq_protocol::{
frame::{GenError, ParserError, ProtocolVersion},
protocol::AMQPErrorKind,
};
use async_rs::{Runtime, traits::*};
use std::{
error, fmt, io,
panic::{RefUnwindSafe, UnwindSafe},
sync::Arc,
};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Debug)]
pub struct Error {
kind: ErrorKind,
notifier: Option<Notifier>,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ErrorKind {
ChannelsLimitReached,
InvalidProtocolVersion(ProtocolVersion),
InvalidChannel(ChannelId),
InvalidChannelState(ChannelState, &'static str),
InvalidConnectionState(ConnectionState),
IOError(Arc<io::Error>),
RuntimeShutdownError(Arc<io::Error>),
ParsingError(ParserError),
ProtocolError(AMQPError),
SerialisationError(Arc<GenError>),
AuthProviderError(String),
FutureCompleted,
MissingHeartbeatError,
}
impl Error {
pub(crate) fn other<E: Into<Box<dyn error::Error + Send + Sync>>>(error: E) -> Self {
io::Error::other(error).into()
}
pub(crate) fn io<RK: RuntimeKit>(error: io::Error, rt: &Runtime<RK>) -> Self {
if rt.is_runtime_shutdown_error(&error) {
ErrorKind::RuntimeShutdownError(Arc::new(error)).into()
} else {
error.into()
}
}
pub fn kind(&self) -> &ErrorKind {
&self.kind
}
pub(crate) fn notifier(&self) -> Option<Notifier> {
self.notifier.clone()
}
pub(crate) fn with_notifier(mut self, notifier: Option<Notifier>) -> Self {
self.notifier = notifier;
self
}
pub fn wouldblock(&self) -> bool {
if let ErrorKind::IOError(e) = self.kind() {
e.kind() == io::ErrorKind::WouldBlock
} else {
false
}
}
pub fn interrupted(&self) -> bool {
if let ErrorKind::IOError(e) = self.kind() {
e.kind() == io::ErrorKind::Interrupted
} else {
false
}
}
pub fn is_io_error(&self) -> bool {
if let ErrorKind::IOError(_) = self.kind() {
return true;
}
self.is_runtime_shutdown_error()
}
pub fn is_runtime_shutdown_error(&self) -> bool {
if let ErrorKind::RuntimeShutdownError(_) = self.kind() {
return true;
}
false
}
pub fn is_amqp_error(&self) -> bool {
if let ErrorKind::ProtocolError(_) = self.kind() {
return true;
}
false
}
pub fn is_amqp_soft_error(&self) -> bool {
if let ErrorKind::ProtocolError(e) = self.kind()
&& let AMQPErrorKind::Soft(_) = e.kind()
{
return true;
}
false
}
pub fn is_amqp_hard_error(&self) -> bool {
if let ErrorKind::ProtocolError(e) = self.kind()
&& let AMQPErrorKind::Hard(_) = e.kind()
{
return true;
}
false
}
pub fn can_be_recovered(&self) -> bool {
match self.kind() {
ErrorKind::ChannelsLimitReached => false,
ErrorKind::InvalidProtocolVersion(_) => false,
ErrorKind::InvalidChannel(_) => true,
ErrorKind::InvalidChannelState(..) => true,
ErrorKind::InvalidConnectionState(_) => true,
ErrorKind::IOError(_) => true,
ErrorKind::RuntimeShutdownError(_) => false,
ErrorKind::ParsingError(_) => false,
ErrorKind::ProtocolError(_) => true,
ErrorKind::SerialisationError(_) => false,
ErrorKind::AuthProviderError(_) => false,
ErrorKind::FutureCompleted => false,
ErrorKind::MissingHeartbeatError => true,
}
}
}
impl UnwindSafe for Error {}
impl RefUnwindSafe for Error {}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.kind() {
ErrorKind::ChannelsLimitReached => write!(
f,
"the maximum number of channels for this connection has been reached"
),
ErrorKind::InvalidProtocolVersion(version) => {
write!(f, "the server only supports AMQP {version}")
}
ErrorKind::InvalidChannel(channel) => write!(f, "invalid channel: {channel}"),
ErrorKind::InvalidChannelState(state, context) => {
write!(f, "invalid channel state: {state:?} ({context})")
}
ErrorKind::InvalidConnectionState(state) => {
write!(f, "invalid connection state: {state:?}")
}
ErrorKind::IOError(e) => write!(f, "IO error: {e}"),
ErrorKind::RuntimeShutdownError(e) => write!(f, "runtime shutdown error: {e}"),
ErrorKind::ParsingError(e) => write!(f, "failed to parse: {e}"),
ErrorKind::ProtocolError(e) => write!(f, "protocol error: {e}"),
ErrorKind::SerialisationError(e) => write!(f, "failed to serialise: {e}"),
ErrorKind::AuthProviderError(e) => write!(f, "failure during authentication: {e}"),
ErrorKind::FutureCompleted => write!(f, "future polled after completion"),
ErrorKind::MissingHeartbeatError => {
write!(f, "no heartbeat received from server for too long")
}
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self.kind() {
ErrorKind::IOError(e) => Some(&**e),
ErrorKind::RuntimeShutdownError(e) => Some(&**e),
ErrorKind::ParsingError(e) => Some(e),
ErrorKind::ProtocolError(e) => Some(e),
ErrorKind::SerialisationError(e) => Some(&**e),
_ => None,
}
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self {
kind,
notifier: None,
}
}
}
impl From<io::Error> for Error {
fn from(other: io::Error) -> Self {
ErrorKind::IOError(Arc::new(other)).into()
}
}
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
use ErrorKind::*;
match (self.kind(), other.kind()) {
(ChannelsLimitReached, ChannelsLimitReached) => true,
(InvalidProtocolVersion(left_inner), InvalidProtocolVersion(right_version)) => {
left_inner == right_version
}
(InvalidChannel(left_inner), InvalidChannel(right_inner)) => left_inner == right_inner,
(
InvalidChannelState(left_inner, left_context),
InvalidChannelState(right_inner, right_context),
) => left_inner == right_inner && left_context == right_context,
(InvalidConnectionState(left_inner), InvalidConnectionState(right_inner)) => {
left_inner == right_inner
}
(IOError(_), IOError(_)) => false,
(RuntimeShutdownError(_), RuntimeShutdownError(_)) => false,
(ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
(ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
(SerialisationError(_), SerialisationError(_)) => false,
(FutureCompleted, FutureCompleted) => true,
_ => false,
}
}
}