1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
7#[derive(Debug, thiserror::Error)]
9pub enum MqttError<E> {
10 #[error("Service error")]
12 Service(E),
13 #[error("Mqtt handshake error: {}", _0)]
15 Handshake(#[from] HandshakeError<E>),
16}
17
18#[derive(Debug, thiserror::Error)]
20pub enum HandshakeError<E> {
21 #[error("Handshake service error")]
23 Service(E),
24 #[error("Mqtt protocol error: {}", _0)]
26 Protocol(#[from] ProtocolError),
27 #[error("Handshake timeout")]
29 Timeout,
30 #[error("Peer is disconnected, error: {:?}", _0)]
32 Disconnected(Option<io::Error>),
33}
34
35#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
37pub enum PayloadError {
38 #[error("{0}")]
40 Protocol(#[from] ProtocolError),
41 #[error("Service error")]
43 Service,
44 #[error("Payload is consumed")]
46 Consumed,
47 #[error("Peer is disconnected")]
49 Disconnected,
50}
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
54pub enum ProtocolError {
55 #[error("Decoding error: {0:?}")]
57 Decode(#[from] DecodeError),
58 #[error("Encoding error: {0:?}")]
60 Encode(#[from] EncodeError),
61 #[error("Protocol violation: {0}")]
63 ProtocolViolation(#[from] ProtocolViolationError),
64 #[error("Keep Alive timeout")]
66 KeepAliveTimeout,
67 #[error("Read frame timeout")]
69 ReadTimeout,
70}
71
72#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
73#[error(transparent)]
74pub struct ProtocolViolationError {
75 inner: ViolationInner,
76}
77
78#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
79enum ViolationInner {
80 #[error("{message}")]
81 Common { reason: DisconnectReasonCode, message: &'static str },
82 #[error("{message}; received packet with type `{packet_type:b}`")]
83 UnexpectedPacket { packet_type: u8, message: &'static str },
84}
85
86impl ProtocolViolationError {
87 pub(crate) fn reason(&self) -> DisconnectReasonCode {
88 match self.inner {
89 ViolationInner::Common { reason, .. } => reason,
90 ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
91 }
92 }
93}
94
95impl ProtocolError {
96 pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
97 Self::ProtocolViolation(ProtocolViolationError {
98 inner: ViolationInner::Common { reason, message },
99 })
100 }
101 pub fn generic_violation(message: &'static str) -> Self {
102 Self::violation(DisconnectReasonCode::ProtocolError, message)
103 }
104
105 pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
106 Self::ProtocolViolation(ProtocolViolationError {
107 inner: ViolationInner::UnexpectedPacket { packet_type, message },
108 })
109 }
110 pub(crate) fn packet_id_mismatch() -> Self {
111 Self::generic_violation(
112 "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]",
113 )
114 }
115}
116
117impl<E> From<io::Error> for MqttError<E> {
118 fn from(err: io::Error) -> Self {
119 MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
120 }
121}
122
123impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
124 fn from(err: Either<io::Error, io::Error>) -> Self {
125 MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
126 }
127}
128
129impl<E> From<EncodeError> for MqttError<E> {
130 fn from(err: EncodeError) -> Self {
131 MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
132 }
133}
134
135impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
136 fn from(err: Either<DecodeError, io::Error>) -> Self {
137 match err {
138 Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
139 Either::Right(err) => HandshakeError::Disconnected(Some(err)),
140 }
141 }
142}
143
144#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
145pub enum DecodeError {
146 #[error("Invalid protocol")]
147 InvalidProtocol,
148 #[error("Invalid length")]
149 InvalidLength,
150 #[error("Malformed packet")]
151 MalformedPacket,
152 #[error("Unsupported protocol level")]
153 UnsupportedProtocolLevel,
154 #[error("Connect frame's reserved flag is set")]
155 ConnectReservedFlagSet,
156 #[error("ConnectAck frame's reserved flag is set")]
157 ConnAckReservedFlagSet,
158 #[error("Invalid client id")]
159 InvalidClientId,
160 #[error("Unsupported packet type")]
161 UnsupportedPacketType,
162 #[error("Packet id is required")]
164 PacketIdRequired,
165 #[error("Max size exceeded")]
166 MaxSizeExceeded,
167 #[error("utf8 error")]
168 Utf8Error,
169 #[error("Unexpected payload")]
170 UnexpectedPayload,
171}
172
173#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
174pub enum EncodeError {
175 #[error("Packet is bigger than peer's Maximum Packet Size")]
176 OverMaxPacketSize,
177 #[error("Streaming payload is bigger than Publish packet definition")]
178 OverPublishSize,
179 #[error("Streaming payload is incomplete")]
180 PublishIncomplete,
181 #[error("Invalid length")]
182 InvalidLength,
183 #[error("Malformed packet")]
184 MalformedPacket,
185 #[error("Packet id is required")]
186 PacketIdRequired,
187 #[error("Unexpected payload")]
188 UnexpectedPayload,
189 #[error("Publish packet is not completed, expect payload")]
190 ExpectPayload,
191 #[error("Unsupported version")]
192 UnsupportedVersion,
193}
194
195#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
196pub enum SendPacketError {
197 #[error("Encoding error {:?}", _0)]
199 Encode(#[from] EncodeError),
200 #[error("Provided packet id is in use")]
202 PacketIdInUse(NonZeroU16),
203 #[error("Unexpected publish release")]
205 UnexpectedRelease,
206 #[error("Streaming has been cancelled")]
208 StreamingCancelled,
209 #[error("Peer is disconnected")]
211 Disconnected,
212}
213
214#[derive(Debug, thiserror::Error)]
216pub enum ClientError<T: fmt::Debug> {
217 #[error("Connect ack failed: {:?}", _0)]
219 Ack(T),
220 #[error("Protocol error: {:?}", _0)]
222 Protocol(#[from] ProtocolError),
223 #[error("Handshake timeout")]
225 HandshakeTimeout,
226 #[error("Peer disconnected")]
228 Disconnected(Option<std::io::Error>),
229 #[error("Connect error: {}", _0)]
231 Connect(#[from] ntex_net::connect::ConnectError),
232}
233
234impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
235 fn from(err: EncodeError) -> Self {
236 ClientError::Protocol(ProtocolError::Encode(err))
237 }
238}
239
240impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
241 fn from(err: Either<DecodeError, std::io::Error>) -> Self {
242 match err {
243 Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
244 Either::Right(err) => ClientError::Disconnected(Some(err)),
245 }
246 }
247}