1mod actors;
128mod client;
129mod consts;
130
131pub use actix;
132pub use futures;
133pub use mqtt::QualityOfService;
134pub use tokio;
135
136pub use crate::actors::packets::PublishMessage;
137pub use crate::actors::{ErrorMessage, StopMessage};
138pub use crate::client::{MqttClient, MqttOptions};
139
140#[cfg(test)]
141mod tests {
142 pub struct ErrorActor;
143
144 impl actix::Actor for ErrorActor {
145 type Context = actix::Context<Self>;
146 }
147
148 impl actix::Handler<super::ErrorMessage> for ErrorActor {
149 type Result = ();
150 fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
151 log::error!("{}", error.0);
152 }
153 }
154
155 pub struct MessageActor;
156
157 impl actix::Actor for MessageActor {
158 type Context = actix::Context<Self>;
159 }
160
161 impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
162 type Result = ();
163 fn handle(
164 &mut self,
165 msg: crate::actors::packets::PublishMessage,
166 _: &mut Self::Context,
167 ) -> Self::Result {
168 log::info!(
169 "Got message: id:{}, topic: {}, payload: {:?}",
170 msg.id,
171 msg.topic_name,
172 msg.payload
173 );
174 }
175 }
176
177 #[test]
178 fn test_client() {
179 use std::io::Error as IoError;
180 use std::net::SocketAddr;
181 use std::str::FromStr;
182 use std::time::Duration;
183
184 use actix::{Actor, System};
185 use env_logger;
186 use tokio::io::split;
187 use tokio::net::TcpStream;
188 use tokio::time::{sleep_until, Instant};
189
190 use crate::client::{MqttClient, MqttOptions};
191
192 env_logger::init();
193
194 let sys = System::new();
195 let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
196 sys.block_on(async move {
197 let result = async move {
198 let stream = TcpStream::connect(socket_addr).await?;
199 let (r, w) = split(stream);
200 log::info!("TCP connected");
201 let mut client = MqttClient::new(
202 r,
203 w,
204 String::from("test"),
205 MqttOptions::default(),
206 MessageActor.start().recipient(),
207 ErrorActor.start().recipient(),
208 None,
209 );
210 client.connect().await?;
211 while !client.is_connected().await.unwrap() {
212 log::info!("Waiting for client to be connected");
213 let delay_time = Instant::now() + Duration::new(0, 100);
214 sleep_until(delay_time).await;
215 }
216
217 log::info!("MQTT connected");
218 log::info!("Subscribe");
219 client
220 .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
221 .await?;
222 log::info!("Publish");
223 client
224 .publish(
225 String::from("test"),
226 mqtt::QualityOfService::Level0,
227 Vec::from("test".as_bytes()),
228 )
229 .await?;
230 log::info!("Wait for 1s");
231 let delay_time = Instant::now() + Duration::new(1, 0);
232 sleep_until(delay_time).await;
233 client
234 .publish(
235 String::from("test"),
236 mqtt::QualityOfService::Level1,
237 Vec::from("test2".as_bytes()),
238 )
239 .await?;
240 log::info!("Wait for 1s");
241 let delay_time = Instant::now() + Duration::new(1, 0);
242 sleep_until(delay_time).await;
243 client
244 .publish(
245 String::from("test"),
246 mqtt::QualityOfService::Level2,
247 Vec::from("test3".as_bytes()),
248 )
249 .await?;
250 log::info!("Wait for 1s");
251 let delay_time = Instant::now() + Duration::new(1, 0);
252 sleep_until(delay_time).await;
253 log::info!("Disconnect");
254 client.disconnect(false).await?;
255 log::info!("Check if disconnect is successful");
256 for _ in (0 as i32)..5 {
257 if client.is_disconnected() {
258 break;
259 }
260
261 let delay_time = Instant::now() + Duration::new(0, 200);
262 sleep_until(delay_time).await;
263 }
264
265 Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
266 }
267 .await;
268 let r = result.unwrap();
269 System::current().stop();
270 r
271 });
272 sys.run().unwrap();
273 }
274}
275
276#[cfg(test)]
277mod random_test {
278 use tokio::sync::mpsc::{channel, Sender};
279
280 pub struct ErrorActor;
281
282 impl actix::Actor for ErrorActor {
283 type Context = actix::Context<Self>;
284 }
285
286 impl actix::Handler<super::ErrorMessage> for ErrorActor {
287 type Result = ();
288 fn handle(&mut self, error: super::ErrorMessage, _: &mut Self::Context) -> Self::Result {
289 log::error!("{}", error.0);
290 }
291 }
292
293 pub struct MessageActor(Sender<(bool, Vec<u8>)>);
294
295 impl actix::Actor for MessageActor {
296 type Context = actix::Context<Self>;
297 }
298
299 impl actix::Handler<crate::actors::packets::PublishMessage> for MessageActor {
300 type Result = ();
301 fn handle(
302 &mut self,
303 msg: crate::actors::packets::PublishMessage,
304 _: &mut Self::Context,
305 ) -> Self::Result {
306 log::info!(
307 "Got message: id:{}, topic: {}, payload: {:?}",
308 msg.id,
309 msg.topic_name,
310 msg.payload
311 );
312
313 self.0.try_send((false, msg.payload)).unwrap();
314 }
315 }
316
317 lazy_static::lazy_static! {
318 static ref PACKETS: std::sync::Mutex<std::collections::HashSet<Vec<u8>>> = std::sync::Mutex::new(std::collections::HashSet::new());
319 }
320
321 #[test]
322 fn test_random_publish_level0_cloned_client() {
323 use std::io::Error as IoError;
324 use std::net::SocketAddr;
325 use std::str::FromStr;
326 use std::time::Duration;
327
328 use actix::{Actor, Arbiter, System};
329 use env_logger;
330 use futures::stream::StreamExt;
331 use tokio::io::split;
332 use tokio::net::TcpStream;
333 use tokio::time::{sleep_until, Instant};
334 use tokio_stream::wrappers::ReceiverStream;
335
336 use crate::client::{MqttClient, MqttOptions};
337
338 env_logger::init();
339
340 let (sender, recv) = channel(100);
341 let sys = System::new();
342 let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
343
344 sys.block_on(async move {
345 let future = async move {
346 let result = async move {
347 let stream = TcpStream::connect(socket_addr).await?;
348 let (r, w) = split(stream);
349 let mut client = MqttClient::new(
350 r,
351 w,
352 String::from("test"),
353 MqttOptions::default(),
354 MessageActor(sender.clone()).start().recipient(),
355 ErrorActor.start().recipient(),
356 None,
357 );
358 client.connect().await?;
359 while !client.is_connected().await.unwrap() {
360 log::info!("Waiting for client to be connected");
361 let delay_time = Instant::now() + Duration::new(0, 100);
362 sleep_until(delay_time).await;
363 }
364
365 log::info!("Connected");
366 log::info!("Subscribe");
367 client
368 .subscribe(String::from("test"), mqtt::QualityOfService::Level0)
369 .await?;
370 async fn random_send(
371 client_id: i32,
372 client: MqttClient,
373 sender: Sender<(bool, Vec<u8>)>,
374 ) {
375 let mut count: i32 = 0;
376 loop {
377 count += 1;
378 use rand::RngCore;
379 let mut data = [0u8; 32];
380 rand::thread_rng().fill_bytes(&mut data);
381 let payload = Vec::from(&data[..]);
382 log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
383 sleep_until(Instant::now() + Duration::from_millis(100)).await;
384 sender.try_send((true, payload.clone())).unwrap();
385 client
386 .publish(
387 String::from("test"),
388 mqtt::QualityOfService::Level0,
389 payload,
390 )
391 .await
392 .unwrap();
393 }
394 }
395
396 for i in 0..5 {
397 let client_clone = client.clone();
398 let sender_clone = sender.clone();
399 let future = random_send(i, client_clone, sender_clone);
400 Arbiter::current().spawn(future);
401 }
402
403 Ok(()) as Result<(), IoError>
404 }
405 .await;
406 result.unwrap();
407 };
408
409 Arbiter::current().spawn(future);
410 let recv_future = async {
411 let result = async {
412 ReceiverStream::new(recv)
413 .fold((), |_, (is_send, payload)| async move {
414 let mut p = PACKETS.lock().unwrap();
415 if is_send {
416 p.insert(payload);
417 } else if p.contains(&payload) {
418 p.remove(&payload);
419 }
420
421 log::info!("Pending recv items: {}", p.len());
422
423 ()
424 })
425 .await;
426 Ok(()) as Result<(), IoError>
427 }
428 .await;
429 result.unwrap()
430 };
431 Arbiter::current().spawn(recv_future);
432 });
433 sys.run().unwrap();
434 }
435
436 #[test]
437 fn test_random_publish_level0_created_client() {
438 use std::io::Error as IoError;
439 use std::net::SocketAddr;
440 use std::str::FromStr;
441 use std::time::Duration;
442
443 use actix::{Actor, Arbiter, System};
444 use env_logger;
445 use futures::stream::StreamExt;
446 use tokio::io::split;
447 use tokio::net::TcpStream;
448 use tokio::time::{sleep_until, Instant};
449 use tokio_stream::wrappers::ReceiverStream;
450
451 use crate::client::{MqttClient, MqttOptions};
452
453 env_logger::init();
454
455 async fn test_send(client_id: i32, sender: Sender<(bool, Vec<u8>)>) {
456 let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
457 async move {
458 let stream = TcpStream::connect(socket_addr).await?;
459 let (r, w) = split(stream);
460 let mut client = MqttClient::new(
461 r,
462 w,
463 format!("test_{}", client_id),
464 MqttOptions::default(),
465 MessageActor(sender.clone()).start().recipient(),
466 ErrorActor.start().recipient(),
467 None,
468 );
469 client.connect().await?;
470 while !client.is_connected().await.unwrap() {
471 log::info!("Waiting for client to be connected");
472 let delay_time = Instant::now() + Duration::new(0, 100);
473 sleep_until(delay_time).await;
474 }
475 log::info!("Connected");
476 log::info!("Subscribe");
477 client
478 .subscribe(String::from("test"), mqtt::QualityOfService::Level0)
479 .await?;
480 async fn random_send(
481 client_id: i32,
482 client: MqttClient,
483 sender: Sender<(bool, Vec<u8>)>,
484 ) {
485 let mut count: i32 = 0;
486 loop {
487 count += 1;
488 use rand::RngCore;
489 let mut data = [0u8; 32];
490 rand::thread_rng().fill_bytes(&mut data);
491 let payload = Vec::from(&data[..]);
492 log::info!("[{}:{}] Publish {:?}", client_id, count, payload);
493 sleep_until(Instant::now() + Duration::from_millis(100)).await;
494 sender.try_send((true, payload.clone())).unwrap();
495 client
496 .publish(
497 String::from("test"),
498 mqtt::QualityOfService::Level0,
499 payload,
500 )
501 .await
502 .unwrap();
503 }
504 }
505
506 let future = random_send(client_id, client, sender);
507 Arbiter::current().spawn(future);
508
509 Ok(()) as Result<(), IoError>
510 }
511 .await
512 .unwrap();
513 }
514
515 let sys = System::new();
516 sys.block_on(async move {
517 let (sender, recv) = channel(100);
518 for i in 0..5 {
519 let future = test_send(i, sender.clone());
520 Arbiter::current().spawn(future);
521 }
522
523 let recv_future = async {
524 let result = async {
525 ReceiverStream::new(recv)
526 .fold((), |_, (is_send, payload)| async move {
527 let mut p = PACKETS.lock().unwrap();
528 if is_send {
529 p.insert(payload);
530 } else if p.contains(&payload) {
531 p.remove(&payload);
532 }
533
534 log::info!("Pending recv items: {}", p.len());
535
536 ()
537 })
538 .await;
539 Ok(()) as Result<(), IoError>
540 }
541 .await;
542 result.unwrap()
543 };
544 Arbiter::current().spawn(recv_future);
545 });
546 sys.run().unwrap();
547 }
548
549 #[test]
550 fn test_random_publish_level2() {
551 use std::io::Error as IoError;
552 use std::net::SocketAddr;
553 use std::str::FromStr;
554 use std::time::Duration;
555
556 use actix::{Actor, Arbiter, System};
557 use env_logger;
558 use futures::stream::StreamExt;
559 use tokio::io::split;
560 use tokio::net::TcpStream;
561 use tokio::time::{sleep_until, Instant};
562 use tokio_stream::wrappers::ReceiverStream;
563
564 use crate::client::{MqttClient, MqttOptions};
565
566 env_logger::init();
567
568 let (sender, recv) = channel(100);
569 let sender_clone = sender.clone();
570
571 let sys = System::new();
572 sys.block_on(async move {
573 let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
574 let future = async move {
575 let result = async move {
576 let stream = TcpStream::connect(socket_addr).await?;
577 let (r, w) = split(stream);
578 let mut client = MqttClient::new(
579 r,
580 w,
581 String::from("test"),
582 MqttOptions::default(),
583 MessageActor(sender).start().recipient(),
584 ErrorActor.start().recipient(),
585 None,
586 );
587 client.connect().await?;
588 while !client.is_connected().await.unwrap() {
589 log::info!("Waiting for client to be connected");
590 let delay_time = Instant::now() + Duration::new(0, 100);
591 sleep_until(delay_time).await;
592 }
593 log::info!("Connected");
594 log::info!("Subscribe");
595 client
596 .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
597 .await?;
598 futures::stream::repeat(())
599 .fold((client, sender_clone), |(client, sender), _| async {
600 use rand::RngCore;
601 let mut data = [0u8; 32];
602 rand::thread_rng().fill_bytes(&mut data);
603 let payload = Vec::from(&data[..]);
604 log::info!("Publish {:?}", payload);
605 sleep_until(Instant::now() + Duration::from_millis(10)).await;
606 sender.try_send((true, payload.clone())).unwrap();
607 client
608 .publish(
609 String::from("test"),
610 mqtt::QualityOfService::Level2,
611 payload,
612 )
613 .await
614 .unwrap();
615 (client, sender)
616 })
617 .await;
618 Ok(()) as Result<(), IoError>
619 }
620 .await;
621 result.unwrap()
622 };
623 Arbiter::current().spawn(future);
624 let recv_future = async {
625 let result = async {
626 ReceiverStream::new(recv)
627 .fold((), |_, (is_send, payload)| async move {
628 let mut p = PACKETS.lock().unwrap();
629 if is_send {
630 p.insert(payload);
631 } else if !p.contains(&payload) {
632 panic!("Multiple receive for level 2: {:?}", payload);
633 } else {
634 p.remove(&payload);
635 }
636
637 log::info!("Pending recv items: {}", p.len());
638
639 ()
640 })
641 .await;
642 Ok(()) as Result<(), IoError>
643 }
644 .await;
645 result.unwrap()
646 };
647 Arbiter::current().spawn(recv_future);
648 });
649 sys.run().unwrap();
650 }
651}