scheduled_executor/
executor.rs

//! Executors allow easy scheduling and execution of functions or closures.
//! The `CoreExecutor` uses a single thread for both scheduling and execution, while the
//! `ThreadPoolExecutor` uses multiple threads to execute the functions.
//! Internally, each executor uses a `tokio_core::reactor::Core` as its event loop, which drives
//! the scheduling of the functions (and, for the `CoreExecutor`, also their execution). A reference
//! to the event loop (a `Handle` for the `CoreExecutor`, a `Remote` for the `ThreadPoolExecutor`)
//! is passed to every function when it is executed, allowing it to register additional events if needed.
8use futures::future::Future;
9use futures::sync::oneshot::{channel, Sender};
10use futures_cpupool::{Builder, CpuPool};
11use tokio_core::reactor::Timeout;
12use tokio_core::reactor::{Core, Handle, Remote};
13
14use std::io;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::thread::{self, JoinHandle};
18use std::time::{Instant, Duration};
19
20
/// Handle used to cancel a scheduled task. Every scheduling call returns a fresh handle.
/// Cancelling only prevents future runs: a run that is already in progress is never interrupted.
/// Clones of a handle share the same underlying flag, so any clone can stop the task.
#[derive(Clone)]
pub struct TaskHandle {
    // Shared cancellation flag; `true` once `stop` has been called on any clone.
    should_stop: Arc<AtomicBool>,
}

impl TaskHandle {
    /// Builds a handle whose task is initially allowed to run.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self { should_stop: Arc::new(flag) }
    }

    /// Stops the corresponding task. Note that a running task won't be interrupted, but
    /// future executions of the task will be prevented.
    pub fn stop(&self) {
        let flag: &AtomicBool = &self.should_stop;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once `stop` has been called on this handle or on any of its clones.
    pub fn stopped(&self) -> bool {
        let flag: &AtomicBool = &self.should_stop;
        flag.load(Ordering::Relaxed)
    }
}
45
46fn fixed_interval_loop<F>(scheduled_fn: F, interval: Duration, handle: &Handle, task_handle: TaskHandle)
47    where F: Fn(&Handle) + Send + 'static
48{
49    if task_handle.stopped() {
50        return;
51    }
52    let start_time = Instant::now();
53    scheduled_fn(handle);
54    let execution = start_time.elapsed();
55    let next_iter_wait = if execution >= interval {
56        Duration::from_secs(0)
57    } else {
58        interval - execution
59    };
60    let handle_clone = handle.clone();
61    let t = Timeout::new(next_iter_wait, handle).unwrap()
62        .then(move |_| {
63            fixed_interval_loop(scheduled_fn, interval, &handle_clone, task_handle);
64            Ok::<(), ()>(())
65        });
66    handle.spawn(t);
67}
68
/// Computes how long to wait before the next fixed-rate run and how much lateness is still
/// outstanding afterwards.
///
/// * `interval` - the target period between runs.
/// * `execution` - how long the run that just finished took.
/// * `delay` - the lateness accumulated by previous runs.
///
/// Returns `(wait, remaining_delay)`. When the run fit inside `interval`, the spare time is
/// first used to pay off `delay`; whatever spare time is left becomes the wait. When the run
/// took `interval` or longer, the wait is zero and the overrun is added to the delay.
fn calculate_delay(interval: Duration, execution: Duration, delay: Duration) -> (Duration, Duration) {
    let zero = Duration::from_secs(0);
    if execution < interval {
        // Spare time left in this period after the run completed.
        let slack = interval - execution;
        if delay < slack {
            (slack - delay, zero)
        } else {
            (zero, delay - slack)
        }
    } else {
        (zero, delay + execution - interval)
    }
}
83
84fn fixed_rate_loop<F>(scheduled_fn: F, interval: Duration, handle: &Handle, delay: Duration, task_handle: TaskHandle)
85    where F: Fn(&Handle) + Send + 'static
86{
87    if task_handle.stopped() {
88        return;
89    }
90    let start_time = Instant::now();
91    scheduled_fn(handle);
92    let execution = start_time.elapsed();
93    let (next_iter_wait, updated_delay) = calculate_delay(interval, execution, delay);
94    let handle_clone = handle.clone();
95    let t = Timeout::new(next_iter_wait, handle).unwrap()
96        .then(move |_| {
97            fixed_rate_loop(scheduled_fn, interval, &handle_clone, updated_delay, task_handle);
98            Ok::<(), ()>(())
99        });
100    handle.spawn(t);
101}
102
103
/// State shared by all clones of a `CoreExecutor`. Dropping it signals the event loop thread
/// to terminate and joins that thread.
struct CoreExecutorInner {
    /// Remote of the event loop running on the executor thread; used to spawn tasks onto it.
    remote: Remote,
    /// Sending on this channel completes the future the event loop is running, ending the loop.
    /// Wrapped in an `Option` so the sender can be moved out in `drop`.
    termination_sender: Option<Sender<()>>,
    /// Join handle of the executor thread. Wrapped in an `Option` so it can be moved out in `drop`.
    thread_handle: Option<JoinHandle<()>>,
}
109
110impl Drop for CoreExecutorInner {
111    fn drop(&mut self) {
112        let _ = self.termination_sender.take().unwrap().send(());
113        let _ = self.thread_handle.take().unwrap().join();
114    }
115}
116
117/// A `CoreExecutor` is the most simple executor provided. It runs a single thread, which is
118/// responsible for both scheduling the function (registering the timer for the wakeup),
119/// and the actual execution. The executor will stop once dropped. The `CoreExecutor`
120/// can be cloned to generate a new reference to the same underlying executor.
121/// Given the single threaded nature of this executor, tasks are executed sequentially, and a long
122/// running task will cause delay in other subsequent executions.
123pub struct CoreExecutor {
124    inner: Arc<CoreExecutorInner>
125}
126
127impl Clone for CoreExecutor {
128    fn clone(&self) -> Self {
129        CoreExecutor { inner: Arc::clone(&self.inner) }
130    }
131}
132
133impl CoreExecutor {
134    /// Creates a new `CoreExecutor`.
135    pub fn new() -> Result<CoreExecutor, io::Error> {
136        CoreExecutor::with_name("core_executor")
137    }
138
139    /// Creates a new `CoreExecutor` with the specified thread name.
140    pub fn with_name(thread_name: &str) -> Result<CoreExecutor, io::Error> {
141        let (termination_tx, termination_rx) = channel();
142        let (core_tx, core_rx) = channel();
143        let thread_handle = thread::Builder::new()
144            .name(thread_name.to_owned())
145            .spawn(move || {
146                debug!("Core starting");
147                let mut core = Core::new().expect("Failed to start core");
148                let _ = core_tx.send(core.remote());
149                match core.run(termination_rx) {
150                    Ok(v) => debug!("Core terminated correctly {:?}", v),
151                    Err(e) => debug!("Core terminated with error: {:?}", e),
152                }
153            })?;
154        let inner = CoreExecutorInner {
155            remote: core_rx.wait().expect("Failed to receive remote"),
156            termination_sender: Some(termination_tx),
157            thread_handle: Some(thread_handle),
158        };
159        let executor = CoreExecutor {
160            inner: Arc::new(inner)
161        };
162        debug!("Executor created");
163        Ok(executor)
164    }
165
166    /// Schedule a function for running at fixed intervals. The executor will try to run the
167    /// function every `interval`, but if one execution takes longer than `interval` it will delay
168    /// all the subsequent calls.
169    pub fn schedule_fixed_interval<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
170        where F: Fn(&Handle) + Send + 'static
171    {
172        let task_handle = TaskHandle::new();
173        let task_handle_clone = task_handle.clone();
174        self.inner.remote.spawn(move |handle| {
175            let handle_clone = handle.clone();
176            let t = Timeout::new(initial, handle).unwrap()
177                .then(move |_| {
178                    fixed_interval_loop(scheduled_fn, interval, &handle_clone, task_handle_clone);
179                    Ok::<(), ()>(())
180                });
181            handle.spawn(t);
182            Ok::<(), ()>(())
183        });
184        task_handle
185    }
186
187    /// Schedule a function for running at fixed rate. The executor will try to run the function
188    /// every `interval`, and if a task execution takes longer than `interval`, the wait time
189    /// between task will be reduced to decrease the overall delay.
190    pub fn schedule_fixed_rate<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
191        where F: Fn(&Handle) + Send + 'static
192    {
193        let task_handle = TaskHandle::new();
194        let task_handle_clone = task_handle.clone();
195        self.inner.remote.spawn(move |handle| {
196            let handle_clone = handle.clone();
197            let t = Timeout::new(initial, handle).unwrap()
198                .then(move |_| {
199                    fixed_rate_loop(scheduled_fn, interval, &handle_clone, Duration::from_secs(0), task_handle_clone);
200                    Ok::<(), ()>(())
201                });
202            handle.spawn(t);
203            Ok::<(), ()>(())
204        });
205        task_handle
206    }
207}
208
209
/// A `ThreadPoolExecutor` will use one thread for the task scheduling and a thread pool for
/// task execution, allowing multiple tasks to run in parallel.
#[derive(Clone)]
pub struct ThreadPoolExecutor {
    /// Single-threaded executor that owns the timers and dispatches each run to `pool`.
    executor: CoreExecutor,
    /// Thread pool on which the scheduled functions are actually executed.
    pool: CpuPool
}
217
218impl ThreadPoolExecutor {
219    /// Creates a new `ThreadPoolExecutor` with the specified number of threads. Threads will
220    /// be named "pool_thread_0", "pool_thread_1" and so on.
221    pub fn new(threads: usize) -> Result<ThreadPoolExecutor, io::Error> {
222        ThreadPoolExecutor::with_prefix(threads, "pool_thread_")
223    }
224
225    /// Creates a new `ThreadPoolExecutor` with the specified number of threads and prefix for
226    /// the thread names.
227    pub fn with_prefix(threads: usize, prefix: &str) -> Result<ThreadPoolExecutor, io::Error> {
228        let new_executor = CoreExecutor::with_name(&format!("{}executor", prefix))?;
229        Ok(ThreadPoolExecutor::with_executor(threads, prefix, new_executor))
230    }
231
232    /// Creates a new `ThreadPoolExecutor` with the specified number of threads, prefix and
233    /// using the given `CoreExecutor` for scheduling.
234    pub fn with_executor(threads: usize, prefix: &str, executor: CoreExecutor) -> ThreadPoolExecutor {
235        let pool = Builder::new()
236            .pool_size(threads)
237            .name_prefix(prefix)
238            .create();
239        ThreadPoolExecutor { pool, executor }
240    }
241
242    /// Schedules the given function to be executed every `interval`. The function will be
243    /// scheduled on one of the threads in the thread pool.
244    pub fn schedule_fixed_rate<F>(&self, initial: Duration, interval: Duration, scheduled_fn: F) -> TaskHandle
245        where F: Fn(&Remote) + Send + Sync + 'static
246    {
247        let pool_clone = self.pool.clone();
248        let arc_fn = Arc::new(scheduled_fn);
249        self.executor.schedule_fixed_interval(  // Fixed interval is enough
250            initial,
251            interval,
252            move |handle| {
253                let arc_fn_clone = arc_fn.clone();
254                let remote = handle.remote().clone();
255                let t = pool_clone.spawn_fn(move || {
256                    arc_fn_clone(&remote);
257                    Ok::<(),()>(())
258                });
259                handle.spawn(t);
260            }
261        )
262    }
263
264    // TODO: make pub(crate)
265    /// Returns the thread pool used internally.
266    pub fn pool(&self) -> &CpuPool {
267        &self.pool
268    }
269}
270
271
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::{Duration, Instant};

    use super::{CoreExecutor, ThreadPoolExecutor, calculate_delay};

    /// A fast task scheduled at a fixed interval of 1s must run 6 times in 5.5s, with
    /// consecutive runs roughly 1s apart.
    #[test]
    fn fixed_interval_test() {
        let timings = Arc::new(RwLock::new(Vec::new()));
        {
            let executor = CoreExecutor::new().unwrap();
            let timings_clone = Arc::clone(&timings);
            executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    timings_clone.write().unwrap().push(Instant::now());
                }
            );
            thread::sleep(Duration::from_millis(5500));
            // Dropping the executor at the end of this scope stops the task.
        }

        let timings = timings.read().unwrap();
        assert_eq!(timings.len(), 6);
        for i in 1..6 {
            let execution_interval = timings[i] - timings[i-1];
            assert!(execution_interval < Duration::from_millis(1020));
            assert!(execution_interval > Duration::from_millis(980));
        }
    }

    /// With fixed interval scheduling, a 3s first run pushes every later run back, so only
    /// 4 runs fit in 5.5s.
    #[test]
    fn fixed_interval_slow_task_test() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let run = counter_clone.fetch_add(1, Ordering::SeqCst) + 1;
                    if run == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(counter.load(Ordering::SeqCst), 4);
    }

    #[test]
    fn calculate_delay_test() {
        fn s(n: u64) -> Duration { Duration::from_secs(n) }
        assert_eq!(calculate_delay(s(10), s(3), s(0)), (s(7), s(0)));
        assert_eq!(calculate_delay(s(10), s(11), s(0)), (s(0), s(1)));
        assert_eq!(calculate_delay(s(10), s(3), s(3)), (s(4), s(0)));
        assert_eq!(calculate_delay(s(10), s(3), s(9)), (s(0), s(2)));
        assert_eq!(calculate_delay(s(10), s(12), s(15)), (s(0), s(17)));
    }

    /// A fast task scheduled at a fixed rate of 1s must run 6 times in 5.5s.
    #[test]
    fn fixed_rate_test() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    counter_clone.fetch_add(1, Ordering::SeqCst);
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(counter.load(Ordering::SeqCst), 6);
    }

    /// With fixed rate scheduling, the runs missed during a 3s first run are caught up, so
    /// 6 runs still happen in 5.5s.
    #[test]
    fn fixed_rate_slow_task_test() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = CoreExecutor::new().unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    let run = counter_clone.fetch_add(1, Ordering::SeqCst) + 1;
                    if run == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(counter.load(Ordering::SeqCst), 6);
    }

    /// On the thread pool executor a slow first run occupies one pool thread only, so the
    /// other runs are not delayed and 6 runs happen in 5.5s.
    #[test]
    fn fixed_rate_slow_task_test_pool() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        {
            let executor = ThreadPoolExecutor::new(20).unwrap();
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_remote| {
                    let run = counter_clone.fetch_add(1, Ordering::SeqCst) + 1;
                    if run == 1 {
                        thread::sleep(Duration::from_secs(3));
                    }
                }
            );
            thread::sleep(Duration::from_millis(5500));
        }
        assert_eq!(counter.load(Ordering::SeqCst), 6);
    }

    /// Stopping one fixed rate task must freeze its counter while the other task on the same
    /// executor keeps running.
    #[test]
    fn fixed_rate_stop_test() {
        let counter1 = Arc::new(AtomicUsize::new(0));
        let counter2 = Arc::new(AtomicUsize::new(0));
        let counter1_clone = Arc::clone(&counter1);
        let counter2_clone = Arc::clone(&counter2);
        {
            let executor = CoreExecutor::new().unwrap();
            let t1 = executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    counter1_clone.fetch_add(1, Ordering::SeqCst);
                }
            );
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    counter2_clone.fetch_add(1, Ordering::SeqCst);
                }
            );
            thread::sleep(Duration::from_millis(5500));
            t1.stop();
            thread::sleep(Duration::from_millis(5000));
        }
        assert_eq!(counter1.load(Ordering::SeqCst), 6);
        assert_eq!(counter2.load(Ordering::SeqCst), 11);
    }

    /// Stopping a fixed interval task must freeze its counter while a fixed rate task on the
    /// same executor keeps running.
    #[test]
    fn fixed_interval_stop_test() {
        let counter1 = Arc::new(AtomicUsize::new(0));
        let counter2 = Arc::new(AtomicUsize::new(0));
        let counter1_clone = Arc::clone(&counter1);
        let counter2_clone = Arc::clone(&counter2);
        {
            let executor = CoreExecutor::new().unwrap();
            let t1 = executor.schedule_fixed_interval(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    counter1_clone.fetch_add(1, Ordering::SeqCst);
                }
            );
            executor.schedule_fixed_rate(
                Duration::from_secs(0),
                Duration::from_secs(1),
                move |_handle| {
                    counter2_clone.fetch_add(1, Ordering::SeqCst);
                }
            );
            thread::sleep(Duration::from_millis(5500));
            t1.stop();
            thread::sleep(Duration::from_millis(5000));
        }
        assert_eq!(counter1.load(Ordering::SeqCst), 6);
        assert_eq!(counter2.load(Ordering::SeqCst), 11);
    }
}