use bytes::Bytes;
use monocoque::zmq::{PubSocket, SubSocket};
use std::sync::mpsc;
use std::time::Duration;
use tracing::{error, info};
#[compio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
info!("=== PubSub Events Example ===\n");
let port = portpicker::pick_unused_port().expect("No ports available");
info!("Using port {}", port);
let mut pub_socket = PubSocket::bind(format!("127.0.0.1:{}", port)).await?;
info!("[Publisher] Bound to port {}", port);
let (ready_tx, ready_rx) = mpsc::channel::<()>();
let subscriber_handle = compio::runtime::spawn(async move {
if let Err(e) = run_subscriber(port, ready_tx).await {
error!("[Subscriber] Error: {e}");
}
});
pub_socket.accept_subscriber().await?;
info!("[Publisher] Subscriber connected");
loop {
match ready_rx.try_recv() {
Ok(_) => break,
Err(mpsc::TryRecvError::Empty) => {
compio::time::sleep(Duration::from_millis(10)).await;
}
Err(mpsc::TryRecvError::Disconnected) => break,
}
}
info!("[Publisher] Publishing events...");
let events = vec![
("trade.BTC", "BTC/USD: 45000"),
("trade.ETH", "ETH/USD: 3000"),
("news.crypto", "New regulation announced"),
("trade.BTC", "BTC/USD: 45100"),
("alert.system", "System maintenance in 1 hour"),
("trade.ETH", "ETH/USD: 3050"),
];
for (topic, data) in events {
let message = vec![Bytes::from(topic), Bytes::from(data)];
info!("[Publisher] Publishing: {topic} -> {data}");
pub_socket.send(message).await?;
}
info!("[Publisher] Done publishing");
subscriber_handle.await;
Ok(())
}
async fn run_subscriber(
port: u16,
ready_tx: mpsc::Sender<()>,
) -> Result<(), Box<dyn std::error::Error>> {
info!("[Subscriber] Connecting to port {}...", port);
let mut socket = SubSocket::connect(&format!("127.0.0.1:{}", port)).await?;
info!("[Subscriber] Connected!");
info!("[Subscriber] Subscribing to 'trade.' prefix");
socket.subscribe(b"trade.").await?;
info!("[Subscriber] Subscribed!");
let _ = ready_tx.send(());
info!("[Subscriber] Waiting for events...\n");
for i in 0..4 {
match socket.recv().await {
Ok(Some(message)) => {
if message.len() >= 2 {
let topic = std::str::from_utf8(&message[0]).unwrap_or("<invalid>");
let data = std::str::from_utf8(&message[1]).unwrap_or("<invalid>");
info!("[Subscriber] Message {}: {topic} -> {data}", i + 1);
}
}
Ok(None) => {
info!("[Subscriber] Connection closed");
break;
}
Err(e) => {
error!("[Subscriber] Recv error: {e}");
break;
}
}
}
info!("[Subscriber] Done receiving");
Ok(())
}