1const CHANNEL_SIZE: usize = 100;
95
96mod available_packet_ids;
97mod client;
98mod connect_options;
99mod state_handler;
100mod util;
101
102#[cfg(feature = "smol")]
106pub mod smol;
107#[cfg(feature = "tokio")]
111pub mod tokio;
112
113pub mod error;
117
118mod event_handlers;
122pub mod packets;
124mod state;
125
126pub use event_handlers::*;
127
128pub use client::MqttClient;
129pub use connect_options::ConnectOptions;
130use state_handler::StateHandler;
131
132use std::marker::PhantomData;
133#[cfg(test)]
134pub mod tests;
135
136#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
139pub enum NetworkStatus {
140 ShutdownSignal,
142 IncomingDisconnect,
144 OutgoingDisconnect,
146 KeepAliveTimeout,
148}
149
150#[derive(Debug)]
151pub struct NetworkBuilder<H, S> {
152 handler: PhantomData<H>,
153 stream: PhantomData<S>,
154 options: ConnectOptions,
155}
156
157impl<H, S> NetworkBuilder<H, S> {
158 #[inline]
159 pub const fn new_from_options(options: ConnectOptions) -> Self {
160 Self {
161 handler: PhantomData,
162 stream: PhantomData,
163 options,
164 }
165 }
166 #[inline]
167 pub fn new_from_client_id<C: AsRef<str>>(client_id: C) -> Self {
168 let options = ConnectOptions::new(client_id);
169 Self {
170 handler: PhantomData,
171 stream: PhantomData,
172 options,
173 }
174 }
175}
176
177#[cfg(feature = "tokio")]
178impl<H, S> NetworkBuilder<H, S>
179where
180 H: AsyncEventHandler,
181 S: ::tokio::io::AsyncRead + ::tokio::io::AsyncWrite + Sized + Unpin,
182{
183 pub fn tokio_network(self) -> (tokio::Network<H, S>, MqttClient)
195 where
196 H: AsyncEventHandler,
197 {
198 let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);
199
200 let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());
201
202 let max_packet_size = self.options.maximum_packet_size();
203
204 let client = MqttClient::new(apkids_r, to_network_s, max_packet_size);
205
206 let network = tokio::Network::new(self.options, to_network_r, apkids);
207
208 (network, client)
209 }
210}
211
212#[cfg(feature = "smol")]
213impl<H, S> NetworkBuilder<H, S>
214where
215 H: AsyncEventHandler,
216 S: ::smol::io::AsyncRead + ::smol::io::AsyncWrite + Sized + Unpin,
217{
218 pub fn smol_network(self) -> (smol::Network<H, S>, MqttClient) {
225 let (to_network_s, to_network_r) = async_channel::bounded(CHANNEL_SIZE);
226
227 let (apkids, apkids_r) = available_packet_ids::AvailablePacketIds::new(self.options.send_maximum());
228
229 let max_packet_size = self.options.maximum_packet_size();
230
231 let client = MqttClient::new(apkids_r, to_network_s, max_packet_size);
232
233 let network = smol::Network::<H, S>::new(self.options, to_network_r, apkids);
234
235 (network, client)
236 }
237}
238
239#[cfg(test)]
240fn random_chars() -> String {
241 rand::Rng::sample_iter(rand::thread_rng(), &rand::distributions::Alphanumeric).take(7).map(char::from).collect()
242}
243
244#[cfg(feature = "smol")]
245#[cfg(test)]
246mod smol_lib_test {
247
248 use std::time::Duration;
249
250 use rand::Rng;
251
252 use crate::{example_handlers::PingPong, packets::QoS, random_chars, ConnectOptions, NetworkBuilder};
253
254 #[test]
255 fn test_smol_tcp() {
256 smol::block_on(async {
257 let mut client_id: String = random_chars();
258 client_id += "_SmolTcpPingPong";
259 let options = ConnectOptions::new(client_id);
260
261 let address = "broker.emqx.io";
262 let port = 1883;
263
264 let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
265
266 let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
267 let mut pingpong = PingPong::new(client.clone());
268
269 network.connect(stream, &mut pingpong).await.unwrap();
270
271 client.subscribe("mqrstt").await.unwrap();
272
273 let (n, _) = futures::join!(async { network.run(&mut pingpong).await }, async {
274 client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();
275 client.publish("mqrstt".to_string(), QoS::AtMostOnce, true, b"ping".to_vec()).await.unwrap();
276 client.publish("mqrstt".to_string(), QoS::AtLeastOnce, false, b"ping".to_vec()).await.unwrap();
277 client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();
278
279 smol::Timer::after(std::time::Duration::from_secs(20)).await;
280 client.unsubscribe("mqrstt").await.unwrap();
281 smol::Timer::after(std::time::Duration::from_secs(5)).await;
282 client.disconnect().await.unwrap();
283 });
284 assert!(n.is_ok());
285 });
286 }
287
288 #[test]
289 fn test_smol_ping_req() {
290 smol::block_on(async {
291 let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
292 client_id += "_SmolTcppingrespTest";
293 let mut options = ConnectOptions::new(client_id);
294 options.set_keep_alive_interval(Duration::from_secs(5));
295
296 let sleep_duration = options.get_keep_alive_interval() * 2 + options.get_keep_alive_interval() / 2;
297
298 let address = "broker.emqx.io";
299 let port = 1883;
300
301 let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
302 let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
303
304 let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
305
306 network.connect(stream, &mut pingresp).await.unwrap();
307
308 let (n, _) = futures::join!(
309 async {
310 match network.run(&mut pingresp).await {
311 Ok(crate::NetworkStatus::OutgoingDisconnect) => return Ok(pingresp),
312 Ok(crate::NetworkStatus::ShutdownSignal) => unreachable!(),
313 Ok(crate::NetworkStatus::KeepAliveTimeout) => panic!(),
314 Ok(crate::NetworkStatus::IncomingDisconnect) => panic!(),
315 Err(err) => return Err(err),
316 }
317 },
318 async {
319 smol::Timer::after(sleep_duration).await;
320 client.disconnect().await.unwrap();
321 }
322 );
323 assert!(n.is_ok());
324 let pingresp = n.unwrap();
325 assert_eq!(2, pingresp.ping_resp_received);
326 });
327 }
328
329 #[cfg(target_family = "windows")]
330 #[test]
331 fn test_close_write_tcp_stream_smol() {
332 use crate::error::ConnectionError;
333 use std::io::ErrorKind;
334
335 smol::block_on(async {
336 let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
337 client_id += "_SmolTcppingrespTest";
338 let options = ConnectOptions::new(client_id);
339
340 let address = "127.0.0.1";
341 let port = 2001;
342
343 let listener = smol::net::TcpListener::bind((address, port)).await.unwrap();
344
345 let (n, _) = futures::join!(
346 async {
347 let (mut network, client) = NetworkBuilder::new_from_options(options).smol_network();
348 let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
349 let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
350 network.connect(stream, &mut pingresp).await
351 },
352 async move {
353 let (stream, _) = listener.accept().await.unwrap();
354 smol::Timer::after(std::time::Duration::from_secs(10)).await;
355 stream.shutdown(std::net::Shutdown::Write).unwrap();
356 }
357 );
358 if let ConnectionError::Io(err) = n.unwrap_err() {
359 assert_eq!(ErrorKind::ConnectionReset, err.kind());
360 assert_eq!("Connection reset by peer".to_string(), err.to_string());
361 } else {
362 panic!();
363 }
364 });
365 }
366}
367
368#[cfg(feature = "tokio")]
369#[cfg(test)]
370mod tokio_lib_test {
371 use crate::example_handlers::PingResp;
372 use crate::random_chars;
373 use crate::ConnectOptions;
374
375 use std::time::Duration;
376
377 #[tokio::test]
378 async fn test_tokio_ping_req() {
379 let mut client_id: String = random_chars();
380 client_id += "_TokioTcppingrespTest";
381 let mut options = ConnectOptions::new(client_id);
382 let keep_alive_interval = 5;
383 options.set_keep_alive_interval(Duration::from_secs(keep_alive_interval));
384
385 let wait_duration = options.get_keep_alive_interval() * 2 + options.get_keep_alive_interval() / 2;
386
387 let (mut network, client) = crate::NetworkBuilder::new_from_options(options).tokio_network();
388
389 let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
390
391 let mut pingresp = PingResp::new(client.clone());
392
393 network.connect(stream, &mut pingresp).await.unwrap();
394
395 let network_handle = tokio::task::spawn(async move {
396 let _result = network.run(&mut pingresp).await;
397 pingresp
399 });
400
401 tokio::time::sleep(wait_duration).await;
402 client.disconnect().await.unwrap();
403
404 tokio::time::sleep(Duration::from_secs(1)).await;
405
406 let result = network_handle.await;
407 assert!(result.is_ok());
408 let result = result.unwrap();
409 assert_eq!(2, result.ping_resp_received);
410 }
411
412 #[cfg(all(feature = "tokio", target_family = "windows"))]
413 #[tokio::test]
414 async fn test_close_write_tcp_stream_tokio() {
415 use crate::{error::ConnectionError, NetworkBuilder};
416 use core::panic;
417 use std::io::ErrorKind;
418
419 let address = ("127.0.0.1", 2000);
420
421 let client_id: String = crate::random_chars() + "_TokioTcppingrespTest";
422 let options = crate::ConnectOptions::new(client_id);
423
424 let (n, _) = tokio::join!(
425 async move {
426 let (mut network, client) = NetworkBuilder::new_from_options(options).tokio_network();
427
428 let stream = tokio::net::TcpStream::connect(address).await.unwrap();
429
430 let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
431
432 network.connect(stream, &mut pingresp).await
433 },
434 async move {
435 let listener = smol::net::TcpListener::bind(address).await.unwrap();
436 let (stream, _) = listener.accept().await.unwrap();
437 tokio::time::sleep(Duration::new(10, 0)).await;
438 stream.shutdown(std::net::Shutdown::Write).unwrap();
439 }
440 );
441
442 if let ConnectionError::Io(err) = n.unwrap_err() {
443 assert_eq!(ErrorKind::UnexpectedEof, err.kind());
444 } else {
445 panic!();
446 }
447 }
448}