//! rustzmq2 0.1.0
//!
//! A native async Rust implementation of ZeroMQ.
//!
//! `#[doc(hidden)]` benchmark surface.
//!
//! Exposed via `zeromq::__bench` so criterion microbenchmarks can reach
//! in and drive the engine layer directly without building a full
//! socket. Stability: **none**. Anything here may change without notice.

pub use crate::codec::zmq_codec::ZmqCodec;
pub use crate::codec::Message;

/// Benchmark helpers for the engine path. Bundled as a single module so
/// we don't have to widen visibility on every internal type involved.
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub mod engine {
    use crate::codec::handshake::greet_exchange;
    use crate::codec::{CodecError, DefaultFramedIo as FramedIo, Message};
    use crate::engine::PeerEngine;
    use crate::error::SendError;
    use crate::message::ZmqMessage;
    use crate::PeerIdentity;
    use flume::Receiver;

    pub use crate::error::SendError as EngineSendError;

    /// Open a TCP pair + greeting exchange + pair of `PeerEngine`s.
    /// Returns `(sender, receiver)` where messages sent via `sender`
    /// arrive on `receiver`. The `receive_hwm` parameter sizes the
    /// receiver's shared inbound channel.
    pub async fn tcp_engine_pair(
        send_hwm: usize,
        receive_hwm: usize,
    ) -> (SenderHandle, ReceiverHandle) {
        use tokio::net::{TcpListener, TcpStream};

        // Loopback socket pair: bind an ephemeral port, then accept and
        // connect concurrently.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(addr));
        let (server_stream, _) = accepted.unwrap();
        let client_stream = connected.unwrap();

        // Run the ZMTP greeting on both ends concurrently.
        let mut server_io = FramedIo::from_tcp(server_stream);
        let mut client_io = FramedIo::from_tcp(client_stream);
        let (srv_greet, cli_greet) =
            futures::join!(greet_exchange(&mut server_io), greet_exchange(&mut client_io));
        srv_greet.unwrap();
        cli_greet.unwrap();

        // Split each framed stream into read/write halves. With the
        // `curve` feature `into_parts` yields an extra element that the
        // bench path ignores.
        #[cfg(feature = "curve")]
        let (a_read, a_write, _) = server_io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (a_read, a_write) = server_io.into_parts();
        #[cfg(feature = "curve")]
        let (b_read, b_write, _) = client_io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (b_read, b_write) = client_io.into_parts();

        // Each engine feeds its own shared inbound. The sender side never
        // receives in this harness, so its inbound is a small dummy.
        let (sender_in_tx, _sender_in_rx) = flume::bounded(8);
        let (receiver_in_tx, receiver_in_rx) = flume::bounded(receive_hwm);

        // Bench harness: no registry, so arbitrary PeerKey values are fine.
        let sender_engine = PeerEngine::spawn(
            0,
            PeerIdentity::new(),
            a_read,
            a_write.into_engine_writer(),
            send_hwm,
            sender_in_tx,
            crate::engine::peer_loop::PeerConfig::default(),
        );
        let receiver_engine = PeerEngine::spawn(
            1,
            PeerIdentity::new(),
            b_read,
            b_write.into_engine_writer(),
            send_hwm,
            receiver_in_tx,
            crate::engine::peer_loop::PeerConfig::default(),
        );

        let receiver = ReceiverHandle {
            _engine: receiver_engine,
            inbound: receiver_in_rx,
        };
        (SenderHandle(sender_engine), receiver)
    }

    /// Like `tcp_engine_pair` but with heartbeat enabled on the sender side.
    /// The receiver does NOT have heartbeat configured — it replies to
    /// incoming PINGs via the inline PONG path in `peer_loop`, but it
    /// never initiates PINGs of its own.
    ///
    /// Dropping the `ReceiverHandle` simulates a stalled peer: with no
    /// task draining the receiver's inbound channel, the PONG replies can
    /// still transit while the TCP socket stays open, but if the TCP
    /// connection is severed the sender will time out.
    ///
    /// Note that dropping the `ReceiverHandle` also drops the receiver's
    /// `PeerEngine`, so its `peer_loop` exits and the write side of the
    /// TCP stream closes — the sender then sees EOF and disconnects
    /// independently of the heartbeat. To exercise heartbeat eviction in
    /// isolation (peer alive at the TCP level but never sending a PONG),
    /// use `tcp_engine_silent_peer` instead.
    pub async fn tcp_engine_pair_with_heartbeat(
        send_hwm: usize,
        receive_hwm: usize,
        heartbeat_interval: std::time::Duration,
        heartbeat_timeout: std::time::Duration,
        heartbeat_ttl: std::time::Duration,
    ) -> (SenderHandle, ReceiverHandle) {
        use crate::engine::HeartbeatConfig;
        use tokio::net::{TcpListener, TcpStream};

        // Loopback socket pair: ephemeral bind, concurrent accept/connect.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(addr));
        let (server_stream, _) = accepted.unwrap();
        let client_stream = connected.unwrap();

        // ZMTP greeting on both ends, concurrently.
        let mut server_io = FramedIo::from_tcp(server_stream);
        let mut client_io = FramedIo::from_tcp(client_stream);
        let (srv_greet, cli_greet) =
            futures::join!(greet_exchange(&mut server_io), greet_exchange(&mut client_io));
        srv_greet.unwrap();
        cli_greet.unwrap();

        // Split into halves; the `curve` feature adds a tuple element the
        // bench path ignores.
        #[cfg(feature = "curve")]
        let (a_read, a_write, _) = server_io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (a_read, a_write) = server_io.into_parts();
        #[cfg(feature = "curve")]
        let (b_read, b_write, _) = client_io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (b_read, b_write) = client_io.into_parts();

        // Sender never receives here, so its inbound is a small dummy.
        let (sender_in_tx, _sender_in_rx) = flume::bounded(8);
        let (receiver_in_tx, receiver_in_rx) = flume::bounded(receive_hwm);

        // Only the sender gets a heartbeat configuration.
        let sender_engine = PeerEngine::spawn(
            0,
            PeerIdentity::new(),
            a_read,
            a_write.into_engine_writer(),
            send_hwm,
            sender_in_tx,
            crate::engine::peer_loop::PeerConfig {
                heartbeat: Some(HeartbeatConfig {
                    interval: heartbeat_interval,
                    timeout: heartbeat_timeout,
                    ttl: heartbeat_ttl,
                }),
                ..Default::default()
            },
        );
        let receiver_engine = PeerEngine::spawn(
            1,
            PeerIdentity::new(),
            b_read,
            b_write.into_engine_writer(),
            send_hwm,
            receiver_in_tx,
            crate::engine::peer_loop::PeerConfig::default(),
        );

        let receiver = ReceiverHandle {
            _engine: receiver_engine,
            inbound: receiver_in_rx,
        };
        (SenderHandle(sender_engine), receiver)
    }

    /// Build a one-sided engine: the sender has heartbeat configured, the
    /// peer side reads frames and discards them, never sending back a
    /// PONG. The TCP connection stays open so EOF-based detection doesn't
    /// fire — only the heartbeat timeout can evict the peer.
    ///
    /// Returns `(SenderHandle, _keep_alive)` where `_keep_alive` must be
    /// held for the duration of the test to prevent a TCP FIN.
    pub async fn tcp_engine_silent_peer(
        send_hwm: usize,
        heartbeat_interval: std::time::Duration,
        heartbeat_timeout: std::time::Duration,
        heartbeat_ttl: std::time::Duration,
    ) -> (SenderHandle, tokio::task::JoinHandle<()>) {
        use crate::engine::HeartbeatConfig;
        use tokio::net::{TcpListener, TcpStream};

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(addr));
        let (server_stream, _) = accepted.unwrap();
        let client_stream = connected.unwrap();

        // In this harness the *client* side hosts the engine and the
        // accepted server side plays the silent peer.
        let mut sender_io = FramedIo::from_tcp(client_stream);
        let mut peer_io = FramedIo::from_tcp(server_stream);

        // Both sides complete the ZMTP greeting so the sender advances
        // past greet_exchange.
        let (snd_greet, peer_greet) =
            futures::join!(greet_exchange(&mut sender_io), greet_exchange(&mut peer_io));
        snd_greet.unwrap();
        peer_greet.unwrap();

        #[cfg(feature = "curve")]
        let (a_read, a_write, _) = sender_io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (a_read, a_write) = sender_io.into_parts();

        // The sender never receives here; dummy inbound channel.
        let (sender_in_tx, _sender_in_rx) = flume::bounded(8);
        let engine = PeerEngine::spawn(
            0,
            PeerIdentity::new(),
            a_read,
            a_write.into_engine_writer(),
            send_hwm,
            sender_in_tx,
            crate::engine::peer_loop::PeerConfig {
                heartbeat: Some(HeartbeatConfig {
                    interval: heartbeat_interval,
                    timeout: heartbeat_timeout,
                    ttl: heartbeat_ttl,
                }),
                ..Default::default()
            },
        );

        // The silent-peer task drains the peer's read side (so the TCP
        // recv buffer doesn't fill up and stall the write path) but never
        // writes anything back. Holding the JoinHandle prevents the task
        // from being dropped, keeping the TCP connection open.
        let keep_alive = tokio::spawn(async move {
            use futures::StreamExt as _;
            while peer_io.read_half.next().await.is_some() {}
        });
        (SenderHandle(engine), keep_alive)
    }

    /// Send-side handle for the bench harness; owns the sending `PeerEngine`.
    pub struct SenderHandle(PeerEngine);
    /// Receive-side handle for the bench harness: keeps the receiving
    /// engine alive and exposes its shared inbound channel.
    pub struct ReceiverHandle {
        // Never read directly — held so the receiving engine (and its
        // peer_loop task) stays alive for the lifetime of the handle.
        _engine: PeerEngine,
        // Inbound frames tagged with the producing engine's PeerKey; each
        // item is either a decoded Message or a codec error.
        inbound: Receiver<(
            crate::engine::registry::PeerKey,
            Result<Message, CodecError>,
        )>,
    }

    impl SenderHandle {
        pub async fn send(&self, msg: ZmqMessage) -> Result<(), SendError> {
            self.0.send(Message::Message(msg)).await
        }

        /// Returns `true` while the underlying `peer_loop` is still running.
        /// Becomes `false` after a heartbeat timeout or TCP disconnect.
        pub fn writer_alive(&self) -> bool {
            self.0.writer_alive()
        }
    }

    impl ReceiverHandle {
        /// Await the next inbound item, discarding its PeerKey tag.
        /// Yields `None` once the inbound channel is disconnected.
        pub async fn recv(&self) -> Option<Result<Message, CodecError>> {
            match self.inbound.recv_async().await {
                Ok((_key, result)) => Some(result),
                Err(_) => None,
            }
        }
    }
}