use async_trait::async_trait;
use rumqttc::{AsyncClient, Publish};
use super::super::query;
use super::super::topics::c2d_topic;
use crate::persistence::{sqlite_channel, CloudToDeviceMessage};
use super::AsyncHandler;
pub(crate) struct CloudToDeviceHandler {
client: AsyncClient,
c2d_prefix: String,
producer: sqlite_channel::Sender<CloudToDeviceMessage>,
}
impl CloudToDeviceHandler {
pub(crate) fn new(
client: AsyncClient,
device_id: &str,
producer: sqlite_channel::Sender<CloudToDeviceMessage>,
) -> Self {
CloudToDeviceHandler {
client,
c2d_prefix: c2d_topic(device_id),
producer,
}
}
}
#[async_trait]
impl AsyncHandler for CloudToDeviceHandler {
fn prefix(&self) -> Vec<&str> {
vec![&self.c2d_prefix]
}
async fn handle(&mut self, publish: &Publish) {
let topic = &publish.topic;
log::debug!("Received C2D message on topic {topic}");
let properties = match query::parse(&publish.topic[self.c2d_prefix.len()..]) {
Ok(properties) => properties,
Err(e) => {
log::error!(
"Failed parsing cloud to device message topic `{}`: {:?}",
topic,
e
);
return;
}
};
let msg = CloudToDeviceMessage::new(
publish.payload.to_vec(),
properties
.into_iter()
.map(|(k, v)| (k, v.unwrap_or_default()))
.collect(),
);
if let Err(e) = self.producer.send(&msg).await {
log::error!(
"Cannot store a cloud-to-device message. It will not be processed: {}",
e
);
}
_ = self.client.ack(publish).await;
}
}