rumqttd 0.20.0

rumqttd is a high-performance, lightweight, and embeddable MQTT broker written in Rust.
Documentation
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use rumqttd::{
    local::LinkRx,
    protocol::{Packet, Publish},
    Broker, Notification,
};

use tokio::{
    select,
    sync::Barrier,
    task,
    time::{self, Instant},
};
use tracing_subscriber::EnvFilter;

/// Number of publisher links (and publisher tasks) that `main` creates; also sizes the
/// completion barrier (`PUBLISHERS + 1`, the extra slot being `main` itself).
const PUBLISHERS: usize = 10000;
/// Number of publish packets each publisher task sends before waiting on the barrier.
const MAX_MSG_PER_PUB: usize = 100;
/// Delay, in milliseconds, that a publisher task sleeps before each publish.
const SLEEP_TIME_MS_BETWEEN_PUB: u64 = 100;
/// Number of consumer links (and consumer tasks) subscribed to `hello/+/world`.
const CONSUMERS: usize = 2;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // RUST_LOG=rumqttd[{client_id=consumer}]=debug cargo run --example stress
    tracing_subscriber::fmt()
        // .with_env_filter("rumqttd=debug");
        .with_env_filter(EnvFilter::from_env("RUST_LOG"))
        .pretty()
        .init();

    let manifest_dir = env!("CARGO_MANIFEST_DIR");
    let config = format!("{manifest_dir}/rumqttd.toml");
    let config = config::Config::builder()
        .add_source(config::File::with_name(&config))
        .build()
        .unwrap(); // Config::default() doesn't have working values

    let config = config.try_deserialize().unwrap();
    let broker = Broker::new(config);

    for i in 0..CONSUMERS {
        let client_id = format!("consumer_{i}");
        let (mut link_tx, mut link_rx) = broker.link(&client_id).unwrap();

        link_tx.subscribe("hello/+/world").unwrap();
        link_rx.recv().unwrap();

        task::spawn(async move { consumer(&client_id, link_rx).await });
    }

    let barrier = Arc::new(Barrier::new(PUBLISHERS + 1));
    for i in 0..PUBLISHERS {
        let c = barrier.clone();
        let client_id = format!("publisher_{i}");
        let topic = format!("hello/{client_id}/world");
        let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB
        let (mut link_tx, _link_rx) = broker.link(&client_id).unwrap();

        let topic: Bytes = topic.into();
        let payload: Bytes = payload.into();
        task::spawn(async move {
            for _ in 0..MAX_MSG_PER_PUB {
                time::sleep(Duration::from_millis(SLEEP_TIME_MS_BETWEEN_PUB)).await;
                let publish = Publish::new(topic.clone(), payload.clone(), false);
                link_tx.send(Packet::Publish(publish, None)).await.unwrap();
            }

            c.wait().await;
        });
    }

    barrier.wait().await;
    time::sleep(Duration::from_secs(5)).await;
}

/// Counts forwarded notifications arriving on `link_rx` and reports progress.
///
/// Runs forever, racing two events on each iteration:
/// - a 500 ms interval tick, which prints `client_id`, the running count and the time
///   elapsed since this function started;
/// - the next item from `link_rx`, where `Notification::Forward` increments the count
///   and `Notification::Unschedule` calls `link_rx.wake()`.
///
/// # Panics
///
/// Panics if `link_rx.next()` or `link_rx.wake()` returns an error, or if a
/// notification other than `Forward` / `Unschedule` is received.
async fn consumer(client_id: &str, mut link_rx: LinkRx) {
    let mut count = 0;
    let mut ticker = time::interval(Duration::from_millis(500));
    let started = Instant::now();
    loop {
        select! {
            _ = ticker.tick() => {
                println!("{client_id:?}: total count: {count:<10}; time: {:?}", started.elapsed());
            }
            polled = link_rx.next() => {
                // `None` carries nothing to process; fall through to the next iteration.
                if let Some(event) = polled.unwrap() {
                    match event {
                        Notification::Forward(_) => count += 1,
                        Notification::Unschedule => link_rx.wake().await.unwrap(),
                        _ => unreachable!()
                    }
                }
            }
        }
    }
}