1use anyhow::Result;
2use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
3use log::warn;
4use nix::unistd::Pid;
5use std::thread::sleep;
6use std::time::Duration;
7use std::{
8 ptr::addr_of_mut,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13 thread::{self, JoinHandle},
14};
15use tokio::sync::broadcast::{channel, Receiver, Sender};
16
/// Capacity handed to the broadcast `channel` that `ChildWait::new` creates
/// for delivering child exit events.
const CHILD_WAIT_QUEUE_LEN: usize = 10;
18
19#[derive(Clone, Copy, Debug)]
20pub struct ChildEvent {
21 pub pid: Pid,
22 pub status: c_int,
23}
24
25#[derive(Clone)]
26pub struct ChildWait {
27 sender: Sender<ChildEvent>,
28 signal: Arc<AtomicBool>,
29 _task: Arc<JoinHandle<()>>,
30}
31
32impl ChildWait {
33 pub fn new() -> Result<(ChildWait, Receiver<ChildEvent>)> {
34 let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN);
35 let signal = Arc::new(AtomicBool::new(false));
36 let mut processor = ChildWaitTask {
37 sender: sender.clone(),
38 signal: signal.clone(),
39 };
40 let task = thread::spawn(move || {
41 if let Err(error) = processor.process() {
42 warn!("failed to process child updates: {}", error);
43 }
44 });
45 Ok((
46 ChildWait {
47 sender,
48 signal,
49 _task: Arc::new(task),
50 },
51 receiver,
52 ))
53 }
54
55 pub async fn subscribe(&self) -> Result<Receiver<ChildEvent>> {
56 Ok(self.sender.subscribe())
57 }
58}
59
60struct ChildWaitTask {
61 sender: Sender<ChildEvent>,
62 signal: Arc<AtomicBool>,
63}
64
65impl ChildWaitTask {
66 fn process(&mut self) -> Result<()> {
67 loop {
68 let mut status: c_int = 0;
69 let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) };
70 if pid == -1 {
74 sleep(Duration::from_micros(100));
75 continue;
76 }
77 if WIFEXITED(status) {
78 let event = ChildEvent {
79 pid: Pid::from_raw(pid),
80 status: WEXITSTATUS(status),
81 };
82 let _ = self.sender.send(event);
83
84 if self.signal.load(Ordering::Acquire) {
85 return Ok(());
86 }
87 }
88 }
89 }
90}
91
92impl Drop for ChildWait {
93 fn drop(&mut self) {
94 if Arc::strong_count(&self.signal) <= 1 {
95 self.signal.store(true, Ordering::Release);
96 }
97 }
98}