1use crate::DebugConfig;
2use cyfs_base::*;
3
4use once_cell::sync::OnceCell;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::Arc;
7
8
/// Watchdog state used to report that the process is alive and, optionally,
/// to terminate it when the async task system stops making progress.
///
/// Cloning is cheap: the two atomics live behind `Arc`, so every clone
/// observes and updates the same heartbeat and timeout values.
#[derive(Clone)]
pub struct ProcessDeadHelper {
    /// Seconds between two consecutive liveness checks / heartbeats.
    interval_in_secs: u64,

    /// Master switch: when `false` the process is never terminated by the
    /// watchdog, whatever timeout is configured.
    exit_on_dead: bool,

    /// Timestamp (as returned by `bucky_time_now()`) of the most recent
    /// heartbeat written from inside the async task system.
    task_system_last_active: Arc<AtomicU64>,

    /// Maximum tolerated age of the last heartbeat, in the same unit as
    /// `bucky_time_now()`; `0` means the exit-on-dead check is disabled.
    exit_on_task_system_dead: Arc<AtomicU64>,
}
19
20impl ProcessDeadHelper {
21 fn new(interval_in_secs: u64) -> Self {
22 let exit_on_dead = match get_channel() {
23 CyfsChannel::Nightly => false,
24 _ => true,
25 };
26
27 let mut ret = Self {
28 interval_in_secs,
29 task_system_last_active: Arc::new(AtomicU64::new(bucky_time_now())),
30 exit_on_task_system_dead: Arc::new(AtomicU64::new(0)),
31 exit_on_dead,
32 };
33
34 ret.load_config();
35 ret
36 }
37
38 fn load_config(&mut self) {
39 if let Some(config_node) = DebugConfig::get_config("check") {
40 if let Err(e) = self.load_config_value(config_node) {
41 println!("load process dead check config error! {}", e);
42 }
43 }
44 }
45
46 fn load_config_value(&mut self, config_node: &toml::Value) -> BuckyResult<()> {
47 let node = config_node.as_table().ok_or_else(|| {
48 let msg = format!("invalid debug config format! content={}", config_node,);
49 error!("{}", msg);
50
51 BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
52 })?;
53
54 for (k, v) in node {
55 match k.as_str() {
56 "exit_on_dead" => {
57 if let Some(v) = v.as_bool() {
58 println!("load check.exit_on_dead from config: {}, current={}", v, self.exit_on_dead);
59 self.exit_on_dead = v;
60 } else {
61 println!("unknown exit_on_dead config node: {:?}", v);
62 }
63 }
64
65 key @ _ => {
66 println!("unknown check config node: {}={:?}", key, v);
67 }
68 }
69 }
70
71 Ok(())
72 }
73
74 pub fn patch_task_min_thread() {
75 let cpu_nums = num_cpus::get();
76 if cpu_nums <= 1 {
77 const KEY: &str = "ASYNC_STD_THREAD_COUNT";
78 if std::env::var(KEY).is_err() {
79 std::env::set_var(KEY, "2");
80 }
81 }
82 }
83
84 pub fn instance() -> &'static Self {
85 static INSTANCE: OnceCell<ProcessDeadHelper> = OnceCell::new();
86 INSTANCE.get_or_init(|| Self::new(60))
87 }
88
89 pub fn start_check(&self) {
90 static INIT_DONE: AtomicBool = AtomicBool::new(false);
91 if !INIT_DONE.swap(true, Ordering::SeqCst) {
92 self.start_check_process();
93 self.start_check_task_system();
94 }
95 }
96
97 pub fn enable_exit_on_task_system_dead(&self, timeout_in_secs: Option<u64>) {
98 let v = timeout_in_secs.unwrap_or(60 * 5) * 1000 * 1000;
99 self.exit_on_task_system_dead.store(v, Ordering::SeqCst);
100 if v > 0 {
101 info!("enable exit on task system dead: timeout={}", v);
102 self.start_check();
103 } else {
104 info!("disable exit on task system dead");
105 }
106 }
107
108 fn update_task_alive(&self) {
109 let now = bucky_time_now();
110 self.task_system_last_active.store(now, Ordering::SeqCst);
111 }
112
113 fn check_task_alive(&self) {
114 let exit_timeout = self.exit_on_task_system_dead.load(Ordering::SeqCst);
115 if exit_timeout == 0 || !self.exit_on_dead {
116 return;
117 }
118
119 let now = bucky_time_now();
120 let last_active = self.task_system_last_active.load(Ordering::SeqCst);
121 if now >= last_active && now - last_active >= exit_timeout {
122 error!(
123 "task system dead timeout, now will exit process! last_active={}, exit_timeout={}s",
124 last_active,
125 exit_timeout / (1000 * 1000)
126 );
127 println!("process will exit on task system dead...");
128
129 let ins = crate::dump::DumpHelper::get_instance();
130 if ins.is_enable_dump() {
131 ins.dump();
132 }
133
134 std::thread::sleep(std::time::Duration::from_secs(5));
135 std::process::exit(-1);
136 }
137 }
138
139 fn start_check_process(&self) {
140 let dur = std::time::Duration::from_secs(self.interval_in_secs);
141
142 let this = self.clone();
143 std::thread::spawn(move || loop {
144 std::thread::sleep(dur);
145 info!("process still alive {:?}, {}", std::thread::current().id(), cyfs_base::get_version());
146 this.check_task_alive();
147 });
148 }
149
150 fn start_check_task_system(&self) {
151 let dur = std::time::Duration::from_secs(self.interval_in_secs);
152 let this = self.clone();
153 async_std::task::spawn(async move {
154 loop {
155 this.update_task_alive();
156 async_std::task::sleep(dur).await;
157 info!(
158 "process task system still alive {:?}",
159 std::thread::current().id(),
160 );
161 }
162 });
163 }
164}
165
#[cfg(test)]
mod tests {
    use cyfs_base::bucky_time_to_system_time;

    use super::ProcessDeadHelper;
    use std::sync::RwLock;

    /// Tiny value holder used to demonstrate lock-guard lifetimes.
    struct Test {
        v: Option<u32>,
    }

    impl Test {
        /// Creates an empty holder.
        fn new() -> Self {
            Self { v: None }
        }

        /// Borrows the stored value, if any.
        fn get(&self) -> Option<&u32> {
            self.v.as_ref()
        }

        /// Stores `v`, replacing any previous value.
        fn set(&mut self, v: u32) {
            self.v = Some(v);
        }
    }

    /// Intentionally takes the write lock while the read guard is still alive.
    ///
    /// The read guard is a temporary of the `if let` scrutinee, so (in Rust
    /// editions before 2024) it is only dropped after the `else` branch, and
    /// the `write()` call there is made while the same thread still holds the
    /// read lock. Do not "fix" this: the blocking is the point of the test.
    async fn dead_lock() {
        let r: RwLock<Test> = RwLock::new(Test::new());

        if let Some(v) = r.read().unwrap().get().cloned() {
            println!("v={}", v);
        } else {
            println!("enter else");
            r.write().unwrap().set(1);
            println!("v={}", 1);
        };
    }

    /// Prints a fixed bucky timestamp as system time and as a formatted
    /// local date, next to the current local time, for manual inspection.
    #[test]
    fn test_time() {
        let t = 13316567010962630;
        let s = bucky_time_to_system_time(t);
        println!("{:#?}", s);

        let datetime = chrono::offset::Local::now();
        println!("{:?}", datetime);

        let datetime: chrono::DateTime<chrono::Local> = s.into();
        let time_str = datetime.format("%Y-%m-%d %H:%M:%S%.3f %:z");
        println!("{}", time_str);
    }

    /// Starts the watchdog with a short timeout and then blocks in
    /// `dead_lock`, to exercise the exit-on-dead path manually.
    #[test]
    fn test_dead_lock() {
        ProcessDeadHelper::instance().start_check();

        // The argument is in seconds (the helper converts it itself); the
        // previous value of 1000 * 1000 * 2 was a microsecond count and
        // amounted to a timeout of roughly 23 days.
        ProcessDeadHelper::instance().enable_exit_on_task_system_dead(Some(2));

        async_std::task::block_on(dead_lock());
    }

    /// Same logic as `dead_lock`, but the read guard is released at the end
    /// of the `let` statement, before the write lock is requested.
    #[test]
    fn test_safe_lock() {
        let r: RwLock<Test> = RwLock::new(Test::new());

        let v = r.read().unwrap().get().cloned();
        if let Some(v) = v {
            println!("v={}", v);
        } else {
            println!("enter else");
            r.write().unwrap().set(1);
            println!("v={}", 1);
        };
    }
}