use std::{thread, time::Duration};
use env_logger::{Builder, Env};
use log::{debug, info, warn};
use rmp_serde::from_slice;
use serde::Deserialize;
use tether_agent::{PlugOptionsBuilder, TetherAgentOptionsBuilder};
#[derive(Deserialize, Debug)]
struct CustomMessage {
id: usize,
name: String,
}
fn main() {
println!("Rust Tether Agent subscribe example");
let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
builder.filter_module("tether_agent", log::LevelFilter::Warn);
builder.filter_module("rumqttc", log::LevelFilter::Warn);
builder.init();
debug!("Debugging is enabled; could be verbose");
let mut tether_agent = TetherAgentOptionsBuilder::new("RustDemo")
.id(Some("example"))
.build()
.expect("failed to init Tether agent");
let input_one = PlugOptionsBuilder::create_input("one")
.build(&mut tether_agent)
.expect("failed to create input");
info!("input one {} = {}", input_one.name(), input_one.topic());
let input_two = PlugOptionsBuilder::create_input("two")
.role(Some("specific"))
.build(&mut tether_agent)
.expect("failed to create input");
info!("input two {} = {}", input_two.name(), input_two.topic());
let input_empty = PlugOptionsBuilder::create_input("nothing")
.build(&mut tether_agent)
.expect("failed to create input");
let input_everything = PlugOptionsBuilder::create_input("everything")
.topic(Some("#"))
.build(&mut tether_agent)
.expect("failed to create input");
let input_specify_id = PlugOptionsBuilder::create_input("groupMessages")
.id(Some("someGroup"))
.name(None)
.build(&mut tether_agent)
.expect("failed to create input");
debug!(
"input everything {} = {}",
input_everything.name(),
input_everything.topic()
);
info!("Checking messages every 1s, 10x...");
loop {
debug!("Checking for messages...");
while let Some((topic, payload)) = tether_agent.check_messages() {
if input_one.matches(&topic) {
info!(
"******** INPUT ONE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_one.name(),
topic,
payload.len()
);
}
if input_two.matches(&topic) {
info!(
"******** INPUT TWO:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_two.name(),
topic,
payload.len()
);
let decoded = from_slice::<CustomMessage>(&payload);
match decoded {
Ok(d) => {
info!("Yes, we decoded the MessagePack payload as: {:?}", d);
let CustomMessage { name, id } = d;
debug!("Name is {} and ID is {}", name, id);
}
Err(e) => {
warn!("Failed to decode the payload: {}", e)
}
};
}
if input_empty.matches(&topic) {
info!(
"******** EMPTY MESSAGE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_empty.name(),
topic,
payload.len()
);
}
if input_everything.matches(&topic) {
info!(
"******** EVERYTHING MATCHES HERE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_everything.name(),
topic,
payload.len()
);
}
if input_specify_id.matches(&topic) {
info!("******** ID MATCH:\n Should match any role and plug name, but only messages with ID \"groupMessages\"");
info!(
"\n Received a message from plug named \"{}\" on topic {:?} with length {} bytes",
input_specify_id.name(),
topic,
payload.len()
);
}
}
thread::sleep(Duration::from_millis(1000))
}
}