error_stack/ext/stream.rs
1use core::{
2 future::Future,
3 mem,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use futures_core::{FusedFuture, FusedStream, TryStream};
9use pin_project_lite::pin_project;
10
11use crate::Report;
12
13pin_project! {
14 /// Future for the [`try_collect_reports`](TryReportStreamExt::try_collect_reports)
15 /// and [`try_collect_reports_bounded`](TryReportStreamExt::try_collect_reports_bounded) methods.
16 #[derive(Debug)]
17 #[must_use = "futures do nothing unless you `.await` or poll them"]
18 pub struct TryCollectReports<S, A, C> {
19 #[pin]
20 stream: S,
21 output: Result<A, Report<[C]>>,
22
23 context_len: usize,
24 context_bound: usize
25 }
26}
27
28impl<S, A, C> TryCollectReports<S, A, C>
29where
30 S: TryStream,
31 A: Default + Extend<S::Ok>,
32{
33 fn new(stream: S, bound: Option<usize>) -> Self {
34 Self {
35 stream,
36 output: Ok(Default::default()),
37 context_len: 0,
38 context_bound: bound.unwrap_or(usize::MAX),
39 }
40 }
41}
42
43impl<S, A, C> FusedFuture for TryCollectReports<S, A, C>
44where
45 S: TryStream<Error: Into<Report<[C]>>> + FusedStream,
46 A: Default + Extend<S::Ok>,
47{
48 fn is_terminated(&self) -> bool {
49 self.stream.is_terminated()
50 }
51}
52
53impl<S, A, C> Future for TryCollectReports<S, A, C>
54where
55 S: TryStream<Error: Into<Report<[C]>>>,
56 A: Default + Extend<S::Ok>,
57{
58 type Output = Result<A, Report<[C]>>;
59
60 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
61 let mut this = self.project();
62
63 let value = loop {
64 if *this.context_len >= *this.context_bound {
65 break mem::replace(this.output, Ok(A::default()));
66 }
67
68 let next = ready!(this.stream.as_mut().try_poll_next(cx));
69 match (next, &mut *this.output) {
70 (Some(Ok(value)), Ok(output)) => {
71 output.extend(core::iter::once(value));
72 }
73 (Some(Ok(_)), Err(_)) => {
74 // we're now just consuming the iterator to return all related errors
75 // so we can just ignore the output
76 }
77 (Some(Err(error)), output @ Ok(_)) => {
78 *output = Err(error.into());
79 *this.context_len += 1;
80 }
81 (Some(Err(error)), Err(report)) => {
82 report.append(error.into());
83 *this.context_len += 1;
84 }
85 (None, output) => {
86 break mem::replace(output, Ok(A::default()));
87 }
88 }
89 };
90
91 Poll::Ready(value)
92 }
93}
94
95/// Trait extending [`TryStream`] with methods for collecting error-stack results in a fail-slow
96/// manner.
97///
98/// This trait provides additional functionality to [`TryStream`]s, allowing for the collection of
99/// successful items while accumulating errors. It's particularly useful when you want to continue
100/// processing a stream even after encountering errors, gathering all successful results and errors
101/// until the stream is exhausted or a specified error limit is reached.
102///
103/// The fail-slow approach means that the stream processing continues after encountering errors,
104/// unlike traditional error handling that might stop at the first error.
105///
106/// # Performance Considerations
107///
108/// These methods may have performance implications as they potentially iterate
109/// through the entire stream, even after encountering errors.
110///
111/// # Note
112///
113/// This trait is only available behind the `unstable` flag and is not covered by semver guarantees.
114/// It may be subject to breaking changes in future releases.
115///
116/// [`TryStream`]: futures_core::stream::TryStream
117pub trait TryReportStreamExt<C>: TryStream<Error: Into<Report<[C]>>> {
118 /// Collects all successful items from the stream into a collection, accumulating all errors.
119 ///
120 /// This method will continue processing the stream even after encountering errors, collecting
121 /// all successful items and all errors until the stream is exhausted.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// # use error_stack::{Report, TryReportStreamExt};
127 /// # use futures_util::stream;
128 ///
129 /// #[derive(Debug, Clone, PartialEq, Eq)]
130 /// pub struct UnknownError;
131 ///
132 /// impl core::fmt::Display for UnknownError {
133 /// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
134 /// f.write_str("UnknownError")
135 /// }
136 /// }
137 ///
138 /// impl core::error::Error for UnknownError {}
139 ///
140 /// #
141 /// # async fn example() {
142 /// let stream = stream::iter([
143 /// Ok(1),
144 /// Err(Report::new(UnknownError)),
145 /// Ok(2),
146 /// Err(Report::new(UnknownError)),
147 /// ]);
148 ///
149 /// let result: Result<Vec<i32>, _> = stream.try_collect_reports().await;
150 /// let report = result.expect_err("should have failed twice");
151 ///
152 /// assert_eq!(report.current_contexts().count(), 2);
153 /// # }
154 /// #
155 /// # futures::executor::block_on(example());
156 /// ```
157 fn try_collect_reports<A>(self) -> TryCollectReports<Self, A, C>
158 where
159 A: Default + Extend<Self::Ok>,
160 Self: Sized,
161 {
162 TryCollectReports::new(self, None)
163 }
164
165 /// Collects successful items from the stream into a collection, accumulating errors up to a
166 /// specified bound.
167 ///
168 /// This method will continue processing the stream after encountering errors, but will stop
169 /// once the number of accumulated errors reaches the specified `bound`.
170 ///
171 /// ```
172 /// # use error_stack::{Report, TryReportStreamExt};
173 /// # use futures_util::stream;
174 ///
175 /// #[derive(Debug, Clone, PartialEq, Eq)]
176 /// pub struct UnknownError;
177 ///
178 /// impl core::fmt::Display for UnknownError {
179 /// fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
180 /// f.write_str("UnknownError")
181 /// }
182 /// }
183 ///
184 /// impl core::error::Error for UnknownError {}
185 ///
186 /// #
187 /// # async fn example() {
188 /// let stream = stream::iter([
189 /// Ok(1),
190 /// Err(Report::new(UnknownError)),
191 /// Ok(2),
192 /// Err(Report::new(UnknownError)),
193 /// ]);
194 ///
195 /// let result: Result<Vec<i32>, _> = stream.try_collect_reports_bounded(1).await;
196 /// let report = result.expect_err("should have failed twice");
197 ///
198 /// assert_eq!(report.current_contexts().count(), 1);
199 /// # }
200 /// #
201 /// # futures::executor::block_on(example());
202 /// ```
203 fn try_collect_reports_bounded<A>(self, bound: usize) -> TryCollectReports<Self, A, C>
204 where
205 A: Default + Extend<Self::Ok>,
206 Self: Sized,
207 {
208 TryCollectReports::new(self, Some(bound))
209 }
210}
211
212impl<S, C> TryReportStreamExt<C> for S where S: TryStream<Error: Into<Report<[C]>>> {}