compio-net 0.12.1

Networking IO for compio
use std::net::Ipv6Addr;

use compio_io::{
    AsyncReadManaged, AsyncReadMulti, AsyncWriteExt,
    ancillary::{AncillaryBuf, AsyncReadAncillaryManaged, ReturnFlags},
};
use compio_net::{TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream};
use futures_util::{StreamExt, TryStreamExt};

#[compio_macros::test]
async fn test_tcp_read_buffer_pool() {
    let listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = listener.local_addr().unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = TcpStream::connect(addr).await.unwrap();

    assert_eq!(
        stream.read_managed(0).await.unwrap().unwrap().as_ref(),
        b"test"
    );
    let res = stream.read_managed(0).await;
    assert!(matches!(res, Ok(None)));
}

#[cfg_attr(windows, ignore)]
#[compio_macros::test]
async fn test_tcp_read_managed_with_ancillary() {
    let listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = listener.local_addr().unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = TcpStream::connect(addr).await.unwrap();

    let (buffer, _control, flags) = stream
        .read_managed_with_ancillary(0, AncillaryBuf::<64>::new())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(buffer.as_ref(), b"test");
    assert_eq!(flags, ReturnFlags::empty());
}

#[compio_macros::test]
async fn test_udp_read_buffer_pool() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    connected.connect(addr).await.unwrap();
    let addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        listener.send_to(b"test", addr).await.unwrap();
    })
    .detach();

    assert_eq!(
        connected.recv_managed(0).await.unwrap().unwrap().as_ref(),
        b"test"
    );
}

#[compio_macros::test]
async fn test_udp_recv_from_buffer_pool() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let listener_addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let connected_addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        connected.send_to(b"test", listener_addr).await.unwrap();
    })
    .detach();

    let (buffer, addr) = listener.recv_from_managed(0).await.unwrap().unwrap();
    assert_eq!(buffer.as_ref(), b"test");
    assert_eq!(addr, connected_addr);
}

#[compio_macros::test]
async fn test_udp_recv_msg_buffer_pool() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let listener_addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let connected_addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        connected.send_to(b"test", listener_addr).await.unwrap();
    })
    .detach();

    let (buffer, _, addr, flags) = listener
        .recv_msg_managed(0, Vec::with_capacity(64))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(buffer.as_ref(), b"test");
    assert_eq!(addr, connected_addr);
    assert_eq!(flags, ReturnFlags::empty());
}

#[compio_macros::test]
async fn test_uds_recv_buffer_pool() {
    let dir = tempfile::Builder::new()
        .prefix("compio-uds-buffer-pool-tests")
        .tempdir()
        .unwrap();
    let sock_path = dir.path().join("connect.sock");

    let listener = UnixListener::bind(&sock_path).await.unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = UnixStream::connect(&sock_path).await.unwrap();

    assert_eq!(
        stream.read_managed(0).await.unwrap().unwrap().as_ref(),
        b"test"
    );
    assert!(matches!(stream.read_managed(0).await, Ok(None)));
}

#[cfg_attr(windows, ignore)]
#[compio_macros::test]
async fn test_uds_read_managed_with_ancillary() {
    let dir = tempfile::Builder::new()
        .prefix("compio-uds-anc-buf")
        .tempdir()
        .unwrap();
    let sock_path = dir.path().join("connect.sock");

    let listener = UnixListener::bind(&sock_path).await.unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = UnixStream::connect(&sock_path).await.unwrap();

    let (buffer, _control, flags) = stream
        .read_managed_with_ancillary(0, AncillaryBuf::<64>::new())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(buffer.as_ref(), b"test");
    assert_eq!(flags, ReturnFlags::empty());
}

#[compio_macros::test]
async fn test_tcp_recv_multi() {
    let listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = listener.local_addr().unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = TcpStream::connect(addr).await.unwrap();

    let buffer = stream.read_multi(0).try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(buffer.len(), 1);
    assert_eq!(&*buffer[0], b"test");
}

#[compio_macros::test]
async fn test_udp_recv_multi() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    connected.connect(addr).await.unwrap();
    let addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        listener.send_to(b"test", addr).await.unwrap();
    })
    .detach();

    let buffer = connected.recv_multi(0).next().await.unwrap().unwrap();
    assert_eq!(&*buffer, b"test");
}

#[compio_macros::test]
async fn test_udp_recv_from_multi() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        listener.send_to(b"test", addr).await.unwrap();
    })
    .detach();

    let result = connected.recv_from_multi().next().await.unwrap().unwrap();
    assert_eq!(result.data(), b"test");
    assert_eq!(result.addr().and_then(|a| a.as_socket()), Some(server_addr));
}

#[compio_macros::test]
async fn test_udp_recv_msg_multi() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        listener.send_to(b"test", addr).await.unwrap();
    })
    .detach();

    let result = connected.recv_msg_multi(64).next().await.unwrap().unwrap();
    assert_eq!(result.data(), b"test");
    assert_eq!(result.addr().and_then(|a| a.as_socket()), Some(server_addr));
    assert_eq!(result.flags(), ReturnFlags::empty());
}

#[compio_macros::test(with_proactor(buffer_pool_buffer_len = 256))]
async fn test_udp_recv_msg_multi_truncated_datagram() {
    let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
    let addr = connected.local_addr().unwrap();

    compio_runtime::spawn(async move {
        listener.send_to(vec![0; 1024], addr).await.unwrap();
    })
    .detach();

    let result = connected.recv_msg_multi(64).next().await.unwrap().unwrap();
    assert_eq!(result.addr().and_then(|a| a.as_socket()), Some(server_addr));
    assert!(result.flags().contains(ReturnFlags::TRUNC));
}

#[compio_macros::test]
async fn test_uds_recv_multi() {
    let dir = tempfile::Builder::new()
        .prefix("compio-uds-recv-multi-tests")
        .tempdir()
        .unwrap();
    let sock_path = dir.path().join("connect.sock");

    let listener = UnixListener::bind(&sock_path).await.unwrap();

    compio_runtime::spawn(async move {
        let mut stream = listener.accept().await.unwrap().0;
        stream.write_all(b"test").await.unwrap();
    })
    .detach();

    let mut stream = UnixStream::connect(&sock_path).await.unwrap();

    let buffer = stream.read_multi(0).try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(buffer.len(), 1);
    assert_eq!(&*buffer[0], b"test");
}