1use crate::{
2 channel_status::ChannelState, connection_status::ConnectionState, protocol::AMQPError,
3 types::ChannelId,
4};
5use amq_protocol::frame::{GenError, ParserError, ProtocolVersion};
6use std::{error, fmt, io, sync::Arc};
7
8pub type Result<T> = std::result::Result<T, Error>;
10
11#[derive(Clone, Debug)]
16#[non_exhaustive]
17pub enum Error {
18 ChannelsLimitReached,
19 InvalidProtocolVersion(ProtocolVersion),
20
21 InvalidChannel(ChannelId),
22 InvalidChannelState(ChannelState),
23 InvalidConnectionState(ConnectionState),
24
25 IOError(Arc<io::Error>),
26 ParsingError(ParserError),
27 ProtocolError(AMQPError),
28 SerialisationError(Arc<GenError>),
29
30 MissingHeartbeatError,
31}
32
33impl Error {
34 pub fn wouldblock(&self) -> bool {
35 if let Error::IOError(e) = self {
36 e.kind() == io::ErrorKind::WouldBlock
37 } else {
38 false
39 }
40 }
41
42 pub fn interrupted(&self) -> bool {
43 if let Error::IOError(e) = self {
44 e.kind() == io::ErrorKind::Interrupted
45 } else {
46 false
47 }
48 }
49}
50
51impl fmt::Display for Error {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match self {
54 Error::ChannelsLimitReached => write!(
55 f,
56 "the maximum number of channels for this connection has been reached"
57 ),
58 Error::InvalidProtocolVersion(version) => {
59 write!(f, "the server only supports AMQP {}", version)
60 }
61
62 Error::InvalidChannel(channel) => write!(f, "invalid channel: {}", channel),
63 Error::InvalidChannelState(state) => write!(f, "invalid channel state: {:?}", state),
64 Error::InvalidConnectionState(state) => {
65 write!(f, "invalid connection state: {:?}", state)
66 }
67
68 Error::IOError(e) => write!(f, "IO error: {}", e),
69 Error::ParsingError(e) => write!(f, "failed to parse: {}", e),
70 Error::ProtocolError(e) => write!(f, "protocol error: {}", e),
71 Error::SerialisationError(e) => write!(f, "failed to serialise: {}", e),
72
73 Error::MissingHeartbeatError => {
74 write!(f, "no heartbeat received from server for too long")
75 }
76 }
77 }
78}
79
80impl error::Error for Error {
81 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
82 match self {
83 Error::IOError(e) => Some(&**e),
84 Error::ParsingError(e) => Some(e),
85 Error::ProtocolError(e) => Some(e),
86 Error::SerialisationError(e) => Some(&**e),
87 _ => None,
88 }
89 }
90}
91
92impl From<io::Error> for Error {
93 fn from(other: io::Error) -> Self {
94 Error::IOError(Arc::new(other))
95 }
96}
97
98impl PartialEq for Error {
99 fn eq(&self, other: &Self) -> bool {
100 use tracing::error;
101 use Error::*;
102
103 match (self, other) {
104 (ChannelsLimitReached, ChannelsLimitReached) => true,
105 (InvalidProtocolVersion(left_inner), InvalidProtocolVersion(right_version)) => {
106 left_inner == right_version
107 }
108
109 (InvalidChannel(left_inner), InvalidChannel(right_inner)) => left_inner == right_inner,
110 (InvalidChannelState(left_inner), InvalidChannelState(right_inner)) => {
111 left_inner == right_inner
112 }
113 (InvalidConnectionState(left_inner), InvalidConnectionState(right_inner)) => {
114 left_inner == right_inner
115 }
116
117 (IOError(_), IOError(_)) => {
118 error!("Unable to compare lapin::Error::IOError");
119 false
120 }
121 (ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
122 (ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
123 (SerialisationError(_), SerialisationError(_)) => {
124 error!("Unable to compare lapin::Error::SerialisationError");
125 false
126 }
127
128 _ => false,
129 }
130 }
131}