use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
use crate::{Message, Topic};
use super::Awtrix3Error;
/// Connection settings for the Awtrix3 MQTT client.
///
/// Convertible into [`MqttOptions`] through the `From` implementation in this
/// file, so a value of this type can be passed directly to
/// `Awtrix3MqttClient::new`.
#[derive(Debug, Default)]
pub struct Awtrix3MqttClientOptions {
/// Client identifier handed to `MqttOptions::new`.
pub client_id: String,
/// Broker host handed to `MqttOptions::new`.
pub host: String,
/// Broker port handed to `MqttOptions::new`.
pub port: u16,
/// Optional username used for broker credentials.
pub username: Option<String>,
/// Optional password used for broker credentials.
pub password: Option<String>,
}
impl From<Awtrix3MqttClientOptions> for MqttOptions {
    /// Builds [`MqttOptions`] from the client id, host and port, then applies
    /// credentials when a username is configured.
    ///
    /// A username without a password is applied with an empty password instead
    /// of being silently discarded (MQTT permits username-only authentication).
    /// A password without a username is ignored, since MQTT does not allow a
    /// password to be sent on its own.
    fn from(val: Awtrix3MqttClientOptions) -> Self {
        let mut options = MqttOptions::new(val.client_id, val.host, val.port);
        if let Some(username) = val.username {
            options.set_credentials(username, val.password.unwrap_or_default());
        }
        options
    }
}
/// Handle for publishing to and subscribing on prefixed MQTT topics.
///
/// Created together with an [`Awtrix3MqttEventLoop`] by `Awtrix3MqttClient::new`.
#[derive(Clone, Debug)]
pub struct Awtrix3MqttClient {
/// Underlying `rumqttc` asynchronous client used to send requests.
pub client: AsyncClient,
/// String prepended to every topic passed to `publish` and `subscribe`.
pub prefix: String,
}
/// Event loop paired with an [`Awtrix3MqttClient`]; it is polled for MQTT
/// events, which can then be decoded into `(Topic, Message)` pairs.
pub struct Awtrix3MqttEventLoop {
/// Underlying `rumqttc` event loop that is polled for events.
pub event_loop: EventLoop,
/// Topic prefix passed to `Topic::decode` when decoding incoming publishes.
pub prefix: String,
}
impl Awtrix3MqttClient {
    /// Creates a client handle together with its paired event loop.
    ///
    /// `options` is converted into [`MqttOptions`] and forwarded, along with
    /// `cap`, to [`AsyncClient::new`]. `prefix` is copied into both returned
    /// values: the client prepends it to outgoing topics and the event loop
    /// passes it to `Topic::decode` for incoming ones.
    ///
    /// # Errors
    ///
    /// This function currently always returns `Ok`; the `Result` return type
    /// is part of the existing interface and is kept for compatibility.
    pub fn new<O>(
        options: O,
        cap: usize,
        prefix: &str,
    ) -> Result<(Self, Awtrix3MqttEventLoop), Awtrix3Error>
    where
        O: Into<MqttOptions>,
    {
        let (client, event_loop) = AsyncClient::new(options.into(), cap);
        Ok((
            Self {
                client,
                prefix: prefix.to_string(),
            },
            Awtrix3MqttEventLoop {
                event_loop,
                prefix: prefix.to_string(),
            },
        ))
    }

    /// Validates a caller-supplied topic and returns it with the prefix prepended.
    ///
    /// # Errors
    ///
    /// Returns [`Awtrix3Error::InvalidPath`] when `topic` does not start with `/`.
    fn prefixed_topic(&self, topic: &str) -> Result<String, Awtrix3Error> {
        if topic.starts_with('/') {
            Ok(format!("{}{}", self.prefix, topic))
        } else {
            Err(Awtrix3Error::InvalidPath(topic.to_string()))
        }
    }

    /// Serializes `payload` as JSON and publishes it to `prefix + topic`.
    ///
    /// The message is sent with [`QoS::AtLeastOnce`] and the retain flag unset.
    ///
    /// # Errors
    ///
    /// Returns [`Awtrix3Error::InvalidPath`] when `topic` does not start with
    /// `/`, or the converted client error when the publish request fails.
    ///
    /// # Panics
    ///
    /// Panics if `payload` cannot be serialized to JSON.
    // NOTE(review): a serialization failure should ideally surface as an
    // `Awtrix3Error`; that needs an error conversion not visible in this file.
    pub async fn publish<D>(&self, topic: &str, payload: D) -> Result<(), Awtrix3Error>
    where
        D: serde::Serialize,
    {
        // The topic is validated before serializing so an invalid path is
        // reported without touching the payload.
        let topic = self.prefixed_topic(topic)?;
        let payload = serde_json::to_string(&payload)
            .expect("MQTT payload must be serializable to JSON");
        self.client
            .publish(topic, QoS::AtLeastOnce, false, payload)
            .await?;
        Ok(())
    }

    /// Subscribes to `prefix + topic` with [`QoS::AtMostOnce`].
    ///
    /// # Errors
    ///
    /// Returns [`Awtrix3Error::InvalidPath`] when `topic` does not start with
    /// `/`, or the converted client error when the subscribe request fails.
    pub async fn subscribe(&self, topic: &str) -> Result<(), Awtrix3Error> {
        let topic = self.prefixed_topic(topic)?;
        self.client.subscribe(topic, QoS::AtMostOnce).await?;
        Ok(())
    }
}
impl Awtrix3MqttEventLoop {
    /// Polls the underlying event loop once and returns the next event.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying event loop, converted into
    /// [`Awtrix3Error`].
    pub async fn poll(&mut self) -> Result<Event, Awtrix3Error> {
        let event = self.event_loop.poll().await?;
        Ok(event)
    }

    /// Decodes an incoming publish event into its topic and message.
    ///
    /// Returns `Ok(None)` for every event that is not an incoming publish.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Topic::decode` or `Topic::payload`.
    pub fn decode(&self, event: Event) -> Result<Option<(Topic, Message)>, Awtrix3Error> {
        if let Event::Incoming(Packet::Publish(publish)) = event {
            let topic = Topic::decode(&publish.topic, &self.prefix)?;
            let message = topic.payload(&publish.payload)?;
            return Ok(Some((topic, message)));
        }
        Ok(None)
    }
}