use mqtt311::LastWill;
use std::time::Duration;
/// Reconnection policy stored in [`MqttOptions`].
///
/// NOTE(review): the `u64` carried by the retrying variants is presumably the delay
/// between reconnection attempts (seconds?) — confirm against the code that consumes
/// this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconnectOptions {
    /// No reconnection is requested.
    Never,
    /// Presumably: reconnect only once an initial connection has succeeded — TODO confirm.
    AfterFirstSuccess(u64),
    /// Presumably: reconnect even when the very first connection fails — TODO confirm.
    Always(u64),
}
/// Authentication scheme stored in [`MqttOptions`].
#[derive(Debug, Clone)]
pub enum SecurityOptions {
    /// No authentication data.
    None,
    /// Two strings, presumably `(username, password)` in that order — confirm with the
    /// code that builds the connect packet.
    UsernamePassword(String, String),
    /// Only available when the crate is built with the `jwt` feature.
    ///
    /// NOTE(review): the payload is presumably (project name, private key bytes, token
    /// expiry) — verify against the JWT generation code.
    #[cfg(feature = "jwt")]
    GcloudIot(String, Vec<u8>, i64),
}
/// Transport selection stored in [`MqttOptions`].
#[derive(Debug, Clone)]
pub enum ConnectionMethod {
    /// Plain TCP.
    Tcp,
    /// TLS.
    ///
    /// NOTE(review): the first byte buffer is presumably the CA certificate and the
    /// optional pair presumably `(client certificate, client key)` — confirm against
    /// the TLS setup code.
    Tls(Vec<u8>, Option<(Vec<u8>, Vec<u8>)>),
}
/// Proxy selection stored in [`MqttOptions`].
#[derive(Debug, Clone)]
pub enum Proxy {
    /// No proxy.
    None,
    /// HTTP CONNECT proxy.
    ///
    /// NOTE(review): the payload is presumably (proxy host, proxy port, key bytes,
    /// expiry) — confirm against the code that establishes the proxy tunnel.
    HttpConnect(String, u16, Vec<u8>, i64),
}
/// Configuration bundle for an MQTT client.
///
/// All fields are private; they are read through the accessor methods and changed
/// through the consuming `set_*` builder methods.
#[derive(Debug, Clone)]
pub struct MqttOptions {
    /// Broker host, returned together with `port` by `broker_address`.
    broker_addr: String,
    /// Broker port.
    port: u16,
    /// Keep-alive interval; the setter rejects values below 10 seconds.
    keep_alive: Duration,
    /// Clean-session flag.
    clean_session: bool,
    /// Client identifier; `new` rejects an empty id or one starting with a space.
    client_id: String,
    /// Transport used for the connection (TCP or TLS).
    connection_method: ConnectionMethod,
    /// Proxy configuration, if any.
    proxy: Proxy,
    /// Reconnection policy.
    reconnect: ReconnectOptions,
    /// Authentication scheme.
    security: SecurityOptions,
    /// Maximum packet size; the setter stores its argument multiplied by 1024.
    max_packet_size: usize,
    /// Optional last-will message.
    last_will: Option<LastWill>,
    /// Capacity of the request channel.
    request_channel_capacity: usize,
    /// Capacity of the notification channel.
    notification_channel_capacity: usize,
    /// Optional outgoing rate limit; the setter rejects zero.
    /// NOTE(review): unit is presumably messages per second — confirm with the consumer.
    outgoing_ratelimit: Option<u64>,
    /// Outgoing queue limit as `(queue size, delay)`; the setter rejects a zero queue size.
    outgoing_queuelimit: (usize, Duration),
}
impl Default for MqttOptions {
    /// Builds options for the client id `test-client` talking to `127.0.0.1:1883`.
    ///
    /// Other values: 30 s keep-alive, clean session enabled, plain TCP, no proxy,
    /// `ReconnectOptions::AfterFirstSuccess(10)`, no security, a max packet size of
    /// `256 * 1024`, no last will, both channel capacities at 10, no outgoing rate
    /// limit and an outgoing queue limit of `(100, 3 s)`.
    fn default() -> Self {
        let queue_limit = (100, Duration::from_secs(3));

        Self {
            broker_addr: String::from("127.0.0.1"),
            port: 1883,
            keep_alive: Duration::from_secs(30),
            clean_session: true,
            client_id: String::from("test-client"),
            connection_method: ConnectionMethod::Tcp,
            proxy: Proxy::None,
            reconnect: ReconnectOptions::AfterFirstSuccess(10),
            security: SecurityOptions::None,
            max_packet_size: 256 * 1024,
            last_will: None,
            request_channel_capacity: 10,
            notification_channel_capacity: 10,
            outgoing_ratelimit: None,
            outgoing_queuelimit: queue_limit,
        }
    }
}
impl MqttOptions {
    /// Creates options for the client `id` targeting the broker at `host`:`port`.
    ///
    /// Every other setting starts from a fixed value: 60 s keep-alive, clean session
    /// enabled, plain TCP, no proxy, `ReconnectOptions::AfterFirstSuccess(10)`, no
    /// security, a max packet size of `256 * 1024`, no last will, both channel
    /// capacities at 10, no outgoing rate limit and an outgoing queue limit of
    /// `(100, 3 s)`.
    ///
    /// # Panics
    ///
    /// Panics with "Invalid client id" if `id` is empty or starts with a space.
    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
        let id = id.into();
        if id.is_empty() || id.starts_with(' ') {
            panic!("Invalid client id")
        }

        MqttOptions {
            broker_addr: host.into(),
            port,
            keep_alive: Duration::from_secs(60),
            clean_session: true,
            client_id: id,
            connection_method: ConnectionMethod::Tcp,
            proxy: Proxy::None,
            reconnect: ReconnectOptions::AfterFirstSuccess(10),
            security: SecurityOptions::None,
            max_packet_size: 256 * 1024,
            last_will: None,
            request_channel_capacity: 10,
            notification_channel_capacity: 10,
            outgoing_ratelimit: None,
            outgoing_queuelimit: (100, Duration::from_secs(3)),
        }
    }

    /// Returns the broker host (cloned) and port.
    pub fn broker_address(&self) -> (String, u16) {
        (self.broker_addr.clone(), self.port)
    }

    /// Sets the keep-alive interval to `secs` seconds.
    ///
    /// # Panics
    ///
    /// Panics if `secs` is less than 10.
    pub fn set_keep_alive(mut self, secs: u16) -> Self {
        assert!(secs >= 10, "Keep alives should be >= 10 secs");
        self.keep_alive = Duration::from_secs(u64::from(secs));
        self
    }

    /// Returns the keep-alive interval.
    pub fn keep_alive(&self) -> Duration {
        self.keep_alive
    }

    /// Returns a clone of the client id.
    pub fn client_id(&self) -> String {
        self.client_id.clone()
    }

    /// Sets the maximum packet size to `sz * 1024`.
    ///
    /// NOTE(review): `sz` is presumably expressed in KiB — confirm with the code that
    /// enforces the limit.
    pub fn set_max_packet_size(mut self, sz: usize) -> Self {
        // Saturate rather than multiply unchecked: a plain `sz * 1024` wraps silently in
        // release builds (yielding a tiny, wrong limit) and panics in debug builds.
        self.max_packet_size = sz.saturating_mul(1024);
        self
    }

    /// Returns the stored maximum packet size (already multiplied by 1024).
    pub fn max_packet_size(&self) -> usize {
        self.max_packet_size
    }

    /// Sets the clean-session flag.
    pub fn set_clean_session(mut self, clean_session: bool) -> Self {
        self.clean_session = clean_session;
        self
    }

    /// Returns the clean-session flag.
    pub fn clean_session(&self) -> bool {
        self.clean_session
    }

    /// Sets the transport used for the connection.
    pub fn set_connection_method(mut self, opts: ConnectionMethod) -> Self {
        self.connection_method = opts;
        self
    }

    /// Returns a clone of the configured transport.
    pub fn connection_method(&self) -> ConnectionMethod {
        self.connection_method.clone()
    }

    /// Sets the proxy configuration.
    pub fn set_proxy(mut self, proxy: Proxy) -> Self {
        self.proxy = proxy;
        self
    }

    /// Returns a clone of the proxy configuration.
    pub fn proxy(&self) -> Proxy {
        self.proxy.clone()
    }

    /// Sets the reconnection policy.
    pub fn set_reconnect_opts(mut self, opts: ReconnectOptions) -> Self {
        self.reconnect = opts;
        self
    }

    /// Returns the reconnection policy.
    pub fn reconnect_opts(&self) -> ReconnectOptions {
        self.reconnect
    }

    /// Sets the authentication scheme.
    pub fn set_security_opts(mut self, opts: SecurityOptions) -> Self {
        self.security = opts;
        self
    }

    /// Returns a clone of the authentication scheme.
    pub fn security_opts(&self) -> SecurityOptions {
        self.security.clone()
    }

    /// Sets the last-will message.
    pub fn set_last_will(mut self, last_will: LastWill) -> Self {
        self.last_will = Some(last_will);
        self
    }

    /// Returns a clone of the last-will message, or `None` if none was set.
    pub fn last_will(&self) -> Option<LastWill> {
        self.last_will.clone()
    }

    /// Sets the capacity of the notification channel.
    pub fn set_notification_channel_capacity(mut self, capacity: usize) -> Self {
        self.notification_channel_capacity = capacity;
        self
    }

    /// Returns the capacity of the notification channel.
    pub fn notification_channel_capacity(&self) -> usize {
        self.notification_channel_capacity
    }

    /// Sets the capacity of the request channel.
    pub fn set_request_channel_capacity(mut self, capacity: usize) -> Self {
        self.request_channel_capacity = capacity;
        self
    }

    /// Returns the capacity of the request channel.
    pub fn request_channel_capacity(&self) -> usize {
        self.request_channel_capacity
    }

    /// Enables the outgoing rate limit with the given `rate`.
    ///
    /// NOTE(review): the unit is presumably messages per second — confirm with the
    /// code that applies the limit.
    ///
    /// # Panics
    ///
    /// Panics if `rate` is zero.
    pub fn set_outgoing_ratelimit(mut self, rate: u64) -> Self {
        assert!(rate != 0, "zero rate is not allowed");
        self.outgoing_ratelimit = Some(rate);
        self
    }

    /// Returns the outgoing rate limit, or `None` if none was set.
    pub fn outgoing_ratelimit(&self) -> Option<u64> {
        self.outgoing_ratelimit
    }

    /// Sets the outgoing queue limit to `(queue_size, delay)`.
    ///
    /// # Panics
    ///
    /// Panics if `queue_size` is zero.
    pub fn set_outgoing_queuelimit(mut self, queue_size: usize, delay: Duration) -> Self {
        assert!(queue_size != 0, "zero queue size is not allowed");
        self.outgoing_queuelimit = (queue_size, delay);
        self
    }

    /// Returns the outgoing queue limit as `(queue size, delay)`.
    pub fn outgoing_queuelimit(&self) -> (usize, Duration) {
        self.outgoing_queuelimit
    }
}
#[cfg(test)]
mod test {
    use super::{MqttOptions, ReconnectOptions};

    /// A client id that begins with a space must be rejected by `MqttOptions::new`.
    ///
    /// The expected message is pinned so the test cannot pass on an unrelated panic.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn client_id_startswith_space() {
        let _mqtt_opts = MqttOptions::new(" client_a", "127.0.0.1", 1883)
            .set_reconnect_opts(ReconnectOptions::Always(10))
            .set_clean_session(true);
    }

    /// An empty client id must be rejected by `MqttOptions::new`.
    #[test]
    #[should_panic(expected = "Invalid client id")]
    fn no_client_id() {
        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883)
            .set_reconnect_opts(ReconnectOptions::Always(10))
            .set_clean_session(true);
    }

    /// A well-formed client id is accepted and stored together with the broker address.
    #[test]
    fn valid_client_id_is_accepted() {
        let opts = MqttOptions::new("client_a", "127.0.0.1", 1883);
        assert_eq!(opts.client_id(), "client_a");
        assert_eq!(opts.broker_address(), ("127.0.0.1".to_string(), 1883));
    }
}