actix_mqtt_client/
lib.rs

//! # An MQTT client based on the actix framework
//!
//! The `actix-mqtt-client` crate is an MQTT client based on the [actix](https://github.com/actix/actix) framework.
4//!
5//! ## Basic usage and example
6//!
//! First, create two actix actors: one to receive publish messages and one to receive error messages from the client. You can also create an optional third actor to receive the stop message:
8//! ```rust
9//! pub struct ErrorActor;
10//!
11//! impl actix::Actor for ErrorActor {
12//!     type Context = actix::Context<Self>;
13//! }
14//!
15//! impl actix::Handler<ErrorMessage> for ErrorActor {
16//!     type Result = ();
17//!     fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
18//!         log::error!("{}", error.0);
19//!     }
20//! }
21//!
22//! pub struct MessageActor;
23//!
24//! impl actix::Actor for MessageActor {
25//!     type Context = actix::Context<Self>;
26//! }
27//!
28//! impl actix::Handler<PublishMessage> for MessageActor {
29//!     type Result = ();
30//!     fn handle(
31//!         &mut self,
32//!         msg: PublishMessage,
33//!         _: &mut Self::Context,
34//!     ) -> Self::Result {
35//!         log::info!(
36//!             "Got message: id:{}, topic: {}, payload: {:?}",
37//!             msg.id,
38//!             msg.topic_name,
39//!             msg.payload
40//!         );
41//!     }
42//! }
43//! ```
44//!
//! Then connect to the server (using tokio) and use the read and write halves of the stream, along with the actors, to create a [MqttClient](struct.MqttClient.html):
46//! ```rust
47//! use std::io::Error as IoError;
48//! use std::net::SocketAddr;
49//! use std::str::FromStr;
50//! use std::time::Duration;
51//! use actix::{Actor, Arbiter, System};
52//! use env_logger;
53//! use tokio::io::split;
54//! use tokio::net::TcpStream;
55//! use tokio::time::{sleep_until, Instant};
//! use actix_mqtt_client::{MqttClient, MqttOptions};
57//!
58//! let sys = System::new();
59//! let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
60//! sys.block_on(async move {
61//!    let result = async move {
62//!        let stream = TcpStream::connect(socket_addr).await?;
63//!        let (r, w) = split(stream);
64//!        log::info!("TCP connected");
65//!        let mut client = MqttClient::new(
66//!            r,
67//!            w,
68//!            String::from("test"),
69//!            MqttOptions::default(),
70//!            MessageActor.start().recipient(),
71//!            ErrorActor.start().recipient(),
72//!            None,
73//!        );
74//!        client.connect().await?;
75//!        // Waiting for the client to be connected
76//!        while !client.is_connected().await? {
77//!            let delay_time = Instant::now() + Duration::new(1, 0);
78//!            sleep_until(delay_time).await;
79//!        }
80//!        log::info!("MQTT connected");
81//!        log::info!("Subscribe");
82//!        client
83//!            .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
84//!            .await?;
85//!        log::info!("Publish");
86//!        client
87//!            .publish(
88//!                String::from("test"),
89//!                mqtt::QualityOfService::Level0,
90//!                Vec::from("test".as_bytes()),
91//!            )
92//!            .await?;
93//!        log::info!("Wait for 10s");
94//!        let delay_time = Instant::now() + Duration::new(10, 0);
95//!        sleep_until(delay_time).await;
96//!        client
97//!            .publish(
98//!                String::from("test"),
99//!                mqtt::QualityOfService::Level1,
100//!                Vec::from("test2".as_bytes()),
101//!            )
102//!            .await?;
103//!        log::info!("Wait for 10s");
104//!        let delay_time = Instant::now() + Duration::new(10, 0);
105//!        sleep_until(delay_time).await;
106//!        client
107//!            .publish(
108//!                String::from("test"),
109//!                mqtt::QualityOfService::Level2,
110//!                Vec::from("test3".as_bytes()),
111//!            )
112//!            .await?;
113//!        log::info!("Wait for 10s");
114//!        let delay_time = Instant::now() + Duration::new(10, 0);
115//!        sleep_until(delay_time).await;
116//!        log::info!("Disconnect");
117//!        client.disconnect(false).await?;
118//!        log::info!("Check if disconnect is successful");
119//!        Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
120//!    }
121//!    .await;
//!    result.unwrap();
//!    // Stop the system so that `sys.run()` below can return.
//!    System::current().stop();
//! });
//! sys.run().unwrap();
125//! ```
126
127mod actors;
128mod client;
129mod consts;
130
131pub use actix;
132pub use futures;
133pub use mqtt::QualityOfService;
134pub use tokio;
135
136pub use crate::actors::packets::PublishMessage;
137pub use crate::actors::{ErrorMessage, StopMessage};
138pub use crate::client::{MqttClient, MqttOptions};
139
// Smoke test for a single client session. The test opens a TCP connection to
// 127.0.0.1:1883, so something speaking MQTT has to be listening there.
#[cfg(test)]
mod tests {
    /// Actor that writes every `ErrorMessage` it receives to the error log.
    pub struct ErrorActor;

    impl actix::Actor for ErrorActor {
        type Context = actix::Context<Self>;
    }

    impl actix::Handler<super::ErrorMessage> for ErrorActor {
        type Result = ();

        fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
            log::error!("{}", error.0);
        }
    }

    /// Actor that logs the id, topic and payload of every publish message it receives.
    pub struct MessageActor;

    impl actix::Actor for MessageActor {
        type Context = actix::Context<Self>;
    }

    impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
        type Result = ();

        fn handle(
            &mut self,
            msg: crate::actors::packets::PublishMessage,
            _: &mut Self::Context,
        ) -> Self::Result {
            log::info!(
                "Got message: id:{}, topic: {}, payload: {:?}",
                msg.id,
                msg.topic_name,
                msg.payload
            );
        }
    }

    /// Connects, subscribes to `test`, publishes once at each QoS level with a
    /// one second pause in between, disconnects, and finally asserts that the
    /// client reports itself as disconnected.
    #[test]
    fn test_client() {
        use std::io::Error as IoError;
        use std::net::SocketAddr;
        use std::str::FromStr;
        use std::time::Duration;

        use actix::{Actor, System};
        use tokio::io::split;
        use tokio::net::TcpStream;
        use tokio::time::sleep;

        use crate::client::{MqttClient, MqttOptions};

        // `env_logger::init()` panics when a global logger is already installed,
        // and several tests in this binary install one. Ignore that error instead.
        let _ = env_logger::try_init();

        let sys = System::new();
        let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
        sys.block_on(async move {
            // The inner block exists so that `?` can be used on the I/O calls.
            let session = async move {
                let stream = TcpStream::connect(socket_addr).await?;
                let (r, w) = split(stream);
                log::info!("TCP connected");
                let mut client = MqttClient::new(
                    r,
                    w,
                    String::from("test"),
                    MqttOptions::default(),
                    MessageActor.start().recipient(),
                    ErrorActor.start().recipient(),
                    None,
                );
                client.connect().await?;
                while !client.is_connected().await.unwrap() {
                    log::info!("Waiting for client to be connected");
                    // Milliseconds on purpose: `Duration::new(0, 100)` is 100 ns,
                    // which turns this loop into a log-spamming busy wait.
                    sleep(Duration::from_millis(100)).await;
                }

                log::info!("MQTT connected");
                log::info!("Subscribe");
                client
                    .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
                    .await?;
                log::info!("Publish");
                client
                    .publish(
                        String::from("test"),
                        mqtt::QualityOfService::Level0,
                        b"test".to_vec(),
                    )
                    .await?;
                log::info!("Wait for 1s");
                sleep(Duration::from_secs(1)).await;
                client
                    .publish(
                        String::from("test"),
                        mqtt::QualityOfService::Level1,
                        b"test2".to_vec(),
                    )
                    .await?;
                log::info!("Wait for 1s");
                sleep(Duration::from_secs(1)).await;
                client
                    .publish(
                        String::from("test"),
                        mqtt::QualityOfService::Level2,
                        b"test3".to_vec(),
                    )
                    .await?;
                log::info!("Wait for 1s");
                sleep(Duration::from_secs(1)).await;
                log::info!("Disconnect");
                client.disconnect(false).await?;
                log::info!("Check if disconnect is successful");
                // Poll up to five times, 200 ms apart, before asserting, so the
                // client has real time to update its state.
                for _ in 0..5 {
                    if client.is_disconnected() {
                        break;
                    }

                    sleep(Duration::from_millis(200)).await;
                }

                assert!(client.is_disconnected());
                Ok::<(), IoError>(())
            };
            session.await.unwrap();
            // Without this, `sys.run()` below would never return.
            System::current().stop();
        });
        sys.run().unwrap();
    }
}
275
276#[cfg(test)]
277mod random_test {
278    use tokio::sync::mpsc::{channel, Sender};
279
280    pub struct ErrorActor;
281
282    impl actix::Actor for ErrorActor {
283        type Context = actix::Context<Self>;
284    }
285
286    impl actix::Handler<super::ErrorMessage> for ErrorActor {
287        type Result = ();
288        fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
289            log::error!("{}", error.0);
290        }
291    }
292
293    pub struct MessageActor(Sender<(bool, Vec<u8>)>);
294
295    impl actix::Actor for MessageActor {
296        type Context = actix::Context<Self>;
297    }
298
299    impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
300        type Result = ();
301        fn handle(
302            &mut self,
303            msg: crate::actors::packets::PublishMessage,
304            _: &mut Self::Context,
305        ) -> Self::Result {
306            log::info!(
307                "Got message: id:{}, topic: {}, payload: {:?}",
308                msg.id,
309                msg.topic_name,
310                msg.payload
311            );
312
313            self.0.try_send((false, msg.payload)).unwrap();
314        }
315    }
316
317    lazy_static::lazy_static! {
318        static ref PACKETS: std::sync::Mutex<std::collections::HashSet<Vec<u8>>> = std::sync::Mutex::new(std::collections::HashSet::new());
319    }
320
321    #[test]
322    fn test_random_publish_level0_cloned_client() {
323        use std::io::Error as IoError;
324        use std::net::SocketAddr;
325        use std::str::FromStr;
326        use std::time::Duration;
327
328        use actix::{Actor, Arbiter, System};
329        use env_logger;
330        use futures::stream::StreamExt;
331        use tokio::io::split;
332        use tokio::net::TcpStream;
333        use tokio::time::{sleep_until, Instant};
334        use tokio_stream::wrappers::ReceiverStream;
335
336        use crate::client::{MqttClient, MqttOptions};
337
338        env_logger::init();
339
340        let (sender, recv) = channel(100);
341        let sys = System::new();
342        let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
343
344        sys.block_on(async move {
345            let future = async move {
346                let result = async move {
347                    let stream = TcpStream::connect(socket_addr).await?;
348                    let (r, w) = split(stream);
349                    let mut client = MqttClient::new(
350                        r,
351                        w,
352                        String::from("test"),
353                        MqttOptions::default(),
354                        MessageActor(sender.clone()).start().recipient(),
355                        ErrorActor.start().recipient(),
356                        None,
357                    );
358                    client.connect().await?;
359                    while !client.is_connected().await.unwrap() {
360                        log::info!("Waiting for client to be connected");
361                        let delay_time = Instant::now() + Duration::new(0, 100);
362                        sleep_until(delay_time).await;
363                    }
364
365                    log::info!("Connected");
366                    log::info!("Subscribe");
367                    client
368                        .subscribe(String::from("test"), mqtt::QualityOfService::Level0)
369                        .await?;
370                    async fn random_send(
371                        client_id: i32,
372                        client: MqttClient,
373                        sender: Sender<(bool, Vec<u8>)>,
374                    ) {
375                        let mut count: i32 = 0;
376                        loop {
377                            count += 1;
378                            use rand::RngCore;
379                            let mut data = [0u8; 32];
380                            rand::thread_rng().fill_bytes(&mut data);
381                            let payload = Vec::from(&data[..]);
382                            log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
383                            sleep_until(Instant::now() + Duration::from_millis(100)).await;
384                            sender.try_send((true, payload.clone())).unwrap();
385                            client
386                                .publish(
387                                    String::from("test"),
388                                    mqtt::QualityOfService::Level0,
389                                    payload,
390                                )
391                                .await
392                                .unwrap();
393                        }
394                    }
395
396                    for i in 0..5 {
397                        let client_clone = client.clone();
398                        let sender_clone = sender.clone();
399                        let future = random_send(i, client_clone, sender_clone);
400                        Arbiter::current().spawn(future);
401                    }
402
403                    Ok(()) as Result<(), IoError>
404                }
405                .await;
406                result.unwrap();
407            };
408
409            Arbiter::current().spawn(future);
410            let recv_future = async {
411                let result = async {
412                    ReceiverStream::new(recv)
413                        .fold((), |_, (is_send, payload)| async move {
414                            let mut p = PACKETS.lock().unwrap();
415                            if is_send {
416                                p.insert(payload);
417                            } else if p.contains(&payload) {
418                                p.remove(&payload);
419                            }
420
421                            log::info!("Pending recv items: {}", p.len());
422
423                            ()
424                        })
425                        .await;
426                    Ok(()) as Result<(), IoError>
427                }
428                .await;
429                result.unwrap()
430            };
431            Arbiter::current().spawn(recv_future);
432        });
433        sys.run().unwrap();
434    }
435
436    #[test]
437    fn test_random_publish_level0_created_client() {
438        use std::io::Error as IoError;
439        use std::net::SocketAddr;
440        use std::str::FromStr;
441        use std::time::Duration;
442
443        use actix::{Actor, Arbiter, System};
444        use env_logger;
445        use futures::stream::StreamExt;
446        use tokio::io::split;
447        use tokio::net::TcpStream;
448        use tokio::time::{sleep_until, Instant};
449        use tokio_stream::wrappers::ReceiverStream;
450
451        use crate::client::{MqttClient, MqttOptions};
452
453        env_logger::init();
454
455        async fn test_send(client_id: i32, sender: Sender<(bool, Vec<u8>)>) {
456            let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
457            async move {
458                let stream = TcpStream::connect(socket_addr).await?;
459                let (r, w) = split(stream);
460                let mut client = MqttClient::new(
461                    r,
462                    w,
463                    format!("test_{}", client_id),
464                    MqttOptions::default(),
465                    MessageActor(sender.clone()).start().recipient(),
466                    ErrorActor.start().recipient(),
467                    None,
468                );
469                client.connect().await?;
470                while !client.is_connected().await.unwrap() {
471                    log::info!("Waiting for client to be connected");
472                    let delay_time = Instant::now() + Duration::new(0, 100);
473                    sleep_until(delay_time).await;
474                }
475                log::info!("Connected");
476                log::info!("Subscribe");
477                client
478                    .subscribe(String::from("test"), mqtt::QualityOfService::Level0)
479                    .await?;
480                async fn random_send(
481                    client_id: i32,
482                    client: MqttClient,
483                    sender: Sender<(bool, Vec<u8>)>,
484                ) {
485                    let mut count: i32 = 0;
486                    loop {
487                        count += 1;
488                        use rand::RngCore;
489                        let mut data = [0u8; 32];
490                        rand::thread_rng().fill_bytes(&mut data);
491                        let payload = Vec::from(&data[..]);
492                        log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
493                        sleep_until(Instant::now() + Duration::from_millis(100)).await;
494                        sender.try_send((true, payload.clone())).unwrap();
495                        client
496                            .publish(
497                                String::from("test"),
498                                mqtt::QualityOfService::Level0,
499                                payload,
500                            )
501                            .await
502                            .unwrap();
503                    }
504                }
505
506                let future = random_send(client_id, client, sender);
507                Arbiter::current().spawn(future);
508
509                Ok(()) as Result<(), IoError>
510            }
511            .await
512            .unwrap();
513        }
514
515        let sys = System::new();
516        sys.block_on(async move {
517            let (sender, recv) = channel(100);
518            for i in 0..5 {
519                let future = test_send(i, sender.clone());
520                Arbiter::current().spawn(future);
521            }
522
523            let recv_future = async {
524                let result = async {
525                    ReceiverStream::new(recv)
526                        .fold((), |_, (is_send, payload)| async move {
527                            let mut p = PACKETS.lock().unwrap();
528                            if is_send {
529                                p.insert(payload);
530                            } else if p.contains(&payload) {
531                                p.remove(&payload);
532                            }
533
534                            log::info!("Pending recv items: {}", p.len());
535
536                            ()
537                        })
538                        .await;
539                    Ok(()) as Result<(), IoError>
540                }
541                .await;
542                result.unwrap()
543            };
544            Arbiter::current().spawn(recv_future);
545        });
546        sys.run().unwrap();
547    }
548
549    #[test]
550    fn test_random_publish_level2() {
551        use std::io::Error as IoError;
552        use std::net::SocketAddr;
553        use std::str::FromStr;
554        use std::time::Duration;
555
556        use actix::{Actor, Arbiter, System};
557        use env_logger;
558        use futures::stream::StreamExt;
559        use tokio::io::split;
560        use tokio::net::TcpStream;
561        use tokio::time::{sleep_until, Instant};
562        use tokio_stream::wrappers::ReceiverStream;
563
564        use crate::client::{MqttClient, MqttOptions};
565
566        env_logger::init();
567
568        let (sender, recv) = channel(100);
569        let sender_clone = sender.clone();
570
571        let sys = System::new();
572        sys.block_on(async move {
573            let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
574            let future = async move {
575                let result = async move {
576                    let stream = TcpStream::connect(socket_addr).await?;
577                    let (r, w) = split(stream);
578                    let mut client = MqttClient::new(
579                        r,
580                        w,
581                        String::from("test"),
582                        MqttOptions::default(),
583                        MessageActor(sender).start().recipient(),
584                        ErrorActor.start().recipient(),
585                        None,
586                    );
587                    client.connect().await?;
588                    while !client.is_connected().await.unwrap() {
589                        log::info!("Waiting for client to be connected");
590                        let delay_time = Instant::now() + Duration::new(0, 100);
591                        sleep_until(delay_time).await;
592                    }
593                    log::info!("Connected");
594                    log::info!("Subscribe");
595                    client
596                        .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
597                        .await?;
598                    futures::stream::repeat(())
599                        .fold((client, sender_clone), |(client, sender), _| async {
600                            use rand::RngCore;
601                            let mut data = [0u8; 32];
602                            rand::thread_rng().fill_bytes(&mut data);
603                            let payload = Vec::from(&data[..]);
604                            log::info!("Publish {:?}", payload);
605                            sleep_until(Instant::now() + Duration::from_millis(10)).await;
606                            sender.try_send((true, payload.clone())).unwrap();
607                            client
608                                .publish(
609                                    String::from("test"),
610                                    mqtt::QualityOfService::Level2,
611                                    payload,
612                                )
613                                .await
614                                .unwrap();
615                            (client, sender)
616                        })
617                        .await;
618                    Ok(()) as Result<(), IoError>
619                }
620                .await;
621                result.unwrap()
622            };
623            Arbiter::current().spawn(future);
624            let recv_future = async {
625                let result = async {
626                    ReceiverStream::new(recv)
627                        .fold((), |_, (is_send, payload)| async move {
628                            let mut p = PACKETS.lock().unwrap();
629                            if is_send {
630                                p.insert(payload);
631                            } else if !p.contains(&payload) {
632                                panic!("Multiple receive for level 2: {:?}", payload);
633                            } else {
634                                p.remove(&payload);
635                            }
636
637                            log::info!("Pending recv items: {}", p.len());
638
639                            ()
640                        })
641                        .await;
642                    Ok(()) as Result<(), IoError>
643                }
644                .await;
645                result.unwrap()
646            };
647            Arbiter::current().spawn(recv_future);
648        });
649        sys.run().unwrap();
650    }
651}