1use std::io;
2
3use ntex_bytes::ByteString;
4use ntex_util::future::Either;
5
6pub use crate::codec::protocol::{Error, ErrorInner};
7pub use crate::codec::{AmqpCodecError, AmqpParseError, ProtocolIdError};
8use crate::{codec::protocol, types::Outcome};
9
/// Errors reported by the AMQP dispatcher.
///
/// `Clone` is not derived: the `Disconnected` payload is an `io::Error`, which does not
/// implement `Clone`, so this type relies on a hand-written `Clone` impl instead.
#[derive(Debug, thiserror::Error)]
pub enum AmqpDispatcherError {
    /// Service-level failure; carries no further detail.
    #[error("Service error")]
    Service,
    /// AMQP protocol failure. `#[from]` generates `From<AmqpProtocolError>` for this variant.
    #[error("Amqp protocol error: {:?}", _0)]
    Protocol(#[from] AmqpProtocolError),
    /// The peer disconnected; holds the I/O error when one is available.
    #[error("Peer disconnected error: {:?}", _0)]
    Disconnected(Option<io::Error>),
}
23
24impl Clone for AmqpDispatcherError {
25 fn clone(&self) -> Self {
26 match self {
27 AmqpDispatcherError::Service => AmqpDispatcherError::Service,
28 AmqpDispatcherError::Protocol(err) => AmqpDispatcherError::Protocol(err.clone()),
29 AmqpDispatcherError::Disconnected(Some(err)) => AmqpDispatcherError::Disconnected(
30 Some(io::Error::new(err.kind(), format!("{err}"))),
31 ),
32 AmqpDispatcherError::Disconnected(None) => AmqpDispatcherError::Disconnected(None),
33 }
34 }
35}
36
/// Protocol-level errors for an AMQP connection.
///
/// The `Display` text of every variant comes from its `#[error(...)]` attribute; payloads are
/// rendered with their `Debug` formatting.
#[derive(Clone, Debug, thiserror::Error)]
pub enum AmqpProtocolError {
    /// Codec failure. `#[from]` generates `From<AmqpCodecError>` for this variant.
    #[error("Codec error: {:?}", _0)]
    Codec(#[from] AmqpCodecError),
    /// Too many channels.
    #[error("Too many channels")]
    TooManyChannels,
    /// Body is too large.
    #[error("Body is too large")]
    BodyTooLarge,
    /// Keep-alive timeout.
    #[error("Keep-alive timeout")]
    KeepAliveTimeout,
    /// Read timeout.
    #[error("Read timeout")]
    ReadTimeout,
    /// Disconnected; carries no further detail.
    #[error("Disconnected")]
    Disconnected,
    /// Unknown session; carries a frame (presumably the one that referenced the session —
    /// confirm against the code that constructs this variant).
    #[error("Unknown session: {:?}", _0)]
    UnknownSession(protocol::Frame),
    /// Unknown link in a session; carries a frame (presumably the one that referenced the
    /// link — confirm against the code that constructs this variant).
    #[error("Unknown link in session: {:?}", _0)]
    UnknownLink(protocol::Frame),
    /// Connection closed, with the accompanying protocol error if there is one.
    #[error("Connection closed, error: {:?}", _0)]
    Closed(Option<protocol::Error>),
    /// Session ended, with the accompanying protocol error if there is one.
    #[error("Session ended, error: {:?}", _0)]
    SessionEnded(Option<protocol::Error>),
    /// Link detached, with the accompanying protocol error if there is one.
    #[error("Link detached, error: {:?}", _0)]
    LinkDetached(Option<protocol::Error>),
    /// Unexpected frame for the opening state; carries the frame that was received.
    #[error("Unexpected frame for opening state, got: {:?}", _0)]
    UnexpectedOpeningState(protocol::Frame),
    /// Unexpected frame; carries the frame.
    #[error("Unexpected frame: {:?}", _0)]
    Unexpected(protocol::Frame),
    /// Connection is dropped.
    #[error("Connection is dropped")]
    ConnectionDropped,
}
68
69impl From<AmqpParseError> for AmqpProtocolError {
70 fn from(err: AmqpParseError) -> Self {
71 Self::Codec(err.into())
72 }
73}
74
/// AMQP error value assembled through builder-style methods.
///
/// Holds either a predefined `protocol::AmqpError` code or an arbitrary
/// `protocol::ErrorCondition`, plus an optional description and optional info fields.
/// All three fields are printed with `Debug` formatting in the `Display` output.
#[derive(Clone, Debug, thiserror::Error)]
#[error("Amqp error: {:?} {:?} ({:?})", err, description, info)]
pub struct AmqpError {
    // Left: predefined AMQP error code; Right: arbitrary error condition.
    err: Either<protocol::AmqpError, protocol::ErrorCondition>,
    // Optional description text.
    description: Option<ByteString>,
    // Optional additional info fields.
    info: Option<protocol::FieldsVec>,
}
82
83impl AmqpError {
84 pub fn new(err: protocol::AmqpError) -> Self {
85 AmqpError {
86 err: Either::Left(err),
87 description: None,
88 info: None,
89 }
90 }
91
92 pub fn with_error(err: protocol::ErrorCondition) -> Self {
93 AmqpError {
94 err: Either::Right(err),
95 description: None,
96 info: None,
97 }
98 }
99
100 pub fn internal_error() -> Self {
101 Self::new(protocol::AmqpError::InternalError)
102 }
103
104 pub fn not_found() -> Self {
105 Self::new(protocol::AmqpError::NotFound)
106 }
107
108 pub fn unauthorized_access() -> Self {
109 Self::new(protocol::AmqpError::UnauthorizedAccess)
110 }
111
112 pub fn decode_error() -> Self {
113 Self::new(protocol::AmqpError::DecodeError)
114 }
115
116 pub fn invalid_field() -> Self {
117 Self::new(protocol::AmqpError::InvalidField)
118 }
119
120 pub fn not_allowed() -> Self {
121 Self::new(protocol::AmqpError::NotAllowed)
122 }
123
124 pub fn not_implemented() -> Self {
125 Self::new(protocol::AmqpError::NotImplemented)
126 }
127
128 #[must_use]
129 pub fn text(mut self, text: &'static str) -> Self {
130 self.description = Some(ByteString::from_static(text));
131 self
132 }
133
134 #[must_use]
135 pub fn description<T>(mut self, text: T) -> Self
136 where
137 ByteString: From<T>,
138 {
139 self.description = Some(ByteString::from(text));
140 self
141 }
142}
143
144impl From<AmqpError> for protocol::Error {
145 fn from(e: AmqpError) -> protocol::Error {
146 let condition = match e.err {
147 Either::Left(err) => err.into(),
148 Either::Right(err) => err,
149 };
150 protocol::Error(Box::new(ErrorInner {
151 condition,
152 description: e.description,
153 info: e.info,
154 }))
155 }
156}
157
158impl TryFrom<AmqpError> for Outcome {
159 type Error = Error;
160
161 fn try_from(err: AmqpError) -> Result<Self, Error> {
162 Ok(Outcome::Error(err.into()))
163 }
164}
165
/// Link-level error value assembled through builder-style methods.
///
/// Holds either a predefined `protocol::LinkError` code or an arbitrary
/// `protocol::ErrorCondition`, plus an optional description and optional info fields.
/// All three fields are printed with `Debug` formatting in the `Display` output.
#[derive(Clone, Debug, thiserror::Error)]
#[error("Link error: {:?} {:?} ({:?})", err, description, info)]
pub struct LinkError {
    // Left: predefined link error code; Right: arbitrary error condition.
    err: Either<protocol::LinkError, protocol::ErrorCondition>,
    // Optional description text.
    description: Option<ByteString>,
    // Optional additional info fields.
    info: Option<protocol::FieldsVec>,
}
173
174impl LinkError {
175 pub fn new(error: protocol::ErrorCondition) -> Self {
176 LinkError {
177 err: Either::Right(error),
178 description: None,
179 info: None,
180 }
181 }
182
183 pub fn force_detach() -> Self {
184 LinkError {
185 err: Either::Left(protocol::LinkError::DetachForced),
186 description: None,
187 info: None,
188 }
189 }
190
191 pub fn redirect() -> Self {
192 LinkError {
193 err: Either::Left(protocol::LinkError::Redirect),
194 description: None,
195 info: None,
196 }
197 }
198
199 #[must_use]
200 pub fn text(mut self, text: &'static str) -> Self {
201 self.description = Some(ByteString::from_static(text));
202 self
203 }
204
205 #[must_use]
206 pub fn description<T>(mut self, text: T) -> Self
207 where
208 ByteString: From<T>,
209 {
210 self.description = Some(ByteString::from(text));
211 self
212 }
213
214 #[must_use]
215 pub fn fields(mut self, fields: protocol::FieldsVec) -> Self {
216 self.info = Some(fields);
217 self
218 }
219}
220
221impl From<LinkError> for protocol::Error {
222 fn from(e: LinkError) -> protocol::Error {
223 let condition = match e.err {
224 Either::Left(err) => err.into(),
225 Either::Right(err) => err,
226 };
227
228 protocol::Error(Box::new(ErrorInner {
229 condition,
230 description: e.description,
231 info: e.info,
232 }))
233 }
234}
235
236impl TryFrom<LinkError> for Outcome {
237 type Error = Error;
238
239 fn try_from(err: LinkError) -> Result<Self, Error> {
240 Ok(Outcome::Error(err.into()))
241 }
242}