async_std/stream/mod.rs
1//! Composable asynchronous iteration.
2//!
3//! This module is an async version of [`std::iter`].
4//!
5//! If you've found yourself with an asynchronous collection of some kind,
6//! and needed to perform an operation on the elements of said collection,
7//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
8//! asynchronous Rust code, so it's worth becoming familiar with them.
9//!
10//! Before explaining more, let's talk about how this module is structured:
11//!
12//! # Organization
13//!
14//! This module is largely organized by type:
15//!
16//! * [Traits] are the core portion: these traits define what kind of streams
17//! exist and what you can do with them. The methods of these traits are worth
18//! putting some extra study time into.
19//! * [Functions] provide some helpful ways to create some basic streams.
20//! * [Structs] are often the return types of the various methods on this
21//! module's traits. You'll usually want to look at the method that creates
22//! the `struct`, rather than the `struct` itself. For more detail about why,
23//! see '[Implementing Stream](#implementing-stream)'.
24//!
25//! [Traits]: #traits
26//! [Functions]: #functions
27//! [Structs]: #structs
28//!
29//! That's it! Let's dig into streams.
30//!
31//! # Stream
32//!
33//! The heart and soul of this module is the [`Stream`] trait. The core of
34//! [`Stream`] looks like this:
35//!
36//! ```
37//! # use async_std::task::{Context, Poll};
38//! # use std::pin::Pin;
39//! trait Stream {
40//! type Item;
41//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
42//! }
43//! ```
44//!
45//! A stream has a method, [`next`], which when called, returns an
46//! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))`
47//! as long as there are elements, and once they've all been exhausted, will
48//! return `None` to indicate that iteration is finished. If we're waiting on
49//! something asynchronous to resolve `Pending` is returned.
50//!
51//! Individual streams may choose to resume iteration, and so calling
52//! [`next`] again may or may not eventually start returning `Ready(Some(Item))`
53//! again at some point.
54//!
55//! [`Stream`]'s full definition includes a number of other methods as well,
56//! but they are default methods, built on top of [`next`], and so you get
57//! them for free.
58//!
59//! Streams are also composable, and it's common to chain them together to do
60//! more complex forms of processing. See the [Adapters](#adapters) section
61//! below for more details.
62//!
63//! [`Poll`]: ../task/enum.Poll.html
64//! [`Stream`]: trait.Stream.html
65//! [`next`]: trait.Stream.html#tymethod.next
66//! [`Option`]: ../../std/option/enum.Option.html
67//!
68//! # The three forms of streaming
69//!
70//! There are three common methods which can create streams from a collection:
71//!
72//! * `stream()`, which iterates over `&T`.
73//! * `stream_mut()`, which iterates over `&mut T`.
74//! * `into_stream()`, which iterates over `T`.
75//!
76//! Various things in async-std may implement one or more of the
77//! three, where appropriate.
78//!
79//! # Implementing Stream
80//!
81//! Creating a stream of your own involves two steps: creating a `struct` to
82//! hold the stream's state, and then `impl`ementing [`Stream`] for that
83//! `struct`. This is why there are so many `struct`s in this module: there is
84//! one for each stream and iterator adapter.
85//!
86//! Let's make a stream named `Counter` which counts from `1` to `5`:
87//!
88//! ```
89//! # use async_std::prelude::*;
90//! # use async_std::task::{Context, Poll};
91//! # use std::pin::Pin;
92//! // First, the struct:
93//!
94//! /// A stream which counts from one to five
95//! struct Counter {
96//! count: usize,
97//! }
98//!
99//! // we want our count to start at one, so let's add a new() method to help.
100//! // This isn't strictly necessary, but is convenient. Note that we start
101//! // `count` at zero, we'll see why in `next()`'s implementation below.
102//! impl Counter {
103//! fn new() -> Counter {
104//! Counter { count: 0 }
105//! }
106//! }
107//!
108//! // Then, we implement `Stream` for our `Counter`:
109//!
110//! impl Stream for Counter {
111//! // we will be counting with usize
112//! type Item = usize;
113//!
114//! // poll_next() is the only required method
115//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116//! // Increment our count. This is why we started at zero.
117//! self.count += 1;
118//!
119//! // Check to see if we've finished counting or not.
120//! if self.count < 6 {
121//! Poll::Ready(Some(self.count))
122//! } else {
123//! Poll::Ready(None)
124//! }
125//! }
126//! }
127//!
128//! // And now we can use it!
129//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
130//! #
131//! let mut counter = Counter::new();
132//!
133//! let x = counter.next().await.unwrap();
134//! println!("{}", x);
135//!
136//! let x = counter.next().await.unwrap();
137//! println!("{}", x);
138//!
139//! let x = counter.next().await.unwrap();
140//! println!("{}", x);
141//!
142//! let x = counter.next().await.unwrap();
143//! println!("{}", x);
144//!
145//! let x = counter.next().await.unwrap();
146//! println!("{}", x);
147//! #
148//! # Ok(()) }) }
149//! ```
150//!
151//! This will print `1` through `5`, each on their own line.
152//!
153//! Calling `next().await` this way gets repetitive. Rust has a construct which
154//! can call `next()` on your stream, until it reaches `None`. Let's go over
155//! that next.
156//!
157//! # while let Loops and IntoStream
158//!
159//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic
160//! example of `while let`:
161//!
162//! ```
163//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
164//! #
165//! # use async_std::prelude::*;
166//! # use async_std::stream;
167//! let mut values = stream::repeat(1u8).take(5);
168//!
169//! while let Some(x) = values.next().await {
170//! println!("{}", x);
171//! }
172//! #
173//! # Ok(()) }) }
174//! ```
175//!
176//! This will print the numbers one through five, each on their own line. But
177//! you'll notice something here: we never called anything on our vector to
178//! produce a stream. What gives?
179//!
180//! There's a trait in the standard library for converting something into an
181//! stream: [`IntoStream`]. This trait has one method, [`into_stream`],
182//! which converts the thing implementing [`IntoStream`] into a stream.
183//!
184//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler
185//! support yet. This means that automatic conversions like with `for` loops
186//! doesn't occur yet, and `into_stream` will always have to be called manually.
187//!
188//! [`IntoStream`]: trait.IntoStream.html
189//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
190//!
191//! # Adapters
192//!
193//! Functions which take an [`Stream`] and return another [`Stream`] are
194//! often called 'stream adapters', as they are a form of the 'adapter
195//! pattern'.
196//!
197//! Common stream adapters include [`map`], [`take`], and [`filter`].
198//! For more, see their documentation.
199//!
200//! [`map`]: trait.Stream.html#method.map
201//! [`take`]: trait.Stream.html#method.take
202//! [`filter`]: trait.Stream.html#method.filter
203//!
204//! # Laziness
205//!
206//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that
207//! just creating a stream doesn't _do_ a whole lot. Nothing really happens
208//! until you call [`next`]. This is sometimes a source of confusion when
209//! creating a stream solely for its side effects. For example, the [`map`]
210//! method calls a closure on each element it iterates over:
211//!
212//! ```
213//! # #![allow(unused_must_use)]
214//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
215//! #
216//! # use async_std::prelude::*;
217//! # use async_std::stream;
218//! let v = stream::repeat(1u8).take(5);
219//! v.map(|x| println!("{}", x));
220//! #
221//! # Ok(()) }) }
222//! ```
223//!
224//! This will not print any values, as we only created a stream, rather than
225//! using it. The compiler will warn us about this kind of behavior:
226//!
227//! ```text
228//! warning: unused result that must be used: streams are lazy and
229//! do nothing unless consumed
230//! ```
231//!
232//! The idiomatic way to write a [`map`] for its side effects is to use a
233//! `while let` loop instead:
234//!
235//! ```
236//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
237//! #
238//! # use async_std::prelude::*;
239//! # use async_std::stream;
240//! let mut v = stream::repeat(1u8).take(5);
241//!
242//! while let Some(x) = &v.next().await {
243//! println!("{}", x);
244//! }
245//! #
246//! # Ok(()) }) }
247//! ```
248//!
249//! [`map`]: trait.Stream.html#method.map
250//!
251//! The two most common ways to evaluate a stream are to use a `while let` loop
252//! like this, or using the [`collect`] method to produce a new collection.
253//!
254//! [`collect`]: trait.Stream.html#method.collect
255//!
256//! # Infinity
257//!
258//! Streams do not have to be finite. As an example, an repeat stream is
259//! an infinite stream:
260//!
261//! ```
262//! # use async_std::stream;
263//! let numbers = stream::repeat(1u8);
264//! ```
265//!
266//! It is common to use the [`take`] stream adapter to turn an infinite
267//! stream into a finite one:
268//!
269//! ```
270//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
271//! #
272//! # use async_std::prelude::*;
273//! # use async_std::stream;
274//! let numbers = stream::repeat(1u8);
275//! let mut five_numbers = numbers.take(5);
276//!
277//! while let Some(number) = five_numbers.next().await {
278//! println!("{}", number);
279//! }
280//! #
281//! # Ok(()) }) }
282//! ```
283//!
284//! This will print the numbers `0` through `4`, each on their own line.
285//!
286//! Bear in mind that methods on infinite streams, even those for which a
287//! result can be determined mathematically in finite time, may not terminate.
288//! Specifically, methods such as [`min`], which in the general case require
289//! traversing every element in the stream, are likely not to return
290//! successfully for any infinite streams.
291//!
292//! ```ignore
293//! let ones = async_std::stream::repeat(1);
294//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop!
295//! // `ones.min()` causes an infinite loop, so we won't reach this point!
296//! println!("The smallest number one is {}.", least);
297//! ```
298//!
299//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
300//! [`take`]: trait.Stream.html#method.take
301//! [`min`]: trait.Stream.html#method.min
302
303pub use empty::{empty, Empty};
304pub use from_fn::{from_fn, FromFn};
305pub use from_iter::{from_iter, FromIter};
306pub use once::{once, Once};
307pub use repeat::{repeat, Repeat};
308pub use repeat_with::{repeat_with, RepeatWith};
309pub use stream::*;
310
311pub(crate) mod stream;
312
313mod empty;
314mod from_fn;
315mod from_iter;
316mod once;
317mod repeat;
318mod repeat_with;
319
320cfg_unstable! {
321 mod double_ended_stream;
322 mod exact_size_stream;
323 mod extend;
324 mod from_stream;
325 mod fused_stream;
326 mod interval;
327 mod into_stream;
328 mod pending;
329 mod product;
330 mod successors;
331 mod sum;
332
333 pub use double_ended_stream::DoubleEndedStream;
334 pub use exact_size_stream::ExactSizeStream;
335 pub use extend::{extend, Extend};
336 pub use from_stream::FromStream;
337 pub use fused_stream::FusedStream;
338 pub use interval::{interval, Interval};
339 pub use into_stream::IntoStream;
340 pub use pending::{pending, Pending};
341 pub use product::Product;
342 pub use stream::Merge;
343 pub use successors::{successors, Successors};
344 pub use sum::Sum;
345}