use tokio::{task, time};
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
use std::error::Error;
use std::time::Duration;
/// Builds a fresh MQTT client together with the event loop that drives it.
///
/// The options use client id `"test-1"`, host `localhost`, port `1883`, a
/// five-second keep-alive, manual acknowledgements switched on and clean
/// session switched off. Every call produces the same configuration, so a
/// second call yields a connection under the same client id.
fn create_conn() -> (AsyncClient, EventLoop) {
    let mut options = MqttOptions::new("test-1", "localhost", 1883);
    options.set_keep_alive(Duration::from_secs(5));
    options.set_manual_acks(true);
    options.set_clean_session(false);

    // `10` is passed through as the capacity argument of `AsyncClient::new`.
    let (client, eventloop) = AsyncClient::new(options, 10);
    (client, eventloop)
}
/// Runs the manual-acknowledgement example in two phases.
///
/// 1. Connect, subscribe to `hello/world`, let a background task publish to
///    that topic and then disconnect, and print every event without ever
///    acknowledging the incoming publishes. The phase ends on the first
///    event-loop error.
/// 2. Open a second connection with the same options and print every event
///    again, this time acknowledging each incoming publish from its own task.
///    The function returns `Ok(())` on the first event-loop error.
///
/// # Panics
///
/// Panics if the initial subscribe request cannot be queued. The spawned
/// tasks panic on their own if disconnecting or acknowledging fails.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::init();

    // Phase 1: connection with manual acks on and clean session off.
    let (client, mut eventloop) = create_conn();
    client
        .subscribe("hello/world", QoS::AtLeastOnce)
        .await
        .unwrap();

    // Publish in the background, then ask for a disconnect. The handle is
    // dropped on purpose: the task runs detached while the loop below polls.
    task::spawn(async move {
        let publisher = client.clone();
        requests(publisher).await;
        client.disconnect().await.unwrap();
    });

    // Print whatever arrives, acknowledging nothing, until polling fails.
    loop {
        match eventloop.poll().await {
            Ok(notif) => println!("Event = {notif:?}"),
            Err(error) => {
                println!("Error = {error:?}");
                break;
            }
        }
    }

    // Phase 2: a brand-new connection built from the same options.
    // NOTE(review): presumably the broker redelivers the publishes left
    // unacknowledged in phase 1 because the session is persistent; confirm
    // against the broker in use.
    let (client, mut eventloop) = create_conn();
    loop {
        match eventloop.poll().await {
            Ok(notif) => {
                println!("Event = {notif:?}");

                if let Event::Incoming(Incoming::Publish(publish)) = notif {
                    // Acknowledge from a separate task so this loop goes
                    // straight back to polling instead of awaiting the ack
                    // request inline.
                    let c = client.clone();
                    task::spawn(async move {
                        c.ack(&publish).await.unwrap();
                    });
                }
            }
            Err(error) => {
                println!("Error = {error:?}");
                return Ok(());
            }
        }
    }
}
/// Publishes ten messages to `hello/world`, one per second.
///
/// Message number `n` (counting from 1 to 10) carries a payload of `n` bytes,
/// each equal to `1`. Every publish uses `QoS::AtLeastOnce` with the retain
/// flag off, and the function sleeps for one second after each publish,
/// including the last one.
///
/// # Panics
///
/// Panics if any publish request is rejected by the client.
async fn requests(client: AsyncClient) {
    let topic = "hello/world";
    let pause = Duration::from_secs(1);

    for payload_len in 1..=10usize {
        let payload: Vec<u8> = vec![1; payload_len];
        client
            .publish(topic, QoS::AtLeastOnce, false, payload)
            .await
            .unwrap();

        time::sleep(pause).await;
    }
}