rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::ZmqMessage;

use std::error::Error;
use std::str;
use std::time::Duration;

#[allow(dead_code)] // Rust #46379
pub async fn run_proxy(
    router_socket: rustzmq2::RouterSocket,
    dealer_socket: rustzmq2::DealerSocket,
    duration: u64,
) {
    let _ = async_rt::task::timeout(
        Duration::from_millis(duration),
        rustzmq2::proxy(router_socket, dealer_socket),
    )
    .await;
}

#[allow(dead_code)]
pub async fn run_rep_server(
    mut rep_socket: rustzmq2::RepSocket,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for i in 0..num_messages {
        let mess = rep_socket.recv().await?;
        let m = format!(
            "{}, Rep - {}",
            str::from_utf8(mess.get(0).unwrap().as_ref()).unwrap(),
            i
        );
        let repl = ZmqMessage::from(m);
        rep_socket.send(repl).await?;
    }
    if let Err(errs) = rep_socket.close().await {
        panic!("Could not unbind socket: {:?}", errs);
    }
    Ok(())
}

#[allow(dead_code)]
pub async fn run_req_client(
    mut req_socket: rustzmq2::ReqSocket,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for i in 0..num_messages {
        let ms: String = format!("Req - {}", i);
        let m = ZmqMessage::from(ms);
        req_socket.send(m).await.unwrap();
        let repl = req_socket.recv().await.unwrap();
        assert_eq!(
            format!("Req - {}, Rep - {}", i, i),
            String::from_utf8(repl.get(0).unwrap().to_vec()).unwrap()
        );
    }
    let _ = req_socket.close().await;
    Ok(())
}

#[allow(dead_code)]
pub async fn run_req_client_with_id(
    mut req_socket: rustzmq2::ReqSocket,
    id: u32,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for i in 0..num_messages {
        let ms: String = format!("Socket - {}, Req - {}", id, i);
        let m = ZmqMessage::from(ms);
        req_socket.send(m).await.unwrap();
        let repl = req_socket.recv().await.unwrap();
        // With many parallel req sockets reply order cannot be quaranteed.
        let expected_ms = format!("Socket - {}, Req - {}, Rep - ", id, i);
        let actual_ms = String::from_utf8(repl.get(0).unwrap().to_vec()).unwrap();
        assert_eq!(expected_ms, actual_ms[..expected_ms.len()]);
    }
    let _ = req_socket.close().await;
    Ok(())
}