cyfs_debug/check/
dead.rs

1use crate::DebugConfig;
2use cyfs_base::*;
3
4use once_cell::sync::OnceCell;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::Arc;
7
8
/// Shared state for the process / task-system liveness checks.
///
/// Cloning is cheap: the atomics live behind `Arc`, so every clone (including
/// the copies moved into the checker thread and the heartbeat task) observes
/// and updates the same values.
#[derive(Clone)]
pub struct ProcessDeadHelper {
    /// Period, in seconds, between two iterations of the checker thread and
    /// between two heartbeats of the async task.
    interval_in_secs: u64,
    /// `bucky_time_now()` value recorded by the most recent heartbeat.
    task_system_last_active: Arc<AtomicU64>,
    /// Maximum tolerated gap between heartbeats, in the same unit as
    /// `bucky_time_now()`; `0` means the exit-on-dead check is disabled.
    exit_on_task_system_dead: Arc<AtomicU64>,
    /// Master switch: when `false` the process is never terminated by the
    /// check, whatever timeout is configured.
    exit_on_dead: bool,
}
19
20impl ProcessDeadHelper {
21    fn new(interval_in_secs: u64) -> Self {
22        let exit_on_dead = match get_channel() {
23            CyfsChannel::Nightly => false,
24            _ => true,
25        };
26
27        let mut ret = Self {
28            interval_in_secs,
29            task_system_last_active: Arc::new(AtomicU64::new(bucky_time_now())),
30            exit_on_task_system_dead: Arc::new(AtomicU64::new(0)),
31            exit_on_dead,
32        };
33
34        ret.load_config();
35        ret
36    }
37
38    fn load_config(&mut self) {
39        if let Some(config_node) = DebugConfig::get_config("check") {
40            if let Err(e) = self.load_config_value(config_node) {
41                println!("load process dead check config error! {}", e);
42            }
43        }
44    }
45
46    fn load_config_value(&mut self, config_node: &toml::Value) -> BuckyResult<()> {
47        let node = config_node.as_table().ok_or_else(|| {
48            let msg = format!("invalid debug config format! content={}", config_node,);
49            error!("{}", msg);
50
51            BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
52        })?;
53
54        for (k, v) in node {
55            match k.as_str() {
56                "exit_on_dead" => {
57                    if let Some(v) = v.as_bool() {
58                        println!("load check.exit_on_dead from config: {}, current={}", v, self.exit_on_dead);
59                        self.exit_on_dead = v;
60                    } else {
61                        println!("unknown exit_on_dead config node: {:?}", v);
62                    }
63                }
64
65                key @ _ => {
66                    println!("unknown check config node: {}={:?}", key, v);
67                }
68            }
69        }
70
71        Ok(())
72    }
73
74    pub fn patch_task_min_thread() {
75        let cpu_nums = num_cpus::get();
76        if cpu_nums <= 1 {
77            const KEY: &str = "ASYNC_STD_THREAD_COUNT";
78            if std::env::var(KEY).is_err() {
79                std::env::set_var(KEY, "2");
80            }
81        }
82    }
83
84    pub fn instance() -> &'static Self {
85        static INSTANCE: OnceCell<ProcessDeadHelper> = OnceCell::new();
86        INSTANCE.get_or_init(|| Self::new(60))
87    }
88
89    pub fn start_check(&self) {
90        static INIT_DONE: AtomicBool = AtomicBool::new(false);
91        if !INIT_DONE.swap(true, Ordering::SeqCst) {
92            self.start_check_process();
93            self.start_check_task_system();
94        }
95    }
96
97    pub fn enable_exit_on_task_system_dead(&self, timeout_in_secs: Option<u64>) {
98        let v = timeout_in_secs.unwrap_or(60 * 5) * 1000 * 1000;
99        self.exit_on_task_system_dead.store(v, Ordering::SeqCst);
100        if v > 0 {
101            info!("enable exit on task system dead: timeout={}", v);
102            self.start_check();
103        } else {
104            info!("disable exit on task system dead");
105        }
106    }
107
108    fn update_task_alive(&self) {
109        let now = bucky_time_now();
110        self.task_system_last_active.store(now, Ordering::SeqCst);
111    }
112
113    fn check_task_alive(&self) {
114        let exit_timeout = self.exit_on_task_system_dead.load(Ordering::SeqCst);
115        if exit_timeout == 0 || !self.exit_on_dead {
116            return;
117        }
118
119        let now = bucky_time_now();
120        let last_active = self.task_system_last_active.load(Ordering::SeqCst);
121        if now >= last_active && now - last_active >= exit_timeout {
122            error!(
123                "task system dead timeout, now will exit process! last_active={}, exit_timeout={}s",
124                last_active,
125                exit_timeout / (1000 * 1000)
126            );
127            println!("process will exit on task system dead...");
128
129            let ins = crate::dump::DumpHelper::get_instance();
130            if ins.is_enable_dump() {
131                ins.dump();
132            }
133 
134            std::thread::sleep(std::time::Duration::from_secs(5));
135            std::process::exit(-1);
136        }
137    }
138
139    fn start_check_process(&self) {
140        let dur = std::time::Duration::from_secs(self.interval_in_secs);
141
142        let this = self.clone();
143        std::thread::spawn(move || loop {
144            std::thread::sleep(dur);
145            info!("process still alive {:?}, {}", std::thread::current().id(), cyfs_base::get_version());
146            this.check_task_alive();
147        });
148    }
149
150    fn start_check_task_system(&self) {
151        let dur = std::time::Duration::from_secs(self.interval_in_secs);
152        let this = self.clone();
153        async_std::task::spawn(async move {
154            loop {
155                this.update_task_alive();
156                async_std::task::sleep(dur).await;
157                info!(
158                    "process task system still alive {:?}",
159                    std::thread::current().id(),
160                );
161            }
162        });
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::ProcessDeadHelper;
    use cyfs_base::bucky_time_to_system_time;
    use std::sync::RwLock;

    /// Tiny payload guarded by an `RwLock` in the locking tests below.
    struct Test {
        v: Option<u32>,
    }

    impl Test {
        /// Starts out empty.
        fn new() -> Self {
            Test { v: None }
        }

        /// Borrows the stored value, if any.
        fn get(&self) -> Option<&u32> {
            self.v.as_ref()
        }

        /// Stores `v`, replacing any previous value.
        fn set(&mut self, v: u32) {
            self.v = Some(v);
        }
    }

    /// Takes the write lock while the read guard created in the `if let`
    /// scrutinee may still be alive.
    // NOTE(review): in editions before 2024 the scrutinee's temporary read
    // guard lives until the end of the whole `if let … else` statement, so
    // the `write()` below is expected to block (or panic) — confirm against
    // the edition this crate is built with. Keep the `if let` shape and the
    // trailing `;` as they are: they are what this function exercises.
    async fn dead_lock() {
        let lock = RwLock::new(Test::new());

        if let Some(current) = lock.read().unwrap().get().cloned() {
            println!("v={}", current);
        } else {
            println!("enter else");
            lock.write().unwrap().set(1);
            println!("v={}", 1);
        };
    }

    /// Prints a fixed bucky timestamp as a `SystemTime` and as a formatted
    /// local date, next to the current local time.
    #[test]
    fn test_time() {
        let ticks = 13316567010962630;
        let sys_time = bucky_time_to_system_time(ticks);
        println!("{:#?}", sys_time);

        let now_local = chrono::offset::Local::now();
        // let time = datetime.format("%Y-%m-%d %H:%M:%S%.3f %:z");
        println!("{:?}", now_local);

        let local_time = chrono::DateTime::<chrono::Local>::from(sys_time);
        let formatted = local_time.format("%Y-%m-%d %H:%M:%S%.3f %:z");
        println!("{}", formatted);
    }

    /// Starts the liveness checks, enables exit-on-dead and then runs
    /// `dead_lock` on the current thread.
    // NOTE(review): the argument of `enable_exit_on_task_system_dead` is named
    // `timeout_in_secs`, so `1000 * 1000 * 2` requests a two-million-second
    // timeout — confirm that this is intended.
    #[test]
    fn test_dead_lock() {
        let helper = ProcessDeadHelper::instance();
        helper.start_check();
        helper.enable_exit_on_task_system_dead(Some(1000 * 1000 * 2));

        async_std::task::block_on(dead_lock());

        // async_std::task::sleep(std::time::Duration::from_secs(60 * 5));
    }

    /// Same steps as `dead_lock`, but the read result is bound to a local
    /// first, so the read guard is released before the write lock is taken.
    #[test]
    fn test_safe_lock() {
        let lock = RwLock::new(Test::new());

        let snapshot = lock.read().unwrap().get().cloned();
        match snapshot {
            Some(current) => {
                println!("v={}", current);
            }
            None => {
                println!("enter else");
                lock.write().unwrap().set(1);
                println!("v={}", 1);
            }
        }
    }
}