cuenv_core/tasks/
process_registry.rs1use std::collections::HashMap;
31use std::sync::{Arc, OnceLock};
32use std::time::Duration;
33use tokio::sync::Mutex;
34use tracing::{debug, info, warn};
35
36static GLOBAL_REGISTRY: OnceLock<Arc<ProcessRegistry>> = OnceLock::new();
38
39#[must_use]
44pub fn global_registry() -> Arc<ProcessRegistry> {
45 GLOBAL_REGISTRY
46 .get_or_init(|| Arc::new(ProcessRegistry::new()))
47 .clone()
48}
49
50pub struct ProcessRegistry {
55 pids: Mutex<HashMap<u32, String>>,
57}
58
59impl ProcessRegistry {
60 #[must_use]
62 pub fn new() -> Self {
63 Self {
64 pids: Mutex::new(HashMap::new()),
65 }
66 }
67
68 pub async fn register(&self, pid: u32, task_name: String) {
72 let mut pids = self.pids.lock().await;
73 debug!(pid, task = %task_name, "Registering process");
74 pids.insert(pid, task_name);
75 }
76
77 pub async fn unregister(&self, pid: u32) {
81 let mut pids = self.pids.lock().await;
82 if let Some(task_name) = pids.remove(&pid) {
83 debug!(pid, task = %task_name, "Unregistering process");
84 }
85 }
86
87 pub async fn count(&self) -> usize {
89 self.pids.lock().await.len()
90 }
91
92 pub async fn terminate_all(&self, timeout: Duration) {
102 let mut pids = self.pids.lock().await;
103
104 if pids.is_empty() {
105 return;
106 }
107
108 info!(count = pids.len(), "Terminating child processes");
109
110 for (pid, task_name) in pids.iter() {
112 debug!(pid, task = %task_name, "Sending SIGTERM");
113 Self::send_term_signal(*pid);
114 }
115
116 let deadline = std::time::Instant::now() + timeout;
118 while !pids.is_empty() && std::time::Instant::now() < deadline {
119 let mut exited = Vec::new();
121 for (pid, _) in pids.iter() {
122 if !Self::is_process_alive(*pid) {
123 exited.push(*pid);
124 }
125 }
126
127 for pid in exited {
129 if let Some(task_name) = pids.remove(&pid) {
130 debug!(pid, task = %task_name, "Process exited gracefully");
131 }
132 }
133
134 if !pids.is_empty() {
135 tokio::time::sleep(Duration::from_millis(100)).await;
137 }
138 }
139
140 for (pid, task_name) in pids.drain() {
142 warn!(pid, task = %task_name, "Force killing process after timeout");
143 Self::send_kill_signal(pid);
144 }
145 }
146
147 #[cfg(unix)]
149 fn send_term_signal(pid: u32) {
150 #[expect(unsafe_code, reason = "Required for POSIX signal handling")]
154 unsafe {
155 libc::kill(-(pid as i32), libc::SIGTERM);
157 }
158 }
159
160 #[cfg(unix)]
162 fn send_kill_signal(pid: u32) {
163 #[expect(unsafe_code, reason = "Required for POSIX signal handling")]
167 unsafe {
168 libc::kill(-(pid as i32), libc::SIGKILL);
170 }
171 }
172
173 #[cfg(unix)]
175 fn is_process_alive(pid: u32) -> bool {
176 #[expect(unsafe_code, reason = "Required for POSIX process existence check")]
179 unsafe {
180 libc::kill(pid as i32, 0) == 0
181 }
182 }
183
184 #[cfg(windows)]
186 fn send_term_signal(pid: u32) {
187 use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, Signal, System};
188
189 let mut system = System::new();
190 let process_pid = Pid::from(pid as usize);
191 system.refresh_processes_specifics(
192 ProcessesToUpdate::Some(&[process_pid]),
193 false,
194 ProcessRefreshKind::nothing(),
195 );
196
197 if let Some(process) = system.process(process_pid) {
198 let _ = process.kill_with(Signal::Term);
199 }
200 }
201
202 #[cfg(windows)]
204 fn send_kill_signal(pid: u32) {
205 use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, Signal, System};
206
207 let mut system = System::new();
208 let process_pid = Pid::from(pid as usize);
209 system.refresh_processes_specifics(
210 ProcessesToUpdate::Some(&[process_pid]),
211 false,
212 ProcessRefreshKind::nothing(),
213 );
214
215 if let Some(process) = system.process(process_pid) {
216 let _ = process.kill_with(Signal::Kill);
217 }
218 }
219
220 #[cfg(windows)]
222 fn is_process_alive(pid: u32) -> bool {
223 use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
224
225 let mut system = System::new();
226 let process_pid = Pid::from(pid as usize);
227 system.refresh_processes_specifics(
228 ProcessesToUpdate::Some(&[process_pid]),
229 false,
230 ProcessRefreshKind::nothing(),
231 );
232
233 system.process(process_pid).is_some()
234 }
235}
236
237impl Default for ProcessRegistry {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed registry tracks nothing.
    #[tokio::test]
    async fn test_registry_new() {
        let fresh = ProcessRegistry::new();
        let tracked = fresh.count().await;
        assert_eq!(tracked, 0);
    }

    /// Registering adds one entry; unregistering the same PID removes it.
    #[tokio::test]
    async fn test_register_unregister() {
        let reg = ProcessRegistry::new();
        let pid: u32 = 1234;

        reg.register(pid, String::from("test_task")).await;
        assert_eq!(reg.count().await, 1);

        reg.unregister(pid).await;
        assert_eq!(reg.count().await, 0);
    }

    /// Unregistering a PID that was never registered is a harmless no-op.
    #[tokio::test]
    async fn test_unregister_nonexistent() {
        let reg = ProcessRegistry::new();

        reg.unregister(9999).await;

        let tracked = reg.count().await;
        assert_eq!(tracked, 0);
    }

    /// Terminating an empty registry returns and leaves it empty.
    #[tokio::test]
    async fn test_terminate_empty() {
        let reg = ProcessRegistry::new();
        let grace_period = Duration::from_secs(1);

        reg.terminate_all(grace_period).await;

        let tracked = reg.count().await;
        assert_eq!(tracked, 0);
    }

    /// Repeated calls to `global_registry` hand back the same shared instance.
    #[tokio::test]
    async fn test_global_registry_singleton() {
        let first = global_registry();
        let second = global_registry();

        let same_instance = Arc::ptr_eq(&first, &second);
        assert!(same_instance);
    }
}