ogre_stream_ext/
try_enumerate_ext.rs1use futures::{StreamExt, stream::Map, Stream};
4
5pub trait TryEnumerateExt<Ok, Err>: Stream<Item=Result<Ok, Err>> + Sized {
6
7 #[allow(clippy::type_complexity)]
10 fn try_enumerate(
11 self
12 ) -> Map<
13 futures::stream::Enumerate<Self>,
14 fn((usize, Result<Ok, Err>)) -> Result<(usize, Ok), Err>,
15 > {
16 self.enumerate().map(|(i, res)| res.map(|v| (i, v)))
17 }
18}
19impl<S, Ok, Err> TryEnumerateExt<Ok, Err> for S where S: Stream<Item=Result<Ok, Err>> + Sized {}
20
21pub trait TryEnumerateAllExt<Ok, Err>: Stream<Item=Result<Ok, Err>> + Sized {
22
23 #[allow(clippy::type_complexity)]
25 fn try_enumerate_all(
26 self
27 ) -> Map<
28 futures::stream::Enumerate<Self>,
29 fn((usize, Result<Ok, Err>)) -> Result<(usize, Ok), (usize, Err)>,
30 > {
31 self.enumerate().map(|(i, res)| res.map(|v| (i, v)).map_err(|err| (i, err)))
32 }
33}
34impl<S, Ok, Err> TryEnumerateAllExt<Ok, Err> for S where S: Stream<Item=Result<Ok, Err>> + Sized {}
35
36
37#[cfg(test)]
38mod tests {
39 use super::*;
40 use std::future;
41 use futures::stream;
42 use tokio::pin;
43
44 #[tokio::test]
47 async fn enumerate_yielded_regardless_of_errors() {
48 let stream = stream::iter([Ok(0), Err(1), Ok(2)])
49 .try_enumerate()
50 .filter_map(|res| future::ready(res.ok()));
51 pin!(stream);
52 for i in 0..2 {
53 let (enumerate_idx, value) = stream.next().await.expect("Stream ended prematurely");
54 assert_eq!(enumerate_idx, value, "Enumeration index mismatch for item #{i}")
55 }
56
57 }
58}