fluvio_future/
doomsday.rs1use std::fmt::Display;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use std::{
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use tracing::{debug, error, info};
10
11use crate::sync::Mutex;
12use crate::task::JoinHandle;
13
14#[derive(Clone)]
15pub struct DoomsdayTimer {
18 time_to_explode: Arc<Mutex<Instant>>,
19 duration: Duration,
20 defused: Arc<AtomicBool>,
21 aggressive_mode: bool,
22}
23
24impl Display for DoomsdayTimer {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 let fail_mode = if self.aggressive_mode {
27 "Exits"
28 } else {
29 "Panics"
30 };
31 write!(
32 f,
33 "DoomsdayTimer(Duration: {:?}, {})",
34 self.duration, fail_mode,
35 )
36 }
37}
38
39impl DoomsdayTimer {
40 pub fn spawn(duration: Duration, exit_on_explode: bool) -> (Self, JoinHandle<()>) {
44 let s = Self {
45 time_to_explode: Arc::new(Mutex::new(Instant::now() + duration)),
46 duration,
47 defused: Default::default(),
48 aggressive_mode: exit_on_explode,
49 };
50
51 let cloned = s.clone();
52 let jh = crate::task::spawn(async move {
53 cloned.main_loop().await;
54 });
55 (s, jh)
56 }
57
58 pub async fn reset(&self) {
60 let new_time_to_explode = Instant::now() + self.duration;
61 *self.time_to_explode.lock().await = new_time_to_explode;
62 debug!("{} has been reset", self);
63 }
64
65 async fn main_loop(&self) {
66 loop {
67 if self.defused.load(Ordering::Relaxed) {
68 debug!("{} has been defused, terminating main loop", self);
69 return;
70 }
71 let now = Instant::now();
72 let time_to_explode = *self.time_to_explode.lock().await;
73 if now > time_to_explode {
74 error!("{} exploded due to timeout", self);
75 self.explode_inner();
76 } else {
77 let time_to_sleep = time_to_explode - now;
78 crate::timer::sleep(time_to_sleep).await;
79 }
80 }
81 }
82
83 pub fn explode(&self) {
85 error!("{} was exploded manually", self);
86 self.explode_inner();
87 }
88
89 fn explode_inner(&self) {
90 if self.aggressive_mode {
91 error!("{} exiting", self);
92 std::process::exit(1);
93 } else {
94 error!("{} panicking", self);
95 panic!("DoomsdayTimer with Duration {:?} exploded", self.duration);
96 }
97 }
98 pub fn defuse(&self) {
100 self.defused.store(true, Ordering::Relaxed);
101 info!("{} has been defused", self);
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use std::time::Duration;
108
109 use super::DoomsdayTimer;
110 use crate::task::run_block_on;
111 use crate::test_async;
112 use std::io::Error;
113
114 #[test_async(should_panic)]
115 async fn test_explode() -> Result<(), Error> {
116 let (_, jh) = DoomsdayTimer::spawn(Duration::from_millis(1), false);
117 crate::timer::sleep(Duration::from_millis(2)).await;
118 jh.await;
119 Ok(())
120 }
121
122 #[test_async]
123 async fn test_do_not_explode() -> Result<(), Error> {
124 let (bomb, jh) = DoomsdayTimer::spawn(Duration::from_millis(10), false);
125 crate::timer::sleep(Duration::from_millis(5)).await;
126 bomb.reset().await;
127 crate::timer::sleep(Duration::from_millis(5)).await;
128 bomb.reset().await;
129 crate::timer::sleep(Duration::from_millis(5)).await;
130 bomb.defuse();
131 run_block_on(jh);
132 Ok(())
133 }
134}