#![cfg(not(target_os = "wasi"))]
#![cfg(not(miri))]
use std::{
io::{self, Result},
net::Shutdown,
time::Duration,
};
use futures_lite::{AsyncReadExt, AsyncWriteExt, future};
use nio::{
net::{TcpListener, TcpReader, TcpStream, TcpWriter, UdpSocket},
sleep, spawn_local, test,
};
const LOREM_IPSUM: &[u8] = b"
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Donec pretium ante erat, vitae sodales mi varius quis.
Etiam vestibulum lorem vel urna tempor, eu fermentum odio aliquam.
Aliquam consequat urna vitae ipsum pulvinar, in blandit purus eleifend.
";
async fn tcp_streams() -> Result<(TcpStream, TcpStream)> {
let mut listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server = spawn_local(async move {
let conn = listener.accept().await.unwrap();
conn.connect().await.unwrap()
});
let client = TcpStream::connect(addr).await?;
let server = server.await?;
Ok((server, client))
}
#[test]
async fn tcp_connect() -> Result<()> {
let (server, client) = tcp_streams().await?;
assert_eq!(server.peer_addr()?, client.local_addr()?);
assert_eq!(client.peer_addr()?, server.local_addr()?);
let err = TcpStream::connect(server.local_addr()?).await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
Ok(())
}
#[test]
async fn tcp_peek_read() -> Result<()> {
let (mut server, mut client) = tcp_streams().await?;
client.write_all(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let n = server.peek(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = server.read(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
}
#[test]
async fn tcp_reader_hangup() -> Result<()> {
let (server, mut client) = tcp_streams().await?;
let task = spawn_local(async move {
sleep(Duration::from_secs(1)).await;
drop(server);
});
while client.write_all(LOREM_IPSUM).await.is_ok() {}
task.await?;
Ok(())
}
#[test]
async fn tcp_writer_hangup() -> Result<()> {
let (server, mut client) = tcp_streams().await?;
let task = spawn_local(async move {
sleep(Duration::from_secs(1)).await;
drop(server);
});
let mut v = vec![];
client.read_to_end(&mut v).await?;
assert!(v.is_empty());
task.await?;
Ok(())
}
#[test]
async fn udp_send_recv() -> Result<()> {
let mut socket1 = UdpSocket::bind("127.0.0.1:0").await?;
let mut socket2 = UdpSocket::bind("127.0.0.1:0").await?;
socket1.connect(socket2.local_addr()?).await?;
let mut buf = [0u8; 1024];
socket1.send(LOREM_IPSUM).await?;
let n = socket2.peek(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = socket2.recv(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
socket2.send_to(LOREM_IPSUM, socket1.local_addr()?).await?;
let n = socket1.peek_from(&mut buf).await?.0;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = socket1.recv_from(&mut buf).await?.0;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
}
#[test]
async fn tcp_duplex() -> Result<()> {
let (server, client) = tcp_streams().await?;
async fn do_read(mut s: TcpReader) -> io::Result<()> {
let mut buf = vec![0u8; 4096];
loop {
let len = s.read(&mut buf).await?;
if len == 0 {
return Ok(());
}
}
}
async fn do_write(mut s: TcpWriter) -> io::Result<()> {
let buf = vec![0u8; 4096];
for _ in 0..4096 {
s.write_all(&buf).await?;
}
Ok(())
}
let (reader, writer) = client.split();
let r1 = spawn_local(do_read(reader));
let w1 = spawn_local(do_write(writer));
sleep(Duration::from_millis(5)).await;
let (reader, writer) = server.split();
let r2 = spawn_local(do_read(reader));
w1.await??;
r2.await??;
let w2 = spawn_local(do_write(writer));
r1.await??;
w2.await??;
Ok(())
}
#[test]
async fn shutdown() -> Result<()> {
let (mut server, client) = tcp_streams().await?;
let mut buf = Vec::new();
future::try_zip(server.read_to_end(&mut buf), async {
client.shutdown(Shutdown::Write)
})
.await?;
Ok(())
}
#[test]
async fn tcp_read_write() -> Result<()> {
let (mut server, mut client) = tcp_streams().await?;
client.write_all(LOREM_IPSUM).await?;
client.shutdown(Shutdown::Write)?;
let mut buffer = vec![0; LOREM_IPSUM.len()];
server.read_exact(&mut buffer).await?;
assert_eq!(buffer, LOREM_IPSUM);
Ok(())
}