lapin/
error.rs

1use crate::{
2    channel_status::ChannelState, connection_status::ConnectionState, notifier::Notifier,
3    protocol::AMQPError, types::ChannelId,
4};
5use amq_protocol::{
6    frame::{GenError, ParserError, ProtocolVersion},
7    protocol::AMQPErrorKind,
8};
9use std::{error, fmt, io, sync::Arc};
10
11/// A std Result with a lapin::Error error type
12pub type Result<T> = std::result::Result<T, Error>;
13
14/// The error that can be returned in this crate.
15#[derive(Clone, Debug)]
16pub struct Error {
17    kind: ErrorKind,
18    notifier: Option<Notifier>,
19}
20
21/// The type of error that can be returned in this crate.
22///
23/// Even though we expose the complete enumeration of possible error variants, it is not
24/// considered stable to exhaustively match on this enumeration: do it at your own risk.
25#[derive(Clone, Debug)]
26#[non_exhaustive]
27pub enum ErrorKind {
28    ChannelsLimitReached,
29    InvalidProtocolVersion(ProtocolVersion),
30
31    InvalidChannel(ChannelId),
32    InvalidChannelState(ChannelState, &'static str),
33    InvalidConnectionState(ConnectionState),
34
35    IOError(Arc<io::Error>),
36    ParsingError(ParserError),
37    ProtocolError(AMQPError),
38    SerialisationError(Arc<GenError>),
39
40    MissingHeartbeatError,
41
42    NoConfiguredExecutor,
43    NoConfiguredReactor,
44}
45
46impl Error {
47    pub fn kind(&self) -> &ErrorKind {
48        &self.kind
49    }
50
51    pub fn notifier(&self) -> Option<Notifier> {
52        self.notifier.clone()
53    }
54
55    pub(crate) fn with_notifier(mut self, notifier: Option<Notifier>) -> Self {
56        self.notifier = notifier;
57        self
58    }
59
60    pub fn wouldblock(&self) -> bool {
61        if let ErrorKind::IOError(e) = self.kind() {
62            e.kind() == io::ErrorKind::WouldBlock
63        } else {
64            false
65        }
66    }
67
68    pub fn interrupted(&self) -> bool {
69        if let ErrorKind::IOError(e) = self.kind() {
70            e.kind() == io::ErrorKind::Interrupted
71        } else {
72            false
73        }
74    }
75
76    pub fn is_amqp_error(&self) -> bool {
77        if let ErrorKind::ProtocolError(_) = self.kind() {
78            return true;
79        }
80        false
81    }
82
83    pub fn is_amqp_soft_error(&self) -> bool {
84        if let ErrorKind::ProtocolError(e) = self.kind() {
85            if let AMQPErrorKind::Soft(_) = e.kind() {
86                return true;
87            }
88        }
89        false
90    }
91
92    pub fn is_amqp_hard_error(&self) -> bool {
93        if let ErrorKind::ProtocolError(e) = self.kind() {
94            if let AMQPErrorKind::Hard(_) = e.kind() {
95                return true;
96            }
97        }
98        false
99    }
100}
101
102impl fmt::Display for Error {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        match self.kind() {
105            ErrorKind::ChannelsLimitReached => write!(
106                f,
107                "the maximum number of channels for this connection has been reached"
108            ),
109            ErrorKind::InvalidProtocolVersion(version) => {
110                write!(f, "the server only supports AMQP {}", version)
111            }
112
113            ErrorKind::InvalidChannel(channel) => write!(f, "invalid channel: {}", channel),
114            ErrorKind::InvalidChannelState(state, context) => {
115                write!(f, "invalid channel state: {:?} ({})", state, context)
116            }
117            ErrorKind::InvalidConnectionState(state) => {
118                write!(f, "invalid connection state: {:?}", state)
119            }
120
121            ErrorKind::IOError(e) => write!(f, "IO error: {}", e),
122            ErrorKind::ParsingError(e) => write!(f, "failed to parse: {}", e),
123            ErrorKind::ProtocolError(e) => write!(f, "protocol error: {}", e),
124            ErrorKind::SerialisationError(e) => write!(f, "failed to serialise: {}", e),
125
126            ErrorKind::MissingHeartbeatError => {
127                write!(f, "no heartbeat received from server for too long")
128            }
129
130            ErrorKind::NoConfiguredExecutor => {
131                write!(
132                    f,
133                    "an executor must be provided if the default-runtime feature is disabled"
134                )
135            }
136            ErrorKind::NoConfiguredReactor => {
137                write!(
138                    f,
139                    "a reactor must be provided if the default-runtime feature is disabled"
140                )
141            }
142        }
143    }
144}
145
146impl error::Error for Error {
147    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
148        match self.kind() {
149            ErrorKind::IOError(e) => Some(&**e),
150            ErrorKind::ParsingError(e) => Some(e),
151            ErrorKind::ProtocolError(e) => Some(e),
152            ErrorKind::SerialisationError(e) => Some(&**e),
153            _ => None,
154        }
155    }
156}
157
158impl From<ErrorKind> for Error {
159    fn from(kind: ErrorKind) -> Self {
160        Self {
161            kind,
162            notifier: None,
163        }
164    }
165}
166
167impl From<io::Error> for Error {
168    fn from(other: io::Error) -> Self {
169        ErrorKind::IOError(Arc::new(other)).into()
170    }
171}
172
173impl PartialEq for Error {
174    fn eq(&self, other: &Self) -> bool {
175        use ErrorKind::*;
176        use tracing::error;
177
178        match (self.kind(), other.kind()) {
179            (ChannelsLimitReached, ChannelsLimitReached) => true,
180            (InvalidProtocolVersion(left_inner), InvalidProtocolVersion(right_version)) => {
181                left_inner == right_version
182            }
183
184            (InvalidChannel(left_inner), InvalidChannel(right_inner)) => left_inner == right_inner,
185            (
186                InvalidChannelState(left_inner, left_context),
187                InvalidChannelState(right_inner, right_context),
188            ) => left_inner == right_inner && left_context == right_context,
189            (InvalidConnectionState(left_inner), InvalidConnectionState(right_inner)) => {
190                left_inner == right_inner
191            }
192
193            (IOError(_), IOError(_)) => {
194                error!("Unable to compare lapin::ErrorKind::IOError");
195                false
196            }
197            (ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
198            (ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
199            (SerialisationError(_), SerialisationError(_)) => {
200                error!("Unable to compare lapin::ErrorKind::SerialisationError");
201                false
202            }
203
204            _ => false,
205        }
206    }
207}