1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
//! `flatten_iters` flattens a stream of iterators into one continuous stream. //! //! This is useful when you have a producer that is paging through a resource (like a REST endpoint //! with pages or a next URL, an ElasticSearch query with a scroll parameter, etc.) //! //! This code is taken *almost* verbatim from [`StreamExt::flatten`] and is similar //! in spirit to [`Iterator::flatten`]. //! //! ``` //! use stream_flatten_iters::StreamExt as _; //! use futures::stream::StreamExt; //! //! #[tokio::main] //! async fn main() { //! let (mut tx, mut rx) = tokio::sync::mpsc::channel(3); //! //! tokio::spawn(async move { //! tx.send(vec![0, 1, 2, 3]).await.unwrap(); //! tx.send(vec![4, 5, 6]).await.unwrap(); //! tx.send(vec![7, 8, 9]).await.unwrap(); //! }); //! //! let mut stream = rx.flatten_iters(); //! //! while let Some(res) = stream.next().await { //! println!("got = {}", res); //! } //! } //! //! // Output: //! // got = 0 //! // got = 1 //! // got = 2 //! // got = 3 //! // got = 4 //! // got = 5 //! // got = 6 //! // got = 7 //! // got = 8 //! // got = 9 //! ``` //! //! This is especially useful when combined with [`StreamExt::buffered`] to keep a buffer of promises going //! throughout a long promise. //! //! ``` //! use stream_flatten_iters::StreamExt as _; //! use futures::stream::StreamExt; //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { //! let (mut tx, mut rx) = tokio::sync::mpsc::channel(3); //! //! tokio::spawn(async move { //! for i in 0_usize..100 { //! let start = i * 10; //! let end = start + 10; //! tx.send(start..end).await.unwrap(); //! } //! }); //! //! let mut stream = rx.flatten_iters().map(|i| long_process(i)).buffered(10); //! //! let mut total = 0_usize; //! while let Some(res) = stream.next().await { //! let _ = res?; //! total += 1; //! println!("Completed {} tasks", total); //! } //! //! Ok(()) //! } //! //! async fn long_process(i: usize) -> Result<(), Box<dyn std::error::Error>> { //! // Do something that takes a long time //! Ok(()) //! } //! ``` //! //! Similarly, `try_flatten_iters` is analogous to [`TryStreamExt::try_flatten`], and is useful for //! flatting results of iterators. //! //! ``` //! use stream_flatten_iters::TryStreamExt as _; //! use futures::stream::{StreamExt, TryStreamExt}; //! //! #[tokio::main] //! async fn main() { //! let (mut tx, mut rx) = tokio::sync::mpsc::channel(3); //! //! tokio::spawn(async move { //! tx.send(Ok(vec![0_usize, 1, 2, 3])).await.unwrap(); //! tx.send(Err(())).await.unwrap(); //! tx.send(Ok(vec![4, 5, 6])).await.unwrap(); //! tx.send(Ok(vec![7, 8, 9])).await.unwrap(); //! }); //! //! let mut stream = rx.try_flatten_iters(); //! //! while let Some(res) = stream.next().await { //! println!("got = {:?}", res); //! } //! } //! //! // Output: //! // got = Ok(0) //! // got = Ok(1) //! // got = Ok(2) //! // got = Ok(3) //! // got = Err(()) //! // got = Ok(4) //! // got = Ok(5) //! // got = Ok(6) //! // got = Ok(7) //! // got = Ok(8) //! // got = Ok(9) //! ``` //! //! [`StreamExt::flatten`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.flatten //! [`TryStreamExt::try_flatten`]: https://docs.rs/futures/0.3/futures/stream/trait.TryStreamExt.html#method.try_flatten //! [`Iterator::flatten`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flatten //! [`StreamExt::buffered`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.buffered #![deny(missing_docs)] mod flatten_iters; mod try_flatten_iters; pub use flatten_iters::{FlattenIters, StreamExt}; pub use try_flatten_iters::{TryFlattenIters, TryStreamExt};