fluvio_future/
doomsday.rs

1use std::fmt::Display;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use std::{
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use tracing::{debug, error, info};
10
11use crate::sync::Mutex;
12use crate::task::JoinHandle;
13
14#[derive(Clone)]
15/// DoomsdayTimer will configurably panic or exit if it is not
16/// `reset()` at least every `duration`
17pub struct DoomsdayTimer {
18    time_to_explode: Arc<Mutex<Instant>>,
19    duration: Duration,
20    defused: Arc<AtomicBool>,
21    aggressive_mode: bool,
22}
23
24impl Display for DoomsdayTimer {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        let fail_mode = if self.aggressive_mode {
27            "Exits"
28        } else {
29            "Panics"
30        };
31        write!(
32            f,
33            "DoomsdayTimer(Duration: {:?}, {})",
34            self.duration, fail_mode,
35        )
36    }
37}
38
39impl DoomsdayTimer {
40    /// Spawn a new doomsday timer.
41    /// If `exit_on_explode` is true, it will terminate process with `exit(1)` if it explodes.
42    /// Otherwise it will call `panic()`. Note that `awaiting` on the jh will panic if the `DoomsdayTimer` panicked
43    pub fn spawn(duration: Duration, exit_on_explode: bool) -> (Self, JoinHandle<()>) {
44        let s = Self {
45            time_to_explode: Arc::new(Mutex::new(Instant::now() + duration)),
46            duration,
47            defused: Default::default(),
48            aggressive_mode: exit_on_explode,
49        };
50
51        let cloned = s.clone();
52        let jh = crate::task::spawn(async move {
53            cloned.main_loop().await;
54        });
55        (s, jh)
56    }
57
58    /// Reset the timer to it's full duration
59    pub async fn reset(&self) {
60        let new_time_to_explode = Instant::now() + self.duration;
61        *self.time_to_explode.lock().await = new_time_to_explode;
62        debug!("{} has been reset", self);
63    }
64
65    async fn main_loop(&self) {
66        loop {
67            if self.defused.load(Ordering::Relaxed) {
68                debug!("{} has been defused, terminating main loop", self);
69                return;
70            }
71            let now = Instant::now();
72            let time_to_explode = *self.time_to_explode.lock().await;
73            if now > time_to_explode {
74                error!("{} exploded due to timeout", self);
75                self.explode_inner();
76            } else {
77                let time_to_sleep = time_to_explode - now;
78                crate::timer::sleep(time_to_sleep).await;
79            }
80        }
81    }
82
83    /// Force the timer to explode
84    pub fn explode(&self) {
85        error!("{} was exploded manually", self);
86        self.explode_inner();
87    }
88
89    fn explode_inner(&self) {
90        if self.aggressive_mode {
91            error!("{} exiting", self);
92            std::process::exit(1);
93        } else {
94            error!("{} panicking", self);
95            panic!("DoomsdayTimer with Duration {:?} exploded", self.duration);
96        }
97    }
98    /// Defuse the timer. Cannot be undone and will no longer `explode`
99    pub fn defuse(&self) {
100        self.defused.store(true, Ordering::Relaxed);
101        info!("{} has been defused", self);
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::io::Error;
    use std::time::Duration;

    use super::DoomsdayTimer;
    use crate::test_async;

    /// A 1 ms timer that is never reset must explode (panic mode), so
    /// awaiting the watcher's join handle is expected to panic.
    #[test_async(should_panic)]
    async fn test_explode() -> Result<(), Error> {
        let (_, jh) = DoomsdayTimer::spawn(Duration::from_millis(1), false);
        crate::timer::sleep(Duration::from_millis(2)).await;
        jh.await;
        Ok(())
    }

    /// A 10 ms timer that is reset every 5 ms and then defused must let the
    /// watcher task finish without exploding.
    #[test_async]
    async fn test_do_not_explode() -> Result<(), Error> {
        let (bomb, jh) = DoomsdayTimer::spawn(Duration::from_millis(10), false);
        crate::timer::sleep(Duration::from_millis(5)).await;
        bomb.reset().await;
        crate::timer::sleep(Duration::from_millis(5)).await;
        bomb.reset().await;
        crate::timer::sleep(Duration::from_millis(5)).await;
        bomb.defuse();
        // Await the watcher instead of blocking the executor thread from
        // inside an async fn; this matches how `test_explode` joins it.
        jh.await;
        Ok(())
    }
}