mod compliance;
#[cfg(test)]
mod test {
use bytes::Bytes;
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::ZmqMessage;
fn our_client_opts(user: &str, pass: &str) -> rustzmq2::DealerSocket {
rustzmq2::DealerSocket::builder()
.plain_client(
Bytes::copy_from_slice(user.as_bytes()),
Bytes::copy_from_slice(pass.as_bytes()),
)
.build()
}
fn our_server_opts() -> rustzmq2::RepSocket {
rustzmq2::RepSocket::builder().plain_server().build()
}
fn spawn_zap_handler(ctx: &zmq2::Context) -> std::thread::JoinHandle<()> {
let zap = ctx.socket(zmq2::REP).expect("zap socket");
zap.bind("inproc://zeromq.zap.01").expect("zap bind");
std::thread::spawn(move || {
loop {
let version = match zap.recv_bytes(0) {
Ok(v) => v,
Err(_) => return,
};
let req_id = zap.recv_bytes(0).unwrap_or_default();
let _domain = zap.recv_bytes(0).unwrap_or_default();
let _addr = zap.recv_bytes(0).unwrap_or_default();
let _identity = zap.recv_bytes(0).unwrap_or_default();
let _mechanism = zap.recv_bytes(0).unwrap_or_default();
while zap.get_rcvmore().unwrap_or(false) {
let _ = zap.recv_bytes(0);
}
let _ = zap.send(version.as_slice(), zmq2::SNDMORE);
let _ = zap.send(req_id.as_slice(), zmq2::SNDMORE);
let _ = zap.send(b"200".as_ref(), zmq2::SNDMORE);
let _ = zap.send(b"OK".as_ref(), zmq2::SNDMORE);
let _ = zap.send(b"user".as_ref(), zmq2::SNDMORE);
let _ = zap.send(Vec::<u8>::new().as_slice(), 0);
}
})
}
#[async_rt::test]
async fn plain_our_client_their_server() {
let ctx = zmq2::Context::new();
let _zap_thread = spawn_zap_handler(&ctx);
let their_rep = ctx.socket(zmq2::REP).expect("zmq2 REP");
their_rep.set_plain_server(true).expect("set_plain_server");
their_rep.bind("tcp://127.0.0.1:*").expect("bind");
let endpoint = their_rep.get_last_endpoint().unwrap().unwrap();
let mut our_dealer = our_client_opts("alice", "secret");
our_dealer.connect(&endpoint).await.expect("connect");
let server_thread = std::thread::spawn(move || {
let msg = their_rep.recv_msg(0).expect("recv");
assert_eq!(msg.as_str().unwrap(), "ping");
their_rep.send("pong", 0).expect("send");
});
async_rt::task::sleep(std::time::Duration::from_millis(100)).await;
let mut msg = ZmqMessage::from(Bytes::from_static(b""));
msg.push_back(Bytes::from_static(b"ping"));
our_dealer.send(msg).await.expect("send");
let got = async_rt::task::timeout(std::time::Duration::from_secs(2), our_dealer.recv())
.await
.expect("timeout")
.expect("recv");
let payload = got.get(1).expect("no payload frame");
assert_eq!(payload.as_ref(), b"pong");
server_thread.join().expect("server thread");
}
#[async_rt::test]
async fn plain_their_client_our_server() {
let mut our_rep = our_server_opts();
let endpoint = our_rep.bind("tcp://127.0.0.1:0").await.expect("bind");
let ctx = zmq2::Context::new();
let their_dealer = ctx.socket(zmq2::DEALER).expect("zmq2 DEALER");
their_dealer
.set_plain_username(Some("bob"))
.expect("set username");
their_dealer
.set_plain_password(Some("pw"))
.expect("set password");
their_dealer
.connect(endpoint.to_string().as_str())
.expect("connect");
let client_thread = std::thread::spawn(move || {
their_dealer.send("", zmq2::SNDMORE).expect("send envelope");
their_dealer.send("ping", 0).expect("send payload");
let _env = their_dealer.recv_msg(0).expect("recv envelope");
let reply = their_dealer.recv_msg(0).expect("recv reply");
assert_eq!(reply.as_str().unwrap(), "pong");
});
async_rt::task::sleep(std::time::Duration::from_millis(100)).await;
let got = async_rt::task::timeout(std::time::Duration::from_secs(2), our_rep.recv())
.await
.expect("timeout")
.expect("recv");
let payload = got.get(0).expect("no frame");
assert_eq!(payload.as_ref(), b"ping");
our_rep
.send(ZmqMessage::from(Bytes::from_static(b"pong")))
.await
.expect("send");
client_thread.join().expect("client thread");
}
}