cancel_safe_futures/stream/
try_stream.rs

1use super::CollectThenTry;
2use crate::support::assert_future;
3use futures_core::TryStream;
4
5impl<S: ?Sized + TryStream> TryStreamExt for S {}
6
7/// Alternative adapters for `Result`-returning streams
8pub trait TryStreamExt: TryStream {
9    /// Attempts to run this stream to completion, executing the provided asynchronous closure for
10    /// each element on the stream concurrently as elements become available. Runs the stream to
11    /// completion, then exits with:
12    ///
13    /// - `Ok(())` if all elements were processed successfully.
14    /// - `Err(error)` if an error occurred while processing an element. The first error encountered
15    ///   is cached and returned.
16    ///
17    /// This is similar to
18    /// [`try_for_each_concurrent`](futures::stream::TryStreamExt::try_for_each_concurrent),
19    /// but will continue running the stream to completion even if an error is encountered.
20    ///
21    /// This method is only available when the `std` or `alloc` feature of this library is
22    /// activated, and it is activated by default.
23    ///
24    /// # Examples
25    ///
26    /// ```
27    /// use cancel_safe_futures::stream::TryStreamExt;
28    /// use tokio::sync::oneshot;
29    /// use futures_util::{stream, FutureExt, StreamExt};
30    ///
31    /// # #[tokio::main(flavor = "current_thread")]
32    /// # async fn main() {
33    /// let (tx1, rx1) = oneshot::channel();
34    /// let (tx2, rx2) = oneshot::channel();
35    /// let (tx3, rx3) = oneshot::channel();
36    ///
37    /// let stream = stream::iter(vec![rx1, rx2, rx3]);
38    /// let fut = stream.map(Ok).for_each_concurrent_then_try(
39    ///     /* limit */ 2,
40    ///     |rx| async move {
41    ///         let res: Result<(), oneshot::error::RecvError> = rx.await;
42    ///         res
43    ///     }
44    /// );
45    ///
46    /// tx1.send(()).unwrap();
47    /// // Drop the second sender so that `rx2` resolves to `Canceled`.
48    /// drop(tx2);
49    ///
50    /// // Unlike `try_for_each_concurrent`, tx3 also needs to be resolved
51    /// // before the future will finish execution. This causes `now_or_never` to
52    /// // return None.
53    /// let mut fut = std::pin::pin!(fut);
54    /// assert_eq!(fut.as_mut().now_or_never(), None);
55    ///
56    /// tx3.send(()).unwrap();
57    ///
58    /// // The final result is an error because the second future
59    /// // resulted in an error.
60    /// fut.await.unwrap_err();
61    /// # }
62    /// ```
63    #[cfg(not(futures_no_atomic_cas))]
64    #[cfg(feature = "alloc")]
65    fn for_each_concurrent_then_try<Fut, F>(
66        self,
67        limit: impl Into<Option<usize>>,
68        f: F,
69    ) -> super::ForEachConcurrentThenTry<Self, Fut, F>
70    where
71        F: FnMut(Self::Ok) -> Fut,
72        Fut: core::future::Future<Output = Result<(), Self::Error>>,
73        Self: Sized,
74    {
75        assert_future::<Result<(), Self::Error>, _>(super::ForEachConcurrentThenTry::new(
76            self,
77            limit.into(),
78            f,
79        ))
80    }
81
82    /// Attempt to transform a stream into a collection, returning a future representing the result
83    /// of that computation.
84    ///
85    /// This adapter will collect all successful results of this stream and collect them into the
86    /// specified collection type. Unlike
87    /// [`try_collect`](futures::stream::TryStreamExt::try_collect), if an error happens then the
88    /// stream will still be run to completion.
89    ///
90    /// If more than one error is produced, this adapter will return the first error encountered.
91    ///
92    /// The returned future will be resolved when the stream terminates.
93    ///
94    /// # Notes
95    ///
96    /// This adapter does not expose a way to gather and combine all returned errors. Implementing that
97    /// is a future goal, but it requires some design work for a generic way to combine errors. To
98    /// do that today, use [`futures::StreamExt::collect`] and combine errors at the end.
99    ///
100    /// # Examples
101    ///
102    /// This example uses the [`async-stream`](https://docs.rs/async-stream) crate to create a
103    /// stream with interspersed `Ok` and `Err` values.
104    ///
105    /// ```
106    /// use cancel_safe_futures::stream::TryStreamExt;
107    /// use std::sync::atomic::{AtomicBool, Ordering};
108    ///
109    /// # #[tokio::main(flavor = "current_thread")]
110    /// # async fn main() {
111    /// let end_of_stream = AtomicBool::new(false);
112    /// let end_ref = &end_of_stream;
113    ///
114    /// // This stream generates interspersed Ok and Err values.
115    /// let stream = async_stream::stream! {
116    ///     for i in 1..=4 {
117    ///         yield Ok(i);
118    ///     }
119    ///     yield Err(5);
120    ///     for i in 6..=9 {
121    ///         yield Ok(i);
122    ///     }
123    ///     yield Err(10);
124    ///
125    ///     end_ref.store(true, Ordering::SeqCst);
126    /// };
127    ///
128    /// let output: Result<Vec<i32>, i32> = stream.collect_then_try().await;
129    ///
130    /// // The first error encountered is returned.
131    /// assert_eq!(output, Err(5));
132    ///
133    /// // The stream is still run to completion even though it errored out in the middle.
134    /// assert!(end_of_stream.load(Ordering::SeqCst));
135    /// # }
136    /// ```
137    fn collect_then_try<C: Default + Extend<Self::Ok>>(self) -> CollectThenTry<Self, C>
138    where
139        Self: Sized,
140    {
141        assert_future::<Result<C, Self::Error>, _>(CollectThenTry::new(self))
142    }
143}