1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{
channel_status::ChannelState, connection_status::ConnectionState, protocol::AMQPError,
};
use amq_protocol::frame::{GenError, ParserError, ProtocolVersion};
use std::{error, fmt, io, sync::Arc};
/// Shorthand for `std::result::Result` with the error type fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Error type carried by this module's `Result` alias.
///
/// The enum is `#[non_exhaustive]`, so code outside the defining crate must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Error {
/// The maximum number of channels for this connection has been reached.
ChannelsLimitReached,
/// Protocol version problem; the `Display` text reports the payload as the only
/// AMQP version the server supports.
InvalidProtocolVersion(ProtocolVersion),
/// The channel identified by the `u16` payload is invalid.
InvalidChannel(u16),
/// The channel state carried in the payload is invalid.
InvalidChannelState(ChannelState),
/// The connection state carried in the payload is invalid.
InvalidConnectionState(ConnectionState),
/// An I/O error. Held behind an `Arc` — presumably because `io::Error` is not
/// `Clone` while this enum derives `Clone`.
IOError(Arc<io::Error>),
/// Parsing failed; the payload is the parser's error.
ParsingError(ParserError),
/// A protocol-level error, carried as an `AMQPError`.
ProtocolError(AMQPError),
/// Serialisation failed. Held behind an `Arc` — presumably for the same `Clone`
/// reason as `IOError` (TODO confirm against `GenError`).
SerialisationError(Arc<GenError>),
}
impl Error {
pub fn wouldblock(&self) -> bool {
if let Error::IOError(e) = self {
e.kind() == io::ErrorKind::WouldBlock
} else {
false
}
}
pub fn interrupted(&self) -> bool {
if let Error::IOError(e) = self {
e.kind() == io::ErrorKind::Interrupted
} else {
false
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::ChannelsLimitReached => write!(
f,
"the maximum number of channels for this connection has been reached"
),
Error::InvalidProtocolVersion(version) => {
write!(f, "the server only supports AMQP {}", version)
}
Error::InvalidChannel(channel) => write!(f, "invalid channel: {}", channel),
Error::InvalidChannelState(state) => write!(f, "invalid channel state: {:?}", state),
Error::InvalidConnectionState(state) => {
write!(f, "invalid connection state: {:?}", state)
}
Error::IOError(e) => write!(f, "IO error: {}", e),
Error::ParsingError(e) => write!(f, "failed to parse: {}", e),
Error::ProtocolError(e) => write!(f, "protocol error: {}", e),
Error::SerialisationError(e) => write!(f, "failed to serialise: {}", e),
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
Error::IOError(e) => Some(&**e),
Error::ParsingError(e) => Some(&*e),
Error::ProtocolError(e) => Some(&*e),
Error::SerialisationError(e) => Some(&**e),
_ => None,
}
}
}
impl From<io::Error> for Error {
fn from(other: io::Error) -> Self {
Error::IOError(Arc::new(other))
}
}
/// Variant-wise equality for [`Error`].
///
/// Two errors are equal when they are the same variant and their payloads
/// compare equal. `IOError` and `SerialisationError` payloads are never
/// compared: such pairs log an error and are reported as unequal, so a value
/// of either variant is not equal even to itself.
impl PartialEq for Error {
    fn eq(&self, other: &Self) -> bool {
        use log::error;
        use Error::*;

        match (self, other) {
            // Payloads that are not compared: log and report inequality.
            (IOError(_), IOError(_)) => {
                error!("Unable to compare lapin::Error::IOError");
                false
            }
            (SerialisationError(_), SerialisationError(_)) => {
                error!("Unable to compare lapin::Error::SerialisationError");
                false
            }

            // Payload-free variant.
            (ChannelsLimitReached, ChannelsLimitReached) => true,

            // Same variant on both sides: defer to the payload's own equality.
            (InvalidProtocolVersion(lhs), InvalidProtocolVersion(rhs)) => lhs == rhs,
            (InvalidChannel(lhs), InvalidChannel(rhs)) => lhs == rhs,
            (InvalidChannelState(lhs), InvalidChannelState(rhs)) => lhs == rhs,
            (InvalidConnectionState(lhs), InvalidConnectionState(rhs)) => lhs == rhs,
            (ParsingError(lhs), ParsingError(rhs)) => lhs == rhs,
            (ProtocolError(lhs), ProtocolError(rhs)) => lhs == rhs,

            // Different variants never compare equal.
            _ => false,
        }
    }
}