#![cfg_attr(docsrs, feature(doc_cfg))]
#[macro_use]
extern crate log;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
mod client;
mod eventloop;
mod framed;
mod state;
mod tls;
mod cond_fut;
pub use async_channel::{SendError, Sender, TrySendError};
pub use client::{AsyncClient, Client, ClientError, Connection};
pub use eventloop::{ConnectionError, Event, EventLoop};
pub use mqttbytes::v4::*;
pub use mqttbytes::*;
pub use state::{MqttState, StateError};
pub use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
pub use rustls::ClientConfig;
pub type Incoming = Packet;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum Outgoing {
Publish(u16),
Subscribe(u16),
Unsubscribe(u16),
PubAck(u16),
PubRec(u16),
PubRel(u16),
PubComp(u16),
PingReq,
PingResp,
Disconnect,
}
#[derive(Clone, Debug, PartialEq)]
pub enum Request {
Publish(Publish),
PubAck(PubAck),
PubRec(PubRec),
PubComp(PubComp),
PubRel(PubRel),
PingReq,
PingResp,
Subscribe(Subscribe),
SubAck(SubAck),
Unsubscribe(Unsubscribe),
UnsubAck(UnsubAck),
Disconnect,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Key {
RSA(Vec<u8>),
ECC(Vec<u8>),
}
impl From<Publish> for Request {
fn from(publish: Publish) -> Request {
Request::Publish(publish)
}
}
impl From<Subscribe> for Request {
fn from(subscribe: Subscribe) -> Request {
Request::Subscribe(subscribe)
}
}
impl From<Unsubscribe> for Request {
fn from(unsubscribe: Unsubscribe) -> Request {
Request::Unsubscribe(unsubscribe)
}
}
#[derive(Clone)]
pub enum Transport {
Tcp,
Tls(TlsConfiguration),
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
Ws,
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
Wss(TlsConfiguration),
}
impl Default for Transport {
fn default() -> Self {
Self::tcp()
}
}
impl Transport {
pub fn tcp() -> Self {
Self::Tcp
}
pub fn tls(
ca: Vec<u8>,
client_auth: Option<(Vec<u8>, Key)>,
alpn: Option<Vec<Vec<u8>>>,
) -> Self {
let config = TlsConfiguration::Simple {
ca,
alpn,
client_auth,
};
Self::tls_with_config(config)
}
pub fn tls_with_config(tls_config: TlsConfiguration) -> Self {
Self::Tls(tls_config)
}
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
pub fn ws() -> Self {
Self::Ws
}
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
pub fn wss(
ca: Vec<u8>,
client_auth: Option<(Vec<u8>, Key)>,
alpn: Option<Vec<Vec<u8>>>,
) -> Self {
let config = TlsConfiguration::Simple {
ca,
client_auth,
alpn,
};
Self::wss_with_config(config)
}
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
pub fn wss_with_config(tls_config: TlsConfiguration) -> Self {
Self::Wss(tls_config)
}
}
#[derive(Clone)]
pub enum TlsConfiguration {
Simple {
ca: Vec<u8>,
alpn: Option<Vec<Vec<u8>>>,
client_auth: Option<(Vec<u8>, Key)>,
},
Rustls(Arc<ClientConfig>),
}
impl From<ClientConfig> for TlsConfiguration {
fn from(config: ClientConfig) -> Self {
TlsConfiguration::Rustls(Arc::new(config))
}
}
#[derive(Clone)]
pub struct MqttOptions {
broker_addr: String,
port: u16,
transport: Transport,
keep_alive: Duration,
clean_session: bool,
client_id: String,
credentials: Option<(String, String)>,
max_incoming_packet_size: usize,
max_outgoing_packet_size: usize,
request_channel_capacity: usize,
max_request_batch: usize,
pending_throttle: Duration,
inflight: u16,
last_will: Option<LastWill>,
collision_safety: bool,
conn_timeout: u64,
}
impl MqttOptions {
pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
let id = id.into();
if id.starts_with(' ') || id.is_empty() {
panic!("Invalid client id")
}
MqttOptions {
broker_addr: host.into(),
port,
transport: Transport::tcp(),
keep_alive: Duration::from_secs(60),
clean_session: true,
client_id: id,
credentials: None,
max_incoming_packet_size: 10 * 1024,
max_outgoing_packet_size: 10 * 1024,
request_channel_capacity: 10,
max_request_batch: 0,
pending_throttle: Duration::from_micros(0),
inflight: 100,
last_will: None,
collision_safety: false,
conn_timeout: 5,
}
}
pub fn broker_address(&self) -> (String, u16) {
(self.broker_addr.clone(), self.port)
}
pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
self.last_will = Some(will);
self
}
pub fn last_will(&self) -> Option<LastWill> {
self.last_will.clone()
}
pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
self.transport = transport;
self
}
pub fn transport(&self) -> Transport {
self.transport.clone()
}
pub fn set_keep_alive(&mut self, secs: u16) -> &mut Self {
if secs < 5 {
panic!("Keep alives should be >= 5 secs");
}
self.keep_alive = Duration::from_secs(u64::from(secs));
self
}
pub fn keep_alive(&self) -> Duration {
self.keep_alive
}
pub fn client_id(&self) -> String {
self.client_id.clone()
}
pub fn set_max_packet_size(&mut self, incoming: usize, outgoing: usize) -> &mut Self {
self.max_incoming_packet_size = incoming;
self.max_outgoing_packet_size = outgoing;
self
}
pub fn max_packet_size(&self) -> usize {
self.max_incoming_packet_size
}
pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
self.clean_session = clean_session;
self
}
pub fn clean_session(&self) -> bool {
self.clean_session
}
pub fn set_credentials<S: Into<String>>(&mut self, username: S, password: S) -> &mut Self {
self.credentials = Some((username.into(), password.into()));
self
}
pub fn credentials(&self) -> Option<(String, String)> {
self.credentials.clone()
}
pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
self.request_channel_capacity = capacity;
self
}
pub fn request_channel_capacity(&self) -> usize {
self.request_channel_capacity
}
pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
self.pending_throttle = duration;
self
}
pub fn pending_throttle(&self) -> Duration {
self.pending_throttle
}
pub fn set_inflight(&mut self, inflight: u16) -> &mut Self {
if inflight == 0 {
panic!("zero in flight is not allowed")
}
self.inflight = inflight;
self
}
pub fn inflight(&self) -> u16 {
self.inflight
}
pub fn set_collision_safety(&mut self, c: bool) -> &mut Self {
self.collision_safety = c;
self
}
pub fn collision_safety(&self) -> bool {
self.collision_safety
}
pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
self.conn_timeout = timeout;
self
}
pub fn connection_timeout(&self) -> u64 {
self.conn_timeout
}
}
impl Debug for MqttOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("MqttOptions")
.field("broker_addr", &self.broker_addr)
.field("port", &self.port)
.field("keep_alive", &self.keep_alive)
.field("clean_session", &self.clean_session)
.field("client_id", &self.client_id)
.field("credentials", &self.credentials)
.field("max_packet_size", &self.max_incoming_packet_size)
.field("request_channel_capacity", &self.request_channel_capacity)
.field("max_request_batch", &self.max_request_batch)
.field("pending_throttle", &self.pending_throttle)
.field("inflight", &self.inflight)
.field("last_will", &self.last_will)
.field("conn_timeout", &self.conn_timeout)
.finish()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
#[should_panic]
fn client_id_startswith_space() {
let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883).set_clean_session(true);
}
#[test]
#[cfg(feature = "websocket")]
fn no_scheme() {
let mut _mqtt_opts = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
_mqtt_opts.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None));
if let crate::Transport::Wss(TlsConfiguration::Simple {
ca,
client_auth,
alpn,
}) = _mqtt_opts.transport
{
assert_eq!(ca, Vec::from("Test CA"));
assert_eq!(client_auth, None);
assert_eq!(alpn, None);
} else {
panic!("Unexpected transport!");
}
assert_eq!(_mqtt_opts.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
}
#[test]
#[should_panic]
fn no_client_id() {
let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
}
}