1use futures::future::Future;
4use futures::stream::Stream;
5
6pub mod internal {
7 use futures::future::Either;
8
9 pub fn left_homogenous<A>(a: A) -> Either<A, A> {
11 Either::Left(a)
12 }
13}
14
/// Builds an iterator with a single item type out of expressions that may each
/// have a different type.
///
/// Every argument is wrapped in nested `futures::future::Either` layers:
///
/// * the first argument becomes `Either::Left(..)`;
/// * every item produced for the remaining arguments is wrapped in
///   `Either::Right(..)`;
/// * the last argument is wrapped by `internal::left_homogenous`, giving
///   `Either<T, T>`.
///
/// For three arguments of types `A`, `B` and `C` the item type is therefore
/// `Either<A, Either<B, Either<C, C>>>`. Items are yielded in argument order
/// and the argument expressions are evaluated in that same order.
///
/// A trailing comma is accepted. With no arguments the macro expands to
/// `std::iter::empty()`, so the item type has to be inferable from the
/// surrounding code (for example from a type annotation on the binding).
///
/// The expansion names `futures::future::Either` by path, so `futures` must be
/// resolvable from the code that invokes the macro.
#[macro_export]
macro_rules! create_homogeneous_future {
    // No arguments: an iterator that yields nothing. The item type is left to
    // inference at the call site.
    () => {
        ::std::iter::empty()
    };
    // Exactly one argument: both sides of the `Either` get the same type, so
    // nothing is left for the compiler to guess.
    ($x:expr $(,)?) => {
        ::std::iter::once($crate::internal::left_homogenous($x))
    };
    // Two or more arguments: the head goes on the `Left`, everything the
    // recursive expansion yields is pushed one level down on the `Right`.
    // Requiring a non-empty tail keeps a trailing comma from recursing into
    // the zero-argument arm.
    ($x:expr, $($tail:expr),+ $(,)?) => {
        ::std::iter::Iterator::chain(
            ::std::iter::once(futures::future::Either::Left($x)),
            ::std::iter::Iterator::map(
                $crate::create_homogeneous_future!($($tail),+),
                futures::future::Either::Right,
            ),
        )
    };
}
42
/// Collects the given futures into a `futures::stream::FuturesOrdered`.
///
/// The arguments are forwarded unchanged to `create_homogeneous_future!`,
/// which gives them a common item type; the resulting iterator is then
/// collected into the stream. The stream yields the futures' outputs in
/// argument order, regardless of which future finishes first.
///
/// The expansion names `futures::stream::FuturesOrdered` by path, so `futures`
/// must be resolvable from the code that invokes the macro.
#[macro_export]
macro_rules! futures_to_ordered_stream {
    ($($tail:tt)*) => {
        {
            let futs = $crate::create_homogeneous_future!($($tail)*);
            // Fully qualified on purpose: a plain `FuturesOrdered::from_iter`
            // only resolves when `FromIterator` is in scope at the call site,
            // which the prelude guarantees only from edition 2021 onwards.
            <futures::stream::FuturesOrdered<_> as ::std::iter::FromIterator<_>>::from_iter(futs)
        }
    };
}
56
/// Collects the given futures into a `futures::stream::FuturesUnordered`.
///
/// The arguments are forwarded unchanged to `create_homogeneous_future!`,
/// which gives them a common item type; the resulting iterator is then
/// collected into the stream. The stream yields each output as soon as its
/// future completes, so the order follows completion, not argument position.
///
/// The expansion names `futures::stream::FuturesUnordered` by path, so
/// `futures` must be resolvable from the code that invokes the macro.
#[macro_export]
macro_rules! futures_to_unordered_stream {
    ($($tail:tt)*) => {
        {
            let futs = $crate::create_homogeneous_future!($($tail)*);
            // Fully qualified on purpose: a plain `FuturesUnordered::from_iter`
            // only resolves when `FromIterator` is in scope at the call site,
            // which the prelude guarantees only from edition 2021 onwards.
            <futures::stream::FuturesUnordered<_> as ::std::iter::FromIterator<_>>::from_iter(futs)
        }
    };
}
69
#[cfg(test)]
mod tests {
    use futures::stream::{Stream, StreamExt};
    use tokio::time::{sleep, Duration};

    /// Sleeps for `millis` milliseconds and then resolves to `value`.
    async fn resolve_after(millis: u64, value: u8) -> u8 {
        sleep(Duration::from_millis(millis)).await;
        value
    }

    // Four separate `async fn`s on purpose: each one has its own future type,
    // which is exactly the situation the macros have to cope with. The delays
    // shrink as the values grow, so completion order is the reverse of
    // argument order.

    /// Resolves to 1 after 400 ms.
    async fn test1() -> u8 {
        resolve_after(400, 1).await
    }

    /// Resolves to 2 after 300 ms.
    async fn test2() -> u8 {
        resolve_after(300, 2).await
    }

    /// Resolves to 3 after 200 ms.
    async fn test3() -> u8 {
        resolve_after(200, 3).await
    }

    /// Resolves to 4 after 100 ms.
    async fn test4() -> u8 {
        resolve_after(100, 4).await
    }

    /// Stream over the four futures built with the ordered macro.
    fn futs_ordered() -> impl Stream<Item = u8> {
        futures_to_ordered_stream!(test1(), test2(), test3(), test4())
    }

    /// Stream over the four futures built with the unordered macro.
    fn futs_unordered() -> impl Stream<Item = u8> {
        futures_to_unordered_stream!(test1(), test2(), test3(), test4())
    }

    /// The ordered stream reports outputs in argument order even though the
    /// last argument finishes first.
    #[tokio::test]
    async fn test_ordered() {
        let observed = futs_ordered().collect::<Vec<u8>>().await;
        assert_eq!(vec![1, 2, 3, 4], observed);
    }

    /// The unordered stream reports outputs in completion order, i.e. shortest
    /// delay first.
    #[tokio::test]
    async fn test_unordered() {
        let observed = futs_unordered().collect::<Vec<u8>>().await;
        assert_eq!(vec![4, 3, 2, 1], observed);
    }
}