use crate::{
features::Gso,
message::Message,
socket::{ring::Consumer, task::events},
};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use s2n_quic_core::task::cooldown::Cooldown;
pub use events::TxEvents as Events;
pub trait Socket<T: Message> {
type Error;
fn send(
&mut self,
cx: &mut Context,
entries: &mut [T],
events: &mut Events,
) -> Result<(), Self::Error>;
}
pub struct Sender<T: Message, S: Socket<T>> {
ring: Consumer<T>,
tx: S,
events: Events,
ring_cooldown: Cooldown,
io_cooldown: Cooldown,
}
impl<T, S> Sender<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
#[inline]
pub fn new(ring: Consumer<T>, tx: S, gso: Gso, cooldown: Cooldown) -> Self {
Self {
ring,
tx,
events: Events::new(gso),
ring_cooldown: cooldown.clone(),
io_cooldown: cooldown,
}
}
#[inline]
fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll<Result<(), ()>> {
loop {
let is_loop = self.ring_cooldown.state().is_loop();
let count = if is_loop {
self.ring.acquire(watermark)
} else {
match self.ring.poll_acquire(watermark, cx) {
Poll::Ready(count) => count,
Poll::Pending if !self.ring.is_open() => return Err(()).into(),
Poll::Pending => 0,
}
};
if count > 0 {
self.ring_cooldown.on_ready();
return Ok(()).into();
}
if is_loop && self.ring_cooldown.on_pending_task(cx).is_sleep() {
continue;
}
return Poll::Pending;
}
}
}
impl<T, S> Future for Sender<T, S>
where
T: Message + Unpin,
S: Socket<T> + Unpin,
{
type Output = Option<S::Error>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
let mut pending_wake = false;
while !this.events.take_blocked() {
match this.poll_ring(u32::MAX, cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(_)) => return None.into(),
Poll::Pending => {
if pending_wake {
this.ring.wake();
}
return Poll::Pending;
}
}
let entries = this.ring.data();
match this.tx.send(cx, entries, &mut this.events) {
Ok(_) => {
let count = this.events.take_count() as u32;
if count > 0 {
this.ring.release_no_wake(count);
this.io_cooldown.on_ready();
pending_wake = true;
}
}
Err(err) => return Some(err).into(),
}
}
this.io_cooldown.on_pending_task(cx);
if pending_wake {
this.ring.wake();
}
Poll::Pending
}
}