desync/scheduler/
future_job.rs

1use super::job::*;
2
3use std::mem;
4use futures::future::{Future, FutureObj, FutureExt};
5use futures::task::{Context, Poll};
6
7enum JobState<TFn> {
8    /// Need to call the function to create the underlying future
9    FutureNotCreated(TFn),
10
11    /// The future is waiting to be evaluated
12    WaitingForFuture(FutureObj<'static, ()>),
13
14    /// The future has completed
15    Completed
16}
17
18impl<TFn, TFuture> JobState<TFn>
19where   TFn:        FnOnce() -> TFuture+Send,
20        TFuture:    'static+Send+Future<Output=()> {
21    fn take(&mut self) -> Option<FutureObj<'static, ()>> {
22        // Move the value out of this object
23        let mut value = JobState::Completed;
24        mem::swap(self, &mut value);
25
26        // Result depends on the current state
27        match value {
28            // Create the futureobj if we're in the 'not created' state, create the future
29            JobState::FutureNotCreated(create_fn)   => Some(FutureObj::new(Box::new(create_fn()))),
30
31            // Return the active future if there is one
32            JobState::WaitingForFuture(future)      => Some(future),
33
34            // The future has gone away in the completed state
35            JobState::Completed                     => None
36        }
37    }
38}
39
40///
41/// Job that evaluates a future
42///
43pub struct FutureJob<TFn> {
44    action: JobState<TFn>
45}
46
47impl<TFn, TFuture> FutureJob<TFn>
48where   TFn:        FnOnce() -> TFuture+Send,
49        TFuture:    'static+Send+Future<Output=()> {
50    pub fn new(create_future: TFn) -> FutureJob<TFn> {
51        FutureJob { action: JobState::FutureNotCreated(create_future) }
52    }
53}
54
55impl<TFn, TFuture> ScheduledJob for FutureJob<TFn>
56where   TFn:        FnOnce() -> TFuture+Send,
57        TFuture:    'static+Send+Future<Output=()> {
58    fn run(&mut self, context: &mut Context) -> Poll<()> {
59        // Consume the action when it's run
60        let action = self.action.take();
61
62        if let Some(mut action) = action {
63            match action.poll_unpin(context) {
64                Poll::Ready(()) => Poll::Ready(()),
65                Poll::Pending   => {
66                    self.action = JobState::WaitingForFuture(action);
67                    Poll::Pending
68                }
69            }
70        } else {
71            panic!("Cannot schedule an action twice");
72        }
73    }
74}