embedded_mqttc/
client.rs

1
2use embassy_sync::{blocking_mutex::raw::RawMutex, channel::{Receiver, Sender}, pubsub::{PubSubChannel, WaitResult}};
3use mqttrs2::QoS;
4
5use crate::{MqttError, MqttEvent, MqttPublish, MqttRequest, Topic, UniqueID};
6
7/// The MQTT Client to publish messages, subscribe, unsubscribe and receive messages
8#[derive(Clone)]
9pub struct MqttClient<'a, M: RawMutex> {
10
11    pub(super) control_reveiver: &'a PubSubChannel<M, MqttEvent, 4, 16, 8>,
12    pub(super) request_sender: Sender<'a, M, MqttRequest, 4>,
13    pub(super) received_publishes: Receiver<'a, M, MqttPublish, 4>
14
15}
16
17impl <'a, M: RawMutex> MqttClient<'a, M> {
18
19    /// Publish a MQTT message with the given parameters
20    /// 
21    /// Waits until there is a successful publish result. The publish is successful after all acknolodgements 
22    /// accordings to the selected [`QoS`] have bee exchanged
23    pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
24
25        let id = UniqueID::new();
26        let publish = MqttPublish::new(topic, payload, qos, retain);
27
28        let mut subscriber = self.control_reveiver.subscriber()
29            .map_err(|e| {
30                error!("error subscribing to control receiver: {}", e);
31                MqttError::InternalError
32            })?;
33
34        self.request_sender.send(MqttRequest::Publish(publish, id)).await;
35
36        loop {
37            let msg = subscriber.next_message().await;
38            if let WaitResult::Message(msg) = msg {
39                if let MqttEvent::PublishResult(msg_id, result) = msg {
40                    if id == msg_id {
41                        return result;
42                    }
43                }
44            } else {
45                error!("error reading subscrition: lost messages");
46                return Err(MqttError::InternalError);
47            }
48        }
49    }
50
51    /// Subscribe to a topic
52    /// 
53    /// The method returns after the suback has bee received
54    pub async fn subscribe(&self, topic: &str) -> Result<(), MqttError> {
55        let id = UniqueID::new();
56
57        let mut subscriber = self.control_reveiver.subscriber()
58            .map_err(|e| {
59                error!("error subscribing to control receiver: {}", e);
60                MqttError::InternalError
61            })?;
62
63        let mut topic_owned = Topic::new();
64        topic_owned.push_str(topic).unwrap();
65        self.request_sender.send(MqttRequest::Subscribe(topic_owned, id)).await;
66
67        loop {
68            let msg = subscriber.next_message().await;
69            if let WaitResult::Message(msg) = msg {
70                if let MqttEvent::SubscribeResult(msg_id, result) = msg {
71                    if id == msg_id {
72                        return result.map(|_| ());
73                    }
74                }
75            } else {
76                error!("error reading subscrition: lost messages");
77                return Err(MqttError::InternalError);
78            }
79        }
80    }
81
82    /// unsubscribe from a topic
83    /// 
84    /// waits until the unsuback has bee received
85    pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
86        let id = UniqueID::new();
87
88        let mut subscriber = self.control_reveiver.subscriber()
89            .map_err(|e| {
90                error!("error subscribing to control receiver: {}", e);
91                MqttError::InternalError
92            })?;
93
94        let mut topic_owned = Topic::new();
95        topic_owned.push_str(topic).unwrap();
96        self.request_sender.send(MqttRequest::Unsubscribe(topic_owned, id)).await;
97
98        loop {
99            let msg = subscriber.next_message().await;
100            if let WaitResult::Message(msg) = msg {
101                if let MqttEvent::UnsubscribeResult(msg_id, result) = msg {
102                    if id == msg_id {
103                        return result;
104                    }
105                }
106            } else {
107                error!("error reading subscrition: lost messages");
108                return Err(MqttError::InternalError);
109            }
110        }
111    }
112
113    /// Waits for the next message
114    pub async fn receive(&self) -> MqttPublish {
115        self.received_publishes.receive().await
116    }
117
118    /// send a disconnect packet to the broker
119    pub async fn disconnect(&self) {
120        self.request_sender.send(MqttRequest::Disconnect).await;
121    }
122
123    /// wait for the next event matching the `matcher`
124    pub async fn on<F>(&self, matcher: F) where F: Fn(&MqttEvent) -> bool {
125        let mut sub = self.control_reveiver.subscriber().unwrap();
126
127        loop {
128            let event = sub.next_message_pure().await;
129            if matcher(&event) {
130                break;
131            }
132        }
133    }
134
135}