Skip to main content

zerodds_corba_ccm/
timer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! TimerEventService — Spec OMG Time 1.1 §2.2-§2.4.
5//!
6//! Liefert One-Shot- und Periodic-Timer, die in einen
7//! Container-Worker-Thread eingebettet sind. Bei Feuerung wird ein
8//! `TimerCallback` aufgerufen.
9
10use alloc::collections::BTreeMap;
11use alloc::sync::Arc;
12use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Mutex;
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17/// Timer-Kind.
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum TimerKind {
20    /// One-Shot: feuert genau einmal.
21    OneShot,
22    /// Periodic: feuert wiederholt mit dem Periode-Intervall.
23    Periodic,
24}
25
26/// Timer-Handle (vom Service vergeben).
27#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
28pub struct TimerHandle(pub u64);
29
30/// Callback-Trait fuer Timer-Feuerung.
31pub trait TimerCallback: Send + Sync {
32    /// Wird vom Service-Thread aufgerufen, wenn der Timer feuert.
33    fn fire(&self, handle: TimerHandle);
34}
35
36struct TimerEntry {
37    kind: TimerKind,
38    next_fire: Instant,
39    period: Duration,
40    callback: Arc<dyn TimerCallback>,
41}
42
43struct ServiceInner {
44    next_handle: AtomicU64,
45    timers: Mutex<BTreeMap<TimerHandle, TimerEntry>>,
46    shutdown: AtomicBool,
47}
48
49/// TimerEventService — startet einen Worker-Thread.
50pub struct TimerEventService {
51    inner: Arc<ServiceInner>,
52    worker: Option<JoinHandle<()>>,
53}
54
55impl core::fmt::Debug for TimerEventService {
56    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
57        let n = self.inner.timers.lock().ok().map(|g| g.len()).unwrap_or(0);
58        f.debug_struct("TimerEventService")
59            .field("active_timers", &n)
60            .finish()
61    }
62}
63
64impl Default for TimerEventService {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl TimerEventService {
71    /// Konstruktor — startet Worker-Thread.
72    #[must_use]
73    pub fn new() -> Self {
74        let inner = Arc::new(ServiceInner {
75            next_handle: AtomicU64::new(1),
76            timers: Mutex::new(BTreeMap::new()),
77            shutdown: AtomicBool::new(false),
78        });
79        let inner_w = Arc::clone(&inner);
80        let worker = thread::Builder::new()
81            .name("ccm-timer-service".into())
82            .spawn(move || run_worker(&inner_w))
83            .ok();
84        Self { inner, worker }
85    }
86
87    /// Erstellt einen One-Shot-Timer, der nach `delay` feuert.
88    pub fn create_one_shot(&self, delay: Duration, cb: Arc<dyn TimerCallback>) -> TimerHandle {
89        self.create_internal(TimerKind::OneShot, delay, delay, cb)
90    }
91
92    /// Erstellt einen Periodic-Timer mit `period`-Intervall.
93    pub fn create_periodic(&self, period: Duration, cb: Arc<dyn TimerCallback>) -> TimerHandle {
94        self.create_internal(TimerKind::Periodic, period, period, cb)
95    }
96
97    fn create_internal(
98        &self,
99        kind: TimerKind,
100        delay: Duration,
101        period: Duration,
102        callback: Arc<dyn TimerCallback>,
103    ) -> TimerHandle {
104        let handle = TimerHandle(self.inner.next_handle.fetch_add(1, Ordering::Relaxed));
105        let entry = TimerEntry {
106            kind,
107            next_fire: Instant::now() + delay,
108            period,
109            callback,
110        };
111        if let Ok(mut g) = self.inner.timers.lock() {
112            g.insert(handle, entry);
113        }
114        handle
115    }
116
117    /// Cancelt einen Timer.
118    pub fn cancel(&self, handle: TimerHandle) -> bool {
119        self.inner
120            .timers
121            .lock()
122            .map(|mut g| g.remove(&handle).is_some())
123            .unwrap_or(false)
124    }
125
126    /// Anzahl aktiver Timer.
127    #[must_use]
128    pub fn active_count(&self) -> usize {
129        self.inner.timers.lock().map(|g| g.len()).unwrap_or(0)
130    }
131
132    /// Stoppt den Service.
133    pub fn shutdown(mut self) {
134        self.inner.shutdown.store(true, Ordering::Release);
135        if let Some(j) = self.worker.take() {
136            let _ = j.join();
137        }
138    }
139}
140
141impl Drop for TimerEventService {
142    fn drop(&mut self) {
143        self.inner.shutdown.store(true, Ordering::Release);
144        if let Some(j) = self.worker.take() {
145            let _ = j.join();
146        }
147    }
148}
149
150fn run_worker(inner: &Arc<ServiceInner>) {
151    let tick = Duration::from_millis(20);
152    while !inner.shutdown.load(Ordering::Acquire) {
153        let now = Instant::now();
154        let mut to_fire: alloc::vec::Vec<(TimerHandle, Arc<dyn TimerCallback>)> = alloc::vec![];
155        let mut to_remove: alloc::vec::Vec<TimerHandle> = alloc::vec![];
156        if let Ok(mut g) = inner.timers.lock() {
157            for (h, e) in g.iter_mut() {
158                if e.next_fire <= now {
159                    to_fire.push((*h, Arc::clone(&e.callback)));
160                    match e.kind {
161                        TimerKind::OneShot => to_remove.push(*h),
162                        TimerKind::Periodic => e.next_fire = now + e.period,
163                    }
164                }
165            }
166            for h in &to_remove {
167                g.remove(h);
168            }
169        }
170        for (h, cb) in to_fire {
171            cb.fire(h);
172        }
173        thread::sleep(tick);
174    }
175}
176
177#[cfg(test)]
178#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
179mod tests {
180    use super::*;
181    use core::sync::atomic::AtomicUsize;
182
183    struct CountingCallback {
184        fired: Arc<AtomicUsize>,
185    }
186    impl TimerCallback for CountingCallback {
187        fn fire(&self, _: TimerHandle) {
188            self.fired.fetch_add(1, Ordering::Relaxed);
189        }
190    }
191
192    fn waitfor(c: &Arc<AtomicUsize>, target: usize, timeout: Duration) {
193        let start = Instant::now();
194        while c.load(Ordering::Relaxed) < target && start.elapsed() < timeout {
195            thread::sleep(Duration::from_millis(20));
196        }
197    }
198
199    #[test]
200    fn one_shot_fires_once() {
201        let svc = TimerEventService::new();
202        let counter = Arc::new(AtomicUsize::new(0));
203        let cb = Arc::new(CountingCallback {
204            fired: Arc::clone(&counter),
205        });
206        let _ = svc.create_one_shot(Duration::from_millis(50), cb);
207        waitfor(&counter, 1, Duration::from_secs(2));
208        assert_eq!(counter.load(Ordering::Relaxed), 1);
209        // Nach Feuerung ist der One-Shot-Timer raus.
210        thread::sleep(Duration::from_millis(150));
211        assert_eq!(svc.active_count(), 0);
212    }
213
214    #[test]
215    fn periodic_fires_multiple_times() {
216        let svc = TimerEventService::new();
217        let counter = Arc::new(AtomicUsize::new(0));
218        let cb = Arc::new(CountingCallback {
219            fired: Arc::clone(&counter),
220        });
221        let h = svc.create_periodic(Duration::from_millis(50), cb);
222        waitfor(&counter, 3, Duration::from_secs(3));
223        assert!(counter.load(Ordering::Relaxed) >= 3);
224        svc.cancel(h);
225    }
226
227    #[test]
228    fn cancel_stops_periodic() {
229        let svc = TimerEventService::new();
230        let counter = Arc::new(AtomicUsize::new(0));
231        let cb = Arc::new(CountingCallback {
232            fired: Arc::clone(&counter),
233        });
234        let h = svc.create_periodic(Duration::from_millis(50), cb);
235        thread::sleep(Duration::from_millis(150));
236        assert!(svc.cancel(h));
237        let after = counter.load(Ordering::Relaxed);
238        thread::sleep(Duration::from_millis(200));
239        assert_eq!(counter.load(Ordering::Relaxed), after);
240    }
241
242    #[test]
243    fn cancel_unknown_returns_false() {
244        let svc = TimerEventService::new();
245        assert!(!svc.cancel(TimerHandle(9999)));
246    }
247}