srad_client_rumqtt/
client.rs

1use async_trait::async_trait;
2use log::{error, trace};
3use rumqttc::{
4    v5::{
5        mqttbytes::{
6            v5::{ConnectProperties, Filter, Packet},
7            QoS,
8        },
9        AsyncClient as RuClient, EventLoop as RuEventLoop, MqttOptions,
10    },
11    Outgoing,
12};
13use srad_types::{
14    payload::{Message, Payload},
15    topic::{DeviceTopic, StateTopic, TopicFilter},
16};
17
18use srad_client::{topic_and_payload_to_event, Event, LastWill, StatePayload};
19
20fn qos_to_mqtt_qos(qos: srad_types::topic::QoS) -> QoS {
21    match qos {
22        srad_types::topic::QoS::AtMostOnce => QoS::AtMostOnce,
23        srad_types::topic::QoS::AtLeastOnce => QoS::AtLeastOnce,
24    }
25}
26
27fn topic_filter_to_mqtt_filter(topic_filter: TopicFilter) -> Filter {
28    Filter::new(topic_filter.topic, qos_to_mqtt_qos(topic_filter.qos))
29}
30
/// A [srad_client::Client] implementation using [rumqttc]
///
/// This is a cloneable wrapper around the rumqttc v5 `AsyncClient`; every operation is
/// forwarded to that inner client.
#[derive(Clone)]
pub struct Client {
    /// The wrapped rumqttc v5 `AsyncClient` (imported here as `RuClient`).
    client: RuClient,
}
36
37impl Client {
38    async fn publish(
39        &self,
40        topic: String,
41        qos: QoS,
42        retain: bool,
43        payload: Vec<u8>,
44    ) -> Result<(), ()> {
45        match self.client.publish(topic, qos, retain, payload).await {
46            Ok(_) => Ok(()),
47            Err(_) => Err(()),
48        }
49    }
50
51    fn try_publish(
52        &self,
53        topic: String,
54        qos: QoS,
55        retain: bool,
56        payload: Vec<u8>,
57    ) -> Result<(), ()> {
58        match self.client.try_publish(topic, qos, retain, payload) {
59            Ok(_) => Ok(()),
60            Err(_) => Err(()),
61        }
62    }
63}
64
65#[async_trait]
66impl srad_client::Client for Client {
67    async fn disconnect(&self) -> Result<(), ()> {
68        match self.client.disconnect().await {
69            Ok(_) => Ok(()),
70            Err(_) => Err(()),
71        }
72    }
73
74    async fn publish_state_message(
75        &self,
76        topic: StateTopic,
77        payload: StatePayload,
78    ) -> Result<(), ()> {
79        let (qos, retain) = payload.get_publish_quality_retain();
80        self.publish(
81            topic.topic,
82            qos_to_mqtt_qos(qos),
83            retain,
84            Vec::<u8>::from(payload),
85        )
86        .await
87    }
88
89    async fn try_publish_state_message(
90        &self,
91        topic: StateTopic,
92        payload: StatePayload,
93    ) -> Result<(), ()> {
94        let (qos, retain) = payload.get_publish_quality_retain();
95        self.try_publish(
96            topic.topic,
97            qos_to_mqtt_qos(qos),
98            retain,
99            Vec::<u8>::from(payload),
100        )
101    }
102
103    async fn publish_node_message(
104        &self,
105        topic: srad_types::topic::NodeTopic,
106        payload: srad_types::payload::Payload,
107    ) -> Result<(), ()> {
108        let (qos, retain) = topic.get_publish_quality_retain();
109        self.publish(
110            topic.topic,
111            qos_to_mqtt_qos(qos),
112            retain,
113            payload.encode_to_vec(),
114        )
115        .await
116    }
117
118    async fn try_publish_node_message(
119        &self,
120        topic: srad_types::topic::NodeTopic,
121        payload: srad_types::payload::Payload,
122    ) -> Result<(), ()> {
123        let (qos, retain) = topic.get_publish_quality_retain();
124        self.try_publish(
125            topic.topic,
126            qos_to_mqtt_qos(qos),
127            retain,
128            payload.encode_to_vec(),
129        )
130    }
131
132    async fn publish_device_message(&self, topic: DeviceTopic, payload: Payload) -> Result<(), ()> {
133        let (qos, retain) = topic.get_publish_quality_retain();
134        self.publish(
135            topic.topic,
136            qos_to_mqtt_qos(qos),
137            retain,
138            payload.encode_to_vec(),
139        )
140        .await
141    }
142
143    async fn try_publish_device_message(
144        &self,
145        topic: DeviceTopic,
146        payload: Payload,
147    ) -> Result<(), ()> {
148        let (qos, retain) = topic.get_publish_quality_retain();
149        self.try_publish(
150            topic.topic,
151            qos_to_mqtt_qos(qos),
152            retain,
153            payload.encode_to_vec(),
154        )
155    }
156
157    async fn subscribe_many(&self, topics: Vec<TopicFilter>) -> Result<(), ()> {
158        let filters: Vec<Filter> = topics
159            .into_iter()
160            .map(topic_filter_to_mqtt_filter)
161            .collect();
162        match self.client.subscribe_many(filters).await {
163            Ok(_) => Ok(()),
164            Err(_) => Err(()),
165        }
166    }
167}
168
/// Connection state tracked by [`EventLoop`] while it polls the rumqttc event loop.
enum ConnectionState {
    /// Not connected. This is the initial state, and the state entered when the connection
    /// is lost or the broker sends a disconnect packet.
    Disconnected,
    /// An outgoing disconnect packet was observed, i.e. the disconnect was requested locally.
    ManualDisconnected,
    /// A ConnAck packet has been received from the broker.
    Connected,
}
174
/// An [srad_client::EventLoop] implementation using [rumqttc]
///
/// Wraps the rumqttc v5 event loop and translates what it yields into [`Event`] values.
pub struct EventLoop {
    /// Connection state derived from the packets and errors seen while polling.
    state: ConnectionState,
    /// The wrapped rumqttc v5 `EventLoop` (imported here as `RuEventLoop`).
    el: RuEventLoop,
}
180
181impl EventLoop {
182    /// Create a new `Eventloop`.
183    ///
184    /// `options` are the mqtt options to create the rumqtt client with. Some options will be overwritten to ensure Sparkplug compliance.
185    ///
186    /// `cap` specifies the capacity of the bounded async channel for the client handle.
187    pub fn new(options: MqttOptions, cap: usize) -> (Self, Client) {
188        let mut options = options;
189        let mut connection_properties = match options.connect_properties() {
190            Some(p) => p,
191            None => ConnectProperties::new(),
192        };
193        /* Sparkplug requires session expiry interval to be 0 */
194        connection_properties.session_expiry_interval = Some(0);
195
196        options
197            .set_clean_start(true)
198            .set_connect_properties(connection_properties);
199
200        let (client, eventloop) = RuClient::new(options, cap);
201        (
202            EventLoop {
203                el: eventloop,
204                state: ConnectionState::Disconnected,
205            },
206            Client { client },
207        )
208    }
209
210    async fn poll_rumqtt(&mut self) -> Option<Event> {
211        let event = self.el.poll().await;
212        match event {
213            Ok(event) => {
214                trace!("{event:?}");
215                match event {
216                    rumqttc::v5::Event::Incoming(Packet::ConnAck(_)) => {
217                        self.state = ConnectionState::Connected;
218                        Some(Event::Online)
219                    }
220                    rumqttc::v5::Event::Incoming(Packet::Disconnect(_)) => {
221                        self.state = ConnectionState::Disconnected;
222                        Some(Event::Offline)
223                    }
224                    rumqttc::v5::Event::Incoming(Packet::Publish(publish)) => {
225                        Some(topic_and_payload_to_event(
226                            publish.topic.to_vec(),
227                            publish.payload.to_vec(),
228                        ))
229                    }
230                    rumqttc::v5::Event::Outgoing(Outgoing::Disconnect) => {
231                        self.state = ConnectionState::ManualDisconnected;
232                        Some(Event::Offline)
233                    }
234                    _ => None,
235                }
236            }
237            Err(e) => match self.state {
238                ConnectionState::Connected => {
239                    error!("Client error: {e}");
240                    self.state = ConnectionState::Disconnected;
241                    Some(Event::Offline)
242                }
243                ConnectionState::Disconnected => {
244                    error!("Client error on reconnect attempt: {e}");
245                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
246                    None
247                }
248                ConnectionState::ManualDisconnected => None,
249            },
250        }
251    }
252}
253
254#[async_trait]
255impl srad_client::EventLoop for EventLoop {
256    async fn poll(&mut self) -> Event {
257        loop {
258            if let Some(event) = self.poll_rumqtt().await {
259                return event;
260            }
261        }
262    }
263
264    fn set_last_will(&mut self, will: LastWill) {
265        let qos = qos_to_mqtt_qos(will.qos);
266        let mqtt_will = rumqttc::v5::mqttbytes::v5::LastWill::new(
267            will.topic,
268            will.payload,
269            qos,
270            will.retain,
271            None,
272        );
273        self.el.options.set_last_will(mqtt_will);
274    }
275}