// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ
// Documentation
#![cfg(feature = "inproc")]
mod helpers;

use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;

use std::error::Error;

#[cfg(test)]
mod test {
    use super::*;

    /// REQ/REP exchange over an inproc endpoint.
    ///
    /// The REP socket is bound first; a spawned task then connects a REQ
    /// socket to the same name. Both sides are driven by
    /// `helpers::run_req_client` / `helpers::run_rep_server` (defined in the
    /// sibling `helpers` module, not shown here) for `num_messages` messages.
    #[async_rt::test]
    async fn test_inproc_req_rep() -> Result<(), Box<dyn Error>> {
        pretty_env_logger::try_init().ok();

        let mut rep = rustzmq2::RepSocket::new();
        rep.bind("inproc://test-req-rep").await?;

        let num_messages = 10;

        // Keep the join handle instead of detaching the task: otherwise a
        // panic on the client side (any of the `unwrap`s below) could never
        // fail this test once the server side had finished successfully.
        let client_task = async_rt::task::spawn(async move {
            let mut req = rustzmq2::ReqSocket::new();
            req.connect("inproc://test-req-rep").await.unwrap();
            helpers::run_req_client(req, num_messages).await.unwrap();
        });

        helpers::run_rep_server(rep, num_messages).await?;

        // The join result is expected to be `Err` when the client task did
        // not run to completion (panic/cancellation).
        assert!(
            client_task.await.is_ok(),
            "REQ client task panicked or was cancelled"
        );
        Ok(())
    }

    /// PUB/SUB fan-out over inproc: one publisher, several subscriber tasks.
    ///
    /// Each subscriber task connects, subscribes to everything (`""`),
    /// receives a fixed number of messages and reports its count through an
    /// mpsc channel. The test passes when every subscriber reported, and each
    /// reported the full message count.
    #[async_rt::test]
    async fn test_inproc_pub_sub() -> Result<(), Box<dyn Error>> {
        use futures::StreamExt;

        const ENDPOINT: &str = "inproc://test-pub-sub";
        const SUBSCRIBERS: usize = 3;
        const MESSAGES: usize = 5;

        pretty_env_logger::try_init().ok();

        let mut publisher = rustzmq2::PubSocket::new();
        publisher.bind(ENDPOINT).await?;

        let (report_tx, report_rx) = futures::channel::mpsc::channel::<usize>(SUBSCRIBERS);
        for _ in 0..SUBSCRIBERS {
            let mut report = report_tx.clone();
            async_rt::task::spawn(async move {
                let mut subscriber = rustzmq2::SubSocket::new();
                subscriber.connect(ENDPOINT).await.unwrap();
                subscriber.subscribe("").await.unwrap();

                let mut received = 0usize;
                for _ in 0..MESSAGES {
                    subscriber.recv().await.unwrap();
                    received += 1;
                }
                report.try_send(received).unwrap();
            });
        }
        // Drop the original sender so the receiver stream ends once every
        // subscriber task has dropped its clone.
        drop(report_tx);

        // Give the subscriber tasks time to connect and subscribe before
        // anything is published.
        async_rt::task::sleep(std::time::Duration::from_millis(100)).await;

        for i in 0..MESSAGES {
            publisher.send(format!("message {i}")).await?;
        }

        let reports: Vec<usize> = report_rx.collect().await;
        assert_eq!(reports.len(), SUBSCRIBERS);
        for received in reports {
            assert_eq!(received, MESSAGES);
        }
        Ok(())
    }

    /// PAIR/PAIR over inproc: a spawned client sends a numbered sequence of
    /// messages and the bound side checks that they arrive in order with the
    /// expected text.
    #[async_rt::test]
    async fn test_inproc_pair() -> Result<(), Box<dyn Error>> {
        const ENDPOINT: &str = "inproc://test-pair";
        const MESSAGE_COUNT: u32 = 20;

        pretty_env_logger::try_init().ok();

        let mut listener = rustzmq2::PairSocket::new();
        listener.bind(ENDPOINT).await?;

        // Sending side runs on its own task.
        async_rt::task::spawn(async move {
            let mut peer = rustzmq2::PairSocket::new();
            peer.connect(ENDPOINT).await.unwrap();
            for i in 0..MESSAGE_COUNT {
                let outgoing = format!("msg {i}");
                peer.send(outgoing).await.unwrap();
            }
        });

        // Receiving side: the first frame of every message must match the
        // text produced for the same sequence number.
        for i in 0..MESSAGE_COUNT {
            let incoming = listener.recv().await?;
            let first_frame = incoming.get(0).unwrap();
            let text = String::from_utf8(first_frame.to_vec()).unwrap();
            assert_eq!(text, format!("msg {i}"));
        }
        Ok(())
    }

    /// An inproc name must become bindable again once the socket that owned
    /// it has unbound it.
    #[async_rt::test]
    async fn test_inproc_address_reuse_after_unbind() -> Result<(), Box<dyn Error>> {
        const ENDPOINT: &str = "inproc://test-reuse";

        pretty_env_logger::try_init().ok();

        // First owner: bind the name, then release it via the endpoint that
        // `bind` returned.
        let mut first = rustzmq2::RepSocket::new();
        let bound = first.bind(ENDPOINT).await?;
        first.unbind(bound).await?;

        // Second owner: binding the very same name must now succeed.
        let mut second = rustzmq2::RepSocket::new();
        second.bind(ENDPOINT).await?;
        Ok(())
    }

    /// Inproc carries no ZMTP greeting, so the socket-type compatibility
    /// check libzmq performs on the wire is done inside the inproc handshake
    /// instead. Connecting an incompatible socket type must therefore fail
    /// at connect time instead of silently routing garbage frames, while a
    /// compatible peer on the same endpoint keeps working.
    #[async_rt::test]
    async fn test_inproc_rejects_incompatible_socket_types() {
        pretty_env_logger::try_init().ok();

        // Bind a PUB socket and keep the returned endpoint in string form so
        // both connect attempts target exactly the same address.
        let mut publisher = rustzmq2::PubSocket::new();
        let endpoint = publisher.bind("inproc://test-incompat").await.unwrap();
        let bound = endpoint.to_string();

        // REQ is not a valid peer for PUB: the connect itself has to error.
        let mut requester = rustzmq2::ReqSocket::new();
        let rejection = requester.connect(&bound).await;
        let err = rejection.expect_err("REQ→PUB should be rejected");
        let msg = err.to_string();
        assert!(
            msg.contains("socket type not compatible"),
            "unexpected error: {msg}"
        );

        // SUB is a valid peer for PUB and must still be accepted on the
        // same endpoint after the rejected attempt.
        let mut subscriber = rustzmq2::SubSocket::new();
        let accepted = subscriber.connect(&bound).await;
        accepted.expect("SUB→PUB should succeed");
    }

    /// Fix #1 — libzmq propagates `ZMQ_ROUTING_ID` over inproc
    /// (`libzmq/src/socket_base.cpp:864`, `ctx.cpp:838-841`). DEALER sets
    /// a routing id, ROUTER receives messages tagged with it as the
    /// envelope identity. Before this fix we generated a random UUID,
    /// breaking identity-based routing.
    ///
    /// The test binds a ROUTER, connects a DEALER built with the peer
    /// identity `worker-7`, sends one message, and checks that the ROUTER
    /// receives `[identity, payload]` with that exact identity.
    #[async_rt::test]
    async fn test_inproc_router_honors_dealer_routing_id() -> Result<(), Box<dyn Error>> {
        pretty_env_logger::try_init().ok();
        use rustzmq2::PeerIdentity;
        use std::str::FromStr;

        let mut router = rustzmq2::RouterSocket::new();
        router.bind("inproc://test-router-id").await?;

        let mut dealer = rustzmq2::DealerSocket::builder()
            .peer_identity(PeerIdentity::from_str("worker-7")?)
            .build();
        dealer.connect("inproc://test-router-id").await?;

        // Known race (TODO: fix): the ROUTER's inproc accept-side handshake
        // runs on a separate task and renames the peer registry entry to
        // the DEALER's advertised routing_id *after* both sides complete
        // their R2 key exchange. A `send` immediately after `connect()`
        // returns can race that rename — resulting in the ROUTER observing
        // the placeholder UUID instead of "worker-7". The rename should
        // really be part of the awaited handshake path; until then, pause
        // briefly so the accept task can run before we send.
        //
        // The pause goes through `async_rt` rather than naming tokio
        // directly so the test does not depend on one specific runtime, and
        // a timed sleep gives the accept task more than the single
        // scheduling opportunity a bare yield would offer.
        async_rt::task::sleep(std::time::Duration::from_millis(50)).await;

        dealer.send("hello").await?;

        // ROUTER receives [identity, payload].
        let msg = router.recv().await?;
        let identity = msg.get(0).expect("identity frame");
        let payload = msg.get(1).expect("payload frame");
        assert_eq!(
            identity.as_ref(),
            b"worker-7",
            "ROUTER should see dealer's configured routing id, got {:?}",
            identity.as_ref()
        );
        assert_eq!(payload.as_ref(), b"hello");
        Ok(())
    }

    /// Fix #2 — a socket's `hello_msg` is placed in the inbound queue of a
    /// newly-connected peer, so it is the first message that peer `recv`s.
    /// libzmq does this for both wire transports (post-READY) and inproc (at
    /// pipe-creation time, `libzmq/src/socket_base.cpp:867-896`).
    #[async_rt::test]
    async fn test_inproc_hello_msg_delivered() -> Result<(), Box<dyn Error>> {
        const ENDPOINT: &str = "inproc://test-hello";

        pretty_env_logger::try_init().ok();

        // Publisher configured with a hello message, bound before the
        // subscriber connects.
        let mut publisher = rustzmq2::PubSocket::builder()
            .hello_msg("welcome-aboard")
            .build();
        publisher.bind(ENDPOINT).await?;

        // Subscriber subscribes to everything first, then connects.
        let mut subscriber = rustzmq2::SubSocket::new();
        subscriber.subscribe("").await?;
        subscriber.connect(ENDPOINT).await?;

        // Nothing has been published, so the only thing that can arrive is
        // the hello message.
        let greeting = subscriber.recv().await?;
        let first_frame = greeting.get(0).unwrap();
        assert_eq!(
            first_frame.as_ref(),
            b"welcome-aboard",
            "SUB's first recv over inproc should be hello_msg"
        );
        Ok(())
    }

    /// Fix #4 — libzmq's documented inproc semantics
    /// (`doc/zmq_inproc.adoc:42-44`): "the order of `zmq_bind()` and
    /// `zmq_connect()` does not matter". Connect-before-bind must hold the
    /// connection pending, then complete the handshake when the matching
    /// bind arrives.
    ///
    /// The client task connects first, sends `ping` and expects `pong`; the
    /// main task binds afterwards, answers the request, and finally joins
    /// the client task so that a failed client-side check fails the test.
    #[async_rt::test]
    async fn test_inproc_connect_before_bind() -> Result<(), Box<dyn Error>> {
        pretty_env_logger::try_init().ok();

        // Connector side starts *before* anything is bound.
        let client_task = async_rt::task::spawn(async move {
            let mut req = rustzmq2::ReqSocket::new();
            // Will park on the handshake until the bind lands.
            req.connect("inproc://test-pending").await.unwrap();
            req.send("ping").await.unwrap();
            let reply = req.recv().await.unwrap();
            assert_eq!(reply.get(0).unwrap().as_ref(), b"pong");
        });

        // Give the connect a head start so it's actually pending when we bind.
        async_rt::task::sleep(std::time::Duration::from_millis(50)).await;

        let mut rep = rustzmq2::RepSocket::new();
        rep.bind("inproc://test-pending").await?;

        let msg = rep.recv().await?;
        assert_eq!(msg.get(0).unwrap().as_ref(), b"ping");
        rep.send("pong").await?;

        // Do not discard the join result: the `pong` assertion lives inside
        // the client task, and a panic there would otherwise go unnoticed.
        // The join result is expected to be `Err` when the task did not run
        // to completion (panic/cancellation).
        assert!(
            client_task.await.is_ok(),
            "REQ client task panicked or was cancelled"
        );
        Ok(())
    }

    /// Fix #5' — configuring options that inproc ignores (CURVE, PLAIN, ZAP,
    /// TCP knobs, etc.) and then binding/connecting an inproc endpoint emits
    /// a `SocketEvent::OptionIgnoredOnTransport` monitor event. libzmq
    /// silently accepts such options; we surface them so users don't falsely
    /// assume CURVE is protecting an inproc channel.
    #[cfg(feature = "curve")]
    #[async_rt::test]
    async fn test_inproc_ignored_options_emit_monitor_event() -> Result<(), Box<dyn Error>> {
        use futures::StreamExt;

        // How long the monitor stream may stay silent before the drain
        // below gives up waiting for further events.
        const IDLE_WINDOW: std::time::Duration = std::time::Duration::from_millis(50);

        pretty_env_logger::try_init().ok();

        // A bogus keypair is fine — CURVE is never actually run.
        let public_key = [0u8; 32];
        let secret_key = [1u8; 32];
        let mut rep = rustzmq2::RepSocket::builder()
            .curve_server(public_key, secret_key)
            .build();
        let mut events = rep.monitor();
        rep.bind("inproc://test-ignored-opts").await?;

        // Drain the monitor stream, keeping only the option names carried by
        // `OptionIgnoredOnTransport` events. The loop ends when the stream
        // closes (`Ok(None)`) or the idle window elapses (`Err(_)`).
        let mut ignored: Vec<&'static str> = Vec::new();
        while let Ok(Some(event)) = async_rt::task::timeout(IDLE_WINDOW, events.next()).await {
            if let rustzmq2::SocketEvent::OptionIgnoredOnTransport { option, .. } = event {
                ignored.push(option);
            }
        }

        assert!(
            ignored.contains(&"curve_*"),
            "expected curve_* in ignored events, got {:?}",
            ignored
        );
        assert!(
            ignored.contains(&"mechanism"),
            "expected mechanism in ignored events, got {:?}",
            ignored
        );
        Ok(())
    }
}