closure_future/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(feature = "nightly-docs", feature(doc_cfg))]
3
4use std::{
5    future::Future,
6    panic::{catch_unwind, UnwindSafe},
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11/// The error type
12#[derive(Debug, PartialEq, Eq, Clone, Copy)]
13pub enum RunError {
14    /// The wrapped closure panicked
15    Panicked,
16    /// The runner was dropped before it could complete
17    RunnerDropped,
18}
19
20#[must_use = "The future must be polled for the output to be observed"]
21/// A Future whose output will be the output of the closure
22pub struct ClosureFuture<Output> {
23    rcvr: async_oneshot::Receiver<Result<Output, RunError>>,
24}
25
26impl<Output> Future for ClosureFuture<Output> {
27    type Output = Result<Output, RunError>;
28
29    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30        Pin::new(&mut self.rcvr).poll(cx).map(|res| match res {
31            Ok(res) => res,
32            Err(async_oneshot::Closed { .. }) => Err(RunError::RunnerDropped),
33        })
34    }
35}
36
37#[must_use = "The worker must be run for the future to make any progress."]
38/// A worker to ractually run the closure, using [`self.run()`](`Self::run`)
39pub struct ClosureFutureWorker<Output, F: FnOnce() -> Output> {
40    sndr: async_oneshot::Sender<Result<Output, RunError>>,
41    f: F,
42}
43
44impl<Output, F> ClosureFutureWorker<Output, F>
45where
46    F: FnOnce() -> Output,
47    //TODO: shall we rather wrap in AssertUnwindSafe below?
48    F: UnwindSafe,
49{
50    /// Runs the closure
51    pub fn run(mut self) {
52        let res = catch_unwind(self.f);
53        let to_store = res.map_err(|_| RunError::Panicked);
54        let _ = self.sndr.send(to_store);
55    }
56}
57
58/// Wraps a closure to be run as a Future
59pub fn closure_future<Output, F: FnOnce() -> Output>(
60    f: F,
61) -> (ClosureFuture<Output>, ClosureFutureWorker<Output, F>) {
62    let (sndr, rcvr) = async_oneshot::oneshot();
63    (ClosureFuture { rcvr }, ClosureFutureWorker { sndr, f })
64}
65
66#[cfg(feature = "rayon")]
67#[cfg_attr(feature = "nightly-docs", doc(cfg(feature = "rayon")))]
68/// An helper function to spawn a closure on the rayon global threadpool.
69/// Requires feature "rayon" to be activated.
70pub fn spawn_rayon<Output: Send + Sync + 'static, F: FnOnce() -> Output + UnwindSafe + Send + 'static>(
71    f: F,
72) -> ClosureFuture<Output> {
73    let (future, worker) = closure_future(f);
74    rayon::spawn(|| worker.run());
75    future
76}