rumqttc-v4-next 0.33.1

Explicit MQTT 3.1.1 client crate in the rumqttc-next family
Documentation
use rumqttc::{AsyncClient, Broker, Proxy, ProxyAuth, ProxyType, QoS};
use std::{error::Error, time::Duration};
use tokio::{task, time};

/// Demo entry point: connects to the broker at
/// `ws://broker.mqttdashboard.com:8000/mqtt` through an HTTP proxy and prints
/// every event yielded by the event loop.
///
/// The subscribe/publish traffic is generated by `requests`, which runs on a
/// spawned task while this function keeps polling the event loop.
///
/// # Errors
///
/// Returns the first error produced by `eventloop.poll()`. Propagating it
/// (instead of printing it and returning `Ok(())`) makes a failed or dropped
/// connection end the process with a failure status rather than a success one.
///
/// # Panics
///
/// Panics if the websocket broker URL is rejected by `Broker::websocket`.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
    use rumqttc::MqttOptions;

    pretty_env_logger::init();

    let mut mqttoptions = MqttOptions::new(
        "clientId-aSziq39Bp3",
        Broker::websocket("ws://broker.mqttdashboard.com:8000/mqtt")
            .expect("valid websocket broker"),
    );
    // NOTE(review): the keep-alive value is presumably in seconds — confirm
    // against the `set_keep_alive` documentation.
    mqttoptions.set_keep_alive(60);
    // Presumes that a proxy server is already set up and listening on 127.0.0.1:8100.
    mqttoptions.set_proxy(Proxy {
        ty: ProxyType::Http,
        auth: ProxyAuth::None,
        addr: "127.0.0.1".into(),
        port: 8100,
    });

    let (client, mut eventloop) = AsyncClient::builder(mqttoptions).capacity(10).build();

    // The join handle is discarded, so this task is detached: it is never awaited
    // and simply stops when `main` returns and the runtime shuts down.
    task::spawn(async move {
        requests(client).await;
        time::sleep(Duration::from_secs(3)).await;
    });

    // Requests issued through `client` only make progress while the event loop
    // is polled, so keep polling until it reports an error.
    loop {
        let notif = eventloop.poll().await?;
        println!("Event = {notif:?}");
    }
}

/// Drives the demo traffic: subscribes to `hello/world`, publishes ten
/// messages to the same topic one second apart, then idles for two minutes.
///
/// The n-th message (n = 1..=10) carries a payload of `n` elements, each `1`.
///
/// # Panics
///
/// Panics if the subscribe call or any publish call returns an error.
async fn requests(client: AsyncClient) {
    const TOPIC: &str = "hello/world";
    const MESSAGE_COUNT: usize = 10;

    let publish_gap = Duration::from_secs(1);
    let linger = Duration::from_secs(120);

    let subscribed = client.subscribe(TOPIC, QoS::AtMostOnce).await;
    subscribed.unwrap();

    for payload_len in 1..=MESSAGE_COUNT {
        let published = client
            .publish(TOPIC, QoS::ExactlyOnce, false, vec![1; payload_len])
            .await;
        published.unwrap();

        // Pace the publishes instead of sending them back to back.
        time::sleep(publish_gap).await;
    }

    // Keep this task (and its `client`) alive for a while after the last publish.
    time::sleep(linger).await;
}