futures_test/
interleave_pending.rs1use futures_core::future::{Future, FusedFuture};
2use futures_core::stream::{Stream, FusedStream};
3use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
4use pin_utils::{unsafe_pinned, unsafe_unpinned};
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10#[derive(Debug)]
18pub struct InterleavePending<T> {
19 inner: T,
20 pended: bool,
21}
22
23impl<T: Unpin> Unpin for InterleavePending<T> {}
24
25impl<T> InterleavePending<T> {
26 unsafe_pinned!(inner: T);
27 unsafe_unpinned!(pended: bool);
28
29 pub(crate) fn new(inner: T) -> Self {
30 Self {
31 inner,
32 pended: false,
33 }
34 }
35
36 pub fn get_ref(&self) -> &T {
39 &self.inner
40 }
41
42 pub fn get_mut(&mut self) -> &mut T {
45 &mut self.inner
46 }
47
48 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
51 self.project().0
52 }
53
54 pub fn into_inner(self) -> T {
56 self.inner
57 }
58
59 fn project(self: Pin<&mut Self>) -> (Pin<&mut T>, &mut bool) {
60 unsafe {
61 let this = self.get_unchecked_mut();
62 (Pin::new_unchecked(&mut this.inner), &mut this.pended)
63 }
64 }
65}
66
67impl<Fut: Future> Future for InterleavePending<Fut> {
68 type Output = Fut::Output;
69
70 fn poll(
71 mut self: Pin<&mut Self>,
72 cx: &mut Context<'_>,
73 ) -> Poll<Self::Output> {
74 if *self.as_mut().pended() {
75 let next = self.as_mut().inner().poll(cx);
76 if next.is_ready() {
77 *self.pended() = false;
78 }
79 next
80 } else {
81 cx.waker().wake_by_ref();
82 *self.pended() = true;
83 Poll::Pending
84 }
85 }
86}
87
88impl<Fut: FusedFuture> FusedFuture for InterleavePending<Fut> {
89 fn is_terminated(&self) -> bool {
90 self.inner.is_terminated()
91 }
92}
93
94impl<St: Stream> Stream for InterleavePending<St> {
95 type Item = St::Item;
96
97 fn poll_next(
98 mut self: Pin<&mut Self>,
99 cx: &mut Context<'_>,
100 ) -> Poll<Option<Self::Item>> {
101 if *self.as_mut().pended() {
102 let next = self.as_mut().inner().poll_next(cx);
103 if next.is_ready() {
104 *self.pended() = false;
105 }
106 next
107 } else {
108 cx.waker().wake_by_ref();
109 *self.pended() = true;
110 Poll::Pending
111 }
112 }
113
114 fn size_hint(&self) -> (usize, Option<usize>) {
115 self.inner.size_hint()
116 }
117}
118
119impl<Fut: FusedStream> FusedStream for InterleavePending<Fut> {
120 fn is_terminated(&self) -> bool {
121 self.inner.is_terminated()
122 }
123}
124
125impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
126 fn poll_write(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 buf: &[u8],
130 ) -> Poll<io::Result<usize>> {
131 let (writer, pended) = self.project();
132 if *pended {
133 let next = writer.poll_write(cx, buf);
134 if next.is_ready() {
135 *pended = false;
136 }
137 next
138 } else {
139 cx.waker().wake_by_ref();
140 *pended = true;
141 Poll::Pending
142 }
143 }
144
145 fn poll_flush(
146 self: Pin<&mut Self>,
147 cx: &mut Context<'_>,
148 ) -> Poll<io::Result<()>> {
149 let (writer, pended) = self.project();
150 if *pended {
151 let next = writer.poll_flush(cx);
152 if next.is_ready() {
153 *pended = false;
154 }
155 next
156 } else {
157 cx.waker().wake_by_ref();
158 *pended = true;
159 Poll::Pending
160 }
161 }
162
163 fn poll_close(
164 self: Pin<&mut Self>,
165 cx: &mut Context<'_>,
166 ) -> Poll<io::Result<()>> {
167 let (writer, pended) = self.project();
168 if *pended {
169 let next = writer.poll_close(cx);
170 if next.is_ready() {
171 *pended = false;
172 }
173 next
174 } else {
175 cx.waker().wake_by_ref();
176 *pended = true;
177 Poll::Pending
178 }
179 }
180}
181
182impl<R: AsyncRead> AsyncRead for InterleavePending<R> {
183 fn poll_read(
184 self: Pin<&mut Self>,
185 cx: &mut Context<'_>,
186 buf: &mut [u8],
187 ) -> Poll<io::Result<usize>> {
188 let (reader, pended) = self.project();
189 if *pended {
190 let next = reader.poll_read(cx, buf);
191 if next.is_ready() {
192 *pended = false;
193 }
194 next
195 } else {
196 cx.waker().wake_by_ref();
197 *pended = true;
198 Poll::Pending
199 }
200 }
201}
202
203impl<R: AsyncBufRead> AsyncBufRead for InterleavePending<R> {
204 fn poll_fill_buf(
205 self: Pin<&mut Self>,
206 cx: &mut Context<'_>,
207 ) -> Poll<io::Result<&[u8]>> {
208 let (reader, pended) = self.project();
209 if *pended {
210 let next = reader.poll_fill_buf(cx);
211 if next.is_ready() {
212 *pended = false;
213 }
214 next
215 } else {
216 cx.waker().wake_by_ref();
217 *pended = true;
218 Poll::Pending
219 }
220 }
221
222 fn consume(self: Pin<&mut Self>, amount: usize) {
223 self.inner().consume(amount)
224 }
225}