#[cfg(test)]
mod test {
use bytes::Bytes;
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::ZmqMessage;
use std::error::Error;
use std::time::Duration;
fn assert_send<T: Send>() {}
fn assert_clone<T: Clone>() {}
#[test]
fn split_halves_are_send() {
assert_send::<rustzmq2::RouterSendHalf>();
assert_send::<rustzmq2::RouterRecvHalf>();
}
#[test]
fn router_send_half_is_clone() {
assert_clone::<rustzmq2::RouterSendHalf>();
}
#[async_rt::test]
async fn test_router_split_concurrent_send_recv() -> Result<(), Box<dyn Error>> {
pretty_env_logger::try_init().ok();
let mut router = rustzmq2::RouterSocket::new();
let endpoint = router.bind("tcp://localhost:0").await?;
let mut dealer = rustzmq2::DealerSocket::new();
dealer.connect(endpoint.to_string().as_str()).await?;
async_rt::task::sleep(Duration::from_millis(100)).await;
let (mut send_half, mut recv_half) = router.split();
let num_messages: u32 = 5;
let dealer_task = async_rt::task::spawn(async move {
for i in 0..num_messages {
dealer
.send(ZmqMessage::from(format!("msg-{}", i)))
.await
.unwrap();
}
for _ in 0..num_messages {
let _reply = dealer.recv().await.unwrap();
}
});
let echo_task = async_rt::task::spawn(async move {
for _ in 0..num_messages {
let msg = recv_half.recv().await.unwrap();
send_half.send(msg).await.unwrap();
}
});
let timeout = Duration::from_secs(5);
async_rt::task::timeout(timeout, async {
echo_task.await.unwrap();
dealer_task.await.unwrap();
})
.await
.expect("test timed out");
Ok(())
}
#[async_rt::test]
async fn test_router_split_send_half_clone_concurrent_send() -> Result<(), Box<dyn Error>> {
pretty_env_logger::try_init().ok();
fn message_for(identity: Bytes, payload: String) -> ZmqMessage {
let mut msg = ZmqMessage::from(payload);
msg.push_front(identity);
msg
}
let mut router = rustzmq2::RouterSocket::new();
let endpoint = router.bind("tcp://localhost:0").await?;
let mut dealer = rustzmq2::DealerSocket::new();
dealer.connect(endpoint.to_string().as_str()).await?;
async_rt::task::sleep(Duration::from_millis(100)).await;
let (send_half, mut recv_half) = router.split();
let mut send_half_1 = send_half.clone();
let mut send_half_2 = send_half;
let num_messages: u32 = 10;
dealer.send(ZmqMessage::from("register-dealer")).await?;
let registration = recv_half.recv().await?;
let dealer_identity = registration.get(0).unwrap().clone();
let mut expected = Vec::new();
for i in 0..num_messages {
expected.push(format!("sender-1->{i}"));
expected.push(format!("sender-2->{i}"));
}
expected.sort();
let dealer_identity_1 = dealer_identity.clone();
let send_task_1 = async_rt::task::spawn(async move {
for i in 0..num_messages {
let message = message_for(dealer_identity_1.clone(), format!("sender-1->{i}"));
send_half_1.send(message).await.unwrap();
}
});
let dealer_identity_2 = dealer_identity.clone();
let send_task_2 = async_rt::task::spawn(async move {
for i in 0..num_messages {
let message = message_for(dealer_identity_2.clone(), format!("sender-2->{i}"));
send_half_2.send(message).await.unwrap();
}
});
let dealer_task = async_rt::task::spawn(async move {
let mut received = Vec::new();
for _ in 0..(num_messages * 2) {
let reply = dealer.recv().await.unwrap();
received.push(String::try_from(reply).unwrap());
}
received.sort();
received
});
let timeout = Duration::from_secs(5);
async_rt::task::timeout(timeout, async {
send_task_1.await.unwrap();
send_task_2.await.unwrap();
assert_eq!(dealer_task.await.unwrap(), expected);
})
.await
.expect("test timed out");
Ok(())
}
}