futures_test/
interleave_pending.rs

1use futures_core::future::{Future, FusedFuture};
2use futures_core::stream::{Stream, FusedStream};
3use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
4use pin_utils::{unsafe_pinned, unsafe_unpinned};
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
/// Wrapper that interleaves [`Poll::Pending`] in calls to poll.
///
/// Each poll method of this wrapper first answers with an artificial
/// [`Poll::Pending`] (after waking the current task), and only forwards the
/// following calls to the wrapped value. Once the wrapped value reports
/// readiness, the next call yields an artificial [`Poll::Pending`] again.
///
/// See the `interleave_pending` methods on:
/// * [`FutureTestExt`](crate::future::FutureTestExt::interleave_pending)
/// * [`StreamTestExt`](crate::stream::StreamTestExt::interleave_pending)
/// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::interleave_pending)
/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::interleave_pending_write)
#[derive(Debug)]
pub struct InterleavePending<T> {
    // The wrapped value that poll calls are forwarded to.
    inner: T,
    // `true` after an artificial `Poll::Pending` has been returned and until
    // the wrapped value next reports readiness; starts out as `false`.
    pended: bool,
}
22
// The wrapper is `Unpin` exactly when the wrapped value is: `inner` is handed
// out behind `Pin` (see `unsafe_pinned!(inner: T)`), so it must not become
// movable unless `T` itself is `Unpin`.
impl<T: Unpin> Unpin for InterleavePending<T> {}
24
25impl<T> InterleavePending<T> {
26    unsafe_pinned!(inner: T);
27    unsafe_unpinned!(pended: bool);
28
29    pub(crate) fn new(inner: T) -> Self {
30        Self {
31            inner,
32            pended: false,
33        }
34    }
35
36    /// Acquires a reference to the underlying I/O object that this adaptor is
37    /// wrapping.
38    pub fn get_ref(&self) -> &T {
39        &self.inner
40    }
41
42    /// Acquires a mutable reference to the underlying I/O object that this
43    /// adaptor is wrapping.
44    pub fn get_mut(&mut self) -> &mut T {
45        &mut self.inner
46    }
47
48    /// Acquires a pinned mutable reference to the underlying I/O object that
49    /// this adaptor is wrapping.
50    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
51        self.project().0
52    }
53
54    /// Consumes this adaptor returning the underlying I/O object.
55    pub fn into_inner(self) -> T {
56        self.inner
57    }
58
59    fn project(self: Pin<&mut Self>) -> (Pin<&mut T>, &mut bool) {
60        unsafe {
61            let this = self.get_unchecked_mut();
62            (Pin::new_unchecked(&mut this.inner), &mut this.pended)
63        }
64    }
65}
66
67impl<Fut: Future> Future for InterleavePending<Fut> {
68    type Output = Fut::Output;
69
70    fn poll(
71        mut self: Pin<&mut Self>,
72        cx: &mut Context<'_>,
73    ) -> Poll<Self::Output> {
74        if *self.as_mut().pended() {
75            let next = self.as_mut().inner().poll(cx);
76            if next.is_ready() {
77                *self.pended() = false;
78            }
79            next
80        } else {
81            cx.waker().wake_by_ref();
82            *self.pended() = true;
83            Poll::Pending
84        }
85    }
86}
87
88impl<Fut: FusedFuture> FusedFuture for InterleavePending<Fut> {
89    fn is_terminated(&self) -> bool {
90        self.inner.is_terminated()
91    }
92}
93
94impl<St: Stream> Stream for InterleavePending<St> {
95    type Item = St::Item;
96
97    fn poll_next(
98        mut self: Pin<&mut Self>,
99        cx: &mut Context<'_>,
100    ) -> Poll<Option<Self::Item>> {
101        if *self.as_mut().pended() {
102            let next = self.as_mut().inner().poll_next(cx);
103            if next.is_ready() {
104                *self.pended() = false;
105            }
106            next
107        } else {
108            cx.waker().wake_by_ref();
109            *self.pended() = true;
110            Poll::Pending
111        }
112    }
113
114    fn size_hint(&self) -> (usize, Option<usize>) {
115        self.inner.size_hint()
116    }
117}
118
119impl<Fut: FusedStream> FusedStream for InterleavePending<Fut> {
120    fn is_terminated(&self) -> bool {
121        self.inner.is_terminated()
122    }
123}
124
125impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
126    fn poll_write(
127        self: Pin<&mut Self>,
128        cx: &mut Context<'_>,
129        buf: &[u8],
130    ) -> Poll<io::Result<usize>> {
131        let (writer, pended) = self.project();
132        if *pended {
133            let next = writer.poll_write(cx, buf);
134            if next.is_ready() {
135                *pended = false;
136            }
137            next
138        } else {
139            cx.waker().wake_by_ref();
140            *pended = true;
141            Poll::Pending
142        }
143    }
144
145    fn poll_flush(
146        self: Pin<&mut Self>,
147        cx: &mut Context<'_>,
148    ) -> Poll<io::Result<()>> {
149        let (writer, pended) = self.project();
150        if *pended {
151            let next = writer.poll_flush(cx);
152            if next.is_ready() {
153                *pended = false;
154            }
155            next
156        } else {
157            cx.waker().wake_by_ref();
158            *pended = true;
159            Poll::Pending
160        }
161    }
162
163    fn poll_close(
164        self: Pin<&mut Self>,
165        cx: &mut Context<'_>,
166    ) -> Poll<io::Result<()>> {
167        let (writer, pended) = self.project();
168        if *pended {
169            let next = writer.poll_close(cx);
170            if next.is_ready() {
171                *pended = false;
172            }
173            next
174        } else {
175            cx.waker().wake_by_ref();
176            *pended = true;
177            Poll::Pending
178        }
179    }
180}
181
182impl<R: AsyncRead> AsyncRead for InterleavePending<R> {
183    fn poll_read(
184        self: Pin<&mut Self>,
185        cx: &mut Context<'_>,
186        buf: &mut [u8],
187    ) -> Poll<io::Result<usize>> {
188        let (reader, pended) = self.project();
189        if *pended {
190            let next = reader.poll_read(cx, buf);
191            if next.is_ready() {
192                *pended = false;
193            }
194            next
195        } else {
196            cx.waker().wake_by_ref();
197            *pended = true;
198            Poll::Pending
199        }
200    }
201}
202
203impl<R: AsyncBufRead> AsyncBufRead for InterleavePending<R> {
204    fn poll_fill_buf(
205        self: Pin<&mut Self>,
206        cx: &mut Context<'_>,
207    ) -> Poll<io::Result<&[u8]>> {
208        let (reader, pended) = self.project();
209        if *pended {
210            let next = reader.poll_fill_buf(cx);
211            if next.is_ready() {
212                *pended = false;
213            }
214            next
215        } else {
216            cx.waker().wake_by_ref();
217            *pended = true;
218            Poll::Pending
219        }
220    }
221
222    fn consume(self: Pin<&mut Self>, amount: usize) {
223        self.inner().consume(amount)
224    }
225}