fluvio_future/
doomsday.rs1use std::fmt::Display;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::{
4 sync::Arc,
5 time::{Duration, Instant},
6};
7
8use async_lock::Mutex;
9use tracing::{debug, error, info};
10
11use crate::task::Task;
12
13#[derive(Clone)]
14pub struct DoomsdayTimer {
17 time_to_explode: Arc<Mutex<Instant>>,
18 duration: Duration,
19 defused: Arc<AtomicBool>,
20 aggressive_mode: bool,
21}
22
23impl Display for DoomsdayTimer {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 let fail_mode = if self.aggressive_mode {
26 "Exits"
27 } else {
28 "Panics"
29 };
30 write!(
31 f,
32 "DoomsdayTimer(Duration: {:?}, {})",
33 self.duration, fail_mode,
34 )
35 }
36}
37
38impl DoomsdayTimer {
39 pub fn spawn(duration: Duration, exit_on_explode: bool) -> (Self, Task<()>) {
43 let s = Self {
44 time_to_explode: Arc::new(Mutex::new(Instant::now() + duration)),
45 duration,
46 defused: Default::default(),
47 aggressive_mode: exit_on_explode,
48 };
49
50 let cloned = s.clone();
51 let jh = crate::task::spawn_task(async move {
52 cloned.main_loop().await;
53 });
54 (s, jh)
55 }
56
57 pub async fn reset(&self) {
59 let new_time_to_explode = Instant::now() + self.duration;
60 *self.time_to_explode.lock().await = new_time_to_explode;
61 debug!("{} has been reset", self);
62 }
63
64 async fn main_loop(&self) {
65 loop {
66 if self.defused.load(Ordering::Relaxed) {
67 debug!("{} has been defused, terminating main loop", self);
68 return;
69 }
70 let now = Instant::now();
71 let time_to_explode = *self.time_to_explode.lock().await;
72 if now > time_to_explode {
73 error!("{} exploded due to timeout", self);
74 self.explode_inner();
75 } else {
76 let time_to_sleep = time_to_explode - now;
77 crate::timer::sleep(time_to_sleep).await;
78 }
79 }
80 }
81
82 pub fn explode(&self) {
84 error!("{} was exploded manually", self);
85 self.explode_inner();
86 }
87
88 fn explode_inner(&self) {
89 if self.aggressive_mode {
90 error!("{} exiting", self);
91 std::process::exit(1);
92 } else {
93 error!("{} panicking", self);
94 panic!("DoomsdayTimer with Duration {:?} exploded", self.duration);
95 }
96 }
97 pub fn defuse(&self) {
99 self.defused.store(true, Ordering::Relaxed);
100 info!("{} has been defused", self);
101 }
102}
103
#[cfg(test)]
mod tests {
    use std::io::Error;
    use std::time::Duration;

    use super::DoomsdayTimer;
    use crate::task::run_block_on;
    use crate::test_async;

    // A timer that is never reset must blow up (panic mode) after its 1 ms countdown.
    #[test_async(should_panic)]
    async fn test_explode() -> Result<(), Error> {
        let (_, watchdog) = DoomsdayTimer::spawn(Duration::from_millis(1), false);
        crate::timer::sleep(Duration::from_millis(2)).await;
        watchdog.await;
        Ok(())
    }

    // Resetting every 5 ms keeps a 10 ms timer alive; defusing lets the loop finish.
    #[test_async]
    async fn test_do_not_explode() -> Result<(), Error> {
        let (timer, watchdog) = DoomsdayTimer::spawn(Duration::from_millis(10), false);
        let pause = Duration::from_millis(5);

        for _ in 0..2 {
            crate::timer::sleep(pause).await;
            timer.reset().await;
        }
        crate::timer::sleep(pause).await;

        timer.defuse();
        run_block_on(watchdog);
        Ok(())
    }
}
133}