1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! `flatten_iters` flattens a stream of iterators into one continuous stream.
//!
//! This is useful when you have a producer that is paging through a resource (like a REST endpoint
//! with pages or a next URL, an Elasticsearch query with a scroll parameter, etc.) and emits each
//! page as a batch of items.
//!
//! This code is taken *almost* verbatim from [`StreamExt::flatten`] and is similar
//! in spirit to [`Iterator::flatten`].
//!
//! ```
//! use stream_flatten_iters::StreamExt as _;
//! use futures::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);
//!
//!     tokio::spawn(async move {
//!         tx.send(vec![0, 1, 2, 3]).await.unwrap();
//!         tx.send(vec![4, 5, 6]).await.unwrap();
//!         tx.send(vec![7, 8, 9]).await.unwrap();
//!     });
//!
//!     let mut stream = rx.flatten_iters();
//!
//!     while let Some(res) = stream.next().await {
//!         println!("got = {}", res);
//!     }
//! }
//!
//! // Output:
//! // got = 0
//! // got = 1
//! // got = 2
//! // got = 3
//! // got = 4
//! // got = 5
//! // got = 6
//! // got = 7
//! // got = 8
//! // got = 9
//! ```
//!
//! This is especially useful when combined with [`StreamExt::buffered`] to keep a buffer of futures
//! in flight throughout a long-running job.
//!
//! ```
//! use stream_flatten_iters::StreamExt as _;
//! use futures::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);
//!
//!     tokio::spawn(async move {
//!         for i in 0_usize..100 {
//!             let start = i * 10;
//!             let end = start + 10;
//!             tx.send(start..end).await.unwrap();
//!         }
//!     });
//!
//!     let mut stream = rx.flatten_iters().map(|i| long_process(i)).buffered(10);
//!
//!     let mut total = 0_usize;
//!     while let Some(res) = stream.next().await {
//!         let _ = res?;
//!         total += 1;
//!         println!("Completed {} tasks", total);
//!     }
//!
//!     Ok(())
//! }
//!
//! async fn long_process(i: usize) -> Result<(), Box<dyn std::error::Error>> {
//!     // Do something that takes a long time
//!     Ok(())
//! }
//! ```
//!
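//! Similarly, `try_flatten_iters` is analogous to [`TryStreamExt::try_flatten`], and is useful for
//! flattening a stream of `Result`s whose `Ok` values are iterators. As the example output below
//! shows, an `Err` is forwarded as a single item and the stream keeps going.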
//!
//! ```
//! use stream_flatten_iters::TryStreamExt as _;
//! use futures::stream::{StreamExt, TryStreamExt};
//!
//! #[tokio::main]
//! async fn main() {
//!     let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);
//!
//!     tokio::spawn(async move {
//!         tx.send(Ok(vec![0_usize, 1, 2, 3])).await.unwrap();
//!         tx.send(Err(())).await.unwrap();
//!         tx.send(Ok(vec![4, 5, 6])).await.unwrap();
//!         tx.send(Ok(vec![7, 8, 9])).await.unwrap();
//!     });
//!
//!     let mut stream = rx.try_flatten_iters();
//!
//!     while let Some(res) = stream.next().await {
//!         println!("got = {:?}", res);
//!     }
//! }
//!
//! // Output:
//! // got = Ok(0)
//! // got = Ok(1)
//! // got = Ok(2)
//! // got = Ok(3)
//! // got = Err(())
//! // got = Ok(4)
//! // got = Ok(5)
//! // got = Ok(6)
//! // got = Ok(7)
//! // got = Ok(8)
//! // got = Ok(9)
//! ```
//!
//! [`StreamExt::flatten`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.flatten
//! [`TryStreamExt::try_flatten`]: https://docs.rs/futures/0.3/futures/stream/trait.TryStreamExt.html#method.try_flatten
//! [`Iterator::flatten`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flatten
//! [`StreamExt::buffered`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.buffered

// Turn the `missing_docs` lint into a hard error so undocumented public items fail the build.
#![deny(missing_docs)]

// The implementation modules are declared without `pub`, so they stay private; the items callers
// need are re-exported below.
mod flatten_iters;
mod try_flatten_iters;

// Public API. `StreamExt` / `TryStreamExt` are the extension traits imported in the doc examples
// to get `flatten_iters()` / `try_flatten_iters()`. `FlattenIters` / `TryFlattenIters` are
// presumably the stream types those methods return (defined in the modules above, not visible
// here; confirm there).
pub use flatten_iters::{FlattenIters, StreamExt};
pub use try_flatten_iters::{TryFlattenIters, TryStreamExt};