#[cfg(test)]
mod test {
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::Endpoint;
use rustzmq2::ZmqMessage;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::time::Duration;
#[async_rt::test]
async fn test_pub_sub_sockets() {
pretty_env_logger::try_init().ok();
async fn helper(bind_addr: &'static str) {
let mut task_handles = Vec::new();
let payload = "zmq-pub-sub-test-payload".to_string();
let cloned_payload = payload.clone();
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
task_handles.push(async_rt::task::spawn(async move {
let mut pub_socket = rustzmq2::PubSocket::new();
let bound_to = pub_socket
.bind(bind_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
has_bound_sender
.send(bound_to)
.expect("channel was dropped");
loop {
if let Ok(Some(_)) = server_stop.try_recv() {
break;
}
let s: String = cloned_payload.clone();
let m = ZmqMessage::from(s);
pub_socket.send(m).await.expect("Failed to send");
async_rt::task::sleep(Duration::from_millis(1)).await;
}
if let Err(errs) = pub_socket.close().await {
panic!("Could not unbind socket: {:?}", errs);
}
}));
let bound_addr = has_bound.await.expect("channel was cancelled");
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
assert_ne!(port, 0);
}
let (sub_results_sender, sub_results) = mpsc::channel(100);
for _ in 0..10 {
let mut cloned_sub_sender = sub_results_sender.clone();
let cloned_payload = payload.clone();
let cloned_bound_addr = bound_addr.to_string();
task_handles.push(async_rt::task::spawn(async move {
let mut sub_socket = rustzmq2::SubSocket::new();
sub_socket
.connect(&cloned_bound_addr)
.await
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));
sub_socket.subscribe("").await.expect("Failed to subscribe");
async_rt::task::sleep(std::time::Duration::from_millis(500)).await;
for _ in 0..10 {
let recv_message = sub_socket.recv().await.unwrap();
let recv_payload =
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
assert_eq!(cloned_payload, recv_payload);
cloned_sub_sender.send(()).await.unwrap();
}
}));
}
drop(sub_results_sender);
let res_vec: Vec<()> = sub_results.collect().await;
assert_eq!(100, res_vec.len());
server_stop_sender.send(()).unwrap();
for t in task_handles {
t.await.expect("Task failed unexpectedly!");
}
}
#[cfg(all(feature = "ipc", target_family = "unix"))]
let mut addrs = vec![
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"tcp://127.0.0.1:0",
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
];
#[cfg(not(all(feature = "ipc", target_family = "unix")))]
let addrs = vec![
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"tcp://127.0.0.1:0",
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
];
#[cfg(all(feature = "ipc", target_family = "unix"))]
addrs.extend_from_slice(&["ipc://asdf.sock", "ipc://anothersocket-asdf"]);
futures::future::join_all(addrs.into_iter().map(helper)).await;
}
}