rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
#[cfg(test)]
mod test {

    use bytes::Bytes;
    use rustzmq2::__async_rt as async_rt;
    use rustzmq2::prelude::*;
    use rustzmq2::ZmqMessage;

    use std::error::Error;
    use std::time::Duration;

    /// Compile-time probe: naming `assert_send::<T>` only type-checks when
    /// `T: Send`. The body is empty, so calling it does nothing at runtime.
    fn assert_send<T>()
    where
        T: Send,
    {
    }
    /// Compile-time probe: naming `assert_clone::<T>` only type-checks when
    /// `T: Clone`. The body is empty, so calling it does nothing at runtime.
    fn assert_clone<T>()
    where
        T: Clone,
    {
    }

    /// Checks at compile time that `RouterSendHalf` and `RouterRecvHalf` are
    /// both `Send`; the test body performs no runtime work.
    #[test]
    fn split_halves_are_send() {
        use rustzmq2::{RouterRecvHalf, RouterSendHalf};

        assert_send::<RouterSendHalf>();
        assert_send::<RouterRecvHalf>();
    }

    /// Checks at compile time that `RouterSendHalf` implements `Clone`; the
    /// test body performs no runtime work.
    #[test]
    fn router_send_half_is_clone() {
        use rustzmq2::RouterSendHalf;

        assert_clone::<RouterSendHalf>();
    }

    /// Splits a bound ROUTER socket into send/recv halves and uses them from a
    /// spawned task to echo every message a connected DEALER sends.
    ///
    /// The dealer checks that each reply carries the same payload it sent, in
    /// the order it sent them, so a router that echoes the wrong bytes fails
    /// the test instead of passing silently. The whole exchange must finish
    /// within five seconds.
    #[async_rt::test]
    async fn test_router_split_concurrent_send_recv() -> Result<(), Box<dyn Error>> {
        pretty_env_logger::try_init().ok();

        /// Payload text for the `i`-th dealer message; shared by the sending
        /// loop and the reply check so the two cannot drift apart.
        fn payload(i: u32) -> String {
            format!("msg-{i}")
        }

        // ROUTER binds, DEALER connects
        let mut router = rustzmq2::RouterSocket::new();
        let endpoint = router.bind("tcp://localhost:0").await?;

        let mut dealer = rustzmq2::DealerSocket::new();
        dealer.connect(endpoint.to_string().as_str()).await?;

        // Pause briefly before splitting and sending.
        // NOTE(review): presumably this lets the connection finish
        // establishing — confirm whether an explicit readiness signal exists.
        async_rt::task::sleep(Duration::from_millis(100)).await;

        // Split the router into send/recv halves
        let (mut send_half, mut recv_half) = router.split();

        let num_messages: u32 = 5;

        // Dealer sends all messages up front, then validates every echo.
        let dealer_task = async_rt::task::spawn(async move {
            for i in 0..num_messages {
                dealer.send(ZmqMessage::from(payload(i))).await.unwrap();
            }
            // Replies come back over the single dealer<->router connection,
            // echoed one at a time, so they are expected in send order.
            for i in 0..num_messages {
                let reply = dealer.recv().await.unwrap();
                let text = String::try_from(reply).unwrap();
                assert_eq!(text, payload(i), "echoed payload mismatch for message {i}");
            }
        });

        // Router recv half receives identity-framed messages
        // Router send half echoes them back using the identity
        let echo_task = async_rt::task::spawn(async move {
            for _ in 0..num_messages {
                // recv gives [identity, ...payload]
                let msg = recv_half.recv().await.unwrap();
                // send expects [identity, ...payload] to route back
                send_half.send(msg).await.unwrap();
            }
        });

        // Bound the whole exchange so a lost message fails the test rather
        // than hanging it.
        let timeout = Duration::from_secs(5);
        async_rt::task::timeout(timeout, async {
            echo_task.await.unwrap();
            dealer_task.await.unwrap();
        })
        .await
        .expect("test timed out");

        Ok(())
    }

    /// Clones the router's send half and drives both copies from separate
    /// spawned tasks, each sending its own numbered series to the same dealer.
    ///
    /// The dealer collects every reply; after sorting, the collected payloads
    /// must equal the sorted list of everything both senders emitted. The
    /// whole exchange must finish within five seconds.
    #[async_rt::test]
    async fn test_router_split_send_half_clone_concurrent_send() -> Result<(), Box<dyn Error>> {
        pretty_env_logger::try_init().ok();

        /// Creates a message from `payload` and pushes `identity` onto its front.
        fn message_for(identity: Bytes, payload: String) -> ZmqMessage {
            let mut framed = ZmqMessage::from(payload);
            framed.push_front(identity);
            framed
        }

        let num_messages: u32 = 10;
        let timeout = Duration::from_secs(5);

        // ROUTER side listens; DEALER side dials the bound endpoint.
        let mut router = rustzmq2::RouterSocket::new();
        let endpoint = router.bind("tcp://localhost:0").await?;

        let mut dealer = rustzmq2::DealerSocket::new();
        dealer.connect(endpoint.to_string().as_str()).await?;

        async_rt::task::sleep(Duration::from_millis(100)).await;

        // One split yields a send half; cloning it gives the second sender.
        let (mut second_sender, mut receiver) = router.split();
        let mut first_sender = second_sender.clone();

        // A single registration message lets the recv half capture the
        // dealer's identity (first frame of what the router receives).
        dealer.send(ZmqMessage::from("register-dealer")).await?;
        let registration = receiver.recv().await?;
        let dealer_identity = registration.get(0).unwrap().clone();

        // Every payload either sender will emit, sorted for comparison.
        let mut expected: Vec<String> = (0..num_messages)
            .flat_map(|i| [format!("sender-1->{i}"), format!("sender-2->{i}")])
            .collect();
        expected.sort();

        // Each sender task owns its own copy of the identity.
        let first_identity = dealer_identity.clone();
        let second_identity = dealer_identity;

        // Sender #1: its own numbered series.
        let first_send_task = async_rt::task::spawn(async move {
            for i in 0..num_messages {
                let payload = format!("sender-1->{i}");
                let outgoing = message_for(first_identity.clone(), payload);
                first_sender.send(outgoing).await.unwrap();
            }
        });

        // Sender #2: runs concurrently, targeting the same dealer.
        let second_send_task = async_rt::task::spawn(async move {
            for i in 0..num_messages {
                let payload = format!("sender-2->{i}");
                let outgoing = message_for(second_identity.clone(), payload);
                second_sender.send(outgoing).await.unwrap();
            }
        });

        // Dealer drains the combined output of both senders and returns it
        // sorted, since the two series are not expected in a fixed order.
        let collect_task = async_rt::task::spawn(async move {
            let mut seen = Vec::new();
            for _ in 0..(num_messages * 2) {
                let incoming = dealer.recv().await.unwrap();
                seen.push(String::try_from(incoming).unwrap());
            }
            seen.sort();
            seen
        });

        async_rt::task::timeout(timeout, async {
            first_send_task.await.unwrap();
            second_send_task.await.unwrap();
            let received = collect_task.await.unwrap();
            assert_eq!(received, expected);
        })
        .await
        .expect("test timed out");

        Ok(())
    }
}