1use crate::client::{new_client, to_qos, ClientOptions, QoSType};
2use async_trait::async_trait;
3use pipebase::{
4 common::{ConfigInto, FromConfig, FromPath},
5 listen::Listen,
6};
7use rumqttc::{Event, EventLoop, Packet, QoS};
8use serde::Deserialize;
9use tokio::sync::mpsc::Sender;
10use tracing::info;
11
/// Deserializable configuration from which an [`MqttSubscriber`] is built.
#[derive(Deserialize)]
pub struct MqttSubscriberConfig {
    /// Client options; stored on the subscriber and later passed to `new_client`.
    base: ClientOptions,
    /// Topic the subscriber passes to `subscribe` when it runs.
    topic: String,
    /// Configured QoS level; converted with `to_qos` in `from_config`.
    qos: QoSType,
}
18
// Empty impl: nothing from `FromPath` is overridden here, so the trait's
// provided items are used as-is for this config type.
impl FromPath for MqttSubscriberConfig {}
20
// Empty impl: associates this config type with `MqttSubscriber` through
// `ConfigInto`, relying entirely on the trait's provided items.
impl ConfigInto<MqttSubscriber> for MqttSubscriberConfig {}
22
/// Listener that subscribes to one MQTT topic and forwards the payload of
/// every received publish packet as a `Vec<u8>` over a channel.
pub struct MqttSubscriber {
    /// Topic passed to `subscribe` in `run`.
    topic: String,
    /// QoS level passed to `subscribe` in `run`.
    qos: QoS,
    /// Options passed to `new_client` each time `run` is called.
    client_opts: ClientOptions,
    /// Destination for received payloads. `None` until `set_sender` is
    /// called; `run` panics if it is still `None` at that point.
    tx: Option<Sender<Vec<u8>>>,
}
29
30#[async_trait]
31impl FromConfig<MqttSubscriberConfig> for MqttSubscriber {
32 async fn from_config(config: MqttSubscriberConfig) -> anyhow::Result<Self> {
33 let client_opts = config.base;
34 let topic = config.topic;
35 let qos = to_qos(config.qos);
36 Ok(MqttSubscriber {
37 topic,
38 qos,
39 client_opts,
40 tx: None,
41 })
42 }
43}
44
45#[async_trait]
46impl Listen<Vec<u8>, MqttSubscriberConfig> for MqttSubscriber {
47 async fn run(&mut self) -> anyhow::Result<()> {
48 let (client, event) = new_client(&self.client_opts);
49 client.subscribe(&self.topic, self.qos).await?;
50 let tx = self
51 .tx
52 .as_ref()
53 .expect("sender not inited for mqtt listener");
54 Self::start_loop(event, tx).await
55 }
56
57 fn set_sender(&mut self, sender: Sender<Vec<u8>>) {
58 self.tx = Some(sender)
59 }
60}
61
62impl MqttSubscriber {
63 async fn start_loop(mut event: EventLoop, tx: &Sender<Vec<u8>>) -> anyhow::Result<()> {
64 loop {
65 let event = event.poll().await?;
66 let packet = match event {
67 Event::Incoming(packet) => packet,
68 _ => continue,
69 };
70 let payload = match packet {
71 Packet::Publish(publish) => publish.payload,
72 _ => {
73 info!("incoming packet {:?}", packet);
74 continue;
75 }
76 };
77 tx.send(payload.to_vec()).await?;
78 }
79 }
80}