rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! PLAIN mechanism libzmq interoperability tests.
//!
//! "their" = libzmq via zmq2 crate
//! "our"   = zmq.rs

mod compliance;

#[cfg(test)]
mod test {
    use bytes::Bytes;
    use rustzmq2::__async_rt as async_rt;
    use rustzmq2::prelude::*;
    use rustzmq2::ZmqMessage;

    fn our_client_opts(user: &str, pass: &str) -> rustzmq2::DealerSocket {
        rustzmq2::DealerSocket::builder()
            .plain_client(
                Bytes::copy_from_slice(user.as_bytes()),
                Bytes::copy_from_slice(pass.as_bytes()),
            )
            .build()
    }

    fn our_server_opts() -> rustzmq2::RepSocket {
        rustzmq2::RepSocket::builder().plain_server().build()
    }

    /// Run a libzmq ZAP handler that accepts all PLAIN credentials.
    /// libzmq's PLAIN server requires a ZAP handler; without one it rejects connections.
    fn spawn_zap_handler(ctx: &zmq2::Context) -> std::thread::JoinHandle<()> {
        let zap = ctx.socket(zmq2::REP).expect("zap socket");
        zap.bind("inproc://zeromq.zap.01").expect("zap bind");
        std::thread::spawn(move || {
            loop {
                // ZAP request: version | req_id | domain | addr | identity | mechanism | credentials...
                let version = match zap.recv_bytes(0) {
                    Ok(v) => v,
                    Err(_) => return,
                };
                let req_id = zap.recv_bytes(0).unwrap_or_default();
                let _domain = zap.recv_bytes(0).unwrap_or_default();
                let _addr = zap.recv_bytes(0).unwrap_or_default();
                let _identity = zap.recv_bytes(0).unwrap_or_default();
                let _mechanism = zap.recv_bytes(0).unwrap_or_default();
                // drain remaining credential frames
                while zap.get_rcvmore().unwrap_or(false) {
                    let _ = zap.recv_bytes(0);
                }
                // Reply: version | req_id | status_code | status_text | user_id | metadata
                let _ = zap.send(version.as_slice(), zmq2::SNDMORE);
                let _ = zap.send(req_id.as_slice(), zmq2::SNDMORE);
                let _ = zap.send(b"200".as_ref(), zmq2::SNDMORE);
                let _ = zap.send(b"OK".as_ref(), zmq2::SNDMORE);
                let _ = zap.send(b"user".as_ref(), zmq2::SNDMORE);
                let _ = zap.send(Vec::<u8>::new().as_slice(), 0);
            }
        })
    }

    /// Our PLAIN client connects to their (libzmq) PLAIN server.
    #[async_rt::test]
    async fn plain_our_client_their_server() {
        let ctx = zmq2::Context::new();
        // libzmq PLAIN server requires a ZAP handler to authenticate clients.
        let _zap_thread = spawn_zap_handler(&ctx);

        let their_rep = ctx.socket(zmq2::REP).expect("zmq2 REP");
        their_rep.set_plain_server(true).expect("set_plain_server");
        their_rep.bind("tcp://127.0.0.1:*").expect("bind");
        let endpoint = their_rep.get_last_endpoint().unwrap().unwrap();

        let mut our_dealer = our_client_opts("alice", "secret");
        our_dealer.connect(&endpoint).await.expect("connect");

        let server_thread = std::thread::spawn(move || {
            let msg = their_rep.recv_msg(0).expect("recv");
            assert_eq!(msg.as_str().unwrap(), "ping");
            their_rep.send("pong", 0).expect("send");
        });

        async_rt::task::sleep(std::time::Duration::from_millis(100)).await;

        let mut msg = ZmqMessage::from(Bytes::from_static(b""));
        msg.push_back(Bytes::from_static(b"ping"));
        our_dealer.send(msg).await.expect("send");

        let got = async_rt::task::timeout(std::time::Duration::from_secs(2), our_dealer.recv())
            .await
            .expect("timeout")
            .expect("recv");

        let payload = got.get(1).expect("no payload frame");
        assert_eq!(payload.as_ref(), b"pong");

        server_thread.join().expect("server thread");
    }

    /// Their (libzmq) PLAIN client connects to our PLAIN server.
    #[async_rt::test]
    async fn plain_their_client_our_server() {
        let mut our_rep = our_server_opts();
        let endpoint = our_rep.bind("tcp://127.0.0.1:0").await.expect("bind");

        let ctx = zmq2::Context::new();
        let their_dealer = ctx.socket(zmq2::DEALER).expect("zmq2 DEALER");
        their_dealer
            .set_plain_username(Some("bob"))
            .expect("set username");
        their_dealer
            .set_plain_password(Some("pw"))
            .expect("set password");
        their_dealer
            .connect(endpoint.to_string().as_str())
            .expect("connect");

        let client_thread = std::thread::spawn(move || {
            their_dealer.send("", zmq2::SNDMORE).expect("send envelope");
            their_dealer.send("ping", 0).expect("send payload");
            let _env = their_dealer.recv_msg(0).expect("recv envelope");
            let reply = their_dealer.recv_msg(0).expect("recv reply");
            assert_eq!(reply.as_str().unwrap(), "pong");
        });

        async_rt::task::sleep(std::time::Duration::from_millis(100)).await;

        let got = async_rt::task::timeout(std::time::Duration::from_secs(2), our_rep.recv())
            .await
            .expect("timeout")
            .expect("recv");

        let payload = got.get(0).expect("no frame");
        assert_eq!(payload.as_ref(), b"ping");

        our_rep
            .send(ZmqMessage::from(Bytes::from_static(b"pong")))
            .await
            .expect("send");

        client_thread.join().expect("client thread");
    }
}