asupersync/stream/
stream.rs1use std::ops::DerefMut;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12pub trait Stream {
31 type Item;
33
34 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
47
48 fn size_hint(&self) -> (usize, Option<usize>) {
53 (0, None)
54 }
55}
56
57impl<P> Stream for Pin<P>
59where
60 P: DerefMut + Unpin,
61 P::Target: Stream,
62{
63 type Item = <P::Target as Stream>::Item;
64
65 #[inline]
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 self.get_mut().as_mut().poll_next(cx)
71 }
72
73 #[inline]
74 fn size_hint(&self) -> (usize, Option<usize>) {
75 (**self).size_hint()
76 }
77}
78
79impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
81 type Item = S::Item;
82
83 #[inline]
84 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 Pin::new(&mut **self).poll_next(cx)
86 }
87
88 #[inline]
89 fn size_hint(&self) -> (usize, Option<usize>) {
90 (**self).size_hint()
91 }
92}
93
94impl<S: Stream + Unpin + ?Sized> Stream for &mut S {
96 type Item = S::Item;
97
98 #[inline]
99 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100 Pin::new(&mut **self).poll_next(cx)
101 }
102
103 #[inline]
104 fn size_hint(&self) -> (usize, Option<usize>) {
105 (**self).size_hint()
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use super::*;
112 use std::sync::Arc;
113 use std::task::{Wake, Waker};
114
115 struct NoopWaker;
116
117 impl Wake for NoopWaker {
118 fn wake(self: Arc<Self>) {}
119 }
120
121 fn noop_waker() -> Waker {
122 Waker::from(Arc::new(NoopWaker))
123 }
124
125 struct TestStream {
126 items: Vec<i32>,
127 index: usize,
128 }
129
130 impl TestStream {
131 fn new(items: Vec<i32>) -> Self {
132 Self { items, index: 0 }
133 }
134 }
135
136 impl Stream for TestStream {
137 type Item = i32;
138
139 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
140 if self.index < self.items.len() {
141 let item = self.items[self.index];
142 self.index += 1;
143 Poll::Ready(Some(item))
144 } else {
145 Poll::Ready(None)
146 }
147 }
148
149 fn size_hint(&self) -> (usize, Option<usize>) {
150 let remaining = self.items.len() - self.index;
151 (remaining, Some(remaining))
152 }
153 }
154
155 fn init_test(name: &str) {
156 crate::test_utils::init_test_logging();
157 crate::test_phase!(name);
158 }
159
160 #[test]
161 fn stream_produces_items() {
162 init_test("stream_produces_items");
163 let mut stream = TestStream::new(vec![1, 2, 3]);
164 let waker = noop_waker();
165 let mut cx = Context::from_waker(&waker);
166
167 let poll = Pin::new(&mut stream).poll_next(&mut cx);
168 let ok = matches!(poll, Poll::Ready(Some(1)));
169 crate::assert_with_log!(ok, "poll 1", "Poll::Ready(Some(1))", poll);
170 let poll = Pin::new(&mut stream).poll_next(&mut cx);
171 let ok = matches!(poll, Poll::Ready(Some(2)));
172 crate::assert_with_log!(ok, "poll 2", "Poll::Ready(Some(2))", poll);
173 let poll = Pin::new(&mut stream).poll_next(&mut cx);
174 let ok = matches!(poll, Poll::Ready(Some(3)));
175 crate::assert_with_log!(ok, "poll 3", "Poll::Ready(Some(3))", poll);
176 let poll = Pin::new(&mut stream).poll_next(&mut cx);
177 let ok = matches!(poll, Poll::Ready(None));
178 crate::assert_with_log!(ok, "poll done", "Poll::Ready(None)", poll);
179 crate::test_complete!("stream_produces_items");
180 }
181
182 #[test]
183 fn stream_size_hint() {
184 init_test("stream_size_hint");
185 let stream = TestStream::new(vec![1, 2, 3]);
186 let hint = stream.size_hint();
187 let ok = hint == (3, Some(3));
188 crate::assert_with_log!(ok, "size hint", (3, Some(3)), hint);
189 crate::test_complete!("stream_size_hint");
190 }
191
192 #[test]
193 fn boxed_stream() {
194 init_test("boxed_stream");
195 let mut stream: Box<TestStream> = Box::new(TestStream::new(vec![42]));
196 let waker = noop_waker();
197 let mut cx = Context::from_waker(&waker);
198
199 let poll = Pin::new(&mut stream).poll_next(&mut cx);
200 let ok = matches!(poll, Poll::Ready(Some(42)));
201 crate::assert_with_log!(ok, "poll boxed", "Poll::Ready(Some(42))", poll);
202 crate::test_complete!("boxed_stream");
203 }
204
205 #[test]
207 fn ref_mut_stream() {
208 init_test("ref_mut_stream");
209 let mut stream = TestStream::new(vec![7, 8]);
210 let waker = noop_waker();
211 let mut cx = Context::from_waker(&waker);
212
213 let stream_ref: &mut TestStream = &mut stream;
215 let poll = Pin::new(stream_ref).poll_next(&mut cx);
216 let ok = matches!(poll, Poll::Ready(Some(7)));
217 crate::assert_with_log!(ok, "ref_mut poll 1", true, ok);
218
219 let stream_ref: &mut TestStream = &mut stream;
221 let hint = Stream::size_hint(stream_ref);
222 let ok = hint == (1, Some(1));
223 crate::assert_with_log!(ok, "ref_mut size_hint", (1, Some(1)), hint);
224
225 crate::test_complete!("ref_mut_stream");
226 }
227
228 struct NoHint;
229 impl Stream for NoHint {
230 type Item = ();
231 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<()>> {
232 Poll::Ready(None)
233 }
234 }
235
236 #[test]
238 fn default_size_hint() {
239 init_test("default_size_hint");
240
241 let stream = NoHint;
242 let hint = stream.size_hint();
243 let ok = hint == (0, None);
244 crate::assert_with_log!(ok, "default size_hint", (0, None::<usize>), hint);
245
246 crate::test_complete!("default_size_hint");
247 }
248}