1#![cfg(not(feature = "no_std"))]
2extern crate std;
3
4use core::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use std::{
10 sync::mpsc::{channel, Receiver},
11 thread,
12};
13
14pub struct DeferredFuture<Args, T, F>
16where
17 Args: Send + 'static,
18 T: Send + 'static,
19 F: (FnOnce(Args) -> T) + Send + 'static,
20{
21 has_started: bool,
22 args: Option<Args>,
23 f: Option<F>,
24 receiver: Option<Receiver<T>>,
25}
26
27impl<Args, T, F> DeferredFuture<Args, T, F>
28where
29 Args: Send + 'static,
30 T: Send + 'static,
31 F: (FnOnce(Args) -> T) + Send + 'static,
32{
33 fn new(f: F, args: Args) -> Self {
34 Self {
35 has_started: false,
36 args: Some(args),
37 f: Some(f),
38 receiver: None,
39 }
40 }
41}
42
43impl<Args, T, F> Future for DeferredFuture<Args, T, F>
44where
45 Args: Send + 'static,
46 T: Send + 'static,
47 F: (FnOnce(Args) -> T) + Send + 'static,
48{
49 type Output = T;
50
51 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
52 let me = unsafe { self.get_unchecked_mut() };
54 if !me.has_started {
57 let (arg_sender, arg_receiver) = channel();
58 let (t_sender, t_receiver) = channel();
59 let f = me.f.take().unwrap();
60 me.receiver = Some(t_receiver);
61 arg_sender
62 .send(me.args.take().unwrap())
63 .expect("broken channel");
64 thread::spawn(move || {
65 let (sender, receiver) = (t_sender, arg_receiver);
66 let args = receiver.recv().expect("unable to recv args");
67 sender
68 .send(f(args))
69 .expect("DeferredFuture completed, but thread was not ready yet.");
70 });
71 me.has_started = true;
72 return Poll::Pending;
73 }
74 if let Ok(x) = me.receiver.as_ref().unwrap().try_recv() {
75 Poll::Ready(x)
76 } else {
77 Poll::Pending
78 }
79 }
80}
81
82pub fn defer<Args, T, F>(f: F, args: Args) -> DeferredFuture<Args, T, F>
85where
86 Args: Send + 'static,
87 T: Send + 'static,
88 F: (FnOnce(Args) -> T) + Send + 'static,
89{
90 DeferredFuture::new(f, args)
91}