1
2
3use embassy_sync::{blocking_mutex::raw::RawMutex, pubsub::DynSubscriber};
4use embedded_nal_async::{Dns, TcpConnect};
5use mqttrs2::QoS;
6
7use crate::{MqttError, MqttEvent, UniqueID, state::{State, connection::ConnectionState, receives2::ReceivedPublish}};
8
9#[derive(Clone)]
11pub struct MqttClient<'a, 'n, 'l, M: RawMutex, NET, DNS, const BUFFER: usize, const TOPIC: usize, const QUEUE: usize>
12where NET: TcpConnect, DNS: Dns {
13 state: &'a State<'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE>
14}
15
16impl <'a, 'n, 'l, M: RawMutex, NET, DNS, const BUFFER: usize, const TOPIC: usize, const QUEUE: usize> MqttClient<'a, 'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE>
17where NET: TcpConnect, DNS: Dns {
18
19 pub(crate) fn new(state: &'a State<'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE>) -> Self {
20 Self {
21 state
22 }
23 }
24
25 pub async fn on_auto_subscribes_done(&self) {
26 self.state.connection_state.await_connected().await;
27 }
28
29 async fn await_event<F, U>(mut subscriber: DynSubscriber<'_, MqttEvent>, f: F) -> U
30 where F: Fn(MqttEvent) -> Option<U> {
31 loop {
32 let event = subscriber.next_message_pure().await;
33 if let Some(result) = f(event) {
34 return result;
35 }
36 }
37 }
38
39 pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
44 let unique_id = UniqueID::new();
45 let subscriber = self.state.subscribe_events()?;
46
47 self.state.publish(topic, payload, qos, retain, unique_id).await?;
48
49 Self::await_event(subscriber, |event | match event {
50 MqttEvent::PublishDone(id) if id == unique_id => Some(()),
51 _ => None
52 }).await;
53
54 unique_id.free();
55
56 Ok(())
57 }
58
59 pub async fn subscribe(&self, topic: &str, qos: QoS) -> Result<(), MqttError> {
63 let unique_id = UniqueID::new();
64 let subscriber = self.state.subscribe_events()?;
65
66 self.state.subscribe(&[topic], qos, unique_id).await;
67
68 let result = Self::await_event(subscriber, |event | match event {
69 MqttEvent::SubscribeDone(id, result) if id == unique_id => Some(result),
70 _ => None
71 }).await;
72
73 unique_id.free();
74
75 result?;
76
77 Ok(())
78 }
79
80 pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
84 let unique_id = UniqueID::new();
85 let subscriber = self.state.subscribe_events()?;
86
87 self.state.unsubscribe(&[topic], unique_id).await;
88
89 Self::await_event(subscriber, |event | match event {
90 MqttEvent::UnsubscribeDone(id) if id == unique_id => Some(()),
91 _ => None
92 }).await;
93
94 unique_id.free();
95
96 Ok(())
97 }
98
99 pub fn disconnect(&self) {
101 self.state.disconnect();
102 }
103
104 pub fn subscribe_received_publishes(&self) -> Result<DynSubscriber<'_, ReceivedPublish<BUFFER, TOPIC>>, MqttError> {
106 self.state.subscribe_received_publishes()
107 }
108
109}