1
2use embassy_sync::{blocking_mutex::raw::RawMutex, channel::{Receiver, Sender}, pubsub::{PubSubChannel, WaitResult}};
3use mqttrs2::QoS;
4
5use crate::{MqttError, MqttEvent, MqttPublish, MqttRequest, Topic, UniqueID};
6
7#[derive(Clone)]
8pub struct MqttClient<'a, M: RawMutex> {
9
10 pub(super) control_reveiver: &'a PubSubChannel<M, MqttEvent, 4, 16, 8>,
11 pub(super) request_sender: Sender<'a, M, MqttRequest, 4>,
12 pub(super) received_publishes: Receiver<'a, M, MqttPublish, 4>
13
14}
15
16impl <'a, M: RawMutex> MqttClient<'a, M> {
17
18 pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
19
20 let id = UniqueID::new();
21 let publish = MqttPublish::new(topic, payload, qos, retain);
22
23 let mut subscriber = self.control_reveiver.subscriber()
24 .map_err(|e| {
25 error!("error subscribing to control receiver: {}", e);
26 MqttError::InternalError
27 })?;
28
29 self.request_sender.send(MqttRequest::Publish(publish, id)).await;
30
31 loop {
32 let msg = subscriber.next_message().await;
33 if let WaitResult::Message(msg) = msg {
34 if let MqttEvent::PublishResult(msg_id, result) = msg {
35 if id == msg_id {
36 return result;
37 }
38 }
39 } else {
40 error!("error reading subscrition: lost messages");
41 return Err(MqttError::InternalError);
42 }
43 }
44 }
45
46 pub async fn subscribe(&self, topic: &str) -> Result<(), MqttError> {
47 let id = UniqueID::new();
48
49 let mut subscriber = self.control_reveiver.subscriber()
50 .map_err(|e| {
51 error!("error subscribing to control receiver: {}", e);
52 MqttError::InternalError
53 })?;
54
55 let mut topic_owned = Topic::new();
56 topic_owned.push_str(topic).unwrap();
57 self.request_sender.send(MqttRequest::Subscribe(topic_owned, id)).await;
58
59 loop {
60 let msg = subscriber.next_message().await;
61 if let WaitResult::Message(msg) = msg {
62 if let MqttEvent::SubscribeResult(msg_id, result) = msg {
63 if id == msg_id {
64 return result.map(|_| ());
65 }
66 }
67 } else {
68 error!("error reading subscrition: lost messages");
69 return Err(MqttError::InternalError);
70 }
71 }
72 }
73
74 pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
75 let id = UniqueID::new();
76
77 let mut subscriber = self.control_reveiver.subscriber()
78 .map_err(|e| {
79 error!("error subscribing to control receiver: {}", e);
80 MqttError::InternalError
81 })?;
82
83 let mut topic_owned = Topic::new();
84 topic_owned.push_str(topic).unwrap();
85 self.request_sender.send(MqttRequest::Unsubscribe(topic_owned, id)).await;
86
87 loop {
88 let msg = subscriber.next_message().await;
89 if let WaitResult::Message(msg) = msg {
90 if let MqttEvent::UnsubscribeResult(msg_id, result) = msg {
91 if id == msg_id {
92 return result;
93 }
94 }
95 } else {
96 error!("error reading subscrition: lost messages");
97 return Err(MqttError::InternalError);
98 }
99 }
100 }
101
102 pub async fn receive(&self) -> MqttPublish {
103 self.received_publishes.receive().await
104 }
105
106 pub async fn disconnect(&self) {
107 self.request_sender.send(MqttRequest::Disconnect).await;
108 }
109
110 pub async fn on<F>(&self, matcher: F) where F: Fn(&MqttEvent) -> bool {
111 let mut sub = self.control_reveiver.subscriber().unwrap();
112
113 loop {
114 let event = sub.next_message_pure().await;
115 if matcher(&event) {
116 break;
117 }
118 }
119 }
120
121}