async_liveliness_monitor/
support.rs

1use std::{
2    sync::{
3        atomic::{AtomicI64, Ordering},
4        Weak,
5    },
6    time::{Duration, Instant},
7};
8
9use crate::LivelinessMonitor;
10
/// A signed [`Duration`] stored as an [`AtomicI64`].
///
/// The magnitude is packed as `(seconds << 20) + microseconds`, and the sign of the
/// duration is carried by the sign of the integer.
///
/// Resolution: 1µs (finer components are truncated when storing)
/// Maximum offset: 278737 years (signed); larger durations saturate to [`AtomicDuration::MAX`]
pub struct AtomicDuration {
    t: AtomicI64,
}
/// Number of low bits of the packed value that hold the microseconds field.
const SHIFT: i64 = 20;
/// Bit mask selecting the microseconds field of the packed value.
const MASK: i64 = 0xfffff;
/// A sign to be paired with a [`Duration`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum Sign {
    #[default]
    Positive,
    Negative,
}
impl AtomicDuration {
    /// Largest whole-second count that fits in the bits above the microseconds field.
    const MAX_SECS: u64 = (i64::MAX >> SHIFT) as u64;

    /// Unpacks a stored integer into its duration and sign.
    const fn i64_to_duration(t: i64) -> (Duration, Sign) {
        let sign = if t.is_negative() {
            Sign::Negative
        } else {
            Sign::Positive
        };
        // `unsigned_abs` is total, whereas `-t` overflows for `i64::MIN`.
        let magnitude = t.unsigned_abs();
        let micros = (magnitude & MASK as u64) as u32;
        let secs = magnitude >> SHIFT;
        // The 20-bit field can hold more than 999_999µs; `Duration::new` carries the excess
        // into the seconds.
        (Duration::new(secs, micros * 1000), sign)
    }
    /// Packs a duration and sign into the stored integer representation.
    ///
    /// Sub-microsecond precision is dropped. Durations whose whole seconds do not fit in the
    /// seconds field saturate to the largest magnitude instead of wrapping around (which
    /// would otherwise corrupt both the value and the sign).
    const fn duration_to_i64(t: Duration, sign: Sign) -> i64 {
        let secs = t.as_secs();
        let magnitude = if secs > Self::MAX_SECS {
            i64::MAX
        } else {
            ((secs as i64) << SHIFT) + (t.subsec_micros() as i64)
        };
        match sign {
            Sign::Positive => magnitude,
            // `magnitude` is never negative, so the negation cannot overflow.
            Sign::Negative => -magnitude,
        }
    }
    /// This type's time resolution.
    pub const RESOLUTION: Duration = Self::i64_to_duration(1).0;
    /// This type's maximum value.
    pub const MAX: Duration = Self::i64_to_duration(i64::MAX).0;
    /// Atomically loads the stored value, converting it to a duration-sign tuple.
    ///
    /// A zero duration is always reported as [`Sign::Positive`], since the packed
    /// representation has no negative zero.
    ///
    /// The [`Ordering`] is used in a single `load` operation.
    pub fn load(&self, ord: Ordering) -> (Duration, Sign) {
        Self::i64_to_duration(self.t.load(ord))
    }
    /// Converts the duration-sign tuple into its packed representation before storing it
    /// atomically.
    ///
    /// The duration is truncated to [`Self::RESOLUTION`] and saturates at [`Self::MAX`].
    ///
    /// The [`Ordering`] is used in a single `store` operation.
    pub fn store(&self, duration: Duration, sign: Sign, ord: Ordering) {
        self.t.store(Self::duration_to_i64(duration, sign), ord)
    }
    /// Constructs a new [`AtomicDuration`] holding the given duration-sign tuple.
    ///
    /// The duration is truncated to [`Self::RESOLUTION`] and saturates at [`Self::MAX`].
    pub fn new(duration: Duration, sign: Sign) -> Self {
        Self {
            t: AtomicI64::new(Self::duration_to_i64(duration, sign)),
        }
    }
}
68
69/// An [`Instant`], stored as an [`AtomicDuration`] offset from an arbitrary epoch.
70///
71/// Due to [`Instant`] not having any const epoch, that epoch is taken by calling [`Instant::now()`] at construction.
72/// [`AtomicInstant`]'s range and resolution are bound by those of [`AtomicDuration`].
73///
74/// Defaults to [`Instant::now()`].
75pub struct AtomicInstant {
76    epoch: Instant,
77    since_epoch: AtomicDuration,
78}
79impl Default for AtomicInstant {
80    fn default() -> Self {
81        Self::new(Instant::now())
82    }
83}
84impl AtomicInstant {
85    /// Constructs a new [`AtomicInstant`], using [`Instant::now()`] as epoch.
86    pub fn new(instant: Instant) -> Self {
87        let epoch = Instant::now();
88        let (duration, sign) = if epoch > instant {
89            (epoch - instant, Sign::Negative)
90        } else {
91            (instant - epoch, Sign::Positive)
92        };
93        AtomicInstant {
94            epoch,
95            since_epoch: AtomicDuration::new(duration, sign),
96        }
97    }
98    /// Atomically loads the internal atomic using the specified [`Ordering`], and uses it to reconstruct the corresponding [`Instant`].
99    pub fn load(&self, ord: Ordering) -> Instant {
100        let (duration, sign) = self.since_epoch.load(ord);
101        match sign {
102            Sign::Positive => self.epoch + duration,
103            Sign::Negative => self.epoch - duration,
104        }
105    }
106    /// Converts the [`Instant`] into an atomically storeable value, and stores it atomically using the specified [`Ordering`].
107    pub fn store(&self, instant: Instant, ord: Ordering) {
108        match instant.checked_duration_since(self.epoch) {
109            Some(duration) => {
110                self.since_epoch.store(duration, Sign::Positive, ord);
111            }
112            None => self
113                .since_epoch
114                .store(self.epoch - instant, Sign::Negative, ord),
115        }
116    }
117    /// A shortcut for `self.store(std::time::Instant::now(), ord)`.
118    pub fn store_now(&self, ord: Ordering) {
119        self.store(Instant::now(), ord)
120    }
121}
122
123pub struct LivelinessMonitorFuture {
124    pub(crate) monitor: Weak<LivelinessMonitor>,
125}
126impl std::future::Future for LivelinessMonitorFuture {
127    type Output = ();
128    fn poll(
129        self: std::pin::Pin<&mut Self>,
130        cx: &mut std::task::Context<'_>,
131    ) -> std::task::Poll<Self::Output> {
132        match self.get_mut().monitor.upgrade() {
133            Some(monitor) => {
134                monitor.latest_report.store_now(Ordering::Relaxed);
135                cx.waker().wake_by_ref();
136                std::task::Poll::Pending
137            }
138            None => std::task::Poll::Ready(()),
139        }
140    }
141}