#![recursion_limit = "512"]
#[macro_use]
extern crate log;
use rumq_core::mqtt4::{MqttRead, MqttWrite, Packet};
use std::io::Cursor;
use std::time::Duration;
mod eventloop;
mod network;
mod state;
pub use eventloop::eventloop;
pub use eventloop::{EventLoopError, MqttEventLoop};
pub use state::MqttState;
pub use rumq_core::mqtt4::*;
/// Notifications produced by this client, each wrapping the packet data it refers to.
///
/// NOTE(review): presumably these are handed to the application by the event loop
/// (see the `eventloop` module) — confirm there.
#[derive(Debug)]
pub enum Notification {
/// Carries a `Publish` packet.
Publish(Publish),
/// Carries the `PacketIdentifier` of a puback.
Puback(PacketIdentifier),
/// Carries the `PacketIdentifier` of a pubrec.
Pubrec(PacketIdentifier),
/// Carries the `PacketIdentifier` of a pubcomp.
Pubcomp(PacketIdentifier),
/// Carries a `Suback` packet.
Suback(Suback),
/// Carries the `PacketIdentifier` of an unsuback.
Unsuback(PacketIdentifier),
/// Carries the `EventLoopError` behind an abort.
/// NOTE(review): presumably signals that the event loop stopped — confirm.
Abort(EventLoopError),
}
/// Requests that can be handed to this client.
///
/// `Publish`, `Subscribe` and `Unsubscribe` values convert into a `Request`
/// through the `From` implementations in this file.
#[derive(Debug)]
pub enum Request {
/// Carries the `Publish` packet of the request.
Publish(Publish),
/// Carries the `Subscribe` packet of the request.
Subscribe(Subscribe),
/// Carries the `Unsubscribe` packet of the request.
Unsubscribe(Unsubscribe),
/// Carries a `Connect` packet.
/// NOTE(review): presumably asks the event loop to reconnect with it — confirm.
Reconnect(Connect),
/// Disconnect request; carries no data.
Disconnect,
}
impl From<Publish> for Request {
fn from(publish: Publish) -> Request {
return Request::Publish(publish);
}
}
impl From<Subscribe> for Request {
fn from(subscribe: Subscribe) -> Request {
return Request::Subscribe(subscribe);
}
}
impl From<Unsubscribe> for Request {
fn from(unsubscribe: Unsubscribe) -> Request {
return Request::Unsubscribe(unsubscribe);
}
}
impl From<Request> for Vec<u8> {
    /// Serializes a request into the bytes of its corresponding MQTT packet.
    ///
    /// Each request maps to one packet: `Reconnect` to `Packet::Connect`,
    /// `Publish` to `Packet::Publish`, `Subscribe` to `Packet::Subscribe`,
    /// `Unsubscribe` to `Packet::Unsubscribe` and `Disconnect` to
    /// `Packet::Disconnect`. The packet is written with `mqtt_write` into an
    /// in-memory buffer, which is returned.
    ///
    /// # Panics
    ///
    /// Panics if `mqtt_write` reports an error for the packet.
    fn from(request: Request) -> Vec<u8> {
        // Exhaustive on purpose: a newly added `Request` variant must fail to
        // compile here instead of panicking at runtime behind a catch-all arm.
        let packet = match request {
            Request::Reconnect(connect) => Packet::Connect(connect),
            Request::Publish(publish) => Packet::Publish(publish),
            Request::Subscribe(subscribe) => Packet::Subscribe(subscribe),
            Request::Unsubscribe(unsubscribe) => Packet::Unsubscribe(unsubscribe),
            Request::Disconnect => Packet::Disconnect,
        };

        let mut buffer = Cursor::new(Vec::new());
        buffer
            .mqtt_write(&packet)
            .expect("failed to serialize request into an MQTT packet");
        buffer.into_inner()
    }
}
impl From<Vec<u8>> for Request {
    /// Decodes one MQTT packet from `payload` and converts it into a request.
    ///
    /// `Connect`, `Publish`, `Subscribe`, `Unsubscribe` and `Disconnect`
    /// packets map to the `Reconnect`, `Publish`, `Subscribe`, `Unsubscribe`
    /// and `Disconnect` requests respectively.
    ///
    /// # Panics
    ///
    /// Panics if `mqtt_read` fails to decode a packet from `payload`, or if
    /// the decoded packet is of a type that has no `Request` counterpart.
    fn from(payload: Vec<u8>) -> Request {
        let mut reader = Cursor::new(payload);
        let packet = reader
            .mqtt_read()
            .expect("failed to deserialize an MQTT packet from the payload");

        match packet {
            Packet::Connect(connect) => Request::Reconnect(connect),
            Packet::Publish(publish) => Request::Publish(publish),
            Packet::Subscribe(subscribe) => Request::Subscribe(subscribe),
            Packet::Unsubscribe(unsubscribe) => Request::Unsubscribe(unsubscribe),
            Packet::Disconnect => Request::Disconnect,
            // The remaining packet types have no `Request` variant to map to;
            // `From` cannot report an error, so this stays a panic.
            _ => unimplemented!(),
        }
    }
}
/// Control commands with two states, `Pause` and `Resume`.
///
/// NOTE(review): presumably used to pause and resume the event loop's request
/// processing — the consumer is not visible in this file; confirm in `eventloop`.
#[derive(Debug)]
pub enum Command {
/// Pause command; carries no data.
Pause,
/// Resume command; carries no data.
Resume,
}
/// Security choices: either nothing or a pair of strings.
///
/// NOTE(review): not referenced by anything else visible in this file;
/// `MqttOptions` keeps its credentials in a separate `Option<(String, String)>`.
#[derive(Clone, Debug)]
pub enum SecurityOptions {
/// No security options.
None,
/// Two strings — going by the variant name, username then password (TODO confirm order).
UsernamePassword(String, String),
}
/// Configuration for an MQTT client connection.
///
/// Created with `MqttOptions::new` and adjusted through the chainable
/// `set_*` methods; every field has a matching getter.
#[derive(Clone, Debug)]
pub struct MqttOptions {
// Broker host as passed to `new`; returned together with `port` by `broker_address`.
broker_addr: String,
// Broker port as passed to `new`.
port: u16,
// Keep alive interval; `set_keep_alive` rejects values below 5 seconds.
keep_alive: Duration,
// Clean session flag.
clean_session: bool,
// Client id; `new` rejects ids that are empty or start with a space.
client_id: String,
// Bytes given to `set_ca` — presumably a CA certificate for TLS (TODO confirm format).
ca: Option<Vec<u8>>,
// `(cert, key)` bytes given to `set_client_auth`.
client_auth: Option<(Vec<u8>, Vec<u8>)>,
// Byte strings given to `set_alpn` — presumably ALPN protocol ids for TLS (TODO confirm).
alpn: Option<Vec<Vec<u8>>>,
// `(username, password)` given to `set_credentials`.
credentials: Option<(String, String)>,
// Maximum packet size; `set_max_packet_size` stores its argument multiplied by 1024.
max_packet_size: usize,
// Capacity of the request channel.
request_channel_capacity: usize,
// Capacity of the notification channel.
notification_channel_capacity: usize,
// Throttle duration — presumably a delay applied between requests (TODO confirm in eventloop).
throttle: Duration,
// Inflight value; `set_inflight` rejects zero.
// NOTE(review): presumably the cap on unacknowledged publishes — confirm.
inflight: usize,
// Optional last will.
last_will: Option<LastWill>,
}
impl MqttOptions {
    /// Creates options for client `id` connecting to the broker at `host:port`.
    ///
    /// Every other setting starts at its default: 60 second keep alive, clean
    /// session enabled, no CA, client auth, ALPN, credentials or last will, a
    /// maximum packet size of `256 * 1024`, request and notification channel
    /// capacities of 10, zero throttle and an inflight value of 100.
    ///
    /// # Panics
    ///
    /// Panics if `id` is empty or starts with a space.
    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
        let client_id = id.into();
        assert!(
            !client_id.is_empty() && !client_id.starts_with(' '),
            "Invalid client id"
        );

        MqttOptions {
            broker_addr: host.into(),
            port,
            keep_alive: Duration::from_secs(60),
            clean_session: true,
            client_id,
            ca: None,
            client_auth: None,
            alpn: None,
            credentials: None,
            max_packet_size: 256 * 1024,
            request_channel_capacity: 10,
            notification_channel_capacity: 10,
            throttle: Duration::from_micros(0),
            inflight: 100,
            last_will: None,
        }
    }

    /// Returns the broker host and port.
    pub fn broker_address(&self) -> (String, u16) {
        (self.broker_addr.clone(), self.port)
    }

    /// Sets the last will.
    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
        self.last_will = Some(will);
        self
    }

    /// Returns a copy of the configured last will, if any.
    ///
    /// Takes `&self` (it only reads), so it is callable through shared
    /// references as well as mutable ones.
    pub fn last_will(&self) -> Option<LastWill> {
        self.last_will.clone()
    }

    /// Sets the CA bytes.
    pub fn set_ca(&mut self, ca: Vec<u8>) -> &mut Self {
        self.ca = Some(ca);
        self
    }

    /// Returns a copy of the configured CA bytes, if any.
    pub fn ca(&self) -> Option<Vec<u8>> {
        self.ca.clone()
    }

    /// Sets the client authentication material as a `(cert, key)` pair.
    pub fn set_client_auth(&mut self, cert: Vec<u8>, key: Vec<u8>) -> &mut Self {
        self.client_auth = Some((cert, key));
        self
    }

    /// Returns a copy of the configured `(cert, key)` pair, if any.
    pub fn client_auth(&self) -> Option<(Vec<u8>, Vec<u8>)> {
        self.client_auth.clone()
    }

    /// Sets the ALPN entries.
    pub fn set_alpn(&mut self, alpn: Vec<Vec<u8>>) -> &mut Self {
        self.alpn = Some(alpn);
        self
    }

    /// Returns a copy of the configured ALPN entries, if any.
    pub fn alpn(&self) -> Option<Vec<Vec<u8>>> {
        self.alpn.clone()
    }

    /// Sets the keep alive interval in seconds.
    ///
    /// # Panics
    ///
    /// Panics if `secs` is less than 5.
    pub fn set_keep_alive(&mut self, secs: u16) -> &mut Self {
        assert!(secs >= 5, "Keep alives should be >= 5 secs");
        self.keep_alive = Duration::from_secs(u64::from(secs));
        self
    }

    /// Returns the keep alive interval.
    pub fn keep_alive(&self) -> Duration {
        self.keep_alive
    }

    /// Returns a copy of the client id.
    pub fn client_id(&self) -> String {
        self.client_id.clone()
    }

    /// Sets the maximum packet size to `sz * 1024`.
    ///
    /// The product saturates at `usize::MAX` instead of overflowing.
    pub fn set_max_packet_size(&mut self, sz: usize) -> &mut Self {
        // A plain `sz * 1024` would panic in debug builds and silently wrap to
        // a meaningless limit in release builds for very large `sz`.
        self.max_packet_size = sz.saturating_mul(1024);
        self
    }

    /// Returns the stored maximum packet size (the setter's argument times 1024).
    pub fn max_packet_size(&self) -> usize {
        self.max_packet_size
    }

    /// Sets the clean session flag.
    pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
        self.clean_session = clean_session;
        self
    }

    /// Returns the clean session flag.
    pub fn clean_session(&self) -> bool {
        self.clean_session
    }

    /// Sets the username and password.
    pub fn set_credentials<S: Into<String>>(&mut self, username: S, password: S) -> &mut Self {
        self.credentials = Some((username.into(), password.into()));
        self
    }

    /// Returns a copy of the configured `(username, password)` pair, if any.
    pub fn credentials(&self) -> Option<(String, String)> {
        self.credentials.clone()
    }

    /// Sets the notification channel capacity.
    pub fn set_notification_channel_capacity(&mut self, capacity: usize) -> &mut Self {
        self.notification_channel_capacity = capacity;
        self
    }

    /// Returns the notification channel capacity.
    pub fn notification_channel_capacity(&self) -> usize {
        self.notification_channel_capacity
    }

    /// Sets the request channel capacity.
    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
        self.request_channel_capacity = capacity;
        self
    }

    /// Returns the request channel capacity.
    pub fn request_channel_capacity(&self) -> usize {
        self.request_channel_capacity
    }

    /// Sets the throttle duration.
    pub fn set_throttle(&mut self, duration: Duration) -> &mut Self {
        self.throttle = duration;
        self
    }

    /// Returns the throttle duration.
    pub fn throttle(&self) -> Duration {
        self.throttle
    }

    /// Sets the inflight value.
    ///
    /// # Panics
    ///
    /// Panics if `inflight` is zero.
    pub fn set_inflight(&mut self, inflight: usize) -> &mut Self {
        assert!(inflight != 0, "zero in flight is not allowed");
        self.inflight = inflight;
        self
    }

    /// Returns the inflight value.
    pub fn inflight(&self) -> usize {
        self.inflight
    }
}
#[cfg(test)]
mod test {
    use super::MqttOptions;

    /// A client id that starts with a space must be rejected by `MqttOptions::new`.
    ///
    /// The expected message is pinned so the test cannot pass because of an
    /// unrelated panic.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn client_id_startswith_space() {
        let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883);
    }

    /// An empty client id must be rejected by `MqttOptions::new`.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn no_client_id() {
        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883);
    }
}