completion/stream/adapters/
skip_take_while.rs

1//! `SkipWhile` and `TakeWhile`.
2
3use core::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use completion_core::CompletionStream;
9use futures_core::{ready, Stream};
10use pin_project_lite::pin_project;
11
12pin_project! {
13    /// Stream for [`CompletionStreamExt::skip_while`](crate::CompletionStreamExt::skip_while).
14    #[derive(Debug, Clone)]
15    pub struct SkipWhile<S, P> {
16        #[pin]
17        stream: S,
18        skipping: bool,
19        predicate: P,
20    }
21}
22
23impl<S, P> SkipWhile<S, P> {
24    pub(crate) fn new(stream: S, predicate: P) -> Self {
25        Self {
26            stream,
27            skipping: true,
28            predicate,
29        }
30    }
31}
32
33impl<S: CompletionStream, P> CompletionStream for SkipWhile<S, P>
34where
35    P: FnMut(&S::Item) -> bool,
36{
37    type Item = S::Item;
38
39    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40        let mut this = self.project();
41
42        loop {
43            match ready!(this.stream.as_mut().poll_next(cx)) {
44                Some(item) => {
45                    if *this.skipping {
46                        *this.skipping = (this.predicate)(&item);
47                    }
48                    if !*this.skipping {
49                        break Poll::Ready(Some(item));
50                    }
51                }
52                None => break Poll::Ready(None),
53            }
54        }
55    }
56    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
57        self.project().stream.poll_cancel(cx)
58    }
59    fn size_hint(&self) -> (usize, Option<usize>) {
60        if self.skipping {
61            (0, self.stream.size_hint().1)
62        } else {
63            self.stream.size_hint()
64        }
65    }
66}
67
68impl<S, P> Stream for SkipWhile<S, P>
69where
70    S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
71    P: FnMut(&<S as CompletionStream>::Item) -> bool,
72{
73    type Item = <Self as CompletionStream>::Item;
74    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75        unsafe { CompletionStream::poll_next(self, cx) }
76    }
77    fn size_hint(&self) -> (usize, Option<usize>) {
78        CompletionStream::size_hint(self)
79    }
80}
81
82pin_project! {
83    /// Stream for [`CompletionStreamExt::take_while`](crate::CompletionStreamExt::take_while).
84    #[derive(Debug, Clone)]
85    pub struct TakeWhile<S, P> {
86        #[pin]
87        stream: S,
88        taking: bool,
89        predicate: P,
90    }
91}
92
93impl<S, P> TakeWhile<S, P> {
94    pub(crate) fn new(stream: S, predicate: P) -> Self {
95        Self {
96            stream,
97            taking: true,
98            predicate,
99        }
100    }
101}
102
103impl<S: CompletionStream, P> CompletionStream for TakeWhile<S, P>
104where
105    P: FnMut(&S::Item) -> bool,
106{
107    type Item = S::Item;
108
109    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110        let this = self.project();
111
112        if *this.taking {
113            match ready!(this.stream.poll_next(cx)) {
114                Some(item) => {
115                    if (this.predicate)(&item) {
116                        return Poll::Ready(Some(item));
117                    }
118                    *this.taking = false;
119                }
120                None => return Poll::Ready(None),
121            }
122        }
123        Poll::Ready(None)
124    }
125    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
126        let this = self.project();
127        if *this.taking {
128            this.stream.poll_cancel(cx)
129        } else {
130            Poll::Ready(())
131        }
132    }
133    fn size_hint(&self) -> (usize, Option<usize>) {
134        if self.taking {
135            (0, self.stream.size_hint().1)
136        } else {
137            (0, Some(0))
138        }
139    }
140}
141
142impl<S, P> Stream for TakeWhile<S, P>
143where
144    S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
145    P: FnMut(&<S as CompletionStream>::Item) -> bool,
146{
147    type Item = <Self as CompletionStream>::Item;
148    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149        unsafe { CompletionStream::poll_next(self, cx) }
150    }
151    fn size_hint(&self) -> (usize, Option<usize>) {
152        CompletionStream::size_hint(self)
153    }
154}