zerodds_corba_ccm/
timer.rs1use alloc::collections::BTreeMap;
11use alloc::sync::Arc;
12use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Mutex;
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
/// Scheduling behaviour of a timer registered with [`TimerEventService`].
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq)]
pub enum TimerKind {
    /// Fires a single time; the worker removes the entry when it becomes due.
    OneShot,
    /// Fires repeatedly; the worker reschedules the entry by its period each
    /// time it becomes due.
    Periodic,
}
25
/// Identifier of a timer registered with the timer service.
///
/// The wrapped `u64` is public. Handles compare, order and hash by that
/// value, which lets them serve as keys of the service's timer map.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimerHandle(pub u64);
29
30pub trait TimerCallback: Send + Sync {
32 fn fire(&self, handle: TimerHandle);
34}
35
36struct TimerEntry {
37 kind: TimerKind,
38 next_fire: Instant,
39 period: Duration,
40 callback: Arc<dyn TimerCallback>,
41}
42
43struct ServiceInner {
44 next_handle: AtomicU64,
45 timers: Mutex<BTreeMap<TimerHandle, TimerEntry>>,
46 shutdown: AtomicBool,
47}
48
49pub struct TimerEventService {
51 inner: Arc<ServiceInner>,
52 worker: Option<JoinHandle<()>>,
53}
54
55impl core::fmt::Debug for TimerEventService {
56 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
57 let n = self.inner.timers.lock().ok().map(|g| g.len()).unwrap_or(0);
58 f.debug_struct("TimerEventService")
59 .field("active_timers", &n)
60 .finish()
61 }
62}
63
64impl Default for TimerEventService {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl TimerEventService {
71 #[must_use]
73 pub fn new() -> Self {
74 let inner = Arc::new(ServiceInner {
75 next_handle: AtomicU64::new(1),
76 timers: Mutex::new(BTreeMap::new()),
77 shutdown: AtomicBool::new(false),
78 });
79 let inner_w = Arc::clone(&inner);
80 let worker = thread::Builder::new()
81 .name("ccm-timer-service".into())
82 .spawn(move || run_worker(&inner_w))
83 .ok();
84 Self { inner, worker }
85 }
86
87 pub fn create_one_shot(&self, delay: Duration, cb: Arc<dyn TimerCallback>) -> TimerHandle {
89 self.create_internal(TimerKind::OneShot, delay, delay, cb)
90 }
91
92 pub fn create_periodic(&self, period: Duration, cb: Arc<dyn TimerCallback>) -> TimerHandle {
94 self.create_internal(TimerKind::Periodic, period, period, cb)
95 }
96
97 fn create_internal(
98 &self,
99 kind: TimerKind,
100 delay: Duration,
101 period: Duration,
102 callback: Arc<dyn TimerCallback>,
103 ) -> TimerHandle {
104 let handle = TimerHandle(self.inner.next_handle.fetch_add(1, Ordering::Relaxed));
105 let entry = TimerEntry {
106 kind,
107 next_fire: Instant::now() + delay,
108 period,
109 callback,
110 };
111 if let Ok(mut g) = self.inner.timers.lock() {
112 g.insert(handle, entry);
113 }
114 handle
115 }
116
117 pub fn cancel(&self, handle: TimerHandle) -> bool {
119 self.inner
120 .timers
121 .lock()
122 .map(|mut g| g.remove(&handle).is_some())
123 .unwrap_or(false)
124 }
125
126 #[must_use]
128 pub fn active_count(&self) -> usize {
129 self.inner.timers.lock().map(|g| g.len()).unwrap_or(0)
130 }
131
132 pub fn shutdown(mut self) {
134 self.inner.shutdown.store(true, Ordering::Release);
135 if let Some(j) = self.worker.take() {
136 let _ = j.join();
137 }
138 }
139}
140
141impl Drop for TimerEventService {
142 fn drop(&mut self) {
143 self.inner.shutdown.store(true, Ordering::Release);
144 if let Some(j) = self.worker.take() {
145 let _ = j.join();
146 }
147 }
148}
149
150fn run_worker(inner: &Arc<ServiceInner>) {
151 let tick = Duration::from_millis(20);
152 while !inner.shutdown.load(Ordering::Acquire) {
153 let now = Instant::now();
154 let mut to_fire: alloc::vec::Vec<(TimerHandle, Arc<dyn TimerCallback>)> = alloc::vec![];
155 let mut to_remove: alloc::vec::Vec<TimerHandle> = alloc::vec![];
156 if let Ok(mut g) = inner.timers.lock() {
157 for (h, e) in g.iter_mut() {
158 if e.next_fire <= now {
159 to_fire.push((*h, Arc::clone(&e.callback)));
160 match e.kind {
161 TimerKind::OneShot => to_remove.push(*h),
162 TimerKind::Periodic => e.next_fire = now + e.period,
163 }
164 }
165 }
166 for h in &to_remove {
167 g.remove(h);
168 }
169 }
170 for (h, cb) in to_fire {
171 cb.fire(h);
172 }
173 thread::sleep(tick);
174 }
175}
176
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use core::sync::atomic::AtomicUsize;

    /// Callback that counts how often it has been fired.
    struct CountingCallback {
        fired: Arc<AtomicUsize>,
    }
    impl TimerCallback for CountingCallback {
        fn fire(&self, _: TimerHandle) {
            self.fired.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Polls `c` until it reaches `target` or `timeout` elapses.
    fn waitfor(c: &Arc<AtomicUsize>, target: usize, timeout: Duration) {
        let start = Instant::now();
        while c.load(Ordering::Relaxed) < target && start.elapsed() < timeout {
            thread::sleep(Duration::from_millis(20));
        }
    }

    #[test]
    fn one_shot_fires_once() {
        let svc = TimerEventService::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let cb = Arc::new(CountingCallback {
            fired: Arc::clone(&counter),
        });
        let _ = svc.create_one_shot(Duration::from_millis(50), cb);
        waitfor(&counter, 1, Duration::from_secs(2));
        assert_eq!(counter.load(Ordering::Relaxed), 1);
        // Give the worker several more ticks; a one-shot must not fire again
        // and must no longer be registered.
        thread::sleep(Duration::from_millis(150));
        assert_eq!(counter.load(Ordering::Relaxed), 1);
        assert_eq!(svc.active_count(), 0);
    }

    #[test]
    fn periodic_fires_multiple_times() {
        let svc = TimerEventService::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let cb = Arc::new(CountingCallback {
            fired: Arc::clone(&counter),
        });
        let h = svc.create_periodic(Duration::from_millis(50), cb);
        waitfor(&counter, 3, Duration::from_secs(3));
        assert!(counter.load(Ordering::Relaxed) >= 3);
        svc.cancel(h);
    }

    #[test]
    fn cancel_stops_periodic() {
        let svc = TimerEventService::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let cb = Arc::new(CountingCallback {
            fired: Arc::clone(&counter),
        });
        let h = svc.create_periodic(Duration::from_millis(50), cb);
        thread::sleep(Duration::from_millis(150));
        assert!(svc.cancel(h));
        // The worker fires callbacks after releasing the timer lock, so a
        // firing collected just before the cancel may still be delivered.
        // Let it land before taking the baseline, otherwise the test races.
        thread::sleep(Duration::from_millis(50));
        let after = counter.load(Ordering::Relaxed);
        thread::sleep(Duration::from_millis(200));
        assert_eq!(counter.load(Ordering::Relaxed), after);
    }

    #[test]
    fn cancel_unknown_returns_false() {
        let svc = TimerEventService::new();
        assert!(!svc.cancel(TimerHandle(9999)));
    }
}