scheduled_executor/
task_group.rs

//! Task groups can be used whenever there is a sequence of tasks that needs to be executed at
//! regular intervals, and the sequence can change across different cycles.
//!
//! As an example, let's suppose we have a list of servers that we want to health-check at
//! regular intervals. First, we need to know the list of servers, which could change at any
//! time; once we have the list, we need to schedule the health checks. Refer to `task_group.rs`
//! in the example folder to see how such a check could be scheduled.
//!
9use futures::future::Future;
10use futures_cpupool::CpuPool;
11use tokio_core::reactor::{Handle, Remote, Timeout};
12
13use executor::{CoreExecutor, ThreadPoolExecutor};
14
15use std::sync::Arc;
16use std::time::Duration;
17
/// Defines a group of tasks. Task groups allow you to schedule the execution of different tasks
/// uniformly in a specific interval. The task discovery is performed by `get_tasks`, which
/// returns a list of task ids. The returned task ids are then passed, one at a time, to
/// `execute` to run the corresponding task. `get_tasks` is executed once per interval, while
/// `execute` is executed every `interval` / number of tasks.
/// See also: example in the module documentation.
pub trait TaskGroup: Send + Sync + Sized + 'static {
    /// Identifier of a single task. Ids are produced by `get_tasks` and consumed by `execute`.
    type TaskId: Send;

    /// Runs at the beginning of each cycle and generates the list of task ids.
    fn get_tasks(&self) -> Vec<Self::TaskId>;

    /// Runs once per task id per cycle.
    ///
    /// `task_id` is one of the ids returned by the `get_tasks` call of the current cycle.
    // The parameter is named: anonymous trait parameters are deprecated and are rejected
    // from the 2018 edition onwards.
    fn execute(&self, task_id: Self::TaskId);
}
33
34fn schedule_tasks_local<T: TaskGroup>(task_group: &Arc<T>, interval: Duration, handle: &Handle) {
35    let tasks = task_group.get_tasks();
36    if tasks.is_empty() {
37        return
38    }
39    let task_interval = interval / tasks.len() as u32;
40    for (i, task) in tasks.into_iter().enumerate() {
41        let task_group_clone = task_group.clone();
42        let t = Timeout::new(task_interval * i as u32, handle).unwrap()
43            .then(move |_| {
44                task_group_clone.execute(task);
45                Ok::<(), ()>(())
46            });
47        handle.spawn(t);
48    }
49}
50
51fn schedule_tasks_remote<T: TaskGroup>(task_group: &Arc<T>, interval: Duration, remote: &Remote, pool: &CpuPool) {
52    let tasks = task_group.get_tasks();
53    if tasks.is_empty() {
54        return
55    }
56    let task_interval = interval / tasks.len() as u32;
57    for (i, task) in tasks.into_iter().enumerate() {
58        let task_group = task_group.clone();
59        let pool = pool.clone();
60
61        remote.spawn(move |handle| {
62            let task_group = task_group.clone();
63            let pool = pool.clone();
64            let t = Timeout::new(task_interval * i as u32, handle).unwrap()
65                .then(move |_| {
66                    task_group.execute(task);
67                    Ok::<(), ()>(())
68                });
69            handle.spawn(pool.spawn(t));
70            Ok::<(), ()>(())
71        })
72    }
73}
74
75/// Allows the execution of a `TaskGroup`.
76pub trait TaskGroupScheduler {
77    fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T>;
78}
79
80impl TaskGroupScheduler for CoreExecutor {
81    fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T> {
82        let task_group = Arc::new(task_group);
83        let task_group_clone = task_group.clone();
84        self.schedule_fixed_rate(
85            initial,
86            interval,
87            move |handle| {
88                schedule_tasks_local(&task_group_clone, interval, handle);
89            }
90        );
91        task_group
92    }
93}
94
95impl TaskGroupScheduler for ThreadPoolExecutor {
96    fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T> {
97        let task_group = Arc::new(task_group);
98        let task_group_clone = task_group.clone();
99        let pool = self.pool().clone();
100        self.schedule_fixed_rate(
101            initial,
102            interval,
103            move |remote| {
104                schedule_tasks_remote(&task_group_clone, interval, remote, &pool);
105            }
106        );
107        task_group
108    }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::{Duration, Instant};

    use task_group::{TaskGroup, TaskGroupScheduler};
    use executor::ThreadPoolExecutor;

    /// Number of task ids `TestGroup` reports on every cycle.
    const TASK_COUNT: usize = 5;
    /// Number of cycles the test expects to observe before it stops waiting.
    const EXPECTED_RUNS: usize = 3;

    /// Execution timestamps, indexed first by task id and then by run.
    type TaskExecutions = Vec<Vec<Instant>>;

    /// Task group that records the instant at which each of its tasks is executed.
    struct TestGroup {
        executions_lock: Arc<RwLock<TaskExecutions>>,
    }

    impl TestGroup {
        /// Creates a group with one empty timestamp list per task id.
        fn new() -> TestGroup {
            let executions = (0..TASK_COUNT).map(|_| Vec::new()).collect::<Vec<_>>();
            TestGroup {
                executions_lock: Arc::new(RwLock::new(executions)),
            }
        }

        /// Returns a shared handle to the recorded timestamps, so they remain readable after
        /// the group itself has been moved into the scheduler.
        fn executions_lock(&self) -> Arc<RwLock<TaskExecutions>> {
            self.executions_lock.clone()
        }
    }

    impl TaskGroup for TestGroup {
        type TaskId = usize;

        fn get_tasks(&self) -> Vec<usize> {
            (0..TASK_COUNT).collect()
        }

        fn execute(&self, task_id: usize) {
            let mut executions = self.executions_lock.write().unwrap();
            executions[task_id].push(Instant::now());
        }
    }

    #[test]
    fn task_group_test() {
        let group = TestGroup::new();
        let executions_lock = group.executions_lock();
        {
            // The executor lives only for this scope; it is dropped before the timestamps
            // are inspected (presumably stopping any further scheduling).
            let executor = ThreadPoolExecutor::new(4).unwrap();
            executor.schedule(group, Duration::from_secs(0), Duration::from_secs(4));
            thread::sleep(Duration::from_millis(11800));
        }

        let executions = executions_lock.read().unwrap();
        assert_eq!(executions.len(), TASK_COUNT, "unexpected number of tasks");

        for (task, runs) in executions.iter().enumerate() {
            assert_eq!(
                runs.len(),
                EXPECTED_RUNS,
                "task {} ran an unexpected number of times",
                task
            );
            // Consecutive runs of the same task are expected one 4 second interval apart;
            // the accepted window is 0.5 s to 4.5 s.
            for pair in runs.windows(2) {
                let task_interval = pair[1] - pair[0];
                assert!(
                    task_interval < Duration::from_millis(4500),
                    "task {}: consecutive runs are {:?} apart",
                    task,
                    task_interval
                );
                assert!(
                    task_interval > Duration::from_millis(500),
                    "task {}: consecutive runs are {:?} apart",
                    task,
                    task_interval
                );
            }
        }

        // Walk all executions in scheduling order (task 0..4 of run 0, then run 1, ...):
        // neighbours are expected 4 s / 5 tasks = 0.8 s apart; accepted window 0.5 s to 1.5 s.
        for i in 1..TASK_COUNT * EXPECTED_RUNS {
            let (task, run) = (i % TASK_COUNT, i / TASK_COUNT);
            let (task_prev, run_prev) = ((i - 1) % TASK_COUNT, (i - 1) / TASK_COUNT);
            let inter_task_interval = executions[task][run] - executions[task_prev][run_prev];
            assert!(
                inter_task_interval < Duration::from_millis(1500),
                "task {} run {} started {:?} after task {} run {}",
                task,
                run,
                inter_task_interval,
                task_prev,
                run_prev
            );
            assert!(
                inter_task_interval > Duration::from_millis(500),
                "task {} run {} started {:?} after task {} run {}",
                task,
                run,
                inter_task_interval,
                task_prev,
                run_prev
            );
        }
    }
}