rust-mqtt 0.5.1

MQTT client for embedded and non-embedded environments
Documentation
use log::info;
use rust_mqtt::{
    client::options::{PublicationOptions, SubscriptionOptions, TopicReference},
    types::{IdentifiedQoS, QoS, TopicName},
};
use tokio::{
    join,
    sync::oneshot::{Receiver, Sender, channel},
};

use crate::common::{
    BROKER_ADDRESS, DEFAULT_DC_OPTIONS, NO_SESSION_CONNECT_OPTIONS,
    assert::{assert_ok, assert_published, assert_recv, assert_subscribe},
    utils::{connected_client, disconnect, unique_topic},
};

/// Payload used for every message in these load tests: `publish_multiple`
/// sends it and `receive_multiple` compares each received payload against it.
const MSG: &str = "testMessage";

/// Publishes `count` copies of [`MSG`] to `topic` using the requested `qos`.
///
/// A dedicated client is connected first. Publishing only starts once a value
/// arrives on `ready_rx`, and the client is disconnected after the last
/// publication. Progress is logged after every 100th message.
///
/// Returns `Ok(())` whenever the end of the function is reached; failures are
/// handled by the `assert_*` helper macros from `crate::common` (presumably by
/// panicking — confirm against their definitions).
async fn publish_multiple(
    topic: TopicName<'_>,
    qos: QoS,
    count: u16,
    ready_rx: Receiver<()>,
) -> Result<(), ()> {
    let mut publisher =
        assert_ok!(connected_client(BROKER_ADDRESS, NO_SESSION_CONNECT_OPTIONS, None).await);

    // Hold back until the other side reports that it is ready.
    info!("[Publisher] Waiting for receiver to be ready");
    let ready_signal = ready_rx.await;
    assert_ok!(ready_signal);
    info!("[Publisher] Receiver is ready, starting to publish");

    let target = TopicReference::Name(topic.clone());
    let publication = PublicationOptions::new(target).qos(qos);

    info!(
        "[Publisher] Sending {} messages to topic {:?}",
        count,
        topic.as_ref()
    );
    // Count from one so the running total can be logged directly.
    for sent in 1..=count {
        assert_published!(publisher, publication, MSG.into());
        if sent % 100 == 0 {
            info!("[Publisher] Sent {}/{} messages", sent, count);
        }
    }

    info!("[Publisher] Disconnecting after sending {} messages", count);
    disconnect(&mut publisher, DEFAULT_DC_OPTIONS).await;
    Ok(())
}

/// Subscribes to `topic_name` and consumes `count` messages, checking each one.
///
/// A dedicated client is connected and subscribed with the requested `qos`.
/// After the subscription step, `()` is sent on `ready_tx` so the publishing
/// side may start. Every received message must carry [`MSG`] as its payload and
/// the same QoS that was requested. Progress is logged after every 100th
/// message, and the client is disconnected at the end.
///
/// Returns `Ok(())` whenever the end of the function is reached; failures are
/// handled by the `assert_*` helper macros from `crate::common` (presumably by
/// panicking — confirm against their definitions).
async fn receive_multiple(
    topic_name: TopicName<'static>,
    qos: QoS,
    count: u16,
    ready_tx: Sender<()>,
) -> Result<(), ()> {
    let mut subscriber =
        assert_ok!(connected_client(BROKER_ADDRESS, NO_SESSION_CONNECT_OPTIONS, None).await);

    let subscription = SubscriptionOptions {
        qos,
        no_local: false,
        retain_as_published: false,
        retain_handling: rust_mqtt::client::options::RetainHandling::AlwaysSend,
        subscription_identifier: None,
    };

    info!("[Receiver] Subscribing to topic {:?}", topic_name.as_ref());
    assert_subscribe!(subscriber, subscription, topic_name.into());

    // Let the publisher know it may start sending.
    info!("[Receiver] Subscription confirmed, signaling ready");
    let signalled = ready_tx.send(());
    assert_ok!(signalled);

    info!("[Receiver] Waiting for {} messages", count);
    // Count from one so the running total can be logged directly.
    for received in 1..=count {
        let incoming = assert_recv!(subscriber);
        assert_eq!(&*incoming.message, MSG.as_bytes());

        // Compare only the QoS level of the received publication.
        let delivered_qos = Into::<QoS>::into(incoming.identified_qos);
        assert_eq!(delivered_qos, qos);

        if received % 100 == 0 {
            info!("[Receiver] Received {}/{} messages", received, count);
        }
    }

    info!(
        "[Receiver] Disconnecting after receiving {} messages",
        count
    );
    disconnect(&mut subscriber, DEFAULT_DC_OPTIONS).await;
    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_ten_qos0() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtMostOnce, 10, ready_tx),
        publish_multiple(topic_name, QoS::AtMostOnce, 10, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_ten_qos1() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtLeastOnce, 10, ready_tx),
        publish_multiple(topic_name, QoS::AtLeastOnce, 10, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_ten_qos2() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::ExactlyOnce, 10, ready_tx),
        publish_multiple(topic_name, QoS::ExactlyOnce, 10, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_fifty_qos0() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtMostOnce, 50, ready_tx),
        publish_multiple(topic_name, QoS::AtMostOnce, 50, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_fifty_qos1() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtLeastOnce, 50, ready_tx),
        publish_multiple(topic_name, QoS::AtLeastOnce, 50, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_fifty_qos2() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::ExactlyOnce, 50, ready_tx),
        publish_multiple(topic_name, QoS::ExactlyOnce, 50, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_five_hundred_qos0() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtMostOnce, 500, ready_tx),
        publish_multiple(topic_name, QoS::AtMostOnce, 500, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_five_hundred_qos1() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtLeastOnce, 500, ready_tx),
        publish_multiple(topic_name, QoS::AtLeastOnce, 500, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_five_hundred_qos2() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::ExactlyOnce, 500, ready_tx),
        publish_multiple(topic_name, QoS::ExactlyOnce, 500, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[test_log::test]
async fn load_test_ten_thousand_qos0() {
    let (topic_name, _) = unique_topic();
    let (ready_tx, ready_rx) = channel();

    let (r, p) = join!(
        receive_multiple(topic_name.clone(), QoS::AtMostOnce, 10000, ready_tx),
        publish_multiple(topic_name, QoS::AtMostOnce, 10000, ready_rx)
    );
    assert_ok!(r);
    assert_ok!(p);
}