1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
//! Use on `impl Stream`s via the [`TryStreamAndThenExt`] trait. //! //! Why is this necessary? Consider the example below. We have a `Stream` from `try_unfold`, but //! this stream is splitting some larger stream into sub-streams, with each sub-stream //! represented by a channel. If we simply called `and_then`, that function's implementation, //! as an optimization, only keeps *one* "pending" future in its state. This means that it //! cannot poll the backing stream, because that might produce another future which it has no //! space for. So, it *must* run the pending future to completion before polling the stream //! again. //! //! Unfortunately, in this case, the backing stream has to be polled for our future to resolve! //! So using `and_then` will deadlock. Instead, this crate makes a tradeoff: it will hold a //! list of pending futures in a `FuturesUnordered`, so it is safe to //! poll the backing stream. This means that if the resulting futures don't resolve, we could //! have a large list of futures. //! //! ```rust //! # use and_then_concurrent::TryStreamAndThenExt; //! # use futures_util::stream::TryStreamExt; //! # use std::collections::HashMap; //! # use std::time::Duration; //! # use tokio::{sync::mpsc, time::sleep}; //! # #[tokio::main] //! # async fn main() { //! let c = futures_util::stream::try_unfold( //! ( //! 0, //! HashMap::<usize, mpsc::UnboundedSender<(usize, usize)>>::default(), //! ), //! move |(mut i, mut map)| async move { //! loop { //! sleep(Duration::from_millis(10)).await; //! let (substream, message) = (i % 3, i); //! i += 1; //! if i > 25 { //! return Ok(None); //! } //! //! let mut new = None; //! if map //! .entry(substream) //! .or_insert_with(|| { //! let (sub_s, sub_r) = mpsc::unbounded_channel(); //! new = Some(sub_r); //! sub_s //! }) //! .send((substream, message)) //! .is_err() //! { //! map.remove(&substream); //! } //! //! if let Some(new_sub_r) = new { //! return Ok::<_, String>(Some((new_sub_r, (i, map)))); //! } //! } //! }, //! ) //! // .and_then(...) would deadlock! //! .and_then_concurrent(|mut res| async move { //! loop { //! let (stream, val): (usize, usize) = match res.recv().await { //! None => return Ok(()), //! Some(s) => s, //! }; //! println!("got {:?} on stream {:?}", val, stream); //! } //! }) //! .try_collect::<Vec<_>>(); //! c.await.unwrap(); //! # } //! ``` use futures_util::{ future::TryFuture, stream::{FuturesUnordered, Stream, TryStream}, }; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; /// Extension to [`futures_util::stream::TryStreamExt`] pub trait TryStreamAndThenExt: TryStream { /// Chain a computation when a stream value is ready, passing `Ok` values to the closure `f`. /// /// This function is similar to [`futures_util::stream::TryStreamExt::and_then`], but the /// stream is polled concurrently with the futures returned by `f`. An unbounded number of /// futures corresponding to past stream values is kept via `FuturesUnordered`. /// /// See [crate-level docs](`crate`) for an explanation and usage example. fn and_then_concurrent<Fut, F>(self, f: F) -> AndThenConcurrent<Self, Fut, F> where Self: Sized, Fut: TryFuture<Error = Self::Error>, F: FnMut(Self::Ok) -> Fut; } impl<S: TryStream> TryStreamAndThenExt for S { fn and_then_concurrent<Fut, F>(self, f: F) -> AndThenConcurrent<Self, Fut, F> where Self: Sized, Fut: TryFuture<Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, { AndThenConcurrent { stream: self, futs: FuturesUnordered::new(), fun: f, } } } /// Stream type for [`TryStreamAndThenExt::and_then_concurrent`]. #[pin_project(project = AndThenConcurrentProj)] pub struct AndThenConcurrent<St, Fut: TryFuture, F> { #[pin] stream: St, #[pin] futs: FuturesUnordered<Fut>, fun: F, } impl<St, Fut, F, T> Stream for AndThenConcurrent<St, Fut, F> where St: TryStream, Fut: Future<Output = Result<T, St::Error>>, F: FnMut(St::Ok) -> Fut, { type Item = Result<T, St::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let AndThenConcurrentProj { mut stream, mut futs, fun, } = self.project(); match stream.as_mut().try_poll_next(cx) { Poll::Ready(Some(Ok(n))) => { futs.push(fun(n)); } Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Pending => { if futs.is_empty() { return Poll::Pending; } } _ => (), } let x = futs.as_mut().poll_next(cx); if let Poll::Pending = x { // check stream once more match stream.as_mut().try_poll_next(cx) { Poll::Ready(Some(Ok(n))) => { futs.push(fun(n)); } _ => (), } } x } }