rs-netty 1.0.0

A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.
Documentation
use std::{
    collections::VecDeque,
    future::{ready, Future, IntoFuture, Ready},
    net::SocketAddr,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc, Mutex,
    },
};

use crate::{channel::DatagramChannel, context::DatagramInfo, Result};

/// Context passed to a UDP [`crate::DatagramHandler`].
///
/// Writes through this context are staged in a handler-local outbox. They are
/// encoded into the socket's pending datagram queue when the handler returns.
/// They are sent only when [`Self::flush`] or a `*_and_flush` method requests
/// an immediate flush. Dropping a flush handle is fire-and-forget; awaiting it
/// waits until the local `send_to` calls complete.
pub struct DatagramContext<W> {
    info: DatagramInfo,
    channel: DatagramChannel<W>,
    outbox: DatagramOutboxHandle<W>,
    close_requested: bool,
}

impl<W: Send + 'static> DatagramContext<W> {
    pub(crate) fn new(info: DatagramInfo, channel: DatagramChannel<W>) -> Self {
        Self {
            info,
            channel,
            outbox: DatagramOutboxHandle::new(),
            close_requested: false,
        }
    }

    /// Socket id assigned by the UDP runtime.
    pub fn id(&self) -> u64 {
        self.info.id()
    }

    /// Peer address for the current datagram.
    pub fn peer_addr(&self) -> SocketAddr {
        self.info.peer_addr()
    }

    /// Local socket address.
    pub fn local_addr(&self) -> SocketAddr {
        self.info.local_addr()
    }

    /// Returns a cloneable channel for writing from outside the current handler.
    pub fn channel(&self) -> DatagramChannel<W> {
        self.channel.clone()
    }

    /// Stages a response to the current datagram peer.
    ///
    /// The message is stored in the handler-local outbox and later encoded into
    /// the socket's pending datagram queue. It is not sent until the outbox or
    /// channel is explicitly flushed.
    #[inline]
    pub fn write(&mut self, msg: W) -> DatagramWriteHandle {
        self.outbox.push_write(self.info.peer_addr(), msg);
        DatagramWriteHandle { _private: () }
    }

    /// Stages a datagram for an explicit peer.
    ///
    /// Use this when a handler needs to reply somewhere other than the sender
    /// of the current datagram.
    #[inline]
    pub fn write_to(&mut self, peer_addr: SocketAddr, msg: W) -> DatagramWriteHandle {
        self.outbox.push_write(peer_addr, msg);
        DatagramWriteHandle { _private: () }
    }

    /// Sends messages staged by this handler so far.
    ///
    /// Dropping the returned handle is fire-and-forget. Awaiting it waits until
    /// the socket task completes `send_to` for all staged messages before this
    /// flush boundary.
    #[inline]
    pub fn flush(&mut self) -> DatagramFlushHandle<'_, W> {
        self.outbox.push_flush()
    }

    /// Stages a response to the current peer and requests a flush.
    #[inline]
    pub fn write_and_flush(&mut self, msg: W) -> DatagramFlushHandle<'_, W> {
        self.outbox.push_write_and_flush(self.info.peer_addr(), msg)
    }

    /// Stages a datagram for an explicit peer and requests a flush.
    #[inline]
    pub fn write_to_and_flush(
        &mut self,
        peer_addr: SocketAddr,
        msg: W,
    ) -> DatagramFlushHandle<'_, W> {
        self.outbox.push_write_and_flush(peer_addr, msg)
    }

    /// Requests that the socket task close after the current handler returns.
    pub async fn close(&mut self) -> Result<()> {
        self.close_requested = true;
        Ok(())
    }

    pub(crate) fn outbox(&self) -> DatagramOutboxHandle<W> {
        self.outbox.clone()
    }

    pub(crate) fn close_requested(&self) -> bool {
        self.close_requested
    }
}

pub struct DatagramWriteHandle {
    _private: (),
}

impl IntoFuture for DatagramWriteHandle {
    type Output = Result<()>;
    type IntoFuture = Ready<Result<()>>;

    #[inline]
    fn into_future(self) -> Self::IntoFuture {
        ready(Ok(()))
    }
}

pub struct DatagramFlushHandle<'a, W> {
    outbox: &'a DatagramOutboxHandle<W>,
}

impl<'a, W> IntoFuture for DatagramFlushHandle<'a, W> {
    type Output = Result<()>;
    type IntoFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;

    #[inline]
    fn into_future(self) -> Self::IntoFuture {
        let id = self.outbox.push_flush_completion();
        let state = &self.outbox.core.flush_state;

        Box::pin(async move {
            state.mark_awaited(id);

            loop {
                let notified = state.notify.notified();
                tokio::pin!(notified);
                notified.as_mut().enable();

                if state.completed_flush_id.load(Ordering::Acquire) >= id {
                    return Ok(());
                }

                notified.await;
            }
        })
    }
}

pub(crate) enum DatagramOutboxCommand<W> {
    WriteTo(SocketAddr, W),
    Flush { completion: Option<u64> },
    WriteToAndFlush { peer_addr: SocketAddr, msg: W },
}

struct DatagramOutboxState<W> {
    head: Option<DatagramOutboxCommand<W>>,
    tail: VecDeque<DatagramOutboxCommand<W>>,
}

impl<W> DatagramOutboxState<W> {
    fn new() -> Self {
        Self {
            head: None,
            tail: VecDeque::new(),
        }
    }

    #[inline]
    fn push(&mut self, command: DatagramOutboxCommand<W>) {
        if self.head.is_none() {
            self.head = Some(command);
        } else {
            self.tail.push_back(command);
        }
    }

    #[inline]
    fn take_batch(&mut self) -> DatagramOutboxBatch<W> {
        DatagramOutboxBatch {
            head: self.head.take(),
            tail: std::mem::take(&mut self.tail),
        }
    }
}

pub(crate) struct DatagramOutboxBatch<W> {
    head: Option<DatagramOutboxCommand<W>>,
    tail: VecDeque<DatagramOutboxCommand<W>>,
}

impl<W> Iterator for DatagramOutboxBatch<W> {
    type Item = DatagramOutboxCommand<W>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.head.take().or_else(|| self.tail.pop_front())
    }
}

struct DatagramFlushState {
    next_flush_id: AtomicU64,
    completed_flush_id: AtomicU64,
    awaited_flush_id: AtomicU64,
    notify: tokio::sync::Notify,
}

impl DatagramFlushState {
    fn new() -> Self {
        Self {
            next_flush_id: AtomicU64::new(0),
            completed_flush_id: AtomicU64::new(0),
            awaited_flush_id: AtomicU64::new(0),
            notify: tokio::sync::Notify::new(),
        }
    }

    #[inline]
    fn next_id(&self) -> u64 {
        self.next_flush_id.fetch_add(1, Ordering::Relaxed) + 1
    }

    #[inline]
    fn mark_awaited(&self, id: u64) {
        self.awaited_flush_id.fetch_max(id, Ordering::Release);
    }

    #[inline]
    fn complete(&self, id: u64) {
        self.completed_flush_id.store(id, Ordering::Release);
        if self.awaited_flush_id.load(Ordering::Acquire) >= id {
            self.notify.notify_waiters();
        }
    }
}

struct DatagramOutboxCore<W> {
    commands: Mutex<DatagramOutboxState<W>>,
    flush_requested: AtomicBool,
    flush_state: DatagramFlushState,
}

pub(crate) struct DatagramOutboxHandle<W> {
    core: Arc<DatagramOutboxCore<W>>,
}

impl<W> Clone for DatagramOutboxHandle<W> {
    fn clone(&self) -> Self {
        Self {
            core: self.core.clone(),
        }
    }
}

impl<W> DatagramOutboxHandle<W> {
    fn new() -> Self {
        Self {
            core: Arc::new(DatagramOutboxCore {
                commands: Mutex::new(DatagramOutboxState::new()),
                flush_requested: AtomicBool::new(false),
                flush_state: DatagramFlushState::new(),
            }),
        }
    }

    #[inline]
    fn push_write(&self, peer_addr: SocketAddr, msg: W) {
        self.core
            .commands
            .lock()
            .expect("datagram outbox lock poisoned")
            .push(DatagramOutboxCommand::WriteTo(peer_addr, msg));
    }

    #[inline]
    fn push_flush(&self) -> DatagramFlushHandle<'_, W> {
        self.core
            .commands
            .lock()
            .expect("datagram outbox lock poisoned")
            .push(DatagramOutboxCommand::Flush { completion: None });
        self.core.flush_requested.store(true, Ordering::Release);
        DatagramFlushHandle { outbox: self }
    }

    #[inline]
    fn push_write_and_flush(&self, peer_addr: SocketAddr, msg: W) -> DatagramFlushHandle<'_, W> {
        self.core
            .commands
            .lock()
            .expect("datagram outbox lock poisoned")
            .push(DatagramOutboxCommand::WriteToAndFlush { peer_addr, msg });
        self.core.flush_requested.store(true, Ordering::Release);
        DatagramFlushHandle { outbox: self }
    }

    #[inline]
    fn push_flush_completion(&self) -> u64 {
        let id = self.core.flush_state.next_id();
        self.core
            .commands
            .lock()
            .expect("datagram outbox lock poisoned")
            .push(DatagramOutboxCommand::Flush {
                completion: Some(id),
            });
        self.core.flush_requested.store(true, Ordering::Release);
        id
    }

    #[inline]
    pub(crate) fn has_flush_command(&self) -> bool {
        self.core.flush_requested.load(Ordering::Acquire)
    }

    #[inline]
    pub(crate) fn take_commands(&self) -> DatagramOutboxBatch<W> {
        self.core.flush_requested.store(false, Ordering::Release);
        self.core
            .commands
            .lock()
            .expect("datagram outbox lock poisoned")
            .take_batch()
    }

    #[inline]
    pub(crate) fn complete_flush(&self, id: u64) {
        self.core.flush_state.complete(id);
    }
}