use std::{
collections::VecDeque,
future::{poll_fn, Future},
marker::PhantomData,
net::SocketAddr,
task::Poll,
};
use bytes::{Bytes, BytesMut};
use tokio::net::UdpSocket;
use crate::{
codec::{DatagramDecoder, DatagramEncoder},
context::{
datagram::{DatagramOutboxCommand, DatagramOutboxHandle},
BusinessContext, DatagramContext, InboundContext, OutboundContext,
},
pipeline::core::pipe::{BusinessPipe, InboundPipe, OutboundPipe},
traits::{DatagramHandler, Flow},
Result,
};
/// FIFO queue of encoded datagrams waiting to be sent, each paired with its destination address.
///
/// Entries are appended by `queue_datagram` and consumed from the front by `flush_datagrams`.
pub(crate) type PendingDatagrams = VecDeque<(SocketAddr, Bytes)>;
/// Datagram processing pipeline made of a codec, an inbound pipe, a business pipe, a handler,
/// and an outbound pipe.
///
/// `CurrentIn`, `Write`, and `CurrentOut` are not stored in any field; they are carried through
/// `PhantomData` and relate the stages to each other in the bounds of the
/// `DatagramRuntimePipeline` implementation.
pub struct DatagramPipeline<C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut> {
/// Decodes received bytes and encodes outbound messages.
codec: C,
/// First inbound stage; receives the decoded message.
inbound: InP,
/// Second inbound stage; receives the output of `inbound`.
business: BizP,
/// Final inbound stage; reads the message produced by `business`.
handler: H,
/// Outbound stage; processes `Write` messages before they are encoded.
outbound: OutP,
/// Zero-sized marker for the type parameters that no field uses directly.
_marker: PhantomData<(CurrentIn, Write, CurrentOut)>,
}
impl<C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut>
    DatagramPipeline<C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut>
{
    /// Assembles a pipeline from its five stages.
    ///
    /// Each stage is stored exactly as given. The remaining type parameters are recorded only
    /// through a zero-sized marker and carry no runtime data.
    pub(crate) fn new(codec: C, inbound: InP, business: BizP, handler: H, outbound: OutP) -> Self {
        Self {
            _marker: PhantomData::<(CurrentIn, Write, CurrentOut)>,
            codec,
            inbound,
            business,
            handler,
            outbound,
        }
    }
}
/// Runtime-facing view of a datagram pipeline: decoding, inbound processing, and outbound
/// encoding.
///
/// Implementors must be `Send + 'static`. Every asynchronous method returns a `Send` future that
/// borrows its arguments for `'ctx`.
pub trait DatagramRuntimePipeline: Send + 'static {
/// Message type accepted by the outbound path and carried by `DatagramContext`.
type Write: Send + 'static;
/// Message type produced by `decode_datagram` and consumed by the inbound methods.
type Decoded: Send + 'static;
/// Decodes `src` into a `Self::Decoded` message.
fn decode_datagram(&mut self, src: &[u8]) -> Result<Self::Decoded>;
/// Runs one decoded message through inbound processing.
///
/// In the `DatagramPipeline` implementation this is the inbound pipe, then the business pipe,
/// then the handler.
fn process_inbound<'ctx>(
&'ctx mut self,
inbound_ctx: &'ctx mut InboundContext,
business_ctx: &'ctx mut BusinessContext,
ctx: &'ctx mut DatagramContext<Self::Write>,
msg: Self::Decoded,
) -> impl Future<Output = Result<()>> + Send + 'ctx;
/// Like `process_inbound`, but also receives the socket, write buffer, and pending queue
/// needed to send output while the message is still being processed.
///
/// The `DatagramPipeline` implementation drains the context's outbox whenever the handler is
/// pending with a flush command queued, and once more after the handler has finished.
#[allow(
clippy::too_many_arguments,
reason = "The runtime deliberately passes split mutable state to avoid bundling socket, buffers, and contexts into a broad mutable facade."
)]
fn process_inbound_flushable<'ctx>(
&'ctx mut self,
inbound_ctx: &'ctx mut InboundContext,
business_ctx: &'ctx mut BusinessContext,
outbound_ctx: &'ctx mut OutboundContext,
ctx: &'ctx mut DatagramContext<Self::Write>,
socket: &'ctx UdpSocket,
write_buf: &'ctx mut BytesMut,
pending_datagrams: &'ctx mut PendingDatagrams,
msg: Self::Decoded,
) -> impl Future<Output = Result<()>> + Send + 'ctx;
/// Processes one outbound message and encodes the result into `dst`.
fn process_outbound<'ctx>(
&'ctx mut self,
outbound_ctx: &'ctx mut OutboundContext,
msg: Self::Write,
dst: &'ctx mut BytesMut,
) -> impl Future<Output = Result<()>> + Send + 'ctx;
}
// Wires the generic stages of `DatagramPipeline` into the runtime-facing trait.
//
// The bounds chain the stages together: the codec's decoded item feeds `InP`, whose output feeds
// `BizP`, whose output (`CurrentIn`) feeds the handler. On the way out, `Write` messages pass
// through `OutP` and its output (`CurrentOut`) is what the codec encodes.
impl<C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut> DatagramRuntimePipeline
for DatagramPipeline<C, InP, BizP, H, OutP, CurrentIn, Write, CurrentOut>
where
C: DatagramDecoder + DatagramEncoder<CurrentOut>,
InP: InboundPipe<C::Item>,
BizP: BusinessPipe<InP::Out, Out = CurrentIn>,
H: DatagramHandler<CurrentIn, Write = Write>,
OutP: OutboundPipe<Write, Out = CurrentOut>,
C::Item: Send + 'static,
CurrentIn: Send + 'static,
Write: Send + 'static,
CurrentOut: Send + 'static,
{
type Write = Write;
type Decoded = C::Item;
/// Delegates decoding to the codec.
fn decode_datagram(&mut self, src: &[u8]) -> Result<Self::Decoded> {
self.codec.decode_datagram(src)
}
/// Runs `msg` through the inbound pipe, the business pipe, and finally the handler.
///
/// If either pipe yields `Flow::Stop`, processing ends with `Ok(())` and the handler is not
/// called. Errors from any stage are returned to the caller.
async fn process_inbound(
&mut self,
inbound_ctx: &mut InboundContext,
business_ctx: &mut BusinessContext,
ctx: &mut DatagramContext<Self::Write>,
msg: Self::Decoded,
) -> Result<()> {
let msg = match self.inbound.process(inbound_ctx, msg).await? {
Flow::Next(msg) => msg,
Flow::Stop => return Ok(()),
};
let msg = match self.business.process(business_ctx, msg).await? {
Flow::Next(msg) => msg,
Flow::Stop => return Ok(()),
};
self.handler.read(ctx, msg).await
}
/// Same stages as `process_inbound`, but the handler future is polled manually so that the
/// outbox can be drained while the handler is still pending.
///
/// The outbox is drained each time the handler returns `Pending` while a flush command is
/// queued, and once more after the handler completes. An error from any drain is returned
/// immediately; an error from the final drain takes precedence over the handler's own result.
#[allow(
clippy::too_many_arguments,
reason = "The runtime deliberately passes split mutable state to avoid bundling socket, buffers, and contexts into a broad mutable facade."
)]
async fn process_inbound_flushable(
&mut self,
inbound_ctx: &mut InboundContext,
business_ctx: &mut BusinessContext,
outbound_ctx: &mut OutboundContext,
ctx: &mut DatagramContext<Self::Write>,
socket: &UdpSocket,
write_buf: &mut BytesMut,
pending_datagrams: &mut PendingDatagrams,
msg: Self::Decoded,
) -> Result<()> {
let msg = match self.inbound.process(inbound_ctx, msg).await? {
Flow::Next(msg) => msg,
Flow::Stop => return Ok(()),
};
let msg = match self.business.process(business_ctx, msg).await? {
Flow::Next(msg) => msg,
Flow::Stop => return Ok(()),
};
// The outbox handle is obtained before `ctx` is handed to the handler, presumably because
// the handler future keeps `ctx` borrowed for as long as it lives.
let outbox = ctx.outbox();
let handler = self.handler.read(ctx, msg);
tokio::pin!(handler);
let result = loop {
// Poll the handler once per wake-up and classify the outcome:
// - finished: carry its result out of the loop;
// - pending with a flush command queued: resolve so the outbox can be drained below;
// - pending otherwise: stay pending.
let poll_result = poll_fn(|cx| match handler.as_mut().poll(cx) {
Poll::Ready(result) => Poll::Ready(DatagramHandlerPoll::Ready(result)),
Poll::Pending if outbox.has_flush_command() => {
Poll::Ready(DatagramHandlerPoll::Pending)
}
Poll::Pending => Poll::Pending,
})
.await;
match poll_result {
DatagramHandlerPoll::Ready(result) => break result,
DatagramHandlerPoll::Pending => {
// Service the queued commands, then loop around to poll the handler again.
drain_datagram_outbox(
&outbox,
&mut self.codec,
&mut self.outbound,
outbound_ctx,
socket,
write_buf,
pending_datagrams,
)
.await?;
}
}
};
#[allow(
clippy::drop_non_drop,
reason = "This explicit drop marks the end of the pinned handler borrow before draining the outbox."
)]
drop(handler);
// Final drain: execute whatever the handler queued before it completed. This runs even when
// `result` is an error, and a failure here is returned instead of `result`.
drain_datagram_outbox(
&outbox,
&mut self.codec,
&mut self.outbound,
outbound_ctx,
socket,
write_buf,
pending_datagrams,
)
.await?;
result
}
/// Runs `msg` through the outbound pipe and encodes the resulting message into `dst`.
///
/// If the pipe yields `Flow::Stop`, the codec is not called and `Ok(())` is returned.
async fn process_outbound(
&mut self,
outbound_ctx: &mut OutboundContext,
msg: Self::Write,
dst: &mut BytesMut,
) -> Result<()> {
let msg = match self.outbound.process(outbound_ctx, msg).await? {
Flow::Next(msg) => msg,
Flow::Stop => return Ok(()),
};
self.codec.encode_datagram(msg, dst)
}
}
/// Outcome of one manual poll of the handler future in `process_inbound_flushable`.
enum DatagramHandlerPoll {
/// The handler future completed with this result.
Ready(Result<()>),
/// The handler returned `Poll::Pending` while the outbox had a flush command queued, so the
/// outbox should be drained before the handler is polled again.
Pending,
}
/// Executes every command currently queued in `outbox`, in the order they were taken.
///
/// * `WriteTo`: runs the message through `outbound` and `codec`, then queues the encoded bytes
///   for the given peer.
/// * `Flush`: sends all pending datagrams on `socket`. When the command carries a completion id,
///   `outbox.complete_flush` is called with it whether or not the send succeeded.
/// * `WriteToAndFlush`: encodes and queues like `WriteTo`, then flushes.
///
/// # Errors
///
/// Returns the first encoding or send error. Commands after the failing one are not executed.
pub(crate) async fn drain_datagram_outbox<C, OutP, Write, CurrentOut>(
    outbox: &DatagramOutboxHandle<Write>,
    codec: &mut C,
    outbound: &mut OutP,
    outbound_ctx: &mut OutboundContext,
    socket: &UdpSocket,
    write_buf: &mut BytesMut,
    pending_datagrams: &mut PendingDatagrams,
) -> Result<()>
where
    C: DatagramEncoder<CurrentOut>,
    OutP: OutboundPipe<Write, Out = CurrentOut>,
    Write: Send + 'static,
    CurrentOut: Send + 'static,
{
    let queued = outbox.take_commands();
    for command in queued {
        // Normalise the command into its parts: an optional message to encode and queue,
        // whether a flush follows, and an optional flush-completion id.
        let (outgoing, wants_flush, completion) = match command {
            DatagramOutboxCommand::WriteTo(peer_addr, msg) => {
                (Some((peer_addr, msg)), false, None)
            }
            DatagramOutboxCommand::Flush { completion } => (None, true, completion),
            DatagramOutboxCommand::WriteToAndFlush { peer_addr, msg } => {
                (Some((peer_addr, msg)), true, None)
            }
        };

        if let Some((peer_addr, msg)) = outgoing {
            encode_datagram_outbound(codec, outbound, outbound_ctx, msg, write_buf).await?;
            queue_datagram(peer_addr, write_buf, pending_datagrams);
        }

        if wants_flush {
            let flushed = flush_datagrams(socket, pending_datagrams).await;
            // Signal completion before propagating a failure so the id is always completed.
            if let Some(id) = completion {
                outbox.complete_flush(id);
            }
            flushed?;
        }
    }
    Ok(())
}
/// Runs `msg` through the outbound pipe and encodes the result into `write_buf`.
///
/// If the pipe yields `Flow::Stop`, nothing is encoded and `Ok(())` is returned with `write_buf`
/// untouched.
///
/// # Errors
///
/// Returns any error from the outbound pipe or from the codec. When the codec fails, `write_buf`
/// is truncated back to the length it had before encoding started, so bytes written by the
/// failed attempt cannot end up in front of a later datagram that reuses the same buffer.
pub(crate) async fn encode_datagram_outbound<C, OutP, Write, CurrentOut>(
    codec: &mut C,
    outbound: &mut OutP,
    outbound_ctx: &mut OutboundContext,
    msg: Write,
    write_buf: &mut BytesMut,
) -> Result<()>
where
    C: DatagramEncoder<CurrentOut>,
    OutP: OutboundPipe<Write, Out = CurrentOut>,
    Write: Send + 'static,
    CurrentOut: Send + 'static,
{
    let msg = match outbound.process(outbound_ctx, msg).await? {
        Flow::Next(msg) => msg,
        Flow::Stop => return Ok(()),
    };

    // Remember where this message starts so a failed encode can be rolled back.
    let start = write_buf.len();
    let encoded = codec.encode_datagram(msg, write_buf);
    if encoded.is_err() {
        write_buf.truncate(start);
    }
    encoded
}
/// Moves the bytes currently in `write_buf` onto the back of `pending_datagrams`, addressed to
/// `peer_addr`.
///
/// An empty `write_buf` queues nothing. Otherwise the buffer's contents are split off and frozen,
/// which leaves `write_buf` empty for the next message.
pub(crate) fn queue_datagram(
    peer_addr: SocketAddr,
    write_buf: &mut BytesMut,
    pending_datagrams: &mut PendingDatagrams,
) {
    if write_buf.is_empty() {
        return;
    }
    let payload = write_buf.split().freeze();
    pending_datagrams.push_back((peer_addr, payload));
}
/// Sends every queued datagram on `socket`, front to back, removing each one from
/// `pending_datagrams` once its send has resolved.
///
/// A datagram stays at the front of the queue while its send is in flight. tokio documents
/// `UdpSocket::send_to` as cancel safe (a cancelled send did not transmit the message), so
/// dropping this future mid-send leaves the datagram queued for the next flush instead of
/// losing it.
///
/// # Errors
///
/// Returns the first send error. The datagram that failed is discarded, so a permanently
/// unsendable datagram cannot block the queue; the datagrams behind it remain queued.
pub(crate) async fn flush_datagrams(
    socket: &UdpSocket,
    pending_datagrams: &mut PendingDatagrams,
) -> Result<()> {
    while let Some((peer_addr, bytes)) = pending_datagrams.front() {
        let sent = socket.send_to(bytes, *peer_addr).await;
        // Remove the entry only now that the send has resolved, whether it succeeded or failed.
        pending_datagrams.pop_front();
        sent?;
    }
    Ok(())
}