zeromq 0.6.0

A native Rust implementation of ZeroMQ
Documentation
use zeromq::__async_rt as async_rt;
use zeromq::prelude::*;
use zeromq::{Endpoint, ZmqMessage};

use std::time::Duration;

fn tcp_endpoint(endpoint: Endpoint) -> String {
    match endpoint {
        Endpoint::Tcp(_, port) => format!("tcp://127.0.0.1:{port}"),
        _ => unreachable!("expected tcp endpoint"),
    }
}

#[async_rt::test]
async fn pub_sub_delivers_large_message() {
    let payload = vec![0xAB; 300_000];

    let mut sub_socket = zeromq::SubSocket::new();
    let endpoint = tcp_endpoint(sub_socket.bind("tcp://127.0.0.1:0").await.unwrap());
    sub_socket.subscribe("").await.unwrap();

    let mut pub_socket = zeromq::PubSocket::new();
    pub_socket.connect(&endpoint).await.unwrap();
    async_rt::task::sleep(Duration::from_millis(100)).await;

    pub_socket
        .send(ZmqMessage::from(payload.clone()))
        .await
        .unwrap();

    let received = async_rt::task::timeout(Duration::from_secs(2), sub_socket.recv())
        .await
        .expect("timeout waiting for large PUB message")
        .unwrap();
    assert_eq!(received.get(0).unwrap().as_ref(), payload.as_slice());
}

#[async_rt::test]
async fn xpub_sub_delivers_large_message() {
    let payload = vec![0xCD; 300_000];

    let mut xpub_socket = zeromq::XPubSocket::new();
    let endpoint = tcp_endpoint(xpub_socket.bind("tcp://127.0.0.1:0").await.unwrap());

    let mut sub_socket = zeromq::SubSocket::new();
    sub_socket.connect(&endpoint).await.unwrap();
    sub_socket.subscribe("").await.unwrap();

    let _subscription = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
        .await
        .expect("timeout waiting for subscription")
        .unwrap();

    xpub_socket
        .send(ZmqMessage::from(payload.clone()))
        .await
        .unwrap();

    let received = async_rt::task::timeout(Duration::from_secs(2), sub_socket.recv())
        .await
        .expect("timeout waiting for large XPUB message")
        .unwrap();
    assert_eq!(received.get(0).unwrap().as_ref(), payload.as_slice());
}

#[async_rt::test]
async fn xsub_xpub_delivers_large_message() {
    let payload = vec![0xEF; 300_000];

    let mut xpub_socket = zeromq::XPubSocket::new();
    let endpoint = tcp_endpoint(xpub_socket.bind("tcp://127.0.0.1:0").await.unwrap());

    let mut xsub_socket = zeromq::XSubSocket::new();
    xsub_socket.connect(&endpoint).await.unwrap();
    async_rt::task::sleep(Duration::from_millis(100)).await;

    xsub_socket
        .send(ZmqMessage::from(payload.clone()))
        .await
        .unwrap();

    let received = async_rt::task::timeout(Duration::from_secs(2), xpub_socket.recv())
        .await
        .expect("timeout waiting for large XSUB message")
        .unwrap();
    assert_eq!(received.get(0).unwrap().as_ref(), payload.as_slice());
}