1#![warn(missing_docs)]
4
5use futures::{SinkExt, StreamExt};
6use std::{collections::HashMap, sync::Arc};
7
8pub use rumqttc::v5::mqttbytes::v5 as mqttbytes;
10pub use rumqttc::v5::mqttbytes::QoS;
11
12pub type Result<T> = std::result::Result<T, Error>;
14
/// Counters describing the MQTT traffic handled by a running client.
///
/// The type only exists when the `statistics` feature is enabled. All counters
/// start at zero (see the derived [`Default`]).
#[cfg(feature = "statistics")]
#[derive(Clone, Default)]
pub struct Statistics
{
    /// Number of incoming `Publish` packets.
    pub received_messages: u64,
    /// Number of incoming `Publish` packets that had their `dup` flag set.
    pub received_duplicated_messages: u64,
    /// Number of publish requests picked up by the request loop (counted
    /// before the publish call is made, regardless of its outcome).
    pub published_messages: u64,
    /// Number of incoming `ConnAck` packets.
    pub connection_ack: u64,
    /// Number of incoming `SubAck` packets.
    pub subscription_ack: u64,
    /// Number of incoming `PubAck` packets.
    pub publish_ack: u64,
    /// Number of incoming `PubRel` packets.
    pub publish_release: u64,
    /// Number of incoming `PubRec` packets.
    pub publish_received: u64,
    /// Number of incoming `PubComp` packets.
    pub publish_complete: u64,
    /// Number of incoming `PingResp` packets.
    pub ping_response: u64,
}
48
49#[derive(thiserror::Error, Debug)]
56#[allow(missing_docs)]
57#[non_exhaustive]
58pub enum Error
59{
60 #[error("Already subscribed to topic {0}.")]
61 AlreadySubscribed(String),
62 #[error("Not subscribed to topic {0}.")]
63 NotSubscribed(String),
64 #[error("An internal error occured when sending a request {0}.")]
65 RequestChannel(#[from] futures::channel::mpsc::SendError),
66 #[error("Could not lock a mutex: {0}.")]
67 MutexPoison(String),
68 #[error("An error occured in the communication middleware {0}.")]
69 Communication(#[from] Box<rumqttc::v5::ClientError>),
70 #[error("MQTT option error {0}.")]
71 OptionError(#[from] rumqttc::v5::OptionError),
72 #[cfg(feature = "json")]
73 #[error("Error during json serialization {0}")]
74 JsonSerialisation(#[from] serde_json::Error),
75 #[cfg(feature = "cbor")]
76 #[error("Error during cbor serialization {0}")]
77 CborSerialisation(#[from] ciborium::ser::Error<std::io::Error>),
78 #[cfg(feature = "cbor")]
79 #[error("Error during cbor deserialization {0}")]
80 CborDeserialisation(#[from] ciborium::de::Error<std::io::Error>),
81 #[error("Missing properties")]
82 MissingProperties,
83 #[error("Missing content type")]
84 MissingContentType,
85 #[error("Unsupported content type: {0}")]
86 UnsupportedConentType(String),
87}
88
89impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error
90{
91 fn from(value: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self
92 {
93 Error::MutexPoison(value.to_string())
94 }
95}
96
97#[derive(thiserror::Error, Debug)]
104enum InternalError
105{
106 #[error("No channel for topic {0}")]
107 NoChannelFor(String),
108 #[error("An error occured in the communication middleware {0}.")]
109 Communication(#[from] rumqttc::v5::ConnectionError),
110 #[error("Error when broadcasting message {0}")]
111 AsyncBroadcast(String),
112 #[error("UTF-8 error {0}")]
113 Utf8Error(#[from] std::str::Utf8Error),
114}
115
116impl<T> From<async_broadcast::SendError<T>> for InternalError
117{
118 fn from(value: async_broadcast::SendError<T>) -> Self
119 {
120 Self::AsyncBroadcast(value.to_string())
121 }
122}
123
124#[derive(Clone)]
133pub struct Message<T>
134{
135 pub payload: T,
137 pub properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
139}
140
141pub type RawMessage = Message<bytes::Bytes>;
143
144enum Request
153{
154 Subscribe
155 {
156 topic: String,
157 sender: async_broadcast::Sender<RawMessage>,
158 qos: QoS,
159 },
160 Publish
161 {
162 topic: String,
163 qos: QoS,
164 retain: bool,
165 payload: bytes::Bytes,
166 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
167 },
168}
169
170#[derive(Default)]
177struct ClientInner
178{
179 senders: HashMap<String, async_broadcast::Sender<RawMessage>>,
180 #[cfg(feature = "statistics")]
181 statistics: Statistics,
182 #[cfg(feature = "debug")]
183 last_logged_statistics: Statistics,
184}
185
186pub struct ClientBuilder
194{
195 mqttoptions: rumqttc::v5::MqttOptions,
196 cap: usize,
197}
198
199impl ClientBuilder
200{
201 pub fn set_capacity(mut self, cap: usize) -> Self
203 {
204 self.cap = cap;
205 self
206 }
207
208 async fn handle_events(
209 event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
210 client_inner: &Arc<futures::lock::Mutex<ClientInner>>,
211 ) -> std::result::Result<(), InternalError>
212 {
213 match event?
214 {
215 rumqttc::v5::Event::Incoming(packet) => match packet
216 {
217 rumqttc::v5::Incoming::Publish(pub_msg) =>
218 {
219 let topic_str = std::str::from_utf8(pub_msg.topic.as_ref())?;
220 #[cfg(feature = "statistics")]
221 let mut client_inner = client_inner.lock().await;
222 #[cfg(feature = "statistics")]
223 {
224 client_inner.statistics.received_messages += 1;
225 if pub_msg.dup
226 {
227 client_inner.statistics.received_duplicated_messages += 1;
228 }
229 }
230 #[cfg(not(feature = "statistics"))]
231 let client_inner = client_inner.lock().await;
232
233 match client_inner.senders.get(topic_str)
234 {
235 Some(s) =>
236 {
237 s.broadcast(RawMessage {
238 payload: pub_msg.payload,
239 properties: pub_msg.properties,
240 })
241 .await?;
242 }
243 None => Err(InternalError::NoChannelFor(topic_str.to_string()))?,
244 }
245 }
246 rumqttc::v5::Incoming::ConnAck(_) =>
247 {
248 #[cfg(feature = "statistics")]
249 {
250 client_inner.lock().await.statistics.connection_ack += 1;
251 }
252 }
253 rumqttc::v5::Incoming::SubAck(_) =>
254 {
255 #[cfg(feature = "statistics")]
256 {
257 client_inner.lock().await.statistics.subscription_ack += 1;
258 }
259 }
260 rumqttc::v5::Incoming::PubAck(_) =>
261 {
262 #[cfg(feature = "statistics")]
263 {
264 client_inner.lock().await.statistics.publish_ack += 1;
265 }
266 }
267 rumqttc::v5::Incoming::PubRel(_) =>
268 {
269 #[cfg(feature = "statistics")]
270 {
271 client_inner.lock().await.statistics.publish_release += 1;
272 }
273 }
274 rumqttc::v5::Incoming::PubRec(_) =>
275 {
276 #[cfg(feature = "statistics")]
277 {
278 client_inner.lock().await.statistics.publish_received += 1;
279 }
280 }
281 rumqttc::v5::Incoming::PubComp(_) =>
282 {
283 #[cfg(feature = "statistics")]
284 {
285 client_inner.lock().await.statistics.publish_complete += 1;
286 }
287 }
288 rumqttc::v5::Incoming::PingResp(_) =>
289 {
290 #[cfg(feature = "statistics")]
291 {
292 client_inner.lock().await.statistics.ping_response += 1;
293 }
294 }
295 _ =>
296 {
297 log::error!("Incoming unhandled packet {packet:?}");
298 }
299 },
300 rumqttc::v5::Event::Outgoing(_) =>
301 { }
302 }
303 #[cfg(feature = "debug")]
304 {
305 let mut client_inner = client_inner.lock().await;
306
307 if client_inner.statistics.received_messages
308 > client_inner.last_logged_statistics.received_messages + 1000
309 {
310 client_inner.last_logged_statistics = client_inner.statistics.clone();
311 log::info!(
312 "RECVMSG {} (DUP {}) PUBMSG {} CONACK {} PUBACK {} PUBREL {} PUBREC {} PUBCOMP {} PINGRESP {}",
313 client_inner.last_logged_statistics.received_messages,
314 client_inner.last_logged_statistics.received_duplicated_messages,
315 client_inner.last_logged_statistics.published_messages,
316 client_inner.last_logged_statistics.connection_ack,
317 client_inner.last_logged_statistics.publish_ack,
318 client_inner.last_logged_statistics.publish_release,
319 client_inner.last_logged_statistics.publish_received,
320 client_inner.last_logged_statistics.publish_complete,
321 client_inner.last_logged_statistics.ping_response
322 );
323 }
324 }
325
326 Ok(())
327 }
328 pub fn start(self) -> (Client, impl std::future::Future<Output = ()>)
330 {
331 let (client, mut eventloop) = rumqttc::v5::AsyncClient::new(self.mqttoptions, self.cap);
332
333 let (request_sender, mut request_receiver) = futures::channel::mpsc::channel(self.cap);
334
335 let fut = async move {
336 let client_inner: Arc<futures::lock::Mutex<ClientInner>> = Default::default();
337 let fut_mqtt = {
338 let client_inner = client_inner.clone();
339 async move {
340 loop
341 {
342 if let Err(e) = Self::handle_events(eventloop.poll().await, &client_inner).await
343 {
344 log::error!("An error occured when handling MQTT event: {e:?}");
345 }
346 }
347 }
348 };
349 let fut_req = async move {
350 loop
351 {
352 let r = match request_receiver.next().await
353 {
354 Some(Request::Subscribe { topic, sender, qos }) =>
355 {
356 client_inner
357 .lock()
358 .await
359 .senders
360 .insert(topic.clone(), sender);
361 client.subscribe(topic, qos).await
362 }
363 Some(Request::Publish {
364 topic,
365 qos,
366 retain,
367 payload,
368 properties,
369 }) =>
370 {
371 #[cfg(feature = "statistics")]
372 {
373 client_inner.lock().await.statistics.published_messages += 1;
374 }
375 match properties
376 {
377 Some(properties) =>
378 {
379 client
380 .publish_with_properties(topic, qos, retain, payload, properties)
381 .await
382 }
383 None => client.publish(topic, qos, retain, payload).await,
384 }
385 }
386 None => Ok(()),
387 };
388 if let Err(e) = r
389 {
390 log::error!("An error occured when evaluating requests: {e:?}");
391 }
392 }
393 };
394 futures::join!(fut_mqtt, fut_req);
395 };
396
397 (
398 Client {
399 request_sender,
400 subscriptions: Default::default(),
401 },
402 fut,
403 )
404 }
405}
406
407#[derive(Clone)]
415pub struct Client
416{
417 subscriptions: std::sync::Arc<
418 futures::lock::Mutex<HashMap<String, async_broadcast::InactiveReceiver<RawMessage>>>,
419 >,
420 request_sender: futures::channel::mpsc::Sender<Request>,
421}
422
// Creates a broadcast channel of capacity `$cap` for `$topic`, queues the
// matching `Request::Subscribe` on `$request_sender` and, once the request has
// been accepted, records an inactive receiver in `$subs`.
//
// Must be expanded inside an `async` function returning `Result<_, Error>`: a
// failed send is propagated with `?`. The expansion evaluates to `Ok(receiver)`.
//
// The subscription table is updated only *after* the request was queued, so a
// failed send does not leave behind an entry whose channel can never receive.
macro_rules! _create_raw_subscription {
    ($request_sender: expr, $subs: ident, $topic: ident, $qos: ident, $cap: ident) => {{
        let (sender, receiver) = async_broadcast::broadcast::<RawMessage>($cap);
        $request_sender
            .send(Request::Subscribe {
                topic: $topic.clone(),
                sender,
                qos: $qos,
            })
            .await?;
        $subs.insert($topic, receiver.clone().deactivate());
        Ok(receiver)
    }};
}
437
438impl Client
439{
440 pub fn build(node_id: impl Into<String>, hostname: impl Into<String>, port: u16)
442 -> ClientBuilder
443 {
444 let mut mqttoptions = rumqttc::v5::MqttOptions::new(node_id, hostname, port);
445 mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
446
447 ClientBuilder {
448 mqttoptions,
449 cap: 1000,
450 }
451 }
452 pub fn build_from_url(url: impl Into<String>) -> Result<ClientBuilder>
454 {
455 let mut mqttoptions = rumqttc::v5::MqttOptions::parse_url(url)?;
456 mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
457
458 Ok(ClientBuilder {
459 mqttoptions,
460 cap: 1000,
461 })
462 }
463 pub async fn publish_raw(
465 mut self,
466 topic: impl Into<String>,
467 qos: QoS,
468 retain: bool,
469 payload: impl Into<bytes::Bytes>,
470 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
471 ) -> Result<()>
472 {
473 self
474 .request_sender
475 .send(Request::Publish {
476 topic: topic.into(),
477 qos,
478 retain,
479 payload: payload.into(),
480 properties,
481 })
482 .await?;
483 Ok(())
484 }
485 #[cfg(feature = "json")]
487 pub fn publish_json<T: serde::Serialize>(
488 self,
489 topic: impl Into<String>,
490 qos: QoS,
491 retain: bool,
492 payload: T,
493 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
494 ) -> Result<impl std::future::Future<Output = Result<()>>>
495 {
496 let properties = match properties
497 {
498 None => rumqttc::v5::mqttbytes::v5::PublishProperties {
499 content_type: Some("application/json").map(str::to_string),
500 ..Default::default()
501 },
502 Some(props) =>
503 {
504 let mut props = props.clone();
505 props.content_type = Some("application/json".to_string());
506 props
507 }
508 };
509 let payload_raw = serde_json::to_string(&payload)?;
510
511 Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
512 }
513 #[cfg(feature = "cbor")]
515 pub fn publish_cbor<T: serde::Serialize>(
516 self,
517 topic: impl Into<String>,
518 qos: QoS,
519 retain: bool,
520 payload: T,
521 properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
522 ) -> Result<impl std::future::Future<Output = Result<()>>>
523 {
524 let properties = match properties
525 {
526 None => rumqttc::v5::mqttbytes::v5::PublishProperties {
527 content_type: Some("application/cbor").map(str::to_string),
528 ..Default::default()
529 },
530 Some(props) =>
531 {
532 let mut props = props.clone();
533 props.content_type = Some("application/cbor".to_string());
534 props
535 }
536 };
537 let mut payload_raw = Vec::<u8>::new();
538 ciborium::into_writer(&payload, &mut payload_raw)?;
539
540 Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
541 }
542 pub async fn create_raw_subscription(
544 mut self,
545 topic: impl Into<String>,
546 qos: QoS,
547 cap: usize,
548 ) -> Result<async_broadcast::Receiver<RawMessage>>
549 {
550 let mut subs = self.subscriptions.lock().await;
551 let topic = topic.into();
552 match subs.get(&topic)
553 {
554 Some(_) => Err(Error::AlreadySubscribed(topic)),
555 None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
556 }
557 }
558 pub fn get_raw_subscription(
560 &self,
561 topic: impl Into<String>,
562 ) -> Result<async_broadcast::Receiver<RawMessage>>
563 {
564 let topic = topic.into();
565 futures::executor::block_on(self.subscriptions.lock())
566 .get(&topic)
567 .ok_or_else(|| Error::NotSubscribed(topic))
568 .map(|x| x.activate_cloned())
569 }
570 pub async fn get_or_create_raw_subscription(
574 mut self,
575 topic: impl Into<String>,
576 qos: QoS,
577 cap: usize,
578 ) -> Result<async_broadcast::Receiver<RawMessage>>
579 {
580 let mut subs = self.subscriptions.lock().await;
581 let topic = topic.into();
582 match subs.get(&topic)
583 {
584 Some(s) => Ok(s.activate_cloned()),
585 None => _create_raw_subscription!(self.request_sender, subs, topic, qos, cap),
586 }
587 }
588 #[cfg(any(feature = "json", feature = "cbor"))]
590 pub async fn get_or_create_serde_subscription<T>(
591 self,
592 topic: impl Into<String>,
593 qos: QoS,
594 cap: usize,
595 ) -> Result<impl futures::Stream<Item = Result<Message<T>>>>
596 where
597 for<'de> T: serde::Deserialize<'de>,
598 {
599 use bytes::Buf;
600 let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
601 let map_s = raw_s.map(|msg| match &msg.properties
602 {
603 Some(properties) => match properties.content_type.as_ref().map(|x| x.as_str())
604 {
605 #[cfg(feature = "json")]
606 Some("application/json") =>
607 {
608 let payload = serde_json::from_slice::<T>(msg.payload.chunk())?;
609 Ok(Message::<T> {
610 payload,
611 properties: msg.properties,
612 })
613 }
614 #[cfg(feature = "cbor")]
615 Some("application/cbor") =>
616 {
617 let payload = ciborium::from_reader(msg.payload.chunk())?;
618 Ok(Message::<T> {
619 payload,
620 properties: msg.properties,
621 })
622 }
623 Some(o) => Err(Error::UnsupportedConentType(o.to_string())),
624 None => Err(Error::MissingContentType),
625 },
626
627 None => Err(Error::MissingProperties),
628 });
629 Ok(map_s)
630 }
631 #[cfg(feature = "json")]
633 pub async fn get_or_create_json_subscription<T>(
634 self,
635 topic: impl Into<String>,
636 qos: QoS,
637 cap: usize,
638 ) -> Result<impl futures::Stream<Item = Message<T>>>
639 where
640 for<'de> T: serde::Deserialize<'de>,
641 {
642 use bytes::Buf;
643 let raw_s = self.get_or_create_raw_subscription(topic, qos, cap).await?;
644 let map_s = raw_s.filter_map(|msg| async move {
645 let payload_r = serde_json::from_slice::<T>(msg.payload.chunk());
646 match payload_r
647 {
648 Ok(payload) => Some(Message::<T> {
649 payload,
650 properties: msg.properties,
651 }),
652 Err(e) =>
653 {
654 log::error!("Failed to parse JSON message: {e}");
655 None
656 }
657 }
658 });
659 Ok(map_s)
660 }
661}
662
#[cfg(test)]
mod tests
{
    /// Placeholder test: only checks that the test harness builds and runs.
    #[test]
    fn it_works()
    {
        let expected = 4;
        assert_eq!(expected, 4);
    }
}