1use crossbeam_channel::{bounded, select, tick, Sender};
2use std::thread::JoinHandle;
3use std::time::Duration;
4
5pub struct PeriodicClosure {
12 join_handle: Option<JoinHandle<()>>,
13 canceller: Sender<()>,
14}
15
16impl PeriodicClosure {
17 pub fn start<F: FnMut() + Send + Sync + 'static>(
20 name: String,
21 period: Duration,
22 mut func: F,
23 ) -> Self {
24 let ticker = tick(period);
25 let (canceller, cancelled) = bounded(1);
26 let join_handle = std::thread::Builder::new()
27 .name(name)
28 .spawn(move || {
29 loop {
30 if cancelled.try_recv().is_ok() {
34 break;
35 }
36 select! {
38 recv(cancelled) -> _ => break,
39 recv(ticker) -> _ => func(),
40 }
41 }
42 })
43 .unwrap();
44 Self {
45 join_handle: Some(join_handle),
46 canceller,
47 }
48 }
49}
50
51impl Drop for PeriodicClosure {
52 fn drop(&mut self) {
53 if let Some(join_handle) = self.join_handle.take() {
54 self.canceller
55 .send(())
56 .expect("The receiver must exists in detached thread.");
57 join_handle.join().unwrap();
58 }
59 }
60}
61
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI64, Ordering};
    use std::sync::Arc;

    /// A fast closure on a 100 ms period is expected to have run ten times
    /// after 1050 ms.
    #[test]
    fn test() {
        let counter = Arc::new(AtomicI64::new(0));
        let worker_counter = Arc::clone(&counter);
        let increment = move || {
            worker_counter.fetch_add(1, Ordering::Relaxed);
        };
        let periodic = PeriodicClosure::start(
            String::from("test thread"),
            Duration::from_millis(100),
            increment,
        );
        std::thread::sleep(Duration::from_millis(1050));
        // Dropping joins the worker, so the counter is final afterwards.
        drop(periodic);
        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }

    /// A closure that takes 100 ms on a 1 ms period: ticks that elapse while
    /// the closure is running must collapse instead of piling up, so ten
    /// calls are expected after 950 ms.
    #[test]
    fn test_collapsed() {
        let counter = Arc::new(AtomicI64::new(0));
        let worker_counter = Arc::clone(&counter);
        let slow_increment = move || {
            worker_counter.fetch_add(1, Ordering::Relaxed);
            std::thread::sleep(Duration::from_millis(100));
        };
        let periodic = PeriodicClosure::start(
            String::from("test thread"),
            Duration::from_millis(1),
            slow_increment,
        );
        std::thread::sleep(Duration::from_millis(950));
        // Dropping waits for the call in flight to finish before joining.
        drop(periodic);
        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }
}
104