1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::Cell, ops::Deref};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use heapless::String;
7use thiserror::Error;
8
9use heapless::Vec;
10
11pub use embytes_buffer::*;
12
13use mqttrs2::{Pid, Publish, QosPid};
14pub use mqttrs2::QoS;
15
16pub(crate) mod fmt;
18
19pub mod io;
20pub(crate) mod state;
21use state::sub::MAX_CONCURRENT_REQUESTS;
22
23pub(crate) mod time;
24pub mod client;
25
26pub(crate) mod misc;
27
28pub mod queue_vec;
29
30pub mod network;
31
32
33static COUNTER: Mutex<CriticalSectionRawMutex, Cell<u64>> = Mutex::new(Cell::new(0));
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub struct UniqueID(u64);
38
39impl UniqueID {
40 pub fn new() -> Self {
41 Self(COUNTER.lock(|inner|{
42 let value = inner.get();
43 inner.set(value + 1);
44 value
45 }))
46 }
47}
48
49impl Deref for UniqueID {
50 type Target = u64;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
/// Errors reported by this crate.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum MqttError {

    /// The TCP connection failed; carries the underlying
    /// [`network::NetworkError`].
    #[error("TCP Connection failed")]
    ConnectionFailed(network::NetworkError),

    /// A buffer had no room left.
    #[error("The buffer is full")]
    BufferFull,

    /// The broker rejected the connection.
    #[error("connection rejected by broker")]
    ConnackError,

    /// The connection was rejected because authentication was invalid or
    /// missing.
    #[error("The connection was rejected because of invalid / missing authentication")]
    AuthenticationError,

    /// Encoding or decoding a packet failed.
    #[error("Error while encoding and decoding packages")]
    CodecError,

    /// A received message did not fit into the fixed-size storage of
    /// [`MqttPublish`]; returned by its `TryFrom<&Publish>` conversion.
    #[error("Payload of received message is too long")]
    ReceivedMessageTooLong,

    /// The suback / unsuback packet arrived with an error code.
    #[error("The suback / unsuback packet arrived with an error code")]
    SubscribeOrUnsubscribeFailed,

    /// Unspecified internal failure.
    #[error("Some internal error occured")]
    InternalError
}
85
86
87#[derive(Clone)]
89pub struct ClientCredentials {
90 pub username: String<32>,
91 pub password: String<128>,
92}
93
94impl ClientCredentials {
95 pub fn new(username: &str, password: &str) -> Self {
96 let mut this = Self {
97 username: String::new(),
98 password: String::new()
99 };
100
101 this.username.push_str(username).unwrap();
102 this.password.push_str(password).unwrap();
103 this
104 }
105}
106
107#[derive(Debug, Clone)]
111pub struct AutoSubscribe {
112 pub topic: Topic,
113 pub qos: QoS
114}
115
116impl AutoSubscribe {
117 pub fn new(topic: &str, qos: QoS) -> Self {
118 let mut this = Self {
119 topic: Topic::new(),
120 qos
121 };
122 this.topic.push_str(topic).unwrap();
123 this
124 }
125}
126
127#[derive(Clone)]
128pub struct ClientConfig {
129 pub client_id: String<128>,
130 pub credentials: Option<ClientCredentials>,
131 pub auto_subscribes: Vec<AutoSubscribe, MAX_CONCURRENT_REQUESTS>
132}
133
134impl ClientConfig {
135 pub fn new(client_id: &str, credentials: Option<ClientCredentials>) -> Self {
136 let mut cid = String::new();
137 cid.push_str(client_id).unwrap();
138 Self {
139 client_id: cid,
140 credentials,
141 auto_subscribes: Vec::new()
142 }
143 }
144
145 pub fn new_with_auto_subscribes<'a>(client_id: &str, credentials: Option<ClientCredentials>, auto_subscribes: impl Iterator<Item = &'a str>, qos: QoS) -> Self {
146 let mut cid = String::new();
147 cid.push_str(client_id).unwrap();
148
149 let mut this = Self {
150 client_id: cid,
151 credentials,
152 auto_subscribes: Vec::new()
153 };
154
155 for topic in auto_subscribes {
156 let mut topic_string = Topic::new();
157 topic_string.push_str(topic).unwrap();
158 let auto_subscribe = AutoSubscribe{
159 topic: topic_string,
160 qos
161 };
162 this.auto_subscribes.push(auto_subscribe).unwrap();
163 }
164
165
166 this
167 }
168}
169
/// Capacity parameter of the [`Topic`] string type.
pub const MAX_TOPIC_SIZE: usize = 64;
/// Length of the byte array backing the payload buffer of [`MqttPublish`].
pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;

/// Fixed-capacity string used for MQTT topics throughout this file.
pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
174
175#[derive(Debug, Clone)]
176#[cfg_attr(feature = "defmt", derive(defmt::Format))]
177pub struct MqttPublish {
178 pub topic: Topic,
179 pub payload: Buffer<[u8; MQTT_PAYLOAD_MAX_SIZE]>,
180 pub qos: QoS,
181 pub retain: bool,
182}
183
184impl MqttPublish {
185
186 pub fn new(topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Self {
187 let mut s = Self {
188 topic: Topic::new(),
189 payload: new_stack_buffer(),
190 qos, retain
191 };
192 s.topic.push_str(topic).unwrap();
193 s.payload.push(payload).unwrap();
194
195 s
196 }
197
198}
199
200impl <'a> TryFrom<&Publish<'a>> for MqttPublish {
201 type Error = MqttError;
202
203 fn try_from(value: &Publish<'a>) -> Result<Self, Self::Error> {
204 let mut topic = Topic::new();
205 if let Err(_) = topic.push_str(value.topic_name) {
206 warn!("Topic of received message is longer than {}: {}", MAX_TOPIC_SIZE, value.topic_name.len());
207 return Err(MqttError::ReceivedMessageTooLong);
208 }
209
210 let mut payload = new_stack_buffer();
211 if let Err(_e) = payload.push(&value.payload) {
212 warn!("Payload of received message is longer than {}: {}", MQTT_PAYLOAD_MAX_SIZE, value.payload.len());
213 }
214
215 let qos = value.qospid.qos();
216
217 Ok(Self {
218 topic, payload, qos,
219 retain: value.retain
220 })
221 }
222}
223
224impl MqttPublish {
225 pub(crate) fn create_publish<'a>(&'a self, pid: Pid, dup: bool) -> Publish<'a> {
226 let qospid = match self.qos {
227 QoS::AtMostOnce => QosPid::AtMostOnce,
228 QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
229 QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
230 };
231
232 Publish {
233 dup,
234 qospid,
235 retain: self.retain,
236 topic_name: &self.topic,
237 payload: self.payload.data()
238 }
239 }
240}
241
242
/// Events emitted by the client.
///
/// NOTE(review): how and where these are delivered is not visible in this
/// file; see the client module.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum MqttEvent {

    /// A connection has been established.
    Connected,
    /// NOTE(review): presumably signalled once all configured
    /// `AutoSubscribe` entries have been processed; confirm in the client.
    InitialSubscribesDone,

    /// Outcome of a publish, tagged with a [`UniqueID`] (presumably the id of
    /// the originating request; confirm in the client).
    PublishResult(UniqueID, Result<(), MqttError>),
    /// Outcome of a subscribe, tagged with a [`UniqueID`]; carries a [`QoS`]
    /// on success.
    SubscribeResult(UniqueID, Result<QoS, MqttError>),
    /// Outcome of an unsubscribe, tagged with a [`UniqueID`].
    UnsubscribeResult(UniqueID, Result<(), MqttError>)
}
254
255
256
/// Crate-private request type; each variant except `Disconnect` carries a
/// [`UniqueID`] alongside its data.
///
/// NOTE(review): presumably these are queued towards the client task and the
/// id is echoed back in the matching `MqttEvent`; confirm in the client.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
enum MqttRequest {

    /// Publish the given message.
    Publish(MqttPublish, UniqueID),

    /// Subscribe to the given topic.
    Subscribe(Topic, UniqueID),

    /// Unsubscribe from the given topic.
    Unsubscribe(Topic, UniqueID),

    /// Disconnect; carries no data.
    Disconnect,

}
270
271
272
273