compact_time/
source.rs

1use std::{sync::{atomic::{AtomicU64, Ordering, AtomicBool}, Arc}, thread::{self, JoinHandle}, time::Duration};
2use super::Time;
3
4/// Source of `Time` values which are computed asynchronously.
5///
6/// A `TimeSource` computes the current time in a fixed, configurable frequency, potentially
7/// saving the overhead of system calls but with less resolution.
8#[derive(Debug, Clone)]
9pub struct TimeSource(Arc<Inner>);
10
11#[derive(Debug)]
12struct Inner {
13    current: Arc<AtomicU64>,
14    stop: Arc<AtomicBool>,
15    thread: Option<JoinHandle<()>>
16}
17
18impl TimeSource {
19    /// Create a new `TimeSource` which gets the current time at the given frequency.
20    ///
21    /// A thread is spawned to asynchronously getting the time.
22    pub fn new(f: Duration) -> Self {
23        Self(Arc::new(Inner::new(f)))
24    }
25
26    /// Get the current time.
27    ///
28    /// The time resolution is less than or equal to the frequency with which the
29    /// time source has been created.
30    pub fn get(&self) -> Time {
31        self.0.get()
32    }
33}
34
35impl Inner {
36    fn new(f: Duration) -> Self {
37        let current1 = Arc::new(AtomicU64::new(Time::now().into()));
38        let current2 = current1.clone();
39
40        let stop1 = Arc::new(AtomicBool::new(false));
41        let stop2 = stop1.clone();
42
43        let thread = thread::spawn(move || {
44            while !stop2.load(Ordering::Acquire) {
45                current2.fetch_max(now(), Ordering::AcqRel);
46                thread::sleep(f)
47            }
48        });
49
50        Self {
51            current: current1,
52            stop: stop1,
53            thread: Some(thread)
54        }
55    }
56
57    fn get(&self) -> Time {
58        Time(self.current.load(Ordering::Acquire))
59    }
60}
61
62#[cfg(feature = "coarse")]
63fn now() -> u64 {
64    Time::coarse().into()
65}
66
67#[cfg(not(feature = "coarse"))]
68fn now() -> u64 {
69    Time::now().into()
70}
71
72impl Drop for Inner {
73    fn drop(&mut self) {
74        if let Some(t) = self.thread.take() {
75            self.stop.store(true, Ordering::Release);
76            let _ = t.join();
77        }
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use std::thread;
84    use std::time::Duration;
85    use super::TimeSource;
86
87    #[test]
88    fn smoke() {
89        let ts = TimeSource::new(Duration::from_secs(1));
90        for _ in 0 .. 10 {
91            println!("{:?}", ts.get().to_utc_string());
92            thread::sleep(Duration::from_millis(500))
93        }
94    }
95}