mqtt_channel/
lib.rs

1//! Provide channel-based subscription to a MQTT broker.
2
3#![warn(missing_docs)]
4#![deny(warnings)]
5
6use futures::{SinkExt, StreamExt};
7use std::{collections::HashMap, sync::Arc};
8
9// Re-export some rumqttc::4v5 useful types
10pub use rumqttc::v5::mqttbytes::v5 as mqttbytes;
11pub use rumqttc::v5::mqttbytes::QoS;
12
13/// Result type for mqtt-channel
14pub type Result<T> = std::result::Result<T, Error>;
15
16//  ____  _        _   _     _   _
17// / ___|| |_ __ _| |_(_)___| |_(_) ___ ___
18// \___ \| __/ _` | __| / __| __| |/ __/ __|
19//  ___) | || (_| | |_| \__ \ |_| | (__\__ \
20// |____/ \__\__,_|\__|_|___/\__|_|\___|___/
21//
22
23/// Statistics of the MQTT Client
24#[cfg(feature = "statistics")]
25#[derive(Default, Clone)]
26pub struct Statistics
27{
28  /// number of received messages
29  pub received_messages: u64,
30  /// number of duplicated received messages
31  pub received_duplicated_messages: u64,
32  /// number of published messages
33  pub published_messages: u64,
34  /// Number of connection acknowledgement received [CONNACK](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901074)
35  pub connection_ack: u64,
36  /// Number of subscription acknowledgement received
37  pub subscription_ack: u64,
38  /// Number of publish acknowledgement received [PUBACK](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121)
39  pub publish_ack: u64,
40  /// Number of publish released received [PUBREL](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901141)
41  pub publish_release: u64,
42  /// Number of publish received received [PUBREC](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901131)
43  pub publish_received: u64,
44  /// Number of publish complete received [PUBCOMP](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901151)
45  pub publish_complete: u64,
46  /// Number of ping response released received [PINGRESP](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901200)
47  pub ping_response: u64,
48}
49
50//  _____
51// | ____|_ __ _ __ ___  _ __
52// |  _| | '__| '__/ _ \| '__|
53// | |___| |  | | | (_) | |
54// |_____|_|  |_|  \___/|_|
55//
56#[derive(thiserror::Error, Debug)]
57#[allow(missing_docs)]
58pub enum Error
59{
60  #[error("Already subscribed to topic {0}.")]
61  AlreadySubscribed(String),
62  #[error("Not subscribed to topic {0}.")]
63  NotSubscribed(String),
64  #[error("An internal error occured when sending a request {0}.")]
65  RequestChannel(#[from] futures::channel::mpsc::SendError),
66  #[error("Could not lock a mutex: {0}.")]
67  MutexPoison(String),
68  #[error("An error occured in the communication middleware {0}.")]
69  Communication(#[from] rumqttc::v5::ClientError),
70  #[error("MQTT option error {0}.")]
71  OptionError(#[from] rumqttc::v5::OptionError),
72  #[cfg(feature = "json")]
73  #[error("Error during json serialization {0}")]
74  JsonSerialisation(#[from] serde_json::Error),
75  #[cfg(feature = "cbor")]
76  #[error("Error during cbor serialization {0}")]
77  CborSerialisation(#[from] ciborium::ser::Error<std::io::Error>),
78}
79
80impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error
81{
82  fn from(value: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self
83  {
84    Error::MutexPoison(value.to_string())
85  }
86}
87
88//  ___       _                        _ _____
89// |_ _|_ __ | |_ ___ _ __ _ __   __ _| | ____|_ __ _ __ ___  _ __
90//  | || '_ \| __/ _ \ '__| '_ \ / _` | |  _| | '__| '__/ _ \| '__|
91//  | || | | | ||  __/ |  | | | | (_| | | |___| |  | | | (_) | |
92// |___|_| |_|\__\___|_|  |_| |_|\__,_|_|_____|_|  |_|  \___/|_|
93
94#[derive(thiserror::Error, Debug)]
95enum InternalError
96{
97  #[error("No channel for topic {0}")]
98  NoChannelFor(String),
99  #[error("An error occured in the communication middleware {0}.")]
100  Communication(#[from] rumqttc::v5::ConnectionError),
101  #[error("Error when broadcasting message {0}")]
102  AsyncBroadcast(String),
103  #[error("UTF-8 error {0}")]
104  Utf8Error(#[from] std::str::Utf8Error),
105}
106
107impl<T> From<async_broadcast::SendError<T>> for InternalError
108{
109  fn from(value: async_broadcast::SendError<T>) -> Self
110  {
111    Self::AsyncBroadcast(value.to_string())
112  }
113}
114
115//  __  __
116// |  \/  | ___  ___ ___  __ _  __ _  ___
117// | |\/| |/ _ \/ __/ __|/ _` |/ _` |/ _ \
118// | |  | |  __/\__ \__ \ (_| | (_| |  __/
119// |_|  |_|\___||___/___/\__,_|\__, |\___|
120//                             |___/
121
122/// Message received from MQTT
123#[derive(Clone)]
124pub struct Message<T>
125{
126  /// Raw data
127  pub payload: T,
128  /// Properties of the message, check [rumqttc documentation](https://docs.rs/rumqttc/latest/rumqttc/v5/mqttbytes/v5/struct.PublishProperties.html) for more information.
129  pub properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
130}
131
132/// Raw message
133pub type RawMessage = Message<bytes::Bytes>;
134
135//  ____                            _
136// |  _ \ ___  __ _ _   _  ___  ___| |_
137// | |_) / _ \/ _` | | | |/ _ \/ __| __|
138// |  _ <  __/ (_| | |_| |  __/\__ \ |_
139// |_| \_\___|\__, |\__,_|\___||___/\__|
140//               |_|
141
142/// Request sent to the internal MQTT tasks
143enum Request
144{
145  Subscribe
146  {
147    topic: String,
148    sender: async_broadcast::Sender<RawMessage>,
149    qos: QoS,
150  },
151  Publish
152  {
153    topic: String,
154    qos: QoS,
155    retain: bool,
156    payload: bytes::Bytes,
157    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
158  },
159}
160
161//   ____ _ _            _   ___
162//  / ___| (_) ___ _ __ | |_|_ _|_ __  _ __   ___ _ __
163// | |   | | |/ _ \ '_ \| __|| || '_ \| '_ \ / _ \ '__|
164// | |___| | |  __/ | | | |_ | || | | | | | |  __/ |
165//  \____|_|_|\___|_| |_|\__|___|_| |_|_| |_|\___|_|
166
167#[derive(Default)]
168struct ClientInner
169{
170  senders: HashMap<String, async_broadcast::Sender<RawMessage>>,
171  #[cfg(feature = "statistics")]
172  statistics: Statistics,
173  #[cfg(feature = "debug")]
174  last_logged_statistics: Statistics,
175}
176
177//   ____ _ _            _   ____        _ _     _
178//  / ___| (_) ___ _ __ | |_| __ ) _   _(_) | __| | ___ _ __
179// | |   | | |/ _ \ '_ \| __|  _ \| | | | | |/ _` |/ _ \ '__|
180// | |___| | |  __/ | | | |_| |_) | |_| | | | (_| |  __/ |
181//  \____|_|_|\___|_| |_|\__|____/ \__,_|_|_|\__,_|\___|_|
182
183/// Used to setup a new connection to a MQTT Broker
184pub struct ClientBuilder
185{
186  mqttoptions: rumqttc::v5::MqttOptions,
187  cap: usize,
188}
189
190impl ClientBuilder
191{
192  /// Set the capacity of the reception buffer
193  pub fn set_capacity(mut self, cap: usize) -> Self
194  {
195    self.cap = cap;
196    self
197  }
198
199  async fn handle_events(
200    event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
201    client_inner: &Arc<futures::lock::Mutex<ClientInner>>,
202  ) -> std::result::Result<(), InternalError>
203  {
204    match event?
205    {
206      rumqttc::v5::Event::Incoming(packet) => match packet
207      {
208        rumqttc::v5::Incoming::Publish(pub_msg) =>
209        {
210          let topic_str = std::str::from_utf8(pub_msg.topic.as_ref())?;
211          #[cfg(feature = "statistics")]
212          let mut client_inner = client_inner.lock().await;
213          #[cfg(feature = "statistics")]
214          {
215            client_inner.statistics.received_messages += 1;
216            if pub_msg.dup
217            {
218              client_inner.statistics.received_duplicated_messages += 1;
219            }
220          }
221          #[cfg(not(feature = "statistics"))]
222          let client_inner = client_inner.lock().await;
223
224          match client_inner.senders.get(topic_str)
225          {
226            Some(s) =>
227            {
228              s.broadcast(RawMessage {
229                payload: pub_msg.payload,
230                properties: pub_msg.properties,
231              })
232              .await?;
233            }
234            None => Err(InternalError::NoChannelFor(topic_str.to_string()))?,
235          }
236        }
237        rumqttc::v5::Incoming::ConnAck(_) =>
238        {
239          #[cfg(feature = "statistics")]
240          {
241            client_inner.lock().await.statistics.connection_ack += 1;
242          }
243        }
244        rumqttc::v5::Incoming::SubAck(_) =>
245        {
246          #[cfg(feature = "statistics")]
247          {
248            client_inner.lock().await.statistics.subscription_ack += 1;
249          }
250        }
251        rumqttc::v5::Incoming::PubAck(_) =>
252        {
253          #[cfg(feature = "statistics")]
254          {
255            client_inner.lock().await.statistics.publish_ack += 1;
256          }
257        }
258        rumqttc::v5::Incoming::PubRel(_) =>
259        {
260          #[cfg(feature = "statistics")]
261          {
262            client_inner.lock().await.statistics.publish_release += 1;
263          }
264        }
265        rumqttc::v5::Incoming::PubRec(_) =>
266        {
267          #[cfg(feature = "statistics")]
268          {
269            client_inner.lock().await.statistics.publish_received += 1;
270          }
271        }
272        rumqttc::v5::Incoming::PubComp(_) =>
273        {
274          #[cfg(feature = "statistics")]
275          {
276            client_inner.lock().await.statistics.publish_complete += 1;
277          }
278        }
279        rumqttc::v5::Incoming::PingResp(_) =>
280        {
281          #[cfg(feature = "statistics")]
282          {
283            client_inner.lock().await.statistics.ping_response += 1;
284          }
285        }
286        _ =>
287        {
288          log::error!("Incoming unhandled packet {packet:?}");
289        }
290      },
291      rumqttc::v5::Event::Outgoing(_) =>
292      { /* ignore outgoing event */ }
293    }
294    #[cfg(feature = "debug")]
295    {
296      let mut client_inner = client_inner.lock().await;
297
298      if client_inner.statistics.received_messages
299        > client_inner.last_logged_statistics.received_messages + 1000
300      {
301        client_inner.last_logged_statistics = client_inner.statistics.clone();
302        log::info!(
303          "RECVMSG {} (DUP {}) PUBMSG {} CONACK {} PUBACK {} PUBREL {} PUBREC {} PUBCOMP {} PINGRESP {}",
304          client_inner.last_logged_statistics.received_messages,
305          client_inner.last_logged_statistics.received_duplicated_messages,
306          client_inner.last_logged_statistics.published_messages,
307          client_inner.last_logged_statistics.connection_ack,
308          client_inner.last_logged_statistics.publish_ack,
309          client_inner.last_logged_statistics.publish_release,
310          client_inner.last_logged_statistics.publish_received,
311          client_inner.last_logged_statistics.publish_complete,
312          client_inner.last_logged_statistics.ping_response
313        );
314      }
315    }
316
317    Ok(())
318  }
319  /// Start the connection
320  pub fn start(self) -> (Client, impl std::future::Future<Output = ()>)
321  {
322    let (client, mut eventloop) = rumqttc::v5::AsyncClient::new(self.mqttoptions, self.cap);
323
324    let (request_sender, mut request_receiver) = futures::channel::mpsc::channel(self.cap);
325
326    let fut = async move {
327      let client_inner: Arc<futures::lock::Mutex<ClientInner>> = Default::default();
328      let fut_mqtt = {
329        let client_inner = client_inner.clone();
330        async move {
331          loop
332          {
333            if let Err(e) = Self::handle_events(eventloop.poll().await, &client_inner).await
334            {
335              log::error!("An error occured when handling MQTT event: {e:?}");
336            }
337          }
338        }
339      };
340      let fut_req = async move {
341        loop
342        {
343          let r = match request_receiver.next().await
344          {
345            Some(Request::Subscribe { topic, sender, qos }) =>
346            {
347              client_inner
348                .lock()
349                .await
350                .senders
351                .insert(topic.clone(), sender);
352              client.subscribe(topic, qos).await
353            }
354            Some(Request::Publish {
355              topic,
356              qos,
357              retain,
358              payload,
359              properties,
360            }) =>
361            {
362              #[cfg(feature = "statistics")]
363              {
364                client_inner.lock().await.statistics.published_messages += 1;
365              }
366              match properties
367              {
368                Some(properties) =>
369                {
370                  client
371                    .publish_with_properties(topic, qos, retain, payload, properties)
372                    .await
373                }
374                None => client.publish(topic, qos, retain, payload).await,
375              }
376            }
377            None => Ok(()),
378          };
379          if let Err(e) = r
380          {
381            log::error!("An error occured when evaluating requests: {e:?}");
382          }
383        }
384      };
385      futures::join!(fut_mqtt, fut_req);
386    };
387
388    (
389      Client {
390        request_sender,
391        subscriptions: Default::default(),
392      },
393      fut,
394    )
395  }
396}
397
398//   ____ _ _            _
399//  / ___| (_) ___ _ __ | |_
400// | |   | | |/ _ \ '_ \| __|
401// | |___| | |  __/ | | | |_
402//  \____|_|_|\___|_| |_|\__|
403
404/// Handle to the connection
405#[derive(Clone)]
406pub struct Client
407{
408  subscriptions: std::sync::Arc<
409    futures::lock::Mutex<HashMap<String, async_broadcast::InactiveReceiver<RawMessage>>>,
410  >,
411  request_sender: futures::channel::mpsc::Sender<Request>,
412}
413
414macro_rules! _create_raw_subscription {
415  ($request_sender: expr, $subs: ident, $topic: ident, $qos: ident, $cap: ident) => {{
416    let (sender, r) = async_broadcast::broadcast::<RawMessage>($cap);
417    $subs.insert($topic.clone(), r.clone().deactivate());
418    $request_sender
419      .send(Request::Subscribe {
420        topic: $topic,
421        sender,
422        qos: $qos,
423      })
424      .await?;
425    Ok(r)
426  }};
427}
428
429impl Client
430{
431  /// Create a new MQTT Client, with the given name to connect at the given and MQTT broker
432  pub fn build(node_id: impl Into<String>, hostname: impl Into<String>, port: u16)
433    -> ClientBuilder
434  {
435    let mut mqttoptions = rumqttc::v5::MqttOptions::new(node_id, hostname, port);
436    mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
437
438    ClientBuilder {
439      mqttoptions,
440      cap: 1000,
441    }
442  }
443  /// Create a new MQTT Client from the given url of the form `mqtt://example.com:1883?client_id=123`
444  pub fn build_from_url(url: impl Into<String>) -> Result<ClientBuilder>
445  {
446    let mut mqttoptions = rumqttc::v5::MqttOptions::parse_url(url)?;
447    mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
448
449    Ok(ClientBuilder {
450      mqttoptions,
451      cap: 1000,
452    })
453  }
454  /// Publish a message on a topic
455  pub async fn publish_raw(
456    mut self,
457    topic: impl Into<String>,
458    qos: QoS,
459    retain: bool,
460    payload: impl Into<bytes::Bytes>,
461    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
462  ) -> Result<()>
463  {
464    self
465      .request_sender
466      .send(Request::Publish {
467        topic: topic.into(),
468        qos,
469        retain,
470        payload: payload.into(),
471        properties,
472      })
473      .await?;
474    Ok(())
475  }
476  /// Publish a message on a topic, serialized to JSON
477  #[cfg(feature = "json")]
478  pub fn publish_json<T: serde::Serialize>(
479    self,
480    topic: impl Into<String>,
481    qos: QoS,
482    retain: bool,
483    payload: T,
484    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
485  ) -> Result<impl std::future::Future<Output = Result<()>>>
486  {
487    let properties = match properties
488    {
489      None => rumqttc::v5::mqttbytes::v5::PublishProperties {
490        content_type: Some("application/json").map(str::to_string),
491        ..Default::default()
492      },
493      Some(props) =>
494      {
495        let mut props = props.clone();
496        props.content_type = Some("application/json".to_string());
497        props
498      }
499    };
500    let payload_raw = serde_json::to_string(&payload)?;
501
502    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
503  }
504  /// Publish a message on a topic, serialized to JSON
505  #[cfg(feature = "cbor")]
506  pub fn publish_cbor<T: serde::Serialize>(
507    self,
508    topic: impl Into<String>,
509    qos: QoS,
510    retain: bool,
511    payload: T,
512    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
513  ) -> Result<impl std::future::Future<Output = Result<()>>>
514  {
515    let properties = match properties
516    {
517      None => rumqttc::v5::mqttbytes::v5::PublishProperties {
518        content_type: Some("application/cbor").map(str::to_string),
519        ..Default::default()
520      },
521      Some(props) =>
522      {
523        let mut props = props.clone();
524        props.content_type = Some("application/cbor".to_string());
525        props
526      }
527    };
528    let mut payload_raw = Vec::<u8>::new();
529    ciborium::into_writer(&payload, &mut payload_raw)?;
530
531    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
532  }
533  /// Create a raw subscription
534  pub async fn create_raw_subscription(
535    mut self,
536    topic: impl Into<String>,
537    qos: QoS,
538    cap: usize,
539  ) -> Result<async_broadcast::Receiver<RawMessage>>
540  {
541    let mut subs = self.subscriptions.lock().await;
542    let topic = topic.into();
543    match subs.get(&topic)
544    {
545      Some(_) => Err(Error::AlreadySubscribed(topic)),
546      None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
547    }
548  }
549  /// Get an existing subscription
550  pub fn get_raw_subscription(
551    &self,
552    topic: impl Into<String>,
553  ) -> Result<async_broadcast::Receiver<RawMessage>>
554  {
555    let topic = topic.into();
556    futures::executor::block_on(self.subscriptions.lock())
557      .get(&topic)
558      .ok_or_else(|| Error::NotSubscribed(topic))
559      .map(|x| x.activate_cloned())
560  }
561  /// Get an existing subscription, or create a new one with the given QoS
562  /// This function is guaranteed to return, however if the subscription
563  /// already exists, the QoS is ignored.
564  pub async fn get_or_create_raw_subscription(
565    mut self,
566    topic: impl Into<String>,
567    qos: QoS,
568    cap: usize,
569  ) -> Result<async_broadcast::Receiver<RawMessage>>
570  {
571    let mut subs = self.subscriptions.lock().await;
572    let topic = topic.into();
573    match subs.get(&topic)
574    {
575      Some(s) => Ok(s.activate_cloned()),
576      None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
577    }
578  }
579  /// Create a JSON subscription, aka, topic will be parsed using serde_json.
580  #[cfg(feature = "json")]
581  pub async fn get_or_create_json_subscription<T>(
582    self,
583    topic: impl Into<String>,
584    qos: QoS,
585    cap: usize,
586  ) -> Result<impl futures::Stream<Item = Message<T>>>
587  where
588    for<'de> T: serde::Deserialize<'de>,
589  {
590    use bytes::Buf;
591    let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
592    let map_s = raw_s.filter_map(|msg| async move {
593      let payload_r = serde_json::from_slice::<T>(msg.payload.chunk());
594      match payload_r
595      {
596        Ok(payload) => Some(Message::<T> {
597          payload,
598          properties: msg.properties,
599        }),
600        Err(e) =>
601        {
602          log::error!("Failed to parse JSON message: {e}");
603          None
604        }
605      }
606    });
607    Ok(map_s)
608  }
609}
610
611#[cfg(test)]
612mod tests
613{
614  #[test]
615  fn it_works()
616  {
617    assert_eq!(4, 4);
618  }
619}