mod actors;
mod client;
mod consts;
pub use actix;
pub use futures;
pub use mqtt::QualityOfService;
pub use tokio;
pub use crate::actors::packets::PublishMessage;
pub use crate::actors::{ErrorMessage, StopMessage};
pub use crate::client::{MqttClient, MqttOptions};
#[cfg(test)]
mod tests {
pub struct ErrorActor;
impl actix::Actor for ErrorActor {
type Context = actix::Context<Self>;
}
impl actix::Handler<super::ErrorMessage> for ErrorActor {
type Result = ();
fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
log::error!("{}", error.0);
}
}
pub struct MessageActor;
impl actix::Actor for MessageActor {
type Context = actix::Context<Self>;
}
impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
type Result = ();
fn handle(
&mut self,
msg: crate::actors::packets::PublishMessage,
_: &mut Self::Context,
) -> Self::Result {
log::info!(
"Got message: id:{}, topic: {}, payload: {:?}",
msg.id,
msg.topic_name,
msg.payload
);
}
}
#[test]
fn test_client() {
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{delay_until, Instant};
use crate::client::{MqttClient, MqttOptions};
env_logger::init();
System::run(|| {
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
let future = async move {
let result = async move {
let stream = TcpStream::connect(socket_addr).await?;
let (r, w) = split(stream);
log::info!("TCP connected");
let mut client = MqttClient::new(
r,
w,
String::from("test"),
MqttOptions::default(),
MessageActor.start().recipient(),
ErrorActor.start().recipient(),
None,
);
client.connect().await?;
while !client.is_connected().await.unwrap() {
log::info!("Waiting for client to be connected");
let delay_time = Instant::now() + Duration::new(0, 100);
delay_until(delay_time).await;
}
log::info!("MQTT connected");
log::info!("Subscribe");
client
.subscribe(String::from("test"), mqtt::QualityOfService::Level2)
.await?;
log::info!("Publish");
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level0,
Vec::from("test".as_bytes()),
)
.await?;
log::info!("Wait for 1s");
let delay_time = Instant::now() + Duration::new(1, 0);
delay_until(delay_time).await;
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level1,
Vec::from("test2".as_bytes()),
)
.await?;
log::info!("Wait for 1s");
let delay_time = Instant::now() + Duration::new(1, 0);
delay_until(delay_time).await;
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level2,
Vec::from("test3".as_bytes()),
)
.await?;
log::info!("Wait for 1s");
let delay_time = Instant::now() + Duration::new(1, 0);
delay_until(delay_time).await;
log::info!("Disconnect");
client.disconnect(false).await?;
log::info!("Check if disconnect is successful");
for _ in 0..5 {
if client.is_disconnected() {
break;
}
let delay_time = Instant::now() + Duration::new(0, 200);
delay_until(delay_time).await;
}
Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
}
.await;
let r = result.unwrap();
System::current().stop();
r
};
Arbiter::spawn(future);
})
.unwrap();
}
}
#[cfg(test)]
mod random_test {
use tokio::sync::mpsc::{channel, Sender};
pub struct ErrorActor;
impl actix::Actor for ErrorActor {
type Context = actix::Context<Self>;
}
impl actix::Handler<super::ErrorMessage> for ErrorActor {
type Result = ();
fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
log::error!("{}", error.0);
}
}
pub struct MessageActor(Sender<(bool, Vec<u8>)>);
impl actix::Actor for MessageActor {
type Context = actix::Context<Self>;
}
impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
type Result = ();
fn handle(
&mut self,
msg: crate::actors::packets::PublishMessage,
_: &mut Self::Context,
) -> Self::Result {
log::info!(
"Got message: id:{}, topic: {}, payload: {:?}",
msg.id,
msg.topic_name,
msg.payload
);
self.0.try_send((false, msg.payload)).unwrap();
}
}
lazy_static::lazy_static! {
static ref PACKETS: std::sync::Mutex<std::collections::HashSet<Vec<u8>>> = std::sync::Mutex::new(std::collections::HashSet::new());
}
#[test]
fn test_random_publish_level0_cloned_client() {
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use futures::stream::StreamExt;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{delay_until, Instant};
use crate::client::{MqttClient, MqttOptions};
env_logger::init();
let (sender, recv) = channel(100);
System::run(|| {
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
let future = async move {
let result = async move {
let stream = TcpStream::connect(socket_addr).await?;
let (r, w) = split(stream);
let mut client = MqttClient::new(
r,
w,
String::from("test"),
MqttOptions::default(),
MessageActor(sender.clone()).start().recipient(),
ErrorActor.start().recipient(),
None,
);
client.connect().await?;
while !client.is_connected().await.unwrap() {
log::info!("Waiting for client to be connected");
let delay_time = Instant::now() + Duration::new(0, 100);
delay_until(delay_time).await;
}
log::info!("Connected");
log::info!("Subscribe");
client
.subscribe(String::from("test"), mqtt::QualityOfService::Level0)
.await?;
async fn random_send(
client_id: i32,
client: MqttClient,
mut sender: Sender<(bool, Vec<u8>)>,
) {
let mut count: i32 = 0;
loop {
count += 1;
use rand::RngCore;
let mut data = [0u8; 32];
rand::thread_rng().fill_bytes(&mut data);
let payload = Vec::from(&data[..]);
log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
delay_until(Instant::now() + Duration::from_millis(100)).await;
sender.try_send((true, payload.clone())).unwrap();
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level0,
payload,
)
.await
.unwrap();
}
}
for i in 0..5 {
let client_clone = client.clone();
let sender_clone = sender.clone();
let future = random_send(i, client_clone, sender_clone);
Arbiter::spawn(future);
}
Ok(()) as Result<(), IoError>
}
.await;
result.unwrap();
};
Arbiter::spawn(future);
let recv_future = async {
let result = async {
recv.fold((), |_, (is_send, payload)| async move {
let mut p = PACKETS.lock().unwrap();
if is_send {
p.insert(payload);
} else if p.contains(&payload) {
p.remove(&payload);
}
log::info!("Pending recv items: {}", p.len());
()
})
.await;
Ok(()) as Result<(), IoError>
}
.await;
result.unwrap()
};
Arbiter::spawn(recv_future);
})
.unwrap();
}
#[test]
fn test_random_publish_level0_created_client() {
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use futures::stream::StreamExt;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{delay_until, Instant};
use crate::client::{MqttClient, MqttOptions};
env_logger::init();
async fn test_send(client_id: i32, sender: Sender<(bool, Vec<u8>)>) {
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
async move {
let stream = TcpStream::connect(socket_addr).await?;
let (r, w) = split(stream);
let mut client = MqttClient::new(
r,
w,
format!("test_{}", client_id),
MqttOptions::default(),
MessageActor(sender.clone()).start().recipient(),
ErrorActor.start().recipient(),
None,
);
client.connect().await?;
while !client.is_connected().await.unwrap() {
log::info!("Waiting for client to be connected");
let delay_time = Instant::now() + Duration::new(0, 100);
delay_until(delay_time).await;
}
log::info!("Connected");
log::info!("Subscribe");
client
.subscribe(String::from("test"), mqtt::QualityOfService::Level0)
.await?;
async fn random_send(
client_id: i32,
client: MqttClient,
mut sender: Sender<(bool, Vec<u8>)>,
) {
let mut count: i32 = 0;
loop {
count += 1;
use rand::RngCore;
let mut data = [0u8; 32];
rand::thread_rng().fill_bytes(&mut data);
let payload = Vec::from(&data[..]);
log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
delay_until(Instant::now() + Duration::from_millis(100)).await;
sender.try_send((true, payload.clone())).unwrap();
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level0,
payload,
)
.await
.unwrap();
}
}
let future = random_send(client_id, client, sender);
Arbiter::spawn(future);
Ok(()) as Result<(), IoError>
}
.await
.unwrap();
}
System::run(|| {
let (sender, recv) = channel(100);
for i in 0..5 {
let future = test_send(i, sender.clone());
Arbiter::spawn(future);
}
let recv_future = async {
let result = async {
recv.fold((), |_, (is_send, payload)| async move {
let mut p = PACKETS.lock().unwrap();
if is_send {
p.insert(payload);
} else if p.contains(&payload) {
p.remove(&payload);
}
log::info!("Pending recv items: {}", p.len());
()
})
.await;
Ok(()) as Result<(), IoError>
}
.await;
result.unwrap()
};
Arbiter::spawn(recv_future);
})
.unwrap();
}
#[test]
fn test_random_publish_level2() {
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use futures::stream::StreamExt;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{delay_until, Instant};
use crate::client::{MqttClient, MqttOptions};
env_logger::init();
let (sender, recv) = channel(100);
let sender_clone = sender.clone();
System::run(|| {
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
let future = async move {
let result = async move {
let stream = TcpStream::connect(socket_addr).await?;
let (r, w) = split(stream);
let mut client = MqttClient::new(
r,
w,
String::from("test"),
MqttOptions::default(),
MessageActor(sender).start().recipient(),
ErrorActor.start().recipient(),
None,
);
client.connect().await?;
while !client.is_connected().await.unwrap() {
log::info!("Waiting for client to be connected");
let delay_time = Instant::now() + Duration::new(0, 100);
delay_until(delay_time).await;
}
log::info!("Connected");
log::info!("Subscribe");
client
.subscribe(String::from("test"), mqtt::QualityOfService::Level2)
.await?;
futures::stream::repeat(())
.fold((client, sender_clone), |(client, mut sender), _| async {
use rand::RngCore;
let mut data = [0u8; 32];
rand::thread_rng().fill_bytes(&mut data);
let payload = Vec::from(&data[..]);
log::info!("Publish {:?}", payload);
delay_until(Instant::now() + Duration::from_millis(10)).await;
sender.try_send((true, payload.clone())).unwrap();
client
.publish(
String::from("test"),
mqtt::QualityOfService::Level2,
payload,
)
.await
.unwrap();
(client, sender)
})
.await;
Ok(()) as Result<(), IoError>
}
.await;
result.unwrap()
};
Arbiter::spawn(future);
let recv_future = async {
let result = async {
recv.fold((), |_, (is_send, payload)| async move {
let mut p = PACKETS.lock().unwrap();
if is_send {
p.insert(payload);
} else if !p.contains(&payload) {
panic!("Multiple receive for level 2: {:?}", payload);
} else {
p.remove(&payload);
}
log::info!("Pending recv items: {}", p.len());
()
})
.await;
Ok(()) as Result<(), IoError>
}
.await;
result.unwrap()
};
Arbiter::spawn(recv_future);
})
.unwrap();
}
}