use paho_mqtt as mqtt;
use std::{
env,
process,
thread,
time::Duration,
sync::RwLock,
};
/// Topics the client starts out subscribed to. `main` copies these into the
/// topic list stored as client user data. The first entry,
/// `requests/subscription/add`, is the topic the message callback in `main`
/// checks for in order to add further subscriptions at runtime.
const DFLT_TOPICS: &[&str] = &[ "requests/subscription/add", "test", "hello" ];
/// Quality-of-service level passed to every subscribe call in this file.
const QOS: i32 = 1;
/// Type of the topic list kept in the client's user data: a vector of topic
/// strings behind a read/write lock, so callbacks can read and extend it.
type UserTopics = RwLock<Vec<String>>;
/// Callback run when a connect or reconnect attempt succeeds.
///
/// Looks up the topic list stored in the client's user data (a
/// [`UserTopics`]) and subscribes to every topic in it at [`QOS`].
///
/// This function is handed to the client as a callback, so it reports
/// problems and returns instead of panicking: missing or unexpected user
/// data is logged and skipped, and a poisoned lock is recovered because the
/// topic vector itself remains usable.
///
/// * `cli` - the client that just connected.
/// * `_msgid` - message id supplied by the client library; unused.
fn on_connect_success(cli: &mqtt::AsyncClient, _msgid: u16) {
    println!("Connection succeeded");

    let lock = match cli
        .user_data()
        .and_then(|data| data.downcast_ref::<UserTopics>())
    {
        Some(lock) => lock,
        None => {
            println!("No topic list found in the client user data; not subscribing.");
            return;
        }
    };

    // A poisoned lock only means another holder panicked; the list of topic
    // strings is still readable, so keep going rather than panic here too.
    let topics = lock.read().unwrap_or_else(|poisoned| poisoned.into_inner());
    println!("Subscribing to topics: {:?}", topics);

    // One QoS entry per topic, as `subscribe_many` takes parallel slices.
    let qos = vec![QOS; topics.len()];
    cli.subscribe_many(&topics, &qos);
}
/// Callback run when a connect or reconnect attempt fails.
///
/// Prints the error code, pauses for 2.5 seconds, then asks the client to
/// reconnect with the same success/failure callbacks, so attempts continue
/// until one succeeds.
///
/// * `cli` - the client whose connection attempt failed.
/// * `_msgid` - message id supplied by the client library; unused.
/// * `rc` - error code reported for the failed attempt.
fn on_connect_failure(cli: &mqtt::AsyncClient, _msgid: u16, rc: i32) {
    let retry_delay = Duration::from_millis(2500);

    println!("Connection attempt failed with error code {}.\n", rc);

    // Back off before retrying so a failing broker is not hammered.
    thread::sleep(retry_delay);
    cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}
/// Runs an asynchronous MQTT subscriber whose subscriptions can be extended
/// at runtime.
///
/// The broker URI is taken from the first command-line argument and defaults
/// to `tcp://localhost:1883`. The client starts with the topics in
/// [`DFLT_TOPICS`], stored in the client user data as a [`UserTopics`] list.
/// A message on `requests/subscription/add` is treated as a request to
/// subscribe to the topic named in its payload; any other message is printed.
///
/// Exits the process with status 1 if the client cannot be created.
/// Otherwise it never returns: the main thread idles while the callbacks do
/// the work.
fn main() {
    // Initialize the logger from the environment.
    env_logger::init();

    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| "tcp://localhost:1883".to_string());

    let topics: Vec<String> = DFLT_TOPICS.iter().map(|s| s.to_string()).collect();

    // The topic list travels with the client as user data so the callbacks
    // can read it (to resubscribe) and extend it (on add requests).
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("rust_dyn_subscribe")
        .user_data(Box::new(RwLock::new(topics)))
        .finalize();

    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
        println!("Error creating the client: {:?}", e);
        process::exit(1);
    });

    cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
        println!("Connected.");
    });

    // On connection loss, wait briefly and then start reconnecting; the
    // success callback resubscribes to the stored topics.
    cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
        println!("Connection lost. Attempting reconnect.");
        thread::sleep(Duration::from_millis(2500));
        cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
    });

    cli.set_message_callback(|cli, msg| {
        let msg = match msg {
            Some(msg) => msg,
            None => return,
        };
        let topic = msg.topic();
        let payload_str = msg.payload_str();

        // Ordinary traffic: just print it.
        if topic != "requests/subscription/add" {
            println!("{} - {}", topic, payload_str);
            return;
        }

        // Subscription request: the payload is the topic to add. This runs
        // inside a client callback, so report problems instead of panicking.
        let lock = match cli
            .user_data()
            .and_then(|data| data.downcast_ref::<UserTopics>())
        {
            Some(lock) => lock,
            None => {
                println!("Failed to add topic: {}", payload_str);
                return;
            }
        };

        // Single conversion to an owned String (allocates only if the
        // payload was borrowed).
        let new_topic = payload_str.into_owned();
        if new_topic.is_empty() {
            // An empty string is not a valid MQTT topic filter.
            println!("Ignoring subscription request with an empty topic");
            return;
        }

        // Recover from a poisoned lock: the vector of strings is still valid.
        let mut topics = lock.write().unwrap_or_else(|poisoned| poisoned.into_inner());

        // Keep the stored list free of duplicates so repeated requests do not
        // grow it without bound or cause duplicate resubscribes on reconnect.
        if topics.contains(&new_topic) {
            println!("Already subscribed to topic: {}", new_topic);
            return;
        }

        println!("Adding topic: {}", new_topic);
        cli.subscribe(&new_topic, QOS);
        topics.push(new_topic);
    });

    // Last-will message registered with the connection, on topic "test".
    let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1);

    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .mqtt_version(mqtt::MQTT_VERSION_3_1_1)
        .clean_session(true)
        .will_message(lwt)
        .finalize();

    println!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);

    // Everything else happens in callbacks; keep the main thread alive.
    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}