pipemqtt/
listen.rs

1use crate::client::{new_client, to_qos, ClientOptions, QoSType};
2use async_trait::async_trait;
3use pipebase::{
4    common::{ConfigInto, FromConfig, FromPath},
5    listen::Listen,
6};
7use rumqttc::{Event, EventLoop, Packet, QoS};
8use serde::Deserialize;
9use tokio::sync::mpsc::Sender;
10use tracing::info;
11
12#[derive(Deserialize)]
13pub struct MqttSubscriberConfig {
14    base: ClientOptions,
15    topic: String,
16    qos: QoSType,
17}
18
19impl FromPath for MqttSubscriberConfig {}
20
21impl ConfigInto<MqttSubscriber> for MqttSubscriberConfig {}
22
23pub struct MqttSubscriber {
24    topic: String,
25    qos: QoS,
26    client_opts: ClientOptions,
27    tx: Option<Sender<Vec<u8>>>,
28}
29
30#[async_trait]
31impl FromConfig<MqttSubscriberConfig> for MqttSubscriber {
32    async fn from_config(config: MqttSubscriberConfig) -> anyhow::Result<Self> {
33        let client_opts = config.base;
34        let topic = config.topic;
35        let qos = to_qos(config.qos);
36        Ok(MqttSubscriber {
37            topic,
38            qos,
39            client_opts,
40            tx: None,
41        })
42    }
43}
44
45#[async_trait]
46impl Listen<Vec<u8>, MqttSubscriberConfig> for MqttSubscriber {
47    async fn run(&mut self) -> anyhow::Result<()> {
48        let (client, event) = new_client(&self.client_opts);
49        client.subscribe(&self.topic, self.qos).await?;
50        let tx = self
51            .tx
52            .as_ref()
53            .expect("sender not inited for mqtt listener");
54        Self::start_loop(event, tx).await
55    }
56
57    fn set_sender(&mut self, sender: Sender<Vec<u8>>) {
58        self.tx = Some(sender)
59    }
60}
61
62impl MqttSubscriber {
63    async fn start_loop(mut event: EventLoop, tx: &Sender<Vec<u8>>) -> anyhow::Result<()> {
64        loop {
65            let event = event.poll().await?;
66            let packet = match event {
67                Event::Incoming(packet) => packet,
68                _ => continue,
69            };
70            let payload = match packet {
71                Packet::Publish(publish) => publish.payload,
72                _ => {
73                    info!("incoming packet {:?}", packet);
74                    continue;
75                }
76            };
77            tx.send(payload.to_vec()).await?;
78        }
79    }
80}