rs-netty 0.2.3

A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.
Documentation
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex,
};
use std::time::Duration;

use rs_netty::{
    codec::{LineCodec, Utf8DatagramCodec},
    datagram_pipeline, pipeline, CloseReason, ConnInfo, ConnectionStats, Context, DatagramContext,
    DatagramHandler, Error, Life, Result, TcpServer, UdpServer,
};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpStream, UdpSocket},
};

#[tokio::test]
async fn tcp_server_shutdown_stops_server() -> Result<()> {
    let life = CountLife::default();
    let server = TcpServer::bind("127.0.0.1:0")
        .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
        .life(life.clone())
        .start()
        .await?;

    assert_ne!(server.local_addr().port(), 0);
    assert_eq!(life.started.load(Ordering::SeqCst), 1);

    server.shutdown();
    server.wait().await?;

    assert_eq!(life.stopped.load(Ordering::SeqCst), 1);
    Ok(())
}

#[tokio::test]
async fn udp_server_shutdown_stops_socket() -> Result<()> {
    let life = CountLife::default();
    let server = UdpServer::bind("127.0.0.1:0")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .life(life.clone())
        .start()
        .await?;

    assert_ne!(server.local_addr().port(), 0);
    tokio::task::yield_now().await;
    assert_eq!(life.started.load(Ordering::SeqCst), 1);

    server.shutdown();
    server.wait().await?;

    assert_eq!(life.stopped.load(Ordering::SeqCst), 1);
    Ok(())
}

#[tokio::test]
async fn tcp_idle_timeout_closes_idle_connection() -> Result<()> {
    let life = ReasonLife::default();
    let server = TcpServer::bind("127.0.0.1:0")
        .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
        .idle_timeout(Duration::from_millis(20))
        .life(life.clone())
        .start()
        .await?;

    let _stream = TcpStream::connect(server.local_addr()).await?;
    tokio::time::sleep(Duration::from_millis(80)).await;

    server.shutdown();
    server.wait().await?;

    assert!(life.contains(CloseReason::IdleTimeout));
    Ok(())
}

#[tokio::test]
async fn tcp_connection_stats_are_opt_in() -> Result<()> {
    let seen = Arc::new(Mutex::new(None));
    let seen_stats = seen.clone();
    let server = TcpServer::bind("127.0.0.1:0")
        .pipeline(move || {
            pipeline().codec(LineCodec::new()).handler(StatsEcho {
                seen: seen_stats.clone(),
            })
        })
        .track_connection_stats()
        .start()
        .await?;

    let mut stream = TcpStream::connect(server.local_addr()).await?;
    stream.write_all(b"hello\n").await?;

    let mut response = vec![0; 6];
    stream.read_exact(&mut response).await?;
    drop(stream);

    server.shutdown();
    server.wait().await?;

    let stats = seen.lock().expect("stats").clone().expect("stats");
    assert!(stats.bytes_read() >= 6);
    assert!(stats.bytes_written() >= 6);
    assert_eq!(stats.frames_read(), 1);
    assert_eq!(stats.frames_written(), 1);
    Ok(())
}

#[tokio::test]
async fn tcp_context_write_and_flush_flushes_before_handler_returns() -> Result<()> {
    let server = TcpServer::bind("127.0.0.1:0")
        .pipeline(|| pipeline().codec(LineCodec::new()).handler(FlushTwice))
        .start()
        .await?;

    let mut stream = TcpStream::connect(server.local_addr()).await?;
    stream.write_all(b"go\n").await?;

    let mut first = vec![0; b"first\n".len()];
    tokio::time::timeout(Duration::from_millis(50), stream.read_exact(&mut first))
        .await
        .map_err(|err| Error::Pipeline(err.to_string()))??;
    assert_eq!(first, b"first\n");

    let mut second = vec![0; b"second\n".len()];
    tokio::time::timeout(Duration::from_millis(200), stream.read_exact(&mut second))
        .await
        .map_err(|err| Error::Pipeline(err.to_string()))??;
    assert_eq!(second, b"second\n");

    drop(stream);
    server.shutdown();
    server.wait().await
}

#[tokio::test]
async fn udp_context_write_and_flush_sends_before_handler_returns() -> Result<()> {
    let server = UdpServer::bind("127.0.0.1:0")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpFlushTwice)
        })
        .start()
        .await?;

    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    socket.send_to(b"go", server.local_addr()).await?;

    let mut first = vec![0; b"first".len()];
    let (first_len, _) =
        tokio::time::timeout(Duration::from_millis(50), socket.recv_from(&mut first))
            .await
            .map_err(|err| Error::Pipeline(err.to_string()))??;
    assert_eq!(&first[..first_len], b"first");

    let mut second = vec![0; b"second".len()];
    let (second_len, _) =
        tokio::time::timeout(Duration::from_millis(200), socket.recv_from(&mut second))
            .await
            .map_err(|err| Error::Pipeline(err.to_string()))??;
    assert_eq!(&second[..second_len], b"second");

    server.shutdown();
    server.wait().await
}

#[derive(Clone, Default)]
struct CountLife {
    started: Arc<AtomicUsize>,
    stopped: Arc<AtomicUsize>,
}

impl Life for CountLife {
    async fn tcp_server_started(&self, _local_addr: std::net::SocketAddr) -> Result<()> {
        self.started.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    async fn tcp_server_stopped(&self, _local_addr: std::net::SocketAddr) -> Result<()> {
        self.stopped.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    async fn udp_socket_started(&self, _local_addr: std::net::SocketAddr) -> Result<()> {
        self.started.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    async fn udp_socket_stopped(&self, _local_addr: std::net::SocketAddr) -> Result<()> {
        self.stopped.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

#[derive(Clone, Default)]
struct ReasonLife {
    reasons: Arc<Mutex<Vec<CloseReason>>>,
}

impl ReasonLife {
    fn contains(&self, reason: CloseReason) -> bool {
        self.reasons.lock().expect("reasons").contains(&reason)
    }
}

impl Life for ReasonLife {
    async fn tcp_connection_closed(&self, _info: ConnInfo, reason: CloseReason) -> Result<()> {
        self.reasons.lock().expect("reasons").push(reason);
        Ok(())
    }
}

struct Echo;

impl rs_netty::Handler<String> for Echo {
    type Write = String;

    async fn read(&mut self, ctx: &mut Context<Self::Write>, msg: String) -> Result<()> {
        ctx.write(msg).await
    }
}

struct StatsEcho {
    seen: Arc<Mutex<Option<ConnectionStats>>>,
}

impl rs_netty::Handler<String> for StatsEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut Context<Self::Write>, msg: String) -> Result<()> {
        *self.seen.lock().expect("stats") = ctx.stats();
        ctx.write(msg).await
    }
}

struct FlushTwice;

impl rs_netty::Handler<String> for FlushTwice {
    type Write = String;

    async fn read(&mut self, ctx: &mut Context<Self::Write>, _msg: String) -> Result<()> {
        ctx.write_and_flush("first".to_string()).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        ctx.write_and_flush("second".to_string()).await
    }
}

struct UdpEcho;

impl DatagramHandler<String> for UdpEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write(msg).await
    }
}

struct UdpFlushTwice;

impl DatagramHandler<String> for UdpFlushTwice {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, _msg: String) -> Result<()> {
        ctx.write_and_flush("first".to_string()).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        ctx.write_and_flush("second".to_string()).await
    }
}