linked_futures/
lib.rs

1#![doc(html_root_url = "https://docs.rs/linked-futures/0.1.3")]
2#![warn(missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
3#![deny(intra_doc_link_resolution_failure)]
4
//! This crate provides a way to "link" futures into a single block,
//! which stops executing once any of these futures completes.
7//!
//! Under the hood, it uses [`FuturesUnordered`](https://docs.rs/futures/0.3.1/futures/stream/struct.FuturesUnordered.html)
//! to execute multiple futures efficiently. In order to avoid boxing, a custom `one-of` type from the
//! [`one-of-futures`](https://crates.io/crates/one-of-futures) crate is generated for
//! each [`link_futures`](macro.link_futures.html) block.
12
13pub use futures::stream::{FuturesUnordered, StreamExt};
14pub use one_of_futures::impl_one_of;
15
/// Create the enums needed for later usage with [`link_futures`](macro.link_futures.html).
///
/// The invocation has the form `linked_block!(OneOfName, IdentifierName; A, B, C)`:
///
/// * `OneOfName` and the variant list are forwarded unchanged to `impl_one_of!`
///   (re-exported from the `one-of-futures` crate), which generates the `one-of` type.
/// * `IdentifierName` becomes a field-less enum with exactly the listed variants.
///   [`link_futures`](macro.link_futures.html) returns one of its values to tell the
///   caller which future completed.
///
/// The identifier enum derives `Copy`, `Clone`, `PartialEq`, `Eq`, `PartialOrd`, `Ord`,
/// `Hash` and `Debug`. It is declared without a visibility modifier, so it is private to
/// the module in which the macro is invoked.
///
/// A trailing comma after the last variant is accepted.
#[macro_export]
macro_rules! linked_block {
    ( $one_of_block:ident, $identifier_enum:ident; $($variants:ident),* $(,)? ) => {
        // The trailing comma (if any) is swallowed here and never forwarded, so
        // `impl_one_of!` always sees a plain comma-separated list.
        $crate::impl_one_of!($one_of_block; $($variants),*);

        #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
        enum $identifier_enum {
            $($variants),*
        }
    };
}
28
/// Link multiple futures into a single block.
///
/// The invocation has the form
/// `link_futures!(OneOfName, IdentifierName; VariantA => future_a, VariantB => future_b)`,
/// where `OneOfName` and `IdentifierName` are the enums previously created with
/// [`linked_block`](macro.linked_block.html). At least one `Variant => future` pair is
/// required; a trailing comma after the last pair is accepted.
///
/// Every future is wrapped in the matching `OneOfName` variant and pushed into a
/// `FuturesUnordered`. The macro evaluates to a future that resolves to the first item
/// yielded by that collection: a tuple `(IdentifierName::Variant, output)` identifying
/// the future that completed and carrying its output.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
///
/// use futures::{pin_mut, SinkExt, StreamExt};
/// use futures::channel::mpsc;
/// use futures::executor::block_on;
/// use tokio::time::{delay_for, interval, Instant};
///
/// use linked_futures::{link_futures, linked_block};
///
/// linked_block!(PeriodicStoppableSender, PeriodicStoppableSenderFutureIdentifier;
///     Forwarder,
///     Reader,
///     Generator,
///     Stop
/// );
///
/// #[tokio::main]
/// async fn main() {
///     let (mut tx1, mut rx1) = mpsc::channel::<Instant>(1);
///     let (mut tx2, mut rx2) = mpsc::channel::<Instant>(1);
///
///     let mut interval = interval(Duration::from_millis(100));
///
///     let generator = async {
///         while let Some(instant) = interval.next().await {
///             tx1.send(instant).await;
///         }
///     };
///     let forwarder = async {
///         while let Some(instant) = rx1.next().await {
///             tx2.send(instant).await;
///         }
///     };
///     let reader = async {
///         while let Some(instant) = rx2.next().await {
///             println!("instant: {:?}", instant);
///         }
///     };
///     let stop = async {
///         delay_for(Duration::from_secs(1)).await;
///     };
///     let linked = link_futures!(
///        PeriodicStoppableSender,
///        PeriodicStoppableSenderFutureIdentifier;
///        Generator => generator,
///        Forwarder => forwarder,
///        Reader => reader,
///        Stop => stop
///     );
///     block_on(async {
///         pin_mut!(linked);
///         let (completed_future_identifier, _) = linked.await;
///         match completed_future_identifier {
///             PeriodicStoppableSenderFutureIdentifier::Stop =>
///                 println!("linked block stopped normally"),
///             n =>
///                 panic!("linked block unexpectedly terminated by future: {:?}", n),
///         }
///     });
/// }
/// ```
#[macro_export]
macro_rules! link_futures {
    ( $one_of_block:ident, $identifier_enum:ident; $( $key:ident => $value:expr ),+ $(,)? ) => {{
        let mut linked = $crate::FuturesUnordered::new();
        $(
            // Tag each future's output with its identifier so the caller can tell
            // which one finished.
            linked.push($one_of_block::$key(async {
                ($identifier_enum::$key, $value.await)
            }));
        )+
        async move {
            // Fully-qualified call: plain `linked.next()` is resolved against every trait
            // in scope at the invocation site and may be ambiguous when the caller has
            // imported another trait that also provides a `next` method.
            $crate::StreamExt::next(&mut linked)
                .await
                .expect("link_futures! always links at least one future")
        }
    }};
}
108
#[cfg(test)]
mod tests {
    /// Runs `version_sync`'s HTML-root-URL check against `src/lib.rs`.
    #[test]
    fn test_html_root_url() {
        version_sync::assert_html_root_url_updated!("src/lib.rs");
    }

    /// Runs `version_sync`'s Markdown dependency check against `README.md`.
    #[test]
    fn test_readme_deps() {
        version_sync::assert_markdown_deps_updated!("README.md");
    }
}
120}