1use futures::future::Future;
9use futures::sync::oneshot::{channel, Sender};
10use futures_cpupool::{Builder, CpuPool};
11use tokio_core::reactor::Timeout;
12use tokio_core::reactor::{Core, Handle, Remote};
13
14use std::io;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::thread::{self, JoinHandle};
18use std::time::{Instant, Duration};
19
20
/// Handle to a scheduled task, used to request that the task stops running.
///
/// Clones share the same underlying flag, so calling [`TaskHandle::stop`] on
/// any clone is visible through every other clone.
#[derive(Clone)]
pub struct TaskHandle {
    /// Shared flag; `true` once a stop has been requested.
    should_stop: Arc<AtomicBool>,
}

impl TaskHandle {
    /// Creates a handle whose stop flag is initially unset.
    fn new() -> TaskHandle {
        let flag = AtomicBool::new(false);
        TaskHandle {
            should_stop: Arc::new(flag),
        }
    }

    /// Requests that the task stops.
    ///
    /// The scheduling loops in this file check the flag before each
    /// invocation, so an invocation that is already running is not interrupted.
    pub fn stop(&self) {
        let flag: &AtomicBool = &self.should_stop;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` if [`TaskHandle::stop`] has been called on this handle
    /// or on any of its clones.
    pub fn stopped(&self) -> bool {
        let flag: &AtomicBool = &self.should_stop;
        flag.load(Ordering::Relaxed)
    }
}
45
46fn fixed_interval_loop<F>(scheduled_fn: F, interval: Duration, handle: &Handle, task_handle: TaskHandle)
47 where F: Fn(&Handle) + Send + 'static
48{
49 if task_handle.stopped() {
50 return;
51 }
52 let start_time = Instant::now();
53 scheduled_fn(handle);
54 let execution = start_time.elapsed();
55 let next_iter_wait = if execution >= interval {
56 Duration::from_secs(0)
57 } else {
58 interval - execution
59 };
60 let handle_clone = handle.clone();
61 let t = Timeout::new(next_iter_wait, handle).unwrap()
62 .then(move |_| {
63 fixed_interval_loop(scheduled_fn, interval, &handle_clone, task_handle);
64 Ok::<(), ()>(())
65 });
66 handle.spawn(t);
67}
68
/// Computes how long to wait before the next fixed-rate invocation and the
/// delay that is still outstanding afterwards.
///
/// * `interval` - the target period between invocations.
/// * `execution` - how long the invocation that just finished took.
/// * `delay` - lateness accumulated by earlier invocations.
///
/// Returns `(wait, remaining_delay)`:
///
/// * If the invocation took at least `interval`, there is no wait and the
///   overrun is added to the delay.
/// * Otherwise the spare time in the interval is used to pay back the delay:
///   whatever spare time is left becomes the wait, and whatever delay could
///   not be absorbed is carried over.
fn calculate_delay(interval: Duration, execution: Duration, delay: Duration) -> (Duration, Duration) {
    let zero = Duration::from_secs(0);

    if execution >= interval {
        return (zero, delay + execution - interval);
    }

    // Spare time in this interval; strictly positive here.
    let slack = interval - execution;
    match slack.checked_sub(delay) {
        // The delay fits in the spare time: wait for the remainder.
        Some(wait) => (wait, zero),
        // The delay exceeds the spare time: run immediately, carry the rest.
        None => (zero, delay - slack),
    }
}
83
84fn fixed_rate_loop<F>(scheduled_fn: F, interval: Duration, handle: &Handle, delay: Duration, task_handle: TaskHandle)
85 where F: Fn(&Handle) + Send + 'static
86{
87 if task_handle.stopped() {
88 return;
89 }
90 let start_time = Instant::now();
91 scheduled_fn(handle);
92 let execution = start_time.elapsed();
93 let (next_iter_wait, updated_delay) = calculate_delay(interval, execution, delay);
94 let handle_clone = handle.clone();
95 let t = Timeout::new(next_iter_wait, handle).unwrap()
96 .then(move |_| {
97 fixed_rate_loop(scheduled_fn, interval, &handle_clone, updated_delay, task_handle);
98 Ok::<(), ()>(())
99 });
100 handle.spawn(t);
101}
102
103
104struct CoreExecutorInner {
105 remote: Remote,
106 termination_sender: Option<Sender<()>>,
107 thread_handle: Option<JoinHandle<()>>,
108}
109
110impl Drop for CoreExecutorInner {
111 fn drop(&mut self) {
112 let _ = self.termination_sender.take().unwrap().send(());
113 let _ = self.thread_handle.take().unwrap().join();
114 }
115}
116
117pub struct CoreExecutor {
124 inner: Arc<CoreExecutorInner>
125}
126
127impl Clone for CoreExecutor {
128 fn clone(&self) -> Self {
129 CoreExecutor { inner: Arc::clone(&self.inner) }
130 }
131}
132
133impl CoreExecutor {
134 pub fn new() -> Result<CoreExecutor, io::Error> {
136 CoreExecutor::with_name("core_executor")
137 }
138
139 pub fn with_name(thread_name: &str) -> Result<CoreExecutor, io::Error> {
141 let (termination_tx, termination_rx) = channel();
142 let (core_tx, core_rx) = channel();
143 let thread_handle = thread::Builder::new()
144 .name(thread_name.to_owned())
145 .spawn(move || {
146 debug!("Core starting");
147 let mut core = Core::new().expect("Failed to start core");
148 let _ = core_tx.send(core.remote());
149 match core.run(termination_rx) {
150 Ok(v) => debug!("Core terminated correctly {:?}", v),
151 Err(e) => debug!("Core terminated with error: {:?}", e),
152 }
153 })?;
154 let inner = CoreExecutorInner {
155 remote: core_rx.wait().expect("Failed to receive remote"),
156 termination_sender: Some(termination_tx),
157 thread_handle: Some(thread_handle),
158 };
159 let executor = CoreExecutor {
160 inner: Arc::new(inner)
161 };
162 debug!("Executor created");
163 Ok(executor)
164 }
165
166 pub fn schedule_fixed_interval<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
170 where F: Fn(&Handle) + Send + 'static
171 {
172 let task_handle = TaskHandle::new();
173 let task_handle_clone = task_handle.clone();
174 self.inner.remote.spawn(move |handle| {
175 let handle_clone = handle.clone();
176 let t = Timeout::new(initial, handle).unwrap()
177 .then(move |_| {
178 fixed_interval_loop(scheduled_fn, interval, &handle_clone, task_handle_clone);
179 Ok::<(), ()>(())
180 });
181 handle.spawn(t);
182 Ok::<(), ()>(())
183 });
184 task_handle
185 }
186
187 pub fn schedule_fixed_rate<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
191 where F: Fn(&Handle) + Send + 'static
192 {
193 let task_handle = TaskHandle::new();
194 let task_handle_clone = task_handle.clone();
195 self.inner.remote.spawn(move |handle| {
196 let handle_clone = handle.clone();
197 let t = Timeout::new(initial, handle).unwrap()
198 .then(move |_| {
199 fixed_rate_loop(scheduled_fn, interval, &handle_clone, Duration::from_secs(0), task_handle_clone);
200 Ok::<(), ()>(())
201 });
202 handle.spawn(t);
203 Ok::<(), ()>(())
204 });
205 task_handle
206 }
207}
208
209
210#[derive(Clone)]
213pub struct ThreadPoolExecutor {
214 executor: CoreExecutor,
215 pool: CpuPool
216}
217
218impl ThreadPoolExecutor {
219 pub fn new(threads: usize) -> Result<ThreadPoolExecutor, io::Error> {
222 ThreadPoolExecutor::with_prefix(threads, "pool_thread_")
223 }
224
225 pub fn with_prefix(threads: usize, prefix: &str) -> Result<ThreadPoolExecutor, io::Error> {
228 let new_executor = CoreExecutor::with_name(&format!("{}executor", prefix))?;
229 Ok(ThreadPoolExecutor::with_executor(threads, prefix, new_executor))
230 }
231
232 pub fn with_executor(threads: usize, prefix: &str, executor: CoreExecutor) -> ThreadPoolExecutor {
235 let pool = Builder::new()
236 .pool_size(threads)
237 .name_prefix(prefix)
238 .create();
239 ThreadPoolExecutor { pool, executor }
240 }
241
242 pub fn schedule_fixed_rate<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
245 where F: Fn(&Remote) + Send + Sync + 'static
246 {
247 let pool_clone = self.pool.clone();
248 let arc_fn = Arc::new(scheduled_fn);
249 self.executor.schedule_fixed_interval( initial,
251 interval,
252 move |handle| {
253 let arc_fn_clone = arc_fn.clone();
254 let remote = handle.remote().clone();
255 let t = pool_clone.spawn_fn(move || {
256 arc_fn_clone(&remote);
257 Ok::<(),()>(())
258 });
259 handle.spawn(t);
260 }
261 )
262 }
263
264 pub fn pool(&self) -> &CpuPool {
267 &self.pool
268 }
269}
270
271
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::{Duration, Instant};

    use super::{CoreExecutor, ThreadPoolExecutor, calculate_delay};

    /// A fast task scheduled at a fixed interval of one second runs six times
    /// in 5.5 seconds, with consecutive runs roughly one second apart.
    #[test]
    fn fixed_interval_test() {
        let timings = Arc::new(RwLock::new(Vec::new()));
        {
            let executor = CoreExecutor::new().unwrap();
            let timings_clone = Arc::clone(&timings);
            // Exercises the fixed-interval scheduler, as the test name says.
            executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    timings_clone.write().unwrap().push(Instant::now());
                }
            );
            thread::sleep(Duration::from_millis(5500));
            // The executor is dropped here, which stops the event loop.
        }

        let timings = timings.read().unwrap();
        assert_eq!(timings.len(), 6);
        for pair in timings.windows(2) {
            let execution_interval = pair[1] - pair[0];
            assert!(execution_interval < Duration::from_millis(1020));
            assert!(execution_interval > Duration::from_millis(980));
        }
    }

    /// With fixed-interval scheduling a slow first run (3 s) is not caught up:
    /// only four runs fit in 5.5 seconds.
    #[test]
    fn fixed_interval_slow_task_test() {
        let counter = Arc::new(RwLock::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    // Release the lock before sleeping.
                    let counter = {
                        let mut counter = counter_clone.write().unwrap();
                        (*counter) += 1;
                        *counter
                    };
                    if counter == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(*counter.read().unwrap(), 4);
    }

    /// Checks the `(wait, remaining_delay)` pairs produced by `calculate_delay`.
    #[test]
    fn calculate_delay_test() {
        fn s(n: u64) -> Duration { Duration::from_secs(n) }
        assert_eq!(calculate_delay(s(10), s(3), s(0)), (s(7), s(0)));
        assert_eq!(calculate_delay(s(10), s(11), s(0)), (s(0), s(1)));
        assert_eq!(calculate_delay(s(10), s(3), s(3)), (s(4), s(0)));
        assert_eq!(calculate_delay(s(10), s(3), s(9)), (s(0), s(2)));
        assert_eq!(calculate_delay(s(10), s(12), s(15)), (s(0), s(17)));
    }

    /// A fast task scheduled at a fixed rate of one second runs six times in
    /// 5.5 seconds.
    #[test]
    fn fixed_rate_test() {
        let counter = Arc::new(RwLock::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let mut counter = counter_clone.write().unwrap();
                    (*counter) += 1;
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(*counter.read().unwrap(), 6);
    }

    /// With fixed-rate scheduling a slow first run (3 s) is caught up: six
    /// runs still happen in 5.5 seconds.
    #[test]
    fn fixed_rate_slow_task_test() {
        let counter = Arc::new(RwLock::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    // Release the lock before sleeping.
                    let counter = {
                        let mut counter = counter_clone.write().unwrap();
                        (*counter) += 1;
                        *counter
                    };
                    if counter == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(*counter.read().unwrap(), 6);
    }

    /// On the thread-pool executor a slow first run (3 s) does not hold back
    /// the following runs: six runs happen in 5.5 seconds.
    #[test]
    fn fixed_rate_slow_task_test_pool() {
        let counter = Arc::new(RwLock::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = ThreadPoolExecutor::new(20).unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_remote| {
                    // Release the lock before sleeping.
                    let counter = {
                        let mut counter = counter_clone.write().unwrap();
                        (*counter) += 1;
                        *counter
                    };
                    if counter == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(*counter.read().unwrap(), 6);
    }

    /// Stopping one fixed-rate task through its handle leaves the other task
    /// on the same executor running.
    #[test]
    fn fixed_rate_stop_test() {
        let counter1 = Arc::new(RwLock::new(0));
        let counter2 = Arc::new(RwLock::new(0));
        let counter1_clone = Arc::clone(&counter1);
        let counter2_clone = Arc::clone(&counter2);
        {
            let executor = CoreExecutor::new().unwrap();
            let t1 = executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let mut counter = counter1_clone.write().unwrap();
                    (*counter) += 1;
                }
            );
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let mut counter = counter2_clone.write().unwrap();
                    (*counter) += 1;
                }
            );
            thread::sleep(Duration::from_millis(5500));
            t1.stop();
            thread::sleep(Duration::from_millis(5000));
        }
        assert_eq!(*counter1.read().unwrap(), 6);
        assert_eq!(*counter2.read().unwrap(), 11);
    }

    /// Stopping a fixed-interval task through its handle leaves a fixed-rate
    /// task on the same executor running.
    #[test]
    fn fixed_interval_stop_test() {
        let counter1 = Arc::new(RwLock::new(0));
        let counter2 = Arc::new(RwLock::new(0));
        let counter1_clone = Arc::clone(&counter1);
        let counter2_clone = Arc::clone(&counter2);
        {
            let executor = CoreExecutor::new().unwrap();
            let t1 = executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let mut counter = counter1_clone.write().unwrap();
                    (*counter) += 1;
                }
            );
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let mut counter = counter2_clone.write().unwrap();
                    (*counter) += 1;
                }
            );
            thread::sleep(Duration::from_millis(5500));
            t1.stop();
            thread::sleep(Duration::from_millis(5000));
        }
        assert_eq!(*counter1.read().unwrap(), 6);
        assert_eq!(*counter2.read().unwrap(), 11);
    }
}