synchronous_timer/
timer.rs1use std::collections::BinaryHeap;
2use std::panic::UnwindSafe;
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5
6use parking_lot::{Condvar, Mutex};
7
8use crate::executor::Executor;
9use crate::task::{Task, TaskCallable, TaskGuard};
10
11pub struct Timer {
15 executor_thread: Option<std::thread::JoinHandle<()>>,
16 shared: Arc<Mutex<TimerShared>>,
17 changed: Arc<Condvar>,
18}
19
20pub(crate) struct TimerShared {
21 pub tasks: BinaryHeap<Task>,
22 pub done: bool,
23 pub next_id: u64,
24}
25
26impl TimerShared {
27 #[inline(always)]
28 fn with_capacity(cap: usize) -> Self {
29 Self {
30 tasks: if cap == 0 {
31 BinaryHeap::new()
33 } else {
34 BinaryHeap::with_capacity(cap)
35 },
36 done: false,
37 next_id: 1,
38 }
39 }
40}
41
42impl Timer {
43 #[inline(always)]
46 pub fn new() -> Self {
47 Self::with_capacity(0)
48 }
49
50 pub fn with_capacity(cap: usize) -> Self {
54 let shared = Arc::new(Mutex::new(TimerShared::with_capacity(cap)));
55 let changed = Arc::new(Condvar::new());
56 let executor = Executor::new(Arc::clone(&shared), Arc::clone(&changed));
57 let executor_thread = Some(
58 std::thread::Builder::new()
59 .name("timer-executor".into())
60 .spawn(|| executor.run_until_done())
61 .unwrap(),
62 );
63 Self {
64 shared,
65 changed,
66 executor_thread,
67 }
68 }
69
70 fn push(&mut self, callable: TaskCallable, next: Instant) -> TaskGuard {
71 let mut shared = self.shared.lock();
72 let id = shared.next_id;
73 shared.next_id += 1;
74 let handle = Task::new(id, next, callable);
75 let guard = handle.guard();
76 shared.tasks.push(handle);
77 drop(shared);
78 self.changed.notify_one();
79 guard
80 }
81
82 pub fn schedule_in<F: FnOnce() + UnwindSafe + Send + 'static>(
84 &mut self,
85 duration: Duration,
86 f: F,
87 ) -> TaskGuard {
88 let callable = TaskCallable::new_once(f);
89 self.push(callable, Instant::now() + duration)
90 }
91
92 pub fn schedule_at<F: FnOnce() + UnwindSafe + Send + 'static>(
96 &mut self,
97 system_time: SystemTime,
98 f: F,
99 ) -> TaskGuard {
100 let callable = TaskCallable::new_once(f);
101 let now = SystemTime::now();
102 let when = match system_time.duration_since(now) {
103 Ok(d) => Instant::now() + d,
104 Err(_) => Instant::now(),
105 };
106 self.push(callable, when)
107 }
108
109 pub fn schedule_repeating<F: FnMut() + UnwindSafe + Send + 'static>(
111 &mut self,
112 interval: Duration,
113 f: F,
114 ) -> TaskGuard {
115 let callable = TaskCallable::new_repeating(f, interval);
116 self.push(callable, Instant::now() + interval)
117 }
118
119 pub fn schedule_immediately<F: FnOnce() + UnwindSafe + Send + 'static>(&mut self, f: F) {
121 let callable = TaskCallable::new_once(f);
122 self.push(callable, Instant::now()).detach()
123 }
124}
125
126impl Default for Timer {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132impl Drop for Timer {
133 fn drop(&mut self) {
135 if let Some(handle) = self.executor_thread.take() {
136 let mut s = self.shared.lock();
137 s.done = true;
138 self.changed.notify_one();
139 drop(s);
140 if let Err(e) = handle.join() {
141 log::error!("Error joining timer thread: {:?}", e);
142 }
143 }
144 }
145}