1use async_trait::async_trait;
2use log::{error, trace};
3use rumqttc::{
4 v5::{
5 mqttbytes::{
6 v5::{ConnectProperties, Filter, Packet},
7 QoS,
8 },
9 AsyncClient as RuClient, EventLoop as RuEventLoop, MqttOptions,
10 },
11 Outgoing,
12};
13use srad_types::{
14 payload::{Message, Payload},
15 topic::{DeviceTopic, StateTopic, TopicFilter},
16};
17
18use srad_client::{topic_and_payload_to_event, Event, LastWill, StatePayload};
19
20fn qos_to_mqtt_qos(qos: srad_types::topic::QoS) -> QoS {
21 match qos {
22 srad_types::topic::QoS::AtMostOnce => QoS::AtMostOnce,
23 srad_types::topic::QoS::AtLeastOnce => QoS::AtLeastOnce,
24 }
25}
26
27fn topic_filter_to_mqtt_filter(topic_filter: TopicFilter) -> Filter {
28 Filter::new(topic_filter.topic, qos_to_mqtt_qos(topic_filter.qos))
29}
30
/// Cloneable handle that wraps a rumqttc v5 `AsyncClient` and is used to
/// issue publish, subscribe and disconnect requests.
#[derive(Clone)]
pub struct Client {
    /// Underlying rumqttc client that every request is forwarded to.
    client: RuClient,
}
36
37impl Client {
38 async fn publish(
39 &self,
40 topic: String,
41 qos: QoS,
42 retain: bool,
43 payload: Vec<u8>,
44 ) -> Result<(), ()> {
45 match self.client.publish(topic, qos, retain, payload).await {
46 Ok(_) => Ok(()),
47 Err(_) => Err(()),
48 }
49 }
50
51 fn try_publish(
52 &self,
53 topic: String,
54 qos: QoS,
55 retain: bool,
56 payload: Vec<u8>,
57 ) -> Result<(), ()> {
58 match self.client.try_publish(topic, qos, retain, payload) {
59 Ok(_) => Ok(()),
60 Err(_) => Err(()),
61 }
62 }
63}
64
65#[async_trait]
66impl srad_client::Client for Client {
67 async fn disconnect(&self) -> Result<(), ()> {
68 match self.client.disconnect().await {
69 Ok(_) => Ok(()),
70 Err(_) => Err(()),
71 }
72 }
73
74 async fn publish_state_message(
75 &self,
76 topic: StateTopic,
77 payload: StatePayload,
78 ) -> Result<(), ()> {
79 let (qos, retain) = payload.get_publish_quality_retain();
80 self.publish(
81 topic.topic,
82 qos_to_mqtt_qos(qos),
83 retain,
84 Vec::<u8>::from(payload),
85 )
86 .await
87 }
88
89 async fn try_publish_state_message(
90 &self,
91 topic: StateTopic,
92 payload: StatePayload,
93 ) -> Result<(), ()> {
94 let (qos, retain) = payload.get_publish_quality_retain();
95 self.try_publish(
96 topic.topic,
97 qos_to_mqtt_qos(qos),
98 retain,
99 Vec::<u8>::from(payload),
100 )
101 }
102
103 async fn publish_node_message(
104 &self,
105 topic: srad_types::topic::NodeTopic,
106 payload: srad_types::payload::Payload,
107 ) -> Result<(), ()> {
108 let (qos, retain) = topic.get_publish_quality_retain();
109 self.publish(
110 topic.topic,
111 qos_to_mqtt_qos(qos),
112 retain,
113 payload.encode_to_vec(),
114 )
115 .await
116 }
117
118 async fn try_publish_node_message(
119 &self,
120 topic: srad_types::topic::NodeTopic,
121 payload: srad_types::payload::Payload,
122 ) -> Result<(), ()> {
123 let (qos, retain) = topic.get_publish_quality_retain();
124 self.try_publish(
125 topic.topic,
126 qos_to_mqtt_qos(qos),
127 retain,
128 payload.encode_to_vec(),
129 )
130 }
131
132 async fn publish_device_message(&self, topic: DeviceTopic, payload: Payload) -> Result<(), ()> {
133 let (qos, retain) = topic.get_publish_quality_retain();
134 self.publish(
135 topic.topic,
136 qos_to_mqtt_qos(qos),
137 retain,
138 payload.encode_to_vec(),
139 )
140 .await
141 }
142
143 async fn try_publish_device_message(
144 &self,
145 topic: DeviceTopic,
146 payload: Payload,
147 ) -> Result<(), ()> {
148 let (qos, retain) = topic.get_publish_quality_retain();
149 self.try_publish(
150 topic.topic,
151 qos_to_mqtt_qos(qos),
152 retain,
153 payload.encode_to_vec(),
154 )
155 }
156
157 async fn subscribe_many(&self, topics: Vec<TopicFilter>) -> Result<(), ()> {
158 let filters: Vec<Filter> = topics
159 .into_iter()
160 .map(topic_filter_to_mqtt_filter)
161 .collect();
162 match self.client.subscribe_many(filters).await {
163 Ok(_) => Ok(()),
164 Err(_) => Err(()),
165 }
166 }
167}
168
/// Connection status tracked by [`EventLoop`] while it polls rumqttc.
enum ConnectionState {
    /// Not connected: the initial state, and the state entered after an
    /// incoming disconnect packet or a poll error while connected.
    Disconnected,
    /// Entered when an outgoing disconnect request is observed on the event loop.
    ManualDisconnected,
    /// Entered when a ConnAck packet is received.
    Connected,
}
174
/// Wraps a rumqttc v5 event loop and translates what it yields into
/// `srad_client` events.
pub struct EventLoop {
    /// Connection status as last observed from polled events and errors.
    state: ConnectionState,
    /// Underlying rumqttc event loop that is polled for events.
    el: RuEventLoop,
}
180
181impl EventLoop {
182 pub fn new(options: MqttOptions, cap: usize) -> (Self, Client) {
188 let mut options = options;
189 let mut connection_properties = match options.connect_properties() {
190 Some(p) => p,
191 None => ConnectProperties::new(),
192 };
193 connection_properties.session_expiry_interval = Some(0);
195
196 options
197 .set_clean_start(true)
198 .set_connect_properties(connection_properties);
199
200 let (client, eventloop) = RuClient::new(options, cap);
201 (
202 EventLoop {
203 el: eventloop,
204 state: ConnectionState::Disconnected,
205 },
206 Client { client },
207 )
208 }
209
210 async fn poll_rumqtt(&mut self) -> Option<Event> {
211 let event = self.el.poll().await;
212 match event {
213 Ok(event) => {
214 trace!("{event:?}");
215 match event {
216 rumqttc::v5::Event::Incoming(Packet::ConnAck(_)) => {
217 self.state = ConnectionState::Connected;
218 Some(Event::Online)
219 }
220 rumqttc::v5::Event::Incoming(Packet::Disconnect(_)) => {
221 self.state = ConnectionState::Disconnected;
222 Some(Event::Offline)
223 }
224 rumqttc::v5::Event::Incoming(Packet::Publish(publish)) => {
225 Some(topic_and_payload_to_event(
226 publish.topic.to_vec(),
227 publish.payload.to_vec(),
228 ))
229 }
230 rumqttc::v5::Event::Outgoing(Outgoing::Disconnect) => {
231 self.state = ConnectionState::ManualDisconnected;
232 Some(Event::Offline)
233 }
234 _ => None,
235 }
236 }
237 Err(e) => match self.state {
238 ConnectionState::Connected => {
239 error!("Client error: {e}");
240 self.state = ConnectionState::Disconnected;
241 Some(Event::Offline)
242 }
243 ConnectionState::Disconnected => {
244 error!("Client error on reconnect attempt: {e}");
245 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
246 None
247 }
248 ConnectionState::ManualDisconnected => None,
249 },
250 }
251 }
252}
253
254#[async_trait]
255impl srad_client::EventLoop for EventLoop {
256 async fn poll(&mut self) -> Event {
257 loop {
258 if let Some(event) = self.poll_rumqtt().await {
259 return event;
260 }
261 }
262 }
263
264 fn set_last_will(&mut self, will: LastWill) {
265 let qos = qos_to_mqtt_qos(will.qos);
266 let mqtt_will = rumqttc::v5::mqttbytes::v5::LastWill::new(
267 will.topic,
268 will.payload,
269 qos,
270 will.retain,
271 None,
272 );
273 self.el.options.set_last_will(mqtt_will);
274 }
275}