use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{AsyncRead, Stream};
use pin_project::pin_project;
use tor_basic_utils::assert_val_impl_trait;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use crate::stream::StreamTarget;
use crate::util::notify::NotifyReceiver;
#[derive(Debug)]
#[pin_project]
pub(crate) struct XonXoffReader<R> {
#[pin]
ctrl: XonXoffReaderCtrl,
#[pin]
reader: R,
pending_drain_rate_update: bool,
}
impl<R> XonXoffReader<R> {
pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
Self {
ctrl,
reader,
pending_drain_rate_update: false,
}
}
pub(crate) fn inner(&self) -> &R {
&self.reader
}
pub(crate) fn inner_mut(&mut self) -> &mut R {
&mut self.reader
}
}
impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
let mut self_ = self.project();
assert_val_impl_trait!(
self_.ctrl.drain_rate_request_stream,
futures::stream::FusedStream,
);
if let Poll::Ready(Some(())) = self_
.ctrl
.as_mut()
.project()
.drain_rate_request_stream
.poll_next(cx)
{
*self_.pending_drain_rate_update = true;
}
let res = self_.reader.as_mut().poll_read(cx, buf);
if *self_.pending_drain_rate_update && self_.reader.is_empty() {
self_
.ctrl
.stream_target
.drain_rate_update(XonKbpsEwma::Unlimited)?;
*self_.pending_drain_rate_update = false;
}
res
}
}
#[derive(Debug)]
#[pin_project]
pub(crate) struct XonXoffReaderCtrl {
#[pin]
drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
stream_target: StreamTarget,
}
impl XonXoffReaderCtrl {
pub(crate) fn new(
drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
stream_target: StreamTarget,
) -> Self {
Self {
drain_rate_request_stream,
stream_target,
}
}
}
pub(crate) trait BufferIsEmpty {
fn is_empty(self: Pin<&mut Self>) -> bool;
}
#[derive(Debug)]
pub(crate) struct DrainRateRequest;