fluvio_future/
doomsday.rs

1use std::fmt::Display;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::{
4    sync::Arc,
5    time::{Duration, Instant},
6};
7
8use async_lock::Mutex;
9use tracing::{debug, error, info};
10
11use crate::task::Task;
12
13#[derive(Clone)]
14/// DoomsdayTimer will configurably panic or exit if it is not
15/// `reset()` at least every `duration`
16pub struct DoomsdayTimer {
17    time_to_explode: Arc<Mutex<Instant>>,
18    duration: Duration,
19    defused: Arc<AtomicBool>,
20    aggressive_mode: bool,
21}
22
23impl Display for DoomsdayTimer {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        let fail_mode = if self.aggressive_mode {
26            "Exits"
27        } else {
28            "Panics"
29        };
30        write!(
31            f,
32            "DoomsdayTimer(Duration: {:?}, {})",
33            self.duration, fail_mode,
34        )
35    }
36}
37
38impl DoomsdayTimer {
39    /// Spawn a new doomsday timer.
40    /// If `exit_on_explode` is true, it will terminate process with `exit(1)` if it explodes.
41    /// Otherwise it will call `panic()`. Note that `awaiting` on the jh will panic if the `DoomsdayTimer` panicked
42    pub fn spawn(duration: Duration, exit_on_explode: bool) -> (Self, Task<()>) {
43        let s = Self {
44            time_to_explode: Arc::new(Mutex::new(Instant::now() + duration)),
45            duration,
46            defused: Default::default(),
47            aggressive_mode: exit_on_explode,
48        };
49
50        let cloned = s.clone();
51        let jh = crate::task::spawn_task(async move {
52            cloned.main_loop().await;
53        });
54        (s, jh)
55    }
56
57    /// Reset the timer to it's full duration
58    pub async fn reset(&self) {
59        let new_time_to_explode = Instant::now() + self.duration;
60        *self.time_to_explode.lock().await = new_time_to_explode;
61        debug!("{} has been reset", self);
62    }
63
64    async fn main_loop(&self) {
65        loop {
66            if self.defused.load(Ordering::Relaxed) {
67                debug!("{} has been defused, terminating main loop", self);
68                return;
69            }
70            let now = Instant::now();
71            let time_to_explode = *self.time_to_explode.lock().await;
72            if now > time_to_explode {
73                error!("{} exploded due to timeout", self);
74                self.explode_inner();
75            } else {
76                let time_to_sleep = time_to_explode - now;
77                crate::timer::sleep(time_to_sleep).await;
78            }
79        }
80    }
81
82    /// Force the timer to explode
83    pub fn explode(&self) {
84        error!("{} was exploded manually", self);
85        self.explode_inner();
86    }
87
88    fn explode_inner(&self) {
89        if self.aggressive_mode {
90            error!("{} exiting", self);
91            std::process::exit(1);
92        } else {
93            error!("{} panicking", self);
94            panic!("DoomsdayTimer with Duration {:?} exploded", self.duration);
95        }
96    }
97    /// Defuse the timer. Cannot be undone and will no longer `explode`
98    pub fn defuse(&self) {
99        self.defused.store(true, Ordering::Relaxed);
100        info!("{} has been defused", self);
101    }
102}
103
#[cfg(test)]
mod tests {
    use std::io::Error;
    use std::time::Duration;

    use super::DoomsdayTimer;
    use crate::task::run_block_on;
    use crate::test_async;

    // A timer that is never reset must panic; awaiting its task surfaces that panic.
    #[test_async(should_panic)]
    async fn test_explode() -> Result<(), Error> {
        let fuse = Duration::from_millis(1);
        let (_, handle) = DoomsdayTimer::spawn(fuse, false);
        crate::timer::sleep(Duration::from_millis(2)).await;
        handle.await;
        Ok(())
    }

    // Resetting more often than the fuse length keeps the timer alive, and
    // defusing lets the watchdog task finish without exploding.
    #[test_async]
    async fn test_do_not_explode() -> Result<(), Error> {
        let fuse = Duration::from_millis(10);
        let pause = Duration::from_millis(5);

        let (timer, handle) = DoomsdayTimer::spawn(fuse, false);
        for _ in 0..2 {
            crate::timer::sleep(pause).await;
            timer.reset().await;
        }
        crate::timer::sleep(pause).await;
        timer.defuse();
        run_block_on(handle);
        Ok(())
    }
}