embedded_mqttc/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::Cell, ops::Deref};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use heapless::String;
7use thiserror::Error;
8
9use heapless::Vec;
10
11pub use embytes_buffer::*;
12
13use mqttrs2::{Pid, Publish, QosPid};
14pub use mqttrs2::QoS;
15
16// This must come first so the macros are visible
17pub(crate) mod fmt;
18
19pub mod io;
20pub(crate) mod state;
21use state::sub::MAX_CONCURRENT_REQUESTS;
22
23pub(crate) mod time;
24pub mod client;
25
26pub(crate) mod misc;
27
28pub mod queue_vec;
29
30pub mod network;
31
32
33static COUNTER: Mutex<CriticalSectionRawMutex, Cell<u64>> = Mutex::new(Cell::new(0));
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub struct UniqueID(u64);
38
39impl UniqueID {
40    pub fn new() -> Self {
41        Self(COUNTER.lock(|inner|{
42            let value = inner.get();
43            inner.set(value + 1);
44            value
45        }))
46    }
47}
48
49impl Deref for UniqueID {
50    type Target = u64;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57#[derive(Debug, Error, Clone, PartialEq)]
58#[cfg_attr(feature = "defmt", derive(defmt::Format))]
59pub enum MqttError {
60
61    #[error("TCP Connection failed")]
62    ConnectionFailed(network::NetworkError),
63
64    #[error("The buffer is full")]
65    BufferFull,
66
67    #[error("connection rejected by broker")]
68    ConnackError,
69
70    #[error("The connection was rejected because of invalid / missing authentication")]
71    AuthenticationError,
72
73    #[error("Error while encoding and decoding packages")]
74    CodecError,
75
76    #[error("Payload of received message is too long")]
77    ReceivedMessageTooLong,
78
79    #[error("The suback / unsuback packet arrived with an error code")]
80    SubscribeOrUnsubscribeFailed,
81
82    #[error("Some internal error occured")]
83    InternalError
84}
85
86
87/// Credentials used to connecto to the broker
88#[derive(Clone)]
89pub struct ClientCredentials {
90    pub username: String<32>,
91    pub password: String<128>,
92}
93
94impl ClientCredentials {
95    pub fn new(username: &str, password: &str) -> Self {
96        let mut this = Self {
97            username: String::new(),
98            password: String::new()
99        };
100
101        this.username.push_str(username).unwrap();
102        this.password.push_str(password).unwrap();
103        this
104    }
105}
106
107/// An [`AutoSubscribe`] is sent to the broker after connected.
108/// 
109/// It is also sent to the broker after reconnects. This should be the preferrd way to subscribe to topics.
110#[derive(Debug, Clone)]
111pub struct AutoSubscribe {
112    pub topic: Topic,
113    pub qos: QoS
114}
115
116impl AutoSubscribe {
117    pub fn new(topic: &str, qos: QoS) -> Self {
118        let mut this = Self {
119            topic: Topic::new(),
120            qos
121        };
122        this.topic.push_str(topic).unwrap();
123        this
124    }
125}
126
127#[derive(Clone)]
128pub struct ClientConfig {
129    pub client_id: String<128>,
130    pub credentials: Option<ClientCredentials>,
131    pub auto_subscribes: Vec<AutoSubscribe, MAX_CONCURRENT_REQUESTS>
132}
133
134impl ClientConfig {
135    pub fn new(client_id: &str, credentials: Option<ClientCredentials>) -> Self {
136        let mut cid = String::new();
137        cid.push_str(client_id).unwrap();
138        Self {
139            client_id: cid,
140            credentials,
141            auto_subscribes: Vec::new()
142        }
143    }
144
145    pub fn new_with_auto_subscribes<'a>(client_id: &str, credentials: Option<ClientCredentials>, auto_subscribes: impl Iterator<Item = &'a str>, qos: QoS) -> Self {
146        let mut cid = String::new();
147        cid.push_str(client_id).unwrap();
148
149        let mut this = Self {
150            client_id: cid,
151            credentials,
152            auto_subscribes: Vec::new()
153        };
154
155        for topic in auto_subscribes {
156            let mut topic_string = Topic::new();
157            topic_string.push_str(topic).unwrap();
158            let auto_subscribe = AutoSubscribe{
159                topic: topic_string,
160                qos
161            };
162            this.auto_subscribes.push(auto_subscribe).unwrap();
163        }
164
165
166        this
167    }
168}
169
/// Capacity in bytes of a [`Topic`] string.
pub const MAX_TOPIC_SIZE: usize = 64;
/// Size in bytes of the payload buffer carried by an [`MqttPublish`].
pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;
172
173pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
174
175#[derive(Debug, Clone)]
176#[cfg_attr(feature = "defmt", derive(defmt::Format))]
177pub struct MqttPublish {
178    pub topic: Topic,
179    pub payload: Buffer<[u8; MQTT_PAYLOAD_MAX_SIZE]>,
180    pub qos: QoS,
181    pub retain: bool,
182}
183
184impl MqttPublish {
185
186    pub fn new(topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Self {
187        let mut s = Self {
188            topic: Topic::new(),
189            payload: new_stack_buffer(),
190            qos, retain
191        };
192        s.topic.push_str(topic).unwrap();
193        s.payload.push(payload).unwrap();
194
195        s
196    }
197
198}
199
200impl <'a> TryFrom<&Publish<'a>> for MqttPublish {
201    type Error = MqttError;
202
203    fn try_from(value: &Publish<'a>) -> Result<Self, Self::Error> {
204        let mut topic = Topic::new();
205        if let Err(_) = topic.push_str(value.topic_name) {
206            warn!("Topic of received message is longer than {}: {}", MAX_TOPIC_SIZE, value.topic_name.len());
207            return Err(MqttError::ReceivedMessageTooLong);
208        }
209
210        let mut payload = new_stack_buffer();
211        if let Err(_e) = payload.push(&value.payload) {
212            warn!("Payload of received message is longer than {}: {}", MQTT_PAYLOAD_MAX_SIZE, value.payload.len());
213        }
214
215        let qos = value.qospid.qos();
216
217        Ok(Self {
218            topic, payload, qos,
219            retain: value.retain
220        })
221    }
222}
223
224impl  MqttPublish {
225    pub(crate) fn create_publish<'a>(&'a self, pid: Pid, dup: bool) -> Publish<'a> {
226        let qospid = match self.qos {
227            QoS::AtMostOnce => QosPid::AtMostOnce,
228            QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
229            QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
230        };
231
232        Publish {
233            dup,
234            qospid,
235            retain: self.retain,
236            topic_name: &self.topic,
237            payload: self.payload.data()
238        }
239    }
240}
241
242
243#[derive(Debug, Clone, PartialEq)]
244#[cfg_attr(feature = "defmt", derive(defmt::Format))]
245pub enum MqttEvent {
246
247    Connected,
248    InitialSubscribesDone,
249
250    PublishResult(UniqueID, Result<(), MqttError>),
251    SubscribeResult(UniqueID, Result<QoS, MqttError>),
252    UnsubscribeResult(UniqueID, Result<(), MqttError>)
253}
254
255
256
257#[derive(Debug)]
258#[cfg_attr(feature = "defmt", derive(defmt::Format))]
259enum MqttRequest {
260
261    Publish(MqttPublish, UniqueID),
262
263    Subscribe(Topic, UniqueID),
264
265    Unsubscribe(Topic, UniqueID),
266
267    Disconnect,
268
269}
270
271
272
273