#![deny(missing_docs)]
pub mod fuse_last;
mod mock_sink;
pub use mock_sink::SinkMock;
use futures::never::Never;
use futures::sink::Sink;
use std::iter::{repeat, successors, Repeat};
use std::marker::PhantomData;
use std::{
pin::Pin,
task::{Context, Poll},
};
fn reverse<E>(poll: &Poll<Result<(), E>>) -> Option<Poll<Result<(), E>>> {
match poll {
Poll::Pending => Some(Poll::Ready(Ok(()))),
Poll::Ready(_) => Some(Poll::Pending),
}
}
pub struct SinkFeedback<E, FI, SSI, Item> {
poll_fallback: FI,
start_send_fallback: SSI,
item_type: PhantomData<Item>,
err_typpe: PhantomData<E>,
}
type Drain<Item> =
SinkFeedback<Never, Repeat<Poll<Result<(), Never>>>, Repeat<Result<(), Never>>, Item>;
pub fn ok<Item>() -> Drain<Item> {
Drain {
poll_fallback: repeat(Poll::Ready(Ok(()))),
start_send_fallback: repeat(Ok(())),
item_type: Default::default(),
err_typpe: Default::default(),
}
}
pub fn interleave_pending<Item>() -> impl Sink<Item, Error = Never>
where
Item: Unpin,
{
let poll_fallback = successors(Some(Poll::Ready(Ok(()))), reverse);
let ss_value: Result<(), Never> = Ok(());
let start_send_fallback = repeat(ss_value);
from_iter(poll_fallback, start_send_fallback)
}
pub fn from_iter<Item, FI, SSI, E>(
poll_fallback: FI,
start_send_fallback: SSI,
) -> impl Sink<Item, Error = E>
where
FI: Iterator<Item = Poll<Result<(), E>>> + Unpin,
SSI: Iterator<Item = Result<(), E>> + Unpin,
E: Unpin,
Item: Unpin,
{
SinkFeedback {
poll_fallback,
start_send_fallback,
item_type: Default::default(),
err_typpe: Default::default(),
}
}
impl<E, FI, SSI, Item> Sink<Item> for SinkFeedback<E, FI, SSI, Item>
where
Self: Sized + Unpin,
FI: Iterator<Item = Poll<Result<(), E>>>,
SSI: Iterator<Item = Result<(), E>>,
{
type Error = E;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = Pin::into_inner(self);
match this.poll_fallback.next().unwrap() {
Poll::Ready(t) => Poll::Ready(t),
Poll::Pending => {
cx.waker().clone().wake();
Poll::Pending
}
}
}
fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
Pin::into_inner(self).start_send_fallback.next().unwrap()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_task::waker_fn;
use std::sync::{atomic, Arc};
#[test]
fn test_ok() {
let wake_cnt = Arc::new(atomic::AtomicUsize::new(0));
let cnt = wake_cnt.clone();
let waker = waker_fn(move || {
wake_cnt.fetch_add(1, atomic::Ordering::SeqCst);
});
let mut cx = Context::from_waker(&waker);
let mut d = super::ok();
let r1 = Pin::new(&mut d).poll_ready(&mut cx);
let s1 = Pin::new(&mut d).start_send(1);
assert_eq!(r1, Poll::Ready(Ok(())));
assert_eq!(s1, Ok(()));
assert_eq!(0, cnt.load(atomic::Ordering::SeqCst));
}
#[test]
fn test_interleave_pending() {
let wake_cnt = Arc::new(atomic::AtomicUsize::new(0));
let cnt = wake_cnt.clone();
let waker = waker_fn(move || {
wake_cnt.fetch_add(1, atomic::Ordering::SeqCst);
});
let mut cx = Context::from_waker(&waker);
let mut s = interleave_pending();
let r1 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r1, Poll::Ready(Ok(())));
for v in 5..140 {
let r_s = Pin::new(&mut s).start_send(v);
assert_eq!(r_s, Ok(()));
}
assert_eq!(0, cnt.load(atomic::Ordering::SeqCst));
let r2 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r2, Poll::Pending);
assert_eq!(1, cnt.load(atomic::Ordering::SeqCst));
let r3 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r3, Poll::Ready(Ok(())));
assert_eq!(1, cnt.load(atomic::Ordering::SeqCst));
}
#[test]
fn test_from_iter() {
let wake_cnt = Arc::new(atomic::AtomicUsize::new(0));
let cnt = wake_cnt.clone();
let waker = waker_fn(move || {
wake_cnt.fetch_add(1, atomic::Ordering::SeqCst);
});
let mut cx = Context::from_waker(&waker);
let poll_fallback = vec![
Poll::Ready(Ok(())),
Poll::Ready(Ok(())),
Poll::Pending,
Poll::Ready(Err(12)),
]
.into_iter();
let start_send_fallback = vec![Ok::<_, u32>(())].into_iter().cycle();
let mut s = from_iter(poll_fallback, start_send_fallback);
let r1 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r1, Poll::Ready(Ok(())));
let s1 = Pin::new(&mut s).start_send(1);
assert_eq!(s1, Ok(()));
let r2 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r2, Poll::Ready(Ok(())));
let s2 = Pin::new(&mut s).start_send(2);
assert_eq!(s2, Ok(()));
assert_eq!(0, cnt.load(atomic::Ordering::SeqCst));
let r3 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r3, Poll::Pending);
assert_eq!(1, cnt.load(atomic::Ordering::SeqCst));
let r4 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r4, Poll::Ready(Err(12)));
assert_eq!(1, cnt.load(atomic::Ordering::SeqCst));
}
#[test]
#[should_panic]
fn test_panic_on_iter_end() {
let wake_cnt = Arc::new(atomic::AtomicUsize::new(0));
let waker = waker_fn(move || {
wake_cnt.fetch_add(1, atomic::Ordering::SeqCst);
});
let mut cx = Context::from_waker(&waker);
let poll_fallback = vec![Poll::Ready(Ok(()))].into_iter();
let start_send_fallback = vec![Ok::<_, u32>(())].into_iter().cycle();
let mut s = from_iter(poll_fallback, start_send_fallback);
let r1 = Pin::new(&mut s).poll_ready(&mut cx);
assert_eq!(r1, Poll::Ready(Ok(())));
let s1 = Pin::new(&mut s).start_send(1);
assert_eq!(s1, Ok(()));
let _ = Pin::new(&mut s).poll_ready(&mut cx);
}
}