#![allow(dead_code)]
use rzmq::socket::{MonitorReceiver, SocketEvent};
use rzmq::{Context, Socket, ZmqError};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Once;
use std::time::Duration;
use tokio::time::timeout;
static IPC_ENDPOINT_COUNTER: AtomicUsize = AtomicUsize::new(0);
static INPROC_ENDPOINT_COUNTER: AtomicUsize = AtomicUsize::new(0);
static TRACING_INIT: Once = Once::new();
fn setup_tracing() {
TRACING_INIT.call_once(|| {
let default_filter = "rzmq=trace,debug,info,warn";
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_filter));
let subscriber = FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE) .with_env_filter(env_filter)
.with_target(true) .with_line_number(true) .with_span_events(FmtSpan::CLOSE) .with_test_writer() .finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to set global tracing subscriber");
println!("Tracing subscriber initialized."); });
}
pub fn test_context() -> Context {
setup_tracing(); rzmq::Context::new().expect("Failed to create test context")
}
pub fn unique_ipc_endpoint() -> String {
let pid = std::process::id();
let count = IPC_ENDPOINT_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("ipc:///tmp/rzmq_test_{}_{}", pid, count)
}
pub fn unique_inproc_endpoint() -> String {
let pid = std::process::id();
let count = INPROC_ENDPOINT_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("inproc://rzmq_test_{}_{}", pid, count)
}
pub async fn recv_timeout(socket: &Socket, duration: Duration) -> Result<rzmq::Msg, ZmqError> {
match timeout(duration, socket.recv()).await {
Ok(Ok(msg)) => Ok(msg),
Ok(Err(e)) => Err(e), Err(_) => Err(ZmqError::Timeout), }
}
pub async fn send_timeout(socket: &Socket, msg: rzmq::Msg, duration: Duration) -> Result<(), ZmqError> {
match timeout(duration, socket.send(msg)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_) => Err(ZmqError::Timeout),
}
}
pub async fn bind_socket_resolve(socket: &Socket, base_endpoint: &str) -> Result<String, ZmqError> {
socket.bind(base_endpoint).await?;
if base_endpoint.contains('*') {
panic!("Cannot resolve wildcard endpoint without ZMQ_LAST_ENDPOINT support");
} else {
Ok(base_endpoint.to_string())
}
}
pub async fn wait_for_monitor_event(
monitor_rx: &MonitorReceiver,
timeout: Duration,
short_recv_timeout: Duration,
check_event: impl Fn(&SocketEvent) -> bool, ) -> Result<SocketEvent, String> {
let start_time = tokio::time::Instant::now();
loop {
if start_time.elapsed() > timeout {
return Err(format!(
"Timeout waiting for specific monitor event after {:?}",
timeout
));
}
match tokio::time::timeout(short_recv_timeout, monitor_rx.recv()).await {
Ok(Ok(event)) => {
println!("Monitor received: {:?}", event); if check_event(&event) {
return Ok(event); }
}
Ok(Err(_recv_err)) => {
return Err("Monitor channel closed unexpectedly".to_string());
}
Err(_elapsed) => { }
}
}
}