use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use pin_project::pin_project;
use tor_async_utils::peekable_stream::UnobtrusivePeekableStream;
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
use tracing::debug;
use crate::congestion::sendme;
use crate::stream::StreamTarget;
use crate::stream::queue::StreamQueueReceiver;
use crate::{Error, Result};
#[derive(Debug)]
#[pin_project]
pub struct StreamReceiver {
pub(crate) target: StreamTarget,
#[pin]
pub(crate) receiver: StreamQueueReceiver,
pub(crate) recv_window: sendme::StreamRecvWindow,
pub(crate) ended: bool,
}
impl StreamReceiver {
fn poll_next_inner(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<Poll<UnparsedRelayMsg>> {
let msg = match self.as_mut().project().receiver.poll_next(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => {
return Err(Error::CircuitClosed);
}
Poll::Pending => return Ok(Poll::Pending),
};
if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
if let Err(e) = self.target.send_sendme() {
if matches!(e, Error::CircuitClosed) {
debug!("Failed to send stream-level SENDME. Ignoring: {e}");
} else {
return Err(e);
}
}
self.recv_window.put();
}
Ok(Poll::Ready(msg))
}
pub fn protocol_error(&mut self) {
self.target.protocol_error();
}
pub(crate) fn is_empty(&mut self) -> bool {
let peek_is_none = Pin::new(&mut self.receiver).unobtrusive_peek().is_none();
#[cfg(debug_assertions)]
if peek_is_none {
assert_eq!(self.receiver.approx_stream_bytes(), 0);
} else {
}
peek_is_none
}
}
impl Stream for StreamReceiver {
type Item = Result<UnparsedRelayMsg>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.ended {
return Poll::Ready(None);
}
match self.as_mut().poll_next_inner(cx) {
Ok(Poll::Pending) => Poll::Pending,
Ok(Poll::Ready(msg)) => {
if msg.cmd() == RelayCmd::END {
self.ended = true;
}
Poll::Ready(Some(Ok(msg)))
}
Err(e) => {
self.ended = true;
Poll::Ready(Some(Err(e)))
}
}
}
}