embedded_mqttc/
client.rs

1
2use embassy_sync::{blocking_mutex::raw::RawMutex, channel::{Receiver, Sender}, pubsub::{PubSubChannel, WaitResult}};
3use mqttrs2::QoS;
4
5use crate::{MqttError, MqttEvent, MqttPublish, MqttRequest, Topic, UniqueID};
6
7#[derive(Clone)]
8pub struct MqttClient<'a, M: RawMutex> {
9
10    pub(super) control_reveiver: &'a PubSubChannel<M, MqttEvent, 4, 16, 8>,
11    pub(super) request_sender: Sender<'a, M, MqttRequest, 4>,
12    pub(super) received_publishes: Receiver<'a, M, MqttPublish, 4>
13
14}
15
16impl <'a, M: RawMutex> MqttClient<'a, M> {
17
18    pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
19
20        let id = UniqueID::new();
21        let publish = MqttPublish::new(topic, payload, qos, retain);
22
23        let mut subscriber = self.control_reveiver.subscriber()
24            .map_err(|e| {
25                error!("error subscribing to control receiver: {}", e);
26                MqttError::InternalError
27            })?;
28
29        self.request_sender.send(MqttRequest::Publish(publish, id)).await;
30
31        loop {
32            let msg = subscriber.next_message().await;
33            if let WaitResult::Message(msg) = msg {
34                if let MqttEvent::PublishResult(msg_id, result) = msg {
35                    if id == msg_id {
36                        return result;
37                    }
38                }
39            } else {
40                error!("error reading subscrition: lost messages");
41                return Err(MqttError::InternalError);
42            }
43        }
44    }
45
46    pub async fn subscribe(&self, topic: &str) -> Result<(), MqttError> {
47        let id = UniqueID::new();
48
49        let mut subscriber = self.control_reveiver.subscriber()
50            .map_err(|e| {
51                error!("error subscribing to control receiver: {}", e);
52                MqttError::InternalError
53            })?;
54
55        let mut topic_owned = Topic::new();
56        topic_owned.push_str(topic).unwrap();
57        self.request_sender.send(MqttRequest::Subscribe(topic_owned, id)).await;
58
59        loop {
60            let msg = subscriber.next_message().await;
61            if let WaitResult::Message(msg) = msg {
62                if let MqttEvent::SubscribeResult(msg_id, result) = msg {
63                    if id == msg_id {
64                        return result.map(|_| ());
65                    }
66                }
67            } else {
68                error!("error reading subscrition: lost messages");
69                return Err(MqttError::InternalError);
70            }
71        }
72    }
73
74    pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
75        let id = UniqueID::new();
76
77        let mut subscriber = self.control_reveiver.subscriber()
78            .map_err(|e| {
79                error!("error subscribing to control receiver: {}", e);
80                MqttError::InternalError
81            })?;
82
83        let mut topic_owned = Topic::new();
84        topic_owned.push_str(topic).unwrap();
85        self.request_sender.send(MqttRequest::Unsubscribe(topic_owned, id)).await;
86
87        loop {
88            let msg = subscriber.next_message().await;
89            if let WaitResult::Message(msg) = msg {
90                if let MqttEvent::UnsubscribeResult(msg_id, result) = msg {
91                    if id == msg_id {
92                        return result;
93                    }
94                }
95            } else {
96                error!("error reading subscrition: lost messages");
97                return Err(MqttError::InternalError);
98            }
99        }
100    }
101
102    pub async fn receive(&self) -> MqttPublish {
103        self.received_publishes.receive().await
104    }
105
106    pub async fn disconnect(&self) {
107        self.request_sender.send(MqttRequest::Disconnect).await;
108    }
109
110    pub async fn on<F>(&self, matcher: F) where F: Fn(&MqttEvent) -> bool {
111        let mut sub = self.control_reveiver.subscriber().unwrap();
112
113        loop {
114            let event = sub.next_message_pure().await;
115            if matcher(&event) {
116                break;
117            }
118        }
119    }
120
121}