mqrstt/
lib.rs

1//! A pure rust MQTT client which is easy to use, efficient and provides both sync and async options.
2//!
3//! Because this crate aims to be runtime agnostic the user is required to provide their own data stream.
4//! For an async approach the stream has to implement the `AsyncRead` and `AsyncWrite` traits.
5//! That is [`::tokio::io::AsyncRead`] and [`::tokio::io::AsyncWrite`] for tokio and [`::smol::io::AsyncRead`] and [`::smol::io::AsyncWrite`] for smol.
6//!
7//!
8//!
9//! Features:
10//! ----------------------------
11//! - MQTT v5
12//! - Runtime agnostic (Smol, Tokio)
13//! - Sync
14//! - TLS/TCP
15//! - Lean
16//! - Keep alive depends on actual communication
17//! - The tokio implementation has been fuzzed using cargo-fuzz!
18//!   
19//! To do:
20//! ----------------------------
21//! - Even More testing
22//! - Add TLS examples to repository
23//!
24//! Minimum Supported Rust Version (MSRV):
25//! ----------------------------
26//! From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in trait feature.
27//!
28//! Notes:
29//! ----------------------------
30//! - Your handler should not wait too long
31//! - Create a new connection when an error or disconnect is encountered
32//! - Handlers only get incoming packets
33//!
34//! Smol example:
35//! ----------------------------
36//! ```rust
37//! use mqrstt::{example_handlers::NOP, NetworkBuilder, NetworkStatus};
38//!
39//! smol::block_on(async {
40//!     // Construct a no op handler
41//!     let mut nop = NOP {};
42//!
43//!     // In normal operations you would want to loop this connection
44//!     // To reconnect after a disconnect or error
45//!     let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network();
46//!     let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
47//!     network.connect(stream, &mut nop).await.unwrap();
48//!
49//!     // This subscribe is only processed when we run the network
50//!     client.subscribe("mqrstt").await.unwrap();
51//!
52//!     let (result, _) = futures::join!(network.run(&mut nop), async {
53//!         smol::Timer::after(std::time::Duration::from_secs(30)).await;
54//!         client.disconnect().await.unwrap();
55//!     });
56//!     assert!(result.is_ok());
57//!     assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
58//! });
59//! ```
60//!
61//!
62//!  Tokio example:
63//! ----------------------------
64//! ```rust
65//! use mqrstt::{
66//!     example_handlers::NOP,
67//!     NetworkBuilder, NetworkStatus,
68//! };
69//!
70//! use tokio::time::Duration;
71//!
72//! #[tokio::main]
73//! async fn main() {
74//!     let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network();
75//!     // Construct a no op handler
76//!     let mut nop = NOP {};
77//!     // In normal operations you would want to loop this connection
78//!     // To reconnect after a disconnect or error
79//!     let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
80//!     network.connect(stream, &mut nop).await.unwrap();
81//!
82//!     client.subscribe("mqrstt").await.unwrap();
83//!     // Run the network
84//!     let network_handle = tokio::spawn(async move { network.run(&mut nop).await });
85//!
86//!     tokio::time::sleep(Duration::from_secs(30)).await;
87//!     client.disconnect().await.unwrap();
88//!     let result = network_handle.await;
89//!     assert!(result.is_ok());
90//!     assert_eq!(result.unwrap().unwrap(), NetworkStatus::OutgoingDisconnect);
91//! }
92//! ```
93
/// Capacity of the bounded `async_channel` whose sender is handed to the
/// [`MqttClient`] and whose receiver is handed to the network.
const CHANNEL_SIZE: usize = 100;
95
96mod available_packet_ids;
97mod client;
98mod connect_options;
99mod state_handler;
100mod util;
101
102/// Contains the reader writer parts for the smol runtime.
103///
104/// Module [`crate::smol`] only contains a synchronized approach to call the user's `Handler`.
105#[cfg(feature = "smol")]
106pub mod smol;
107/// Contains the reader and writer parts for the tokio runtime.
108///
109/// Module [`crate::tokio`] contains both a synchronized and concurrent approach to call the user's `Handler`.
110#[cfg(feature = "tokio")]
111pub mod tokio;
112
113/// Error types that the user can see during operation of the client.
114///
115/// Wraps all other errors that can be encountered.
116pub mod error;
117
118/// All event handler traits are defined here.
119///
120/// Event handlers are used to process incoming packets.
121mod event_handlers;
122/// All MQTT packets are defined here
123pub mod packets;
124mod state;
125
126pub use event_handlers::*;
127
128pub use client::MqttClient;
129pub use connect_options::ConnectOptions;
130use state_handler::StateHandler;
131
132use std::marker::PhantomData;
133#[cfg(test)]
134pub mod tests;
135
/// [`NetworkStatus`] represents the status of the Network object.
/// It is returned when the run handle returns from performing an operation.
///
/// The type is a plain fieldless enum, so it is `Copy` and can be compared,
/// ordered and hashed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum NetworkStatus {
    /// The other side indicated a shutdown. Only used in a concurrent context.
    ShutdownSignal,
    /// Indicates that there was an incoming disconnect and the socket has been closed.
    IncomingDisconnect,
    /// Indicates that an outgoing disconnect has been transmitted and the socket is closed.
    OutgoingDisconnect,
    /// The server did not respond to the ping request and the socket has been closed.
    KeepAliveTimeout,
}
149
150#[derive(Debug)]
151pub struct NetworkBuilder<H, S> {
152    handler: PhantomData<H>,
153    stream: PhantomData<S>,
154    options: ConnectOptions,
155}
156
157impl<H, S> NetworkBuilder<H, S> {
158    #[inline]
159    pub const fn new_from_options(options: ConnectOptions) -> Self {
160        Self {
161            handler: PhantomData,
162            stream: PhantomData,
163            options,
164        }
165    }
166    #[inline]
167    pub fn new_from_client_id<C: AsRef<str>>(client_id: C) -> Self {
168        let options = ConnectOptions::new(client_id);
169        Self {
170            handler: PhantomData,
171            stream: PhantomData,
172            options,
173        }
174    }
175}
176
#[cfg(feature = "tokio")]
impl<H, S> NetworkBuilder<H, S>
where
    H: AsyncEventHandler,
    S: ::tokio::io::AsyncRead + ::tokio::io::AsyncWrite + Sized + Unpin,
{
    /// Builds the tokio [`tokio::Network`] and the [`MqttClient`] that talks to it.
    ///
    /// The stream type `S` has to implement [`::tokio::io::AsyncRead`] and [`::tokio::io::AsyncWrite`].
    /// Both halves share one bounded channel and one pool of available packet ids.
    ///
    /// # Example
    /// ```
    /// use mqrstt::ConnectOptions;
    ///
    /// let options = ConnectOptions::new("ExampleClient");
    /// let (mut network, client) = mqrstt::NetworkBuilder::<(), tokio::net::TcpStream>
    ///     ::new_from_options(options)
    ///     .tokio_network();
    /// ```
    pub fn tokio_network(self) -> (tokio::Network<H, S>, MqttClient) {
        let options = self.options;

        // Read what the client needs before `options` is moved into the network.
        let packet_size_limit = options.maximum_packet_size();
        let (packet_ids, packet_ids_receiver) = available_packet_ids::AvailablePacketIds::new(options.send_maximum());
        let (network_sender, network_receiver) = async_channel::bounded(CHANNEL_SIZE);

        let client = MqttClient::new(packet_ids_receiver, network_sender, packet_size_limit);
        let network = tokio::Network::new(options, network_receiver, packet_ids);

        (network, client)
    }
}
211
#[cfg(feature = "smol")]
impl<H, S> NetworkBuilder<H, S>
where
    H: AsyncEventHandler,
    S: ::smol::io::AsyncRead + ::smol::io::AsyncWrite + Sized + Unpin,
{
    /// Creates the needed components to run the MQTT client using a stream that implements [`::smol::io::AsyncRead`] and [`::smol::io::AsyncWrite`]
    ///
    /// Returns the [`smol::Network`] together with the [`MqttClient`] that talks to it.
    /// Both halves share one bounded channel and one pool of available packet ids.
    ///
    /// # Example
    /// ```
    /// let (mut network, client) = mqrstt::NetworkBuilder::<(), smol::net::TcpStream>
    ///     ::new_from_client_id("ExampleClient")
    ///     .smol_network();
    /// ```
    pub fn smol_network(self) -> (smol::Network<H, S>, MqttClient) {
        let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);

        let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());

        // Has to be read before `self.options` is moved into the network below.
        let max_packet_size = self.options.maximum_packet_size();

        let client = MqttClient::new(apkids_r, to_network_s, max_packet_size);

        let network = smol::Network::<H, S>::new(self.options, to_network_r, apkids);

        (network, client)
    }
}
238
/// Returns seven random alphanumeric characters, used by the tests to build unique client ids.
#[cfg(test)]
fn random_chars() -> String {
    use rand::{distributions::Alphanumeric, Rng};

    const LENGTH: usize = 7;

    rand::thread_rng().sample_iter(&Alphanumeric).take(LENGTH).map(char::from).collect()
}
243
#[cfg(feature = "smol")]
#[cfg(test)]
mod smol_lib_test {
    use std::time::Duration;

    use crate::{
        example_handlers::{PingPong, PingResp},
        packets::QoS,
        random_chars, ConnectOptions, NetworkBuilder, NetworkStatus,
    };

    /// Public broker the networked tests in this module connect to.
    const BROKER: (&str, u16) = ("broker.emqx.io", 1883);

    /// Connects to the public broker, subscribes, publishes at every QoS level
    /// and expects the network to finish without an error after the disconnect.
    #[test]
    fn test_smol_tcp() {
        smol::block_on(async {
            let client_id = random_chars() + "_SmolTcpPingPong";
            let options = ConnectOptions::new(client_id);

            let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();

            let stream = smol::net::TcpStream::connect(BROKER).await.unwrap();
            let mut pingpong = PingPong::new(client.clone());

            network.connect(stream, &mut pingpong).await.unwrap();

            client.subscribe("mqrstt").await.unwrap();

            let publisher = async {
                let publishes = [
                    (QoS::ExactlyOnce, false, b"ping".repeat(500)),
                    (QoS::AtMostOnce, true, b"ping".to_vec()),
                    (QoS::AtLeastOnce, false, b"ping".to_vec()),
                    (QoS::ExactlyOnce, false, b"ping".repeat(500)),
                ];
                for (qos, retain, payload) in publishes {
                    client.publish("mqrstt".to_string(), qos, retain, payload).await.unwrap();
                }

                smol::Timer::after(Duration::from_secs(20)).await;
                client.unsubscribe("mqrstt").await.unwrap();
                smol::Timer::after(Duration::from_secs(5)).await;
                client.disconnect().await.unwrap();
            };

            let (n, _) = futures::join!(network.run(&mut pingpong), publisher);
            assert!(n.is_ok());
        });
    }

    /// Stays connected for two and a half keep alive intervals and expects
    /// the handler to have counted exactly two ping responses.
    #[test]
    fn test_smol_ping_req() {
        smol::block_on(async {
            let client_id = random_chars() + "_SmolTcppingrespTest";
            let mut options = ConnectOptions::new(client_id);
            options.set_keep_alive_interval(Duration::from_secs(5));

            let keep_alive = options.get_keep_alive_interval();
            let sleep_duration = keep_alive * 2 + keep_alive / 2;

            let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
            let stream = smol::net::TcpStream::connect(BROKER).await.unwrap();

            let mut pingresp = PingResp::new(client.clone());

            network.connect(stream, &mut pingresp).await.unwrap();

            let (n, _) = futures::join!(
                async {
                    // Only our own disconnect is an acceptable way for the network to stop.
                    match network.run(&mut pingresp).await {
                        Ok(NetworkStatus::OutgoingDisconnect) => Ok(pingresp),
                        Ok(NetworkStatus::ShutdownSignal) => unreachable!(),
                        Ok(NetworkStatus::KeepAliveTimeout | NetworkStatus::IncomingDisconnect) => panic!(),
                        Err(err) => Err(err),
                    }
                },
                async {
                    smol::Timer::after(sleep_duration).await;
                    client.disconnect().await.unwrap();
                }
            );
            assert!(n.is_ok());
            let pingresp = n.unwrap();
            assert_eq!(2, pingresp.ping_resp_received);
        });
    }

    /// Connects to a local listener that never answers and shuts down its
    /// write half after ten seconds; the connect attempt must fail with an I/O error.
    #[cfg(target_family = "windows")]
    #[test]
    fn test_close_write_tcp_stream_smol() {
        use crate::error::ConnectionError;
        use std::io::ErrorKind;

        smol::block_on(async {
            let client_id = random_chars() + "_SmolTcppingrespTest";
            let options = ConnectOptions::new(client_id);

            let local = ("127.0.0.1", 2001);

            let listener = smol::net::TcpListener::bind(local).await.unwrap();

            let (n, _) = futures::join!(
                async {
                    let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
                    let stream = smol::net::TcpStream::connect(local).await.unwrap();
                    let mut pingresp = PingResp::new(client.clone());
                    network.connect(stream, &mut pingresp).await
                },
                async move {
                    let (stream, _) = listener.accept().await.unwrap();
                    smol::Timer::after(Duration::from_secs(10)).await;
                    stream.shutdown(std::net::Shutdown::Write).unwrap();
                }
            );

            match n.unwrap_err() {
                ConnectionError::Io(err) => {
                    assert_eq!(ErrorKind::ConnectionReset, err.kind());
                    assert_eq!("Connection reset by peer".to_string(), err.to_string());
                }
                _ => panic!(),
            }
        });
    }
}
367
#[cfg(feature = "tokio")]
#[cfg(test)]
mod tokio_lib_test {
    use crate::example_handlers::PingResp;
    use crate::random_chars;
    use crate::ConnectOptions;
    use crate::NetworkBuilder;

    use std::time::Duration;

    /// Stays connected for two and a half keep alive intervals and expects
    /// the handler to have counted exactly two ping responses.
    #[tokio::test]
    async fn test_tokio_ping_req() {
        let client_id = random_chars() + "_TokioTcppingrespTest";
        let mut options = ConnectOptions::new(client_id);
        options.set_keep_alive_interval(Duration::from_secs(5));

        let keep_alive = options.get_keep_alive_interval();
        let wait_duration = keep_alive * 2 + keep_alive / 2;

        let (mut network, client) = NetworkBuilder::new_from_options(options).tokio_network();

        let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();

        let mut pingresp = PingResp::new(client.clone());

        network.connect(stream, &mut pingresp).await.unwrap();

        // The handler moves into the task and is handed back once the network stops.
        let network_handle = tokio::task::spawn(async move {
            let _result = network.run(&mut pingresp).await;
            // check result and or restart the connection
            pingresp
        });

        tokio::time::sleep(wait_duration).await;
        client.disconnect().await.unwrap();

        tokio::time::sleep(Duration::from_secs(1)).await;

        let joined = network_handle.await;
        assert!(joined.is_ok());
        let pingresp = joined.unwrap();
        assert_eq!(2, pingresp.ping_resp_received);
    }

    /// Connects to a local listener that never answers and shuts down its
    /// write half after ten seconds; the connect attempt must fail with `UnexpectedEof`.
    #[cfg(target_family = "windows")]
    #[tokio::test]
    async fn test_close_write_tcp_stream_tokio() {
        use crate::error::ConnectionError;
        use std::io::ErrorKind;

        let address = ("127.0.0.1", 2000);

        let client_id: String = random_chars() + "_TokioTcppingrespTest";
        let options = ConnectOptions::new(client_id);

        // Bind before the client branch starts connecting so the connect cannot
        // race the listener (same order as the smol variant of this test).
        // NOTE(review): this tokio test uses the `smol` crate for its listener;
        // confirm `smol` is available when only the `tokio` feature is enabled.
        let listener = smol::net::TcpListener::bind(address).await.unwrap();

        let (n, _) = tokio::join!(
            async move {
                let (mut network, client) = NetworkBuilder::new_from_options(options).tokio_network();

                let stream = tokio::net::TcpStream::connect(address).await.unwrap();

                let mut pingresp = PingResp::new(client.clone());

                network.connect(stream, &mut pingresp).await
            },
            async move {
                let (stream, _) = listener.accept().await.unwrap();
                tokio::time::sleep(Duration::from_secs(10)).await;
                stream.shutdown(std::net::Shutdown::Write).unwrap();
            }
        );

        match n.unwrap_err() {
            ConnectionError::Io(err) => assert_eq!(ErrorKind::UnexpectedEof, err.kind()),
            _ => panic!(),
        }
    }
}