periodic_closure/
lib.rs

1use crossbeam_channel::{bounded, select, tick, Sender};
2use std::thread::JoinHandle;
3use std::time::Duration;
4
5/// The struct implements an abstraction for running periodically a closure at
6/// fixed period - 'P'. The first call happens after duration of 'P' when
7/// 'start' is called. If the closure runs longer than the period then
8/// consequtive calls to the closure will be collapsed guaranteeing that we
9/// don't call the closure more than once for any interval of duration 'P'.
10/// The closure is stopped when the object is dropped.
11pub struct PeriodicClosure {
12    join_handle: Option<JoinHandle<()>>,
13    canceller: Sender<()>,
14}
15
16impl PeriodicClosure {
17    /// Starts an OS thread with the given name. Panics if the OS thread can't
18    /// be spawned.
19    pub fn start<F: FnMut() + Send + Sync + 'static>(
20        name: String,
21        period: Duration,
22        mut func: F,
23    ) -> Self {
24        let ticker = tick(period);
25        let (canceller, cancelled) = bounded(1);
26        let join_handle = std::thread::Builder::new()
27            .name(name)
28            .spawn(move || {
29                loop {
30                    // The cancel signal takes precedence over a tick. In case we have a message in
31                    // both receivers we don't know what the select will return. It may happen that
32                    // the select keeps returning ticks although there is a cancellation signal.
33                    if cancelled.try_recv().is_ok() {
34                        break;
35                    }
36                    // The ticker is collapsing ticks for us since it is a channel with size 1.
37                    select! {
38                        recv(cancelled) -> _ => break,
39                        recv(ticker) -> _ => func(),
40                    }
41                }
42            })
43            .unwrap();
44        Self {
45            join_handle: Some(join_handle),
46            canceller,
47        }
48    }
49}
50
51impl Drop for PeriodicClosure {
52    fn drop(&mut self) {
53        if let Some(join_handle) = self.join_handle.take() {
54            self.canceller
55                .send(())
56                .expect("The receiver must exists in detached thread.");
57            join_handle.join().unwrap();
58        }
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI64, Ordering};
    use std::sync::Arc;
    use std::thread::sleep;

    /// Returns two handles to one shared counter starting at zero: the first
    /// is kept by the test for the final check, the second is moved into the
    /// periodic closure.
    fn shared_counter() -> (Arc<AtomicI64>, Arc<AtomicI64>) {
        let observed = Arc::new(AtomicI64::new(0));
        let incremented = Arc::clone(&observed);
        (observed, incremented)
    }

    /// A fast closure with a 100ms period must have run 10 times after
    /// 1050ms.
    #[test]
    fn test() {
        let (observed, incremented) = shared_counter();
        let period = Duration::from_millis(100);
        let closure = PeriodicClosure::start(String::from("test thread"), period, move || {
            incremented.fetch_add(1, Ordering::Relaxed);
        });
        sleep(Duration::from_millis(1050));
        drop(closure);
        // We should have exactly 10 increments.
        assert_eq!(observed.load(Ordering::Relaxed), 10);
    }

    /// A closure that takes 100ms per call, driven by a 1ms period, must have
    /// its overdue ticks collapsed instead of queued.
    #[test]
    fn test_collapsed() {
        let (observed, incremented) = shared_counter();
        let period = Duration::from_millis(1);
        let closure = PeriodicClosure::start(String::from("test thread"), period, move || {
            incremented.fetch_add(1, Ordering::Relaxed);
            sleep(Duration::from_millis(100));
        });
        sleep(Duration::from_millis(950));
        drop(closure);
        // We should have exactly 10 increments. The first call runs for
        // [1ms...101ms]; the last call will run, in theory, for
        // [901ms...1001ms], and the drop above waits for it to finish.
        assert_eq!(observed.load(Ordering::Relaxed), 10);
    }
}
104