citadel_proto 0.13.0

Networking library for the Citadel Protocol
Documentation
//! # Citadel Protocol Outbound Message Handling
//!
//! This module provides functionality for sending outbound messages in the Citadel Protocol.
//! It implements various types of message senders optimized for different use cases,
//! including bounded and unbounded channels, UDP and TCP streams.
//!
//! ## Features
//!
//! - **Multiple Channel Types**:
//!   - Unbounded channels for high-throughput scenarios
//!   - Bounded channels for rate-limiting and backpressure
//!   - UDP-specific senders for datagram-based communication
//!
//! - **Stream Management**:
//!   - Primary stream handling for reliable TCP communication
//!   - UDP stream handling with keep-alive support
//!   - Automatic stream cleanup and resource management
//!
//! - **Error Handling**:
//!   - Comprehensive error types for different failure scenarios
//!   - Graceful handling of connection issues
//!   - Automatic retry mechanisms
//!
//! - **Performance Optimizations**:
//!   - Zero-copy buffer management
//!   - Efficient async/await support
//!   - Minimal allocation overhead
//!
//! ## Components
//!
//! - **UnboundedSender**: High-throughput channel without backpressure
//! - **BoundedSender**: Rate-limited channel with backpressure
//! - **OutboundPrimaryStreamSender**: TCP stream management
//! - **OutboundUdpSender**: UDP datagram handling with keep-alive

use crate::error::NetworkError;
use crate::proto::packet::packet_flags;
use bytes::BytesMut;
pub use citadel_io::tokio::sync::mpsc::{
    error::SendError, Receiver, Sender, UnboundedReceiver, UnboundedSender as UnboundedSenderInner,
};
use citadel_user::re_exports::__private::Formatter;
use futures::task::{Context, Poll};
use futures::Sink;
use std::net::SocketAddr;
use std::pin::Pin;

pub struct UnboundedSender<T>(pub(crate) UnboundedSenderInner<T>);

impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (tx, rx) = citadel_io::tokio::sync::mpsc::unbounded_channel();
    (UnboundedSender(tx), rx)
}

impl<T> UnboundedSender<T> {
    #[inline]
    pub fn unbounded_send(&self, item: T) -> Result<(), SendError<T>> {
        self.0.send(item)
    }
}

pub fn channel<T>(len: usize) -> (Sender<T>, Receiver<T>) {
    citadel_io::tokio::sync::mpsc::channel(len)
}

#[derive(Clone)]
pub struct OutboundPrimaryStreamSender(UnboundedSender<bytes::BytesMut>);

impl OutboundPrimaryStreamSender {
    #[inline]
    pub fn unbounded_send(&self, item: bytes::BytesMut) -> Result<(), SendError<BytesMut>> {
        self.0.unbounded_send(item)
    }
}

impl From<UnboundedSender<bytes::BytesMut>> for OutboundPrimaryStreamSender {
    fn from(inner: UnboundedSender<BytesMut>) -> Self {
        Self(inner)
    }
}

pub struct OutboundPrimaryStreamReceiver(
    pub citadel_io::tokio_stream::wrappers::UnboundedReceiverStream<bytes::BytesMut>,
);

impl From<UnboundedReceiver<bytes::BytesMut>> for OutboundPrimaryStreamReceiver {
    fn from(inner: UnboundedReceiver<BytesMut>) -> Self {
        Self(citadel_io::tokio_stream::wrappers::UnboundedReceiverStream::new(inner))
    }
}

/// For keeping the firewall open
pub const KEEP_ALIVE: &[u8; 2] = b"KA";

#[derive(Clone)]
pub struct OutboundUdpSender {
    sender: UnboundedSender<(u8, BytesMut)>,
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    pub(crate) needs_manual_ka: bool,
}

impl OutboundUdpSender {
    pub fn new(
        sender: UnboundedSender<(u8, BytesMut)>,
        local_addr: SocketAddr,
        remote_addr: SocketAddr,
        needs_manual_ka: bool,
    ) -> Self {
        Self {
            sender,
            local_addr,
            remote_addr,
            needs_manual_ka,
        }
    }

    pub fn unbounded_send<T: Into<BytesMut>>(&self, packet: T) -> Result<(), NetworkError> {
        self.sender
            .unbounded_send((packet_flags::cmd::aux::udp::STREAM, packet.into()))
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }

    pub fn send_keep_alive(&self) -> bool {
        self.sender
            .unbounded_send((
                packet_flags::cmd::aux::udp::KEEP_ALIVE,
                BytesMut::from(&KEEP_ALIVE[..]),
            ))
            .is_ok()
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
}

impl Sink<BytesMut> for OutboundUdpSender {
    type Error = NetworkError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sender)
            .poll_ready(cx)
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> {
        Pin::new(&mut self.sender)
            .start_send((packet_flags::cmd::aux::udp::STREAM, item))
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sender)
            .poll_flush(cx)
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sender)
            .poll_close(cx)
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }
}

impl std::fmt::Debug for OutboundUdpSender {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "UDP Sender")
    }
}

/// As asynchronous channel meant to rate-limit input
pub struct BoundedSender<T>(citadel_io::tokio::sync::mpsc::Sender<T>);

pub type BoundedReceiver<T> = citadel_io::tokio::sync::mpsc::Receiver<T>;

impl<T> BoundedSender<T> {
    /// Creates a new bounded channel
    pub fn new(limit: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
        let (tx, rx) = citadel_io::tokio::sync::mpsc::channel(limit);
        (Self(tx), rx)
    }

    /// Attempts to send a value through the stream non-blocking and synchronously
    pub fn try_send(
        &self,
        t: T,
    ) -> Result<(), citadel_io::tokio::sync::mpsc::error::TrySendError<T>> {
        self.0.try_send(t)
    }

    /// Sends a value through the channel
    pub async fn send(
        &self,
        t: T,
    ) -> Result<(), citadel_io::tokio::sync::mpsc::error::SendError<T>> {
        self.0.send(t).await
    }
}

impl<T> Clone for BoundedSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> Sink<T> for UnboundedSender<T> {
    type Error = NetworkError;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.0.is_closed() {
            Poll::Ready(Err(NetworkError::InternalError("Channel tx closed")))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.0
            .send(item)
            .map_err(|err| NetworkError::Generic(err.to_string()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}