use super::Stream;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
const SKIP_COOPERATIVE_BUDGET: usize = 1024;
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Skip<S> {
#[pin]
stream: S,
remaining: usize,
exhausted: bool,
}
impl<S> Skip<S> {
#[inline]
pub(crate) fn new(stream: S, remaining: usize) -> Self {
Self {
stream,
remaining,
exhausted: false,
}
}
}
impl<S: Stream> Stream for Skip<S> {
type Item = S::Item;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if *this.exhausted {
return Poll::Ready(None);
}
let mut skipped_this_poll = 0usize;
while *this.remaining > 0 {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(_)) => {
*this.remaining -= 1;
skipped_this_poll += 1;
if *this.remaining > 0 && skipped_this_poll >= SKIP_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(None) => {
*this.exhausted = true;
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
}
}
match this.stream.poll_next(cx) {
Poll::Ready(None) => {
*this.exhausted = true;
Poll::Ready(None)
}
other => other,
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
if self.exhausted {
return (0, Some(0));
}
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_sub(self.remaining);
let upper = upper.map(|x| x.saturating_sub(self.remaining));
(lower, upper)
}
}
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SkipWhile<S, F> {
#[pin]
stream: S,
predicate: F,
done: bool,
exhausted: bool,
}
impl<S, F> SkipWhile<S, F> {
#[inline]
pub(crate) fn new(stream: S, predicate: F) -> Self {
Self {
stream,
predicate,
done: false,
exhausted: false,
}
}
}
impl<S, F> Stream for SkipWhile<S, F>
where
S: Stream,
F: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if *this.exhausted {
return Poll::Ready(None);
}
if *this.done {
return match this.stream.poll_next(cx) {
Poll::Ready(None) => {
*this.exhausted = true;
Poll::Ready(None)
}
other => other,
};
}
let mut skipped_this_poll = 0usize;
loop {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
if !(this.predicate)(&item) {
*this.done = true;
return Poll::Ready(Some(item));
}
skipped_this_poll += 1;
if skipped_this_poll >= SKIP_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(None) => {
*this.exhausted = true;
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
}
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
if self.exhausted {
return (0, Some(0));
}
let (lower, upper) = self.stream.size_hint();
if self.done {
(lower, upper)
} else {
(0, upper)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::iter;
use std::task::Waker;
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn collect<S: Stream + Unpin>(stream: &mut S) -> Vec<S::Item> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut items = Vec::new();
while let Poll::Ready(Some(item)) = Pin::new(&mut *stream).poll_next(&mut cx) {
items.push(item);
}
items
}
#[derive(Debug, Default)]
struct AlwaysReadyCounter {
next: usize,
}
impl Stream for AlwaysReadyCounter {
type Item = usize;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let item = this.next;
this.next = this.next.saturating_add(1);
Poll::Ready(Some(item))
}
}
#[derive(Debug)]
struct ItemThenNoneThenPanics<T> {
item: Option<T>,
completed: bool,
}
impl<T> ItemThenNoneThenPanics<T> {
fn new(item: T) -> Self {
Self {
item: Some(item),
completed: false,
}
}
}
impl<T: Unpin> Stream for ItemThenNoneThenPanics<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(item) = this.item.take() {
return Poll::Ready(Some(item));
}
assert!(!this.completed, "inner stream repolled after completion");
this.completed = true;
Poll::Ready(None)
}
}
#[test]
fn test_skip_zero() {
let mut s = Skip::new(iter(vec![1, 2, 3]), 0);
assert_eq!(collect(&mut s), vec![1, 2, 3]);
}
#[test]
fn test_skip_some() {
let mut s = Skip::new(iter(vec![1, 2, 3, 4, 5]), 2);
assert_eq!(collect(&mut s), vec![3, 4, 5]);
}
#[test]
fn test_skip_all() {
let mut s = Skip::new(iter(vec![1, 2, 3]), 3);
assert_eq!(collect(&mut s), Vec::<i32>::new());
}
#[test]
fn test_skip_more_than_len() {
let mut s = Skip::new(iter(vec![1, 2]), 100);
assert_eq!(collect(&mut s), Vec::<i32>::new());
}
#[test]
fn test_skip_empty_stream() {
let mut s = Skip::new(iter(Vec::<i32>::new()), 5);
assert_eq!(collect(&mut s), Vec::<i32>::new());
}
#[test]
fn test_skip_size_hint() {
let s = Skip::new(iter(vec![1, 2, 3, 4, 5]), 2);
let (lower, upper) = s.size_hint();
assert_eq!(lower, 3);
assert_eq!(upper, Some(3));
}
#[test]
fn test_skip_while_basic() {
let mut s = SkipWhile::new(iter(vec![1, 2, 3, 4, 5]), |x: &i32| *x < 3);
assert_eq!(collect(&mut s), vec![3, 4, 5]);
}
#[test]
fn test_skip_while_none_skipped() {
let mut s = SkipWhile::new(iter(vec![5, 4, 3]), |x: &i32| *x < 3);
assert_eq!(collect(&mut s), vec![5, 4, 3]);
}
#[test]
fn test_skip_while_all_skipped() {
let mut s = SkipWhile::new(iter(vec![1, 2]), |x: &i32| *x < 10);
assert_eq!(collect(&mut s), Vec::<i32>::new());
}
#[test]
fn test_skip_while_empty() {
let mut s = SkipWhile::new(iter(Vec::<i32>::new()), |_: &i32| true);
assert_eq!(collect(&mut s), Vec::<i32>::new());
}
#[test]
fn test_skip_while_size_hint_before_done() {
let s = SkipWhile::new(iter(vec![1, 2, 3]), |x: &i32| *x < 2);
let (lower, upper) = s.size_hint();
assert_eq!(lower, 0); assert_eq!(upper, Some(3));
}
#[test]
fn test_skip_yields_after_budget_on_always_ready_stream() {
let mut s = Skip::new(AlwaysReadyCounter::default(), SKIP_COOPERATIVE_BUDGET + 5);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut s).poll_next(&mut cx);
assert!(matches!(first, Poll::Pending));
assert_eq!(s.remaining, 5);
assert_eq!(s.stream.next, SKIP_COOPERATIVE_BUDGET);
let second = Pin::new(&mut s).poll_next(&mut cx);
assert_eq!(second, Poll::Ready(Some(SKIP_COOPERATIVE_BUDGET + 5)));
}
#[test]
fn test_skip_does_not_repoll_exhausted_upstream() {
let mut s = Skip::new(ItemThenNoneThenPanics::new(0usize), 1);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
}
#[test]
fn test_skip_while_yields_after_budget_when_predicate_stays_true() {
let mut s = SkipWhile::new(AlwaysReadyCounter::default(), |_: &usize| true);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut s).poll_next(&mut cx);
assert!(matches!(first, Poll::Pending));
assert_eq!(s.stream.next, SKIP_COOPERATIVE_BUDGET);
assert!(!s.done);
}
#[test]
fn test_skip_while_does_not_repoll_exhausted_upstream_while_skipping() {
let mut s = SkipWhile::new(ItemThenNoneThenPanics::new(0usize), |_: &usize| true);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
}
#[test]
fn test_skip_while_does_not_repoll_exhausted_upstream_after_done() {
let mut s = SkipWhile::new(ItemThenNoneThenPanics::new(5usize), |x: &usize| *x < 5);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(Some(5)));
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
assert_eq!(Pin::new(&mut s).poll_next(&mut cx), Poll::Ready(None));
}
}