1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
//! Allows to easily generate streams with async/await. #[cfg(test)] mod tests; use futures::{channel::mpsc, sink::SinkExt, stream, stream::StreamExt, Future, Stream}; /// A handle that's used to produce (yield) values of the stream. pub struct Yielder<T>(mpsc::Sender<T>); impl<T> Yielder<T> { /// Yields `value` from the stream created with `generate_stream` /// or `generate_try_stream`. /// /// Note that `value` will be dropped if the corresponding stream is dropped before it could /// produce that value. pub async fn send(&mut self, value: T) { let _ = self.0.send(value).await; } } impl<T> Clone for Yielder<T> { fn clone(&self) -> Self { Yielder(self.0.clone()) } } /// Creates a stream from `generator`. /// /// `generator` will receive a yielder object as an argument and should return a future /// (usually an async block) that will produce the stream's values using the yielder. /// If the future finishes, the stream will end after producing all yielded values. If the /// future never finishes, the stream will also never finish. /// ``` /// use futures::{stream::StreamExt, Stream}; /// use stream_generator::generate_stream; /// /// fn my_stream(start: u32) -> impl Stream<Item=u32> { /// generate_stream(move |mut y| async move { /// for i in start.. { /// y.send(i).await; /// if i == 45 { /// break; /// } /// } /// }) /// } /// /// #[tokio::main] /// async fn main() { /// let values: Vec<_> = my_stream(42).collect().await; /// assert_eq!(values, vec![42, 43, 44, 45]); /// } /// ``` pub fn generate_stream<T, F, R>(generator: F) -> impl Stream<Item = T> where F: FnOnce(Yielder<T>) -> R, R: Future<Output = ()>, { let (tx, rx) = mpsc::channel(0); let fake_stream = stream::once(generator(Yielder(tx))).filter_map(|_| async { None }); stream::select(fake_stream, rx) } /// Creates a stream of `Result`s from `generator`. /// /// `generator` will receive a yielder object as an argument and should return a future /// (usually an async block) that will produce the stream's values using the yielder. /// If the future finishes with an `Ok`, the stream will end after producing all yielded values. /// If the future finishes with an `Err`, the stream will end after producing all yielded values /// and then producing an `Err` returned by the future. /// If the future never finishes, the stream will also never finish. /// /// ``` /// use futures::stream::StreamExt; /// use stream_generator::generate_try_stream; /// /// async fn failing() -> Result<(), &'static str> { /// Err("world") /// } /// /// #[tokio::main] /// async fn main() { /// let s = generate_try_stream(|mut y| async move { /// y.send(Ok("hello")).await; /// failing().await?; /// unreachable!(); /// }); /// /// let values: Vec<_> = s.collect().await; /// assert_eq!(values, vec![Ok("hello"), Err("world")]); /// } /// ``` pub fn generate_try_stream<T, E, F, R>(generator: F) -> impl Stream<Item = Result<T, E>> where F: FnOnce(Yielder<Result<T, E>>) -> R, R: Future<Output = Result<(), E>>, { generate_stream(|y| async move { let mut y2 = y.clone(); if let Err(err) = generator(y).await { y2.send(Err(err)).await; } }) }