embedded_mqttc/
client.rs

1
2
3use embassy_sync::{blocking_mutex::raw::RawMutex, pubsub::DynSubscriber};
4use embedded_nal_async::{Dns, TcpConnect};
5use mqttrs2::QoS;
6
7use crate::{MqttError, MqttEvent, UniqueID, state::{State, connection::ConnectionState, receives2::ReceivedPublish}};
8
9/// The MQTT Client to publish messages, subscribe, unsubscribe and receive messages
10#[derive(Clone)]
11pub struct MqttClient<'a, 'n, 'l, M: RawMutex, NET, DNS, const BUFFER: usize, const TOPIC: usize, const QUEUE: usize> 
12where NET: TcpConnect, DNS: Dns {
13    state: &'a State<'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE>
14}
15
16impl <'a, 'n, 'l, M: RawMutex, NET, DNS, const BUFFER: usize, const TOPIC: usize, const QUEUE: usize> MqttClient<'a, 'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE> 
17where NET: TcpConnect, DNS: Dns {
18
19    pub(crate) fn new(state: &'a State<'n, 'l, M, NET, DNS, BUFFER, TOPIC, QUEUE>) -> Self {
20        Self {
21            state
22        }
23    }
24
25    pub async fn on_auto_subscribes_done(&self) {
26        self.state.connection_state.await_connected().await;
27    }
28
29    async fn await_event<F, U>(mut subscriber: DynSubscriber<'_, MqttEvent>, f: F) -> U 
30    where F: Fn(MqttEvent) -> Option<U> {
31        loop {
32            let event = subscriber.next_message_pure().await;
33            if let Some(result) = f(event) {
34                return result;
35            }
36        }
37    } 
38
39    /// Publish a MQTT message with the given parameters
40    /// 
41    /// Waits until there is a successful publish result. The publish is successful after all acknolodgements 
42    /// accordings to the selected [`QoS`] have bee exchanged
43    pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), MqttError> {
44        let unique_id = UniqueID::new();
45        let subscriber = self.state.subscribe_events()?;
46
47        self.state.publish(topic, payload, qos, retain, unique_id).await?;
48
49        Self::await_event(subscriber, |event | match event {
50            MqttEvent::PublishDone(id) if id == unique_id => Some(()),
51            _ => None
52        }).await;
53
54        unique_id.free();
55
56        Ok(())
57    }
58
59    /// Subscribe to a topic
60    /// 
61    /// The method returns after the suback has bee received
62    pub async fn subscribe(&self, topic: &str, qos: QoS) -> Result<(), MqttError> {
63        let unique_id = UniqueID::new();
64        let subscriber = self.state.subscribe_events()?;
65
66        self.state.subscribe(&[topic], qos, unique_id).await;
67
68        let result = Self::await_event(subscriber, |event | match event {
69            MqttEvent::SubscribeDone(id, result) if id == unique_id => Some(result),
70            _ => None
71        }).await;
72
73        unique_id.free();
74
75        result?;
76
77        Ok(())
78    }
79
80    /// unsubscribe from a topic
81    /// 
82    /// waits until the unsuback has bee received
83    pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttError> {
84        let unique_id = UniqueID::new();
85        let subscriber = self.state.subscribe_events()?;
86
87        self.state.unsubscribe(&[topic], unique_id).await;
88
89        Self::await_event(subscriber, |event | match event {
90            MqttEvent::UnsubscribeDone(id) if id == unique_id => Some(()),
91            _ => None
92        }).await;
93
94        unique_id.free();
95
96        Ok(())
97    }
98
99    /// send a disconnect packet to the broker
100    pub fn disconnect(&self) {
101        self.state.disconnect();
102    }
103
104    /// Subscribe to received publishes
105    pub fn subscribe_received_publishes(&self) -> Result<DynSubscriber<'_, ReceivedPublish<BUFFER, TOPIC>>, MqttError> {
106        self.state.subscribe_received_publishes()
107    }
108
109}