embedded_mqttc/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::{cell::RefCell, net::IpAddr};
4
5use embassy_sync::blocking_mutex::{raw::CriticalSectionRawMutex, Mutex};
6use embedded_nal_async::{AddrType, Dns};
7use heapless::{Deque, String};
8use thiserror::Error;
9
10use heapless::Vec;
11
12use mqttrs2::{Pid, QosPid};
13pub use mqttrs2::QoS;
14
15use crate::fmt::Debug2Format;
16
17// This must come first so the macros are visible
18pub(crate) mod fmt;
19
20pub mod state;
21
22pub(crate) mod time;
23pub mod client;
24
25pub(crate) mod buffer;
26
27pub mod packet;
28
29pub(crate) mod mutex;
30
31#[cfg(test)]
32pub mod testutils;
33
34
35#[derive(Debug, Error, Clone, PartialEq)]
36#[cfg_attr(feature = "defmt", derive(defmt::Format))]
37pub enum MqttError {
38
39    #[error("TCP conenction failed")]
40    ConnectionFailed2(embedded_io_async::ErrorKind),
41
42    #[error("DNS failed")]
43    DnsFailed,
44
45    #[error("buffer too small")]
46    BufferTooSmall,
47
48    #[error("connection rejected by broker")]
49    ConnackError,
50
51    #[error("The connection was rejected because of invalid / missing authentication")]
52    AuthenticationError,
53
54    #[error("Error while encoding and decoding packages")]
55    CodecError(mqttrs2::Error),
56
57    #[error("Payload of received message is too long")]
58    ReceivedMessageTooLong,
59
60    #[error("The suback / unsuback packet arrived with an error code")]
61    SubscribeOrUnsubscribeFailed,
62
63    #[error("Some internal error occured")]
64    InternalError,
65
66    /// The send queue is full. According to the mqtt spec the connection should be closed in this situation
67    #[error("sending packet `{0:?}`: send queue full")]
68    QueueFull(QosPid),
69
70    #[error("received ack for unknown pid `{0:?}`")]
71    UnexpectedAck(Pid),
72
73    #[error("error with pubsub: `{0:?}`")]
74    PubsubError(embassy_sync::pubsub::Error),
75
76    #[error("topic size too big")]
77    TopicSizeError,
78}
79impl MqttError {
80    pub(crate) fn new_dns(err: &dyn core::fmt::Debug) -> Self {
81        error!("dns failed: {:?}", Debug2Format(err));
82        Self::DnsFailed
83    }
84}
85
86impl From<embedded_io_async::ErrorKind> for MqttError {
87    fn from(err_kind: embedded_io_async::ErrorKind) -> Self {
88        Self::ConnectionFailed2(err_kind)
89    }
90}
91
92impl From<embassy_sync::pubsub::Error>  for MqttError {
93    fn from(err: embassy_sync::pubsub::Error) -> Self {
94        Self::PubsubError(err)
95    }
96}
97
98impl From<mqttrs2::Error> for MqttError {
99    fn from(value: mqttrs2::Error) -> Self {
100        Self::CodecError(value)
101    }
102}
103
104
/// Credentials used to connect to the broker.
///
/// Both fields are borrowed, so the value cannot outlive the strings it was created from.
#[derive(Clone)]
pub struct ClientCredentials<'a> {
    /// User name of the credential pair.
    pub username: &'a str,
    /// Password of the credential pair.
    pub password: &'a str,
}
111
112impl <'a> ClientCredentials<'a> {
113    pub fn new(username: &'a str, password: &'a str) -> Self {
114        Self {
115            username, password
116        }
117    }
118}
119
120/// An [`AutoSubscribe`] is sent to the broker after connected.
121/// 
122/// It is also sent to the broker after reconnects. This should be the preferrd way to subscribe to topics.
123#[derive(Debug, Clone)]
124pub struct AutoSubscribe {
125    pub topic: Topic,
126    pub qos: QoS
127}
128
129impl TryInto<mqttrs2::SubscribeTopic> for &AutoSubscribe {
130    type Error = MqttError;
131
132    fn try_into(self) -> Result<mqttrs2::SubscribeTopic, Self::Error> {
133        let topic = mqttrs2::SubscribeTopic {
134            qos: self.qos,
135            topic_path: String::try_from(self.topic.as_ref())
136                .map_err(|_| MqttError::TopicSizeError)?
137        };
138        Ok(topic)
139    }
140}
141
142impl AutoSubscribe {
143    pub fn new(topic: &str, qos: QoS) -> Self {
144        let mut this = Self {
145            topic: Topic::new(),
146            qos
147        };
148        this.topic.push_str(topic).unwrap();
149        this
150    }
151}
152
/// Address of the broker, given either by name or as an IP address.
#[derive(Debug, Clone)]
pub enum Host<'a> {
    /// A host name that has to be resolved through DNS.
    Hostname(&'a str),
    /// An IP address that can be used directly.
    Ip(IpAddr),
}
158
// Address family requested from DNS, selected at compile time from the
// `ipv4` / `ipv6` features. No constant is defined when neither is enabled.

/// Only `ipv4` is enabled: request IPv4 addresses.
#[cfg(all(feature = "ipv4", not(feature = "ipv6")))]
const IP_ADDR_TYPE: AddrType = AddrType::IPv4;

/// Only `ipv6` is enabled: request IPv6 addresses.
#[cfg(all(feature = "ipv6", not(feature = "ipv4")))]
const IP_ADDR_TYPE: AddrType = AddrType::IPv6;

/// Both features are enabled: accept either address family.
#[cfg(all(feature = "ipv4", feature = "ipv6"))]
const IP_ADDR_TYPE: AddrType = AddrType::Either;
167
168impl<'a> Host<'a> {
169
170    #[cfg(any(feature = "ipv4", feature = "ipv6"))]
171    pub(crate) async fn resolve(&self, dns: &impl Dns) -> Result<IpAddr, MqttError> {
172        match self {
173            crate::Host::Hostname(host) => {
174                debug!("query dns to resolve hostname {}", host);
175                let ip = dns.get_host_by_name(host, IP_ADDR_TYPE).await
176                    .map_err(|err| MqttError::new_dns(&err))?;
177                debug!("dns resolved {} to {}", host, &ip);
178                Ok(ip)
179            },
180            crate::Host::Ip(ip) => Ok(ip.clone()),
181        }
182    }
183
184    #[cfg(all(not(feature = "ipv6"), not(feature = "ipv4")))]
185    pub(crate) async fn resolve(&self, dns: &impl Dns) -> Result<IpAddr, MqttError> {
186        match self {
187            crate::Host::Hostname(host) => panic!("dns resolution not supported, activate feature ipv4 or ipv6"),
188            crate::Host::Ip(ip) => Ok(ip.clone()),
189        }
190    }
191}
192
193
194#[derive(Clone)]
195pub struct ClientConfig<'a> {
196    pub host: Host<'a>,
197    pub port: Option<u16>,
198    pub client_id: &'a str,
199    pub credentials: Option<ClientCredentials<'a>>,
200    pub auto_subscribes: Vec<AutoSubscribe, 10>
201}
202
203impl <'a> ClientConfig<'a> {
204    pub fn new(host: Host<'a>, port: Option<u16>, client_id: &'a str, credentials: Option<ClientCredentials<'a>>) -> Self {
205        Self {
206            host,
207            port,
208            client_id,
209            credentials,
210            auto_subscribes: Vec::new()
211        }
212    }
213
214    pub fn new_with_auto_subscribes<'b>(host: Host<'a>, port: Option<u16>, client_id: &'a str, credentials: Option<ClientCredentials<'a>>, auto_subscribes: impl Iterator<Item = &'b str>, qos: QoS) -> Self {
215
216        let mut this = Self {
217            host,
218            port,
219            client_id,
220            credentials,
221            auto_subscribes: Vec::new()
222        };
223
224        for topic in auto_subscribes {
225            let mut topic_string = Topic::new();
226            topic_string.push_str(topic).unwrap();
227            let auto_subscribe = AutoSubscribe{
228                topic: topic_string,
229                qos
230            };
231            this.auto_subscribes.push(auto_subscribe).unwrap();
232        }
233
234
235        this
236    }
237}
238
239pub const MAX_TOPIC_SIZE: usize = 64;
240pub const MQTT_PAYLOAD_MAX_SIZE: usize = 1024;
241
242pub type Topic = heapless::String<MAX_TOPIC_SIZE>;
243
244
245#[derive(Debug, Clone, PartialEq)]
246#[cfg_attr(feature = "defmt", derive(defmt::Format))]
247pub(crate) enum MqttEvent {
248
249    PublishDone(UniqueID),
250    SubscribeDone(UniqueID, Result<QoS, MqttError>),
251    UnsubscribeDone(UniqueID)
252}
253
254struct UniqueIDPool {
255    next_unused: u32,
256    pool: Deque<u32, 16>
257}
258
259impl UniqueIDPool {
260
261    const fn new() -> Self {
262        Self {
263            next_unused: 0,
264            pool: Deque::new()
265        }
266    }
267
268    fn next(&mut self) -> UniqueID {
269        if let Some(id) = self.pool.pop_front() {
270            UniqueID(id)
271        } else {
272            self.take()
273        }
274    }
275
276    fn take(&mut self) -> UniqueID {
277        let id = self.next_unused;
278        if self.next_unused == u32::MAX {
279            panic!("used up all unique ids");
280        } else {
281            self.next_unused += 1;
282        }
283
284
285        UniqueID(id)
286    }
287
288    fn free(&mut self, id: UniqueID) {
289        self.pool.push_back(id.0)
290            .inspect_err(|err| {
291                error!("UniqueId pool full, dropping {} forever", err);
292            })
293        .unwrap()
294    }
295
296}
297
298static UNIQUE_ID_POOL: Mutex<CriticalSectionRawMutex, RefCell<UniqueIDPool>> = Mutex::new(RefCell::new(UniqueIDPool::new()));
299
/// Crate-internal identifier wrapping a `u32`.
///
/// The type is `Copy` and compares by its numeric value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct UniqueID(u32);
303
/// Test-only shortcut to build a [`UniqueID`] with a chosen value, bypassing the pool.
#[cfg(test)]
impl From<u32> for UniqueID {
    fn from(raw: u32) -> Self {
        UniqueID(raw)
    }
}
310
311impl UniqueID {
312
313    pub(crate) fn new() -> Self {
314        UNIQUE_ID_POOL.lock(|inner| inner.borrow_mut().next())
315    }
316
317    pub(crate) fn free(self) {
318        UNIQUE_ID_POOL.lock(|inner| inner.borrow_mut().free(self))
319    }
320
321}
322
323
324
325