Skip to main content

asupersync/stream/
stream.rs

1//! The core Stream trait for asynchronous iteration.
2//!
3//! # Cancel Safety
4//!
5//! The Stream trait is inherently cancel-safe at yield points. Dropping a
6//! stream mid-iteration is safe, though any buffered items may be lost.
7
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
/// An asynchronous source of values, pulled one item at a time.
///
/// `Stream` is the async counterpart of `Iterator`. Every call to
/// [`poll_next`](Stream::poll_next) tries to produce the next value and
/// reports one of three outcomes: `Poll::Pending` when the value is not ready
/// yet, `Poll::Ready(Some(item))` when a value is available, or
/// `Poll::Ready(None)` once the stream has terminated.
///
/// The trait is marked `#[must_use]` because a stream only makes progress
/// when it is polled; creating one and dropping it unused is almost always a
/// mistake.
///
/// # Examples
///
/// ```ignore
/// use asupersync::stream::{Stream, StreamExt};
///
/// async fn process<S: Stream<Item = i32> + Unpin>(mut stream: S) {
///     while let Some(item) = stream.next().await {
///         println!("got: {}", item);
///     }
/// }
/// ```
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// The type of values yielded by the stream.
    type Item;

    /// Attempt to pull out the next value of this stream.
    ///
    /// # Return value
    ///
    /// - `Poll::Pending` means the next value is not ready yet.
    /// - `Poll::Ready(Some(val))` means `val` is ready and the stream may have more.
    /// - `Poll::Ready(None)` means the stream has terminated.
    ///
    /// # Cancel Safety
    ///
    /// This method is cancel-safe. If `poll_next` returns `Poll::Pending`,
    /// no data has been lost.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Returns the bounds on the remaining length of the stream as
    /// `(lower, upper)`.
    ///
    /// An `upper` of `None` means no upper bound is known. The default
    /// implementation returns `(0, None)`, which is correct for any stream;
    /// implementors that know more should override it.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
56
57// Implement Stream for Pin<P> where P derefs to a Stream
58impl<P> Stream for Pin<P>
59where
60    P: DerefMut + Unpin,
61    P::Target: Stream,
62{
63    type Item = <P::Target as Stream>::Item;
64
65    #[inline]
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        // self is Pin<&mut Pin<P>>
68        // self.get_mut() returns &mut Pin<P>
69        // as_mut() returns Pin<&mut P::Target>
70        self.get_mut().as_mut().poll_next(cx)
71    }
72
73    #[inline]
74    fn size_hint(&self) -> (usize, Option<usize>) {
75        (**self).size_hint()
76    }
77}
78
79// Implement Stream for Box<S> where S is a Stream
80impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
81    type Item = S::Item;
82
83    #[inline]
84    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        Pin::new(&mut **self).poll_next(cx)
86    }
87
88    #[inline]
89    fn size_hint(&self) -> (usize, Option<usize>) {
90        (**self).size_hint()
91    }
92}
93
94// Implement Stream for &mut S where S is a Stream
95impl<S: Stream + Unpin + ?Sized> Stream for &mut S {
96    type Item = S::Item;
97
98    #[inline]
99    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100        Pin::new(&mut **self).poll_next(cx)
101    }
102
103    #[inline]
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        (**self).size_hint()
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    /// Waker whose `wake` does nothing. The streams in this module never
    /// return `Poll::Pending`, so no wake-up is ever required.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    /// Builds a `Waker` backed by `NoopWaker`, used to construct a `Context`.
    fn noop_waker() -> Waker {
        Waker::from(Arc::new(NoopWaker))
    }

    /// Always-ready stream that yields `items` in order and then `None`.
    struct TestStream {
        items: Vec<i32>,
        /// Position of the next item to yield; `poll_next` only advances it
        /// while it is below `items.len()`.
        index: usize,
    }

    impl TestStream {
        /// Creates a stream positioned before the first of `items`.
        fn new(items: Vec<i32>) -> Self {
            Self { items, index: 0 }
        }
    }

    impl Stream for TestStream {
        type Item = i32;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
            if self.index < self.items.len() {
                let item = self.items[self.index];
                self.index += 1;
                Poll::Ready(Some(item))
            } else {
                Poll::Ready(None)
            }
        }

        /// Exact hint: the number of items not yet yielded.
        fn size_hint(&self) -> (usize, Option<usize>) {
            // Cannot underflow: `index` never exceeds `items.len()`.
            let remaining = self.items.len() - self.index;
            (remaining, Some(remaining))
        }
    }

    /// Shared test prologue: sets up test logging, then invokes the crate's
    /// `test_phase!` macro with `name`.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    /// Invariant: items come out in order, followed by `Poll::Ready(None)`.
    #[test]
    fn stream_produces_items() {
        init_test("stream_produces_items");
        let mut stream = TestStream::new(vec![1, 2, 3]);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", "Poll::Ready(Some(1))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(2)));
        crate::assert_with_log!(ok, "poll 2", "Poll::Ready(Some(2))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(3)));
        crate::assert_with_log!(ok, "poll 3", "Poll::Ready(Some(3))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(ok, "poll done", "Poll::Ready(None)", poll);
        crate::test_complete!("stream_produces_items");
    }

    /// Invariant: an overridden `size_hint` is reported as implemented.
    #[test]
    fn stream_size_hint() {
        init_test("stream_size_hint");
        let stream = TestStream::new(vec![1, 2, 3]);
        let hint = stream.size_hint();
        let ok = hint == (3, Some(3));
        crate::assert_with_log!(ok, "size hint", (3, Some(3)), hint);
        crate::test_complete!("stream_size_hint");
    }

    /// Invariant: `Box<S>` implements Stream by forwarding to the boxed stream.
    #[test]
    fn boxed_stream() {
        init_test("boxed_stream");
        let mut stream: Box<TestStream> = Box::new(TestStream::new(vec![42]));
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // The receiver is `Pin<&mut Box<TestStream>>`, so this goes through
        // the `Box<S>` impl.
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(42)));
        crate::assert_with_log!(ok, "poll boxed", "Poll::Ready(Some(42))", poll);
        crate::test_complete!("boxed_stream");
    }

    /// Invariant: `&mut S` implements Stream by forwarding to the underlying stream.
    #[test]
    fn ref_mut_stream() {
        init_test("ref_mut_stream");
        let mut stream = TestStream::new(vec![7, 8]);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Name the `&mut TestStream` impl explicitly. Method syntax on
        // `Pin::new(stream_ref)` has receiver `Pin<&mut TestStream>` and would
        // resolve to `TestStream::poll_next`, bypassing the impl under test.
        let mut stream_ref: &mut TestStream = &mut stream;
        let poll = <&mut TestStream as Stream>::poll_next(Pin::new(&mut stream_ref), &mut cx);
        let ok = matches!(poll, Poll::Ready(Some(7)));
        crate::assert_with_log!(ok, "ref_mut poll 1", "Poll::Ready(Some(7))", poll);

        // size_hint forwarding via &mut; again select the `&mut S` impl
        // explicitly, since `&mut TestStream` would otherwise coerce to
        // `&TestStream`.
        let hint = <&mut TestStream as Stream>::size_hint(&stream_ref);
        let ok = hint == (1, Some(1));
        crate::assert_with_log!(ok, "ref_mut size_hint", (1, Some(1)), hint);

        crate::test_complete!("ref_mut_stream");
    }

    /// Stream that ends immediately and does not override `size_hint`.
    struct NoHint;
    impl Stream for NoHint {
        type Item = ();
        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            Poll::Ready(None)
        }
    }

    /// Invariant: default size_hint returns (0, None).
    #[test]
    fn default_size_hint() {
        init_test("default_size_hint");

        let stream = NoHint;
        let hint = stream.size_hint();
        let ok = hint == (0, None);
        crate::assert_with_log!(ok, "default size_hint", (0, None::<usize>), hint);

        crate::test_complete!("default_size_hint");
    }
}