use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
};
use bytes::BufMut;
use futures::StreamExt;
use qbase::{
frame::PingFrame,
net::tx::{ArcSendWaker, Signals},
packet::Package,
time::ArcDeferIdleTimer,
util::ArcAsyncDeque,
};
use tokio::time::Instant;
#[derive(Debug)]
struct Heartbeat {
defer_idle_timer: ArcDeferIdleTimer,
triggered_heartbeat: bool,
last_heartbeat: Option<Instant>,
interval: Duration,
}
impl Heartbeat {
fn new(defer_idle_timer: ArcDeferIdleTimer, interval: Duration) -> Self {
Self {
defer_idle_timer,
triggered_heartbeat: false,
last_heartbeat: None,
interval,
}
}
fn need_trigger(&mut self) -> bool {
if self.triggered_heartbeat {
return false;
}
let idle_too_long = self.defer_idle_timer.is_idle_lasted_for(self.interval);
let is_defer_idle_timeout_run_out = self.defer_idle_timer.is_expired();
if !idle_too_long || is_defer_idle_timeout_run_out {
self.triggered_heartbeat = false;
return false;
}
if self
.last_heartbeat
.is_none_or(|last| last.elapsed() >= self.interval)
{
self.triggered_heartbeat = true;
return true;
}
false
}
fn try_load_heartbeat_into<P: ?Sized>(&mut self, packet: &mut P) -> Result<(), Signals>
where
PingFrame: Package<P>,
{
if !self.triggered_heartbeat {
return Err(Signals::TRANSPORT);
}
PingFrame.dump(packet)?;
self.last_heartbeat = Some(Instant::now());
self.triggered_heartbeat = false;
Ok(())
}
fn on_effective_communicated(&mut self) {
self.defer_idle_timer.renew_on_effective_communicated();
self.triggered_heartbeat = false;
}
}
#[derive(Debug, Clone)]
pub struct ArcHeartbeat(Arc<Mutex<Heartbeat>>);
impl ArcHeartbeat {
pub fn new(defer_idle_timer: ArcDeferIdleTimer, interval: Duration) -> Self {
Self(Arc::new(Mutex::new(Heartbeat::new(
defer_idle_timer,
interval,
))))
}
pub fn need_trigger(&self) -> bool {
self.0.lock().unwrap().need_trigger()
}
fn try_load_heartbeat_into<P: ?Sized>(&self, packet: &mut P) -> Result<(), Signals>
where
PingFrame: Package<P>,
{
self.0.lock().unwrap().try_load_heartbeat_into(packet)
}
pub fn renew_on_effective_communicated(&self) {
self.0.lock().unwrap().on_effective_communicated();
}
}
impl<P: ?Sized> Package<P> for &ArcHeartbeat
where
PingFrame: Package<P>,
{
#[inline]
fn dump(&mut self, packet: &mut P) -> Result<(), Signals> {
self.try_load_heartbeat_into(packet)
}
}
pub struct SendBuffer<T> {
item: Mutex<Option<T>>,
tx_waker: ArcSendWaker,
}
impl<T> SendBuffer<T> {
pub fn new(tx_waker: ArcSendWaker) -> Self {
Self {
item: Default::default(),
tx_waker,
}
}
pub fn write(&self, frame: T) {
self.tx_waker.wake_by(Signals::TRANSPORT);
*self.item.lock().unwrap() = Some(frame);
}
}
impl<F> SendBuffer<F> {
pub fn try_load_frames_into<P: ?Sized>(&self, packet: &mut P) -> Result<(), Signals>
where
for<'a> &'a F: Package<P>,
{
let mut guard = self.item.lock().unwrap();
match guard.as_ref() {
Some(mut frame) => {
frame.dump(packet)?;
guard.take().unwrap();
Ok(())
}
None => Err(Signals::TRANSPORT),
}
}
}
impl<F, P: ?Sized> Package<P> for &SendBuffer<F>
where
for<'a> &'a F: Package<P>,
{
#[inline]
fn dump(&mut self, into: &mut P) -> Result<(), Signals> {
self.try_load_frames_into(into)
}
}
#[derive(Clone, Debug, Default)]
pub struct RecvBuffer<T>(ArcAsyncDeque<T>);
impl<T> RecvBuffer<T> {
pub fn new() -> Self {
Self(ArcAsyncDeque::with_capacity(2))
}
pub fn write(&self, value: T) {
self.0.push_back(value);
}
pub async fn receive(&self) -> Option<T> {
let mut this = self;
this.next().await
}
pub fn dismiss(&self) {
self.0.close();
}
}
impl<T> futures::Stream for RecvBuffer<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_pop(cx)
}
}
impl<T> futures::Stream for &RecvBuffer<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_pop(cx)
}
}
#[derive(Debug, Clone, Copy)]
pub struct Constraints {
credit_limit: usize,
send_quota: usize,
}
impl Constraints {
pub fn new(credit_limit: usize, send_quota: usize) -> Self {
Self {
credit_limit,
send_quota,
}
}
pub fn is_available(&self) -> bool {
self.credit_limit > 0
}
pub fn constrain<'b>(&self, buf: &'b mut [u8]) -> &'b mut [u8] {
let min_len = buf
.remaining_mut()
.min(self.credit_limit)
.min(self.send_quota);
&mut buf[..min_len]
}
pub fn available(&self) -> usize {
self.credit_limit.min(self.send_quota)
}
pub fn commit(&mut self, len: usize, in_flight: bool) {
self.credit_limit = self.credit_limit.saturating_sub(len);
if in_flight {
self.send_quota = self.send_quota.saturating_sub(len);
}
}
}
pub trait ApplyConstraints {
fn apply(self, constraints: &Constraints) -> Self;
}
impl ApplyConstraints for &mut [u8] {
fn apply(self, constraints: &Constraints) -> Self {
constraints.constrain(self)
}
}