1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! A supervisor for processes that allow clean shutdowns.
//!
//! See [`Supervisor`] for the core functionality. Real applications will likely want to use
//! [`setup_term_flag`] as well.
//!
//! # Example
//!
//! ```no_run
//! use std::process;
//!
//! // The default kill timeout is 10 seconds, which is fine here.
//! let mut supervisor = vorarbeiter::Supervisor::default();
//!
//! // Spawns three new child processes and adds them to the supervisor.
//! for _ in 0..3 {
//!     let child = process::Command::new("my-subcommand").spawn().unwrap();
//!     supervisor.add_child(child);
//! }
//!
//! // Terminate all child processes, waiting for each to be completed or killed.
//! drop(supervisor);
//! ```

use nix::sys::signal;
use nix::unistd;
use std::{io, process, sync, thread, time};

/// A supervisor for child processes.
///
/// Owns a list of spawned [`process::Child`] handles together with the timing parameters that
/// are used to shut them down.
///
/// Implements [`Default`], which results in a `kill_timeout` of 10 seconds.
///
/// When the supervisor is dropped, it shuts down all of its owned child processes using
/// [`shutdown_process`], in the reverse order they were added, ignoring any errors.
#[derive(Debug)]
pub struct Supervisor {
    /// Supervised child processes, in the order they were added.
    children: Vec<process::Child>,
    /// How long to wait after sending SIGTERM before sending SIGKILL.
    kill_timeout: time::Duration,
    /// Time between checks of whether a process has terminated.
    poll_interval: time::Duration,
}

impl Supervisor {
    /// Hands ownership of `child` to the supervisor.
    ///
    /// The child is appended to the end of the list of supervised processes.
    pub fn add_child(&mut self, child: process::Child) {
        self.children.push(child);
    }
}

impl Drop for Supervisor {
    /// Shuts down every supervised child, most recently added first.
    fn drop(&mut self) {
        // Copy the timing parameters out so the closure does not need to borrow `self` while the
        // children are borrowed mutably.
        let (kill_timeout, poll_interval) = (self.kill_timeout, self.poll_interval);

        self.children.iter_mut().rev().for_each(|child| {
            // Best effort: a failure to shut down one child must not stop the others.
            let _ = shutdown_process(child, kill_timeout, poll_interval);
        });
    }
}

impl Supervisor {
    /// Create a new supervisor with the given kill timeout.
    pub fn new(kill_timeout: time::Duration) -> Self {
        Supervisor {
            children: Vec::new(),
            kill_timeout,
            poll_interval: time::Duration::from_millis(100),
        }
    }
}

impl Default for Supervisor {
    fn default() -> Self {
        Supervisor::new(time::Duration::from_secs(10))
    }
}

/// Shuts down a process using SIGTERM, sending SIGKILL after `kill_timeout`.
///
/// If the child has already exited, its exit status is returned immediately and no signal is
/// sent. Otherwise a `SIGTERM` is sent to the child process and it is polled for completion every
/// `poll_interval`. If the process does not finish within `kill_timeout`, a `SIGKILL` is sent and
/// the function blocks until the child has terminated.
///
/// # Errors
///
/// Returns an error if sending either signal fails or if waiting on the child fails.
pub fn shutdown_process(
    child: &mut process::Child,
    kill_timeout: time::Duration,
    poll_interval: time::Duration,
) -> io::Result<process::ExitStatus> {
    // Never signal a child that has already exited: once it has been reaped its pid is no longer
    // ours, so `kill` would either fail or hit an unrelated process that reused the pid.
    if let Some(exit_status) = child.try_wait()? {
        return Ok(exit_status);
    }

    let start = time::Instant::now();
    // `Child::id` reports the pid as `u32`, nix wants the signed raw pid.
    let pid = unistd::Pid::from_raw(child.id() as i32);

    // Ask nicely via SIGTERM first.
    signal::kill(pid, signal::Signal::SIGTERM)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

    loop {
        if let Some(exit_status) = child.try_wait()? {
            return Ok(exit_status);
        }

        let elapsed = start.elapsed();
        if elapsed >= kill_timeout {
            break;
        }

        // Never sleep past the deadline, so a large `poll_interval` cannot delay the SIGKILL.
        thread::sleep(poll_interval.min(kill_timeout - elapsed));
    }

    // The grace period is over, force termination with SIGKILL and reap the child.
    signal::kill(pid, signal::Signal::SIGKILL)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    child.wait()
}

/// Sets up a termination flag.
///
/// A pure convenience function: creates an atomic boolean that is initially `false` and is set to
/// `true` once the process receives a `SIGINT`, `SIGTERM` or `SIGQUIT`. This works around the
/// issue that, by default, receiving any of these signals would not cause any `Drop`
/// implementations to be called.
///
/// # Errors
///
/// Returns the error of the first signal handler registration that fails.
///
/// # Example
///
/// ```rust
/// # use std::sync;
/// let term = vorarbeiter::setup_term_flag().unwrap();
///
/// while !term.load(sync::atomic::Ordering::Relaxed) {
/// # break;
/// // Main loop code here.
/// }
/// ```
pub fn setup_term_flag() -> Result<sync::Arc<sync::atomic::AtomicBool>, io::Error> {
    let term = sync::Arc::new(sync::atomic::AtomicBool::new(false));

    // Each of these signals only raises the flag; the caller is expected to observe it and
    // return normally so that `Drop` implementations run.
    [
        signal_hook::SIGINT,
        signal_hook::SIGTERM,
        signal_hook::SIGQUIT,
    ]
    .iter()
    .try_for_each(|&signal| {
        signal_hook::flag::register(signal, sync::Arc::clone(&term)).map(|_| ())
    })?;

    Ok(term)
}