1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::RefCell, net::IpAddr};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use embedded_nal_async::{AddrType, Dns};
7use heapless::{Deque, String};
8use thiserror::Error;
9
10use heapless::Vec;
11
12use mqttrs2::{Pid, QosPid};
13pub use mqttrs2::QoS;
14
15use crate::fmt::Debug2Format;
16
17pub(crate) mod fmt;
19
20pub mod state;
21
22pub(crate) mod time;
23pub mod client;
24
25pub(crate) mod buffer;
26
27pub mod packet;
28
29pub(crate) mod mutex;
30
31#[cfg(test)]
32pub mod testutils;
33
34
35#[derive(Debug, Error, Clone, PartialEq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub enum MqttError {
38
39 #[error("TCP conenction failed")]
40 ConnectionFailed2(embedded_io_async::ErrorKind),
41
42 #[error("DNS failed")]
43 DnsFailed,
44
45 #[error("buffer too small")]
46 BufferTooSmall,
47
48 #[error("connection rejected by broker")]
49 ConnackError,
50
51 #[error("The connection was rejected because of invalid / missing authentication")]
52 AuthenticationError,
53
54 #[error("Error while encoding and decoding packages")]
55 CodecError(mqttrs2::Error),
56
57 #[error("Payload of received message is too long")]
58 ReceivedMessageTooLong,
59
60 #[error("The suback / unsuback packet arrived with an error code")]
61 SubscribeOrUnsubscribeFailed,
62
63 #[error("Some internal error occured")]
64 InternalError,
65
66 #[error("sending packet `{0:?}`: send queue full")]
68 QueueFull(QosPid),
69
70 #[error("received ack for unknown pid `{0:?}`")]
71 UnexpectedAck(Pid),
72
73 #[error("error with pubsub: `{0:?}`")]
74 PubsubError(embassy_sync::pubsub::Error),
75
76 #[error("topic size too big")]
77 TopicSizeError,
78}
79impl MqttError {
80 pub(crate) fn new_dns(err: &dyn core::fmt::Debug) -> Self {
81 error!("dns failed: {:?}", Debug2Format(err));
82 Self::DnsFailed
83 }
84}
85
86impl From<embedded_io_async::ErrorKind> for MqttError {
87 fn from(err_kind: embedded_io_async::ErrorKind) -> Self {
88 Self::ConnectionFailed2(err_kind)
89 }
90}
91
92impl From<embassy_sync::pubsub::Error> for MqttError {
93 fn from(err: embassy_sync::pubsub::Error) -> Self {
94 Self::PubsubError(err)
95 }
96}
97
98impl From<mqttrs2::Error> for MqttError {
99 fn from(value: mqttrs2::Error) -> Self {
100 Self::CodecError(value)
101 }
102}
103
104
105#[derive(Clone)]
107pub struct ClientCredentials<'a> {
108 pub username: &'a str,
109 pub password: &'a str,
110}
111
112impl <'a> ClientCredentials<'a> {
113 pub fn new(username: &'a str, password: &'a str) -> Self {
114 Self {
115 username, password
116 }
117 }
118}
119
120#[derive(Debug, Clone)]
124pub struct AutoSubscribe {
125 pub topic: Topic,
126 pub qos: QoS
127}
128
129impl TryInto<mqttrs2::SubscribeTopic> for &AutoSubscribe {
130 type Error = MqttError;
131
132 fn try_into(self) -> Result<mqttrs2::SubscribeTopic, Self::Error> {
133 let topic = mqttrs2::SubscribeTopic {
134 qos: self.qos,
135 topic_path: String::try_from(self.topic.as_ref())
136 .map_err(|_| MqttError::TopicSizeError)?
137 };
138 Ok(topic)
139 }
140}
141
142impl AutoSubscribe {
143 pub fn new(topic: &str, qos: QoS) -> Self {
144 let mut this = Self {
145 topic: Topic::new(),
146 qos
147 };
148 this.topic.push_str(topic).unwrap();
149 this
150 }
151}
152
153#[derive(Debug, Clone)]
154pub enum Host<'a> {
155 Hostname(&'a str),
156 Ip(IpAddr)
157}
158
159#[cfg(all(feature = "ipv4", not(feature = "ipv6")))]
160const IP_ADDR_TYPE: AddrType = AddrType::IPv4;
161
162#[cfg(all(feature = "ipv6", not(feature = "ipv4")))]
163const IP_ADDR_TYPE: AddrType = AddrType::IPv6;
164
165#[cfg(all(feature = "ipv4", feature = "ipv6"))]
166const IP_ADDR_TYPE: AddrType = AddrType::Either;
167
168impl<'a> Host<'a> {
169
170 #[cfg(any(feature = "ipv4", feature = "ipv6"))]
171 pub(crate) async fn resolve(&self, dns: &impl Dns) -> Result<IpAddr, MqttError> {
172 match self {
173 crate::Host::Hostname(host) => {
174 debug!("query dns to resolve hostname {}", host);
175 let ip = dns.get_host_by_name(host, IP_ADDR_TYPE).await
176 .map_err(|err| MqttError::new_dns(&err))?;
177 debug!("dns resolved {} to {}", host, &ip);
178 Ok(ip)
179 },
180 crate::Host::Ip(ip) => Ok(ip.clone()),
181 }
182 }
183
184 #[cfg(all(not(feature = "ipv6"), not(feature = "ipv4")))]
185 pub(crate) async fn resolve(&self, dns: &impl Dns) -> Result<IpAddr, MqttError> {
186 match self {
187 crate::Host::Hostname(host) => panic!("dns resolution not supported, activate feature ipv4 or ipv6"),
188 crate::Host::Ip(ip) => Ok(ip.clone()),
189 }
190 }
191}
192
193
194#[derive(Clone)]
195pub struct ClientConfig<'a> {
196 pub host: Host<'a>,
197 pub port: Option<u16>,
198 pub client_id: &'a str,
199 pub credentials: Option<ClientCredentials<'a>>,
200 pub auto_subscribes: Vec<AutoSubscribe, 10>
201}
202
203impl <'a> ClientConfig<'a> {
204 pub fn new(host: Host<'a>, port: Option<u16>, client_id: &'a str, credentials: Option<ClientCredentials<'a>>) -> Self {
205 Self {
206 host,
207 port,
208 client_id,
209 credentials,
210 auto_subscribes: Vec::new()
211 }
212 }
213
214 pub fn new_with_auto_subscribes<'b>(host: Host<'a>, port: Option<u16>, client_id: &'a str, credentials: Option<ClientCredentials<'a>>, auto_subscribes: impl Iterator<Item = &'b str>, qos: QoS) -> Self {
215
216 let mut this = Self {
217 host,
218 port,
219 client_id,
220 credentials,
221 auto_subscribes: Vec::new()
222 };
223
224 for topic in auto_subscribes {
225 let mut topic_string = Topic::new();
226 topic_string.push_str(topic).unwrap();
227 let auto_subscribe = AutoSubscribe{
228 topic: topic_string,
229 qos
230 };
231 this.auto_subscribes.push(auto_subscribe).unwrap();
232 }
233
234
235 this
236 }
237}
238
239pub const MAX_TOPIC_SIZE: usize = 64;
240pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;
241
242pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
243
244
245#[derive(Debug, Clone, PartialEq)]
246#[cfg_attr(feature = "defmt", derive(defmt::Format))]
247pub(crate) enum MqttEvent {
248
249 PublishDone(UniqueID),
250 SubscribeDone(UniqueID, Result<QoS, MqttError>),
251 UnsubscribeDone(UniqueID)
252}
253
254struct UniqueIDPool {
255 next_unused: u32,
256 pool: Deque<u32, 16>
257}
258
259impl UniqueIDPool {
260
261 const fn new() -> Self {
262 Self {
263 next_unused: 0,
264 pool: Deque::new()
265 }
266 }
267
268 fn next(&mut self) -> UniqueID {
269 if let Some(id) = self.pool.pop_front() {
270 UniqueID(id)
271 } else {
272 self.take()
273 }
274 }
275
276 fn take(&mut self) -> UniqueID {
277 let id = self.next_unused;
278 if self.next_unused == u32::MAX {
279 panic!("used up all unique ids");
280 } else {
281 self.next_unused += 1;
282 }
283
284
285 UniqueID(id)
286 }
287
288 fn free(&mut self, id: UniqueID) {
289 self.pool.push_back(id.0)
290 .inspect_err(|err| {
291 error!("UniqueId pool full, dropping {} forever", err);
292 })
293 .unwrap()
294 }
295
296}
297
298static UNIQUE_ID_POOL: Mutex<CriticalSectionRawMutex, RefCell<UniqueIDPool>> = Mutex::new(RefCell::new(UniqueIDPool::new()));
299
300#[derive(Debug, PartialEq, Eq, Clone, Copy)]
301#[cfg_attr(feature = "defmt", derive(defmt::Format))]
302pub(crate) struct UniqueID(u32);
303
304#[cfg(test)]
305impl From<u32> for UniqueID {
306 fn from(value: u32) -> Self {
307 Self(value)
308 }
309}
310
311impl UniqueID {
312
313 pub(crate) fn new() -> Self {
314 UNIQUE_ID_POOL.lock(|inner| inner.borrow_mut().next())
315 }
316
317 pub(crate) fn free(self) {
318 UNIQUE_ID_POOL.lock(|inner| inner.borrow_mut().free(self))
319 }
320
321}
322
323
324
325