//! rustzmq2 0.1.0
//!
//! A native async Rust implementation of ZeroMQ.
//! Documentation:
//! Engine coverage tests:
//! - Heartbeat end-to-end
//! - Heartbeat timeout evicts stalled peer
//! - Monitor events
//! - DEALER reconnect
//!
//! Uses `tcp_engine_pair` from the internal bench harness which spawns tokio
//! tasks directly. Gated behind `cfg(feature = "tokio")`.
#![cfg(feature = "tokio")]

mod heartbeat_e2e {
    use std::time::Duration;

    // Re-export internal helpers through the public bench module so we can
    // build a raw engine pair without going through socket-level APIs.
    use bytes::Bytes;
    use rustzmq2::{
        ZmqMessage,
        __bench::{engine::tcp_engine_pair, Message},
    };

    /// Two engines connected over loopback; both have heartbeat enabled.
    /// Normal traffic must flow, and after an idle period longer than a
    /// typical heartbeat interval the connection must still carry messages.
    #[tokio::test]
    async fn heartbeat_end_to_end_stays_alive() {
        // Raw engine pair with send_hwm = receive_hwm = 64.
        let (tx, rx) = tcp_engine_pair(64, 64).await;

        // Push five numbered frames through the sending engine.
        for n in 0u32..5 {
            tx.send(ZmqMessage::from(Bytes::from(n.to_be_bytes().to_vec())))
                .await
                .expect("send failed");
        }

        // Pull them back out and verify payloads arrive in order.
        for n in 0u32..5 {
            match rx.recv().await.expect("channel closed").expect("codec error") {
                Message::Message(frames) => {
                    let first = frames.get(0).expect("no frame").clone();
                    assert_eq!(&first[..], &n.to_be_bytes()[..]);
                }
                unexpected => panic!("unexpected message variant: {:?}", unexpected),
            }
        }

        // Idle longer than a typical heartbeat interval but well under any
        // timeout. Even without heartbeat configured in this path (the basic
        // spawn path passes None for heartbeat), the connection should remain
        // live — this verifies the baseline keepalive invariant.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Connection must still be alive: round-trip one more frame.
        tx.send(ZmqMessage::from(Bytes::from_static(b"probe")))
            .await
            .expect("connection dropped after sleep");

        match rx
            .recv()
            .await
            .expect("channel closed after sleep")
            .expect("codec error after sleep")
        {
            Message::Message(frames) => {
                let first = frames.get(0).expect("no frame").clone();
                assert_eq!(&first[..], b"probe");
            }
            unexpected => panic!("unexpected message variant after sleep: {:?}", unexpected),
        }
    }
}

// ─── Monitor events & DEALER reconnect ───────────────────────────────────────

mod monitor_and_reconnect {
    use rustzmq2::__async_rt as async_rt;
    use rustzmq2::prelude::*;
    use rustzmq2::{SocketEvent, ZmqMessage};
    use std::time::Duration;

    use futures::StreamExt;

    /// DEALER connects to a REP socket.
    /// Assert a connection-establishment monitor event arrives within 1 s.
    /// Drop REP socket.
    /// Pump DEALER `recv()` to process `PeerDisconnected` and trigger monitor.
    /// Assert `Disconnected` monitor event arrives within 2 s.
    ///
    /// Note: `Disconnected` is emitted by `peer_disconnected`, called from
    /// `recv_next` when it drains the synthetic `PeerDisconnected` marker
    /// the reader task enqueues on graceful TCP EOF. The test must call
    /// `dealer.recv()` after dropping REP to allow the disconnect to propagate.
    #[async_rt::test]
    async fn monitor_connected_and_disconnected() {
        // Bind a REP socket on an ephemeral loopback port.
        let mut rep = rustzmq2::RepSocket::new();
        let endpoint = rep.bind("tcp://127.0.0.1:0").await.expect("bind failed");

        // DEALER with monitor — set up monitor BEFORE connect so events aren't missed.
        let mut dealer = rustzmq2::DealerSocket::new();
        let mut monitor = dealer.monitor();

        dealer
            .connect(endpoint.to_string().as_str())
            .await
            .expect("connect failed");

        // Expect a connection-establishment event (Accepted or Connected) within 1 s.
        // peer_connected emits Accepted when an endpoint is present;
        // connect() additionally emits Connected. Both signal that
        // the DEALER is connected, so we accept either.
        let event = async_rt::task::timeout(Duration::from_secs(1), monitor.next())
            .await
            .expect("timeout waiting for connection event")
            .expect("monitor stream ended unexpectedly");

        assert!(
            matches!(
                event,
                SocketEvent::Connected(_, _) | SocketEvent::Accepted(_, _)
            ),
            "expected Connected or Accepted, got {:?}",
            event
        );

        // Drop REP — this triggers TCP close → reader EOF → PeerDisconnected
        // enqueued on the DEALER's inbound channel.
        drop(rep);

        // Split the dealer so we can pump recv() from a background task while
        // the test's main future polls the monitor channel.
        let (dealer_send, mut dealer_recv) = dealer.split();

        // Drain task: calls recv() in a loop to process the PeerDisconnected
        // marker, which triggers peer_disconnected() → monitor Disconnected event.
        // Bounded by a 3 s deadline so the task cannot outlive the test.
        let _drain = async_rt::task::spawn(async move {
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
            while std::time::Instant::now() < deadline {
                match async_rt::task::timeout(
                    std::time::Duration::from_millis(100),
                    dealer_recv.recv(),
                )
                .await
                {
                    // Ok(Ok(_)): a real message drained — keep pumping.
                    Ok(Ok(_)) => {}
                    // Recv error (disconnect processed) or timeout — stop.
                    _ => break,
                }
            }
        });

        // Expect Disconnected event within 2 s, polling the monitor in
        // 300 ms slices so the deadline check stays responsive.
        let mut got_disconnected = false;
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while std::time::Instant::now() < deadline {
            match async_rt::task::timeout(Duration::from_millis(300), monitor.next()).await {
                Ok(Some(SocketEvent::Disconnected(_))) => {
                    got_disconnected = true;
                    break;
                }
                Ok(Some(_other)) => {
                    // ignore other events (ConnectDelayed, ConnectRetried, etc.)
                }
                Ok(None) | Err(_) => {
                    // Stream ended or poll timed out — back off briefly and retry.
                    async_rt::task::sleep(Duration::from_millis(100)).await;
                }
            }
        }

        assert!(
            got_disconnected,
            "did not receive Disconnected event within 2 s"
        );

        drop(dealer_send);
    }

    /// DEALER reconnects after the REP endpoint restarts on the same address.
    ///
    /// 1. DEALER connects to REP.
    /// 2. Send/recv one message to confirm initial connection.
    /// 3. Drop REP (force disconnect).
    /// 4. Restart REP on the same address.
    /// 5. Assert DEALER can send/recv again within 5 s.
    ///
    /// The disconnect detection path requires the DEALER to call `recv()` (which
    /// drains the inbound queue and processes the synthetic `PeerDisconnected`
    /// marker emitted by the reader task on graceful TCP EOF).  We spin a
    /// background drain task that keeps calling `recv()` so the reconnect
    /// machinery gets triggered.
    ///
    /// Message framing note: REP's `recv()` expects at least two frames
    /// (envelope-delimiter + payload).  DEALER must send
    /// `[empty_frame, payload]` so the REP can split the envelope.
    #[async_rt::test]
    async fn dealer_reconnects_after_rep_restart() {
        use bytes::Bytes;
        use rustzmq2::prelude::SocketRecv;

        /// Build a two-frame DEALER→REP message: `[empty, payload]`.
        fn dealer_msg(payload: &'static str) -> ZmqMessage {
            let mut m = ZmqMessage::from(Bytes::from_static(b""));
            m.push_back(Bytes::from_static(payload.as_bytes()));
            m
        }

        // Phase 1: REP binds on an ephemeral port; remember the resolved
        // address so the restarted REP can bind the exact same endpoint.
        let mut rep = rustzmq2::RepSocket::new();
        let endpoint = rep.bind("tcp://127.0.0.1:0").await.expect("bind failed");
        let addr = endpoint.to_string();

        // DEALER connects.
        let mut dealer = rustzmq2::DealerSocket::new();
        dealer.connect(&addr).await.expect("connect failed");

        // Allow handshake.
        async_rt::task::sleep(Duration::from_millis(150)).await;

        // Phase 2: Verify initial communication (DEALER → REP).
        dealer
            .send(dealer_msg("hello"))
            .await
            .expect("initial dealer send failed");

        // REP sees the message (envelope stripped, returns payload frames).
        let _ = async_rt::task::timeout(Duration::from_secs(1), rep.recv())
            .await
            .expect("initial rep recv timeout");

        // Phase 3: Drop REP to trigger disconnect.
        drop(rep);

        // The DEALER's inbound channel will receive a PeerDisconnected marker
        // once the reader task detects TCP EOF.  We must drain it to trigger
        // peer_disconnected() → reconnect notifier.
        let (mut dealer_send_half, mut dealer_recv_half) = dealer.split();

        // Drain task: keep calling recv() until the channel closes or times out.
        let _drain_handle = async_rt::task::spawn(async move {
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4);
            while std::time::Instant::now() < deadline {
                // Error (disconnect) — peer_disconnected was called inside
                // recv_next; reconnect task should now be waking.
                if let Ok(Err(_)) = async_rt::task::timeout(
                    std::time::Duration::from_millis(100),
                    dealer_recv_half.recv(),
                )
                .await
                {
                    break;
                }
            }
        });

        // Allow disconnect propagation + port release.
        async_rt::task::sleep(Duration::from_millis(500)).await;

        // Phase 4: Restart REP on the same address.
        let mut rep2 = rustzmq2::RepSocket::new();
        rep2.bind(&addr).await.expect("rebind failed");

        // Phase 5: Keep sending from the DEALER half until reconnect completes.
        let mut success = false;
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while std::time::Instant::now() < deadline {
            // Try sending from DEALER.  Fails with ReturnToSender while
            // reconnect is in progress (no peer in registry).
            if !matches!(
                async_rt::task::timeout(
                    Duration::from_millis(400),
                    dealer_send_half.send(dealer_msg("after-reconnect")),
                )
                .await,
                Ok(Ok(()))
            ) {
                async_rt::task::sleep(Duration::from_millis(200)).await;
                continue;
            }

            // Try receiving on new REP.  REP recv() strips the envelope
            // and returns only the payload frame(s).
            if let Ok(Ok(msg)) =
                async_rt::task::timeout(Duration::from_millis(600), rep2.recv()).await
            {
                // payload frame index 0 ("after-reconnect")
                let data = msg.get(0).map(|b| b.to_vec()).unwrap_or_default();
                if data == b"after-reconnect" {
                    success = true;
                    break;
                }
            }

            async_rt::task::sleep(Duration::from_millis(200)).await;
        }

        assert!(success, "DEALER did not successfully reconnect within 5 s");

        drop(dealer_send_half);
        drop(rep2);
    }
}

// ─── Heartbeat timeout evicts stalled peer ───────────────────────────────────

mod heartbeat_timeout {
    use rustzmq2::__bench::engine::tcp_engine_silent_peer;
    use std::time::Duration;

    /// Verify that when a peer sends no PONG the sender's `peer_loop` exits
    /// within `timeout + margin`.
    ///
    /// Setup: `tcp_engine_silent_peer` builds a TCP pair where the sender has
    /// heartbeat configured and the "peer" side reads frames but never writes
    /// back.  The TCP connection remains open so EOF-based eviction cannot
    /// fire — only the heartbeat timeout can terminate the `peer_loop`.
    ///
    /// Assertions:
    /// - `sender.writer_alive()` starts `true`.
    /// - After `timeout + 200ms` margin it becomes `false`.
    #[tokio::test]
    async fn stalled_peer_evicted_after_timeout() {
        // Heartbeat parameters: PING every 80 ms, evict after 120 ms
        // without a PONG, 500 ms TTL advertised to the peer.
        let (interval, timeout, ttl) = (
            Duration::from_millis(80),
            Duration::from_millis(120),
            Duration::from_millis(500),
        );

        let (sender, _keep_alive) = tcp_engine_silent_peer(64, interval, timeout, ttl).await;

        // The writer half is up immediately after construction.
        assert!(sender.writer_alive(), "writer should start alive");

        // Budget: one PING interval + the PONG timeout + scheduling slack.
        let eviction_deadline = interval + timeout + Duration::from_millis(200);
        tokio::time::sleep(eviction_deadline).await;

        // By now the heartbeat timeout must have torn down the peer_loop.
        assert!(
            !sender.writer_alive(),
            "writer should be dead after heartbeat timeout ({eviction_deadline:?} elapsed)"
        );
    }
}