1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use super::job::*;
use std::mem;
use futures::future::{Future, FutureObj, FutureExt};
use futures::task::{Context, Poll};
enum JobState<TFn> {
FutureNotCreated(TFn),
WaitingForFuture(FutureObj<'static, ()>),
Completed
}
impl<TFn, TFuture> JobState<TFn>
where TFn: FnOnce() -> TFuture+Send,
TFuture: 'static+Send+Future<Output=()> {
fn take(&mut self) -> Option<FutureObj<'static, ()>> {
let mut value = JobState::Completed;
mem::swap(self, &mut value);
match value {
JobState::FutureNotCreated(create_fn) => Some(FutureObj::new(Box::new(create_fn()))),
JobState::WaitingForFuture(future) => Some(future),
JobState::Completed => None
}
}
}
pub struct FutureJob<TFn> {
action: JobState<TFn>
}
impl<TFn, TFuture> FutureJob<TFn>
where TFn: FnOnce() -> TFuture+Send,
TFuture: 'static+Send+Future<Output=()> {
pub fn new(create_future: TFn) -> FutureJob<TFn> {
FutureJob { action: JobState::FutureNotCreated(create_future) }
}
}
impl<TFn, TFuture> ScheduledJob for FutureJob<TFn>
where TFn: FnOnce() -> TFuture+Send,
TFuture: 'static+Send+Future<Output=()> {
fn run(&mut self, context: &mut Context) -> Poll<()> {
let action = self.action.take();
if let Some(mut action) = action {
match action.poll_unpin(context) {
Poll::Ready(()) => Poll::Ready(()),
Poll::Pending => {
self.action = JobState::WaitingForFuture(action);
Poll::Pending
}
}
} else {
panic!("Cannot schedule an action twice");
}
}
}