scheduled_executor/
task_group.rs1use futures::future::Future;
10use futures_cpupool::CpuPool;
11use tokio_core::reactor::{Handle, Remote, Timeout};
12
13use executor::{CoreExecutor, ThreadPoolExecutor};
14
15use std::sync::Arc;
16use std::time::Duration;
17
18pub trait TaskGroup: Send + Sync + Sized + 'static {
25 type TaskId: Send;
26
27 fn get_tasks(&self) -> Vec<Self::TaskId>;
29
30 fn execute(&self, Self::TaskId);
32}
33
34fn schedule_tasks_local<T: TaskGroup>(task_group: &Arc<T>, interval: Duration, handle: &Handle) {
35 let tasks = task_group.get_tasks();
36 if tasks.is_empty() {
37 return
38 }
39 let task_interval = interval / tasks.len() as u32;
40 for (i, task) in tasks.into_iter().enumerate() {
41 let task_group_clone = task_group.clone();
42 let t = Timeout::new(task_interval * i as u32, handle).unwrap()
43 .then(move |_| {
44 task_group_clone.execute(task);
45 Ok::<(), ()>(())
46 });
47 handle.spawn(t);
48 }
49}
50
51fn schedule_tasks_remote<T: TaskGroup>(task_group: &Arc<T>, interval: Duration, remote: &Remote, pool: &CpuPool) {
52 let tasks = task_group.get_tasks();
53 if tasks.is_empty() {
54 return
55 }
56 let task_interval = interval / tasks.len() as u32;
57 for (i, task) in tasks.into_iter().enumerate() {
58 let task_group = task_group.clone();
59 let pool = pool.clone();
60
61 remote.spawn(move |handle| {
62 let task_group = task_group.clone();
63 let pool = pool.clone();
64 let t = Timeout::new(task_interval * i as u32, handle).unwrap()
65 .then(move |_| {
66 task_group.execute(task);
67 Ok::<(), ()>(())
68 });
69 handle.spawn(pool.spawn(t));
70 Ok::<(), ()>(())
71 })
72 }
73}
74
75pub trait TaskGroupScheduler {
77 fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T>;
78}
79
80impl TaskGroupScheduler for CoreExecutor {
81 fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T> {
82 let task_group = Arc::new(task_group);
83 let task_group_clone = task_group.clone();
84 self.schedule_fixed_rate(
85 initial,
86 interval,
87 move |handle| {
88 schedule_tasks_local(&task_group_clone, interval, handle);
89 }
90 );
91 task_group
92 }
93}
94
95impl TaskGroupScheduler for ThreadPoolExecutor {
96 fn schedule<T: TaskGroup>(&self, task_group: T, initial: Duration, interval: Duration) -> Arc<T> {
97 let task_group = Arc::new(task_group);
98 let task_group_clone = task_group.clone();
99 let pool = self.pool().clone();
100 self.schedule_fixed_rate(
101 initial,
102 interval,
103 move |remote| {
104 schedule_tasks_remote(&task_group_clone, interval, remote, &pool);
105 }
106 );
107 task_group
108 }
109}
110
111#[cfg(test)]
112mod tests {
113 use std::sync::{Arc, RwLock};
114 use std::thread;
115 use std::time::{Duration, Instant};
116
117 use task_group::{TaskGroup, TaskGroupScheduler};
118 use executor::ThreadPoolExecutor;
119
120 type TaskExecutions = Vec<Vec<Instant>>;
121 struct TestGroup {
122 executions_lock: Arc<RwLock<TaskExecutions>>,
123 }
124
125 impl TestGroup {
126 fn new() -> TestGroup {
127 let executions = (0..5).map(|_| Vec::new()).collect::<Vec<_>>();
128 TestGroup {
129 executions_lock : Arc::new(RwLock::new(executions))
130 }
131 }
132
133 fn executions_lock(&self) -> Arc<RwLock<TaskExecutions>> {
134 self.executions_lock.clone()
135 }
136 }
137
138 impl TaskGroup for TestGroup {
139 type TaskId = usize;
140
141 fn get_tasks(&self) -> Vec<usize> {
142 vec![0, 1, 2, 3, 4]
143 }
144
145 fn execute(&self, task_id: usize) {
146 let mut executions = self.executions_lock.write().unwrap();
147 executions[task_id].push(Instant::now());
148 }
149 }
150
151 #[test]
152 fn task_group_test() {
153 let group = TestGroup::new();
154 let executions_lock = group.executions_lock();
155 {
156 let executor = ThreadPoolExecutor::new(4).unwrap();
157 executor.schedule(group, Duration::from_secs(0), Duration::from_secs(4));
158 thread::sleep(Duration::from_millis(11800));
159 }
160
161 let executions = &executions_lock.read().unwrap();
162 assert!(executions.len() == 5);
164 for task in 0..5 {
165 assert!(executions[task].len() == 3);
167 for run in 1..3 {
168 let task_interval = executions[task][run] - executions[task][run-1];
170 assert!(task_interval < Duration::from_millis(4500));
171 assert!(task_interval > Duration::from_millis(500));
172 }
173 }
174 for i in 1..15 {
175 let task = i % 5;
176 let run = i / 5;
177 let task_prev = (i - 1) % 5;
178 let run_prev = (i - 1) / 5;
179 let inter_task_interval = executions[task][run] - executions[task_prev][run_prev];
180 assert!(inter_task_interval < Duration::from_millis(1500));
181 assert!(inter_task_interval > Duration::from_millis(500));
182 }
183 }
184}