embedded_mqttc/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::Cell, ops::Deref};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use heapless::String;
7use thiserror::Error;
8
9use heapless::Vec;
10
11pub use embytes_buffer::*;
12
13use mqttrs2::{Pid, Publish, QosPid};
14pub use mqttrs2::QoS;
15
16// This must come first so the macros are visible
17pub(crate) mod fmt;
18
19pub mod io;
20pub(crate) mod state;
21use state::sub::MAX_CONCURRENT_REQUESTS;
22
23pub(crate) mod time;
24pub mod client;
25
26pub(crate) mod misc;
27
28pub mod queue_vec;
29
30pub mod network;
31
32
33static COUNTER: Mutex<CriticalSectionRawMutex, Cell<u64>> = Mutex::new(Cell::new(0));
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub struct UniqueID(u64);
38
39impl UniqueID {
40    pub fn new() -> Self {
41        Self(COUNTER.lock(|inner|{
42            let value = inner.get();
43            inner.set(value + 1);
44            value
45        }))
46    }
47}
48
49impl Deref for UniqueID {
50    type Target = u64;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57#[derive(Debug, Error, Clone, PartialEq)]
58#[cfg_attr(feature = "defmt", derive(defmt::Format))]
59pub enum MqttError {
60
61    #[error("TCP Connection failed")]
62    ConnectionFailed(network::NetworkError),
63
64    #[error("The buffer is full")]
65    BufferFull,
66
67    #[error("connection rejected by broker")]
68    ConnackError,
69
70    #[error("The connection was rejected because of invalid / missing authentication")]
71    AuthenticationError,
72
73    #[error("Error while encoding and decoding packages")]
74    CodecError,
75
76    #[error("Payload of received message is too long")]
77    ReceivedMessageTooLong,
78
79    #[error("The suback / unsuback packet arrived with an error code")]
80    SubscribeOrUnsubscribeFailed,
81
82    #[error("Some internal error occured")]
83    InternalError
84}
85
86#[derive(Clone)]
87pub struct ClientCredentials {
88    pub username: String<32>,
89    pub password: String<128>,
90}
91
92impl ClientCredentials {
93    pub fn new(username: &str, password: &str) -> Self {
94        let mut this = Self {
95            username: String::new(),
96            password: String::new()
97        };
98
99        this.username.push_str(username).unwrap();
100        this.password.push_str(password).unwrap();
101        this
102    }
103}
104
105#[derive(Debug, Clone)]
106pub struct AutoSubscribe {
107    pub topic: Topic,
108    pub qos: QoS
109}
110
111impl AutoSubscribe {
112    pub fn new(topic: &str, qos: QoS) -> Self {
113        let mut this = Self {
114            topic: Topic::new(),
115            qos
116        };
117        this.topic.push_str(topic).unwrap();
118        this
119    }
120}
121
122#[derive(Clone)]
123pub struct ClientConfig {
124    pub client_id: String<128>,
125    pub credentials: Option<ClientCredentials>,
126    pub auto_subscribes: Vec<AutoSubscribe, MAX_CONCURRENT_REQUESTS>
127}
128
129impl ClientConfig {
130    pub fn new(client_id: &str, credentials: Option<ClientCredentials>) -> Self {
131        let mut cid = String::new();
132        cid.push_str(client_id).unwrap();
133        Self {
134            client_id: cid,
135            credentials,
136            auto_subscribes: Vec::new()
137        }
138    }
139
140    pub fn new_with_auto_subscribes<'a>(client_id: &str, credentials: Option<ClientCredentials>, auto_subscribes: impl Iterator<Item = &'a str>, qos: QoS) -> Self {
141        let mut cid = String::new();
142        cid.push_str(client_id).unwrap();
143
144        let mut this = Self {
145            client_id: cid,
146            credentials,
147            auto_subscribes: Vec::new()
148        };
149
150        for topic in auto_subscribes {
151            let mut topic_string = Topic::new();
152            topic_string.push_str(topic).unwrap();
153            let auto_subscribe = AutoSubscribe{
154                topic: topic_string,
155                qos
156            };
157            this.auto_subscribes.push(auto_subscribe).unwrap();
158        }
159
160
161        this
162    }
163}
164
165pub const MAX_TOPIC_SIZE: usize = 64;
166pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;
167
168pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
169
170#[derive(Debug, Clone)]
171#[cfg_attr(feature = "defmt", derive(defmt::Format))]
172pub struct MqttPublish {
173    pub topic: Topic,
174    pub payload: Buffer<[u8; MQTT_PAYLOAD_MAX_SIZE]>,
175    pub qos: QoS,
176    pub retain: bool,
177}
178
179impl MqttPublish {
180
181    pub fn new(topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Self {
182        let mut s = Self {
183            topic: Topic::new(),
184            payload: new_stack_buffer(),
185            qos, retain
186        };
187        s.topic.push_str(topic).unwrap();
188        s.payload.push(payload).unwrap();
189
190        s
191    }
192
193}
194
195impl <'a> TryFrom<&Publish<'a>> for MqttPublish {
196    type Error = MqttError;
197
198    fn try_from(value: &Publish<'a>) -> Result<Self, Self::Error> {
199        let mut topic = Topic::new();
200        if let Err(_) = topic.push_str(value.topic_name) {
201            warn!("Topic of received message is longer than {}: {}", MAX_TOPIC_SIZE, value.topic_name.len());
202            return Err(MqttError::ReceivedMessageTooLong);
203        }
204
205        let mut payload = new_stack_buffer();
206        if let Err(_e) = payload.push(&value.payload) {
207            warn!("Payload of received message is longer than {}: {}", MQTT_PAYLOAD_MAX_SIZE, value.payload.len());
208        }
209
210        let qos = value.qospid.qos();
211
212        Ok(Self {
213            topic, payload, qos,
214            retain: value.retain
215        })
216    }
217}
218
219impl  MqttPublish {
220    pub(crate) fn create_publish<'a>(&'a self, pid: Pid, dup: bool) -> Publish<'a> {
221        let qospid = match self.qos {
222            QoS::AtMostOnce => QosPid::AtMostOnce,
223            QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
224            QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
225        };
226
227        Publish {
228            dup,
229            qospid,
230            retain: self.retain,
231            topic_name: &self.topic,
232            payload: self.payload.data()
233        }
234    }
235}
236
237
238#[derive(Debug, Clone, PartialEq)]
239#[cfg_attr(feature = "defmt", derive(defmt::Format))]
240pub enum MqttEvent {
241
242    Connected,
243    InitialSubscribesDone,
244
245    PublishResult(UniqueID, Result<(), MqttError>),
246    SubscribeResult(UniqueID, Result<QoS, MqttError>),
247    UnsubscribeResult(UniqueID, Result<(), MqttError>)
248}
249
250
251
252#[derive(Debug)]
253#[cfg_attr(feature = "defmt", derive(defmt::Format))]
254enum MqttRequest {
255
256    Publish(MqttPublish, UniqueID),
257
258    Subscribe(Topic, UniqueID),
259
260    Unsubscribe(Topic, UniqueID),
261
262    Disconnect,
263
264}
265
266
267
268