microasync_util/
defer.rs

1extern crate std;
2
3use core::{future::Future, pin::Pin, task::{Context, Poll}};
4use std::{sync::mpsc::{Receiver, channel}, thread};
5
6/// A future that runs a function in a new thread without blocking.
7pub struct DeferredFuture<Args, T, F>
8where
9    Args: Send + 'static,
10    T: Send + 'static,
11    F: (FnOnce(Args) -> T) + Send + 'static,
12{
13    has_started: bool,
14    args: Option<Args>,
15    f: Option<F>,
16    receiver: Option<Receiver<T>>,
17}
18
19impl<Args, T, F> DeferredFuture<Args, T, F>
20where
21    Args: Send + 'static,
22    T: Send + 'static,
23    F: (FnOnce(Args) -> T) + Send + 'static,
24{
25    fn new(f: F, args: Args) -> Self {
26        Self {
27            has_started: false,
28            args: Some(args),
29            f: Some(f),
30            receiver: None,
31        }
32    }
33}
34
35impl<Args, T, F> Future for DeferredFuture<Args, T, F>
36where
37    Args: Send + 'static,
38    T: Send + 'static,
39    F: (FnOnce(Args) -> T) + Send + 'static,
40{
41    type Output = T;
42
43    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
44        // SAFETY: The pin necessarily lives as long as poll() and is owned, so won't be modified
45        let me = unsafe {
46            self.get_unchecked_mut()
47        };
48        // SAFETY: All .take().unwrap() calls can never panic because has_started will only be
49        // false if the Options are still present.
50        if !me.has_started {
51            let (arg_sender, arg_receiver) = channel();
52            let (t_sender, t_receiver) = channel();
53            let f = me.f.take().unwrap();
54            me.receiver = Some(t_receiver);
55            arg_sender.send(me.args.take().unwrap()).expect("broken channel");
56            thread::spawn(move || {
57                let (sender, receiver) = (t_sender, arg_receiver);
58                let args = receiver.recv().expect("unable to recv args");
59                sender.send(f(args)).expect("DeferredFuture completed, but thread was not ready yet.");
60            });
61            me.has_started = true;
62            return Poll::Pending;
63        }
64        if let Ok(x) = me.receiver.as_ref().unwrap().try_recv() {
65            Poll::Ready(x)
66        }
67        else {
68            Poll::Pending
69        }
70    }
71}
72
73/// Returns a DeferredFuture, which runs a computationally expensive task in a new thread.
74pub fn defer<Args, T, F>(f: F, args: Args) -> DeferredFuture<Args, T, F> 
75where
76    Args: Send + 'static,
77    T: Send + 'static,
78    F: (FnOnce(Args) -> T) + Send + 'static,
79{
80    DeferredFuture::new(f, args)
81}