Skip to main content

ntex_amqp/
error.rs

1use std::io;
2
3use ntex_bytes::ByteString;
4use ntex_util::future::Either;
5
6pub use crate::codec::protocol::{Error, ErrorInner};
7pub use crate::codec::{AmqpCodecError, AmqpParseError, ProtocolIdError};
8use crate::{codec::protocol, types::Outcome};
9
10/// Errors which can occur when attempting to handle amqp connection.
11#[derive(Debug, thiserror::Error)]
12pub enum AmqpDispatcherError {
13    #[error("Service error")]
14    /// Service error
15    Service,
16    /// Amqp protocol error
17    #[error("Amqp protocol error: {:?}", _0)]
18    Protocol(#[from] AmqpProtocolError),
19    /// Peer disconnect
20    #[error("Peer disconnected error: {:?}", _0)]
21    Disconnected(Option<io::Error>),
22}
23
24impl Clone for AmqpDispatcherError {
25    fn clone(&self) -> Self {
26        match self {
27            AmqpDispatcherError::Service => AmqpDispatcherError::Service,
28            AmqpDispatcherError::Protocol(err) => AmqpDispatcherError::Protocol(err.clone()),
29            AmqpDispatcherError::Disconnected(Some(err)) => AmqpDispatcherError::Disconnected(
30                Some(io::Error::new(err.kind(), format!("{err}"))),
31            ),
32            AmqpDispatcherError::Disconnected(None) => AmqpDispatcherError::Disconnected(None),
33        }
34    }
35}
36
37#[derive(Clone, Debug, thiserror::Error)]
38pub enum AmqpProtocolError {
39    #[error("Codec error: {:?}", _0)]
40    Codec(#[from] AmqpCodecError),
41    #[error("Too many channels")]
42    TooManyChannels,
43    #[error("Body is too large")]
44    BodyTooLarge,
45    #[error("Keep-alive timeout")]
46    KeepAliveTimeout,
47    #[error("Read timeout")]
48    ReadTimeout,
49    #[error("Disconnected")]
50    Disconnected,
51    #[error("Unknown session: {:?}", _0)]
52    UnknownSession(protocol::Frame),
53    #[error("Unknown link in session: {:?}", _0)]
54    UnknownLink(protocol::Frame),
55    #[error("Connection closed, error: {:?}", _0)]
56    Closed(Option<protocol::Error>),
57    #[error("Session ended, error: {:?}", _0)]
58    SessionEnded(Option<protocol::Error>),
59    #[error("Link detached, error: {:?}", _0)]
60    LinkDetached(Option<protocol::Error>),
61    #[error("Unexpected frame for opening state, got: {:?}", _0)]
62    UnexpectedOpeningState(protocol::Frame),
63    #[error("Unexpected frame: {:?}", _0)]
64    Unexpected(protocol::Frame),
65    #[error("Connection is dropped")]
66    ConnectionDropped,
67}
68
69impl From<AmqpParseError> for AmqpProtocolError {
70    fn from(err: AmqpParseError) -> Self {
71        Self::Codec(err.into())
72    }
73}
74
75#[derive(Clone, Debug, thiserror::Error)]
76#[error("Amqp error: {:?} {:?} ({:?})", err, description, info)]
77pub struct AmqpError {
78    err: Either<protocol::AmqpError, protocol::ErrorCondition>,
79    description: Option<ByteString>,
80    info: Option<protocol::FieldsVec>,
81}
82
83impl AmqpError {
84    pub fn new(err: protocol::AmqpError) -> Self {
85        AmqpError {
86            err: Either::Left(err),
87            description: None,
88            info: None,
89        }
90    }
91
92    pub fn with_error(err: protocol::ErrorCondition) -> Self {
93        AmqpError {
94            err: Either::Right(err),
95            description: None,
96            info: None,
97        }
98    }
99
100    pub fn internal_error() -> Self {
101        Self::new(protocol::AmqpError::InternalError)
102    }
103
104    pub fn not_found() -> Self {
105        Self::new(protocol::AmqpError::NotFound)
106    }
107
108    pub fn unauthorized_access() -> Self {
109        Self::new(protocol::AmqpError::UnauthorizedAccess)
110    }
111
112    pub fn decode_error() -> Self {
113        Self::new(protocol::AmqpError::DecodeError)
114    }
115
116    pub fn invalid_field() -> Self {
117        Self::new(protocol::AmqpError::InvalidField)
118    }
119
120    pub fn not_allowed() -> Self {
121        Self::new(protocol::AmqpError::NotAllowed)
122    }
123
124    pub fn not_implemented() -> Self {
125        Self::new(protocol::AmqpError::NotImplemented)
126    }
127
128    #[must_use]
129    pub fn text(mut self, text: &'static str) -> Self {
130        self.description = Some(ByteString::from_static(text));
131        self
132    }
133
134    #[must_use]
135    pub fn description<T>(mut self, text: T) -> Self
136    where
137        ByteString: From<T>,
138    {
139        self.description = Some(ByteString::from(text));
140        self
141    }
142}
143
144impl From<AmqpError> for protocol::Error {
145    fn from(e: AmqpError) -> protocol::Error {
146        let condition = match e.err {
147            Either::Left(err) => err.into(),
148            Either::Right(err) => err,
149        };
150        protocol::Error(Box::new(ErrorInner {
151            condition,
152            description: e.description,
153            info: e.info,
154        }))
155    }
156}
157
158impl TryFrom<AmqpError> for Outcome {
159    type Error = Error;
160
161    fn try_from(err: AmqpError) -> Result<Self, Error> {
162        Ok(Outcome::Error(err.into()))
163    }
164}
165
166#[derive(Clone, Debug, thiserror::Error)]
167#[error("Link error: {:?} {:?} ({:?})", err, description, info)]
168pub struct LinkError {
169    err: Either<protocol::LinkError, protocol::ErrorCondition>,
170    description: Option<ByteString>,
171    info: Option<protocol::FieldsVec>,
172}
173
174impl LinkError {
175    pub fn new(error: protocol::ErrorCondition) -> Self {
176        LinkError {
177            err: Either::Right(error),
178            description: None,
179            info: None,
180        }
181    }
182
183    pub fn force_detach() -> Self {
184        LinkError {
185            err: Either::Left(protocol::LinkError::DetachForced),
186            description: None,
187            info: None,
188        }
189    }
190
191    pub fn redirect() -> Self {
192        LinkError {
193            err: Either::Left(protocol::LinkError::Redirect),
194            description: None,
195            info: None,
196        }
197    }
198
199    #[must_use]
200    pub fn text(mut self, text: &'static str) -> Self {
201        self.description = Some(ByteString::from_static(text));
202        self
203    }
204
205    #[must_use]
206    pub fn description<T>(mut self, text: T) -> Self
207    where
208        ByteString: From<T>,
209    {
210        self.description = Some(ByteString::from(text));
211        self
212    }
213
214    #[must_use]
215    pub fn fields(mut self, fields: protocol::FieldsVec) -> Self {
216        self.info = Some(fields);
217        self
218    }
219}
220
221impl From<LinkError> for protocol::Error {
222    fn from(e: LinkError) -> protocol::Error {
223        let condition = match e.err {
224            Either::Left(err) => err.into(),
225            Either::Right(err) => err,
226        };
227
228        protocol::Error(Box::new(ErrorInner {
229            condition,
230            description: e.description,
231            info: e.info,
232        }))
233    }
234}
235
236impl TryFrom<LinkError> for Outcome {
237    type Error = Error;
238
239    fn try_from(err: LinkError) -> Result<Self, Error> {
240        Ok(Outcome::Error(err.into()))
241    }
242}