use zmq::{Context, SocketType};
use futures::SinkExt;
use std::{thread::spawn, time::Duration};
use tmq::{push, Result, SocketExt};
use tokio::time::timeout;
use utils::{
generate_tcp_address, msg, send_multipart_repeated, send_multiparts,
sync_receive_multipart_repeated, sync_receive_multiparts,
};
mod utils;
#[tokio::test]
async fn send_single_message() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = push(&ctx).connect(&address)?;
let thread = sync_receive_multiparts(address, SocketType::PULL, vec![vec!["hello", "world"]]);
send_multiparts(sock, vec![vec!["hello", "world"]]).await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn send_multiple_messages() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = push(&ctx).connect(&address)?;
let data = vec![
vec!["hello", "world"],
vec!["second", "message"],
vec!["third", "message"],
];
let thread = sync_receive_multiparts(address, SocketType::PULL, data.clone());
send_multiparts(sock, data).await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn send_empty_message() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = push(&ctx).connect(&address)?;
let data = vec!["hello", "world"];
let thread = sync_receive_multiparts(address, SocketType::PULL, vec![data.clone()]);
send_multiparts(sock, vec![vec![], vec![], vec![], data]).await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn send_hammer() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = push(&ctx).connect(&address)?;
let count = 1_000;
let data = vec!["hello", "world"];
let thread = sync_receive_multipart_repeated(address, SocketType::PULL, data.clone(), count);
send_multipart_repeated(sock, data, count).await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn send_delayed() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let mut sock = push(&ctx).connect(&address)?;
sock.set_sndhwm(1).unwrap();
sock.send(vec![msg(b"hello")]).await?;
assert!(
timeout(Duration::from_millis(200), sock.send(vec!(msg(b"world"))))
.await
.is_err()
);
let thread = spawn(move || {
let ctx = Context::new();
let socket = ctx.socket(SocketType::PULL).unwrap();
socket.bind(&address).unwrap();
for _ in 0..3 {
socket.recv_multipart(0).unwrap();
}
});
sock.send(vec![msg(b"world")]).await?;
thread.join().unwrap();
Ok(())
}