use rustzmq2::__async_rt as async_rt;
use rustzmq2::prelude::*;
use rustzmq2::ZmqMessage;
use std::error::Error;
use std::str;
use std::time::Duration;
/// Drives `rustzmq2::proxy` over the given router/dealer pair, wrapped in a
/// timeout of `duration` milliseconds.
///
/// Whatever the timeout wrapper yields (the proxy's own result or a timeout
/// indication) is intentionally discarded, so this helper never reports failure.
#[allow(dead_code)]
pub async fn run_proxy(
    router_socket: rustzmq2::RouterSocket,
    dealer_socket: rustzmq2::DealerSocket,
    duration: u64,
) {
    let run_for = Duration::from_millis(duration);
    let proxy_future = rustzmq2::proxy(router_socket, dealer_socket);
    // The outcome is deliberately ignored: callers only need the proxy to be
    // driven for the requested window.
    let _ = async_rt::task::timeout(run_for, proxy_future).await;
}
/// Serves exactly `num_messages` request/reply rounds on `rep_socket`, then
/// closes the socket.
///
/// For the `i`-th request (zero-based) the reply is the request's first frame,
/// decoded as UTF-8, followed by `", Rep - <i>"`.
///
/// # Errors
///
/// Returns the error produced by `recv` or `send` if either fails.
///
/// # Panics
///
/// Panics if a received message has no first frame, if that frame is not valid
/// UTF-8, or if closing the socket reports an error.
#[allow(dead_code)]
pub async fn run_rep_server(
    mut rep_socket: rustzmq2::RepSocket,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for index in 0..num_messages {
        let request = rep_socket.recv().await?;
        let first_frame = request.get(0).unwrap();
        let request_text = str::from_utf8(first_frame.as_ref()).unwrap();
        let reply = ZmqMessage::from(format!("{}, Rep - {}", request_text, index));
        rep_socket.send(reply).await?;
    }

    // A failed close is treated as fatal rather than being returned as `Err`.
    match rep_socket.close().await {
        Ok(_) => Ok(()),
        Err(errs) => panic!("Could not unbind socket: {:?}", errs),
    }
}
/// Performs `num_messages` request/reply rounds on `req_socket` and checks
/// every reply, then closes the socket.
///
/// The `i`-th request (zero-based) carries `"Req - <i>"`; the first frame of
/// the reply must equal `"Req - <i>, Rep - <i>"`.
///
/// Always returns `Ok(())` when it returns at all; any error from closing the
/// socket is ignored.
///
/// # Panics
///
/// Panics if `send` or `recv` fails, if the reply has no first frame, if that
/// frame is not valid UTF-8, or if the reply text differs from the expected one.
#[allow(dead_code)]
pub async fn run_req_client(
    mut req_socket: rustzmq2::ReqSocket,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for round in 0..num_messages {
        let request = ZmqMessage::from(format!("Req - {}", round));
        req_socket.send(request).await.unwrap();

        let reply = req_socket.recv().await.unwrap();
        let expected = format!("Req - {}, Rep - {}", round, round);
        let received = String::from_utf8(reply.get(0).unwrap().to_vec()).unwrap();
        assert_eq!(expected, received);
    }

    // Close errors are not interesting to the caller of this helper.
    let _ = req_socket.close().await;
    Ok(())
}
/// Performs `num_messages` request/reply rounds on `req_socket`, tagging every
/// request with `id`, and checks the prefix of every reply; then closes the
/// socket.
///
/// The `i`-th request (zero-based) carries `"Socket - <id>, Req - <i>"`. The
/// first frame of the reply must start with `"Socket - <id>, Req - <i>, Rep - "`;
/// whatever follows that prefix is not checked.
///
/// Always returns `Ok(())` when it returns at all; any error from closing the
/// socket is ignored.
///
/// # Panics
///
/// Panics if `send` or `recv` fails, if the reply has no first frame, if that
/// frame is not valid UTF-8, or if the reply does not start with the expected
/// prefix.
#[allow(dead_code)]
pub async fn run_req_client_with_id(
    mut req_socket: rustzmq2::ReqSocket,
    id: u32,
    num_messages: u32,
) -> Result<(), Box<dyn Error>> {
    for i in 0..num_messages {
        let request = ZmqMessage::from(format!("Socket - {}, Req - {}", id, i));
        req_socket.send(request).await.unwrap();

        let repl = req_socket.recv().await.unwrap();
        let expected_prefix = format!("Socket - {}, Req - {}, Rep - ", id, i);
        let actual_ms = String::from_utf8(repl.get(0).unwrap().to_vec()).unwrap();

        // `starts_with` instead of slicing `actual_ms[..len]`: a reply shorter
        // than the prefix (or a cut inside a multi-byte char) would otherwise
        // panic with a slice error that hides the actual mismatch.
        assert!(
            actual_ms.starts_with(expected_prefix.as_str()),
            "reply {:?} does not start with expected prefix {:?}",
            actual_ms,
            expected_prefix
        );
    }

    // Close errors are not interesting to the caller of this helper.
    let _ = req_socket.close().await;
    Ok(())
}