1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#![doc(html_root_url = "https://docs.rs/linked-futures/0.1.3")]
#![warn(missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
#![deny(intra_doc_link_resolution_failure)]

//! This crate provides the way to "link" futures into a single block,
//! which stops executing once any of these futures complete.
//!
//! Under the hood, it uses [`FuturesUnordered`](https://docs.rs/futures/0.3.1/futures/stream/struct.FuturesUnordered.html)
//! to execute multiple futures efficiently. In order to avoid boxing, custom `one-of` type from
//! [`one-of-futures`](https://crates.io/crates/one-of-futures) crate is generated for
//! each [`link_futures`](macro.link_futures.html) block.

pub use futures::stream::{FuturesUnordered, StreamExt};
pub use one_of_futures::impl_one_of;

/// Create necessary enums for later usage with [`link_futures`](macro.link_futures.html)
#[macro_export]
macro_rules! linked_block {
    ( $one_of_block:ident, $identifier_enum:ident; $($variants:ident),* ) => {
        $crate::impl_one_of!($one_of_block; $($variants),*);

        #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
        enum $identifier_enum {
            $($variants),*
        }
    }
}

/// Link multiple futures into a single block
///
/// Example:
/// ```rust
/// use std::time::Duration;
///
/// use futures::{pin_mut, SinkExt, StreamExt};
/// use futures::channel::mpsc;
/// use futures::executor::block_on;
/// use tokio::time::{delay_for, interval, Instant};
///
/// use linked_futures::{link_futures, linked_block};
///
/// linked_block!(PeriodicStoppableSender, PeriodicStoppableSenderFutureIdentifier;
///     Forwarder,
///     Reader,
///     Generator,
///     Stop
/// );
///
/// #[tokio::main]
/// async fn main() {
///     let (mut tx1, mut rx1) = mpsc::channel::<Instant>(1);
///     let (mut tx2, mut rx2) = mpsc::channel::<Instant>(1);
///
///     let mut interval = interval(Duration::from_millis(100));
///
///     let generator = async {
///         while let Some(instant) = interval.next().await {
///             tx1.send(instant).await;
///         }
///     };
///     let forwarder = async {
///         while let Some(instant) = rx1.next().await {
///             tx2.send(instant).await;
///         }
///     };
///     let reader = async {
///         while let Some(instant) = rx2.next().await {
///             println!("instant: {:?}", instant);
///         }
///     };
///     let stop = async {
///         delay_for(Duration::from_secs(1)).await;
///     };
///     let linked = link_futures!(
///        PeriodicStoppableSender,
///        PeriodicStoppableSenderFutureIdentifier;
///        Generator => generator,
///        Forwarder => forwarder,
///        Reader => reader,
///        Stop => stop
///     );
///     block_on(async {
///         pin_mut!(linked);
///         let (completed_future_identifier, _) = linked.await;
///         match completed_future_identifier {
///             PeriodicStoppableSenderFutureIdentifier::Stop =>
///                 println!("linked block stopped normally"),
///             n =>
///                 panic!("linked block unexpectedly terminated by future: {:?}", n),
///         }
///     });
/// }
/// ```
#[macro_export]
macro_rules! link_futures {
    ( $one_of_block:ident, $identifier_enum:ident; $( $key:ident => $value:expr ),* ) => {{
        let mut linked = $crate::FuturesUnordered::new();
        $( linked.push($one_of_block::$key(async {
            ($identifier_enum::$key, $value.await)
        })); )*
        async move {
            use $crate::StreamExt;

            linked.next().await.unwrap()
        }
    }};
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_readme_deps() {
        version_sync::assert_markdown_deps_updated!("README.md");
    }

    #[test]
    fn test_html_root_url() {
        version_sync::assert_html_root_url_updated!("src/lib.rs");
    }
}