ntex_mqtt/
error.rs

1use std::{fmt, io, num::NonZeroU16};
2
3use ntex_util::future::Either;
4
5use crate::v5::codec::DisconnectReasonCode;
6
7/// Errors which can occur when attempting to handle mqtt connection.
8#[derive(Debug, thiserror::Error)]
9pub enum MqttError<E> {
10    /// Publish handler service error
11    #[error("Service error")]
12    Service(E),
13    /// Handshake error
14    #[error("Mqtt handshake error: {}", _0)]
15    Handshake(#[from] HandshakeError<E>),
16}
17
18/// Errors which can occur during mqtt connection handshake.
19#[derive(Debug, thiserror::Error)]
20pub enum HandshakeError<E> {
21    /// Handshake service error
22    #[error("Handshake service error")]
23    Service(E),
24    /// Protocol error
25    #[error("Mqtt protocol error: {}", _0)]
26    Protocol(#[from] ProtocolError),
27    /// Handshake timeout
28    #[error("Handshake timeout")]
29    Timeout,
30    /// Peer disconnect
31    #[error("Peer is disconnected, error: {:?}", _0)]
32    Disconnected(Option<io::Error>),
33}
34
35/// Errors related to payload processing
36#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
37pub enum PayloadError {
38    /// Protocol error
39    #[error("{0}")]
40    Protocol(#[from] ProtocolError),
41    /// Service error
42    #[error("Service error")]
43    Service,
44    /// Payload is consumed
45    #[error("Payload is consumed")]
46    Consumed,
47    /// Peer is disconnected
48    #[error("Peer is disconnected")]
49    Disconnected,
50}
51
52/// Protocol level errors
53#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
54pub enum ProtocolError {
55    /// MQTT decoding error
56    #[error("Decoding error: {0:?}")]
57    Decode(#[from] DecodeError),
58    /// MQTT encoding error
59    #[error("Encoding error: {0:?}")]
60    Encode(#[from] EncodeError),
61    /// Peer violated MQTT protocol specification
62    #[error("Protocol violation: {0}")]
63    ProtocolViolation(#[from] ProtocolViolationError),
64    /// Keep alive timeout
65    #[error("Keep Alive timeout")]
66    KeepAliveTimeout,
67    /// Read frame timeout
68    #[error("Read frame timeout")]
69    ReadTimeout,
70}
71
72#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
73#[error(transparent)]
74pub struct ProtocolViolationError {
75    inner: ViolationInner,
76}
77
78#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
79enum ViolationInner {
80    #[error("{message}")]
81    Common { reason: DisconnectReasonCode, message: &'static str },
82    #[error("{message}; received packet with type `{packet_type:b}`")]
83    UnexpectedPacket { packet_type: u8, message: &'static str },
84}
85
86impl ProtocolViolationError {
87    pub(crate) fn reason(&self) -> DisconnectReasonCode {
88        match self.inner {
89            ViolationInner::Common { reason, .. } => reason,
90            ViolationInner::UnexpectedPacket { .. } => DisconnectReasonCode::ProtocolError,
91        }
92    }
93}
94
95impl ProtocolError {
96    pub(crate) fn violation(reason: DisconnectReasonCode, message: &'static str) -> Self {
97        Self::ProtocolViolation(ProtocolViolationError {
98            inner: ViolationInner::Common { reason, message },
99        })
100    }
101    pub fn generic_violation(message: &'static str) -> Self {
102        Self::violation(DisconnectReasonCode::ProtocolError, message)
103    }
104
105    pub(crate) fn unexpected_packet(packet_type: u8, message: &'static str) -> ProtocolError {
106        Self::ProtocolViolation(ProtocolViolationError {
107            inner: ViolationInner::UnexpectedPacket { packet_type, message },
108        })
109    }
110    pub(crate) fn packet_id_mismatch() -> Self {
111        Self::generic_violation(
112            "Packet id of PUBACK packet does not match expected next value according to sending order of PUBLISH packets [MQTT-4.6.0-2]",
113        )
114    }
115}
116
117impl<E> From<io::Error> for MqttError<E> {
118    fn from(err: io::Error) -> Self {
119        MqttError::Handshake(HandshakeError::Disconnected(Some(err)))
120    }
121}
122
123impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
124    fn from(err: Either<io::Error, io::Error>) -> Self {
125        MqttError::Handshake(HandshakeError::Disconnected(Some(err.into_inner())))
126    }
127}
128
129impl<E> From<EncodeError> for MqttError<E> {
130    fn from(err: EncodeError) -> Self {
131        MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
132    }
133}
134
135impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
136    fn from(err: Either<DecodeError, io::Error>) -> Self {
137        match err {
138            Either::Left(err) => HandshakeError::Protocol(ProtocolError::Decode(err)),
139            Either::Right(err) => HandshakeError::Disconnected(Some(err)),
140        }
141    }
142}
143
144#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
145pub enum DecodeError {
146    #[error("Invalid protocol")]
147    InvalidProtocol,
148    #[error("Invalid length")]
149    InvalidLength,
150    #[error("Malformed packet")]
151    MalformedPacket,
152    #[error("Unsupported protocol level")]
153    UnsupportedProtocolLevel,
154    #[error("Connect frame's reserved flag is set")]
155    ConnectReservedFlagSet,
156    #[error("ConnectAck frame's reserved flag is set")]
157    ConnAckReservedFlagSet,
158    #[error("Invalid client id")]
159    InvalidClientId,
160    #[error("Unsupported packet type")]
161    UnsupportedPacketType,
162    // MQTT v3 only
163    #[error("Packet id is required")]
164    PacketIdRequired,
165    #[error("Max size exceeded")]
166    MaxSizeExceeded,
167    #[error("utf8 error")]
168    Utf8Error,
169    #[error("Unexpected payload")]
170    UnexpectedPayload,
171}
172
173#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, thiserror::Error)]
174pub enum EncodeError {
175    #[error("Packet is bigger than peer's Maximum Packet Size")]
176    OverMaxPacketSize,
177    #[error("Streaming payload is bigger than Publish packet definition")]
178    OverPublishSize,
179    #[error("Streaming payload is incomplete")]
180    PublishIncomplete,
181    #[error("Invalid length")]
182    InvalidLength,
183    #[error("Malformed packet")]
184    MalformedPacket,
185    #[error("Packet id is required")]
186    PacketIdRequired,
187    #[error("Unexpected payload")]
188    UnexpectedPayload,
189    #[error("Publish packet is not completed, expect payload")]
190    ExpectPayload,
191    #[error("Unsupported version")]
192    UnsupportedVersion,
193}
194
195#[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)]
196pub enum SendPacketError {
197    /// Encoder error
198    #[error("Encoding error {:?}", _0)]
199    Encode(#[from] EncodeError),
200    /// Provided packet id is in use
201    #[error("Provided packet id is in use")]
202    PacketIdInUse(NonZeroU16),
203    /// Unexpected release publish
204    #[error("Unexpected publish release")]
205    UnexpectedRelease,
206    /// Streaming has been cancelled
207    #[error("Streaming has been cancelled")]
208    StreamingCancelled,
209    /// Peer disconnected
210    #[error("Peer is disconnected")]
211    Disconnected,
212}
213
214/// Errors which can occur when attempting to handle mqtt client connection.
215#[derive(Debug, thiserror::Error)]
216pub enum ClientError<T: fmt::Debug> {
217    /// Connect negotiation failed
218    #[error("Connect ack failed: {:?}", _0)]
219    Ack(T),
220    /// Protocol error
221    #[error("Protocol error: {:?}", _0)]
222    Protocol(#[from] ProtocolError),
223    /// Handshake timeout
224    #[error("Handshake timeout")]
225    HandshakeTimeout,
226    /// Peer disconnected
227    #[error("Peer disconnected")]
228    Disconnected(Option<std::io::Error>),
229    /// Connect error
230    #[error("Connect error: {}", _0)]
231    Connect(#[from] ntex_net::connect::ConnectError),
232}
233
234impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
235    fn from(err: EncodeError) -> Self {
236        ClientError::Protocol(ProtocolError::Encode(err))
237    }
238}
239
240impl<T: fmt::Debug> From<Either<DecodeError, std::io::Error>> for ClientError<T> {
241    fn from(err: Either<DecodeError, std::io::Error>) -> Self {
242        match err {
243            Either::Left(err) => ClientError::Protocol(ProtocolError::Decode(err)),
244            Either::Right(err) => ClientError::Disconnected(Some(err)),
245        }
246    }
247}