Crate actix_mqtt_client[][src]

Expand description

A MQTT client based on actix framework

The actix-mqtt-client crate is a mqtt client based on the actix framework

Basic usage and example

First, create 2 actix actors, one for receiving publish messages, the other one for receiving error messages from the client, you can also create an optional actix actor for receiving the stop message:

pub struct ErrorActor;

impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();
    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}

pub struct MessageActor;

impl actix::Actor for MessageActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    type Result = ();
    fn handle(
        &mut self,
        msg: PublishMessage,
        _: &mut Self::Context,
    ) -> Self::Result {
        log::info!(
            "Got message: id:{}, topic: {}, payload: {:?}",
            msg.id,
            msg.topic_name,
            msg.payload
        );
    }
}

Then, connect to the server(using tokio) and use the read and write part of the stream along with the actors to create a MqttClient:

use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{sleep_until, Instant};
use actix_mqtt_client::client::{MqttClient, MqttOptions};

let sys = System::new();
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
sys.block_on(async move {
   let result = async move {
       let stream = TcpStream::connect(socket_addr).await?;
       let (r, w) = split(stream);
       log::info!("TCP connected");
       let mut client = MqttClient::new(
           r,
           w,
           String::from("test"),
           MqttOptions::default(),
           MessageActor.start().recipient(),
           ErrorActor.start().recipient(),
           None,
       );
       client.connect().await?;
       // Waiting for the client to be connected
       while !client.is_connected().await? {
           let delay_time = Instant::now() + Duration::new(1, 0);
           sleep_until(delay_time).await;
       }
       log::info!("MQTT connected");
       log::info!("Subscribe");
       client
           .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
           .await?;
       log::info!("Publish");
       client
           .publish(
               String::from("test"),
               mqtt::QualityOfService::Level0,
               Vec::from("test".as_bytes()),
           )
           .await?;
       log::info!("Wait for 10s");
       let delay_time = Instant::now() + Duration::new(10, 0);
       sleep_until(delay_time).await;
       client
           .publish(
               String::from("test"),
               mqtt::QualityOfService::Level1,
               Vec::from("test2".as_bytes()),
           )
           .await?;
       log::info!("Wait for 10s");
       let delay_time = Instant::now() + Duration::new(10, 0);
       sleep_until(delay_time).await;
       client
           .publish(
               String::from("test"),
               mqtt::QualityOfService::Level2,
               Vec::from("test3".as_bytes()),
           )
           .await?;
       log::info!("Wait for 10s");
       let delay_time = Instant::now() + Duration::new(10, 0);
       sleep_until(delay_time).await;
       log::info!("Disconnect");
       client.disconnect(false).await?;
       log::info!("Check if disconnect is successful");
       Ok(assert_eq!(true, client.is_disconnected())) as Result<(), IoError>
   }
   .await;
   result.unwrap()
});
sys.run().unwrap();

Re-exports

pub use actix;
pub use futures;
pub use tokio;

Structs

ErrorMessage

The actix message containing the error happens inside the client

MqttClient

The client for connecting to the MQTT server

MqttOptions

The options for setting up MQTT connection

PublishMessage

The actix message containing the payload of a MQTT publish packet

StopMessage

The actix message indicating that the client is about to stop

Enums

QualityOfService