1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::Cell, ops::Deref};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use heapless::String;
7use thiserror::Error;
8
9use heapless::Vec;
10
11pub use embytes_buffer::*;
12
13use mqttrs2::{Pid, Publish, QosPid};
14pub use mqttrs2::QoS;
15
16pub(crate) mod fmt;
18
19pub mod io;
20pub(crate) mod state;
21use state::sub::MAX_CONCURRENT_REQUESTS;
22
23pub(crate) mod time;
24pub mod client;
25
26pub(crate) mod misc;
27
28pub mod queue_vec;
29
30pub mod network;
31
32
33static COUNTER: Mutex<CriticalSectionRawMutex, Cell<u64>> = Mutex::new(Cell::new(0));
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub struct UniqueID(u64);
38
39impl UniqueID {
40 pub fn new() -> Self {
41 Self(COUNTER.lock(|inner|{
42 let value = inner.get();
43 inner.set(value + 1);
44 value
45 }))
46 }
47}
48
49impl Deref for UniqueID {
50 type Target = u64;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
/// Errors reported by this crate.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum MqttError {

    /// Connecting failed; carries the underlying [`network::NetworkError`].
    #[error("TCP Connection failed")]
    ConnectionFailed(network::NetworkError),

    /// A buffer had no room left.
    // NOTE(review): which buffer(s) report this is not visible in this file.
    #[error("The buffer is full")]
    BufferFull,

    /// The broker rejected the connection.
    #[error("connection rejected by broker")]
    ConnackError,

    /// The connection was rejected because authentication was invalid or missing.
    #[error("The connection was rejected because of invalid / missing authentication")]
    AuthenticationError,

    /// Encoding or decoding of a packet failed.
    #[error("Error while encoding and decoding packages")]
    CodecError,

    /// A received message did not fit into this crate's fixed-size storage
    /// (see [`MAX_TOPIC_SIZE`] and [`MQTT_PAYLOAD_MAX_SIZE`]).
    #[error("Payload of received message is too long")]
    ReceivedMessageTooLong,

    /// A suback / unsuback packet arrived with an error code.
    #[error("The suback / unsuback packet arrived with an error code")]
    SubscribeOrUnsubscribeFailed,

    /// Catch-all for internal failures.
    #[error("Some internal error occured")]
    InternalError
}
85
86#[derive(Clone)]
87pub struct ClientCredentials {
88 pub username: String<32>,
89 pub password: String<128>,
90}
91
92impl ClientCredentials {
93 pub fn new(username: &str, password: &str) -> Self {
94 let mut this = Self {
95 username: String::new(),
96 password: String::new()
97 };
98
99 this.username.push_str(username).unwrap();
100 this.password.push_str(password).unwrap();
101 this
102 }
103}
104
105#[derive(Debug, Clone)]
106pub struct AutoSubscribe {
107 pub topic: Topic,
108 pub qos: QoS
109}
110
111impl AutoSubscribe {
112 pub fn new(topic: &str, qos: QoS) -> Self {
113 let mut this = Self {
114 topic: Topic::new(),
115 qos
116 };
117 this.topic.push_str(topic).unwrap();
118 this
119 }
120}
121
122#[derive(Clone)]
123pub struct ClientConfig {
124 pub client_id: String<128>,
125 pub credentials: Option<ClientCredentials>,
126 pub auto_subscribes: Vec<AutoSubscribe, MAX_CONCURRENT_REQUESTS>
127}
128
129impl ClientConfig {
130 pub fn new(client_id: &str, credentials: Option<ClientCredentials>) -> Self {
131 let mut cid = String::new();
132 cid.push_str(client_id).unwrap();
133 Self {
134 client_id: cid,
135 credentials,
136 auto_subscribes: Vec::new()
137 }
138 }
139
140 pub fn new_with_auto_subscribes<'a>(client_id: &str, credentials: Option<ClientCredentials>, auto_subscribes: impl Iterator<Item = &'a str>, qos: QoS) -> Self {
141 let mut cid = String::new();
142 cid.push_str(client_id).unwrap();
143
144 let mut this = Self {
145 client_id: cid,
146 credentials,
147 auto_subscribes: Vec::new()
148 };
149
150 for topic in auto_subscribes {
151 let mut topic_string = Topic::new();
152 topic_string.push_str(topic).unwrap();
153 let auto_subscribe = AutoSubscribe{
154 topic: topic_string,
155 qos
156 };
157 this.auto_subscribes.push(auto_subscribe).unwrap();
158 }
159
160
161 this
162 }
163}
164
/// Capacity in bytes of a [`Topic`].
pub const MAX_TOPIC_SIZE: usize = 64;
/// Size in bytes of the payload buffer inside [`MqttPublish`].
pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;

/// Fixed-capacity string holding an MQTT topic of up to [`MAX_TOPIC_SIZE`] bytes.
pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
169
170#[derive(Debug, Clone)]
171#[cfg_attr(feature = "defmt", derive(defmt::Format))]
172pub struct MqttPublish {
173 pub topic: Topic,
174 pub payload: Buffer<[u8; MQTT_PAYLOAD_MAX_SIZE]>,
175 pub qos: QoS,
176 pub retain: bool,
177}
178
179impl MqttPublish {
180
181 pub fn new(topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Self {
182 let mut s = Self {
183 topic: Topic::new(),
184 payload: new_stack_buffer(),
185 qos, retain
186 };
187 s.topic.push_str(topic).unwrap();
188 s.payload.push(payload).unwrap();
189
190 s
191 }
192
193}
194
195impl <'a> TryFrom<&Publish<'a>> for MqttPublish {
196 type Error = MqttError;
197
198 fn try_from(value: &Publish<'a>) -> Result<Self, Self::Error> {
199 let mut topic = Topic::new();
200 if let Err(_) = topic.push_str(value.topic_name) {
201 warn!("Topic of received message is longer than {}: {}", MAX_TOPIC_SIZE, value.topic_name.len());
202 return Err(MqttError::ReceivedMessageTooLong);
203 }
204
205 let mut payload = new_stack_buffer();
206 if let Err(_e) = payload.push(&value.payload) {
207 warn!("Payload of received message is longer than {}: {}", MQTT_PAYLOAD_MAX_SIZE, value.payload.len());
208 }
209
210 let qos = value.qospid.qos();
211
212 Ok(Self {
213 topic, payload, qos,
214 retain: value.retain
215 })
216 }
217}
218
219impl MqttPublish {
220 pub(crate) fn create_publish<'a>(&'a self, pid: Pid, dup: bool) -> Publish<'a> {
221 let qospid = match self.qos {
222 QoS::AtMostOnce => QosPid::AtMostOnce,
223 QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
224 QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
225 };
226
227 Publish {
228 dup,
229 qospid,
230 retain: self.retain,
231 topic_name: &self.topic,
232 payload: self.payload.data()
233 }
234 }
235}
236
237
/// Events emitted by the client.
///
/// The result variants carry the [`UniqueID`] of the operation they belong to.
// NOTE(review): presumably the ID is the one handed out when the request was
// queued — confirm against the client module.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum MqttEvent {

    /// The client is connected.
    Connected,
    /// The initial subscribes are done (presumably the configured
    /// auto-subscribes — TODO confirm).
    InitialSubscribesDone,

    /// Outcome of a publish.
    PublishResult(UniqueID, Result<(), MqttError>),
    /// Outcome of a subscribe; `Ok` carries a [`QoS`] (presumably the level
    /// granted by the broker — TODO confirm).
    SubscribeResult(UniqueID, Result<QoS, MqttError>),
    /// Outcome of an unsubscribe.
    UnsubscribeResult(UniqueID, Result<(), MqttError>)
}
249
250
251
/// Crate-internal request type; each variant except `Disconnect` carries the
/// [`UniqueID`] that identifies the request.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
enum MqttRequest {

    /// Publish the given message.
    Publish(MqttPublish, UniqueID),

    /// Subscribe to the given topic.
    Subscribe(Topic, UniqueID),

    /// Unsubscribe from the given topic.
    Unsubscribe(Topic, UniqueID),

    /// Disconnect.
    Disconnect,

}
265
266
267
268