Skip to main content

vorarbeiter/
lib.rs

//! A supervisor for child processes that shuts them down cleanly.
//!
//! See [`Supervisor`] for the core functionality. Real applications will likely want to use
//! [`setup_term_flag`] as well.
//!
//! # Example
//!
//! ```no_run
//! use std::process;
//!
//! // The default kill timeout is 10 seconds, which is fine here.
//! let mut supervisor = vorarbeiter::Supervisor::default();
//!
//! // Spawn three new child processes and add them to the supervisor.
//! for _ in 0..3 {
//!     let child = process::Command::new("my-subcommand").spawn().unwrap();
//!     supervisor.add_child(child);
//! }
//!
//! // Terminate all child processes, waiting for each to be completed or killed.
//! drop(supervisor);
//! ```
23
24use nix::sys::signal;
25use nix::unistd;
26use std::{io, process, sync, thread, time};
27
/// Owns a set of child processes and shuts them down when it goes out of scope.
///
/// [`Supervisor::default`] yields a supervisor whose `kill_timeout` is 10 seconds; use
/// [`Supervisor::new`] to choose a different one.
///
/// Dropping the supervisor terminates every child it owns via [`shutdown_process`], most
/// recently added first. Any errors encountered along the way are ignored.
#[derive(Debug)]
pub struct Supervisor {
    /// Child processes owned by this supervisor, in the order they were added.
    children: Vec<process::Child>,
    /// Grace period between `SIGTERM` and `SIGKILL` during shutdown.
    kill_timeout: time::Duration,
    /// Delay between successive checks whether a signalled child has exited.
    poll_interval: time::Duration,
}
43
44impl Supervisor {
45    /// Adds a child process to the supervisor.
46    pub fn add_child(&mut self, child: process::Child) {
47        self.children.push(child)
48    }
49}
50
51impl Drop for Supervisor {
52    fn drop(&mut self) {
53        for child in self.children.iter_mut().rev() {
54            let _ = shutdown_process(child, self.kill_timeout, self.poll_interval);
55        }
56    }
57}
58
59impl Supervisor {
60    /// Create a new supervisor with the given kill timeout.
61    pub fn new(kill_timeout: time::Duration) -> Self {
62        Supervisor {
63            children: Vec::new(),
64            kill_timeout,
65            poll_interval: time::Duration::from_millis(100),
66        }
67    }
68}
69
70impl Default for Supervisor {
71    fn default() -> Self {
72        Supervisor::new(time::Duration::from_secs(10))
73    }
74}
75
76/// Shuts down a process using SIGTERM, sending SIGKILL after `timeout`.
77///
78/// First sends a `SIGTERM` to the child process and polls it for completion every `poll_interval`.
79/// If the process does not finish within `kill_timeout`, sends a `SIGKILL`.
80pub fn shutdown_process(
81    child: &mut process::Child,
82    kill_timeout: time::Duration,
83    poll_interval: time::Duration,
84) -> io::Result<process::ExitStatus> {
85    let start = time::Instant::now();
86    let pid = unistd::Pid::from_raw(child.id() as i32);
87
88    // Ask nicely via sigterm first.
89    signal::kill(pid, signal::Signal::SIGTERM)
90        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
91    while time::Instant::now() - start < kill_timeout {
92        if let Some(exit_status) = child.try_wait()? {
93            return Ok(exit_status);
94        }
95
96        thread::sleep(poll_interval);
97    }
98
99    // If that fails, kill with SIGKILL.
100    signal::kill(pid, signal::Signal::SIGKILL)
101        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
102    Ok(child.wait()?)
103}
104
105/// Sets up a termination flag.
106///
107/// A pure convenience function, creates an atomic boolean that is initially false, but will be set
108/// to `true` should the process receive a `SIGINT`, `SIGTERM` or `SIGQUIT`. This works around the
109/// issue that receiving any of these signals would by default not result in any `Drop`
110/// implementations to be called.
111///
112/// # Example
113///
114/// ```rust
115/// # use std::sync;
116/// let term = vorarbeiter::setup_term_flag().unwrap();
117///
118/// while !term.load(sync::atomic::Ordering::Relaxed) {
119/// # break;
120/// // Main loop code here.
121/// }
122/// ```
123pub fn setup_term_flag() -> Result<sync::Arc<sync::atomic::AtomicBool>, io::Error> {
124    let term = sync::Arc::new(sync::atomic::AtomicBool::new(false));
125
126    // Ensure that all signals call exit, we need to execute `Drop` properly.
127    for &signal in &[
128        signal_hook::SIGINT,
129        signal_hook::SIGTERM,
130        signal_hook::SIGQUIT,
131    ] {
132        signal_hook::flag::register(signal, term.clone())?;
133    }
134
135    Ok(term)
136}