1#![warn(missing_docs)]
4#![deny(warnings)]
5
6use futures::{SinkExt, StreamExt};
7use std::{collections::HashMap, sync::Arc};
8
9pub use rumqttc::v5::mqttbytes::v5 as mqttbytes;
11pub use rumqttc::v5::mqttbytes::QoS;
12
13pub type Result<T> = std::result::Result<T, Error>;
15
16#[cfg(feature = "statistics")]
25#[derive(Default, Clone)]
26pub struct Statistics
27{
28 pub received_messages: u64,
30 pub received_duplicated_messages: u64,
32 pub published_messages: u64,
34 pub connection_ack: u64,
36 pub subscription_ack: u64,
38 pub publish_ack: u64,
40 pub publish_release: u64,
42 pub publish_received: u64,
44 pub publish_complete: u64,
46 pub ping_response: u64,
48}
49
50#[derive(thiserror::Error, Debug)]
57#[allow(missing_docs)]
58pub enum Error
59{
60 #[error("Already subscribed to topic {0}.")]
61 AlreadySubscribed(String),
62 #[error("Not subscribed to topic {0}.")]
63 NotSubscribed(String),
64 #[error("An internal error occured when sending a request {0}.")]
65 RequestChannel(#[from] futures::channel::mpsc::SendError),
66 #[error("Could not lock a mutex: {0}.")]
67 MutexPoison(String),
68 #[error("An error occured in the communication middleware {0}.")]
69 Communication(#[from] rumqttc::v5::ClientError),
70 #[error("MQTT option error {0}.")]
71 OptionError(#[from] rumqttc::v5::OptionError),
72 #[cfg(feature = "json")]
73 #[error("Error during json serialization {0}")]
74 JsonSerialisation(#[from] serde_json::Error),
75 #[cfg(feature = "cbor")]
76 #[error("Error during cbor serialization {0}")]
77 CborSerialisation(#[from] ciborium::ser::Error<std::io::Error>),
78}
79
80impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error
81{
82 fn from(value: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self
83 {
84 Error::MutexPoison(value.to_string())
85 }
86}
87
88#[derive(thiserror::Error, Debug)]
95enum InternalError
96{
97 #[error("No channel for topic {0}")]
98 NoChannelFor(String),
99 #[error("An error occured in the communication middleware {0}.")]
100 Communication(#[from] rumqttc::v5::ConnectionError),
101 #[error("Error when broadcasting message {0}")]
102 AsyncBroadcast(String),
103 #[error("UTF-8 error {0}")]
104 Utf8Error(#[from] std::str::Utf8Error),
105}
106
107impl<T> From<async_broadcast::SendError<T>> for InternalError
108{
109 fn from(value: async_broadcast::SendError<T>) -> Self
110 {
111 Self::AsyncBroadcast(value.to_string())
112 }
113}
114
115#[derive(Clone)]
124pub struct Message<T>
125{
126 pub payload: T,
128 pub properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
130}
131
132pub type RawMessage = Message<bytes::Bytes>;
134
135enum Request
144{
145 Subscribe
146 {
147 topic: String,
148 sender: async_broadcast::Sender<RawMessage>,
149 qos: QoS,
150 },
151 Publish
152 {
153 topic: String,
154 qos: QoS,
155 retain: bool,
156 payload: bytes::Bytes,
157 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
158 },
159}
160
161#[derive(Default)]
168struct ClientInner
169{
170 senders: HashMap<String, async_broadcast::Sender<RawMessage>>,
171 #[cfg(feature = "statistics")]
172 statistics: Statistics,
173 #[cfg(feature = "debug")]
174 last_logged_statistics: Statistics,
175}
176
177pub struct ClientBuilder
185{
186 mqttoptions: rumqttc::v5::MqttOptions,
187 cap: usize,
188}
189
190impl ClientBuilder
191{
192 pub fn set_capacity(mut self, cap: usize) -> Self
194 {
195 self.cap = cap;
196 self
197 }
198
199 async fn handle_events(
200 event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
201 client_inner: &Arc<futures::lock::Mutex<ClientInner>>,
202 ) -> std::result::Result<(), InternalError>
203 {
204 match event?
205 {
206 rumqttc::v5::Event::Incoming(packet) => match packet
207 {
208 rumqttc::v5::Incoming::Publish(pub_msg) =>
209 {
210 let topic_str = std::str::from_utf8(pub_msg.topic.as_ref())?;
211 #[cfg(feature = "statistics")]
212 let mut client_inner = client_inner.lock().await;
213 #[cfg(feature = "statistics")]
214 {
215 client_inner.statistics.received_messages += 1;
216 if pub_msg.dup
217 {
218 client_inner.statistics.received_duplicated_messages += 1;
219 }
220 }
221 #[cfg(not(feature = "statistics"))]
222 let client_inner = client_inner.lock().await;
223
224 match client_inner.senders.get(topic_str)
225 {
226 Some(s) =>
227 {
228 s.broadcast(RawMessage {
229 payload: pub_msg.payload,
230 properties: pub_msg.properties,
231 })
232 .await?;
233 }
234 None => Err(InternalError::NoChannelFor(topic_str.to_string()))?,
235 }
236 }
237 rumqttc::v5::Incoming::ConnAck(_) =>
238 {
239 #[cfg(feature = "statistics")]
240 {
241 client_inner.lock().await.statistics.connection_ack += 1;
242 }
243 }
244 rumqttc::v5::Incoming::SubAck(_) =>
245 {
246 #[cfg(feature = "statistics")]
247 {
248 client_inner.lock().await.statistics.subscription_ack += 1;
249 }
250 }
251 rumqttc::v5::Incoming::PubAck(_) =>
252 {
253 #[cfg(feature = "statistics")]
254 {
255 client_inner.lock().await.statistics.publish_ack += 1;
256 }
257 }
258 rumqttc::v5::Incoming::PubRel(_) =>
259 {
260 #[cfg(feature = "statistics")]
261 {
262 client_inner.lock().await.statistics.publish_release += 1;
263 }
264 }
265 rumqttc::v5::Incoming::PubRec(_) =>
266 {
267 #[cfg(feature = "statistics")]
268 {
269 client_inner.lock().await.statistics.publish_received += 1;
270 }
271 }
272 rumqttc::v5::Incoming::PubComp(_) =>
273 {
274 #[cfg(feature = "statistics")]
275 {
276 client_inner.lock().await.statistics.publish_complete += 1;
277 }
278 }
279 rumqttc::v5::Incoming::PingResp(_) =>
280 {
281 #[cfg(feature = "statistics")]
282 {
283 client_inner.lock().await.statistics.ping_response += 1;
284 }
285 }
286 _ =>
287 {
288 log::error!("Incoming unhandled packet {packet:?}");
289 }
290 },
291 rumqttc::v5::Event::Outgoing(_) =>
292 { }
293 }
294 #[cfg(feature = "debug")]
295 {
296 let mut client_inner = client_inner.lock().await;
297
298 if client_inner.statistics.received_messages
299 > client_inner.last_logged_statistics.received_messages + 1000
300 {
301 client_inner.last_logged_statistics = client_inner.statistics.clone();
302 log::info!(
303 "RECVMSG {} (DUP {}) PUBMSG {} CONACK {} PUBACK {} PUBREL {} PUBREC {} PUBCOMP {} PINGRESP {}",
304 client_inner.last_logged_statistics.received_messages,
305 client_inner.last_logged_statistics.received_duplicated_messages,
306 client_inner.last_logged_statistics.published_messages,
307 client_inner.last_logged_statistics.connection_ack,
308 client_inner.last_logged_statistics.publish_ack,
309 client_inner.last_logged_statistics.publish_release,
310 client_inner.last_logged_statistics.publish_received,
311 client_inner.last_logged_statistics.publish_complete,
312 client_inner.last_logged_statistics.ping_response
313 );
314 }
315 }
316
317 Ok(())
318 }
319 pub fn start(self) -> (Client, impl std::future::Future<Output = ()>)
321 {
322 let (client, mut eventloop) = rumqttc::v5::AsyncClient::new(self.mqttoptions, self.cap);
323
324 let (request_sender, mut request_receiver) = futures::channel::mpsc::channel(self.cap);
325
326 let fut = async move {
327 let client_inner: Arc<futures::lock::Mutex<ClientInner>> = Default::default();
328 let fut_mqtt = {
329 let client_inner = client_inner.clone();
330 async move {
331 loop
332 {
333 if let Err(e) = Self::handle_events(eventloop.poll().await, &client_inner).await
334 {
335 log::error!("An error occured when handling MQTT event: {e:?}");
336 }
337 }
338 }
339 };
340 let fut_req = async move {
341 loop
342 {
343 let r = match request_receiver.next().await
344 {
345 Some(Request::Subscribe { topic, sender, qos }) =>
346 {
347 client_inner
348 .lock()
349 .await
350 .senders
351 .insert(topic.clone(), sender);
352 client.subscribe(topic, qos).await
353 }
354 Some(Request::Publish {
355 topic,
356 qos,
357 retain,
358 payload,
359 properties,
360 }) =>
361 {
362 #[cfg(feature = "statistics")]
363 {
364 client_inner.lock().await.statistics.published_messages += 1;
365 }
366 match properties
367 {
368 Some(properties) =>
369 {
370 client
371 .publish_with_properties(topic, qos, retain, payload, properties)
372 .await
373 }
374 None => client.publish(topic, qos, retain, payload).await,
375 }
376 }
377 None => Ok(()),
378 };
379 if let Err(e) = r
380 {
381 log::error!("An error occured when evaluating requests: {e:?}");
382 }
383 }
384 };
385 futures::join!(fut_mqtt, fut_req);
386 };
387
388 (
389 Client {
390 request_sender,
391 subscriptions: Default::default(),
392 },
393 fut,
394 )
395 }
396}
397
398#[derive(Clone)]
406pub struct Client
407{
408 subscriptions: std::sync::Arc<
409 futures::lock::Mutex<HashMap<String, async_broadcast::InactiveReceiver<RawMessage>>>,
410 >,
411 request_sender: futures::channel::mpsc::Sender<Request>,
412}
413
414macro_rules! _create_raw_subscription {
415 ($request_sender: expr, $subs: ident, $topic: ident, $qos: ident, $cap: ident) => {{
416 let (sender, r) = async_broadcast::broadcast::<RawMessage>($cap);
417 $subs.insert($topic.clone(), r.clone().deactivate());
418 $request_sender
419 .send(Request::Subscribe {
420 topic: $topic,
421 sender,
422 qos: $qos,
423 })
424 .await?;
425 Ok(r)
426 }};
427}
428
429impl Client
430{
431 pub fn build(node_id: impl Into<String>, hostname: impl Into<String>, port: u16)
433 -> ClientBuilder
434 {
435 let mut mqttoptions = rumqttc::v5::MqttOptions::new(node_id, hostname, port);
436 mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
437
438 ClientBuilder {
439 mqttoptions,
440 cap: 1000,
441 }
442 }
443 pub fn build_from_url(url: impl Into<String>) -> Result<ClientBuilder>
445 {
446 let mut mqttoptions = rumqttc::v5::MqttOptions::parse_url(url)?;
447 mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
448
449 Ok(ClientBuilder {
450 mqttoptions,
451 cap: 1000,
452 })
453 }
454 pub async fn publish_raw(
456 mut self,
457 topic: impl Into<String>,
458 qos: QoS,
459 retain: bool,
460 payload: impl Into<bytes::Bytes>,
461 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
462 ) -> Result<()>
463 {
464 self
465 .request_sender
466 .send(Request::Publish {
467 topic: topic.into(),
468 qos,
469 retain,
470 payload: payload.into(),
471 properties,
472 })
473 .await?;
474 Ok(())
475 }
476 #[cfg(feature = "json")]
478 pub fn publish_json<T: serde::Serialize>(
479 self,
480 topic: impl Into<String>,
481 qos: QoS,
482 retain: bool,
483 payload: T,
484 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
485 ) -> Result<impl std::future::Future<Output = Result<()>>>
486 {
487 let properties = match properties
488 {
489 None => rumqttc::v5::mqttbytes::v5::PublishProperties {
490 content_type: Some("application/json").map(str::to_string),
491 ..Default::default()
492 },
493 Some(props) =>
494 {
495 let mut props = props.clone();
496 props.content_type = Some("application/json".to_string());
497 props
498 }
499 };
500 let payload_raw = serde_json::to_string(&payload)?;
501
502 Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
503 }
504 #[cfg(feature = "cbor")]
506 pub fn publish_cbor<T: serde::Serialize>(
507 self,
508 topic: impl Into<String>,
509 qos: QoS,
510 retain: bool,
511 payload: T,
512 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
513 ) -> Result<impl std::future::Future<Output = Result<()>>>
514 {
515 let properties = match properties
516 {
517 None => rumqttc::v5::mqttbytes::v5::PublishProperties {
518 content_type: Some("application/cbor").map(str::to_string),
519 ..Default::default()
520 },
521 Some(props) =>
522 {
523 let mut props = props.clone();
524 props.content_type = Some("application/cbor".to_string());
525 props
526 }
527 };
528 let mut payload_raw = Vec::<u8>::new();
529 ciborium::into_writer(&payload, &mut payload_raw)?;
530
531 Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
532 }
533 pub async fn create_raw_subscription(
535 mut self,
536 topic: impl Into<String>,
537 qos: QoS,
538 cap: usize,
539 ) -> Result<async_broadcast::Receiver<RawMessage>>
540 {
541 let mut subs = self.subscriptions.lock().await;
542 let topic = topic.into();
543 match subs.get(&topic)
544 {
545 Some(_) => Err(Error::AlreadySubscribed(topic)),
546 None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
547 }
548 }
549 pub fn get_raw_subscription(
551 &self,
552 topic: impl Into<String>,
553 ) -> Result<async_broadcast::Receiver<RawMessage>>
554 {
555 let topic = topic.into();
556 futures::executor::block_on(self.subscriptions.lock())
557 .get(&topic)
558 .ok_or_else(|| Error::NotSubscribed(topic))
559 .map(|x| x.activate_cloned())
560 }
561 pub async fn get_or_create_raw_subscription(
565 mut self,
566 topic: impl Into<String>,
567 qos: QoS,
568 cap: usize,
569 ) -> Result<async_broadcast::Receiver<RawMessage>>
570 {
571 let mut subs = self.subscriptions.lock().await;
572 let topic = topic.into();
573 match subs.get(&topic)
574 {
575 Some(s) => Ok(s.activate_cloned()),
576 None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
577 }
578 }
579 #[cfg(feature = "json")]
581 pub async fn get_or_create_json_subscription<T>(
582 self,
583 topic: impl Into<String>,
584 qos: QoS,
585 cap: usize,
586 ) -> Result<impl futures::Stream<Item = Message<T>>>
587 where
588 for<'de> T: serde::Deserialize<'de>,
589 {
590 use bytes::Buf;
591 let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
592 let map_s = raw_s.filter_map(|msg| async move {
593 let payload_r = serde_json::from_slice::<T>(msg.payload.chunk());
594 match payload_r
595 {
596 Ok(payload) => Some(Message::<T> {
597 payload,
598 properties: msg.properties,
599 }),
600 Err(e) =>
601 {
602 log::error!("Failed to parse JSON message: {e}");
603 None
604 }
605 }
606 });
607 Ok(map_s)
608 }
609}
610
611#[cfg(test)]
612mod tests
613{
614 #[test]
615 fn it_works()
616 {
617 assert_eq!(4, 4);
618 }
619}