use rzmq::{Context, Msg, SocketType, ZmqError};
use std::time::Duration;
use tokio::time::{sleep, timeout};
mod common;
/// Short wait budget (250 ms).
///
/// NOTE(review): not referenced by the test visible in this part of the file;
/// presumably used by other tests for operations expected to time out — confirm.
const SHORT_TIMEOUT: Duration = Duration::from_millis(250);

/// Upper bound (3 s) placed on each send/recv await that is expected to succeed.
const LONG_TIMEOUT: Duration = Duration::from_secs(3);

/// Number of messages pushed and pulled by the main send/receive loop.
const NUM_MESSAGES: usize = 5;
/// Exercises a PUSH/PULL socket pair over TCP, closes both sockets explicitly,
/// and only then terminates the context.
///
/// Flow:
/// 1. Bind the PULL socket and connect the PUSH socket on a fixed loopback endpoint.
/// 2. Exchange a single `READY` message so the main loop starts on a working pipe.
/// 3. Run a sender task and a receiver task concurrently for `NUM_MESSAGES` messages.
/// 4. Close PUSH, then PULL, and assert that both closes succeeded.
/// 5. Terminate the context and assert that termination succeeded.
///
/// # Errors
///
/// Returns a `ZmqError` if socket creation, `bind`, or `connect` fails. Every
/// other failure is reported through a panic/assertion.
#[tokio::test]
async fn test_push_pull_explicit_close_then_term() -> Result<(), ZmqError> {
    println!("\n--- Starting test_push_pull_explicit_close_then_term ---");
    let ctx = common::test_context();

    println!("Creating PUSH and PULL sockets...");
    let push = ctx.socket(SocketType::Push)?;
    let pull = ctx.socket(SocketType::Pull)?;
    let endpoint = "tcp://127.0.0.1:5701";

    println!("Binding PULL to {}...", endpoint);
    pull.bind(endpoint).await?;
    // Fixed pause after bind; presumably lets the listener come up before connecting.
    sleep(Duration::from_millis(50)).await;

    println!("Connecting PUSH to {}...", endpoint);
    push.connect(endpoint).await?;
    // Fixed pause after connect; the handshake below is the real readiness check.
    sleep(Duration::from_millis(150)).await;

    // --- Readiness handshake: one READY message must make it across. ---
    println!("Performing readiness handshake...");
    let ready_msg = Msg::from_static(b"READY");
    match timeout(LONG_TIMEOUT, push.send(ready_msg)).await {
        Ok(Ok(())) => println!("Readiness send OK."),
        Ok(Err(e)) => panic!("Readiness send failed: {}", e),
        Err(_) => panic!("Readiness send timed out"),
    }
    match timeout(LONG_TIMEOUT, pull.recv()).await {
        Ok(Ok(msg)) if msg.data().unwrap_or_default() == b"READY" => {
            println!("Readiness recv OK.")
        }
        Ok(Ok(msg)) => panic!(
            "Readiness recv wrong message: {:?}",
            msg.data().unwrap_or_default()
        ),
        Ok(Err(e)) => panic!("Readiness recv failed: {}", e),
        Err(_) => panic!("Readiness recv timed out"),
    }
    println!("Readiness handshake complete.");

    // --- Main loop: sender and receiver run as separate tasks. ---
    println!(
        "Starting main send/receive loop ({} messages)...",
        NUM_MESSAGES
    );

    let sender_task = {
        // The task gets its own handle so `push` stays usable for the explicit close.
        let push = push.clone();
        tokio::spawn(async move {
            for i in 0..NUM_MESSAGES {
                let msg = Msg::from_vec(format!("Msg {}", i).into_bytes());
                if let Err(e) = push.send(msg).await {
                    println!("Sender task error: {}", e);
                    return Err(e);
                }
                // Yield to the scheduler between sends.
                tokio::task::yield_now().await;
            }
            println!("Sender task finished.");
            Ok(())
        })
    };

    let receiver_task = {
        // The task gets its own handle so `pull` stays usable for the explicit close.
        let pull = pull.clone();
        tokio::spawn(async move {
            for i in 0..NUM_MESSAGES {
                match timeout(LONG_TIMEOUT, pull.recv()).await {
                    Ok(Ok(msg)) => {
                        let expected = format!("Msg {}", i).into_bytes();
                        assert_eq!(
                            msg.data().unwrap(),
                            expected.as_slice(),
                            "Mismatch on msg {}",
                            i
                        );
                        println!("Receiver task received Msg {}", i);
                    }
                    Ok(Err(e)) => {
                        println!("Receiver task error on msg {}: {}", i, e);
                        // A "Socket terminated" error on the very last message is
                        // tolerated and reported as a short count, not a failure.
                        if i == NUM_MESSAGES - 1
                            && matches!(&e, ZmqError::InvalidState(s) if s == &"Socket terminated")
                        {
                            println!("Receiver task tolerated expected termination error at the end.");
                            return Ok(i);
                        }
                        return Err(e);
                    }
                    Err(_) => {
                        println!("Receiver task timed out waiting for Msg {}", i);
                        return Err(ZmqError::Timeout);
                    }
                }
            }
            println!("Receiver task finished receiving all messages.");
            Ok(NUM_MESSAGES)
        })
    };

    let send_result = sender_task.await.expect("Sender task panicked");
    let recv_result = receiver_task.await.expect("Receiver task panicked");
    println!("Send task result: {:?}", send_result);
    println!("Receive task result: {:?}", recv_result);

    assert!(send_result.is_ok(), "Sender task failed");
    match recv_result {
        Ok(count) => {
            // A short count is only warned about; it does not fail the test.
            if count < NUM_MESSAGES {
                println!(
                    "Warning: Receiver task completed but received only {}/{} messages (likely due to termination race).",
                    count, NUM_MESSAGES
                );
            }
        }
        Err(e) => {
            panic!("Receiver task failed with unexpected error: {}", e);
        }
    }
    println!("Send/Receive loop finished.");

    // --- Explicit close of both sockets, then context termination. ---
    println!("Closing PUSH socket explicitly...");
    let push_close_res = push.close().await;
    println!("PUSH close result: {:?}", push_close_res);

    println!("Closing PULL socket explicitly...");
    let pull_close_res = pull.close().await;
    println!("PULL close result: {:?}", pull_close_res);

    // Explicit close is what this test is about, so a failed close must fail the
    // test. Both closes are attempted before either result is asserted.
    assert!(push_close_res.is_ok(), "PUSH socket close failed");
    assert!(pull_close_res.is_ok(), "PULL socket close failed");

    // Fixed pause between closing the sockets and terminating the context.
    sleep(Duration::from_millis(50)).await;

    println!("Terminating context...");
    let term_result = ctx.term().await;
    println!("Context term result: {:?}", term_result);
    assert!(term_result.is_ok(), "Context termination failed");

    println!("--- Test test_push_pull_explicit_close_then_term Finished ---");
    Ok(())
}