vorarbeiter/lib.rs
1//! A supervisor for processes that allow clean shutdowns.
2//!
3//! See [`Supervisor`] for the core functionality. Real applications will likely want to use
4//! [`setup_term_flag`] as well.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use std::process;
10//!
11//! // The default kill timeout is 10 seconds, which is fine here.
12//! let mut supervisor = vorarbeiter::Supervisor::default();
13//!
14//! // Spawns three new child processes and adds them to the supervisor.
15//! for _ in 0..3 {
16//! let child = process::Command::new("my-subcommand").spawn().unwrap();
17//! supervisor.add_child(child);
18//! }
19//!
20//! // Terminate all child processes, waiting for each to be completed or killed.
21//! drop(supervisor);
22//! ```
23
24use nix::sys::signal;
25use nix::unistd;
26use std::{io, process, sync, thread, time};
27
/// A supervisor that owns child processes and shuts them down cleanly.
///
/// [`Default`] produces a supervisor with a `kill_timeout` of 10 seconds, and
/// [`Supervisor::new`] polls children every 100 milliseconds.
///
/// Dropping the supervisor shuts down every owned child with [`shutdown_process`], starting with
/// the most recently added one. Any errors encountered while doing so are ignored.
#[derive(Debug)]
pub struct Supervisor {
    /// Children owned by this supervisor, in the order they were added.
    children: Vec<std::process::Child>,
    /// Grace period between sending `SIGTERM` and escalating to `SIGKILL`.
    kill_timeout: std::time::Duration,
    /// Delay between consecutive checks of whether a child has exited.
    poll_interval: std::time::Duration,
}
43
44impl Supervisor {
45 /// Adds a child process to the supervisor.
46 pub fn add_child(&mut self, child: process::Child) {
47 self.children.push(child)
48 }
49}
50
51impl Drop for Supervisor {
52 fn drop(&mut self) {
53 for child in self.children.iter_mut().rev() {
54 let _ = shutdown_process(child, self.kill_timeout, self.poll_interval);
55 }
56 }
57}
58
59impl Supervisor {
60 /// Create a new supervisor with the given kill timeout.
61 pub fn new(kill_timeout: time::Duration) -> Self {
62 Supervisor {
63 children: Vec::new(),
64 kill_timeout,
65 poll_interval: time::Duration::from_millis(100),
66 }
67 }
68}
69
70impl Default for Supervisor {
71 fn default() -> Self {
72 Supervisor::new(time::Duration::from_secs(10))
73 }
74}
75
76/// Shuts down a process using SIGTERM, sending SIGKILL after `timeout`.
77///
78/// First sends a `SIGTERM` to the child process and polls it for completion every `poll_interval`.
79/// If the process does not finish within `kill_timeout`, sends a `SIGKILL`.
80pub fn shutdown_process(
81 child: &mut process::Child,
82 kill_timeout: time::Duration,
83 poll_interval: time::Duration,
84) -> io::Result<process::ExitStatus> {
85 let start = time::Instant::now();
86 let pid = unistd::Pid::from_raw(child.id() as i32);
87
88 // Ask nicely via sigterm first.
89 signal::kill(pid, signal::Signal::SIGTERM)
90 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
91 while time::Instant::now() - start < kill_timeout {
92 if let Some(exit_status) = child.try_wait()? {
93 return Ok(exit_status);
94 }
95
96 thread::sleep(poll_interval);
97 }
98
99 // If that fails, kill with SIGKILL.
100 signal::kill(pid, signal::Signal::SIGKILL)
101 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
102 Ok(child.wait()?)
103}
104
105/// Sets up a termination flag.
106///
107/// A pure convenience function, creates an atomic boolean that is initially false, but will be set
108/// to `true` should the process receive a `SIGINT`, `SIGTERM` or `SIGQUIT`. This works around the
109/// issue that receiving any of these signals would by default not result in any `Drop`
110/// implementations to be called.
111///
112/// # Example
113///
114/// ```rust
115/// # use std::sync;
116/// let term = vorarbeiter::setup_term_flag().unwrap();
117///
118/// while !term.load(sync::atomic::Ordering::Relaxed) {
119/// # break;
120/// // Main loop code here.
121/// }
122/// ```
123pub fn setup_term_flag() -> Result<sync::Arc<sync::atomic::AtomicBool>, io::Error> {
124 let term = sync::Arc::new(sync::atomic::AtomicBool::new(false));
125
126 // Ensure that all signals call exit, we need to execute `Drop` properly.
127 for &signal in &[
128 signal_hook::SIGINT,
129 signal_hook::SIGTERM,
130 signal_hook::SIGQUIT,
131 ] {
132 signal_hook::flag::register(signal, term.clone())?;
133 }
134
135 Ok(term)
136}