// rs-netty 0.2.2
//
// A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.
// Documentation
use bytes::BytesMut;
use tokio::{
    net::UdpSocket,
    sync::{mpsc, watch},
};

use crate::{
    channel::{command::DatagramCommand, DatagramChannel},
    context::{BusinessContext, DatagramContext, DatagramInfo, InboundContext, OutboundContext},
    life::Life,
    pipeline::datagram::runtime::{flush_datagram, DatagramRuntimePipeline},
    transport::{shutdown, udp::config::UdpSocketConfig},
    Error, Result,
};

/// Everything the UDP run loop needs to drive one bound socket.
///
/// The value is consumed whole by `run_datagram_socket_with_life`, which
/// destructures it by field name.
pub(crate) struct DatagramSocketRuntime<P: DatagramRuntimePipeline> {
    /// Identifier stamped into every `DatagramInfo` built for this socket.
    pub id: u64,
    /// The UDP socket that datagrams are received from and flushed to.
    pub socket: UdpSocket,
    /// Pipeline used to decode inbound datagrams and process outbound messages.
    pub pipeline: P,
    /// Socket settings; the run loop reads `read_buffer_capacity`,
    /// `max_datagram_size` and `write_buffer_capacity` from it.
    pub config: UdpSocketConfig,
    /// Channel handle cloned into the `DatagramContext` of each inbound datagram.
    pub channel: DatagramChannel<P::Write>,
    /// Receiving end of the command queue (`WriteTo`, `WriteToAndFlush`, `Close`).
    pub rx: mpsc::Receiver<DatagramCommand<P::Write>>,
    /// Optional shutdown signal, consulted through the `shutdown` helpers.
    pub shutdown_rx: Option<watch::Receiver<bool>>,
}

/// Drives a single UDP socket until it is closed, shut down, or fails.
///
/// After notifying `life.udp_socket_started`, the loop repeatedly waits on
/// whichever of these happens first:
///
/// * an inbound datagram, which is size-checked, decoded and run through the
///   pipeline's inbound path;
/// * a [`DatagramCommand`] from `runtime.rx`, which is encoded through the
///   outbound path and flushed to the target peer (or ends the loop on
///   `Close` / a closed command queue);
/// * the shutdown signal in `runtime.shutdown_rx`.
///
/// `life.udp_socket_stopped` is invoked once the loop ends, whether it ended
/// cleanly or with an error.
///
/// # Errors
///
/// Returns an error if the local address cannot be read, if
/// `life.udp_socket_started` fails, if receiving fails, if a datagram exceeds
/// `config.max_datagram_size` ([`Error::DatagramTooLarge`]), or if decoding,
/// pipeline processing, or flushing fails. When the loop itself failed, that
/// error is returned and a failure of `life.udp_socket_stopped` is only logged
/// at debug level; when the loop ended cleanly, the result of
/// `life.udp_socket_stopped` is returned.
pub(crate) async fn run_datagram_socket_with_life<P, L>(
    runtime: DatagramSocketRuntime<P>,
    life: L,
) -> Result<()>
where
    P: DatagramRuntimePipeline,
    L: Life,
{
    let DatagramSocketRuntime {
        id,
        socket,
        mut pipeline,
        config,
        channel,
        mut rx,
        mut shutdown_rx,
    } = runtime;

    let local_addr = socket.local_addr()?;
    // Reserve at least one byte more than the largest accepted datagram.
    // `recv_from` never reports more bytes than the buffer holds, so with a
    // buffer of exactly `max_datagram_size` an oversized datagram would be cut
    // down to the limit and slip past the size check below as if it were
    // complete. The spare byte makes `read_len > max_datagram_size` observable.
    let read_buffer_capacity = config
        .read_buffer_capacity
        .max(config.max_datagram_size.saturating_add(1));
    let mut read_buf = vec![0_u8; read_buffer_capacity];
    let mut write_buf = BytesMut::with_capacity(config.write_buffer_capacity);

    life.udp_socket_started(local_addr).await?;

    // The loop runs inside its own async block so that every exit path,
    // including `?` and early `return`, funnels into the stop hook below.
    let result: Result<()> = async {
        loop {
            if shutdown::requested(&shutdown_rx) {
                break;
            }

            tokio::select! {
                read = socket.recv_from(&mut read_buf) => {
                    let (read_len, peer_addr) = read?;

                    if read_len > config.max_datagram_size {
                        return Err(Error::DatagramTooLarge {
                            current: read_len,
                            max: config.max_datagram_size,
                        });
                    }

                    let msg = pipeline.decode_datagram(&read_buf[..read_len])?;
                    let info = DatagramInfo::new(id, peer_addr, local_addr);
                    let mut inbound_ctx = InboundContext::new_datagram(info);
                    let mut business_ctx = BusinessContext::new_datagram(info);
                    let mut ctx = DatagramContext::new(info, channel.clone());
                    let mut outbound_ctx = OutboundContext::new_datagram(info);

                    pipeline
                        .process_inbound_flushable(
                            &mut inbound_ctx,
                            &mut business_ctx,
                            &mut outbound_ctx,
                            &mut ctx,
                            &socket,
                            &mut write_buf,
                            msg,
                        )
                        .await?;

                    // A handler asked for the socket to close: stop cleanly.
                    if ctx.close_requested() {
                        return Ok(());
                    }
                }

                cmd = rx.recv() => {
                    match cmd {
                        Some(DatagramCommand::WriteTo(peer_addr, msg)) => {
                            let info = DatagramInfo::new(id, peer_addr, local_addr);
                            let mut outbound_ctx = OutboundContext::new_datagram(info);

                            pipeline
                                .process_outbound(&mut outbound_ctx, msg, &mut write_buf)
                                .await?;

                            flush_datagram(&socket, peer_addr, &mut write_buf).await?;
                        }
                        Some(DatagramCommand::WriteToAndFlush(peer_addr, msg, done)) => {
                            let info = DatagramInfo::new(id, peer_addr, local_addr);
                            let mut outbound_ctx = OutboundContext::new_datagram(info);

                            let result = async {
                                pipeline
                                    .process_outbound(&mut outbound_ctx, msg, &mut write_buf)
                                    .await?;
                                flush_datagram(&socket, peer_addr, &mut write_buf).await
                            }
                            .await;

                            // The waiting caller gets a formatted copy of the
                            // failure; the original error is still propagated
                            // below and ends the loop.
                            let ack = match &result {
                                Ok(()) => Ok(()),
                                Err(err) => Err(Error::Pipeline(format!(
                                    "write_to_and_flush failed: {err}"
                                ))),
                            };
                            // The caller may have stopped waiting; a failed
                            // send of the acknowledgement is not an error here.
                            let _ = done.send(ack);
                            result?;
                        }
                        // Explicit close, or the command queue has been closed.
                        Some(DatagramCommand::Close) | None => {
                            break;
                        }
                    }
                }

                _ = shutdown::wait(&mut shutdown_rx) => {
                    break;
                }
            }
        }

        Ok(())
    }
    .await;

    match result {
        Ok(()) => life.udp_socket_stopped(local_addr).await,
        Err(err) => {
            // Still run the stop hook, but never let its failure mask the
            // error that actually terminated the socket.
            if let Err(life_err) = life.udp_socket_stopped(local_addr).await {
                tracing::debug!(
                    local_addr = %local_addr,
                    error = ?life_err,
                    "udp life hook failed while stopping errored socket"
                );
            }

            Err(err)
        }
    }
}