#[cfg(feature = "tls")]
use crate::tls::TlsConfig;
use crate::{
protocol::packet::FixedHeaderError,
tasks::{task_client::ClientRx, TaskHub},
Client
};
use anyhow::{bail, Result};
use packet::connect::will::LastWill;
use std::sync::Arc;
pub mod packet;
/// Connection settings for an MQTT client.
///
/// Create a value with [`MqttOptions::new`], adjust it through the setter
/// methods, then consume it with `connect_to_v4` or `connect_to_v5`.
#[derive(Debug, Clone)]
pub struct MqttOptions {
/// Broker host exactly as it was passed to [`MqttOptions::new`].
broker_addr: String,
/// Broker port.
port: u16,
/// Keep-alive value. `new` sets 60; `set_keep_alive` rejects values below 5
/// (its panic message states the unit as seconds).
keep_alive: u16,
/// Clean-session flag. `true` by default and cleared by `auto_reconnect`.
clean_session: bool,
/// Client identifier. `new` rejects empty ids and ids starting with a space.
client_id: Arc<String>,
/// Optional `(username, password)` pair set through `set_credentials`.
credentials: Option<(Arc<String>, Arc<String>)>,
/// Size limit for incoming packets; `new` sets `10 * 1024`.
max_incoming_packet_size: usize,
/// Size limit for outgoing packets; `new` sets `10 * 1024`.
max_outgoing_packet_size: usize,
/// Optional last-will message set through `set_last_will`.
last_will: Option<LastWill>,
/// Whether automatic reconnection was requested via `auto_reconnect`.
pub(crate) auto_reconnect: bool,
/// Transport selection; `new` uses the `Default` of [`NetworkProtocol`].
pub(crate) network_protocol: NetworkProtocol
}
impl MqttOptions {
pub fn new<S: Into<Arc<String>>, T: Into<String>>(
id: S,
host: T,
port: u16
) -> Result<MqttOptions> {
let id = id.into();
if id.starts_with(' ') || id.is_empty() {
bail!("Invalid client id");
}
Ok(MqttOptions {
broker_addr: host.into(),
port,
keep_alive: 60,
clean_session: true,
client_id: id,
credentials: None,
max_incoming_packet_size: 10 * 1024,
max_outgoing_packet_size: 10 * 1024,
last_will: None,
auto_reconnect: false,
network_protocol: Default::default()
})
}
pub fn auto_reconnect(mut self) -> Self {
self.auto_reconnect = true;
self.set_clean_session(false);
self
}
#[cfg(feature = "tls")]
pub fn set_tls(mut self, config: TlsConfig) -> Self {
self.network_protocol = NetworkProtocol::Tls(config);
self
}
pub fn broker_address(&self) -> (String, u16) {
(self.broker_addr.clone(), self.port)
}
pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
self.last_will = Some(will);
self
}
pub fn last_will(&self) -> Option<LastWill> {
self.last_will.clone()
}
pub fn set_keep_alive(mut self, duration: u16) -> Self {
assert!(duration >= 5, "Keep alives should be >= 5 secs");
self.keep_alive = duration;
self
}
pub fn keep_alive(&self) -> u16 {
self.keep_alive
}
pub fn client_id(&self) -> Arc<String> {
self.client_id.clone()
}
pub fn set_max_packet_size(
&mut self,
incoming: usize,
outgoing: usize
) -> &mut Self {
self.max_incoming_packet_size = incoming;
self.max_outgoing_packet_size = outgoing;
self
}
pub fn max_packet_size(&self) -> usize {
self.max_incoming_packet_size
}
pub fn set_clean_session(
&mut self,
clean_session: bool
) -> &mut Self {
self.clean_session = clean_session;
self
}
pub fn clean_session(&self) -> bool {
self.clean_session
}
pub fn set_credentials<
U: Into<Arc<String>>,
P1: Into<Arc<String>>
>(
&mut self,
username: U,
password: P1
) -> &mut Self {
self.credentials = Some((username.into(), password.into()));
self
}
pub fn credentials(&self) -> Option<(Arc<String>, Arc<String>)> {
self.credentials.clone()
}
pub async fn connect_to_v4(self) -> Result<(Client, ClientRx)> {
Ok(TaskHub::connect(self, Protocol::V4).await?)
}
pub async fn connect_to_v5(self) -> Result<(Client, ClientRx)> {
Ok(TaskHub::connect(self, Protocol::V5).await?)
}
}
/// Protocol version selector handed to `TaskHub::connect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
/// Used by `MqttOptions::connect_to_v4`.
V4,
/// Used by `MqttOptions::connect_to_v5`.
V5
}
impl Protocol {
    /// Returns `true` only for [`Protocol::V5`].
    pub fn is_v5(&self) -> bool {
        matches!(self, Protocol::V5)
    }
}
/// Fixed header of an MQTT packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct FixedHeader {
/// First header byte; its upper four bits select the packet type
/// (see `FixedHeader::packet_type`).
pub(crate) byte1: u8,
/// Length of the fixed header itself: `FixedHeader::new` stores the size of
/// the remaining-length field plus one.
pub(crate) fixed_header_len: usize,
/// Remaining length; `frame_length` adds it to `fixed_header_len`.
pub(crate) remaining_len: usize
}
impl FixedHeader {
    /// Builds a header from its first byte, the size of the remaining-length
    /// field (`remaining_len_len`) and the remaining length itself.
    ///
    /// The stored fixed-header length is `remaining_len_len + 1`; the extra
    /// one covers `byte1`.
    pub fn new(
        byte1: u8,
        remaining_len_len: usize,
        remaining_len: usize
    ) -> FixedHeader {
        let fixed_header_len = 1 + remaining_len_len;
        Self {
            byte1,
            fixed_header_len,
            remaining_len
        }
    }

    /// Decodes the packet type from the upper four bits of the first byte.
    ///
    /// # Errors
    ///
    /// Returns [`PacketParseError::InvalidPacketType`] carrying the 4-bit
    /// value when it is outside `1..=14` (that is, 0 or 15).
    pub fn packet_type(&self) -> Result<PacketType, PacketParseError> {
        let type_bits = self.byte1 >> 4;
        let packet_type = match type_bits {
            1 => PacketType::Connect,
            2 => PacketType::ConnAck,
            3 => PacketType::Publish,
            4 => PacketType::PubAck,
            5 => PacketType::PubRec,
            6 => PacketType::PubRel,
            7 => PacketType::PubComp,
            8 => PacketType::Subscribe,
            9 => PacketType::SubAck,
            10 => PacketType::Unsubscribe,
            11 => PacketType::UnsubAck,
            12 => PacketType::PingReq,
            13 => PacketType::PingResp,
            14 => PacketType::Disconnect,
            _ => return Err(PacketParseError::InvalidPacketType(type_bits))
        };
        Ok(packet_type)
    }

    /// Returns the total frame size: fixed header plus remaining length.
    pub fn frame_length(&self) -> usize {
        let header = self.fixed_header_len;
        let body = self.remaining_len;
        header + body
    }

    /// Returns the remaining length recorded in the header.
    pub fn remaining_len(&self) -> usize {
        self.remaining_len
    }
}
/// Packet types, numbered with the value found in the upper four bits of the
/// first fixed-header byte (see `FixedHeader::packet_type`).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketType {
    Connect = 1,
    ConnAck = 2,
    Publish = 3,
    PubAck = 4,
    PubRec = 5,
    PubRel = 6,
    PubComp = 7,
    Subscribe = 8,
    SubAck = 9,
    Unsubscribe = 10,
    UnsubAck = 11,
    PingReq = 12,
    PingResp = 13,
    Disconnect = 14
}
impl From<FixedHeaderError> for PacketParseError {
fn from(value: FixedHeaderError) -> Self {
match value {
FixedHeaderError::InsufficientBytes(len) => {
Self::InsufficientBytes(len)
},
FixedHeaderError::MalformedRemainingLength => {
Self::MalformedRemainingLength
},
}
}
}
/// Identifiers of MQTT properties.
///
/// Each variant's discriminant is the numeric identifier that the `property`
/// function in this file maps to it.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PropertyType {
PayloadFormatIndicator = 1,
MessageExpiryInterval = 2,
ContentType = 3,
ResponseTopic = 8,
CorrelationData = 9,
SubscriptionIdentifier = 11,
SessionExpiryInterval = 17,
AssignedClientIdentifier = 18,
ServerKeepAlive = 19,
AuthenticationMethod = 21,
AuthenticationData = 22,
RequestProblemInformation = 23,
WillDelayInterval = 24,
RequestResponseInformation = 25,
ResponseInformation = 26,
ServerReference = 28,
ReasonString = 31,
ReceiveMaximum = 33,
TopicAliasMaximum = 34,
TopicAlias = 35,
MaximumQos = 36,
RetainAvailable = 37,
UserProperty = 38,
MaximumPacketSize = 39,
WildcardSubscriptionAvailable = 40,
SubscriptionIdentifierAvailable = 41,
SharedSubscriptionAvailable = 42
}
/// Errors reported while parsing packets.
///
/// The `#[error]` attribute on each variant is its display message; variants
/// with a payload interpolate it as `{0}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum PacketParseError {
#[error("Invalid Connect return code: {0}")]
InvalidConnectReturnCode(u8),
#[error("Invalid protocol")]
InvalidProtocol,
#[error("Invalid protocol level: {0}")]
InvalidProtocolLevel(u8),
#[error("Incorrect packet format")]
IncorrectPacketFormat,
/// Produced by `FixedHeader::packet_type`; carries the unrecognised 4-bit
/// type value.
#[error("Invalid packet type: {0}")]
InvalidPacketType(u8),
/// Produced by `property`; carries the unrecognised identifier.
#[error("Invalid property type: {0}")]
InvalidPropertyType(u8),
#[error("Invalid QoS level: {0}")]
InvalidQoS(u8),
#[error("Invalid subscribe reason code: {0}")]
InvalidSubscribeReasonCode(u8),
#[error("Packet id Zero")]
PacketIdZero,
#[error("Payload size is incorrect")]
PayloadSizeIncorrect,
#[error("payload is too long")]
PayloadTooLong,
#[error("payload size limit exceeded: {0}")]
PayloadSizeLimitExceeded(usize),
#[error("Payload required")]
PayloadRequired,
#[error("Topic is not UTF-8")]
TopicNotUtf8,
#[error("Promised boundary crossed: {0}")]
BoundaryCrossed(usize),
#[error("Malformed packet")]
MalformedPacket,
#[error("A Subscribe packet must contain atleast one filter")]
EmptySubscription,
/// Also produced when converting `FixedHeaderError::InsufficientBytes`;
/// the payload is the number of additional bytes needed.
#[error("At least {0} more bytes required to frame packet")]
InsufficientBytes(usize),
/// Also produced when converting `FixedHeaderError::MalformedRemainingLength`.
#[error("Malformed remaining length")]
MalformedRemainingLength
}
/// Maps a numeric property identifier to its [`PropertyType`].
///
/// # Errors
///
/// Returns [`PacketParseError::InvalidPropertyType`] carrying `num` when the
/// identifier does not correspond to any variant.
fn property(num: u8) -> Result<PropertyType, PacketParseError> {
    match num {
        1 => Ok(PropertyType::PayloadFormatIndicator),
        2 => Ok(PropertyType::MessageExpiryInterval),
        3 => Ok(PropertyType::ContentType),
        8 => Ok(PropertyType::ResponseTopic),
        9 => Ok(PropertyType::CorrelationData),
        11 => Ok(PropertyType::SubscriptionIdentifier),
        17 => Ok(PropertyType::SessionExpiryInterval),
        18 => Ok(PropertyType::AssignedClientIdentifier),
        19 => Ok(PropertyType::ServerKeepAlive),
        21 => Ok(PropertyType::AuthenticationMethod),
        22 => Ok(PropertyType::AuthenticationData),
        23 => Ok(PropertyType::RequestProblemInformation),
        24 => Ok(PropertyType::WillDelayInterval),
        25 => Ok(PropertyType::RequestResponseInformation),
        26 => Ok(PropertyType::ResponseInformation),
        28 => Ok(PropertyType::ServerReference),
        31 => Ok(PropertyType::ReasonString),
        33 => Ok(PropertyType::ReceiveMaximum),
        34 => Ok(PropertyType::TopicAliasMaximum),
        35 => Ok(PropertyType::TopicAlias),
        36 => Ok(PropertyType::MaximumQos),
        37 => Ok(PropertyType::RetainAvailable),
        38 => Ok(PropertyType::UserProperty),
        39 => Ok(PropertyType::MaximumPacketSize),
        40 => Ok(PropertyType::WildcardSubscriptionAvailable),
        41 => Ok(PropertyType::SubscriptionIdentifierAvailable),
        42 => Ok(PropertyType::SharedSubscriptionAvailable),
        unknown => Err(PacketParseError::InvalidPropertyType(unknown))
    }
}
/// Returns how many bytes (1 to 4) the thresholds below assign to `len`.
///
/// The boundaries are 128, 16 384 and 2 097 152; every value at or above the
/// last boundary maps to 4.
fn len_len(len: usize) -> usize {
    match len {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4
    }
}
/// Transport used for the broker connection.
#[derive(Debug, Clone)]
pub enum NetworkProtocol {
/// TCP transport; this is the `Default`.
Tcp,
/// TLS transport with the given configuration; only compiled with the
/// `tls` feature.
#[cfg(feature = "tls")]
Tls(TlsConfig) }
impl Default for NetworkProtocol {
fn default() -> Self {
Self::Tcp
}
}