#![allow(dead_code, unused)]
pub mod v4;
pub mod v5;
#[cfg(feature = "websockets")]
pub mod ws;
use std::{io, str::Utf8Error, string::FromUtf8Error};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::Notification;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Packet {
Connect(
Connect,
Option<ConnectProperties>,
Option<LastWill>,
Option<LastWillProperties>,
Option<Login>,
),
ConnAck(ConnAck, Option<ConnAckProperties>),
Publish(Publish, Option<PublishProperties>),
PubAck(PubAck, Option<PubAckProperties>),
PingReq(PingReq),
PingResp(PingResp),
Subscribe(Subscribe, Option<SubscribeProperties>),
SubAck(SubAck, Option<SubAckProperties>),
PubRec(PubRec, Option<PubRecProperties>),
PubRel(PubRel, Option<PubRelProperties>),
PubComp(PubComp, Option<PubCompProperties>),
Unsubscribe(Unsubscribe, Option<UnsubscribeProperties>),
UnsubAck(UnsubAck, Option<UnsubAckProperties>),
Disconnect(Disconnect, Option<DisconnectProperties>),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PingReq;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PingResp;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connect {
pub keep_alive: u16,
pub client_id: String,
pub clean_session: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectProperties {
pub session_expiry_interval: Option<u32>,
pub receive_maximum: Option<u16>,
pub max_packet_size: Option<u32>,
pub topic_alias_max: Option<u16>,
pub request_response_info: Option<u8>,
pub request_problem_info: Option<u8>,
pub user_properties: Vec<(String, String)>,
pub authentication_method: Option<String>,
pub authentication_data: Option<Bytes>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LastWill {
pub topic: Bytes,
pub message: Bytes,
pub qos: QoS,
pub retain: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LastWillProperties {
pub delay_interval: Option<u32>,
pub payload_format_indicator: Option<u8>,
pub message_expiry_interval: Option<u32>,
pub content_type: Option<String>,
pub response_topic: Option<String>,
pub correlation_data: Option<Bytes>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Login {
pub username: String,
pub password: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectReturnCode {
Success,
RefusedProtocolVersion,
BadClientId,
ServiceUnavailable,
UnspecifiedError,
MalformedPacket,
ProtocolError,
ImplementationSpecificError,
UnsupportedProtocolVersion,
ClientIdentifierNotValid,
BadUserNamePassword,
NotAuthorized,
ServerUnavailable,
ServerBusy,
Banned,
BadAuthenticationMethod,
TopicNameInvalid,
PacketTooLarge,
QuotaExceeded,
PayloadFormatInvalid,
RetainNotSupported,
QoSNotSupported,
UseAnotherServer,
ServerMoved,
ConnectionRateExceeded,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnAck {
pub session_present: bool,
pub code: ConnectReturnCode,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnAckProperties {
pub session_expiry_interval: Option<u32>,
pub receive_max: Option<u16>,
pub max_qos: Option<u8>,
pub retain_available: Option<u8>,
pub max_packet_size: Option<u32>,
pub assigned_client_identifier: Option<String>,
pub topic_alias_max: Option<u16>,
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
pub wildcard_subscription_available: Option<u8>,
pub subscription_identifiers_available: Option<u8>,
pub shared_subscription_available: Option<u8>,
pub server_keep_alive: Option<u16>,
pub response_information: Option<String>,
pub server_reference: Option<String>,
pub authentication_method: Option<String>,
pub authentication_data: Option<Bytes>,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Publish {
pub(crate) dup: bool,
pub(crate) qos: QoS,
pub(crate) pkid: u16,
pub retain: bool,
pub topic: Bytes,
pub payload: Bytes,
}
impl Publish {
pub fn new<T: Into<Bytes>>(topic: T, payload: T, retain: bool) -> Publish {
Publish {
dup: false,
qos: QoS::AtMostOnce,
pkid: 0,
retain,
topic: topic.into(),
payload: payload.into(),
}
}
pub fn is_empty(&self) -> bool {
false
}
pub fn len(&self) -> usize {
let len = 2 + self.topic.len() + self.payload.len();
match self.qos == QoS::AtMostOnce {
true => len,
false => len + 2,
}
}
pub fn serialize(&self) -> Bytes {
let mut o = BytesMut::with_capacity(self.len() + 5);
let dup = self.dup as u8;
let qos = self.qos as u8;
let retain = self.retain as u8;
o.put_u8(0b0011_0000 | retain | qos << 1 | dup << 3);
o.put_u16(self.pkid);
o.put_u16(self.topic.len() as u16);
o.extend_from_slice(&self.topic[..]);
o.extend_from_slice(&self.payload[..]);
o.freeze()
}
pub fn deserialize(mut o: Bytes) -> Publish {
let header = o.get_u8();
let qos_num = (header & 0b0110) >> 1;
let qos = qos(qos_num).unwrap_or(QoS::AtMostOnce);
let dup = (header & 0b1000) != 0;
let retain = (header & 0b0001) != 0;
let pkid = o.get_u16();
let topic_len = o.get_u16();
let topic = o.split_to(topic_len as usize);
let payload = o;
Publish {
dup,
qos,
retain,
topic,
pkid,
payload,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishProperties {
pub payload_format_indicator: Option<u8>,
pub message_expiry_interval: Option<u32>,
pub topic_alias: Option<u16>,
pub response_topic: Option<String>,
pub correlation_data: Option<Bytes>,
pub user_properties: Vec<(String, String)>,
pub subscription_identifiers: Vec<usize>,
pub content_type: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PubAckReason {
Success,
NoMatchingSubscribers,
UnspecifiedError,
ImplementationSpecificError,
NotAuthorized,
TopicNameInvalid,
PacketIdentifierInUse,
QuotaExceeded,
PayloadFormatInvalid,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubAck {
pub pkid: u16,
pub reason: PubAckReason,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubAckProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Subscribe {
pub pkid: u16,
pub filters: Vec<Filter>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Filter {
pub path: String,
pub qos: QoS,
pub nolocal: bool,
pub preserve_retain: bool,
pub retain_forward_rule: RetainForwardRule,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetainForwardRule {
OnEverySubscribe,
OnNewSubscribe,
Never,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeProperties {
pub id: Option<usize>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAck {
pub pkid: u16,
pub return_codes: Vec<SubscribeReasonCode>,
}
impl SubAck {
pub fn is_empty(&self) -> bool {
false
}
pub fn len(&self) -> usize {
2 + self.return_codes.len()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReasonCode {
QoS0,
QoS1,
QoS2,
Success(QoS),
Failure,
Unspecified,
ImplementationSpecific,
NotAuthorized,
TopicFilterInvalid,
PkidInUse,
QuotaExceeded,
SharedSubscriptionsNotSupported,
SubscriptionIdNotSupported,
WildcardSubscriptionsNotSupported,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAckProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
pub pkid: u16,
pub filters: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeProperties {
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubAck {
pub pkid: u16,
pub reasons: Vec<UnsubAckReason>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum UnsubAckReason {
Success,
NoSubscriptionExisted,
UnspecifiedError,
ImplementationSpecificError,
NotAuthorized,
TopicFilterInvalid,
PacketIdentifierInUse,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubAckProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
struct Ping;
struct PingResponse;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubRecReason {
Success,
NoMatchingSubscribers,
UnspecifiedError,
ImplementationSpecificError,
NotAuthorized,
TopicNameInvalid,
PacketIdentifierInUse,
QuotaExceeded,
PayloadFormatInvalid,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRec {
pub pkid: u16,
pub reason: PubRecReason,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRecProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubCompReason {
Success,
PacketIdentifierNotFound,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubComp {
pub pkid: u16,
pub reason: PubCompReason,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubCompProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubRelReason {
Success,
PacketIdentifierNotFound,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRel {
pub pkid: u16,
pub reason: PubRelReason,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRelProperties {
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Disconnect {
pub reason_code: DisconnectReasonCode,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DisconnectReasonCode {
NormalDisconnection,
DisconnectWithWillMessage,
UnspecifiedError,
MalformedPacket,
ProtocolError,
ImplementationSpecificError,
NotAuthorized,
ServerBusy,
ServerShuttingDown,
KeepAliveTimeout,
SessionTakenOver,
TopicFilterInvalid,
TopicNameInvalid,
ReceiveMaximumExceeded,
TopicAliasInvalid,
PacketTooLarge,
MessageRateTooHigh,
QuotaExceeded,
AdministrativeAction,
PayloadFormatInvalid,
RetainNotSupported,
QoSNotSupported,
UseAnotherServer,
ServerMoved,
SharedSubscriptionNotSupported,
ConnectionRateExceeded,
MaximumConnectTime,
SubscriptionIdentifiersNotSupported,
WildcardSubscriptionsNotSupported,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisconnectProperties {
pub session_expiry_interval: Option<u32>,
pub reason_string: Option<String>,
pub user_properties: Vec<(String, String)>,
pub server_reference: Option<String>,
}
#[repr(u8)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd)]
#[allow(clippy::enum_variant_names)]
pub enum QoS {
#[default]
AtMostOnce = 0,
AtLeastOnce = 1,
ExactlyOnce = 2,
}
pub fn qos(num: u8) -> Option<QoS> {
match num {
0 => Some(QoS::AtMostOnce),
1 => Some(QoS::AtLeastOnce),
2 => Some(QoS::ExactlyOnce),
qos => None,
}
}
pub fn has_wildcards(s: &str) -> bool {
s.contains('+') || s.contains('#')
}
pub fn valid_topic(topic: &str) -> bool {
if topic.contains('+') {
return false;
}
if topic.contains('#') {
return false;
}
true
}
pub fn valid_filter(filter: &str) -> bool {
if filter.is_empty() {
return false;
}
let hirerarchy = filter.split('/').collect::<Vec<&str>>();
if let Some((last, remaining)) = hirerarchy.split_last() {
for entry in remaining.iter() {
if entry.contains('#') {
return false;
}
if entry.len() > 1 && entry.contains('+') {
return false;
}
}
if last.len() != 1 && (last.contains('#') || last.contains('+')) {
return false;
}
}
true
}
pub fn matches(topic: &str, filter: &str) -> bool {
if !topic.is_empty() && topic[..1].contains('$') {
return false;
}
let mut topics = topic.split('/');
let mut filters = filter.split('/');
for f in filters.by_ref() {
if f == "#" {
return true;
}
let top = topics.next();
match top {
Some(t) if t == "#" => return false,
Some(_) if f == "+" => continue,
Some(t) if f != t => return false,
Some(_) => continue,
None => return false,
}
}
if topics.next().is_some() {
return false;
}
true
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum Error {
#[error("Invalid return code received as response for connect = {0}")]
InvalidConnectReturnCode(u8),
#[error("Invalid reason = {0}")]
InvalidReason(u8),
#[error("Invalid reason = {0}")]
InvalidRemainingLength(usize),
#[error("Invalid protocol used")]
InvalidProtocol,
#[error("Invalid protocol level {0}. Make sure right port is being used.")]
InvalidProtocolLevel(u8),
#[error("Invalid packet format")]
IncorrectPacketFormat,
#[error("Invalid packet type = {0}")]
InvalidPacketType(u8),
#[error("Invalid retain forward rule = {0}")]
InvalidRetainForwardRule(u8),
#[error("Invalid QoS level = {0}")]
InvalidQoS(u8),
#[error("Invalid subscribe reason code = {0}")]
InvalidSubscribeReasonCode(u8),
#[error("Packet received has id Zero")]
PacketIdZero,
#[error("Empty Subscription")]
EmptySubscription,
#[error("Subscription had id Zero")]
SubscriptionIdZero,
#[error("Payload size is incorrect")]
PayloadSizeIncorrect,
#[error("Payload is too long")]
PayloadTooLong,
#[error("Payload size has been exceeded by {0} bytes")]
PayloadSizeLimitExceeded(usize),
#[error("Payload is required")]
PayloadRequired,
#[error("Payload is required = {0}")]
PayloadNotUtf8(#[from] Utf8Error),
#[error("Topic not utf-8")]
TopicNotUtf8,
#[error("Promised boundary crossed, contains {0} bytes")]
BoundaryCrossed(usize),
#[error("Packet is malformed")]
MalformedPacket,
#[error("Remaining length is malformed")]
MalformedRemainingLength,
#[error("Invalid property type = {0}")]
InvalidPropertyType(u8),
#[error("Insufficient number of bytes to frame packet, {0} more bytes required")]
InsufficientBytes(usize),
}
pub trait Protocol: Clone + Send + 'static {
fn read_mut(&mut self, stream: &mut BytesMut, max_size: usize) -> Result<Packet, Error>;
fn write(&self, packet: Packet, write: &mut BytesMut) -> Result<usize, Error>;
}