rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Tests for LINGER, CONFLATE, and IMMEDIATE socket options.

#[cfg(all(test, feature = "tokio", feature = "tcp"))]
mod test {
    use rustzmq2::__async_rt as async_rt;
    use rustzmq2::prelude::*;
    use rustzmq2::{PullSocket, PushSocket, ZmqError};

    use std::time::Duration;

    // ── LINGER ───────────────────────────────────────────────────────────────────

    /// `LINGER=ZERO`: `close()` returns immediately even if messages are pending.
    #[async_rt::test]
    async fn linger_zero_close_is_immediate() {
        let mut push = PushSocket::builder().linger(Some(Duration::ZERO)).build();
        let ep = push.bind("tcp://127.0.0.1:0").await.unwrap();
        let mut pull = PullSocket::new();
        pull.connect(&ep.to_string()).await.unwrap();

        // Send a burst so there are likely queued messages.
        for i in 0u32..20 {
            let _ = push
                .send(rustzmq2::ZmqMessage::from(i.to_be_bytes().to_vec()))
                .await;
        }
        let start = std::time::Instant::now();
        let _ = push.close().await;
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(500),
            "linger=ZERO close took {:?}",
            elapsed
        );
    }

    /// LINGER with a finite timeout: `close()` waits up to that duration for
    /// the outbound queue to drain, then returns.
    #[async_rt::test]
    async fn linger_positive_drains_before_close() {
        let mut push = PushSocket::builder()
            .linger(Some(Duration::from_millis(500)))
            .send_hwm(1000)
            .build();
        let ep = push.bind("tcp://127.0.0.1:0").await.unwrap();
        let mut pull = PullSocket::new();
        pull.connect(&ep.to_string()).await.unwrap();

        // Give the connection time to establish.
        async_rt::task::sleep(Duration::from_millis(50)).await;

        // Send a small number of messages.
        for i in 0u32..5 {
            push.send(rustzmq2::ZmqMessage::from(i.to_be_bytes().to_vec()))
                .await
                .unwrap();
        }

        // close() with a positive linger should complete without hanging.
        let start = std::time::Instant::now();
        let _ = push.close().await;
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "linger=500ms close took {:?}",
            elapsed
        );

        // The pull side should have received at least some messages.
        let mut count = 0usize;
        while async_rt::task::timeout(Duration::from_millis(100), pull.recv())
            .await
            .is_ok()
        {
            count += 1;
        }
        assert!(count > 0, "expected at least one message to drain");
    }

    // ── IMMEDIATE ────────────────────────────────────────────────────────────────

    /// `IMMEDIATE=true`: `send()` returns `ReturnToSender` when no peers are connected.
    #[async_rt::test]
    async fn immediate_returns_error_when_no_peers() {
        let mut push = PushSocket::builder().immediate(true).build();
        push.bind("tcp://127.0.0.1:0").await.unwrap();

        let result = push
            .send(rustzmq2::ZmqMessage::from(b"hello".to_vec()))
            .await;
        assert!(
            matches!(result, Err(ZmqError::ReturnToSender { .. })),
            "expected ReturnToSender, got {:?}",
            result
        );
    }

    /// `IMMEDIATE=false` (default): `send()` returns `ReturnToSender` (no peers), but
    /// does NOT fail with a transient "no slot" error — delivery should succeed
    /// once a peer connects. This test verifies that once a peer connects, send works.
    #[async_rt::test]
    async fn immediate_false_delivers_when_peer_connects() {
        let mut push = PushSocket::builder().immediate(false).build();
        let ep = push.bind("tcp://127.0.0.1:0").await.unwrap();

        let ep_str = ep.to_string();
        let task = async_rt::task::spawn(async move {
            let mut pull = PullSocket::new();
            async_rt::task::sleep(Duration::from_millis(50)).await;
            pull.connect(&ep_str).await.unwrap();
            pull.recv().await
        });

        // Allow peer to connect first, then send.
        async_rt::task::sleep(Duration::from_millis(100)).await;
        push.send(rustzmq2::ZmqMessage::from(b"hello".to_vec()))
            .await
            .unwrap();

        let msg = task.await.unwrap().unwrap();
        assert_eq!(msg.get(0).unwrap().as_ref(), b"hello");
    }

    // ── CONFLATE ─────────────────────────────────────────────────────────────────

    /// `CONFLATE=true`: rapid sends; `recv()` returns only the most recent message.
    #[async_rt::test]
    async fn conflate_keeps_only_latest_message() {
        let mut push = PushSocket::new();
        let ep = push.bind("tcp://127.0.0.1:0").await.unwrap();

        let mut pull = PullSocket::builder().conflate(true).build();
        pull.connect(&ep.to_string()).await.unwrap();

        // Give the connection time to establish.
        async_rt::task::sleep(Duration::from_millis(50)).await;

        // Send several messages rapidly.
        for i in 0u32..20 {
            let _ = push
                .send(rustzmq2::ZmqMessage::from(i.to_be_bytes().to_vec()))
                .await;
        }

        // Give messages time to arrive at the pull socket.
        async_rt::task::sleep(Duration::from_millis(100)).await;

        // With conflate, we should receive at most a few messages — ideally
        // one that is among the later ones.
        let mut received: Vec<u32> = Vec::new();
        while let Ok(msg) = async_rt::task::timeout(Duration::from_millis(50), pull.recv()).await {
            let frame = msg.unwrap().get(0).unwrap().clone();
            let val = u32::from_be_bytes(frame[..4].try_into().unwrap());
            received.push(val);
        }

        // We should have received at least one message.
        assert!(!received.is_empty(), "expected at least one message");
        // With conflate we should NOT have received all 20 messages (the
        // slot was overwritten). Allow for timing variance but require
        // significantly fewer than 20.
        assert!(
            received.len() < 20,
            "expected conflate to drop old messages, got {} messages",
            received.len()
        );
        // The last received value should be among the later ones (at least >= 5).
        let last = *received.last().unwrap();
        assert!(
            last >= 5,
            "expected a later message to survive conflate, got {}",
            last
        );
    }
}