async_liveliness_monitor/
support.rs1use std::{
2 sync::{
3 atomic::{AtomicI64, Ordering},
4 Weak,
5 },
6 time::{Duration, Instant},
7};
8
9use crate::LivelinessMonitor;
10
/// A `(Duration, Sign)` pair that can be loaded and stored atomically.
///
/// The pair is packed into a single `AtomicI64`: whole seconds live in the bits above
/// `SHIFT`, the sub-second part is kept as microseconds in the low `SHIFT` bits, and the
/// sign of the integer encodes the [`Sign`]. Anything finer than a microsecond is dropped
/// when a value is stored.
pub struct AtomicDuration {
    t: AtomicI64,
}
/// Number of low bits of the packed integer that hold the microsecond part.
const SHIFT: i64 = 20;
/// Mask selecting the low `SHIFT` bits of the packed integer.
const MASK: i64 = 0xfffff;
/// Sign attached to a [`Duration`] held in an [`AtomicDuration`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum Sign {
    #[default]
    Positive,
    Negative,
}
impl AtomicDuration {
    /// Unpacks an integer produced by `duration_to_i64` into its duration and sign.
    ///
    /// Total over all of `i64`: the magnitude is taken with `unsigned_abs`, so even
    /// `i64::MIN` decodes without overflowing.
    const fn i64_to_duration(t: i64) -> (Duration, Sign) {
        let sign = if t.is_negative() {
            Sign::Negative
        } else {
            Sign::Positive
        };
        let magnitude = t.unsigned_abs();
        // The low field may hold up to 0xfffff microseconds (slightly over one second);
        // `Duration::new` carries the excess nanoseconds into the seconds.
        let micros = (magnitude & (MASK as u64)) as u32;
        let secs = magnitude >> (SHIFT as u32);
        (Duration::new(secs, micros * 1000), sign)
    }
    /// Packs a duration and its sign into a single `i64`.
    ///
    /// Sub-microsecond precision is truncated. Durations whose whole-second count does not
    /// fit above the microsecond field saturate to the encoding of [`Self::MAX`] instead of
    /// wrapping around (which previously could flip the sign or corrupt the value).
    const fn duration_to_i64(t: Duration, sign: Sign) -> i64 {
        // Largest second count that can be shifted left by `SHIFT` without overflowing.
        const MAX_SECS: u64 = (i64::MAX >> SHIFT) as u64;
        let secs = t.as_secs();
        let magnitude = if secs > MAX_SECS {
            i64::MAX
        } else {
            // Cannot overflow: `secs << SHIFT` leaves the low `SHIFT` bits clear and
            // `subsec_micros()` is below 1_000_000 < 2^20.
            ((secs as i64) << SHIFT) + (t.subsec_micros() as i64)
        };
        // `magnitude` is never `i64::MIN`, so the negation is always representable.
        match sign {
            Sign::Positive => magnitude,
            Sign::Negative => -magnitude,
        }
    }
    /// The smallest non-zero duration that can be represented (one microsecond).
    pub const RESOLUTION: Duration = Self::i64_to_duration(1).0;
    /// The largest duration that can be represented; longer durations saturate to it.
    pub const MAX: Duration = Self::i64_to_duration(i64::MAX).0;
    /// Atomically reads the stored duration and sign using memory ordering `ord`.
    pub fn load(&self, ord: Ordering) -> (Duration, Sign) {
        Self::i64_to_duration(self.t.load(ord))
    }
    /// Atomically replaces the stored value with `duration` and `sign` using memory
    /// ordering `ord`.
    ///
    /// `duration` is truncated to [`Self::RESOLUTION`] and saturated to [`Self::MAX`].
    pub fn store(&self, duration: Duration, sign: Sign, ord: Ordering) {
        self.t.store(Self::duration_to_i64(duration, sign), ord)
    }
    /// Creates an `AtomicDuration` holding `duration` with the given `sign`.
    ///
    /// `duration` is truncated to [`Self::RESOLUTION`] and saturated to [`Self::MAX`].
    pub fn new(duration: Duration, sign: Sign) -> Self {
        Self {
            t: AtomicI64::new(Self::duration_to_i64(duration, sign)),
        }
    }
}
68
69pub struct AtomicInstant {
76 epoch: Instant,
77 since_epoch: AtomicDuration,
78}
79impl Default for AtomicInstant {
80 fn default() -> Self {
81 Self::new(Instant::now())
82 }
83}
84impl AtomicInstant {
85 pub fn new(instant: Instant) -> Self {
87 let epoch = Instant::now();
88 let (duration, sign) = if epoch > instant {
89 (epoch - instant, Sign::Negative)
90 } else {
91 (instant - epoch, Sign::Positive)
92 };
93 AtomicInstant {
94 epoch,
95 since_epoch: AtomicDuration::new(duration, sign),
96 }
97 }
98 pub fn load(&self, ord: Ordering) -> Instant {
100 let (duration, sign) = self.since_epoch.load(ord);
101 match sign {
102 Sign::Positive => self.epoch + duration,
103 Sign::Negative => self.epoch - duration,
104 }
105 }
106 pub fn store(&self, instant: Instant, ord: Ordering) {
108 match instant.checked_duration_since(self.epoch) {
109 Some(duration) => {
110 self.since_epoch.store(duration, Sign::Positive, ord);
111 }
112 None => self
113 .since_epoch
114 .store(self.epoch - instant, Sign::Negative, ord),
115 }
116 }
117 pub fn store_now(&self, ord: Ordering) {
119 self.store(Instant::now(), ord)
120 }
121}
122
123pub struct LivelinessMonitorFuture {
124 pub(crate) monitor: Weak<LivelinessMonitor>,
125}
126impl std::future::Future for LivelinessMonitorFuture {
127 type Output = ();
128 fn poll(
129 self: std::pin::Pin<&mut Self>,
130 cx: &mut std::task::Context<'_>,
131 ) -> std::task::Poll<Self::Output> {
132 match self.get_mut().monitor.upgrade() {
133 Some(monitor) => {
134 monitor.latest_report.store_now(Ordering::Relaxed);
135 cx.waker().wake_by_ref();
136 std::task::Poll::Pending
137 }
138 None => std::task::Poll::Ready(()),
139 }
140 }
141}