error_stack/ext/
stream.rs

1use core::{
2    future::Future,
3    mem,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use futures_core::{FusedFuture, FusedStream, TryStream};
9use pin_project_lite::pin_project;
10
11use crate::Report;
12
13pin_project! {
14    /// Future for the [`try_collect_reports`](TryReportStreamExt::try_collect_reports)
15    /// and [`try_collect_reports_bounded`](TryReportStreamExt::try_collect_reports_bounded) methods.
16    #[derive(Debug)]
17    #[must_use = "futures do nothing unless you `.await` or poll them"]
18    pub struct TryCollectReports<S, A, C> {
19        #[pin]
20        stream: S,
21        output: Result<A, Report<[C]>>,
22
23        context_len: usize,
24        context_bound: usize
25    }
26}
27
28impl<S, A, C> TryCollectReports<S, A, C>
29where
30    S: TryStream,
31    A: Default + Extend<S::Ok>,
32{
33    fn new(stream: S, bound: Option<usize>) -> Self {
34        Self {
35            stream,
36            output: Ok(Default::default()),
37            context_len: 0,
38            context_bound: bound.unwrap_or(usize::MAX),
39        }
40    }
41}
42
43impl<S, A, C> FusedFuture for TryCollectReports<S, A, C>
44where
45    S: TryStream<Error: Into<Report<[C]>>> + FusedStream,
46    A: Default + Extend<S::Ok>,
47{
48    fn is_terminated(&self) -> bool {
49        self.stream.is_terminated()
50    }
51}
52
53impl<S, A, C> Future for TryCollectReports<S, A, C>
54where
55    S: TryStream<Error: Into<Report<[C]>>>,
56    A: Default + Extend<S::Ok>,
57{
58    type Output = Result<A, Report<[C]>>;
59
60    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
61        let mut this = self.project();
62
63        let value = loop {
64            if *this.context_len >= *this.context_bound {
65                break mem::replace(this.output, Ok(A::default()));
66            }
67
68            let next = ready!(this.stream.as_mut().try_poll_next(cx));
69            match (next, &mut *this.output) {
70                (Some(Ok(value)), Ok(output)) => {
71                    output.extend(core::iter::once(value));
72                }
73                (Some(Ok(_)), Err(_)) => {
74                    // we're now just consuming the iterator to return all related errors
75                    // so we can just ignore the output
76                }
77                (Some(Err(error)), output @ Ok(_)) => {
78                    *output = Err(error.into());
79                    *this.context_len += 1;
80                }
81                (Some(Err(error)), Err(report)) => {
82                    report.append(error.into());
83                    *this.context_len += 1;
84                }
85                (None, output) => {
86                    break mem::replace(output, Ok(A::default()));
87                }
88            }
89        };
90
91        Poll::Ready(value)
92    }
93}
94
95/// Trait extending [`TryStream`] with methods for collecting error-stack results in a fail-slow
96/// manner.
97///
98/// This trait provides additional functionality to [`TryStream`]s, allowing for the collection of
99/// successful items while accumulating errors. It's particularly useful when you want to continue
100/// processing a stream even after encountering errors, gathering all successful results and errors
101/// until the stream is exhausted or a specified error limit is reached.
102///
103/// The fail-slow approach means that the stream processing continues after encountering errors,
104/// unlike traditional error handling that might stop at the first error.
105///
106/// # Performance Considerations
107///
108/// These methods may have performance implications as they potentially iterate
109/// through the entire stream, even after encountering errors.
110///
111/// # Note
112///
113/// This trait is only available behind the `unstable` flag and is not covered by semver guarantees.
114/// It may be subject to breaking changes in future releases.
115///
116/// [`TryStream`]: futures_core::stream::TryStream
117pub trait TryReportStreamExt<C>: TryStream<Error: Into<Report<[C]>>> {
118    /// Collects all successful items from the stream into a collection, accumulating all errors.
119    ///
120    /// This method will continue processing the stream even after encountering errors, collecting
121    /// all successful items and all errors until the stream is exhausted.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// # use error_stack::{Report, TryReportStreamExt};
127    /// # use futures_util::stream;
128    ///
129    /// #[derive(Debug, Clone, PartialEq, Eq)]
130    /// pub struct UnknownError;
131    ///
132    /// impl core::fmt::Display for UnknownError {
133    ///     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
134    ///         f.write_str("UnknownError")
135    ///     }
136    /// }
137    ///
138    /// impl core::error::Error for UnknownError {}
139    ///
140    /// #
141    /// # async fn example() {
142    /// let stream = stream::iter([
143    ///     Ok(1),
144    ///     Err(Report::new(UnknownError)),
145    ///     Ok(2),
146    ///     Err(Report::new(UnknownError)),
147    /// ]);
148    ///
149    /// let result: Result<Vec<i32>, _> = stream.try_collect_reports().await;
150    /// let report = result.expect_err("should have failed twice");
151    ///
152    /// assert_eq!(report.current_contexts().count(), 2);
153    /// # }
154    /// #
155    /// # futures::executor::block_on(example());
156    /// ```
157    fn try_collect_reports<A>(self) -> TryCollectReports<Self, A, C>
158    where
159        A: Default + Extend<Self::Ok>,
160        Self: Sized,
161    {
162        TryCollectReports::new(self, None)
163    }
164
165    /// Collects successful items from the stream into a collection, accumulating errors up to a
166    /// specified bound.
167    ///
168    /// This method will continue processing the stream after encountering errors, but will stop
169    /// once the number of accumulated errors reaches the specified `bound`.
170    ///
171    /// ```
172    /// # use error_stack::{Report, TryReportStreamExt};
173    /// # use futures_util::stream;
174    ///
175    /// #[derive(Debug, Clone, PartialEq, Eq)]
176    /// pub struct UnknownError;
177    ///
178    /// impl core::fmt::Display for UnknownError {
179    ///     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
180    ///         f.write_str("UnknownError")
181    ///     }
182    /// }
183    ///
184    /// impl core::error::Error for UnknownError {}
185    ///
186    /// #
187    /// # async fn example() {
188    /// let stream = stream::iter([
189    ///     Ok(1),
190    ///     Err(Report::new(UnknownError)),
191    ///     Ok(2),
192    ///     Err(Report::new(UnknownError)),
193    /// ]);
194    ///
195    /// let result: Result<Vec<i32>, _> = stream.try_collect_reports_bounded(1).await;
196    /// let report = result.expect_err("should have failed twice");
197    ///
198    /// assert_eq!(report.current_contexts().count(), 1);
199    /// # }
200    /// #
201    /// # futures::executor::block_on(example());
202    /// ```
203    fn try_collect_reports_bounded<A>(self, bound: usize) -> TryCollectReports<Self, A, C>
204    where
205        A: Default + Extend<Self::Ok>,
206        Self: Sized,
207    {
208        TryCollectReports::new(self, Some(bound))
209    }
210}
211
212impl<S, C> TryReportStreamExt<C> for S where S: TryStream<Error: Into<Report<[C]>>> {}