use core::{fmt::Display, ops::Deref};
use embassy_futures::select::{select, Either};
use embassy_sync::pubsub::WaitResult;
use embassy_time::Timer;
use heapless::{String, Vec};
use mqttrs::{Packet, QoS, Subscribe, SubscribeReturnCodes, SubscribeTopic, Unsubscribe};
#[cfg(feature = "serde")]
use crate::publish::PublishJson;
use crate::{
device_id, device_type,
io::{assign_pid, send_packet, subscribe},
publish::{PublishBytes, PublishDisplay},
ControlMessage, Error, TopicString, CONFIRMATION_TIMEOUT,
};
/// An MQTT topic, tagged with the scope its inner value is relative to.
///
/// The variant decides which prefix is put in front of the inner value when the
/// full topic path is built.
#[derive(Clone, Copy)]
pub enum Topic<T> {
/// Topic below the device type: the full path is `<device_type>/<topic>`.
DeviceType(T),
/// Topic below this device: the full path is `<device_type>/<device_id>/<topic>`.
Device(T),
/// Topic without any prefix: the inner value is the full path.
General(T),
}
/// Two topics are equal when they are the same variant and their inner values
/// compare equal. The inner types may differ as long as `B: PartialEq<A>`.
impl<A, B> PartialEq<Topic<A>> for Topic<B>
where
    B: PartialEq<A>,
{
    fn eq(&self, other: &Topic<A>) -> bool {
        // Dispatch on the left-hand variant; the right-hand side only matches
        // when it is the very same variant.
        match self {
            Topic::DeviceType(lhs) => matches!(other, Topic::DeviceType(rhs) if lhs == rhs),
            Topic::Device(lhs) => matches!(other, Topic::Device(rhs) if lhs == rhs),
            Topic::General(lhs) => matches!(other, Topic::General(rhs) if lhs == rhs),
        }
    }
}
/// Constructors that pair a borrowed topic with a payload to publish.
///
/// Every constructor starts from the same defaults: `QoS::AtMostOnce` and a
/// cleared retain flag.
impl<T> Topic<T> {
    /// Returns a [`PublishBytes`] that references this topic and carries `data`
    /// as a byte payload.
    pub fn with_bytes<B: AsRef<[u8]>>(&self, data: B) -> PublishBytes<'_, T, B> {
        let (qos, retain) = (QoS::AtMostOnce, false);
        PublishBytes {
            retain,
            qos,
            data,
            topic: self,
        }
    }

    /// Returns a [`PublishDisplay`] that references this topic and carries `data`,
    /// a value implementing [`Display`].
    pub fn with_display<D: Display>(&self, data: D) -> PublishDisplay<'_, T, D> {
        let (qos, retain) = (QoS::AtMostOnce, false);
        PublishDisplay {
            retain,
            qos,
            data,
            topic: self,
        }
    }

    /// Returns a [`PublishJson`] that references this topic and carries `data`,
    /// a value implementing `serde::Serialize`.
    ///
    /// Only available with the `serde` feature.
    #[cfg(feature = "serde")]
    pub fn with_json<D: serde::Serialize>(&self, data: D) -> PublishJson<'_, T, D> {
        let (qos, retain) = (QoS::AtMostOnce, false);
        PublishJson {
            retain,
            qos,
            data,
            topic: self,
        }
    }
}
impl Topic<TopicString> {
    /// Parses a full topic path into a scoped [`Topic`].
    ///
    /// * `<device_type>/<device_id>/<rest>` becomes `Topic::Device(<rest>)`.
    /// * `<device_type>/<rest>` becomes `Topic::DeviceType(<rest>)`.
    /// * Anything else becomes `Topic::General` holding the whole path.
    ///
    /// A prefix only counts when it is directly followed by a `/`. A path that is
    /// exactly equal to a prefix (no trailing `/`) is therefore not stripped.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the remaining path cannot be pushed into a
    /// `TopicString`.
    pub(crate) fn from_str(st: &str) -> Result<Self, ()> {
        /// Removes `prefix` plus the `/` that follows it from the front of `path`.
        ///
        /// Built on `strip_prefix` instead of byte-range slicing so that a path
        /// ending right after the prefix, or continuing with a multi-byte
        /// character, yields `None` rather than a slicing panic.
        fn strip_segment<'a>(path: &'a str, prefix: &str) -> Option<&'a str> {
            path.strip_prefix(prefix)?.strip_prefix('/')
        }

        // Copies the remaining path into a fresh `TopicString`.
        let build = |rest: &str| -> Result<TopicString, ()> {
            let mut topic = TopicString::new();
            topic.push_str(rest)?;
            Ok(topic)
        };

        match strip_segment(st, device_type()) {
            Some(after_type) => match strip_segment(after_type, device_id()) {
                Some(after_id) => build(after_id).map(Topic::Device),
                None => build(after_type).map(Topic::DeviceType),
            },
            None => build(st).map(Topic::General),
        }
    }
}
impl<T: Deref<Target = str>> Topic<T> {
pub(crate) fn to_string<const N: usize>(&self, result: &mut String<N>) -> Result<(), Error> {
match self {
Topic::Device(st) => {
result
.push_str(device_type())
.map_err(|_| Error::TooLarge)?;
result.push_str("/").map_err(|_| Error::TooLarge)?;
result.push_str(device_id()).map_err(|_| Error::TooLarge)?;
result.push_str("/").map_err(|_| Error::TooLarge)?;
result.push_str(st.as_ref()).map_err(|_| Error::TooLarge)?;
}
Topic::DeviceType(st) => {
result
.push_str(device_type())
.map_err(|_| Error::TooLarge)?;
result.push_str("/").map_err(|_| Error::TooLarge)?;
result.push_str(st.as_ref()).map_err(|_| Error::TooLarge)?;
}
Topic::General(st) => {
result.push_str(st.as_ref()).map_err(|_| Error::TooLarge)?;
}
}
Ok(())
}
pub fn as_ref(&self) -> Topic<&str> {
match self {
Topic::DeviceType(st) => Topic::DeviceType(st.as_ref()),
Topic::Device(st) => Topic::Device(st.as_ref()),
Topic::General(st) => Topic::General(st.as_ref()),
}
}
pub async fn subscribe(&self, wait_for_ack: bool) -> Result<(), Error> {
let mut subscriber = subscribe().await;
let mut topic_path = TopicString::new();
if self.to_string(&mut topic_path).is_err() {
return Err(Error::TooLarge);
}
let pid = assign_pid().await;
let subscribe_topic = SubscribeTopic {
topic_path,
qos: QoS::AtLeastOnce,
};
let topics = match Vec::<SubscribeTopic, 5>::from_slice(&[subscribe_topic]) {
Ok(t) => t,
Err(_) => return Err(Error::TooLarge),
};
let packet = Packet::Subscribe(Subscribe { pid, topics });
send_packet(packet).await?;
if wait_for_ack {
match select(
async {
loop {
match subscriber.next_message().await {
WaitResult::Lagged(_) => {
}
WaitResult::Message(ControlMessage::Subscribed(
subscribed_pid,
return_code,
)) if subscribed_pid == pid => {
if matches!(return_code, SubscribeReturnCodes::Success(_)) {
return Ok(());
} else {
return Err(Error::IOError);
}
}
_ => {}
}
}
},
Timer::after_millis(CONFIRMATION_TIMEOUT),
)
.await
{
Either::First(r) => r,
Either::Second(_) => Err(Error::TimedOut),
}
} else {
Ok(())
}
}
pub async fn unsubscribe(&self, wait_for_ack: bool) -> Result<(), Error> {
let mut subscriber = subscribe().await;
let mut topic_path = TopicString::new();
if self.to_string(&mut topic_path).is_err() {
return Err(Error::TooLarge);
}
let pid = assign_pid().await;
let topics = match Vec::<TopicString, 5>::from_slice(&[topic_path]) {
Ok(t) => t,
Err(_) => return Err(Error::TooLarge),
};
let packet = Packet::Unsubscribe(Unsubscribe { pid, topics });
send_packet(packet).await?;
if wait_for_ack {
match select(
async {
loop {
match subscriber.next_message().await {
WaitResult::Lagged(_) => {
}
WaitResult::Message(ControlMessage::Unsubscribed(subscribed_pid))
if subscribed_pid == pid =>
{
return Ok(());
}
_ => {}
}
}
},
Timer::after_millis(CONFIRMATION_TIMEOUT),
)
.await
{
Either::First(r) => r,
Either::Second(_) => Err(Error::TimedOut),
}
} else {
Ok(())
}
}
}