linked_futures/lib.rs
1#![doc(html_root_url = "https://docs.rs/linked-futures/0.1.3")]
2#![warn(missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
3#![deny(intra_doc_link_resolution_failure)]
4
5//! This crate provides the way to "link" futures into a single block,
6//! which stops executing once any of these futures complete.
7//!
8//! Under the hood, it uses [`FuturesUnordered`](https://docs.rs/futures/0.3.1/futures/stream/struct.FuturesUnordered.html)
9//! to execute multiple futures efficiently. In order to avoid boxing, custom `one-of` type from
10//! [`one-of-futures`](https://crates.io/crates/one-of-futures) crate is generated for
11//! each [`link_futures`](macro.link_futures.html) block.
12
13pub use futures::stream::{FuturesUnordered, StreamExt};
14pub use one_of_futures::impl_one_of;
15
16/// Create necessary enums for later usage with [`link_futures`](macro.link_futures.html)
17#[macro_export]
18macro_rules! linked_block {
19 ( $one_of_block:ident, $identifier_enum:ident; $($variants:ident),* ) => {
20 $crate::impl_one_of!($one_of_block; $($variants),*);
21
22 #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
23 enum $identifier_enum {
24 $($variants),*
25 }
26 }
27}
28
29/// Link multiple futures into a single block
30///
31/// Example:
32/// ```rust
33/// use std::time::Duration;
34///
35/// use futures::{pin_mut, SinkExt, StreamExt};
36/// use futures::channel::mpsc;
37/// use futures::executor::block_on;
38/// use tokio::time::{delay_for, interval, Instant};
39///
40/// use linked_futures::{link_futures, linked_block};
41///
42/// linked_block!(PeriodicStoppableSender, PeriodicStoppableSenderFutureIdentifier;
43/// Forwarder,
44/// Reader,
45/// Generator,
46/// Stop
47/// );
48///
49/// #[tokio::main]
50/// async fn main() {
51/// let (mut tx1, mut rx1) = mpsc::channel::<Instant>(1);
52/// let (mut tx2, mut rx2) = mpsc::channel::<Instant>(1);
53///
54/// let mut interval = interval(Duration::from_millis(100));
55///
56/// let generator = async {
57/// while let Some(instant) = interval.next().await {
58/// tx1.send(instant).await;
59/// }
60/// };
61/// let forwarder = async {
62/// while let Some(instant) = rx1.next().await {
63/// tx2.send(instant).await;
64/// }
65/// };
66/// let reader = async {
67/// while let Some(instant) = rx2.next().await {
68/// println!("instant: {:?}", instant);
69/// }
70/// };
71/// let stop = async {
72/// delay_for(Duration::from_secs(1)).await;
73/// };
74/// let linked = link_futures!(
75/// PeriodicStoppableSender,
76/// PeriodicStoppableSenderFutureIdentifier;
77/// Generator => generator,
78/// Forwarder => forwarder,
79/// Reader => reader,
80/// Stop => stop
81/// );
82/// block_on(async {
83/// pin_mut!(linked);
84/// let (completed_future_identifier, _) = linked.await;
85/// match completed_future_identifier {
86/// PeriodicStoppableSenderFutureIdentifier::Stop =>
87/// println!("linked block stopped normally"),
88/// n =>
89/// panic!("linked block unexpectedly terminated by future: {:?}", n),
90/// }
91/// });
92/// }
93/// ```
94#[macro_export]
95macro_rules! link_futures {
96 ( $one_of_block:ident, $identifier_enum:ident; $( $key:ident => $value:expr ),* ) => {{
97 let mut linked = $crate::FuturesUnordered::new();
98 $( linked.push($one_of_block::$key(async {
99 ($identifier_enum::$key, $value.await)
100 })); )*
101 async move {
102 use $crate::StreamExt;
103
104 linked.next().await.unwrap()
105 }
106 }};
107}
108
109#[cfg(test)]
110mod tests {
111 #[test]
112 fn test_readme_deps() {
113 version_sync::assert_markdown_deps_updated!("README.md");
114 }
115
116 #[test]
117 fn test_html_root_url() {
118 version_sync::assert_html_root_url_updated!("src/lib.rs");
119 }
120}