parallel_future/
lib.rs

1//! Structured parallel execution for async Rust.
2//!
3//! > Concurrency is a system-structuring mechanism, parallelism is a resource.
4//!
5//! This is a replacement for the common `Task` idiom. Rather than providing a
6//! separate family of APIs for concurrency and parallelism, this library
7//! provides a `ParallelFuture` type. When this type is scheduled concurrently
8//! it will provide parallel execution.
9//!
10//! # Examples
11//!
12//! ```
13//! use parallel_future::prelude::*;
14//! use futures_concurrency::prelude::*;
15//!
16//! async_std::task::block_on(async {
17//!     let a = async { 1 }.par();        // ← returns `ParallelFuture`
18//!     let b = async { 2 }.par();        // ← returns `ParallelFuture`
19//!
20//!     let (a, b) = (a, b).join().await; // ← concurrent `.await`
21//!     assert_eq!(a + b, 3);
22//! })
23//! ```
24//!
25//! # Limitations
26//!
//! Rust does not yet provide a mechanism for async destructors. That means that
//! on an early return of any kind, Rust can't guarantee that certain
//! asynchronous operations run before others. This is a language-level
//! limitation for which no workaround currently exists. `ParallelFuture` is
//! designed to work with async destructors once they land.
32//!
33//! `ParallelFuture` starts lazily and does not provide a manual `detach`
34//! method. However it can be manually polled once and then passed to
35//! `mem::forget`, which will keep the future running on another thread. In the
36//! absence of unforgettable types (linear types), Rust cannot prevent
37//! `ParallelFuture`s from becoming unmanaged (dangling).
38
39#![deny(missing_debug_implementations, nonstandard_style)]
40#![warn(missing_docs, unreachable_pub)]
41
42use pin_project::{pin_project, pinned_drop};
43use std::future::{Future, IntoFuture};
44use std::pin::Pin;
45use std::task::{Context, Poll};
46
47use async_std::task;
48
49/// The `parallel-future` prelude.
50pub mod prelude {
51    pub use super::IntoFutureExt as _;
52}
53
54/// A parallelizable future.
55///
56/// This type is constructed by the [`par`][crate::IntoFutureExt::par] method on [`IntoFutureExt`][crate::IntoFutureExt].
57///
58/// # Examples
59///
60/// ```
61/// use parallel_future::prelude::*;
62/// use futures_concurrency::prelude::*;
63///
64/// async_std::task::block_on(async {
65///     let a = async { 1 }.par();        // ← returns `ParallelFuture`
66///     let b = async { 2 }.par();        // ← returns `ParallelFuture`
67///
68///     let (a, b) = (a, b).join().await; // ← concurrent `.await`
69///     assert_eq!(a + b, 3);
70/// })
71/// ```
72#[derive(Debug)]
73#[pin_project(PinnedDrop)]
74#[must_use = "futures do nothing unless you `.await` or poll them"]
75pub struct ParallelFuture<Fut: IntoFuture> {
76    into_future: Option<Fut>,
77    #[pin]
78    handle: Option<task::JoinHandle<Fut::Output>>,
79}
80
81impl<Fut> Future for ParallelFuture<Fut>
82where
83    Fut: IntoFuture,
84    Fut::IntoFuture: Send + 'static,
85    Fut::Output: Send + 'static,
86{
87    type Output = <Fut as IntoFuture>::Output;
88    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let mut this = self.project();
90        if this.handle.is_none() {
91            let into_fut = this.into_future.take().unwrap().into_future();
92            let handle = task::spawn(into_fut.into_future());
93            *this.handle = Some(handle);
94        }
95        Pin::new(&mut this.handle.as_pin_mut().unwrap()).poll(cx)
96    }
97}
98
99/// Cancel the `ParallelFuture` when dropped.
100#[pinned_drop]
101impl<Fut: IntoFuture> PinnedDrop for ParallelFuture<Fut> {
102    fn drop(self: Pin<&mut Self>) {
103        let mut this = self.project();
104        if let Some(handle) = this.handle.take() {
105            let _ = handle.cancel();
106        }
107    }
108}
109
110/// Extend the `Future` trait.
111pub trait IntoFutureExt: IntoFuture + Sized
112where
113    <Self as IntoFuture>::IntoFuture: Send + 'static,
114    <Self as IntoFuture>::Output: Send + 'static,
115{
116    /// Convert this future into a parallelizable future.
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// use parallel_future::prelude::*;
122    /// use futures_concurrency::prelude::*;
123    ///
124    /// async_std::task::block_on(async {
125    ///     let a = async { 1 }.par();        // ← returns `ParallelFuture`
126    ///     let b = async { 2 }.par();        // ← returns `ParallelFuture`
127    ///
128    ///     let (a, b) = (a, b).join().await; // ← concurrent `.await`
129    ///     assert_eq!(a + b, 3);
130    /// })
131    /// ```
132    fn par(self) -> ParallelFuture<Self> {
133        ParallelFuture {
134            into_future: Some(self),
135            handle: None,
136        }
137    }
138}
139
140impl<Fut> IntoFutureExt for Fut
141where
142    Fut: IntoFuture,
143    <Fut as IntoFuture>::IntoFuture: Send + 'static,
144    <Fut as IntoFuture>::Output: Send + 'static,
145{
146}
147
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use async_std::task;

    use super::prelude::*;

    /// Awaiting a `.par()` future yields the wrapped future's output.
    #[test]
    fn spawn() {
        task::block_on(async {
            let res = async { "nori is a horse" }.par().await;
            assert_eq!(res, "nori is a horse");
        })
    }

    /// Creating a `ParallelFuture` without polling it must not run its body.
    #[test]
    fn is_lazy() {
        task::block_on(async {
            // An atomic flag is enough here and avoids lock-poisoning unwraps.
            let polled = Arc::new(AtomicBool::new(false));
            let polled_2 = Arc::clone(&polled);
            let _res = async move {
                polled_2.store(true, Ordering::SeqCst);
            }
            .par();

            // Give an (incorrectly) eagerly spawned task ample time to run
            // before checking the flag.
            task::sleep(Duration::from_millis(500)).await;
            assert!(!polled.load(Ordering::SeqCst));
        })
    }
}