Skip to main content

kratazone/
childwait.rs

1use anyhow::Result;
2use libc::{c_int, waitpid, WEXITSTATUS, WIFEXITED};
3use log::warn;
4use nix::unistd::Pid;
5use std::thread::sleep;
6use std::time::Duration;
7use std::{
8    ptr::addr_of_mut,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13    thread::{self, JoinHandle},
14};
15use tokio::sync::broadcast::{channel, Receiver, Sender};
16
/// Capacity passed to the broadcast channel that `ChildWait::new` creates for
/// distributing [`ChildEvent`]s to subscribers.
const CHILD_WAIT_QUEUE_LEN: usize = 10;
18
/// Notification that a child process was reaped by the wait worker.
#[derive(Clone, Copy, Debug)]
pub struct ChildEvent {
    /// Process id of the child, as returned by `waitpid`.
    pub pid: Pid,
    /// Exit status reported by the wait worker; for a normal exit this is the
    /// `WEXITSTATUS` of the raw wait status, not the raw status itself.
    pub status: c_int,
}
24
/// Cloneable handle to the background thread that waits on child processes.
///
/// Cloning shares the same broadcast sender, shutdown flag and thread handle.
#[derive(Clone)]
pub struct ChildWait {
    /// Broadcast sender; used to hand out additional receivers via `subscribe`.
    sender: Sender<ChildEvent>,
    /// Shutdown flag shared with the worker thread: written in `Drop`, read by
    /// the worker loop.
    signal: Arc<AtomicBool>,
    /// Join handle of the worker thread. It is only stored (never joined in this
    /// file); the `Arc` lets all clones share it.
    _task: Arc<JoinHandle<()>>,
}
31
32impl ChildWait {
33    pub fn new() -> Result<(ChildWait, Receiver<ChildEvent>)> {
34        let (sender, receiver) = channel(CHILD_WAIT_QUEUE_LEN);
35        let signal = Arc::new(AtomicBool::new(false));
36        let mut processor = ChildWaitTask {
37            sender: sender.clone(),
38            signal: signal.clone(),
39        };
40        let task = thread::spawn(move || {
41            if let Err(error) = processor.process() {
42                warn!("failed to process child updates: {}", error);
43            }
44        });
45        Ok((
46            ChildWait {
47                sender,
48                signal,
49                _task: Arc::new(task),
50            },
51            receiver,
52        ))
53    }
54
55    pub async fn subscribe(&self) -> Result<Receiver<ChildEvent>> {
56        Ok(self.sender.subscribe())
57    }
58}
59
/// State owned by the worker thread that reaps child processes.
struct ChildWaitTask {
    /// Broadcast sender on which reaped children are published.
    sender: Sender<ChildEvent>,
    /// Shutdown flag; the worker loop returns once it observes `true`.
    signal: Arc<AtomicBool>,
}
64
65impl ChildWaitTask {
66    fn process(&mut self) -> Result<()> {
67        loop {
68            let mut status: c_int = 0;
69            let pid = unsafe { waitpid(-1, addr_of_mut!(status), 0) };
70            // pid being -1 indicates an error occurred, wait 100 microseconds to avoid
71            // overloading the channel. Right now we don't consider any other errors
72            // but that is fine for now, as waitpid shouldn't ever stop anyway.
73            if pid == -1 {
74                sleep(Duration::from_micros(100));
75                continue;
76            }
77            if WIFEXITED(status) {
78                let event = ChildEvent {
79                    pid: Pid::from_raw(pid),
80                    status: WEXITSTATUS(status),
81                };
82                let _ = self.sender.send(event);
83
84                if self.signal.load(Ordering::Acquire) {
85                    return Ok(());
86                }
87            }
88        }
89    }
90}
91
92impl Drop for ChildWait {
93    fn drop(&mut self) {
94        if Arc::strong_count(&self.signal) <= 1 {
95            self.signal.store(true, Ordering::Release);
96        }
97    }
98}