pasque 0.2.0

UDP and IP over HTTP/3
Documentation
use std::sync::Arc;
use std::time::Duration;

use tokio::{
    fs,
    net::UdpSocket,
    sync::Notify,
    time::timeout,
};

use pasque::{
    client::PsqClient,
    server::{
        config::Config,
        PsqServer
    },
    stream::{
        filestream::{FileStream, Files},
        udptunnel::{UdpEndpoint, UdpTunnel},
        PsqStream,
    }, test_utils::init_logger, PsqError
};


#[tokio::test]
async fn test_get_request() {
    init_logger();
    let addr = "127.0.0.1:8888";
    let config = Config::create_default();
    let server = tokio::spawn(async move {
        let mut psqserver = PsqServer::start(
            addr,
            &config,
        ).await.unwrap();
        psqserver.add_endpoint(
            "files", 
            Files::new(".")).await;
        loop {
            psqserver.process().await.unwrap();
        }

    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Run client
    let mut psqconn = PsqClient::connect(
        format!("https://{}/", addr).as_str(),
        true,
    ).await.unwrap();
    let ret = FileStream::get(
        &mut psqconn,
        "files/Cargo.toml",
        "testout",
    ).await;

    assert!(ret.is_ok());

    let srclen = fs::metadata("Cargo.toml").await.unwrap().len();
    let dstlen = fs::metadata("testout").await.unwrap().len();
    let ret = ret.unwrap();

    assert!(srclen == dstlen && srclen == ret as u64);

    let ret = FileStream::get(
        &mut psqconn,
        "files/nonexisting",
        "testout",
    ).await;
    assert!(ret.is_err());

    let ret = FileStream::get(
        &mut psqconn,
        "nonexisting",
        "testout",
    ).await;
    assert!(ret.is_err());

    std::fs::remove_file("testout").unwrap();

    server.abort();
}


async fn run_server(addr: &str, shutdown: Arc<Notify>) {
    let config = Config::create_default();
    let mut psqserver = PsqServer::start(addr, &config).await.unwrap();
    psqserver.add_endpoint(
        "udp",
        UdpEndpoint::new(),
    ).await;
    loop {
        tokio::select! {
            _ = shutdown.notified() => {
                break;
            }
            result = psqserver.process() => {
                result.unwrap();
            }
        }
    }
}


async fn run_client(mut psqclient: PsqClient, shutdown: Arc<Notify>) {
    loop {
        tokio::select! {
            _ = shutdown.notified() => {
                break;
            }
            result = psqclient.process() => {
                result.unwrap();
            }
        }
    }
}


async fn run_udpserver(udpsocket: UdpSocket, shutdown: Arc<Notify>) {
    loop {
        let mut buf = [0u8; 2000];
        tokio::select! {
            _ = shutdown.notified() => {
                break;
            }
            result = udpsocket.recv_from(&mut buf) => {
                let (n, addr) = result.unwrap();
                udpsocket.send_to(&buf[..n], addr).await.unwrap();
            }
        }
    }
}


#[tokio::test]
async fn test_udp_tunnel() {
    init_logger();
    let addr = "127.0.0.1:7000";
    let server_notify = Arc::new(Notify::new());
    let server = tokio::spawn(run_server(addr, server_notify.clone()));

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Run client
    let mut psqclient = PsqClient::connect(
        format!("https://{}/", addr).as_str(),
        true,
    ).await.unwrap();

    // Test first with GET which should not be supported on UDP tunnel.
    let ret = FileStream::get(
        &mut psqclient,
        "udp",
        "testout",
    ).await;
    assert!(matches!(ret, Err(PsqError::HttpResponse(405, _))));

    let udptunnel = UdpTunnel::connect(
        &mut psqclient,
        "udp",
        "127.0.0.1",
        9002,
        "127.0.0.1:0".parse().unwrap(),
    ).await.unwrap();
    let tunneladdr = udptunnel.sockaddr().unwrap();
    
    let client_notify = Arc::new(Notify::new());
    let client = tokio::spawn(run_client(psqclient, client_notify.clone()));

    // Start UDP server
    let udpsocket = UdpSocket::bind("127.0.0.1:9002").await.unwrap();

    let udpserver_notify = Arc::new(Notify::new());
    let udpserver = tokio::spawn(run_udpserver(udpsocket, udpserver_notify.clone()));

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send UDP datagram to the client socket
    let result = timeout(Duration::from_millis(1000), async {
        let udpclient = UdpSocket::bind("0.0.0.0:0").await.unwrap();
        let mut buf = [0u8; 2000];
        udpclient.send_to(b"Testing", tunneladdr).await.unwrap();
        let (n, _) = udpclient.recv_from(&mut buf).await.unwrap();
        assert_eq!(&buf[..n], b"Testing");
    }).await;
    assert!(result.is_ok(), "Test timed out");

    udpserver_notify.notify_one();
    udpserver.await.unwrap();

    client_notify.notify_one();
    client.await.unwrap();

    server_notify.notify_one();
    server.await.unwrap();
}


#[tokio::test]
async fn tunnel_closing() {
    init_logger();
    let addr = "127.0.0.1:9003";
    let server = tokio::spawn(async move {
        let config = Config::create_default();
        let mut psqserver = PsqServer::start(addr, &config).await.unwrap();
        psqserver.add_endpoint(
            "udp",
            UdpEndpoint::new(),
        ).await;
        loop {
            psqserver.process().await.unwrap();
        }

    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Run client
    let mut psqconn = PsqClient::connect(
        format!("https://{}/", addr).as_str(),
        true,
    ).await.unwrap();

    let _result = timeout(Duration::from_millis(1000), async {
        let udptunnel = UdpTunnel::connect(
            &mut psqconn,
            "udp",
            "127.0.0.1",
            9004,
            "127.0.0.1:0".parse().unwrap(),
        ).await.unwrap();
        let tunneladdr = udptunnel.sockaddr().unwrap();
        let stream_id = udptunnel.stream_id();

        // Send UDP datagram to the client socket
        let client1 = tokio::spawn(async move {
            let udpclient = UdpSocket::bind("0.0.0.0:0").await.unwrap();
            let mut buf = [0u8; 2000];
            udpclient.send_to(b"Testing", tunneladdr).await.unwrap();
            let (n, _) = udpclient.recv_from(&mut buf).await.unwrap();
            assert_eq!(&buf[..n], b"Testing");
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        psqconn.remove_stream(stream_id).await;
        psqconn.process().await.unwrap();
        psqconn.remove_stream(stream_id).await;
        client1.abort();
    }).await;
    server.abort();
}