1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
7#[derive(Debug, thiserror::Error)]
9pub enum MqttError<E> {
10 #[error("Service error")]
12 Service(E),
13 #[error("Mqtt handshake error: {}", _0)]
15 Handshake(#[from] HandshakeError<E>),
16}
17
18#[derive(Debug, thiserror::Error)]
20pub enum HandshakeError<E> {
21 #[error("Handshake service error")]
23 Service(E),
24 #[error("Mqtt protocol error: {}", _0)]
26 Protocol(#[from] ProtocolError),
27 #[error("Handshake timeout")]
29 Timeout,
30 #[error("Peer is disconnected, error: {:?}", _0)]
32 Disconnected(Option<io::Error>),
33}
34
35#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
37pub enum PayloadError {
38 #[error("{0}")]
40 Protocol(#[from] ProtocolError),
41 #[error("Service error")]
43 Service,
44 #[error("Payload is consumed")]
46 Consumed,
47 #[error("Peer is disconnected")]
49 Disconnected,
50}
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
54pub enum ProtocolError {
55 #[error("Decoding error: {0:?}")]
57 Decode(#[from] DecodeError),
58 #[error("Encoding error: {0:?}")]
60 Encode(#[from] EncodeError),
61 #[error("Protocol violation: {0}")]
63 ProtocolViolation(#[from] ProtocolViolationError),
64 #[error("Keep Alive timeout")]
66 KeepAliveTimeout,
67 #[error("Read frame timeout")]
69 ReadTimeout,
70}
71
72#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
73#[error(transparent)]
74pub struct ProtocolViolationError {
75 inner: ViolationInner,
76}
77
78#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
79enum ViolationInner {
80 #[error("{message}")]
81 Common { reason: DisconnectReasonCode, message: &'static str },
82 #[error("{message}; received packet with type `{packet_type:b}`")]
83 UnexpectedPacket { packet_type: u8, message: &'static str },
84}
85
86impl ProtocolViolationError {
87 pub fn reason(&self) -> DisconnectReasonCode {
89 match self.inner {
90 ViolationInner::Common { reason, .. } => reason,
91 ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
92 }
93 }
94
95 pub fn message(&self) -> &'static str {
97 match self.inner {
98 ViolationInner::Common { message, .. } => message,
99 ViolationInner::UnexpectedPacket { message, .. } => message,
100 }
101 }
102}
103
104impl ProtocolError {
105 pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
106 Self::ProtocolViolation(ProtocolViolationError {
107 inner: ViolationInner::Common { reason, message },
108 })
109 }
110 pub fn generic_violation(message: &'static str) -> Self {
111 Self::violation(DisconnectReasonCode::ProtocolError, message)
112 }
113
114 pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
115 Self::ProtocolViolation(ProtocolViolationError {
116 inner: ViolationInner::UnexpectedPacket { packet_type, message },
117 })
118 }
119 pub(crate) fn packet_id_mismatch() -> Self {
120 Self::generic_violation(
121 "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]",
122 )
123 }
124}
125
126impl<E> From<io::Error> for MqttError<E> {
127 fn from(err: io::Error) -> Self {
128 MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
129 }
130}
131
132impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
133 fn from(err: Either<io::Error, io::Error>) -> Self {
134 MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
135 }
136}
137
138impl<E> From<EncodeError> for MqttError<E> {
139 fn from(err: EncodeError) -> Self {
140 MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
141 }
142}
143
144impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
145 fn from(err: Either<DecodeError, io::Error>) -> Self {
146 match err {
147 Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
148 Either::Right(err) => HandshakeError::Disconnected(Some(err)),
149 }
150 }
151}
152
153#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
154pub enum DecodeError {
155 #[error("Invalid protocol")]
156 InvalidProtocol,
157 #[error("Invalid length")]
158 InvalidLength,
159 #[error("Malformed packet")]
160 MalformedPacket,
161 #[error("Unsupported protocol level")]
162 UnsupportedProtocolLevel,
163 #[error("Connect frame's reserved flag is set")]
164 ConnectReservedFlagSet,
165 #[error("ConnectAck frame's reserved flag is set")]
166 ConnAckReservedFlagSet,
167 #[error("Invalid client id")]
168 InvalidClientId,
169 #[error("Unsupported packet type")]
170 UnsupportedPacketType,
171 #[error("Packet id is required")]
173 PacketIdRequired,
174 #[error("Max size exceeded")]
175 MaxSizeExceeded,
176 #[error("utf8 error")]
177 Utf8Error,
178 #[error("Unexpected payload")]
179 UnexpectedPayload,
180}
181
182#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
183pub enum EncodeError {
184 #[error("Packet is bigger than peer's Maximum Packet Size")]
185 OverMaxPacketSize,
186 #[error("Streaming payload is bigger than Publish packet definition")]
187 OverPublishSize,
188 #[error("Streaming payload is incomplete")]
189 PublishIncomplete,
190 #[error("Invalid length")]
191 InvalidLength,
192 #[error("Malformed packet")]
193 MalformedPacket,
194 #[error("Packet id is required")]
195 PacketIdRequired,
196 #[error("Unexpected payload")]
197 UnexpectedPayload,
198 #[error("Publish packet is not completed, expect payload")]
199 ExpectPayload,
200 #[error("Unsupported version")]
201 UnsupportedVersion,
202}
203
204#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
205pub enum SendPacketError {
206 #[error("Encoding error {:?}", _0)]
208 Encode(#[from] EncodeError),
209 #[error("Provided packet id is in use")]
211 PacketIdInUse(NonZeroU16),
212 #[error("Unexpected publish release")]
214 UnexpectedRelease,
215 #[error("Streaming has been cancelled")]
217 StreamingCancelled,
218 #[error("Peer is disconnected")]
220 Disconnected,
221}
222
223#[derive(Debug, thiserror::Error)]
225pub enum ClientError<T: fmt::Debug> {
226 #[error("Connect ack failed: {:?}", _0)]
228 Ack(T),
229 #[error("Protocol error: {:?}", _0)]
231 Protocol(#[from] ProtocolError),
232 #[error("Handshake timeout")]
234 HandshakeTimeout,
235 #[error("Peer disconnected")]
237 Disconnected(Option<std::io::Error>),
238 #[error("Connect error: {}", _0)]
240 Connect(#[from] ntex_net::connect::ConnectError),
241}
242
243impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
244 fn from(err: EncodeError) -> Self {
245 ClientError::Protocol(ProtocolError::Encode(err))
246 }
247}
248
249impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
250 fn from(err: Either<DecodeError, std::io::Error>) -> Self {
251 match err {
252 Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
253 Either::Right(err) => ClientError::Disconnected(Some(err)),
254 }
255 }
256}