use futures::{select, FutureExt};
use rand::RngExt;
use rustzmq2::prelude::*;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut socket = rustzmq2::XPubSocket::new();
socket.bind("tcp://127.0.0.1:5556").await?;
println!("XPUB server bound on tcp://127.0.0.1:5556");
let mut rng = rand::rng();
let mut active_subs: i32 = 0;
let mut send_counter = 0u64;
let mut interval = tokio::time::interval(Duration::from_millis(200));
loop {
select! {
event = socket.recv_event().fuse() => {
match event? {
XPubEvent::Subscribe { topic } => {
let topic = std::str::from_utf8(&topic).unwrap_or("?");
println!("+ subscriber (topic: {:?})", if topic.is_empty() { "*" } else { topic });
active_subs += 1;
}
XPubEvent::Unsubscribe { topic } => {
let topic = std::str::from_utf8(&topic).unwrap_or("?");
println!("- unsubscribe (topic: {:?})", if topic.is_empty() { "*" } else { topic });
active_subs = (active_subs - 1).max(0);
}
XPubEvent::Other(_) => {}
}
}
_ = interval.tick().fuse() => {
if active_subs == 0 { continue; }
let zip = rng.random_range(10000..10010_u32);
let temp = rng.random_range(-20..40_i32);
let hum = rng.random_range(10..90_u32);
socket.send(format!("{zip} {temp}°C {hum}%")).await?;
send_counter += 1;
if send_counter % 10 == 0 {
println!("sent {send_counter} updates ({active_subs} active subs)");
}
}
}
}
}