use std::{
collections::VecDeque,
future::{ready, Future, IntoFuture, Ready},
net::SocketAddr,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
},
};
use crate::{channel::DatagramChannel, context::DatagramInfo, Result};
pub struct DatagramContext<W> {
info: DatagramInfo,
channel: DatagramChannel<W>,
outbox: DatagramOutboxHandle<W>,
close_requested: bool,
}
impl<W: Send + 'static> DatagramContext<W> {
pub(crate) fn new(info: DatagramInfo, channel: DatagramChannel<W>) -> Self {
Self {
info,
channel,
outbox: DatagramOutboxHandle::new(),
close_requested: false,
}
}
pub fn id(&self) -> u64 {
self.info.id()
}
pub fn peer_addr(&self) -> SocketAddr {
self.info.peer_addr()
}
pub fn local_addr(&self) -> SocketAddr {
self.info.local_addr()
}
pub fn channel(&self) -> DatagramChannel<W> {
self.channel.clone()
}
#[inline]
pub fn write(&mut self, msg: W) -> DatagramWriteHandle {
self.outbox.push_write(self.info.peer_addr(), msg);
DatagramWriteHandle { _private: () }
}
#[inline]
pub fn write_to(&mut self, peer_addr: SocketAddr, msg: W) -> DatagramWriteHandle {
self.outbox.push_write(peer_addr, msg);
DatagramWriteHandle { _private: () }
}
#[inline]
pub fn flush(&mut self) -> DatagramFlushHandle<'_, W> {
self.outbox.push_flush()
}
#[inline]
pub fn write_and_flush(&mut self, msg: W) -> DatagramFlushHandle<'_, W> {
self.outbox.push_write_and_flush(self.info.peer_addr(), msg)
}
#[inline]
pub fn write_to_and_flush(
&mut self,
peer_addr: SocketAddr,
msg: W,
) -> DatagramFlushHandle<'_, W> {
self.outbox.push_write_and_flush(peer_addr, msg)
}
pub async fn close(&mut self) -> Result<()> {
self.close_requested = true;
Ok(())
}
pub(crate) fn outbox(&self) -> DatagramOutboxHandle<W> {
self.outbox.clone()
}
pub(crate) fn close_requested(&self) -> bool {
self.close_requested
}
}
pub struct DatagramWriteHandle {
_private: (),
}
impl IntoFuture for DatagramWriteHandle {
type Output = Result<()>;
type IntoFuture = Ready<Result<()>>;
#[inline]
fn into_future(self) -> Self::IntoFuture {
ready(Ok(()))
}
}
pub struct DatagramFlushHandle<'a, W> {
outbox: &'a DatagramOutboxHandle<W>,
}
impl<'a, W> IntoFuture for DatagramFlushHandle<'a, W> {
type Output = Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
#[inline]
fn into_future(self) -> Self::IntoFuture {
let id = self.outbox.push_flush_completion();
let state = &self.outbox.core.flush_state;
Box::pin(async move {
state.mark_awaited(id);
loop {
let notified = state.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if state.completed_flush_id.load(Ordering::Acquire) >= id {
return Ok(());
}
notified.await;
}
})
}
}
pub(crate) enum DatagramOutboxCommand<W> {
WriteTo(SocketAddr, W),
Flush { completion: Option<u64> },
WriteToAndFlush { peer_addr: SocketAddr, msg: W },
}
struct DatagramOutboxState<W> {
head: Option<DatagramOutboxCommand<W>>,
tail: VecDeque<DatagramOutboxCommand<W>>,
}
impl<W> DatagramOutboxState<W> {
fn new() -> Self {
Self {
head: None,
tail: VecDeque::new(),
}
}
#[inline]
fn push(&mut self, command: DatagramOutboxCommand<W>) {
if self.head.is_none() {
self.head = Some(command);
} else {
self.tail.push_back(command);
}
}
#[inline]
fn take_batch(&mut self) -> DatagramOutboxBatch<W> {
DatagramOutboxBatch {
head: self.head.take(),
tail: std::mem::take(&mut self.tail),
}
}
}
pub(crate) struct DatagramOutboxBatch<W> {
head: Option<DatagramOutboxCommand<W>>,
tail: VecDeque<DatagramOutboxCommand<W>>,
}
impl<W> Iterator for DatagramOutboxBatch<W> {
type Item = DatagramOutboxCommand<W>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.head.take().or_else(|| self.tail.pop_front())
}
}
struct DatagramFlushState {
next_flush_id: AtomicU64,
completed_flush_id: AtomicU64,
awaited_flush_id: AtomicU64,
notify: tokio::sync::Notify,
}
impl DatagramFlushState {
fn new() -> Self {
Self {
next_flush_id: AtomicU64::new(0),
completed_flush_id: AtomicU64::new(0),
awaited_flush_id: AtomicU64::new(0),
notify: tokio::sync::Notify::new(),
}
}
#[inline]
fn next_id(&self) -> u64 {
self.next_flush_id.fetch_add(1, Ordering::Relaxed) + 1
}
#[inline]
fn mark_awaited(&self, id: u64) {
self.awaited_flush_id.fetch_max(id, Ordering::Release);
}
#[inline]
fn complete(&self, id: u64) {
self.completed_flush_id.store(id, Ordering::Release);
if self.awaited_flush_id.load(Ordering::Acquire) >= id {
self.notify.notify_waiters();
}
}
}
struct DatagramOutboxCore<W> {
commands: Mutex<DatagramOutboxState<W>>,
flush_requested: AtomicBool,
flush_state: DatagramFlushState,
}
pub(crate) struct DatagramOutboxHandle<W> {
core: Arc<DatagramOutboxCore<W>>,
}
impl<W> Clone for DatagramOutboxHandle<W> {
fn clone(&self) -> Self {
Self {
core: self.core.clone(),
}
}
}
impl<W> DatagramOutboxHandle<W> {
fn new() -> Self {
Self {
core: Arc::new(DatagramOutboxCore {
commands: Mutex::new(DatagramOutboxState::new()),
flush_requested: AtomicBool::new(false),
flush_state: DatagramFlushState::new(),
}),
}
}
#[inline]
fn push_write(&self, peer_addr: SocketAddr, msg: W) {
self.core
.commands
.lock()
.expect("datagram outbox lock poisoned")
.push(DatagramOutboxCommand::WriteTo(peer_addr, msg));
}
#[inline]
fn push_flush(&self) -> DatagramFlushHandle<'_, W> {
self.core
.commands
.lock()
.expect("datagram outbox lock poisoned")
.push(DatagramOutboxCommand::Flush { completion: None });
self.core.flush_requested.store(true, Ordering::Release);
DatagramFlushHandle { outbox: self }
}
#[inline]
fn push_write_and_flush(&self, peer_addr: SocketAddr, msg: W) -> DatagramFlushHandle<'_, W> {
self.core
.commands
.lock()
.expect("datagram outbox lock poisoned")
.push(DatagramOutboxCommand::WriteToAndFlush { peer_addr, msg });
self.core.flush_requested.store(true, Ordering::Release);
DatagramFlushHandle { outbox: self }
}
#[inline]
fn push_flush_completion(&self) -> u64 {
let id = self.core.flush_state.next_id();
self.core
.commands
.lock()
.expect("datagram outbox lock poisoned")
.push(DatagramOutboxCommand::Flush {
completion: Some(id),
});
self.core.flush_requested.store(true, Ordering::Release);
id
}
#[inline]
pub(crate) fn has_flush_command(&self) -> bool {
self.core.flush_requested.load(Ordering::Acquire)
}
#[inline]
pub(crate) fn take_commands(&self) -> DatagramOutboxBatch<W> {
self.core.flush_requested.store(false, Ordering::Release);
self.core
.commands
.lock()
.expect("datagram outbox lock poisoned")
.take_batch()
}
#[inline]
pub(crate) fn complete_flush(&self, id: u64) {
self.core.flush_state.complete(id);
}
}