mqtt_channel/
lib.rs

//! Provide channel-based subscription to an MQTT broker.
2
3#![warn(missing_docs)]
4
5use futures::{SinkExt, StreamExt};
6use std::{collections::HashMap, sync::Arc};
7
// Re-export some useful rumqttc::v5 types
9pub use rumqttc::v5::mqttbytes::v5 as mqttbytes;
10pub use rumqttc::v5::mqttbytes::QoS;
11
/// Result type for mqtt-channel: a `std::result::Result` whose error type is
/// fixed to this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
14
15//  ____  _        _   _     _   _
16// / ___|| |_ __ _| |_(_)___| |_(_) ___ ___
17// \___ \| __/ _` | __| / __| __| |/ __/ __|
18//  ___) | || (_| | |_| \__ \ |_| | (__\__ \
19// |____/ \__\__,_|\__|_|___/\__|_|\___|___/
20//
21
/// Statistics of the MQTT Client
///
/// Only available when the `statistics` feature is enabled. All counters
/// start at zero (`Default`) and are incremented by the internal tasks of the
/// client while they process incoming packets and publish requests.
#[cfg(feature = "statistics")]
#[derive(Default, Clone)]
pub struct Statistics
{
  /// Number of received messages (incoming PUBLISH packets)
  pub received_messages: u64,
  /// Number of received messages that had the duplicate (`dup`) flag set
  pub received_duplicated_messages: u64,
  /// Number of publish requests processed by the client
  pub published_messages: u64,
  /// Number of connection acknowledgement received [CONNACK](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901074)
  pub connection_ack: u64,
  /// Number of subscription acknowledgement (SUBACK) received
  pub subscription_ack: u64,
  /// Number of publish acknowledgement received [PUBACK](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121)
  pub publish_ack: u64,
  /// Number of publish release received [PUBREL](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901141)
  pub publish_release: u64,
  /// Number of publish received (the packet type) received [PUBREC](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901131)
  pub publish_received: u64,
  /// Number of publish complete received [PUBCOMP](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901151)
  pub publish_complete: u64,
  /// Number of ping response received [PINGRESP](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901200)
  pub ping_response: u64,
}
48
49//  _____
50// | ____|_ __ _ __ ___  _ __
51// |  _| | '__| '__/ _ \| '__|
52// | |___| |  | | | (_) | |
53// |_____|_|  |_|  \___/|_|
54//
/// Errors returned by the public API of this crate.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
// NOTE(review): the messages below spell "occured" and one variant is named
// `UnsupportedConentType`; both are typos, but they are part of the runtime
// output / public API and are therefore left untouched here.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum Error
{
  /// A subscription for this topic is already registered in the client.
  #[error("Already subscribed to topic {0}.")]
  AlreadySubscribed(String),
  /// No subscription is registered in the client for this topic.
  #[error("Not subscribed to topic {0}.")]
  NotSubscribed(String),
  /// The request could not be queued on the channel to the internal task.
  #[error("An internal error occured when sending a request {0}.")]
  RequestChannel(#[from] futures::channel::mpsc::SendError),
  /// A `std::sync::Mutex` was poisoned; carries the poison error's message.
  #[error("Could not lock a mutex: {0}.")]
  MutexPoison(String),
  /// Wraps a (boxed) rumqttc client error.
  #[error("An error occured in the communication middleware {0}.")]
  Communication(#[from] Box<rumqttc::v5::ClientError>),
  /// Wraps a rumqttc option error, e.g. from parsing a broker URL.
  #[error("MQTT option error {0}.")]
  OptionError(#[from] rumqttc::v5::OptionError),
  /// Wraps any `serde_json::Error` converted with `?` (serialization and
  /// deserialization alike, despite the wording of the message).
  #[cfg(feature = "json")]
  #[error("Error during json serialization {0}")]
  JsonSerialisation(#[from] serde_json::Error),
  /// Wraps a ciborium serialization error.
  #[cfg(feature = "cbor")]
  #[error("Error during cbor serialization {0}")]
  CborSerialisation(#[from] ciborium::ser::Error<std::io::Error>),
  /// Wraps a ciborium deserialization error.
  #[cfg(feature = "cbor")]
  #[error("Error during cbor deserialization {0}")]
  CborDeserialisation(#[from] ciborium::de::Error<std::io::Error>),
  /// A received message carried no publish properties.
  #[error("Missing properties")]
  MissingProperties,
  /// A received message carried properties but no content type.
  #[error("Missing content type")]
  MissingContentType,
  /// A received message carried a content type that cannot be decoded.
  #[error("Unsupported content type: {0}")]
  UnsupportedConentType(String),
}
88
89impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error
90{
91  fn from(value: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self
92  {
93    Error::MutexPoison(value.to_string())
94  }
95}
96
97//  ___       _                        _ _____
98// |_ _|_ __ | |_ ___ _ __ _ __   __ _| | ____|_ __ _ __ ___  _ __
99//  | || '_ \| __/ _ \ '__| '_ \ / _` | |  _| | '__| '__/ _ \| '__|
100//  | || | | | ||  __/ |  | | | | (_| | | |___| |  | | | (_) | |
101// |___|_| |_|\__\___|_|  |_| |_|\__,_|_|_____|_|  |_|  \___/|_|
102
/// Errors raised while handling events coming from the MQTT event loop.
///
/// These never reach the user of the crate: the internal task logs them.
#[derive(thiserror::Error, Debug)]
enum InternalError
{
  /// A message was received on a topic for which no sender is registered.
  #[error("No channel for topic {0}")]
  NoChannelFor(String),
  /// The rumqttc event loop reported a connection error.
  #[error("An error occured in the communication middleware {0}.")]
  Communication(#[from] rumqttc::v5::ConnectionError),
  /// Forwarding a message to the broadcast channel failed; carries the
  /// textual description of the underlying send error.
  #[error("Error when broadcasting message {0}")]
  AsyncBroadcast(String),
  /// The topic of a received message was not valid UTF-8.
  #[error("UTF-8 error {0}")]
  Utf8Error(#[from] std::str::Utf8Error),
}
115
116impl<T> From<async_broadcast::SendError<T>> for InternalError
117{
118  fn from(value: async_broadcast::SendError<T>) -> Self
119  {
120    Self::AsyncBroadcast(value.to_string())
121  }
122}
123
124//  __  __
125// |  \/  | ___  ___ ___  __ _  __ _  ___
126// | |\/| |/ _ \/ __/ __|/ _` |/ _` |/ _ \
127// | |  | |  __/\__ \__ \ (_| | (_| |  __/
128// |_|  |_|\___||___/___/\__,_|\__, |\___|
129//                             |___/
130
/// Message received from MQTT
///
/// `T` is the type of the payload: raw bytes for [`RawMessage`], or a
/// deserialized value for the serde-based subscriptions.
#[derive(Clone)]
pub struct Message<T>
{
  /// Payload of the message (raw bytes, or the deserialized value)
  pub payload: T,
  /// Properties of the message, check [rumqttc documentation](https://docs.rs/rumqttc/latest/rumqttc/v5/mqttbytes/v5/struct.PublishProperties.html) for more information.
  pub properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
}
140
/// Raw message: a [`Message`] whose payload is the undecoded bytes received
/// from the broker.
pub type RawMessage = Message<bytes::Bytes>;
143
144//  ____                            _
145// |  _ \ ___  __ _ _   _  ___  ___| |_
146// | |_) / _ \/ _` | | | |/ _ \/ __| __|
147// |  _ <  __/ (_| | |_| |  __/\__ \ |_
148// |_| \_\___|\__, |\__,_|\___||___/\__|
149//               |_|
150
/// Request sent to the internal MQTT tasks
///
/// `Client` handles queue these on a channel; the request task created by
/// `ClientBuilder::start` consumes them and calls the rumqttc client.
enum Request
{
  /// Register `sender` for `topic` and subscribe to the topic on the broker.
  Subscribe
  {
    // Topic to subscribe to; also the key under which `sender` is stored.
    topic: String,
    // Broadcast sender on which messages received on `topic` are forwarded.
    sender: async_broadcast::Sender<RawMessage>,
    // Quality of service requested for the subscription.
    qos: QoS,
  },
  /// Publish `payload` on `topic`.
  Publish
  {
    // Topic to publish on.
    topic: String,
    // Quality of service used for the publication.
    qos: QoS,
    // Value of the MQTT retain flag.
    retain: bool,
    // Raw bytes to publish.
    payload: bytes::Bytes,
    // Optional publish properties; when `None` the message is published
    // without properties.
    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
  },
}
169
170//   ____ _ _            _   ___
171//  / ___| (_) ___ _ __ | |_|_ _|_ __  _ __   ___ _ __
172// | |   | | |/ _ \ '_ \| __|| || '_ \| '_ \ / _ \ '__|
173// | |___| | |  __/ | | | |_ | || | | | | | |  __/ |
174//  \____|_|_|\___|_| |_|\__|___|_| |_|_| |_|\___|_|
175
/// State shared between the event-handling task and the request task.
#[derive(Default)]
struct ClientInner
{
  // Broadcast sender for each subscribed topic, keyed by the exact topic
  // string used when subscribing.
  senders: HashMap<String, async_broadcast::Sender<RawMessage>>,
  // Live traffic counters.
  #[cfg(feature = "statistics")]
  statistics: Statistics,
  // Snapshot of `statistics` taken the last time they were logged.
  // NOTE(review): this field uses `Statistics`, which only exists with the
  // `statistics` feature; presumably `debug` enables `statistics` in
  // Cargo.toml. Confirm.
  #[cfg(feature = "debug")]
  last_logged_statistics: Statistics,
}
185
186//   ____ _ _            _   ____        _ _     _
187//  / ___| (_) ___ _ __ | |_| __ ) _   _(_) | __| | ___ _ __
188// | |   | | |/ _ \ '_ \| __|  _ \| | | | | |/ _` |/ _ \ '__|
189// | |___| | |  __/ | | | |_| |_) | |_| | | | (_| |  __/ |
190//  \____|_|_|\___|_| |_|\__|____/ \__,_|_|_|\__,_|\___|_|
191
/// Used to setup a new connection to a MQTT Broker
///
/// Obtained from `Client::build` or `Client::build_from_url`; call `start`
/// to create the client handle and the future driving the connection.
pub struct ClientBuilder
{
  // Options handed to rumqttc when the connection is started.
  mqttoptions: rumqttc::v5::MqttOptions,
  // Capacity used for both the rumqttc client and the internal request channel.
  cap: usize,
}
198
199impl ClientBuilder
200{
201  /// Set the capacity of the reception buffer
202  pub fn set_capacity(mut self, cap: usize) -> Self
203  {
204    self.cap = cap;
205    self
206  }
207
208  async fn handle_events(
209    event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
210    client_inner: &Arc<futures::lock::Mutex<ClientInner>>,
211  ) -> std::result::Result<(), InternalError>
212  {
213    match event?
214    {
215      rumqttc::v5::Event::Incoming(packet) => match packet
216      {
217        rumqttc::v5::Incoming::Publish(pub_msg) =>
218        {
219          let topic_str = std::str::from_utf8(pub_msg.topic.as_ref())?;
220          #[cfg(feature = "statistics")]
221          let mut client_inner = client_inner.lock().await;
222          #[cfg(feature = "statistics")]
223          {
224            client_inner.statistics.received_messages += 1;
225            if pub_msg.dup
226            {
227              client_inner.statistics.received_duplicated_messages += 1;
228            }
229          }
230          #[cfg(not(feature = "statistics"))]
231          let client_inner = client_inner.lock().await;
232
233          match client_inner.senders.get(topic_str)
234          {
235            Some(s) =>
236            {
237              s.broadcast(RawMessage {
238                payload: pub_msg.payload,
239                properties: pub_msg.properties,
240              })
241              .await?;
242            }
243            None => Err(InternalError::NoChannelFor(topic_str.to_string()))?,
244          }
245        }
246        rumqttc::v5::Incoming::ConnAck(_) =>
247        {
248          #[cfg(feature = "statistics")]
249          {
250            client_inner.lock().await.statistics.connection_ack += 1;
251          }
252        }
253        rumqttc::v5::Incoming::SubAck(_) =>
254        {
255          #[cfg(feature = "statistics")]
256          {
257            client_inner.lock().await.statistics.subscription_ack += 1;
258          }
259        }
260        rumqttc::v5::Incoming::PubAck(_) =>
261        {
262          #[cfg(feature = "statistics")]
263          {
264            client_inner.lock().await.statistics.publish_ack += 1;
265          }
266        }
267        rumqttc::v5::Incoming::PubRel(_) =>
268        {
269          #[cfg(feature = "statistics")]
270          {
271            client_inner.lock().await.statistics.publish_release += 1;
272          }
273        }
274        rumqttc::v5::Incoming::PubRec(_) =>
275        {
276          #[cfg(feature = "statistics")]
277          {
278            client_inner.lock().await.statistics.publish_received += 1;
279          }
280        }
281        rumqttc::v5::Incoming::PubComp(_) =>
282        {
283          #[cfg(feature = "statistics")]
284          {
285            client_inner.lock().await.statistics.publish_complete += 1;
286          }
287        }
288        rumqttc::v5::Incoming::PingResp(_) =>
289        {
290          #[cfg(feature = "statistics")]
291          {
292            client_inner.lock().await.statistics.ping_response += 1;
293          }
294        }
295        _ =>
296        {
297          log::error!("Incoming unhandled packet {packet:?}");
298        }
299      },
300      rumqttc::v5::Event::Outgoing(_) =>
301      { /* ignore outgoing event */ }
302    }
303    #[cfg(feature = "debug")]
304    {
305      let mut client_inner = client_inner.lock().await;
306
307      if client_inner.statistics.received_messages
308        > client_inner.last_logged_statistics.received_messages + 1000
309      {
310        client_inner.last_logged_statistics = client_inner.statistics.clone();
311        log::info!(
312          "RECVMSG {} (DUP {}) PUBMSG {} CONACK {} PUBACK {} PUBREL {} PUBREC {} PUBCOMP {} PINGRESP {}",
313          client_inner.last_logged_statistics.received_messages,
314          client_inner.last_logged_statistics.received_duplicated_messages,
315          client_inner.last_logged_statistics.published_messages,
316          client_inner.last_logged_statistics.connection_ack,
317          client_inner.last_logged_statistics.publish_ack,
318          client_inner.last_logged_statistics.publish_release,
319          client_inner.last_logged_statistics.publish_received,
320          client_inner.last_logged_statistics.publish_complete,
321          client_inner.last_logged_statistics.ping_response
322        );
323      }
324    }
325
326    Ok(())
327  }
328  /// Start the connection
329  pub fn start(self) -> (Client, impl std::future::Future<Output = ()>)
330  {
331    let (client, mut eventloop) = rumqttc::v5::AsyncClient::new(self.mqttoptions, self.cap);
332
333    let (request_sender, mut request_receiver) = futures::channel::mpsc::channel(self.cap);
334
335    let fut = async move {
336      let client_inner: Arc<futures::lock::Mutex<ClientInner>> = Default::default();
337      let fut_mqtt = {
338        let client_inner = client_inner.clone();
339        async move {
340          loop
341          {
342            if let Err(e) = Self::handle_events(eventloop.poll().await, &client_inner).await
343            {
344              log::error!("An error occured when handling MQTT event: {e:?}");
345            }
346          }
347        }
348      };
349      let fut_req = async move {
350        loop
351        {
352          let r = match request_receiver.next().await
353          {
354            Some(Request::Subscribe { topic, sender, qos }) =>
355            {
356              client_inner
357                .lock()
358                .await
359                .senders
360                .insert(topic.clone(), sender);
361              client.subscribe(topic, qos).await
362            }
363            Some(Request::Publish {
364              topic,
365              qos,
366              retain,
367              payload,
368              properties,
369            }) =>
370            {
371              #[cfg(feature = "statistics")]
372              {
373                client_inner.lock().await.statistics.published_messages += 1;
374              }
375              match properties
376              {
377                Some(properties) =>
378                {
379                  client
380                    .publish_with_properties(topic, qos, retain, payload, properties)
381                    .await
382                }
383                None => client.publish(topic, qos, retain, payload).await,
384              }
385            }
386            None => Ok(()),
387          };
388          if let Err(e) = r
389          {
390            log::error!("An error occured when evaluating requests: {e:?}");
391          }
392        }
393      };
394      futures::join!(fut_mqtt, fut_req);
395    };
396
397    (
398      Client {
399        request_sender,
400        subscriptions: Default::default(),
401      },
402      fut,
403    )
404  }
405}
406
407//   ____ _ _            _
408//  / ___| (_) ___ _ __ | |_
409// | |   | | |/ _ \ '_ \| __|
410// | |___| | |  __/ | | | |_
411//  \____|_|_|\___|_| |_|\__|
412
/// Handle to the connection
///
/// Cloning is cheap: clones share the same subscription table and send their
/// requests to the same internal task.
#[derive(Clone)]
pub struct Client
{
  // Inactive receiver for every topic subscribed through this client (or one
  // of its clones), used to hand out new receivers for an existing topic.
  subscriptions: std::sync::Arc<
    futures::lock::Mutex<HashMap<String, async_broadcast::InactiveReceiver<RawMessage>>>,
  >,
  // Channel on which requests are queued for the internal request task.
  request_sender: futures::channel::mpsc::Sender<Request>,
}
422
// Shared body of the subscription-creating methods of `Client`.
//
// Creates a broadcast channel of capacity `$cap`, queues a `Subscribe` request
// for `$topic` on `$request_sender`, records an inactive receiver for the
// topic in `$subs` and evaluates to `Ok(receiver)`.
//
// The request is sent *before* the topic is recorded in `$subs`: if sending
// fails, the `?` returns early and the topic is not left registered without a
// matching broker subscription.
//
// Must be expanded inside an `async` function returning `Result<_>`, since it
// uses `.await` and `?`.
// NOTE(review): `async_broadcast::broadcast` is documented to panic when the
// capacity is zero. Confirm, and consider validating `cap` in the callers.
macro_rules! _create_raw_subscription {
  ($request_sender: expr, $subs: ident, $topic: ident, $qos: ident, $cap: ident) => {{
    let (sender, r) = async_broadcast::broadcast::<RawMessage>($cap);
    $request_sender
      .send(Request::Subscribe {
        topic: $topic.clone(),
        sender,
        qos: $qos,
      })
      .await?;
    $subs.insert($topic, r.clone().deactivate());
    Ok(r)
  }};
}
437
438impl Client
439{
440  /// Create a new MQTT Client, with the given name to connect at the given and MQTT broker
441  pub fn build(node_id: impl Into<String>, hostname: impl Into<String>, port: u16)
442    -> ClientBuilder
443  {
444    let mut mqttoptions = rumqttc::v5::MqttOptions::new(node_id, hostname, port);
445    mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
446
447    ClientBuilder {
448      mqttoptions,
449      cap: 1000,
450    }
451  }
452  /// Create a new MQTT Client from the given url of the form `mqtt://example.com:1883?client_id=123`
453  pub fn build_from_url(url: impl Into<String>) -> Result<ClientBuilder>
454  {
455    let mut mqttoptions = rumqttc::v5::MqttOptions::parse_url(url)?;
456    mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
457
458    Ok(ClientBuilder {
459      mqttoptions,
460      cap: 1000,
461    })
462  }
463  /// Publish a message on a topic
464  pub async fn publish_raw(
465    mut self,
466    topic: impl Into<String>,
467    qos: QoS,
468    retain: bool,
469    payload: impl Into<bytes::Bytes>,
470    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
471  ) -> Result<()>
472  {
473    self
474      .request_sender
475      .send(Request::Publish {
476        topic: topic.into(),
477        qos,
478        retain,
479        payload: payload.into(),
480        properties,
481      })
482      .await?;
483    Ok(())
484  }
485  /// Publish a message on a topic, serialized to JSON
486  #[cfg(feature = "json")]
487  pub fn publish_json<T: serde::Serialize>(
488    self,
489    topic: impl Into<String>,
490    qos: QoS,
491    retain: bool,
492    payload: T,
493    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
494  ) -> Result<impl std::future::Future<Output = Result<()>>>
495  {
496    let properties = match properties
497    {
498      None => rumqttc::v5::mqttbytes::v5::PublishProperties {
499        content_type: Some("application/json").map(str::to_string),
500        ..Default::default()
501      },
502      Some(props) =>
503      {
504        let mut props = props.clone();
505        props.content_type = Some("application/json".to_string());
506        props
507      }
508    };
509    let payload_raw = serde_json::to_string(&payload)?;
510
511    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
512  }
513  /// Publish a message on a topic, serialized to JSON
514  #[cfg(feature = "cbor")]
515  pub fn publish_cbor<T: serde::Serialize>(
516    self,
517    topic: impl Into<String>,
518    qos: QoS,
519    retain: bool,
520    payload: T,
521    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
522  ) -> Result<impl std::future::Future<Output = Result<()>>>
523  {
524    let properties = match properties
525    {
526      None => rumqttc::v5::mqttbytes::v5::PublishProperties {
527        content_type: Some("application/cbor").map(str::to_string),
528        ..Default::default()
529      },
530      Some(props) =>
531      {
532        let mut props = props.clone();
533        props.content_type = Some("application/cbor".to_string());
534        props
535      }
536    };
537    let mut payload_raw = Vec::<u8>::new();
538    ciborium::into_writer(&payload, &mut payload_raw)?;
539
540    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
541  }
542  /// Create a raw subscription
543  pub async fn create_raw_subscription(
544    mut self,
545    topic: impl Into<String>,
546    qos: QoS,
547    cap: usize,
548  ) -> Result<async_broadcast::Receiver<RawMessage>>
549  {
550    let mut subs = self.subscriptions.lock().await;
551    let topic = topic.into();
552    match subs.get(&topic)
553    {
554      Some(_) => Err(Error::AlreadySubscribed(topic)),
555      None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
556    }
557  }
558  /// Get an existing subscription
559  pub fn get_raw_subscription(
560    &self,
561    topic: impl Into<String>,
562  ) -> Result<async_broadcast::Receiver<RawMessage>>
563  {
564    let topic = topic.into();
565    futures::executor::block_on(self.subscriptions.lock())
566      .get(&topic)
567      .ok_or_else(|| Error::NotSubscribed(topic))
568      .map(|x| x.activate_cloned())
569  }
570  /// Get an existing subscription, or create a new one with the given QoS
571  /// This function is guaranteed to return, however if the subscription
572  /// already exists, the QoS is ignored.
573  pub async fn get_or_create_raw_subscription(
574    mut self,
575    topic: impl Into<String>,
576    qos: QoS,
577    cap: usize,
578  ) -> Result<async_broadcast::Receiver<RawMessage>>
579  {
580    let mut subs = self.subscriptions.lock().await;
581    let topic = topic.into();
582    match subs.get(&topic)
583    {
584      Some(s) => Ok(s.activate_cloned()),
585      None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
586    }
587  }
588  /// Create a subscription that use serde to deserialize., aka, topic will be parsed using serde_json.
589  #[cfg(any(feature = "json", feature = "cbor"))]
590  pub async fn get_or_create_serde_subscription<T>(
591    self,
592    topic: impl Into<String>,
593    qos: QoS,
594    cap: usize,
595  ) -> Result<impl futures::Stream<Item = Result<Message<T>>>>
596  where
597    for<'de> T: serde::Deserialize<'de>,
598  {
599    use bytes::Buf;
600    let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
601    let map_s = raw_s.map(|msg| match &msg.properties
602    {
603      Some(properties) => match properties.content_type.as_ref().map(|x| x.as_str())
604      {
605        #[cfg(feature = "json")]
606        Some("application/json") =>
607        {
608          let payload = serde_json::from_slice::<T>(msg.payload.chunk())?;
609          Ok(Message::<T> {
610            payload,
611            properties: msg.properties,
612          })
613        }
614        #[cfg(feature = "cbor")]
615        Some("application/cbor") =>
616        {
617          let payload = ciborium::from_reader(msg.payload.chunk())?;
618          Ok(Message::<T> {
619            payload,
620            properties: msg.properties,
621          })
622        }
623        Some(o) => Err(Error::UnsupportedConentType(o.to_string())),
624        None => Err(Error::MissingContentType),
625      },
626
627      None => Err(Error::MissingProperties),
628    });
629    Ok(map_s)
630  }
631  /// Create a JSON subscription, aka, topic will be parsed using serde_json.
632  #[cfg(feature = "json")]
633  pub async fn get_or_create_json_subscription<T>(
634    self,
635    topic: impl Into<String>,
636    qos: QoS,
637    cap: usize,
638  ) -> Result<impl futures::Stream<Item = Message<T>>>
639  where
640    for<'de> T: serde::Deserialize<'de>,
641  {
642    use bytes::Buf;
643    let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
644    let map_s = raw_s.filter_map(|msg| async move {
645      let payload_r = serde_json::from_slice::<T>(msg.payload.chunk());
646      match payload_r
647      {
648        Ok(payload) => Some(Message::<T> {
649          payload,
650          properties: msg.properties,
651        }),
652        Err(e) =>
653        {
654          log::error!("Failed to parse JSON message: {e}");
655          None
656        }
657      }
658    });
659    Ok(map_s)
660  }
661}
662
// Placeholder test module: it only checks that the test harness runs and does
// not exercise any code of this crate.
#[cfg(test)]
mod tests
{
  /// Trivial sanity check (compares two identical constants).
  #[test]
  fn it_works()
  {
    assert_eq!(4, 4);
  }
}