futures_to_stream/
lib.rs

1//! Macros to create streams from heterogeneous futures
2
3use futures::future::Future;
4use futures::stream::Stream;
5
6pub mod internal {
7    use futures::future::Either;
8
9    /// Create an `Either<A, A>::Left` from an `A`
10    pub fn left_homogenous<A>(a: A) -> Either<A, A> {
11        Either::Left(a)
12    }
13}
14
15/// Create an iterator of homogeneous [`Future`]s from a set of heterogeneous futures
16/// with the same associated [`Output`](Future::Output) type.
17#[macro_export]
18macro_rules! create_homogeneous_future {
19  () => {
20    {
21      use std::option::*;
22      use futures::future::Either;
23      Either::Right(empty())
24    }
25  };
26  ($x:expr) => {
27    {
28      Some($crate::internal::left_homogenous($x)).into_iter()
29    }
30  };
31  ($x:expr, $($tail:expr),*) => {
32    {
33      use futures::future::Either;
34      use std::iter::Iterator;
35
36      Some(Either::Left($x)).into_iter().chain(
37        $crate::create_homogeneous_future!($($tail),*).map(Either::Right)
38      )
39    }
40  };
41}
42
43/// Create a [`Stream`] from a set of [`Future`]s, where all yielded [`Item`](Stream::Item)s are in the
44/// order of their presented [`Future`]s.
45#[macro_export]
46macro_rules! futures_to_ordered_stream {
47  ($($tail:tt)*) => {
48    {
49      use futures::stream::FuturesOrdered;
50
51      let futs = $crate::create_homogeneous_future!($($tail)*);
52      FuturesOrdered::from_iter(futs)
53    }
54  }
55}
56
57/// Create a [`Stream`] from a set of [`Future`]s, where [`Item`](Stream::Item)s may be yielded in any order.
58#[macro_export]
59macro_rules! futures_to_unordered_stream {
60  ($($tail:tt)*) => {
61    {
62      use futures::stream::FuturesUnordered;
63
64      let futs = $crate::create_homogeneous_future!($($tail)*);
65      FuturesUnordered::from_iter(futs)
66    }
67  }
68}
69
70#[cfg(test)]
71mod tests {
72    use futures::stream::{Stream, StreamExt};
73    use tokio::time::{sleep, Duration};
74
75    async fn test1() -> u8 {
76        sleep(Duration::from_millis(400)).await;
77        1
78    }
79    async fn test2() -> u8 {
80        sleep(Duration::from_millis(300)).await;
81        2
82    }
83    async fn test3() -> u8 {
84        sleep(Duration::from_millis(200)).await;
85        3
86    }
87    async fn test4() -> u8 {
88        sleep(Duration::from_millis(100)).await;
89        4
90    }
91
92    fn futs_ordered() -> impl Stream<Item = u8> {
93        futures_to_ordered_stream!(test1(), test2(), test3(), test4())
94    }
95
96    fn futs_unordered() -> impl Stream<Item = u8> {
97        futures_to_unordered_stream!(test1(), test2(), test3(), test4())
98    }
99
100    #[tokio::test]
101    async fn test_ordered() {
102        let res: Vec<u8> = futs_ordered().collect().await;
103        assert_eq!(vec![1, 2, 3, 4], res);
104    }
105
106    #[tokio::test]
107    async fn test_unordered() {
108        let res: Vec<u8> = futs_unordered().collect().await;
109        assert_eq!(vec![4, 3, 2, 1], res);
110    }
111}