#[cfg(all(test, feature = "tokio", feature = "tcp"))]
mod test {
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::{PullSocket, PushSocket, ZmqError};
use std::time::Duration;
/// Checks that closing a PUSH socket built with a zero linger returns quickly.
///
/// Twenty messages are sent to a connected PULL peer (send outcomes are
/// ignored), then `close()` is timed and must finish in under 500 ms.
#[async_rt::test]
async fn linger_zero_close_is_immediate() {
    let mut sender = PushSocket::builder().linger(Some(Duration::ZERO)).build();
    let endpoint = sender.bind("tcp://127.0.0.1:0").await.unwrap();
    let address = endpoint.to_string();

    let mut receiver = PullSocket::new();
    receiver.connect(&address).await.unwrap();

    for seq in 0u32..20 {
        let payload = seq.to_be_bytes().to_vec();
        // Best effort: only the duration of `close()` is under test, so a
        // failed send is not treated as a test failure.
        let _ = sender.send(rustzmq2::ZmqMessage::from(payload)).await;
    }

    // Time nothing but the close call itself.
    let close_started = std::time::Instant::now();
    let _ = sender.close().await;
    let close_took = close_started.elapsed();

    let budget = Duration::from_millis(500);
    assert!(close_took < budget, "linger=ZERO close took {:?}", close_took);
}
/// Checks that a PUSH socket built with a 500 ms linger closes within 2 s and
/// that at least one of the five messages sent beforehand reaches the PULL peer.
#[async_rt::test]
async fn linger_positive_drains_before_close() {
    let mut push = PushSocket::builder()
        .linger(Some(Duration::from_millis(500)))
        .send_hwm(1000)
        .build();
    let ep = push.bind("tcp://127.0.0.1:0").await.unwrap();
    let mut pull = PullSocket::new();
    pull.connect(&ep.to_string()).await.unwrap();
    // Presumably gives the connection time to establish before sending.
    async_rt::task::sleep(Duration::from_millis(50)).await;

    // Unlike the zero-linger test, every send here must succeed.
    for i in 0u32..5 {
        push.send(rustzmq2::ZmqMessage::from(i.to_be_bytes().to_vec()))
            .await
            .unwrap();
    }

    // Time only the close call.
    let start = std::time::Instant::now();
    let _ = push.close().await;
    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(2),
        "linger=500ms close took {:?}",
        elapsed
    );

    // Count only messages that were actually received. The outer `Result` is
    // the timeout outcome and the inner one is the `recv` outcome; a `recv`
    // error must not be counted as a drained message, and it ends the loop so
    // a persistently failing `recv` cannot spin here.
    let mut count = 0usize;
    while let Ok(Ok(_msg)) =
        async_rt::task::timeout(Duration::from_millis(100), pull.recv()).await
    {
        count += 1;
    }
    assert!(count > 0, "expected at least one message to drain");
}
/// Checks that sending on a PUSH socket built with `immediate(true)` yields
/// `ZmqError::ReturnToSender` when the socket is bound but no peer has connected.
#[async_rt::test]
async fn immediate_returns_error_when_no_peers() {
    let mut sender = PushSocket::builder().immediate(true).build();
    sender.bind("tcp://127.0.0.1:0").await.unwrap();

    let payload = b"hello".to_vec();
    let outcome = sender.send(rustzmq2::ZmqMessage::from(payload)).await;

    // Any other result, success included, fails the test.
    match outcome {
        Err(ZmqError::ReturnToSender { .. }) => {}
        other => panic!("expected ReturnToSender, got {:?}", other),
    }
}
/// Checks that a message sent on a PUSH socket built with `immediate(false)`
/// is received by a PULL peer that connects from a spawned task.
///
/// The peer task connects after 50 ms; the send happens after 100 ms.
#[async_rt::test]
async fn immediate_false_delivers_when_peer_connects() {
    let mut sender = PushSocket::builder().immediate(false).build();
    let endpoint = sender.bind("tcp://127.0.0.1:0").await.unwrap();
    let address = endpoint.to_string();

    // The receiving side runs in its own task: wait, connect, then hand back
    // the outcome of a single `recv`.
    let receiver_task = async_rt::task::spawn(async move {
        let mut receiver = PullSocket::new();
        async_rt::task::sleep(Duration::from_millis(50)).await;
        receiver.connect(&address).await.unwrap();
        receiver.recv().await
    });

    async_rt::task::sleep(Duration::from_millis(100)).await;
    let greeting = rustzmq2::ZmqMessage::from(b"hello".to_vec());
    sender.send(greeting).await.unwrap();

    // First unwrap: the task join result; second unwrap: the `recv` result.
    let join_outcome = receiver_task.await;
    let delivered = join_outcome.unwrap().unwrap();
    assert_eq!(delivered.get(0).unwrap().as_ref(), b"hello");
}
/// Checks that a PULL socket built with `conflate(true)` receives fewer than
/// the 20 messages sent to it, and that the last value received is at least 5.
///
/// Each message carries its sequence number as a big-endian `u32`.
#[async_rt::test]
async fn conflate_keeps_only_latest_message() {
    let mut sender = PushSocket::new();
    let endpoint = sender.bind("tcp://127.0.0.1:0").await.unwrap();
    let address = endpoint.to_string();

    let mut receiver = PullSocket::builder().conflate(true).build();
    receiver.connect(&address).await.unwrap();
    async_rt::task::sleep(Duration::from_millis(50)).await;

    for seq in 0u32..20 {
        let payload = seq.to_be_bytes().to_vec();
        // Send outcomes are intentionally ignored.
        let _ = sender.send(rustzmq2::ZmqMessage::from(payload)).await;
    }
    async_rt::task::sleep(Duration::from_millis(100)).await;

    // Drain until a `recv` fails to complete within 50 ms.
    let mut seen: Vec<u32> = Vec::new();
    loop {
        let recv_outcome =
            match async_rt::task::timeout(Duration::from_millis(50), receiver.recv()).await {
                Ok(recv_outcome) => recv_outcome,
                Err(_) => break,
            };
        let first_frame = recv_outcome.unwrap().get(0).unwrap().clone();
        // Decode the sequence number from the first four bytes of the frame.
        let be_bytes: [u8; 4] = first_frame[..4].try_into().unwrap();
        seen.push(u32::from_be_bytes(be_bytes));
    }

    assert!(!seen.is_empty(), "expected at least one message");
    let seen_count = seen.len();
    assert!(
        seen_count < 20,
        "expected conflate to drop old messages, got {} messages",
        seen_count
    );
    let newest = seen.last().copied().unwrap();
    assert!(
        newest >= 5,
        "expected a later message to survive conflate, got {}",
        newest
    );
}
}