ogre_stream_ext/
try_count_ext.rs

1//! Adds `.try_count()` to Stream's, not provided by either `futures` nor `futures-util` crates
2
3use std::future::Future;
4use futures::{TryStreamExt, TryStream};
5
6pub trait TryCountExt: TryStream + Sized {
7
8    /// A terminal operation that counts the number of `Ok` items in the Stream,
9    /// erroring if an `Err` `Result` is ever encountered
10    fn try_count(
11        self
12    ) -> impl Future<Output=Result<usize, Self::Error>> {
13        self.try_fold(0usize, |acc, _| async move {
14            Ok(acc + 1)
15        })
16    }
17}
18impl<S> TryCountExt for S where S: TryStream + Sized {}
19
20#[cfg(test)]
21mod tests {
22    use super::*;
23    use futures::stream;
24
25    #[tokio::test]
26    async fn try_count_ok() {
27        let items = [Ok::<_, ()>(0), Ok(1), Ok(2)];
28        let observed_count = stream::iter(items)
29            .try_count()
30            .await
31            .expect("`try_count()` should not fail for this test");
32        assert_eq!(observed_count, items.len(), "Wrong count")
33
34    }
35
36    #[tokio::test]
37    async fn try_count_err() {
38        let items = [Ok(0), Err(1), Ok(2)];
39        let observed_result = stream::iter(items)
40            .try_count()
41            .await;
42        assert!(observed_result.is_err(), "`try_count()` should have resulted in `Err` for this test");
43
44    }
45}