completion/stream/adapters/
skip_take_while.rs1use core::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use completion_core::CompletionStream;
9use futures_core::{ready, Stream};
10use pin_project_lite::pin_project;
11
12pin_project! {
13 #[derive(Debug, Clone)]
15 pub struct SkipWhile<S, P> {
16 #[pin]
17 stream: S,
18 skipping: bool,
19 predicate: P,
20 }
21}
22
23impl<S, P> SkipWhile<S, P> {
24 pub(crate) fn new(stream: S, predicate: P) -> Self {
25 Self {
26 stream,
27 skipping: true,
28 predicate,
29 }
30 }
31}
32
33impl<S: CompletionStream, P> CompletionStream for SkipWhile<S, P>
34where
35 P: FnMut(&S::Item) -> bool,
36{
37 type Item = S::Item;
38
39 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40 let mut this = self.project();
41
42 loop {
43 match ready!(this.stream.as_mut().poll_next(cx)) {
44 Some(item) => {
45 if *this.skipping {
46 *this.skipping = (this.predicate)(&item);
47 }
48 if !*this.skipping {
49 break Poll::Ready(Some(item));
50 }
51 }
52 None => break Poll::Ready(None),
53 }
54 }
55 }
56 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
57 self.project().stream.poll_cancel(cx)
58 }
59 fn size_hint(&self) -> (usize, Option<usize>) {
60 if self.skipping {
61 (0, self.stream.size_hint().1)
62 } else {
63 self.stream.size_hint()
64 }
65 }
66}
67
68impl<S, P> Stream for SkipWhile<S, P>
69where
70 S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
71 P: FnMut(&<S as CompletionStream>::Item) -> bool,
72{
73 type Item = <Self as CompletionStream>::Item;
74 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75 unsafe { CompletionStream::poll_next(self, cx) }
76 }
77 fn size_hint(&self) -> (usize, Option<usize>) {
78 CompletionStream::size_hint(self)
79 }
80}
81
82pin_project! {
83 #[derive(Debug, Clone)]
85 pub struct TakeWhile<S, P> {
86 #[pin]
87 stream: S,
88 taking: bool,
89 predicate: P,
90 }
91}
92
93impl<S, P> TakeWhile<S, P> {
94 pub(crate) fn new(stream: S, predicate: P) -> Self {
95 Self {
96 stream,
97 taking: true,
98 predicate,
99 }
100 }
101}
102
103impl<S: CompletionStream, P> CompletionStream for TakeWhile<S, P>
104where
105 P: FnMut(&S::Item) -> bool,
106{
107 type Item = S::Item;
108
109 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110 let this = self.project();
111
112 if *this.taking {
113 match ready!(this.stream.poll_next(cx)) {
114 Some(item) => {
115 if (this.predicate)(&item) {
116 return Poll::Ready(Some(item));
117 }
118 *this.taking = false;
119 }
120 None => return Poll::Ready(None),
121 }
122 }
123 Poll::Ready(None)
124 }
125 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
126 let this = self.project();
127 if *this.taking {
128 this.stream.poll_cancel(cx)
129 } else {
130 Poll::Ready(())
131 }
132 }
133 fn size_hint(&self) -> (usize, Option<usize>) {
134 if self.taking {
135 (0, self.stream.size_hint().1)
136 } else {
137 (0, Some(0))
138 }
139 }
140}
141
142impl<S, P> Stream for TakeWhile<S, P>
143where
144 S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
145 P: FnMut(&<S as CompletionStream>::Item) -> bool,
146{
147 type Item = <Self as CompletionStream>::Item;
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 unsafe { CompletionStream::poll_next(self, cx) }
150 }
151 fn size_hint(&self) -> (usize, Option<usize>) {
152 CompletionStream::size_hint(self)
153 }
154}