desync/scheduler/
future_job.rs1use super::job::*;
2
3use std::mem;
4use futures::future::{Future, FutureObj, FutureExt};
5use futures::task::{Context, Poll};
6
7enum JobState<TFn> {
8 FutureNotCreated(TFn),
10
11 WaitingForFuture(FutureObj<'static, ()>),
13
14 Completed
16}
17
18impl<TFn, TFuture> JobState<TFn>
19where TFn: FnOnce() -> TFuture+Send,
20 TFuture: 'static+Send+Future<Output=()> {
21 fn take(&mut self) -> Option<FutureObj<'static, ()>> {
22 let mut value = JobState::Completed;
24 mem::swap(self, &mut value);
25
26 match value {
28 JobState::FutureNotCreated(create_fn) => Some(FutureObj::new(Box::new(create_fn()))),
30
31 JobState::WaitingForFuture(future) => Some(future),
33
34 JobState::Completed => None
36 }
37 }
38}
39
40pub struct FutureJob<TFn> {
44 action: JobState<TFn>
45}
46
47impl<TFn, TFuture> FutureJob<TFn>
48where TFn: FnOnce() -> TFuture+Send,
49 TFuture: 'static+Send+Future<Output=()> {
50 pub fn new(create_future: TFn) -> FutureJob<TFn> {
51 FutureJob { action: JobState::FutureNotCreated(create_future) }
52 }
53}
54
55impl<TFn, TFuture> ScheduledJob for FutureJob<TFn>
56where TFn: FnOnce() -> TFuture+Send,
57 TFuture: 'static+Send+Future<Output=()> {
58 fn run(&mut self, context: &mut Context) -> Poll<()> {
59 let action = self.action.take();
61
62 if let Some(mut action) = action {
63 match action.poll_unpin(context) {
64 Poll::Ready(()) => Poll::Ready(()),
65 Poll::Pending => {
66 self.action = JobState::WaitingForFuture(action);
67 Poll::Pending
68 }
69 }
70 } else {
71 panic!("Cannot schedule an action twice");
72 }
73 }
74}