rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
// 06_xpub_server — XPUB socket with subscription event tracking.
//
// XPUB differs from PUB in that the application can receive subscription and
// unsubscription messages from clients. This example:
//   1. Binds a single XPUB socket.
//   2. Uses futures::select! to interleave publishing and draining subscription
//      events on the same socket.
//   3. Tracks active subscriber count and only publishes when > 0.
//
// Run alongside `cargo run --example 04_pubsub_client` (the SUB peer).

use futures::{select, FutureExt};
use rand::RngExt;
use rustzmq2::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut socket = rustzmq2::XPubSocket::new();
    socket.bind("tcp://127.0.0.1:5556").await?;
    println!("XPUB server bound on tcp://127.0.0.1:5556");

    let mut rng = rand::rng();
    let mut active_subs: i32 = 0;
    let mut send_counter = 0u64;
    let mut interval = tokio::time::interval(Duration::from_millis(200));

    loop {
        select! {
            // Drain incoming subscription / unsubscription events.
            event = socket.recv_event().fuse() => {
                match event? {
                    XPubEvent::Subscribe { topic } => {
                        let topic = std::str::from_utf8(&topic).unwrap_or("?");
                        println!("+ subscriber (topic: {:?})", if topic.is_empty() { "*" } else { topic });
                        active_subs += 1;
                    }
                    XPubEvent::Unsubscribe { topic } => {
                        let topic = std::str::from_utf8(&topic).unwrap_or("?");
                        println!("- unsubscribe (topic: {:?})", if topic.is_empty() { "*" } else { topic });
                        active_subs = (active_subs - 1).max(0);
                    }
                    XPubEvent::Other(_) => {}
                }
            }
            // Publish on each tick, but only when someone is listening.
            _ = interval.tick().fuse() => {
                if active_subs == 0 { continue; }
                let zip  = rng.random_range(10000..10010_u32);
                let temp = rng.random_range(-20..40_i32);
                let hum  = rng.random_range(10..90_u32);
                socket.send(format!("{zip} {temp}°C {hum}%")).await?;
                send_counter += 1;
                if send_counter % 10 == 0 {
                    println!("sent {send_counter} updates ({active_subs} active subs)");
                }
            }
        }
    }
}