use bytes::Bytes;
use monocoque_core::options::SocketOptions;
use monocoque_zmtp::rep::RepSocket;
use monocoque_zmtp::req::ReqSocket;
use std::io;
/// A REQ socket built with `req_relaxed: false` must reject a second `send`
/// issued before the pending reply has been received.
///
/// The rejected call is expected to yield an [`io::ErrorKind::InvalidInput`]
/// error whose text mentions either "await reply" or "recv". The test then
/// receives the reply to the first request and joins the server task.
///
/// # Errors
///
/// Any I/O failure while binding, connecting, or exchanging messages is
/// propagated and fails the test.
#[compio::test]
async fn test_req_strict_send_send_fails() -> io::Result<()> {
    let listener = compio::net::TcpListener::bind("127.0.0.1:0").await?;
    let server_addr = listener.local_addr()?;

    // REP peer: accepts one connection and answers exactly one request.
    let server_task = compio::runtime::spawn(async move {
        let (stream, _) = listener.accept().await?;
        let mut rep_socket = RepSocket::new(stream).await?;
        let _req = rep_socket.recv().await?;
        rep_socket.send(vec![Bytes::from("reply1")]).await?;
        Ok::<(), io::Error>(())
    });

    // Short pause before connecting; presumably gives the server task time to
    // start accepting.
    compio::time::sleep(std::time::Duration::from_millis(50)).await;

    let stream = compio::net::TcpStream::connect(server_addr).await?;
    let options = SocketOptions {
        req_relaxed: false,
        ..Default::default()
    };
    let mut req_socket = ReqSocket::with_options(stream, options).await?;

    req_socket.send(vec![Bytes::from("request1")]).await?;

    // Second send with no recv in between: `expect_err` both asserts the
    // failure and hands back the error, so no separate is_err/unwrap_err pair.
    let err = req_socket
        .send(vec![Bytes::from("request2")])
        .await
        .expect_err("Expected error when sending twice without recv in strict mode");
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);

    // Format the error once and show it if the wording check fails.
    let message = err.to_string();
    assert!(
        message.contains("await reply") || message.contains("recv"),
        "unexpected error message: {message}"
    );

    // The reply to the first request is still received after the rejected send.
    let _reply = req_socket.recv().await?;
    server_task.await?;
    Ok(())
}
/// A REQ socket created via `ReqSocket::new` must reject a second `recv`
/// issued without a new `send` in between.
///
/// After one complete request/reply round trip, the extra `recv` is expected
/// to yield an [`io::ErrorKind::InvalidInput`] error whose text mentions
/// either "Idle" or "send".
///
/// NOTE(review): this relies on `ReqSocket::new` defaulting to strict mode,
/// as the assertion message implies — confirm against `SocketOptions`.
///
/// # Errors
///
/// Any I/O failure while binding, connecting, or exchanging messages is
/// propagated and fails the test.
#[compio::test]
async fn test_req_strict_recv_recv_fails() -> io::Result<()> {
    let listener = compio::net::TcpListener::bind("127.0.0.1:0").await?;
    let server_addr = listener.local_addr()?;

    // REP peer: accepts one connection and answers exactly one request.
    let server_task = compio::runtime::spawn(async move {
        let (stream, _) = listener.accept().await?;
        let mut rep_socket = RepSocket::new(stream).await?;
        let _req = rep_socket.recv().await?;
        rep_socket.send(vec![Bytes::from("reply1")]).await?;
        Ok::<(), io::Error>(())
    });

    // Short pause before connecting; presumably gives the server task time to
    // start accepting.
    compio::time::sleep(std::time::Duration::from_millis(50)).await;

    let stream = compio::net::TcpStream::connect(server_addr).await?;
    let mut req_socket = ReqSocket::new(stream).await?;

    // One full, valid round trip first.
    req_socket.send(vec![Bytes::from("request1")]).await?;
    let _reply = req_socket.recv().await?;

    // Second recv with no send in between: `expect_err` both asserts the
    // failure and hands back the error, so no separate is_err/unwrap_err pair.
    let err = req_socket
        .recv()
        .await
        .expect_err("Expected error when receiving twice without send in strict mode");
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);

    // Format the error once and show it if the wording check fails.
    let message = err.to_string();
    assert!(
        message.contains("Idle") || message.contains("send"),
        "unexpected error message: {message}"
    );

    server_task.await?;
    Ok(())
}
/// Runs a REQ socket configured with `req_relaxed: true` through two complete
/// request/reply round trips against a local REP peer and checks that both
/// replies arrive.
///
/// NOTE(review): despite the name, the client never issues two `send`s back
/// to back — each `send` is followed by a `recv`. As written this presumably
/// passes in strict mode too; confirm whether a genuine send-send sequence
/// was intended here.
#[compio::test]
async fn test_req_relaxed_send_send_succeeds() -> io::Result<()> {
    let acceptor = compio::net::TcpListener::bind("127.0.0.1:0").await?;
    let addr = acceptor.local_addr()?;

    // REP side: serve exactly two request/reply exchanges, then finish.
    let responder = compio::runtime::spawn(async move {
        let (conn, _peer) = acceptor.accept().await?;
        let mut rep = RepSocket::new(conn).await?;

        let _first = rep.recv().await?;
        rep.send(vec![Bytes::from("reply1")]).await?;

        let _second = rep.recv().await?;
        rep.send(vec![Bytes::from("reply2")]).await?;

        Ok::<(), io::Error>(())
    });

    // Short pause before dialing; presumably lets the responder start accepting.
    let pause = std::time::Duration::from_millis(50);
    compio::time::sleep(pause).await;

    let conn = compio::net::TcpStream::connect(addr).await?;
    let relaxed = SocketOptions {
        req_relaxed: true,
        ..Default::default()
    };
    let mut req = ReqSocket::with_options(conn, relaxed).await?;

    // First round trip.
    req.send(vec![Bytes::from("request1")]).await?;
    let first_reply = req.recv().await?;
    assert!(first_reply.is_some());

    // Second round trip.
    req.send(vec![Bytes::from("request2")]).await?;
    let second_reply = req.recv().await?;
    assert!(second_reply.is_some());

    responder.await?;
    Ok(())
}
/// Happy path: three alternating send/recv round trips on a REQ socket
/// created via `ReqSocket::new`, checked frame by frame on both ends.
///
/// The REP peer expects `request0..request2` in order and answers each with
/// the matching `replyN`; the client asserts it gets those replies back.
#[compio::test]
async fn test_req_strict_normal_flow() -> io::Result<()> {
    let acceptor = compio::net::TcpListener::bind("127.0.0.1:0").await?;
    let addr = acceptor.local_addr()?;

    // REP side: validate each incoming request, then reply with the same index.
    let responder = compio::runtime::spawn(async move {
        let (conn, _peer) = acceptor.accept().await?;
        let mut rep = RepSocket::new(conn).await?;

        for round in 0..3 {
            let incoming = rep.recv().await?.expect("Should receive request");
            let expected = Bytes::from(format!("request{round}"));
            assert_eq!(incoming[0], expected);

            let answer = vec![Bytes::from(format!("reply{round}"))];
            rep.send(answer).await?;
        }

        Ok::<(), io::Error>(())
    });

    // Short pause before dialing; presumably lets the responder start accepting.
    let pause = std::time::Duration::from_millis(50);
    compio::time::sleep(pause).await;

    let conn = compio::net::TcpStream::connect(addr).await?;
    let mut req = ReqSocket::new(conn).await?;

    for round in 0..3 {
        let outgoing = vec![Bytes::from(format!("request{round}"))];
        req.send(outgoing).await?;

        let answer = req.recv().await?.expect("Should receive reply");
        let expected = Bytes::from(format!("reply{round}"));
        assert_eq!(answer[0], expected);
    }

    responder.await?;
    Ok(())
}
/// Round-trips a single message through a REQ socket configured with
/// `req_correlate: true` against a REP peer that echoes whatever it receives.
///
/// The client must get back exactly one frame containing the original
/// payload. Presumably any correlation framing added by the REQ side is
/// removed before the reply is handed to the caller — the single-frame
/// assertion is what pins that down; confirm against the socket
/// implementation.
#[compio::test]
async fn test_req_correlation_mode() -> io::Result<()> {
    let acceptor = compio::net::TcpListener::bind("127.0.0.1:0").await?;
    let addr = acceptor.local_addr()?;

    // REP side: echo the received message back unchanged.
    let responder = compio::runtime::spawn(async move {
        let (conn, _peer) = acceptor.accept().await?;
        let mut rep = RepSocket::new(conn).await?;

        let echoed = rep.recv().await?.expect("Should receive");
        rep.send(echoed).await?;

        Ok::<(), io::Error>(())
    });

    // Short pause before dialing; presumably lets the responder start accepting.
    let pause = std::time::Duration::from_millis(50);
    compio::time::sleep(pause).await;

    let conn = compio::net::TcpStream::connect(addr).await?;
    let correlated = SocketOptions {
        req_correlate: true,
        ..Default::default()
    };
    let mut req = ReqSocket::with_options(conn, correlated).await?;

    req.send(vec![Bytes::from("payload")]).await?;
    let answer = req.recv().await?.expect("Should receive");

    // Exactly one frame, carrying the original payload.
    assert_eq!(answer.len(), 1);
    assert_eq!(answer[0], Bytes::from("payload"));

    responder.await?;
    Ok(())
}