1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
//! Structured parallel execution for async Rust.
//!
//! > Concurrency is a system-structuring mechanism, parallelism is a resource.
//!
//! This is a replacement for the common `Task` idiom. Rather than providing a
//! separate family of APIs for concurrency and parallelism, this library
//! provides a `ParallelFuture` type. When this type is scheduled concurrently
//! it will provide parallel execution.
//!
//! # Examples
//!
//! ```
//! use parallel_future::prelude::*;
//! use futures_concurrency::prelude::*;
//!
//! async_std::task::block_on(async {
//! let a = async { 1 }.par(); // ← returns `ParallelFuture`
//! let b = async { 2 }.par(); // ← returns `ParallelFuture`
//!
//! let (a, b) = (a, b).join().await; // ← concurrent `.await`
//! assert_eq!(a + b, 3);
//! })
//! ```
//!
//! # Limitations
//!
//! Rust does not yet provide a mechanism for async destructors. That means that
//! on an early return of any kind, Rust can't guarantee that certain
//! asynchronous operations run before others. This is a language-level
//! limitation with no existing workarounds possible. `ParallelFuture` is designed to
//! work with async destructors once they land.
//!
//! `ParallelFuture` is spawned as soon as `par` is called and does not provide
//! a manual `detach` method. However it can be passed to `mem::forget`, which
//! skips its destructor and will keep the future running on another thread. In
//! the absence of unforgettable types (linear types), Rust cannot prevent
//! `ParallelFuture`s from becoming unmanaged (dangling).
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, unreachable_pub)]
use pin_project::{pin_project, pinned_drop};
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::task::{Context, Poll};
use async_std::task;
/// The `parallel-future` prelude.
///
/// Glob-importing this module makes the methods of
/// [`IntoFutureExt`][crate::IntoFutureExt] (such as `par`) callable without
/// binding the trait's name in the importing scope.
pub mod prelude {
// Re-exported `as _`: the trait's methods come into scope, its name does not.
pub use super::IntoFutureExt as _;
}
/// A parallelizable future.
///
/// This type is constructed by the [`par`][crate::IntoFutureExt::par] method on [`IntoFutureExt`][crate::IntoFutureExt].
///
/// Polling a `ParallelFuture` polls the join handle of the task it wraps and
/// resolves to that task's output.
///
/// # Examples
///
/// ```
/// use parallel_future::prelude::*;
/// use futures_concurrency::prelude::*;
///
/// async_std::task::block_on(async {
/// let a = async { 1 }.par(); // ← returns `ParallelFuture`
/// let b = async { 2 }.par(); // ← returns `ParallelFuture`
///
/// let (a, b) = (a, b).join().await; // ← concurrent `.await`
/// assert_eq!(a + b, 3);
/// })
/// ```
#[derive(Debug)]
#[pin_project(PinnedDrop)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ParallelFuture<Fut: Future> {
/// Join handle of the task spawned for the wrapped future.
///
/// Stored in an `Option` so that the `PinnedDrop` implementation can move
/// the handle out with `take`.
#[pin]
handle: Option<task::JoinHandle<Fut::Output>>,
}
/// Awaiting a `ParallelFuture` awaits the task that was spawned for it.
impl<Fut> Future for ParallelFuture<Fut>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    type Output = Fut::Output;

    /// Poll the join handle of the spawned task, resolving to the task's output.
    ///
    /// # Panics
    ///
    /// Panics if the join handle has already been moved out of `self`. Within
    /// this file only the destructor does that.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Project straight to the pinned handle; `Pin<&mut JoinHandle<_>>` is
        // already the receiver type `Future::poll` expects.
        let handle = self.project().handle.as_pin_mut().unwrap();
        handle.poll(cx)
    }
}
/// Cancel the `ParallelFuture` when dropped.
#[pinned_drop]
impl<Fut: Future> PinnedDrop for ParallelFuture<Fut> {
fn drop(self: Pin<&mut Self>) {
let mut this = self.project();
let handle = this.handle.take().unwrap();
let _ = handle.cancel();
}
}
/// Extension methods for every type that implements [`IntoFuture`].
///
/// The trait is only usable when the converted future and its output are both
/// `Send + 'static`, because `par` hands the future to `task::spawn`.
pub trait IntoFutureExt: IntoFuture + Sized
where
    <Self as IntoFuture>::IntoFuture: Send + 'static,
    <Self as IntoFuture>::Output: Send + 'static,
{
    /// Spawn this future as a task and wrap the task in a [`ParallelFuture`].
    ///
    /// `self` is converted with `into_future` and passed to `task::spawn`
    /// right away; the returned `ParallelFuture` holds the resulting join
    /// handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use parallel_future::prelude::*;
    /// use futures_concurrency::prelude::*;
    ///
    /// async_std::task::block_on(async {
    ///     let a = async { 1 }.par(); // ← returns `ParallelFuture`
    ///     let b = async { 2 }.par(); // ← returns `ParallelFuture`
    ///
    ///     let (a, b) = (a, b).join().await; // ← concurrent `.await`
    ///     assert_eq!(a + b, 3);
    /// })
    /// ```
    fn par(self) -> ParallelFuture<<Self as IntoFuture>::IntoFuture> {
        let handle = task::spawn(self.into_future());
        ParallelFuture {
            handle: Some(handle),
        }
    }
}
/// Blanket implementation: every `IntoFuture` type whose converted future and
/// output are both `Send + 'static` gets the `par` method. The body is empty
/// because `par` is a provided method of the trait.
impl<Fut> IntoFutureExt for Fut
where
Fut: IntoFuture,
<Fut as IntoFuture>::IntoFuture: Send + 'static,
<Fut as IntoFuture>::Output: Send + 'static,
{
}
#[cfg(test)]
mod test {
    use super::prelude::*;

    /// A future converted with `par` resolves to the wrapped future's output.
    #[test]
    fn spawn() {
        let output = async_std::task::block_on(async {
            let parallel = async { "nori is a horse" }.par();
            parallel.await
        });
        assert_eq!(output, "nori is a horse");
    }
}