ogre_stream_ext/
try_enumerate_ext.rs

1//! Adds both `.try_enumerate()` and `.try_enumerate_all()` to Stream's, filling in this gap left by the current `futures::TryStreamExt` implementation
2
3use futures::{StreamExt, stream::Map, Stream};
4
5pub trait TryEnumerateExt<Ok, Err>: Stream<Item=Result<Ok, Err>> + Sized {
6
7    /// Like `StreamExt::enumerate`, but yields `Result<(idx, ok), err>`.
8    /// See also [TryEnumerateAllExt::try_enumerate_all()] if you also want the count for `Err` results.
9    #[allow(clippy::type_complexity)]
10    fn try_enumerate(
11        self
12    ) -> Map<
13        futures::stream::Enumerate<Self>,
14        fn((usize, Result<Ok, Err>)) -> Result<(usize, Ok), Err>,
15    > {
16        self.enumerate().map(|(i, res)| res.map(|v| (i, v)))
17    }
18}
19impl<S, Ok, Err> TryEnumerateExt<Ok, Err> for S where S: Stream<Item=Result<Ok, Err>> + Sized {}
20
21pub trait TryEnumerateAllExt<Ok, Err>: Stream<Item=Result<Ok, Err>> + Sized {
22
23    /// Like [TryEnumerateExt::try_enumerate()], but yields `Result<(idx, ok), (idx, err)>`.
24    #[allow(clippy::type_complexity)]
25    fn try_enumerate_all(
26        self
27    ) -> Map<
28        futures::stream::Enumerate<Self>,
29        fn((usize, Result<Ok, Err>)) -> Result<(usize, Ok), (usize, Err)>,
30    > {
31        self.enumerate().map(|(i, res)| res.map(|v| (i, v)).map_err(|err| (i, err)))
32    }
33}
34impl<S, Ok, Err> TryEnumerateAllExt<Ok, Err> for S where S: Stream<Item=Result<Ok, Err>> + Sized {}
35
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use std::future;
41    use futures::stream;
42    use tokio::pin;
43
44    /// Assures the enumeration is done for yielded items,
45    /// regardless if they are Ok or Err results
46    #[tokio::test]
47    async fn enumerate_yielded_regardless_of_errors() {
48        let stream = stream::iter([Ok(0), Err(1), Ok(2)])
49            .try_enumerate()
50            .filter_map(|res| future::ready(res.ok()));
51        pin!(stream);
52        for i in 0..2 {
53            let (enumerate_idx, value) = stream.next().await.expect("Stream ended prematurely");
54            assert_eq!(enumerate_idx, value, "Enumeration index mismatch for item #{i}")
55        }
56
57    }
58}