1use crate::{
2 channel_status::ChannelState, connection_status::ConnectionState, notifier::Notifier,
3 protocol::AMQPError, types::ChannelId,
4};
5use amq_protocol::{
6 frame::{GenError, ParserError, ProtocolVersion},
7 protocol::AMQPErrorKind,
8};
9use std::{error, fmt, io, sync::Arc};
10
11pub type Result<T> = std::result::Result<T, Error>;
13
14#[derive(Clone, Debug)]
16pub struct Error {
17 kind: ErrorKind,
18 notifier: Option<Notifier>,
19}
20
21#[derive(Clone, Debug)]
26#[non_exhaustive]
27pub enum ErrorKind {
28 ChannelsLimitReached,
29 InvalidProtocolVersion(ProtocolVersion),
30
31 InvalidChannel(ChannelId),
32 InvalidChannelState(ChannelState, &'static str),
33 InvalidConnectionState(ConnectionState),
34
35 IOError(Arc<io::Error>),
36 ParsingError(ParserError),
37 ProtocolError(AMQPError),
38 SerialisationError(Arc<GenError>),
39
40 MissingHeartbeatError,
41
42 NoConfiguredExecutor,
43 NoConfiguredReactor,
44}
45
46impl Error {
47 pub fn kind(&self) -> &ErrorKind {
48 &self.kind
49 }
50
51 pub fn notifier(&self) -> Option<Notifier> {
52 self.notifier.clone()
53 }
54
55 pub(crate) fn with_notifier(mut self, notifier: Option<Notifier>) -> Self {
56 self.notifier = notifier;
57 self
58 }
59
60 pub fn wouldblock(&self) -> bool {
61 if let ErrorKind::IOError(e) = self.kind() {
62 e.kind() == io::ErrorKind::WouldBlock
63 } else {
64 false
65 }
66 }
67
68 pub fn interrupted(&self) -> bool {
69 if let ErrorKind::IOError(e) = self.kind() {
70 e.kind() == io::ErrorKind::Interrupted
71 } else {
72 false
73 }
74 }
75
76 pub fn is_amqp_error(&self) -> bool {
77 if let ErrorKind::ProtocolError(_) = self.kind() {
78 return true;
79 }
80 false
81 }
82
83 pub fn is_amqp_soft_error(&self) -> bool {
84 if let ErrorKind::ProtocolError(e) = self.kind() {
85 if let AMQPErrorKind::Soft(_) = e.kind() {
86 return true;
87 }
88 }
89 false
90 }
91
92 pub fn is_amqp_hard_error(&self) -> bool {
93 if let ErrorKind::ProtocolError(e) = self.kind() {
94 if let AMQPErrorKind::Hard(_) = e.kind() {
95 return true;
96 }
97 }
98 false
99 }
100}
101
102impl fmt::Display for Error {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self.kind() {
105 ErrorKind::ChannelsLimitReached => write!(
106 f,
107 "the maximum number of channels for this connection has been reached"
108 ),
109 ErrorKind::InvalidProtocolVersion(version) => {
110 write!(f, "the server only supports AMQP {}", version)
111 }
112
113 ErrorKind::InvalidChannel(channel) => write!(f, "invalid channel: {}", channel),
114 ErrorKind::InvalidChannelState(state, context) => {
115 write!(f, "invalid channel state: {:?} ({})", state, context)
116 }
117 ErrorKind::InvalidConnectionState(state) => {
118 write!(f, "invalid connection state: {:?}", state)
119 }
120
121 ErrorKind::IOError(e) => write!(f, "IO error: {}", e),
122 ErrorKind::ParsingError(e) => write!(f, "failed to parse: {}", e),
123 ErrorKind::ProtocolError(e) => write!(f, "protocol error: {}", e),
124 ErrorKind::SerialisationError(e) => write!(f, "failed to serialise: {}", e),
125
126 ErrorKind::MissingHeartbeatError => {
127 write!(f, "no heartbeat received from server for too long")
128 }
129
130 ErrorKind::NoConfiguredExecutor => {
131 write!(
132 f,
133 "an executor must be provided if the default-runtime feature is disabled"
134 )
135 }
136 ErrorKind::NoConfiguredReactor => {
137 write!(
138 f,
139 "a reactor must be provided if the default-runtime feature is disabled"
140 )
141 }
142 }
143 }
144}
145
146impl error::Error for Error {
147 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
148 match self.kind() {
149 ErrorKind::IOError(e) => Some(&**e),
150 ErrorKind::ParsingError(e) => Some(e),
151 ErrorKind::ProtocolError(e) => Some(e),
152 ErrorKind::SerialisationError(e) => Some(&**e),
153 _ => None,
154 }
155 }
156}
157
158impl From<ErrorKind> for Error {
159 fn from(kind: ErrorKind) -> Self {
160 Self {
161 kind,
162 notifier: None,
163 }
164 }
165}
166
167impl From<io::Error> for Error {
168 fn from(other: io::Error) -> Self {
169 ErrorKind::IOError(Arc::new(other)).into()
170 }
171}
172
173impl PartialEq for Error {
174 fn eq(&self, other: &Self) -> bool {
175 use ErrorKind::*;
176 use tracing::error;
177
178 match (self.kind(), other.kind()) {
179 (ChannelsLimitReached, ChannelsLimitReached) => true,
180 (InvalidProtocolVersion(left_inner), InvalidProtocolVersion(right_version)) => {
181 left_inner == right_version
182 }
183
184 (InvalidChannel(left_inner), InvalidChannel(right_inner)) => left_inner == right_inner,
185 (
186 InvalidChannelState(left_inner, left_context),
187 InvalidChannelState(right_inner, right_context),
188 ) => left_inner == right_inner && left_context == right_context,
189 (InvalidConnectionState(left_inner), InvalidConnectionState(right_inner)) => {
190 left_inner == right_inner
191 }
192
193 (IOError(_), IOError(_)) => {
194 error!("Unable to compare lapin::ErrorKind::IOError");
195 false
196 }
197 (ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
198 (ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
199 (SerialisationError(_), SerialisationError(_)) => {
200 error!("Unable to compare lapin::ErrorKind::SerialisationError");
201 false
202 }
203
204 _ => false,
205 }
206 }
207}