mod client;
mod dump;
mod mqtt;
mod pubsub;
mod session;
#[cfg(any(test, doc))]
mod test;
use crate::{client::*, dump::*, pubsub::*, session::*};
use futures::{lock::Mutex, prelude::*};
use log::*;
use std::{io::{Error, ErrorKind},
ops::RangeInclusive,
sync::Arc,
time::Duration};
use tokio::{net::TcpListener, spawn, task::JoinHandle};
pub use dump::*;
/// 365 days. Not referenced in this file; presumably a stand-in for "no timeout" used by the
/// sibling modules - TODO confirm against its callers.
const FOREVER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
/// Zero-length duration. This is the default value of `Conf::ack_delay`.
const ASAP: Duration = Duration::from_secs(0);
/// Newtype around `Option<Duration>` used as the conversion target of the `Conf` builder
/// methods, so that they accept a `Duration`, an `Option<Duration>`, or a plain `u64`.
///
/// A `u64` is interpreted as a number of **milliseconds**.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OptMsDuration(pub Option<Duration>);

impl From<Duration> for OptMsDuration {
    /// Wraps the duration as-is.
    fn from(d: Duration) -> Self {
        OptMsDuration(Some(d))
    }
}

impl From<u64> for OptMsDuration {
    /// Interprets `u` as milliseconds.
    fn from(u: u64) -> Self {
        OptMsDuration(Some(Duration::from_millis(u)))
    }
}

impl From<Option<Duration>> for OptMsDuration {
    /// Passes the option through unchanged, `None` included.
    fn from(od: Option<Duration>) -> Self {
        OptMsDuration(od)
    }
}
/// Server configuration, built with `Conf::new()` followed by the chained setter methods and
/// then handed to `Mqttest::start()`.
///
/// Only `ports`, `dump_decode`, `dump_prefix` and `max_connect` are read in this file; the
/// other fields are consumed by the sibling modules, so their descriptions below are hedged.
#[derive(Debug, Clone)]
pub struct Conf {
/// Range of TCP ports on 127.0.0.1 that are tried in order until one can be bound.
ports: RangeInclusive<u16>,
/// Pair of optional timeouts, stored as `(mqtt3, mqtt5)` by the `ack_timeouts` setter.
/// Presumably how long to wait for a client ack per protocol version - TODO confirm.
ack_timeouts: (Option<Duration>, Option<Duration>),
/// Defaults to `ASAP` (zero). Presumably a delay applied before sending acks - TODO confirm.
ack_delay: Duration,
/// List of dump file names; interpretation lives outside this file.
dump_files: Vec<String>,
/// Passed to `Dump::new` as its second argument when the server starts.
dump_prefix: String,
/// Passed to `Dump::new` as its first argument when the server starts.
dump_decode: Option<String>,
/// Defaults to `false`. Presumably enables stricter protocol checks - TODO confirm.
strict: bool,
/// Defaults to the empty string. Presumably a required client-id prefix - TODO confirm.
idprefix: String,
/// Defaults to `None`. Presumably expected credentials - TODO confirm format and use.
userpass: Option<String>,
/// The accept loop stops once this many connections have been accepted.
/// Defaults to `usize::MAX`.
max_connect: usize,
/// List of optional packet-count limits; defaults to a single `None`.
/// NOTE(review): how entries map to connections is decided outside this file.
max_pkt: Vec<Option<usize>>,
/// Optional delay associated with `max_pkt`; semantics are outside this file.
max_pkt_delay: Option<Duration>,
/// List of optional time limits; defaults to a single `None`.
max_time: Vec<Option<Duration>>,
/// List of optional session expiry durations; defaults to a single `None`.
sess_expire: Vec<Option<Duration>>,
}
impl Conf {
    /// Creates a configuration with the default settings:
    /// ports `1883..=2000`, a 5 second ack timeout in the first (mqtt3) slot and none in the
    /// second (mqtt5) slot, no ack delay, no dumps, non-strict, no id prefix, no credentials,
    /// an unlimited number of connections, and a single `None` entry for each of `max_pkt`,
    /// `max_time` and `sess_expire`.
    pub fn new() -> Self {
        Self {
            ports: 1883..=2000,
            dump_files: Vec::new(),
            dump_prefix: String::new(),
            dump_decode: None,
            ack_timeouts: (Some(Duration::from_secs(5)), None),
            ack_delay: ASAP,
            strict: false,
            idprefix: String::new(),
            userpass: None,
            max_connect: std::usize::MAX,
            max_pkt: vec![None],
            max_pkt_delay: None,
            max_time: vec![None],
            sess_expire: vec![None],
        }
    }

    /// Sets the range of ports that the server tries to listen on, in order.
    pub fn ports(mut self, ports: RangeInclusive<u16>) -> Self {
        self.ports = ports;
        self
    }

    /// Replaces the list of dump file names.
    pub fn dump_files(mut self, vs: Vec<impl Into<String>>) -> Self {
        self.dump_files = vs.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the dump prefix handed to `Dump::new` at startup.
    pub fn dump_prefix(mut self, s: impl Into<String>) -> Self {
        self.dump_prefix = s.into();
        self
    }

    /// Sets the optional dump decoder handed to `Dump::new` at startup.
    pub fn dump_decode(mut self, s: impl Into<Option<String>>) -> Self {
        self.dump_decode = s.into();
        self
    }

    /// Sets the two ack timeouts, stored as `(mqtt3, mqtt5)`.
    ///
    /// Each argument may be a `Duration`, an `Option<Duration>`, or a `u64` number of
    /// milliseconds.
    pub fn ack_timeouts(mut self,
                        mqtt3: impl Into<OptMsDuration>,
                        mqtt5: impl Into<OptMsDuration>)
                        -> Self {
        self.ack_timeouts = (mqtt3.into().0, mqtt5.into().0);
        self
    }

    /// Sets the ack delay. `None` falls back to `ASAP` (zero), the same default as `new()`.
    pub fn ack_delay(mut self, d: impl Into<OptMsDuration>) -> Self {
        // Use the shared constant rather than a second hard-coded zero duration.
        self.ack_delay = d.into().0.unwrap_or(ASAP);
        self
    }

    /// Sets the `strict` flag.
    pub fn strict(mut self, strict: bool) -> Self {
        self.strict = strict;
        self
    }

    /// Sets the id prefix.
    pub fn idprefix(mut self, s: impl Into<String>) -> Self {
        self.idprefix = s.into();
        self
    }

    /// Sets (or clears, with `None`) the user/password string.
    pub fn userpass(mut self, s: impl Into<Option<String>>) -> Self {
        self.userpass = s.into();
        self
    }

    /// Sets how many connections are accepted before the server stops listening.
    /// `None` means unlimited (`usize::MAX`).
    pub fn max_connect(mut self, c: impl Into<Option<usize>>) -> Self {
        self.max_connect = c.into().unwrap_or(std::usize::MAX);
        self
    }

    /// Replaces the list of optional packet-count limits.
    pub fn max_pkt(mut self, vou: Vec<impl Into<Option<usize>>>) -> Self {
        self.max_pkt = vou.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the optional `max_pkt` delay; a `u64` is taken as milliseconds.
    pub fn max_pkt_delay(mut self, d: impl Into<OptMsDuration>) -> Self {
        self.max_pkt_delay = d.into().0;
        self
    }

    /// Replaces the list of optional time limits; `u64` entries are taken as milliseconds.
    pub fn max_time(mut self, vod: Vec<impl Into<OptMsDuration>>) -> Self {
        self.max_time = vod.into_iter().map(|od| od.into().0).collect();
        self
    }

    /// Replaces the list of optional session expiry durations; `u64` entries are taken as
    /// milliseconds.
    pub fn sess_expire(mut self, vod: Vec<impl Into<OptMsDuration>>) -> Self {
        self.sess_expire = vod.into_iter().map(|od| od.into().0).collect();
        self
    }
}

impl Default for Conf {
    /// Equivalent to `Conf::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Binds a `TcpListener` on the first usable port of `ports` on 127.0.0.1.
///
/// Ports are tried in ascending order from `ports.start()` to `ports.end()`. Each failed bind
/// is only logged at trace level. Returns the bound port together with its listener, or an
/// `ErrorKind::Other` error if no port in the range could be bound.
async fn listen(ports: &RangeInclusive<u16>) -> Result<(u16, TcpListener), Error> {
    let (first, last) = (*ports.start(), *ports.end());
    for port in first..=last {
        let addr = format!("127.0.0.1:{}", port);
        let listener = match TcpListener::bind(&addr).await {
            Ok(listener) => listener,
            Err(e) => {
                // Not fatal: move on to the next candidate port.
                trace!("Listen on 127.0.0.1:{}: {}", port, e);
                continue;
            },
        };
        info!("Listening on 127.0.0.1:{}", port);
        return Ok((port, listener));
    }
    let msg = format!("Listen failed on 127.0.0.1::{:?} (raise log level for details)", ports);
    Err(Error::new(ErrorKind::Other, msg))
}
/// Handle to a running server, as returned by `Mqttest::start()`.
pub struct Mqttest {
/// Port on 127.0.0.1 that the server managed to bind.
pub port: u16,
/// Handle of the spawned server task. It resolves, once the accept loop has ended and every
/// client task has finished, to one `ConnInfo` per accepted connection.
pub fut: JoinHandle<Vec<ConnInfo>>,
}
impl Mqttest {
pub async fn start(conf: Conf) -> Result<Mqttest, Error> {
debug!("Start {:?}", conf);
let (port, mut listener) = listen(&conf.ports).await?;
let fut = spawn(async move {
let subs = Arc::new(Mutex::new(Subs::new()));
let sess = Arc::new(Mutex::new(Sessions::new()));
let dumps = Dump::new(&conf.dump_decode, &conf.dump_prefix);
let mut conns: Vec<ConnInfo> = Vec::new();
let mut jh = Vec::new();
while let Some(s) = listener.incoming().next().await {
trace!("New connection {:?}", s);
match s {
Ok(socket) => {
conns.push(ConnInfo {});
jh.push(spawn(Client::start(conns.len() - 1,
socket,
subs.clone(),
sess.clone(),
dumps.clone(),
conf.clone())));
if conns.len() >= conf.max_connect {
break;
}
},
Err(e) => error!("Failed to accept socket: {:?}", e),
};
}
info!("Accepted {} connections, waiting for them to finish", conns.len());
for h in jh {
h.await.expect("Client finished abnormally");
}
conns
});
Ok(Mqttest { port, fut })
}
}
/// Information about one accepted connection.
///
/// One value is recorded per accepted connection and returned through `Mqttest::fut`.
/// It currently carries no data.
#[derive(Debug)]
pub struct ConnInfo {}