cancel_safe_futures/stream/try_stream.rs
1use super::CollectThenTry;
2use crate::support::assert_future;
3use futures_core::TryStream;
4
5impl<S: ?Sized + TryStream> TryStreamExt for S {}
6
7/// Alternative adapters for `Result`-returning streams
8pub trait TryStreamExt: TryStream {
9 /// Attempts to run this stream to completion, executing the provided asynchronous closure for
10 /// each element on the stream concurrently as elements become available. Runs the stream to
11 /// completion, then exits with:
12 ///
13 /// - `Ok(())` if all elements were processed successfully.
14 /// - `Err(error)` if an error occurred while processing an element. The first error encountered
15 /// is cached and returned.
16 ///
17 /// This is similar to
18 /// [`try_for_each_concurrent`](futures::stream::TryStreamExt::try_for_each_concurrent),
19 /// but will continue running the stream to completion even if an error is encountered.
20 ///
21 /// This method is only available when the `std` or `alloc` feature of this library is
22 /// activated, and it is activated by default.
23 ///
24 /// # Examples
25 ///
26 /// ```
27 /// use cancel_safe_futures::stream::TryStreamExt;
28 /// use tokio::sync::oneshot;
29 /// use futures_util::{stream, FutureExt, StreamExt};
30 ///
31 /// # #[tokio::main(flavor = "current_thread")]
32 /// # async fn main() {
33 /// let (tx1, rx1) = oneshot::channel();
34 /// let (tx2, rx2) = oneshot::channel();
35 /// let (tx3, rx3) = oneshot::channel();
36 ///
37 /// let stream = stream::iter(vec![rx1, rx2, rx3]);
38 /// let fut = stream.map(Ok).for_each_concurrent_then_try(
39 /// /* limit */ 2,
40 /// |rx| async move {
41 /// let res: Result<(), oneshot::error::RecvError> = rx.await;
42 /// res
43 /// }
44 /// );
45 ///
46 /// tx1.send(()).unwrap();
47 /// // Drop the second sender so that `rx2` resolves to `Canceled`.
48 /// drop(tx2);
49 ///
50 /// // Unlike `try_for_each_concurrent`, tx3 also needs to be resolved
51 /// // before the future will finish execution. This causes `now_or_never` to
52 /// // return None.
53 /// let mut fut = std::pin::pin!(fut);
54 /// assert_eq!(fut.as_mut().now_or_never(), None);
55 ///
56 /// tx3.send(()).unwrap();
57 ///
58 /// // The final result is an error because the second future
59 /// // resulted in an error.
60 /// fut.await.unwrap_err();
61 /// # }
62 /// ```
63 #[cfg(not(futures_no_atomic_cas))]
64 #[cfg(feature = "alloc")]
65 fn for_each_concurrent_then_try<Fut, F>(
66 self,
67 limit: impl Into<Option<usize>>,
68 f: F,
69 ) -> super::ForEachConcurrentThenTry<Self, Fut, F>
70 where
71 F: FnMut(Self::Ok) -> Fut,
72 Fut: core::future::Future<Output = Result<(), Self::Error>>,
73 Self: Sized,
74 {
75 assert_future::<Result<(), Self::Error>, _>(super::ForEachConcurrentThenTry::new(
76 self,
77 limit.into(),
78 f,
79 ))
80 }
81
82 /// Attempt to transform a stream into a collection, returning a future representing the result
83 /// of that computation.
84 ///
85 /// This adapter will collect all successful results of this stream and collect them into the
86 /// specified collection type. Unlike
87 /// [`try_collect`](futures::stream::TryStreamExt::try_collect), if an error happens then the
88 /// stream will still be run to completion.
89 ///
90 /// If more than one error is produced, this adapter will return the first error encountered.
91 ///
92 /// The returned future will be resolved when the stream terminates.
93 ///
94 /// # Notes
95 ///
96 /// This adapter does not expose a way to gather and combine all returned errors. Implementing that
97 /// is a future goal, but it requires some design work for a generic way to combine errors. To
98 /// do that today, use [`futures::StreamExt::collect`] and combine errors at the end.
99 ///
100 /// # Examples
101 ///
102 /// This example uses the [`async-stream`](https://docs.rs/async-stream) crate to create a
103 /// stream with interspersed `Ok` and `Err` values.
104 ///
105 /// ```
106 /// use cancel_safe_futures::stream::TryStreamExt;
107 /// use std::sync::atomic::{AtomicBool, Ordering};
108 ///
109 /// # #[tokio::main(flavor = "current_thread")]
110 /// # async fn main() {
111 /// let end_of_stream = AtomicBool::new(false);
112 /// let end_ref = &end_of_stream;
113 ///
114 /// // This stream generates interspersed Ok and Err values.
115 /// let stream = async_stream::stream! {
116 /// for i in 1..=4 {
117 /// yield Ok(i);
118 /// }
119 /// yield Err(5);
120 /// for i in 6..=9 {
121 /// yield Ok(i);
122 /// }
123 /// yield Err(10);
124 ///
125 /// end_ref.store(true, Ordering::SeqCst);
126 /// };
127 ///
128 /// let output: Result<Vec<i32>, i32> = stream.collect_then_try().await;
129 ///
130 /// // The first error encountered is returned.
131 /// assert_eq!(output, Err(5));
132 ///
133 /// // The stream is still run to completion even though it errored out in the middle.
134 /// assert!(end_of_stream.load(Ordering::SeqCst));
135 /// # }
136 /// ```
137 fn collect_then_try<C: Default + Extend<Self::Ok>>(self) -> CollectThenTry<Self, C>
138 where
139 Self: Sized,
140 {
141 assert_future::<Result<C, Self::Error>, _>(CollectThenTry::new(self))
142 }
143}