use std::thread::{spawn, JoinHandle};
use tmq::{reply, Result};
use utils::generate_tcp_address;
use zmq::Context;
mod utils;
#[tokio::test]
async fn single_message() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let recv_sock = reply(&ctx).bind(&address)?;
let part2 = "single_message";
let echo = sync_requester(address, 1, part2);
let (multipart, send_sock) = recv_sock.recv().await?;
assert_eq!(multipart.len(), 2);
assert_eq!(multipart[1].as_str().unwrap(), part2);
send_sock.send(multipart).await?;
echo.join().unwrap();
Ok(())
}
#[tokio::test]
async fn hammer_reply() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let mut recv_sock = reply(&ctx).bind(&address)?;
let count = 1_000;
let part2 = "single_message";
let echo = sync_requester(address, count, part2);
for _ in 0..count {
let (multipart, send_sock) = recv_sock.recv().await?;
assert_eq!(multipart.len(), 2);
assert_eq!(multipart[1].as_str().unwrap(), part2);
recv_sock = send_sock.send(multipart).await?;
}
echo.join().unwrap();
Ok(())
}
pub fn sync_requester(address: String, count: u64, part2: &'static str) -> JoinHandle<()> {
spawn(move || {
let socket = Context::new().socket(zmq::SocketType::REQ).unwrap();
socket.connect(&address).unwrap();
for i in 0..count {
let msg = format!("Req#{}", i);
socket
.send_multipart(&[msg.as_bytes(), part2.as_bytes()], 0)
.unwrap();
let received = socket.recv_multipart(0).unwrap();
assert_eq!(&received[0], &msg.as_bytes());
}
})
}