1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//! Structured parallel execution for async Rust.
//!
//! > Concurrency is a system-structuring mechanism, parallelism is a resource.
//!
//! This is a replacement for the common `Task` idiom. Rather than providing a
//! separate family of APIs for concurrency and parallelism, this library
//! provides a `ParallelFuture` type. When this type is scheduled concurrently
//! it will provide parallel execution.
//!
//! # Examples
//!
//! ```
//! use parallel_future::prelude::*;
//! use futures_concurrency::prelude::*;
//!
//! async_std::task::block_on(async {
//!     let a = async { 1 }.par();        // ← returns `ParallelFuture`
//!     let b = async { 2 }.par();        // ← returns `ParallelFuture`
//!
//!     let (a, b) = (a, b).join().await; // ← concurrent `.await`
//!     assert_eq!(a + b, 3);
//! })
//! ```
//!
//! # Limitations
//!
//! Rust does not yet provide a mechanism for async destructors. That means that
//! on an early return of any kind, Rust can't guarantee that certain
//! asynchronous operations run before others. This is a language-level
//! limitation with no existing workarounds possible. `ParallelFuture` is designed to
//! work with async destructors once they land.
//!
//! `ParallelFuture` starts lazily and does not provide a manual `detach`
//! method. However it can be manually polled once and then passed to
//! `mem::forget`, which will keep the future running on another thread. In the
//! absence of unforgettable types (linear types), Rust cannot prevent
//! `ParallelFuture`s from becoming unmanaged (dangling).

#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, unreachable_pub)]

use pin_project::{pin_project, pinned_drop};
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::task::{Context, Poll};

use async_std::task;

/// The `parallel-future` prelude.
pub mod prelude {
    pub use super::IntoFutureExt as _;
}

/// A parallelizable future.
///
/// This type is constructed by the [`par`][crate::IntoFutureExt::par] method on [`IntoFutureExt`][crate::IntoFutureExt].
///
/// # Examples
///
/// ```
/// use parallel_future::prelude::*;
/// use futures_concurrency::prelude::*;
///
/// async_std::task::block_on(async {
///     let a = async { 1 }.par();        // ← returns `ParallelFuture`
///     let b = async { 2 }.par();        // ← returns `ParallelFuture`
///
///     let (a, b) = (a, b).join().await; // ← concurrent `.await`
///     assert_eq!(a + b, 3);
/// })
/// ```
#[derive(Debug)]
#[pin_project(PinnedDrop)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ParallelFuture<Fut: Future> {
    #[pin]
    handle: Option<task::JoinHandle<Fut::Output>>,
}

impl<Fut> Future for ParallelFuture<Fut>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    type Output = <Fut as Future>::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        Pin::new(&mut this.handle.as_pin_mut().unwrap()).poll(cx)
    }
}

/// Cancel the `ParallelFuture` when dropped.
#[pinned_drop]
impl<Fut: Future> PinnedDrop for ParallelFuture<Fut> {
    fn drop(self: Pin<&mut Self>) {
        let mut this = self.project();
        let handle = this.handle.take().unwrap();
        let _ = handle.cancel();
    }
}

/// Extend the `Future` trait.
pub trait IntoFutureExt: IntoFuture + Sized
where
    <Self as IntoFuture>::IntoFuture: Send + 'static,
    <Self as IntoFuture>::Output: Send + 'static,
{
    /// Convert this future into a parallelizable future.
    ///
    /// # Examples
    ///
    /// ```
    /// use parallel_future::prelude::*;
    /// use futures_concurrency::prelude::*;
    ///
    /// async_std::task::block_on(async {
    ///     let a = async { 1 }.par();        // ← returns `ParallelFuture`
    ///     let b = async { 2 }.par();        // ← returns `ParallelFuture`
    ///
    ///     let (a, b) = (a, b).join().await; // ← concurrent `.await`
    ///     assert_eq!(a + b, 3);
    /// })
    /// ```
    fn par(self) -> ParallelFuture<<Self as IntoFuture>::IntoFuture> {
        ParallelFuture {
            handle: Some(task::spawn(self.into_future())),
        }
    }
}

impl<Fut> IntoFutureExt for Fut
where
    Fut: IntoFuture,
    <Fut as IntoFuture>::IntoFuture: Send + 'static,
    <Fut as IntoFuture>::Output: Send + 'static,
{
}

#[cfg(test)]
mod test {
    use super::prelude::*;

    #[test]
    fn spawn() {
        async_std::task::block_on(async {
            let res = async { "nori is a horse" }.par().await;
            assert_eq!(res, "nori is a horse");
        })
    }
}