mqtt5-protocol 0.12.0

MQTT v5.0 protocol implementation - packets, encoding, and validation
Documentation
use crate::packet::suback::SubAckReasonCode;
use crate::prelude::String;
use crate::protocol::v5::reason_codes::ReasonCode;

#[cfg(feature = "std")]
use thiserror::Error;

#[cfg(not(feature = "std"))]
use core::fmt;

#[cfg(feature = "std")]
pub type Result<T> = std::result::Result<T, MqttError>;

#[cfg(not(feature = "std"))]
pub type Result<T> = core::result::Result<T, MqttError>;

#[derive(Debug, Clone)]
#[cfg_attr(feature = "std", derive(Error))]
pub enum MqttError {
    #[cfg_attr(feature = "std", error("IO error: {0}"))]
    Io(String),

    #[cfg_attr(feature = "std", error("Invalid topic name: {0}"))]
    InvalidTopicName(String),

    #[cfg_attr(feature = "std", error("Invalid topic filter: {0}"))]
    InvalidTopicFilter(String),

    #[cfg_attr(feature = "std", error("Invalid client ID: {0}"))]
    InvalidClientId(String),

    #[cfg_attr(feature = "std", error("Connection error: {0}"))]
    ConnectionError(String),

    #[cfg_attr(feature = "std", error("Connection refused: {0:?}"))]
    ConnectionRefused(ReasonCode),

    #[cfg_attr(feature = "std", error("Protocol error: {0}"))]
    ProtocolError(String),

    #[cfg_attr(feature = "std", error("Malformed packet: {0}"))]
    MalformedPacket(String),

    #[cfg_attr(
        feature = "std",
        error("Packet too large: size {size} exceeds maximum {max}")
    )]
    PacketTooLarge { size: usize, max: usize },

    #[cfg_attr(feature = "std", error("Authentication failed"))]
    AuthenticationFailed,

    #[cfg_attr(feature = "std", error("Not authorized"))]
    NotAuthorized,

    #[cfg_attr(feature = "std", error("Not connected"))]
    NotConnected,

    #[cfg_attr(feature = "std", error("Already connected"))]
    AlreadyConnected,

    #[cfg_attr(feature = "std", error("Timeout"))]
    Timeout,

    #[cfg_attr(feature = "std", error("Subscription failed: {0:?}"))]
    SubscriptionFailed(ReasonCode),

    #[cfg_attr(feature = "std", error("Subscription denied: {0:?}"))]
    SubscriptionDenied(SubAckReasonCode),

    #[cfg_attr(feature = "std", error("Unsubscription failed: {0:?}"))]
    UnsubscriptionFailed(ReasonCode),

    #[cfg_attr(feature = "std", error("Publish failed: {0:?}"))]
    PublishFailed(ReasonCode),

    #[cfg_attr(feature = "std", error("Packet identifier not found: {0}"))]
    PacketIdNotFound(u16),

    #[cfg_attr(feature = "std", error("Packet identifier already in use: {0}"))]
    PacketIdInUse(u16),

    #[cfg_attr(feature = "std", error("Invalid QoS: {0}"))]
    InvalidQoS(u8),

    #[cfg_attr(feature = "std", error("Invalid packet type: {0}"))]
    InvalidPacketType(u8),

    #[cfg_attr(feature = "std", error("Invalid reason code: {0}"))]
    InvalidReasonCode(u8),

    #[cfg_attr(feature = "std", error("Invalid property ID: {0}"))]
    InvalidPropertyId(u8),

    #[cfg_attr(feature = "std", error("Duplicate property ID: {0}"))]
    DuplicatePropertyId(u8),

    #[cfg_attr(feature = "std", error("Session expired"))]
    SessionExpired,

    #[cfg_attr(feature = "std", error("Keep alive timeout"))]
    KeepAliveTimeout,

    #[cfg_attr(feature = "std", error("Server shutting down"))]
    ServerShuttingDown,

    #[cfg_attr(feature = "std", error("Client closed connection"))]
    ClientClosed,

    #[cfg_attr(feature = "std", error("Connection closed by peer"))]
    ConnectionClosedByPeer,

    #[cfg_attr(feature = "std", error("Maximum connect time exceeded"))]
    MaxConnectTime,

    #[cfg_attr(feature = "std", error("Topic alias invalid: {0}"))]
    TopicAliasInvalid(u16),

    #[cfg_attr(feature = "std", error("Receive maximum exceeded"))]
    ReceiveMaximumExceeded,

    #[cfg_attr(feature = "std", error("Will message rejected"))]
    WillRejected,

    #[cfg_attr(feature = "std", error("Implementation specific error: {0}"))]
    ImplementationSpecific(String),

    #[cfg_attr(feature = "std", error("Unsupported protocol version"))]
    UnsupportedProtocolVersion,

    #[cfg_attr(feature = "std", error("Invalid state: {0}"))]
    InvalidState(String),

    #[cfg_attr(feature = "std", error("Client identifier not valid"))]
    ClientIdentifierNotValid,

    #[cfg_attr(feature = "std", error("Bad username or password"))]
    BadUsernameOrPassword,

    #[cfg_attr(feature = "std", error("Server unavailable"))]
    ServerUnavailable,

    #[cfg_attr(feature = "std", error("Server busy"))]
    ServerBusy,

    #[cfg_attr(feature = "std", error("Banned"))]
    Banned,

    #[cfg_attr(feature = "std", error("Bad authentication method"))]
    BadAuthenticationMethod,

    #[cfg_attr(feature = "std", error("Quota exceeded"))]
    QuotaExceeded,

    #[cfg_attr(feature = "std", error("Payload format invalid"))]
    PayloadFormatInvalid,

    #[cfg_attr(feature = "std", error("Retain not supported"))]
    RetainNotSupported,

    #[cfg_attr(feature = "std", error("QoS not supported"))]
    QoSNotSupported,

    #[cfg_attr(feature = "std", error("Use another server"))]
    UseAnotherServer,

    #[cfg_attr(feature = "std", error("Server moved"))]
    ServerMoved,

    #[cfg_attr(feature = "std", error("Shared subscriptions not supported"))]
    SharedSubscriptionsNotSupported,

    #[cfg_attr(feature = "std", error("Connection rate exceeded"))]
    ConnectionRateExceeded,

    #[cfg_attr(feature = "std", error("Subscription identifiers not supported"))]
    SubscriptionIdentifiersNotSupported,

    #[cfg_attr(feature = "std", error("Wildcard subscriptions not supported"))]
    WildcardSubscriptionsNotSupported,

    #[cfg_attr(feature = "std", error("Message too large for queue"))]
    MessageTooLarge,

    #[cfg_attr(feature = "std", error("Flow control exceeded"))]
    FlowControlExceeded,

    #[cfg_attr(feature = "std", error("Packet ID exhausted"))]
    PacketIdExhausted,

    #[cfg_attr(
        feature = "std",
        error("String too long: {0} bytes exceeds maximum of 65535")
    )]
    StringTooLong(usize),

    #[cfg_attr(feature = "std", error("Configuration error: {0}"))]
    Configuration(String),
}

#[cfg(not(feature = "std"))]
impl fmt::Display for MqttError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(s) => write!(f, "IO error: {s}"),
            Self::InvalidTopicName(s) => write!(f, "Invalid topic name: {s}"),
            Self::InvalidTopicFilter(s) => write!(f, "Invalid topic filter: {s}"),
            Self::InvalidClientId(s) => write!(f, "Invalid client ID: {s}"),
            Self::ConnectionError(s) => write!(f, "Connection error: {s}"),
            Self::ConnectionRefused(r) => write!(f, "Connection refused: {r:?}"),
            Self::ProtocolError(s) => write!(f, "Protocol error: {s}"),
            Self::MalformedPacket(s) => write!(f, "Malformed packet: {s}"),
            Self::PacketTooLarge { size, max } => {
                write!(f, "Packet too large: size {size} exceeds maximum {max}")
            }
            Self::AuthenticationFailed => write!(f, "Authentication failed"),
            Self::NotAuthorized => write!(f, "Not authorized"),
            Self::NotConnected => write!(f, "Not connected"),
            Self::AlreadyConnected => write!(f, "Already connected"),
            Self::Timeout => write!(f, "Timeout"),
            Self::SubscriptionFailed(r) => write!(f, "Subscription failed: {r:?}"),
            Self::SubscriptionDenied(r) => write!(f, "Subscription denied: {r:?}"),
            Self::UnsubscriptionFailed(r) => write!(f, "Unsubscription failed: {r:?}"),
            Self::PublishFailed(r) => write!(f, "Publish failed: {r:?}"),
            Self::PacketIdNotFound(id) => write!(f, "Packet identifier not found: {id}"),
            Self::PacketIdInUse(id) => write!(f, "Packet identifier already in use: {id}"),
            Self::InvalidQoS(q) => write!(f, "Invalid QoS: {q}"),
            Self::InvalidPacketType(t) => write!(f, "Invalid packet type: {t}"),
            Self::InvalidReasonCode(r) => write!(f, "Invalid reason code: {r}"),
            Self::InvalidPropertyId(p) => write!(f, "Invalid property ID: {p}"),
            Self::DuplicatePropertyId(p) => write!(f, "Duplicate property ID: {p}"),
            Self::SessionExpired => write!(f, "Session expired"),
            Self::KeepAliveTimeout => write!(f, "Keep alive timeout"),
            Self::ServerShuttingDown => write!(f, "Server shutting down"),
            Self::ClientClosed => write!(f, "Client closed connection"),
            Self::ConnectionClosedByPeer => write!(f, "Connection closed by peer"),
            Self::MaxConnectTime => write!(f, "Maximum connect time exceeded"),
            Self::TopicAliasInvalid(a) => write!(f, "Topic alias invalid: {a}"),
            Self::ReceiveMaximumExceeded => write!(f, "Receive maximum exceeded"),
            Self::WillRejected => write!(f, "Will message rejected"),
            Self::ImplementationSpecific(s) => write!(f, "Implementation specific error: {s}"),
            Self::UnsupportedProtocolVersion => write!(f, "Unsupported protocol version"),
            Self::InvalidState(s) => write!(f, "Invalid state: {s}"),
            Self::ClientIdentifierNotValid => write!(f, "Client identifier not valid"),
            Self::BadUsernameOrPassword => write!(f, "Bad username or password"),
            Self::ServerUnavailable => write!(f, "Server unavailable"),
            Self::ServerBusy => write!(f, "Server busy"),
            Self::Banned => write!(f, "Banned"),
            Self::BadAuthenticationMethod => write!(f, "Bad authentication method"),
            Self::QuotaExceeded => write!(f, "Quota exceeded"),
            Self::PayloadFormatInvalid => write!(f, "Payload format invalid"),
            Self::RetainNotSupported => write!(f, "Retain not supported"),
            Self::QoSNotSupported => write!(f, "QoS not supported"),
            Self::UseAnotherServer => write!(f, "Use another server"),
            Self::ServerMoved => write!(f, "Server moved"),
            Self::SharedSubscriptionsNotSupported => {
                write!(f, "Shared subscriptions not supported")
            }
            Self::ConnectionRateExceeded => write!(f, "Connection rate exceeded"),
            Self::SubscriptionIdentifiersNotSupported => {
                write!(f, "Subscription identifiers not supported")
            }
            Self::WildcardSubscriptionsNotSupported => {
                write!(f, "Wildcard subscriptions not supported")
            }
            Self::MessageTooLarge => write!(f, "Message too large for queue"),
            Self::FlowControlExceeded => write!(f, "Flow control exceeded"),
            Self::PacketIdExhausted => write!(f, "Packet ID exhausted"),
            Self::StringTooLong(len) => {
                write!(f, "String too long: {len} bytes exceeds maximum of 65535")
            }
            Self::Configuration(s) => write!(f, "Configuration error: {s}"),
        }
    }
}

impl MqttError {
    #[must_use]
    pub fn is_normal_disconnect(&self) -> bool {
        match self {
            Self::ClientClosed
            | Self::ConnectionClosedByPeer
            | Self::UseAnotherServer
            | Self::ServerMoved => true,
            Self::Io(msg)
                if msg.contains("stream has been shut down")
                    || msg.contains("Connection reset") =>
            {
                true
            }
            _ => false,
        }
    }
}

#[cfg(feature = "std")]
impl From<std::io::Error> for MqttError {
    fn from(err: std::io::Error) -> Self {
        MqttError::Io(err.to_string())
    }
}

impl From<String> for MqttError {
    fn from(msg: String) -> Self {
        MqttError::MalformedPacket(msg)
    }
}

impl From<&str> for MqttError {
    fn from(msg: &str) -> Self {
        MqttError::MalformedPacket(crate::prelude::ToString::to_string(msg))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_error_display() {
        let err = MqttError::InvalidTopicName("test/+/topic".to_string());
        assert_eq!(err.to_string(), "Invalid topic name: test/+/topic");

        let err = MqttError::PacketTooLarge {
            size: 1000,
            max: 500,
        };
        assert_eq!(
            err.to_string(),
            "Packet too large: size 1000 exceeds maximum 500"
        );

        let err = MqttError::ConnectionRefused(ReasonCode::BadUsernameOrPassword);
        assert_eq!(err.to_string(), "Connection refused: BadUsernameOrPassword");
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_error_from_io() {
        use std::io;
        let io_err = io::Error::new(io::ErrorKind::ConnectionRefused, "test");
        let mqtt_err: MqttError = io_err.into();
        match mqtt_err {
            MqttError::Io(e) => assert!(e.contains("test")),
            _ => panic!("Expected Io error"),
        }
    }

    #[test]
    fn test_result_type() {
        #[allow(clippy::unnecessary_wraps)]
        fn returns_result() -> Result<String> {
            Ok("success".to_string())
        }

        fn returns_error() -> Result<String> {
            Err(MqttError::NotConnected)
        }

        assert!(returns_result().is_ok());
        assert!(returns_error().is_err());
    }
}