use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll, Poll::*, ready};
use futures::{Sink, future};
use pin_project::pin_project;
#[pin_project]
pub(crate) struct SometimesUnboundedSink<T, S> {
buf: VecDeque<T>,
#[pin]
inner: S,
}
impl<T, S: Sink<T>> SometimesUnboundedSink<T, S> {
pub(crate) fn new(inner: S) -> Self {
SometimesUnboundedSink {
buf: VecDeque::new(),
inner,
}
}
pub(crate) fn n_queued(&self) -> usize {
self.buf.len()
}
#[cfg(feature = "circ-padding")]
pub(crate) fn iter_queue(&self) -> impl Iterator<Item = &T> + '_ {
self.buf.iter()
}
pub(crate) fn pollish_send_unbounded(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
item: T,
) -> Result<(), S::Error> {
match self.as_mut().poll_ready(cx) {
Ready(Ok(())) => self.as_mut().start_send(item),
Ready(Err(e)) => Err(e),
Pending => {
self.as_mut().project().buf.push_back(item);
Ok(())
}
}
}
pub(crate) async fn send_unbounded(mut self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
let mut item = Some(item);
future::poll_fn(move |cx| {
let item = item.take().expect("polled after Ready");
Ready(self.as_mut().pollish_send_unbounded(cx, item))
})
.await
}
fn flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let mut self_ = self.as_mut().project();
while !self_.buf.is_empty() {
ready!(self_.inner.as_mut().poll_ready(cx))?;
let item = self_.buf.pop_front().expect("suddenly empty!");
self_.inner.as_mut().start_send(item)?;
}
Ready(Ok(()))
}
pub(crate) fn as_inner(&self) -> &S {
&self.inner
}
pub(crate) fn as_inner_mut(&mut self) -> &mut S {
&mut self.inner
}
}
impl<T, S: Sink<T>> Sink<T> for SometimesUnboundedSink<T, S> {
type Error = S::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
ready!(self.as_mut().flush_buf(cx))?;
self.project().inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
assert!(self.buf.is_empty(), "start_send without poll_ready");
self.project().inner.start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
ready!(self.as_mut().flush_buf(cx))?;
self.project().inner.poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
ready!(self.as_mut().flush_buf(cx))?;
self.project().inner.poll_close(cx)
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use futures::channel::mpsc;
use futures::{SinkExt as _, StreamExt as _};
use std::pin::pin;
use tor_rtmock::MockRuntime;
#[test]
fn cases() {
MockRuntime::test_with_various(|runtime| async move {
let (tx, rx) = mpsc::channel(1);
let tx = SometimesUnboundedSink::new(tx);
runtime.spawn_identified("sender", async move {
let mut tx = pin!(tx);
let mut n = 0..;
let mut n = move || n.next().unwrap();
tx.as_mut().send_unbounded(n()).await.unwrap();
tx.as_mut().send(n()).await.unwrap();
tx.as_mut().send(n()).await.unwrap();
tx.as_mut().send(n()).await.unwrap();
tx.as_mut().send_unbounded(n()).await.unwrap();
tx.as_mut().send_unbounded(n()).await.unwrap();
tx.as_mut().send_unbounded(n()).await.unwrap();
tx.as_mut().send(n()).await.unwrap();
tx.as_mut().send_unbounded(n()).await.unwrap();
tx.as_mut().flush().await.unwrap();
tx.as_mut().close().await.unwrap();
});
runtime.spawn_identified("receiver", async move {
let mut rx = pin!(rx);
let mut exp = 0..;
while let Some(n) = rx.next().await {
assert_eq!(n, exp.next().unwrap());
}
assert_eq!(exp.next().unwrap(), 9);
});
runtime.progress_until_stalled().await;
});
}
}