1
2use embassy_sync::{blocking_mutex::raw::RawMutex, channel::{Receiver, Sender}, pubsub::{PubSubChannel, WaitResult}};
3use mqttrs2::QoS;
4
5use crate::{MqttError, MqttEvent, MqttPublish, MqttRequest, Topic, UniqueID};
6
7#[derive(Clone)]
9pub struct MqttClient<'a, M: RawMutex> {
10
11 pub(super) control_reveiver: &'a PubSubChannel<M, MqttEvent, 4, 16, 8>,
12 pub(super) request_sender: Sender<'a, M, MqttRequest, 4>,
13 pub(super) received_publishes: Receiver<'a, M, MqttPublish, 4>
14
15}
16
17impl <'a, M: RawMutex> MqttClient<'a, M> {
18
19 pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
24
25 let id = UniqueID::new();
26 let publish = MqttPublish::new(topic, payload, qos, retain);
27
28 let mut subscriber = self.control_reveiver.subscriber()
29 .map_err(|e| {
30 error!("error subscribing to control receiver: {}", e);
31 MqttError::InternalError
32 })?;
33
34 self.request_sender.send(MqttRequest::Publish(publish, id)).await;
35
36 loop {
37 let msg = subscriber.next_message().await;
38 if let WaitResult::Message(msg) = msg {
39 if let MqttEvent::PublishResult(msg_id, result) = msg {
40 if id == msg_id {
41 return result;
42 }
43 }
44 } else {
45 error!("error reading subscrition: lost messages");
46 return Err(MqttError::InternalError);
47 }
48 }
49 }
50
51 pub async fn subscribe(&self, topic: &str) -> Result<(), MqttError> {
55 let id = UniqueID::new();
56
57 let mut subscriber = self.control_reveiver.subscriber()
58 .map_err(|e| {
59 error!("error subscribing to control receiver: {}", e);
60 MqttError::InternalError
61 })?;
62
63 let mut topic_owned = Topic::new();
64 topic_owned.push_str(topic).unwrap();
65 self.request_sender.send(MqttRequest::Subscribe(topic_owned, id)).await;
66
67 loop {
68 let msg = subscriber.next_message().await;
69 if let WaitResult::Message(msg) = msg {
70 if let MqttEvent::SubscribeResult(msg_id, result) = msg {
71 if id == msg_id {
72 return result.map(|_| ());
73 }
74 }
75 } else {
76 error!("error reading subscrition: lost messages");
77 return Err(MqttError::InternalError);
78 }
79 }
80 }
81
82 pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
86 let id = UniqueID::new();
87
88 let mut subscriber = self.control_reveiver.subscriber()
89 .map_err(|e| {
90 error!("error subscribing to control receiver: {}", e);
91 MqttError::InternalError
92 })?;
93
94 let mut topic_owned = Topic::new();
95 topic_owned.push_str(topic).unwrap();
96 self.request_sender.send(MqttRequest::Unsubscribe(topic_owned, id)).await;
97
98 loop {
99 let msg = subscriber.next_message().await;
100 if let WaitResult::Message(msg) = msg {
101 if let MqttEvent::UnsubscribeResult(msg_id, result) = msg {
102 if id == msg_id {
103 return result;
104 }
105 }
106 } else {
107 error!("error reading subscrition: lost messages");
108 return Err(MqttError::InternalError);
109 }
110 }
111 }
112
113 pub async fn receive(&self) -> MqttPublish {
115 self.received_publishes.receive().await
116 }
117
118 pub async fn disconnect(&self) {
120 self.request_sender.send(MqttRequest::Disconnect).await;
121 }
122
123 pub async fn on<F>(&self, matcher: F) where F: Fn(&MqttEvent) -> bool {
125 let mut sub = self.control_reveiver.subscriber().unwrap();
126
127 loop {
128 let event = sub.next_message_pure().await;
129 if matcher(&event) {
130 break;
131 }
132 }
133 }
134
135}