parallel_future/lib.rs
1//! Structured parallel execution for async Rust.
2//!
3//! > Concurrency is a system-structuring mechanism, parallelism is a resource.
4//!
5//! This is a replacement for the common `Task` idiom. Rather than providing a
6//! separate family of APIs for concurrency and parallelism, this library
7//! provides a `ParallelFuture` type. When this type is scheduled concurrently
8//! it will provide parallel execution.
9//!
10//! # Examples
11//!
12//! ```
13//! use parallel_future::prelude::*;
14//! use futures_concurrency::prelude::*;
15//!
16//! async_std::task::block_on(async {
17//! let a = async { 1 }.par(); // ← returns `ParallelFuture`
18//! let b = async { 2 }.par(); // ← returns `ParallelFuture`
19//!
20//! let (a, b) = (a, b).join().await; // ← concurrent `.await`
21//! assert_eq!(a + b, 3);
22//! })
23//! ```
24//!
25//! # Limitations
26//!
27//! Rust does not yet provide a mechanism for async destructors. That means that
28//! on an early return of any kind, Rust can't guarantee that certain
29//! asynchronous operations run before others. This is a language-level
30//! limitation with no existing workarounds possible. `ParallelFuture` is designed to
31//! work with async destructors once they land.
32//!
33//! `ParallelFuture` starts lazily and does not provide a manual `detach`
34//! method. However it can be manually polled once and then passed to
35//! `mem::forget`, which will keep the future running on another thread. In the
36//! absence of unforgettable types (linear types), Rust cannot prevent
37//! `ParallelFuture`s from becoming unmanaged (dangling).
38
39#![deny(missing_debug_implementations, nonstandard_style)]
40#![warn(missing_docs, unreachable_pub)]
41
42use pin_project::{pin_project, pinned_drop};
43use std::future::{Future, IntoFuture};
44use std::pin::Pin;
45use std::task::{Context, Poll};
46
47use async_std::task;
48
49/// The `parallel-future` prelude.
50pub mod prelude {
51 pub use super::IntoFutureExt as _;
52}
53
54/// A parallelizable future.
55///
56/// This type is constructed by the [`par`][crate::IntoFutureExt::par] method on [`IntoFutureExt`][crate::IntoFutureExt].
57///
58/// # Examples
59///
60/// ```
61/// use parallel_future::prelude::*;
62/// use futures_concurrency::prelude::*;
63///
64/// async_std::task::block_on(async {
65/// let a = async { 1 }.par(); // ← returns `ParallelFuture`
66/// let b = async { 2 }.par(); // ← returns `ParallelFuture`
67///
68/// let (a, b) = (a, b).join().await; // ← concurrent `.await`
69/// assert_eq!(a + b, 3);
70/// })
71/// ```
72#[derive(Debug)]
73#[pin_project(PinnedDrop)]
74#[must_use = "futures do nothing unless you `.await` or poll them"]
75pub struct ParallelFuture<Fut: IntoFuture> {
76 into_future: Option<Fut>,
77 #[pin]
78 handle: Option<task::JoinHandle<Fut::Output>>,
79}
80
81impl<Fut> Future for ParallelFuture<Fut>
82where
83 Fut: IntoFuture,
84 Fut::IntoFuture: Send + 'static,
85 Fut::Output: Send + 'static,
86{
87 type Output = <Fut as IntoFuture>::Output;
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let mut this = self.project();
90 if this.handle.is_none() {
91 let into_fut = this.into_future.take().unwrap().into_future();
92 let handle = task::spawn(into_fut.into_future());
93 *this.handle = Some(handle);
94 }
95 Pin::new(&mut this.handle.as_pin_mut().unwrap()).poll(cx)
96 }
97}
98
99/// Cancel the `ParallelFuture` when dropped.
100#[pinned_drop]
101impl<Fut: IntoFuture> PinnedDrop for ParallelFuture<Fut> {
102 fn drop(self: Pin<&mut Self>) {
103 let mut this = self.project();
104 if let Some(handle) = this.handle.take() {
105 let _ = handle.cancel();
106 }
107 }
108}
109
110/// Extend the `Future` trait.
111pub trait IntoFutureExt: IntoFuture + Sized
112where
113 <Self as IntoFuture>::IntoFuture: Send + 'static,
114 <Self as IntoFuture>::Output: Send + 'static,
115{
116 /// Convert this future into a parallelizable future.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use parallel_future::prelude::*;
122 /// use futures_concurrency::prelude::*;
123 ///
124 /// async_std::task::block_on(async {
125 /// let a = async { 1 }.par(); // ← returns `ParallelFuture`
126 /// let b = async { 2 }.par(); // ← returns `ParallelFuture`
127 ///
128 /// let (a, b) = (a, b).join().await; // ← concurrent `.await`
129 /// assert_eq!(a + b, 3);
130 /// })
131 /// ```
132 fn par(self) -> ParallelFuture<Self> {
133 ParallelFuture {
134 into_future: Some(self),
135 handle: None,
136 }
137 }
138}
139
140impl<Fut> IntoFutureExt for Fut
141where
142 Fut: IntoFuture,
143 <Fut as IntoFuture>::IntoFuture: Send + 'static,
144 <Fut as IntoFuture>::Output: Send + 'static,
145{
146}
147
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use async_std::task;

    use super::prelude::*;

    /// Awaiting a `ParallelFuture` yields the output of the wrapped future.
    #[test]
    fn spawn() {
        let res = task::block_on(async { async { "nori is a horse" }.par().await });
        assert_eq!(res, "nori is a horse");
    }

    /// A `ParallelFuture` that is created but never polled must not run.
    #[test]
    fn is_lazy() {
        task::block_on(async {
            let polled = Arc::new(AtomicBool::new(false));
            let flag = Arc::clone(&polled);

            // Bound to a named variable so the future stays alive, unpolled,
            // for the whole duration of the sleep below.
            let _res = async move {
                flag.store(true, Ordering::SeqCst);
            }
            .par();

            task::sleep(Duration::from_millis(500)).await;
            assert!(!polled.load(Ordering::SeqCst));
        })
    }
}
181}