#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, unreachable_pub)]
use pin_project::{pin_project, pinned_drop};
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::task::{Context, Poll};
use async_std::task;
/// Convenience re-exports for this crate.
///
/// The extension trait is re-exported anonymously (`as _`), so a glob import
/// of this module makes its methods callable without binding the trait's name.
pub mod prelude {
pub use super::IntoFutureExt as _;
}
/// A future that runs the wrapped [`IntoFuture`] on a spawned async-std task.
///
/// Values of this type are produced by [`IntoFutureExt::par`]. The wrapped
/// value is only converted into a future and spawned the first time this
/// future is polled; until then it is merely stored.
#[derive(Debug)]
#[pin_project(PinnedDrop)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ParallelFuture<Fut: IntoFuture> {
// The not-yet-started work; taken (leaving `None`) when the task is spawned.
into_future: Option<Fut>,
// Handle of the spawned task; `None` until the first poll.
#[pin]
handle: Option<task::JoinHandle<Fut::Output>>,
}
/// Polling a [`ParallelFuture`] spawns the wrapped future on first poll and
/// afterwards forwards every poll to the spawned task's join handle.
impl<Fut> Future for ParallelFuture<Fut>
where
    Fut: IntoFuture,
    Fut::IntoFuture: Send + 'static,
    Fut::Output: Send + 'static,
{
    type Output = <Fut as IntoFuture>::Output;

    /// Spawns the wrapped future if that has not happened yet, then polls the
    /// task's join handle and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if the internal invariant "either the unspawned future or the
    /// task handle is present" is broken, which cannot happen for values
    /// created through [`IntoFutureExt::par`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // The stored value is present exactly until the first poll; taking it
        // here both detects "not spawned yet" and hands over ownership.
        if let Some(pending) = this.into_future.take() {
            // A single `into_future()` is enough: its result already is the
            // `Future` that `task::spawn` expects.
            let handle = task::spawn(pending.into_future());
            // `Pin::set` overwrites the pinned slot without requiring the
            // handle type to be `Unpin`.
            this.handle.set(Some(handle));
        }

        this.handle
            .as_pin_mut()
            .expect("task handle is set once the future has been spawned")
            .poll(cx)
    }
}
#[pinned_drop]
impl<Fut: IntoFuture> PinnedDrop for ParallelFuture<Fut> {
fn drop(self: Pin<&mut Self>) {
let mut this = self.project();
if let Some(handle) = this.handle.take() {
let _ = handle.cancel();
}
}
}
/// Extension trait that adds [`par`](IntoFutureExt::par) to [`IntoFuture`]
/// types whose future and output are `Send + 'static`.
pub trait IntoFutureExt: IntoFuture + Sized
where
<Self as IntoFuture>::IntoFuture: Send + 'static,
<Self as IntoFuture>::Output: Send + 'static,
{
/// Wraps `self` in a [`ParallelFuture`].
///
/// This call only stores `self`; no task handle exists yet. The task is
/// spawned when the returned future is first polled.
fn par(self) -> ParallelFuture<Self> {
ParallelFuture {
into_future: Some(self),
handle: None,
}
}
}
// Blanket implementation: every `IntoFuture` whose future and output are
// `Send + 'static` gets `par` through the trait's provided method, so the
// impl body is intentionally empty.
impl<Fut> IntoFutureExt for Fut
where
Fut: IntoFuture,
<Fut as IntoFuture>::IntoFuture: Send + 'static,
<Fut as IntoFuture>::Output: Send + 'static,
{
}
#[cfg(test)]
mod test {
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    use async_std::task;

    use super::prelude::*;

    /// Awaiting a `.par()` future yields the wrapped future's output.
    #[test]
    fn spawn() {
        task::block_on(async {
            let res = async { "nori is a horse" }.par().await;
            assert_eq!(res, "nori is a horse");
        })
    }

    /// A `.par()` future that is never polled must never run its body.
    #[test]
    fn is_lazy() {
        task::block_on(async {
            let polled = Arc::new(AtomicBool::new(false));
            let polled_2 = Arc::clone(&polled);
            // Keep the future alive (but unpolled) until the assertion below.
            let _res = async move {
                polled_2.store(true, Ordering::SeqCst);
            }
            .par();
            // Give an eagerly spawned task ample time to run, if there were one.
            task::sleep(Duration::from_millis(500)).await;
            assert!(!polled.load(Ordering::SeqCst));
        })
    }
}