use rzmq::{Context, Msg, SocketType, ZmqError};
use std::time::Duration;
mod common;
const SHORT_TIMEOUT: Duration = Duration::from_millis(200);
const LONG_TIMEOUT: Duration = Duration::from_secs(2);
#[tokio::test]
async fn test_req_rep_tcp_basic() -> Result<(), ZmqError> {
let ctx = common::test_context();
{
let req = ctx.socket(SocketType::Req)?;
let rep = ctx.socket(SocketType::Rep)?;
let endpoint = "tcp://127.0.0.1:5560";
rep.bind(endpoint).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
req.connect(endpoint).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let req_msg = Msg::from_static(b"Request 1");
req.send(req_msg).await?;
println!("REQ sent request");
let received_req = common::recv_timeout(&rep, LONG_TIMEOUT).await?;
assert_eq!(received_req.data().unwrap(), b"Request 1");
println!("REP received request");
let rep_msg = Msg::from_static(b"Reply 1");
rep.send(rep_msg).await?;
println!("REP sent reply");
let received_rep = common::recv_timeout(&req, LONG_TIMEOUT).await?;
assert_eq!(received_rep.data().unwrap(), b"Reply 1");
println!("REQ received reply");
req.send(Msg::from_static(b"Request 2")).await?;
println!("REQ sent request 2");
let received_req2 = common::recv_timeout(&rep, LONG_TIMEOUT).await?;
assert_eq!(received_req2.data().unwrap(), b"Request 2");
println!("REP received request 2");
rep.send(Msg::from_static(b"Reply 2")).await?;
println!("REP sent reply 2");
let received_rep2 = common::recv_timeout(&req, LONG_TIMEOUT).await?;
assert_eq!(received_rep2.data().unwrap(), b"Reply 2");
println!("REQ received reply 2");
}
ctx.term().await?;
Ok(())
}
#[tokio::test]
async fn test_req_rep_tcp_connect_before_bind() -> Result<(), ZmqError> {
let ctx = common::test_context();
{
let req = ctx.socket(SocketType::Req)?;
let rep = ctx.socket(SocketType::Rep)?;
let endpoint = "tcp://127.0.0.1:5561";
req.connect(endpoint).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
rep.bind(endpoint).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let req_msg = Msg::from_static(b"Hello Late Bind");
req.send(req_msg).await?;
println!("REQ sent");
let received_req = common::recv_timeout(&rep, LONG_TIMEOUT).await?;
assert_eq!(received_req.data().unwrap(), b"Hello Late Bind");
println!("REP received");
rep.send(Msg::from_static(b"Hi Back")).await?;
println!("REP sent");
let received_rep = common::recv_timeout(&req, LONG_TIMEOUT).await?;
assert_eq!(received_rep.data().unwrap(), b"Hi Back");
println!("REQ received");
}
ctx.term().await?;
Ok(())
}
#[tokio::test]
#[cfg(feature = "ipc")]
async fn test_req_rep_ipc_basic() -> Result<(), ZmqError> {
let ctx = common::test_context();
{
let req = ctx.socket(SocketType::Req)?;
let rep = ctx.socket(SocketType::Rep)?;
let endpoint = common::unique_ipc_endpoint();
println!("Using IPC endpoint: {}", endpoint);
rep.bind(&endpoint).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
req.connect(&endpoint).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
req.send(Msg::from_static(b"IPC Req")).await?;
let received_req = common::recv_timeout(&rep, LONG_TIMEOUT).await?;
assert_eq!(received_req.data().unwrap(), b"IPC Req");
rep.send(Msg::from_static(b"IPC Rep")).await?;
let received_rep = common::recv_timeout(&req, LONG_TIMEOUT).await?;
assert_eq!(received_rep.data().unwrap(), b"IPC Rep");
}
ctx.term().await?;
Ok(())
}
#[tokio::test]
#[cfg(feature = "inproc")]
async fn test_req_rep_inproc_basic() -> Result<(), ZmqError> {
let ctx = common::test_context();
{
let req = ctx.socket(SocketType::Req)?;
let rep = ctx.socket(SocketType::Rep)?;
let endpoint = common::unique_inproc_endpoint();
rep.bind(&endpoint).await?;
req.connect(&endpoint).await?;
tokio::time::sleep(Duration::from_millis(20)).await;
req.send(Msg::from_static(b"Inproc Req")).await?;
let received_req = common::recv_timeout(&rep, LONG_TIMEOUT).await?;
assert_eq!(received_req.data().unwrap(), b"Inproc Req");
rep.send(Msg::from_static(b"Inproc Rep")).await?;
let received_rep = common::recv_timeout(&req, LONG_TIMEOUT).await?;
assert_eq!(received_rep.data().unwrap(), b"Inproc Rep");
}
ctx.term().await?;
Ok(())
}