use zmq::{Context, SocketType};
use futures::StreamExt;
use std::{
sync::{Arc, Barrier},
thread::spawn,
};
use tmq::{pull, Result, SocketExt};
use utils::{check_receive_multiparts, generate_tcp_address, hammer_receive, sync_send_multiparts};
mod utils;
#[tokio::test]
async fn receive_single_message() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = pull(&ctx).bind(&address)?;
let thread = sync_send_multiparts(address, SocketType::PUSH, vec![vec!["hello", "world"]]);
check_receive_multiparts(sock, vec![vec!["hello", "world"]]).await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn receive_multiple_messages() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = pull(&ctx).bind(&address)?;
let thread = sync_send_multiparts(
address,
SocketType::PUSH,
vec![vec!["hello", "world"], vec!["second", "message"]],
);
check_receive_multiparts(
sock,
vec![vec!["hello", "world"], vec!["second", "message"]],
)
.await?;
thread.join().unwrap();
Ok(())
}
#[tokio::test]
async fn receive_hammer() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = pull(&ctx).bind(&address)?;
hammer_receive(sock, address, SocketType::PUSH).await
}
#[tokio::test]
async fn receive_buffered_hammer() -> Result<()> {
let address = generate_tcp_address();
let ctx = Context::new();
let sock = pull(&ctx).bind(&address)?;
hammer_receive(sock.buffered(1024), address, SocketType::PUSH).await
}
#[tokio::test]
async fn receive_delayed() -> Result<()> {
let address = generate_tcp_address();
let address_recv = address.clone();
let barrier = Arc::new(Barrier::new(2));
let barrier_send = barrier.clone();
let thread = spawn(move || {
let ctx = Context::new();
let socket = ctx.socket(SocketType::PUSH).unwrap();
socket.connect(&address).unwrap();
for _ in 0..3 {
socket.send_multipart(vec!["hello", "world"], 0).unwrap();
}
barrier_send.wait();
});
barrier.wait();
let ctx = Context::new();
let mut sock = pull(&ctx).bind(&address_recv)?;
sock.set_rcvhwm(1)?;
for _ in 0..3 {
sock.next().await.unwrap()?;
}
thread.join().unwrap();
Ok(())
}