#[cfg(test)]
mod test {
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::ZmqMessage;
use std::time::Duration;
#[async_rt::test]
async fn test_xpub_basic_pubsub() {
pretty_env_logger::try_init().ok();
let mut xpub_socket = rustzmq2::XPubSocket::new();
let bound_to = xpub_socket
.bind("tcp://127.0.0.1:0")
.await
.expect("Failed to bind");
let bound_addr = bound_to.to_string();
let sub_handle = async_rt::task::spawn(async move {
let mut sub_socket = rustzmq2::SubSocket::new();
sub_socket
.connect(&bound_addr)
.await
.expect("Failed to connect");
sub_socket.subscribe("").await.expect("Failed to subscribe");
async_rt::task::sleep(Duration::from_millis(200)).await;
let mut received = Vec::new();
for _ in 0..5 {
let msg = sub_socket.recv().await.expect("Failed to receive");
let data = String::from_utf8(msg.get(0).unwrap().to_vec()).unwrap();
received.push(data);
}
received
});
let sub_msg = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
.await
.expect("Timeout waiting for subscription")
.expect("Failed to receive subscription");
let data = sub_msg.get(0).unwrap();
assert_eq!(data[0], 1); assert_eq!(&data[1..], b"");
async_rt::task::sleep(Duration::from_millis(100)).await;
for i in 0..5 {
let msg = ZmqMessage::from(format!("message-{}", i));
xpub_socket.send(msg).await.expect("Failed to send");
}
let received = sub_handle.await.expect("SUB task failed");
assert_eq!(received.len(), 5);
for (i, msg) in received.iter().enumerate() {
assert_eq!(msg, &format!("message-{}", i));
}
}
#[async_rt::test]
async fn test_xpub_receives_unsubscribe() {
pretty_env_logger::try_init().ok();
let mut xpub_socket = rustzmq2::XPubSocket::new();
let bound_to = xpub_socket
.bind("tcp://127.0.0.1:0")
.await
.expect("Failed to bind");
let bound_addr = bound_to.to_string();
let handle = async_rt::task::spawn(async move {
let mut sub_socket = rustzmq2::SubSocket::new();
sub_socket
.connect(&bound_addr)
.await
.expect("Failed to connect");
sub_socket
.subscribe("test")
.await
.expect("Failed to subscribe");
async_rt::task::sleep(Duration::from_millis(100)).await;
sub_socket
.unsubscribe("test")
.await
.expect("Failed to unsubscribe");
async_rt::task::sleep(Duration::from_millis(100)).await;
});
let sub_msg = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
.await
.expect("Timeout")
.expect("Failed to receive");
let data = sub_msg.get(0).unwrap();
assert_eq!(data[0], 1); assert_eq!(&data[1..], b"test");
let unsub_msg = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
.await
.expect("Timeout")
.expect("Failed to receive");
let data = unsub_msg.get(0).unwrap();
assert_eq!(data[0], 0); assert_eq!(&data[1..], b"test");
handle.await.expect("Task failed");
}
#[async_rt::test]
async fn test_xpub_filtered_subscriptions() {
pretty_env_logger::try_init().ok();
let mut xpub_socket = rustzmq2::XPubSocket::new();
let bound_to = xpub_socket
.bind("tcp://127.0.0.1:0")
.await
.expect("Failed to bind");
let bound_addr = bound_to.to_string();
let sub_handle = async_rt::task::spawn(async move {
let mut sub_socket = rustzmq2::SubSocket::new();
sub_socket
.connect(&bound_addr)
.await
.expect("Failed to connect");
sub_socket
.subscribe("topic1")
.await
.expect("Failed to subscribe");
async_rt::task::sleep(Duration::from_millis(200)).await;
let msg = sub_socket.recv().await.expect("Failed to receive");
String::from_utf8(msg.get(0).unwrap().to_vec()).unwrap()
});
let _sub_msg = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
.await
.expect("Timeout")
.expect("Failed to receive subscription");
async_rt::task::sleep(Duration::from_millis(100)).await;
xpub_socket
.send(ZmqMessage::from("topic2-message"))
.await
.expect("Failed to send");
xpub_socket
.send(ZmqMessage::from("topic1-message"))
.await
.expect("Failed to send");
xpub_socket
.send(ZmqMessage::from("topic3-message"))
.await
.expect("Failed to send");
let received = sub_handle.await.expect("SUB task failed");
assert_eq!(received, "topic1-message");
}
}