use clap::Args;
use log::{debug, error, info, warn};
use tether_agent::{mqtt::Message, PlugOptionsBuilder, TetherAgent, TetherOrCustomTopic};
// Command-line options that decide which topic the receive utility subscribes to.
//
// The three `plug.*` options select parts of a three-part topic; the tests in this
// file show the resulting pattern as `role/id/plug_name`, with `+` standing in for
// any part that was not given. `topic` is only consulted when none of the three
// `plug.*` options is present (see `build_receiver_plug`).
//
// NOTE(review): plain `//` comments are used on purpose here; clap's derive turns
// `///` doc comments into help text, which would alter `--help` output.
#[derive(Args)]
pub struct ReceiveOptions {
// `--plug.role`: role part of the topic to match; unset means "any role".
#[arg(long = "plug.role")]
pub subscribe_role: Option<String>,
// `--plug.id`: id part of the topic to match; unset means "any id".
#[arg(long = "plug.id")]
pub subscribe_id: Option<String>,
// `--plug.name`: plug-name part of the topic to match; a literal `+` is treated
// the same as leaving it unset (the plug is then named "any").
#[arg(long = "plug.name")]
pub subscribe_plug_name: Option<String>,
// `--topic`: full custom topic to subscribe to; falls back to `#` when unset.
// Not used when any of the `plug.*` options above is given.
#[arg(long = "topic")]
pub subscribe_topic: Option<String>,
}
impl Default for ReceiveOptions {
fn default() -> Self {
ReceiveOptions {
subscribe_topic: None,
subscribe_role: None,
subscribe_id: None,
subscribe_plug_name: None,
}
}
}
pub fn receive(
options: &ReceiveOptions,
tether_agent: &TetherAgent,
on_message: fn(plug_name: String, message: Message, decoded: Option<String>),
) {
info!("Tether Receive Utility");
let input_def = build_receiver_plug(options);
let input = input_def
.build(tether_agent)
.expect("failed to create input plug");
info!("Subscribed to topic \"{}\" ...", input.topic());
loop {
let mut did_work = false;
while let Some((topic, message)) = tether_agent.check_messages() {
did_work = true;
debug!("Received message on topic \"{}\"", message.topic());
let plug_name = match topic {
TetherOrCustomTopic::Custom(_) => String::from("unknown"),
TetherOrCustomTopic::Tether(tpt) => String::from(tpt.plug_name()),
};
let bytes = message.payload();
if bytes.is_empty() {
debug!("Empty message payload");
on_message(plug_name, message, None);
} else if let Ok(value) = rmp_serde::from_slice::<rmpv::Value>(bytes) {
let json = serde_json::to_string(&value).expect("failed to stringify JSON");
debug!("Decoded MessagePack payload: {}", json);
on_message(plug_name, message, Some(json));
} else {
debug!("Failed to decode MessagePack payload");
if let Ok(s) = String::from_utf8(bytes.to_vec()) {
warn!("String representation of payload: \"{}\"", s);
} else {
error!("Could not decode payload bytes as string, either");
}
on_message(plug_name, message, None);
}
}
if !did_work {
std::thread::sleep(std::time::Duration::from_micros(100)); }
}
}
/// Translates the command-line `options` into an input-plug builder.
///
/// Two modes exist:
///
/// * **Custom topic** – when none of role, id or plug name is given, the plug is
///   named `"custom"` and subscribes to `subscribe_topic`, or to `#` when that is
///   unset as well.
/// * **Topic-part overrides** – when at least one of role, id or plug name is given,
///   those parts are passed to the builder. The plug is named after the provided
///   plug name; if the plug name is missing or is the literal `+`, the plug is
///   named `"any"` and the name part is explicitly overridden with `+`.
///   `subscribe_topic` is not used in this mode.
fn build_receiver_plug(options: &ReceiveOptions) -> PlugOptionsBuilder {
    let role = options.subscribe_role.as_deref();
    let id = options.subscribe_id.as_deref();
    let plug_name = options.subscribe_plug_name.as_deref();

    if role.is_none() && id.is_none() && plug_name.is_none() {
        debug!(
            "Using custom override topic \"{:?}\"",
            &options.subscribe_topic
        );
        return PlugOptionsBuilder::create_input("custom")
            .topic(Some(options.subscribe_topic.as_deref().unwrap_or("#")));
    }

    debug!(
        "TPT Overrides apply: {:?}, {:?}, {:?}",
        &options.subscribe_id, &options.subscribe_role, &options.subscribe_plug_name
    );
    if let Some(ignored_topic) = options.subscribe_topic.as_deref() {
        // Make the precedence visible instead of dropping the option silently.
        debug!(
            "Ignoring custom topic \"{}\" because TPT overrides were given",
            ignored_topic
        );
    }

    // A concrete plug name is used as-is with no name override. Otherwise (no name,
    // or the wildcard "+") the plug is called "any" and the name part is forced to
    // "+"; in this branch a missing name always implies that role or id is set.
    let (plug_label, name_override) = match plug_name {
        Some(explicit) if explicit != "+" => (explicit, None),
        _ => ("any", Some("+")),
    };

    PlugOptionsBuilder::create_input(plug_label)
        .role(role)
        .id(id)
        .name(name_override)
}
#[cfg(test)]
mod tests {
    use super::{build_receiver_plug, ReceiveOptions};
    use tether_agent::TetherAgentOptionsBuilder;

    /// Builds the receiver plug for `options` against a freshly created agent and
    /// checks its name and topic.
    ///
    /// Creating the agent needs a reachable broker (see the `expect` message).
    /// `#[track_caller]` keeps failure locations pointing at the calling test.
    #[track_caller]
    fn assert_plug(options: &ReceiveOptions, expected_name: &str, expected_topic: &str) {
        let tether_agent = TetherAgentOptionsBuilder::new("tester")
            .build()
            .expect("sorry, these tests require working localhost Broker");
        let receive_plug = build_receiver_plug(options)
            .build(&tether_agent)
            .expect("build failed");
        assert_eq!(receive_plug.name(), expected_name);
        assert_eq!(receive_plug.topic(), expected_topic);
    }

    // No options at all: custom plug on the catch-all topic.
    #[test]
    fn default_options() {
        let options = ReceiveOptions::default();
        assert_plug(&options, "custom", "#");
    }

    // Only a custom topic: it is used verbatim.
    #[test]
    fn only_topic_custom() {
        let options = ReceiveOptions {
            subscribe_topic: Some("some/special/plug".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "custom", "some/special/plug");
    }

    // Only a plug name: role and id become wildcards.
    #[test]
    fn only_plug_name() {
        let options = ReceiveOptions {
            subscribe_plug_name: Some("something".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "something", "+/+/something");
    }

    // Only a role: id and plug name become wildcards.
    #[test]
    fn only_role() {
        let options = ReceiveOptions {
            subscribe_role: Some("something".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "any", "something/+/+");
    }

    // Only an id: role and plug name become wildcards.
    #[test]
    fn only_id() {
        let options = ReceiveOptions {
            subscribe_id: Some("something".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "any", "+/something/+");
    }

    // Role and id: only the plug name is a wildcard.
    #[test]
    fn role_and_id() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_id: Some("y".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "any", "x/y/+");
    }

    // Role and plug name: only the id is a wildcard.
    #[test]
    fn role_and_plug_name() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_plug_name: Some("z".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "z", "x/+/z");
    }

    // All three parts given: fully specified topic.
    #[test]
    fn spec_all_three() {
        let options = ReceiveOptions {
            subscribe_role: Some("x".into()),
            subscribe_id: Some("y".into()),
            subscribe_plug_name: Some("z".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "z", "x/y/z");
    }

    // An explicit "+" plug name behaves like a wildcard on every part.
    #[test]
    fn redundant_but_valid() {
        let options = ReceiveOptions {
            subscribe_plug_name: Some("+".into()),
            ..ReceiveOptions::default()
        };
        assert_plug(&options, "any", "+/+/+");
    }
}