ntex_mqtt/
error.rs

1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
/// Errors which can occur when attempting to handle mqtt connection.
///
/// `E` is the error type carried by the [`MqttError::Service`] variant and is
/// also the type parameter of the wrapped [`HandshakeError`].
#[derive(Debug, thiserror::Error)]
pub enum MqttError<E> {
    /// Publish handler service error
    ///
    /// The display message is fixed text; the wrapped `E` value is not
    /// included in it.
    #[error("Service error")]
    Service(E),
    /// Handshake error
    ///
    /// `#[from]` makes `HandshakeError<E>` convertible into this variant.
    #[error("Mqtt handshake error: {}", _0)]
    Handshake(#[from] HandshakeError<E>),
}
17
/// Errors which can occur during mqtt connection handshake.
///
/// `E` is the error type carried by the [`HandshakeError::Service`] variant.
#[derive(Debug, thiserror::Error)]
pub enum HandshakeError<E> {
    /// Handshake service error
    ///
    /// The display message is fixed text; the wrapped `E` value is not
    /// included in it.
    #[error("Handshake service error")]
    Service(E),
    /// Protocol error
    ///
    /// `#[from]` makes [`ProtocolError`] convertible into this variant.
    #[error("Mqtt protocol error: {}", _0)]
    Protocol(#[from] ProtocolError),
    /// Handshake timeout
    #[error("Handshake timeout")]
    Timeout,
    /// Peer disconnect
    ///
    /// Carries the optional I/O error; it is rendered with `{:?}` in the
    /// display message, so `None` is printed as `None`.
    #[error("Peer is disconnected, error: {:?}", _0)]
    Disconnected(Option<io::Error>),
}
34
/// Errors related to payload processing
///
/// All variants are plain values (the type is `Copy`), so errors can be
/// compared and duplicated freely.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum PayloadError {
    /// Protocol error
    ///
    /// Displayed using the wrapped [`ProtocolError`]'s own message.
    #[error("{0}")]
    Protocol(#[from] ProtocolError),
    /// Service error
    #[error("Service error")]
    Service,
    /// Payload is consumed
    #[error("Payload is consumed")]
    Consumed,
    /// Peer is disconnected
    #[error("Peer is disconnected")]
    Disconnected,
}
51
/// Protocol level errors
///
/// The `Decode`, `Encode` and `ProtocolViolation` variants each have a
/// `#[from]` conversion from their wrapped error type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ProtocolError {
    /// MQTT decoding error
    ///
    /// The wrapped error is rendered with its `Debug` representation.
    #[error("Decoding error: {0:?}")]
    Decode(#[from] DecodeError),
    /// MQTT encoding error
    ///
    /// The wrapped error is rendered with its `Debug` representation.
    #[error("Encoding error: {0:?}")]
    Encode(#[from] EncodeError),
    /// Peer violated MQTT protocol specification
    #[error("Protocol violation: {0}")]
    ProtocolViolation(#[from] ProtocolViolationError),
    /// Keep alive timeout
    #[error("Keep Alive timeout")]
    KeepAliveTimeout,
    /// Read frame timeout
    #[error("Read frame timeout")]
    ReadTimeout,
}
71
/// Details of an MQTT protocol violation.
///
/// The representation is private; use [`ProtocolViolationError::reason`] and
/// [`ProtocolViolationError::message`] to inspect it.
#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
// `transparent` delegates the `Display` output to the `inner` field.
#[error(transparent)]
pub struct ProtocolViolationError {
    inner: ViolationInner,
}
77
/// Private representation backing [`ProtocolViolationError`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
enum ViolationInner {
    /// Violation described by an explicit disconnect reason code and a
    /// static message; only the message is shown in the display output.
    #[error("{message}")]
    Common { reason: DisconnectReasonCode, message: &'static str },
    /// Violation caused by a packet of an unexpected type; the display output
    /// appends `packet_type` formatted in binary (`{:b}`).
    #[error("{message}; received packet with type `{packet_type:b}`")]
    UnexpectedPacket { packet_type: u8, message: &'static str },
}
85
86impl ProtocolViolationError {
87    /// Protocol violation reason code
88    pub fn reason(&self) -> DisconnectReasonCode {
89        match self.inner {
90            ViolationInner::Common { reason, .. } => reason,
91            ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
92        }
93    }
94
95    /// Protocol violation reason message
96    pub fn message(&self) -> &'static str {
97        match self.inner {
98            ViolationInner::Common { message, .. } => message,
99            ViolationInner::UnexpectedPacket { message, .. } => message,
100        }
101    }
102}
103
104impl ProtocolError {
105    pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
106        Self::ProtocolViolation(ProtocolViolationError {
107            inner: ViolationInner::Common { reason, message },
108        })
109    }
110    pub fn generic_violation(message: &'static str) -> Self {
111        Self::violation(DisconnectReasonCode::ProtocolError, message)
112    }
113
114    pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
115        Self::ProtocolViolation(ProtocolViolationError {
116            inner: ViolationInner::UnexpectedPacket { packet_type, message },
117        })
118    }
119    pub(crate) fn packet_id_mismatch() -> Self {
120        Self::generic_violation(
121            "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]",
122        )
123    }
124}
125
126impl<E> From<io::Error> for MqttError<E> {
127    fn from(err: io::Error) -> Self {
128        MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
129    }
130}
131
132impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
133    fn from(err: Either<io::Error, io::Error>) -> Self {
134        MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
135    }
136}
137
138impl<E> From<EncodeError> for MqttError<E> {
139    fn from(err: EncodeError) -> Self {
140        MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
141    }
142}
143
144impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
145    fn from(err: Either<DecodeError, io::Error>) -> Self {
146        match err {
147            Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
148            Either::Right(err) => HandshakeError::Disconnected(Some(err)),
149        }
150    }
151}
152
/// Errors reported while decoding MQTT data.
///
/// Wrapped by [`ProtocolError::Decode`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
pub enum DecodeError {
    /// Invalid protocol
    #[error("Invalid protocol")]
    InvalidProtocol,
    /// Invalid length
    #[error("Invalid length")]
    InvalidLength,
    /// Malformed packet
    #[error("Malformed packet")]
    MalformedPacket,
    /// Unsupported protocol level
    #[error("Unsupported protocol level")]
    UnsupportedProtocolLevel,
    /// Connect frame's reserved flag is set
    #[error("Connect frame's reserved flag is set")]
    ConnectReservedFlagSet,
    /// ConnectAck frame's reserved flag is set
    #[error("ConnectAck frame's reserved flag is set")]
    ConnAckReservedFlagSet,
    /// Invalid client id
    #[error("Invalid client id")]
    InvalidClientId,
    /// Unsupported packet type
    #[error("Unsupported packet type")]
    UnsupportedPacketType,
    // MQTT v3 only
    // NOTE(review): presumably this remark applies to the variant directly
    // below — confirm.
    /// Packet id is required
    #[error("Packet id is required")]
    PacketIdRequired,
    /// Max size exceeded
    #[error("Max size exceeded")]
    MaxSizeExceeded,
    /// utf8 error
    #[error("utf8 error")]
    Utf8Error,
    /// Unexpected payload
    #[error("Unexpected payload")]
    UnexpectedPayload,
}
181
/// Errors reported while encoding MQTT data.
///
/// Wrapped by [`ProtocolError::Encode`] and [`SendPacketError::Encode`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
pub enum EncodeError {
    /// Packet is bigger than peer's Maximum Packet Size
    #[error("Packet is bigger than peer's Maximum Packet Size")]
    OverMaxPacketSize,
    /// Streaming payload is bigger than Publish packet definition
    #[error("Streaming payload is bigger than Publish packet definition")]
    OverPublishSize,
    /// Streaming payload is incomplete
    #[error("Streaming payload is incomplete")]
    PublishIncomplete,
    /// Invalid length
    #[error("Invalid length")]
    InvalidLength,
    /// Malformed packet
    #[error("Malformed packet")]
    MalformedPacket,
    /// Packet id is required
    #[error("Packet id is required")]
    PacketIdRequired,
    /// Unexpected payload
    #[error("Unexpected payload")]
    UnexpectedPayload,
    /// Publish packet is not completed, expect payload
    #[error("Publish packet is not completed, expect payload")]
    ExpectPayload,
    /// Unsupported version
    #[error("Unsupported version")]
    UnsupportedVersion,
}
203
/// Errors which can occur when sending a packet.
#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
pub enum SendPacketError {
    /// Encoder error
    ///
    /// The wrapped [`EncodeError`] is rendered with its `Debug`
    /// representation.
    #[error("Encoding error {:?}", _0)]
    Encode(#[from] EncodeError),
    /// Provided packet id is in use
    ///
    /// Carries a packet id; the id is not part of the display message.
    #[error("Provided packet id is in use")]
    PacketIdInUse(NonZeroU16),
    /// Unexpected release publish
    #[error("Unexpected publish release")]
    UnexpectedRelease,
    /// Streaming has been cancelled
    #[error("Streaming has been cancelled")]
    StreamingCancelled,
    /// Peer disconnected
    #[error("Peer is disconnected")]
    Disconnected,
}
222
/// Errors which can occur when attempting to handle mqtt client connection.
///
/// `T` is the payload of the [`ClientError::Ack`] variant; it must implement
/// `Debug` because it is rendered with `{:?}` in the display message.
#[derive(Debug, thiserror::Error)]
pub enum ClientError<T: fmt::Debug> {
    /// Connect negotiation failed
    #[error("Connect ack failed: {:?}", _0)]
    Ack(T),
    /// Protocol error
    ///
    /// The wrapped [`ProtocolError`] is rendered with its `Debug`
    /// representation.
    #[error("Protocol error: {:?}", _0)]
    Protocol(#[from] ProtocolError),
    /// Handshake timeout
    #[error("Handshake timeout")]
    HandshakeTimeout,
    /// Peer disconnected
    ///
    /// Carries the optional I/O error; it is not part of the display message.
    #[error("Peer disconnected")]
    Disconnected(Option<std::io::Error>),
    /// Connect error
    #[error("Connect error: {}", _0)]
    Connect(#[from] ntex_net::connect::ConnectError),
}
242
243impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
244    fn from(err: EncodeError) -> Self {
245        ClientError::Protocol(ProtocolError::Encode(err))
246    }
247}
248
249impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
250    fn from(err: Either<DecodeError, std::io::Error>) -> Self {
251        match err {
252            Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
253            Either::Right(err) => ClientError::Disconnected(Some(err)),
254        }
255    }
256}