synchronous_timer/
timer.rs

1use std::collections::BinaryHeap;
2use std::panic::UnwindSafe;
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5
6use parking_lot::{Condvar, Mutex};
7
8use crate::executor::Executor;
9use crate::task::{Task, TaskCallable, TaskGuard};
10
11/// The main structure of this library, a `Timer` handles scheduling one-off and repeating tasks,
12/// which are executed on a background thread. Tasks should be short-lived (as they block the
13/// thread) synchronous functions.
14pub struct Timer {
15    executor_thread: Option<std::thread::JoinHandle<()>>,
16    shared: Arc<Mutex<TimerShared>>,
17    changed: Arc<Condvar>,
18}
19
20pub(crate) struct TimerShared {
21    pub tasks: BinaryHeap<Task>,
22    pub done: bool,
23    pub next_id: u64,
24}
25
26impl TimerShared {
27    #[inline(always)]
28    fn with_capacity(cap: usize) -> Self {
29        Self {
30            tasks: if cap == 0 {
31                // Avoid allocating in this case
32                BinaryHeap::new()
33            } else {
34                BinaryHeap::with_capacity(cap)
35            },
36            done: false,
37            next_id: 1,
38        }
39    }
40}
41
42impl Timer {
43    /// Construct a new Timer. This will immediately start a background thread
44    /// for executing tasks, which will be shut down on drop.
45    #[inline(always)]
46    pub fn new() -> Self {
47        Self::with_capacity(0)
48    }
49
50    /// Construct a new Timer with underlying capacity for the given number of tasks
51    /// as a microoptimization. This will immediately start a background thread for
52    /// executing tasks, which will be shut down on drop.
53    pub fn with_capacity(cap: usize) -> Self {
54        let shared = Arc::new(Mutex::new(TimerShared::with_capacity(cap)));
55        let changed = Arc::new(Condvar::new());
56        let executor = Executor::new(Arc::clone(&shared), Arc::clone(&changed));
57        let executor_thread = Some(
58            std::thread::Builder::new()
59                .name("timer-executor".into())
60                .spawn(|| executor.run_until_done())
61                .unwrap(),
62        );
63        Self {
64            shared,
65            changed,
66            executor_thread,
67        }
68    }
69
70    fn push(&mut self, callable: TaskCallable, next: Instant) -> TaskGuard {
71        let mut shared = self.shared.lock();
72        let id = shared.next_id;
73        shared.next_id += 1;
74        let handle = Task::new(id, next, callable);
75        let guard = handle.guard();
76        shared.tasks.push(handle);
77        drop(shared);
78        self.changed.notify_one();
79        guard
80    }
81
82    /// Schedule a task to run once, after the given duration
83    pub fn schedule_in<F: FnOnce() + UnwindSafe + Send + 'static>(
84        &mut self,
85        duration: Duration,
86        f: F,
87    ) -> TaskGuard {
88        let callable = TaskCallable::new_once(f);
89        self.push(callable, Instant::now() + duration)
90    }
91
92    /// Schedule a task to run at a given wall-clock time. This will be converted
93    /// to an Instant and run according to the monotonic clock, so may have... somewhat
94    /// unpredictable behavior around leap seconds.
95    pub fn schedule_at<F: FnOnce() + UnwindSafe + Send + 'static>(
96        &mut self,
97        system_time: SystemTime,
98        f: F,
99    ) -> TaskGuard {
100        let callable = TaskCallable::new_once(f);
101        let now = SystemTime::now();
102        let when = match system_time.duration_since(now) {
103            Ok(d) => Instant::now() + d,
104            Err(_) => Instant::now(),
105        };
106        self.push(callable, when)
107    }
108
109    /// Schedule a task to run periodically, after every interval
110    pub fn schedule_repeating<F: FnMut() + UnwindSafe + Send + 'static>(
111        &mut self,
112        interval: Duration,
113        f: F,
114    ) -> TaskGuard {
115        let callable = TaskCallable::new_repeating(f, interval);
116        self.push(callable, Instant::now() + interval)
117    }
118
119    /// Schedule a task to run as soon as possible
120    pub fn schedule_immediately<F: FnOnce() + UnwindSafe + Send + 'static>(&mut self, f: F) {
121        let callable = TaskCallable::new_once(f);
122        self.push(callable, Instant::now()).detach()
123    }
124}
125
126impl Default for Timer {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl Drop for Timer {
133    /// Drop the timer and shut down the background thread
134    fn drop(&mut self) {
135        if let Some(handle) = self.executor_thread.take() {
136            let mut s = self.shared.lock();
137            s.done = true;
138            self.changed.notify_one();
139            drop(s);
140            if let Err(e) = handle.join() {
141                log::error!("Error joining timer thread: {:?}", e);
142            }
143        }
144    }
145}