snare 0.4.5

GitHub webhooks runner daemon
//! The job runner. This is a single thread which runs multiple commands as child processes. There
//! are two types of commands: "normal" and "error" commands. Error commands are only executed if a
//! normal command fails. For normal commands, we track stderr/stdout and exit status; for error
//! commands we track only exit status.

#![allow(clippy::cognitive_complexity)]

use std::{
    collections::HashMap,
    convert::TryInto,
    env,
    error::Error,
    fs::{self, remove_file},
    io::{Read, Write},
    os::unix::io::{AsRawFd, RawFd},
    path::PathBuf,
    process::{self, Child, Command},
    sync::Arc,
    thread,
    time::{Duration, Instant},
};

use libc::c_int;
use nix::{
    fcntl::{fcntl, FcntlArg, OFlag},
    poll::{poll, PollFd, PollFlags},
    sys::signal::{kill, Signal},
    unistd::Pid,
};
use tempfile::{tempdir, NamedTempFile, TempDir};

use crate::{config::RepoConfig, queue::QueueJob, Snare};

/// Size, in bytes, of the scratch buffer used when reading from child pipes and the event pipe.
/// For performance reasons this should be no smaller than `PIPE_BUF`.
const READBUF: usize = 8192;
/// How long (in seconds) `poll` waits before we retry when there are jobs on the queue that we
/// haven't yet been able to run. Child processes exceeding their timeout may shorten the wait.
const WAIT_TIMEOUT: i32 = 1;

/// Runs queued jobs as child processes, multiplexing their output (and wake-up events) with
/// `poll`.
struct JobRunner {
    /// The shared snare state (configuration, job queue, event pipe).
    snare: Arc<Snare>,
    /// The shell used to run jobs (taken from the `SHELL` environment variable).
    shell: String,
    /// The maximum number of jobs we will run at any one point. Note that this may not necessarily
    /// be the same value as snare.conf.maxjobs: a reduction in the latter only takes effect once
    /// no jobs are running.
    maxjobs: usize,
    /// One slot per possible job (`maxjobs` entries), `num_running` of which will be `Some`, the
    /// remainder `None`.
    running: Vec<Option<Job>>,
    /// How many `Some` entries are there in `self.running`?
    num_running: usize,
    /// The file descriptors passed to `poll`, always with `maxjobs * 2 + 1` entries. Entries
    /// `i * 2` and `i * 2 + 1` are the (stderr, stdout) pipes of `running[i]`, or `-1` (ignored by
    /// `poll`) if that slot is empty or the corresponding pipe has been closed. The final entry is
    /// the event file descriptor that allows other threads (e.g. the HTTP server) to wake up the
    /// JobRunner thread.
    pollfds: Vec<PollFd>,
}

impl JobRunner {
    fn new(snare: Arc<Snare>) -> Result<Self, Box<dyn Error>> {
        let shell = env::var("SHELL")?;
        let maxjobs = snare.conf.lock().unwrap().maxjobs;
        assert!(maxjobs <= (std::usize::MAX - 1) / 2);
        let mut running = Vec::with_capacity(maxjobs);
        running.resize_with(maxjobs, || None);
        let mut pollfds = Vec::with_capacity(maxjobs * 2 + 1);
        pollfds.resize_with(maxjobs * 2 + 1, || PollFd::new(-1, PollFlags::empty()));
        Ok(JobRunner {
            snare,
            shell,
            maxjobs,
            running,
            num_running: 0,
            pollfds,
        })
    }

    /// Listen for new jobs on the queue and then run them.
    fn attend(&mut self) {
        self.update_pollfds();
        // `check_queue` serves two subtly different purposes:
        //   * Has the event pipe told us there are new jobs in the queue?
        //   * Are there jobs in the queue from a previous round that we couldn't run yet?
        let mut check_queue = false;
        // A scratch buffer used to read from files.
        let mut buf = Box::new([0; READBUF]);
        // The earliest finish_by time of any running process (i.e. the process that will timeout
        // the soonest).
        let mut next_finish_by: Option<Instant> = None;
        loop {
            // If there are jobs on the queue we haven't been able to run for temporary reasons,
            // then wait a short amount of time and try again.
            let mut timeout = if check_queue { WAIT_TIMEOUT * 1000 } else { -1 };
            // If any processes will exceed their timeout then, if that's shorter than the above
            // timeout, only wait for enough time to pass before we need to send them SIGTERM.
            if let Some(fby) = next_finish_by {
                let fby_timeout = fby.saturating_duration_since(Instant::now());
                if timeout == -1
                    || fby_timeout < Duration::from_millis(timeout.try_into().unwrap_or(0))
                {
                    timeout = fby_timeout
                        .as_millis()
                        .try_into()
                        .unwrap_or(c_int::max_value());
                }
            }
            poll(&mut self.pollfds, timeout).ok();

            self.check_for_sighup();

            // See if any of our active jobs have events. Knowing when a pipe is actually closed is
            // surprisingly hard. https://www.greenend.org.uk/rjk/tech/poll.html has an interesting
            // suggestion which we adapt slightly here.
            //
            // This `for` loop has various unwrap() calls. If `flags[i * 2]` or `flags[i * 2 + 1]`
            // is `Some(_)`, then `self.running[i]` is `Some(_)`, so the
            // `self.running.as_mut.unwrap()`s are safe. Since we asked for stderr/stdout to be
            // captured, `std[err|out].as_mut().unwrap()` should also be safe (though the Rust docs
            // are a little vague on this).
            for i in 0..self.maxjobs {
                // stderr
                if let Some(flags) = self.pollfds[i * 2].revents() {
                    if flags.contains(PollFlags::POLLIN) {
                        if let Ok(j) = self.running[i]
                            .as_mut()
                            .unwrap()
                            .child
                            .stderr
                            .as_mut()
                            .unwrap()
                            .read(&mut *buf)
                        {
                            self.running[i]
                                .as_mut()
                                .unwrap()
                                .stderrout
                                .as_file_mut()
                                .write_all(&buf[0..j])
                                .ok();
                        }
                    }
                    if flags.contains(PollFlags::POLLHUP) {
                        self.running[i].as_mut().unwrap().stderr_hup = true;
                        self.update_pollfds();
                    }
                }
                // stdout
                if let Some(flags) = self.pollfds[i * 2 + 1].revents() {
                    if flags.contains(PollFlags::POLLIN) {
                        if let Ok(j) = self.running[i]
                            .as_mut()
                            .unwrap()
                            .child
                            .stdout
                            .as_mut()
                            .unwrap()
                            .read(&mut *buf)
                        {
                            self.running[i]
                                .as_mut()
                                .unwrap()
                                .stderrout
                                .as_file_mut()
                                .write_all(&buf[0..j])
                                .ok();
                        }
                    }
                    if flags.contains(PollFlags::POLLHUP) {
                        self.running[i].as_mut().unwrap().stdout_hup = true;
                        self.update_pollfds();
                    }
                }
            }

            // Iterate over the running jobs and:
            //   * If any jobs have exceeded their timeout, send them SIGTERM.
            //   * If there are jobs whose stderr/stdout have closed, keep waiting on them until
            //     they exit.
            next_finish_by = None;
            for i in 0..self.running.len() {
                if let Some(Job {
                    finish_by,
                    ref child,
                    ..
                }) = self.running[i]
                {
                    if finish_by <= Instant::now() {
                        kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM).ok();
                    } else if next_finish_by.is_none() || Some(finish_by) < next_finish_by {
                        next_finish_by = Some(finish_by);
                    }
                }

                if let Some(Job {
                    stderr_hup: true,
                    stdout_hup: true,
                    ..
                }) = self.running[i]
                {
                    // In the below, we know from the `let Some(_)` that `self.running[i]` is
                    // `Some(_)` and the unwrap thus safe.
                    let mut exited = false;
                    let mut exited_success = false;
                    match self.running[i].as_mut().unwrap().child.try_wait() {
                        Ok(Some(status)) => {
                            exited = true;
                            exited_success = status.success();
                        }
                        Err(_) => {
                            exited = true;
                            exited_success = false;
                        }
                        Ok(None) => (),
                    }
                    if exited {
                        if !exited_success {
                            let job = &self.running[i].as_ref().unwrap();
                            if job.is_errorcmd {
                                self.snare.error(&format!(
                                    "errorcmd exited unsuccessfully: {}",
                                    job.rconf.errorcmd.as_ref().unwrap()
                                ));
                            } else if let Some(errorchild) = self.run_errorcmd(job) {
                                let mut job = &mut self.running[i].as_mut().unwrap();
                                job.child = errorchild;
                                job.is_errorcmd = true;
                                continue;
                            }
                        }
                        remove_file(&self.running[i].as_ref().unwrap().json_path).ok();
                        self.running[i] = None;
                        self.num_running -= 1;
                        self.update_pollfds();
                    }
                }
            }

            // Has the HTTP server told us that we should check for new jobs and/or SIGCHLD/SIGHUP
            // has been received?
            match self.pollfds[self.maxjobs * 2].revents() {
                Some(flags) if flags == PollFlags::POLLIN => {
                    check_queue = true;
                    // It's fine for us to drain the event pipe completely: we'll process all the
                    // events it contains.
                    loop {
                        match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
                            Ok(0) | Err(_) => break,
                            Ok(_) => (),
                        }
                    }
                }
                _ => (),
            }

            // Should we check the queue? This could be because we were previously unable to empty
            // it fully, or because the HTTP server has told us that there might be new jobs.
            // However, it's only worth us checking the queue (which requires a lock) if there's
            // space for us to run further jobs.
            if check_queue && self.num_running < self.maxjobs {
                check_queue = !self.try_pop_queue();
            }
        }
    }

    /// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or
    /// `false` otherwise.
    fn try_pop_queue(&mut self) -> bool {
        let snare = Arc::clone(&self.snare);
        let mut queue = snare.queue.lock().unwrap();
        loop {
            if self.num_running == self.maxjobs && !queue.is_empty() {
                return false;
            }
            let pjob = queue.pop(|repo_id| {
                self.running.iter().any(|jobslot| {
                    if let Some(job) = jobslot {
                        repo_id == job.repo_id
                    } else {
                        false
                    }
                })
            });
            match pjob {
                Some(qj) => {
                    debug_assert!(self.num_running < self.maxjobs);
                    match self.try_job(qj) {
                        Ok(j) => {
                            // The unwrap is safe since we've already checked that there's room to
                            // run at least 1 job.
                            let i = self.running.iter().position(|x| x.is_none()).unwrap();
                            self.running[i] = Some(j);
                            self.num_running += 1;
                            self.update_pollfds();
                        }
                        Err(Some(qj)) => {
                            // The job couldn't be run for temporary reasons: we'll retry later.
                            queue.push_front(qj);
                            return false;
                        }
                        Err(None) => {
                            // The job couldn't be run for permanent reasons: it has been consumed
                            // and can't be rerun. Perhaps surprisingly, this is equivalent to the
                            // job having run successfully: since it hasn't been put back on the
                            // queue, there's no need to tell the caller that we couldn't pop all
                            // the jobs on the queue.
                        }
                    }
                }
                None => {
                    // We weren't able to pop any jobs from the queue, but that doesn't mean that
                    // the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in
                    // it which can't be popped until others with the same path have completed.
                    return queue.is_empty();
                }
            }
        }
    }

    /// Try starting the `QueueJob` `qj` running, returning `Ok(Job)` upon success. If for
    /// temporary reasons that is not possible, the job is returned via `Err(Some(QueueJob))` so
    /// that it can be put back in the queue and retried later. If `Err(None)` is returned then the
    /// job could not be run (either because there is no command, or because there was a permanent
    /// error, and the user was appropriately notified) and the job is consumed.
    fn try_job(&mut self, qj: QueueJob) -> Result<Job, Option<QueueJob>> {
        let raw_cmd = match &qj.rconf.cmd {
            Some(c) => c,
            None => {
                // There is no command to run.
                return Err(None);
            }
        };

        // Write the JSON to an unnamed temporary file.
        let json_path = match NamedTempFile::new() {
            Ok(tfile) => match tfile.into_temp_path().keep() {
                Ok(p) => {
                    if let Err(e) = fs::write(&p, qj.json_str.as_bytes()) {
                        self.snare.error_err("Couldn't write JSON file.", e);
                        remove_file(p).ok();
                        return Err(Some(qj));
                    }
                    p
                }
                Err(e) => {
                    self.snare.error_err("Couldn't create temporary file.", e);
                    return Err(Some(qj));
                }
            },
            Err(e) => {
                self.snare.error_err("Couldn't create temporary file.", e);
                return Err(Some(qj));
            }
        };

        // We combine the child process's stderr/stdout and write them to an unnamed temporary
        // file `stderrout_file`.
        if let Ok(tempdir) = tempdir() {
            if let Ok(stderrout) = NamedTempFile::new() {
                if set_nonblock(stderrout.as_file().as_raw_fd()).is_ok() {
                    if let Some(json_path_str) = json_path.to_str() {
                        let cmd = cmd_replace(
                            raw_cmd,
                            &qj.event_type,
                            &qj.owner,
                            &qj.repo,
                            json_path_str,
                        );
                        let child = match Command::new(&self.shell)
                            .arg("-c")
                            .arg(cmd)
                            .current_dir(tempdir.path())
                            .stderr(process::Stdio::piped())
                            .stdout(process::Stdio::piped())
                            .stdin(process::Stdio::null())
                            .spawn()
                        {
                            Ok(c) => c,
                            Err(e) => {
                                self.snare.error_err("Can't spawn command: {:?}", e);
                                return Err(None);
                            }
                        };

                        // Since we've asked for stderr/stdout to be captured, the unwrap()s should
                        // be safe, though the Rust docs are slightly vague on this.
                        let stderr = child.stderr.as_ref().unwrap();
                        let stdout = child.stdout.as_ref().unwrap();

                        let stderr_fd = stderr.as_raw_fd();
                        let stdout_fd = stdout.as_raw_fd();
                        if let Err(e) =
                            set_nonblock(stderr_fd).and_then(|_| set_nonblock(stdout_fd))
                        {
                            self.snare
                                .error_err("Can't set file descriptors to non-blocking: {:?}", e);
                            return Err(None);
                        }

                        // This unwrap() is, in theory, unsafe because we could exceed the timeout
                        // duration. However, a quick back-of-the-envelope calculation suggests
                        // that, assuming `Instant` is a `u64`, this could only happen with an
                        // uptime of over 500,000,000 years. This seems adequately long that I'm
                        // happy to take the risk on the unwrap().
                        let finish_by = Instant::now()
                            .checked_add(Duration::from_millis(
                                qj.rconf.timeout.saturating_mul(1000),
                            ))
                            .unwrap();

                        return Ok(Job {
                            is_errorcmd: false,
                            repo_id: qj.repo_id,
                            event_type: qj.event_type,
                            owner: qj.owner,
                            repo: qj.repo,
                            finish_by,
                            child,
                            tempdir,
                            json_path,
                            stderrout,
                            stderr_hup: false,
                            stdout_hup: false,
                            rconf: qj.rconf,
                        });
                    }
                }
            }
        }

        Err(Some(qj))
    }

    /// After a job has been inserted / removed from `self.running`, this function must be called
    /// so that `poll()` is called with up-to-date file descriptors.
    fn update_pollfds(&mut self) {
        for (i, jobslot) in self.running.iter().enumerate() {
            let (stderr_fd, stdout_fd) = if let Some(job) = jobslot {
                // Since we've asked for stderr/stdout to be captured, the unwrap()s should be
                // safe, though the Rust docs are slightly vague on this.
                let stderr_fd = if job.stderr_hup {
                    -1
                } else {
                    job.child.stderr.as_ref().unwrap().as_raw_fd()
                };
                let stdout_fd = if job.stdout_hup {
                    -1
                } else {
                    job.child.stdout.as_ref().unwrap().as_raw_fd()
                };
                (stderr_fd, stdout_fd)
            } else {
                (-1, -1)
            };
            self.pollfds[i * 2] = PollFd::new(stderr_fd, PollFlags::POLLIN);
            self.pollfds[i * 2 + 1] = PollFd::new(stdout_fd, PollFlags::POLLIN);
        }
        self.pollfds[self.maxjobs * 2] = PollFd::new(self.snare.event_read_fd, PollFlags::POLLIN);
    }

    /// If SIGHUP has been received, reload the config, and update self.maxjobs if possible.
    fn check_for_sighup(&mut self) {
        self.snare.check_for_sighup();

        let new_maxjobs = self.snare.conf.lock().unwrap().maxjobs;
        if new_maxjobs > self.maxjobs {
            // The user now wants to allow more jobs which we can do simply and safely -- even if
            // there are jobs running -- by extending self.running and self.pollfds with blank
            // entries.
            self.running.resize_with(new_maxjobs, || None);
            self.pollfds
                .resize_with(new_maxjobs * 2 + 1, || PollFd::new(-1, PollFlags::empty()));
            self.maxjobs = new_maxjobs;
            self.update_pollfds();
        } else if new_maxjobs < self.maxjobs && self.num_running == 0 {
            // The user wants to allow fewer jobs. This is somewhat hard because we may be running
            // jobs, and possibly more than the user now wants us to be running. We could be clever
            // and compact self.running and self.pollfds, though that may still not drop the number
            // of jobs down enough. We currently do the laziest thing: we wait until there are no
            // running jobs and then truncate self.running and self.pollfds. If there are always
            // running jobs then this means we will never reduce the number of maximum possible
            // jobs.
            self.running.truncate(new_maxjobs);
            self.pollfds.truncate(new_maxjobs * 2 + 1);
            self.maxjobs = new_maxjobs;
            self.update_pollfds();
        }
    }

    /// If the user has specified an email address, send the contents of
    fn run_errorcmd(&self, job: &Job) -> Option<Child> {
        if let Some(raw_errorcmd) = &job.rconf.errorcmd {
            let errorcmd = errorcmd_replace(
                raw_errorcmd,
                &job.event_type,
                &job.owner,
                &job.repo,
                job.json_path.as_os_str().to_str().unwrap(),
                job.stderrout.path().as_os_str().to_str().unwrap(),
            );
            match Command::new(&self.shell)
                .arg("-c")
                .arg(&errorcmd)
                .current_dir(job.tempdir.path())
                .stderr(process::Stdio::null())
                .stdout(process::Stdio::null())
                .stdin(process::Stdio::null())
                .spawn()
            {
                Ok(c) => return Some(c),
                Err(e) => self
                    .snare
                    .error_err(&format!("Can't spawn '{}'", errorcmd), e),
            }
        }
        None
    }
}

/// Take the string `raw_cmd` and return a string with the following replaced:
///   * `%e` with `event_type`
///   * `%o` with `owner`
///   * `%r` with `repo`
///   * `%j` with `json_path`
///
/// Note that `raw_cmd` *must* have been validated by config::GitHub::verify_cmd_str or undefined
/// behaviour will occur.
fn cmd_replace(
    raw_cmd: &str,
    event_type: &str,
    owner: &str,
    repo: &str,
    json_path: &str,
) -> String {
    let modifiers = [
        ('e', event_type),
        ('o', owner),
        ('r', repo),
        ('j', json_path),
        ('%', "%"),
    ]
    .iter()
    .cloned()
    .collect();
    replace(raw_cmd, modifiers)
}

/// Take the string `raw_errorcmd` and return a string with the following replaced:
///   * `%e` with `event_type`
///   * `%o` with `owner`
///   * `%r` with `repo`
///   * `%j` with `json_path`
///   * `%s` with `stderrout_path`
///
/// Note that `raw_cmd` *must* have been validated by config::GitHub::verify_errorcmd_str or
/// undefined behaviour will occur.
/// Expand the placeholders in an error command template `raw_errorcmd`:
///   * `%e` becomes `event_type`
///   * `%o` becomes `owner`
///   * `%r` becomes `repo`
///   * `%j` becomes `json_path`
///   * `%s` becomes `stderrout_path` (the file holding the failed command's stderr/stdout)
///   * `%%` becomes a literal `%`
///
/// Substituted values are inserted verbatim and are not themselves expanded.
///
/// `raw_errorcmd` *must* already have been validated by config::GitHub::verify_errorcmd_str: an
/// unknown `%` escape or a trailing lone `%` makes this function panic.
fn errorcmd_replace(
    raw_errorcmd: &str,
    event_type: &str,
    owner: &str,
    repo: &str,
    json_path: &str,
    stderrout_path: &str,
) -> String {
    let modifiers = [
        ('e', event_type),
        ('o', owner),
        ('r', repo),
        ('j', json_path),
        ('s', stderrout_path),
        ('%', "%"),
    ]
    .iter()
    .cloned()
    .collect();
    replace(raw_errorcmd, modifiers)
}

/// Copy `s`, expanding every two-character escape `%x` to `modifiers[&x]`. All other characters
/// (including multi-byte ones) are copied unchanged. Expanded values are not re-scanned.
///
/// `s` must be well-formed: every `%` must be followed by a character that is a key of
/// `modifiers`. Otherwise this function panics.
fn replace(s: &str, modifiers: HashMap<char, &str>) -> String {
    // Expansions usually make the string longer, so the input length is a reasonable starting
    // capacity.
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c == '%' {
            // A validated template never ends with a lone `%`.
            let modifier = chars.next().unwrap();
            out.push_str(modifiers[&modifier]);
        } else {
            out.push(c);
        }
    }
    out
}

/// A job that has been started: either a repository's normal command or, after that has failed,
/// its error command.
struct Job {
    /// Set to `false` if this is a normal command and `true` if it is an error command.
    is_errorcmd: bool,
    /// The repo identifier. This is used to determine if a given repository already has jobs
    /// running or not. Typically of the form "provider/owner/repo".
    repo_id: String,
    /// The event type.
    event_type: String,
    /// The repository owner's name.
    owner: String,
    /// The repository name.
    repo: String,
    /// What time must this Job have completed by? Once this time has passed, the child process is
    /// sent SIGTERM (again each time the job runner loop comes round, until it exits).
    finish_by: Instant,
    /// The child process itself. When an error command is started, its child replaces the
    /// (already exited) normal command's child here.
    child: Child,
    /// This TempDir will be dropped, and its file system contents removed, when this Job is dropped.
    /// It is used as the working directory of the job's command(s).
    tempdir: TempDir,
    /// We are responsible for manually cleaning up the JSON file stored in `json_path`.
    json_path: PathBuf,
    /// The temporary file to which we write combined stderr/stdout.
    stderrout: NamedTempFile,
    /// Has the child process's stderr been closed? Once `true`, the pipe is no longer polled.
    stderr_hup: bool,
    /// Has the child process's stdout been closed? Once `true`, the pipe is no longer polled.
    stdout_hup: bool,
    /// The `RepoConfig` for this job.
    rconf: RepoConfig,
}

/// Put the file descriptor `fd` into non-blocking mode, preserving its other file status flags.
///
/// # Errors
///
/// Returns an error if either the `F_GETFL` or the `F_SETFL` `fcntl` call fails.
fn set_nonblock(fd: RawFd) -> Result<(), Box<dyn Error>> {
    let current = fcntl(fd, FcntlArg::F_GETFL)?;
    let wanted = current | OFlag::O_NONBLOCK.bits();
    // SAFETY: `from_bits_unchecked` (rather than `from_bits_truncate`) is used so that any status
    // bits reported by F_GETFL which `OFlag` doesn't know about are handed back to F_SETFL
    // unchanged. The value is only passed through to `fcntl`.
    let new_flags = unsafe { OFlag::from_bits_unchecked(wanted) };
    fcntl(fd, FcntlArg::F_SETFL(new_flags))?;
    Ok(())
}

/// Create a job runner for `snare` and start it on a new, detached thread.
///
/// # Errors
///
/// Returns any error from constructing the runner (e.g. `SHELL` not being set); in that case no
/// thread is started.
pub(crate) fn attend(snare: Arc<Snare>) -> Result<(), Box<dyn Error>> {
    JobRunner::new(snare).map(|mut runner| {
        thread::spawn(move || runner.attend());
    })
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_cmd_replace() {
        assert_eq!(cmd_replace("", "", "", "", ""), "");
        assert_eq!(cmd_replace("a", "", "", "", ""), "a");
        assert_eq!(
            cmd_replace("%% %e %o %r %j %%", "ee", "oo", "rr", "jj"),
            "% ee oo rr jj %"
        );
        // Multi-byte characters, in both the template and the substituted values, must be copied
        // through intact.
        assert_eq!(cmd_replace("é%eü%oß", "ñ", "€", "", ""), "éñü€ß");
        // Substituted values are not themselves expanded.
        assert_eq!(cmd_replace("%e", "%o", "oo", "", ""), "%o");
    }

    #[test]
    fn test_errorcmd_replace() {
        assert_eq!(errorcmd_replace("", "", "", "", "", ""), "");
        assert_eq!(errorcmd_replace("a", "", "", "", "", ""), "a");
        assert_eq!(
            errorcmd_replace("%% %e %o %r %j %s %%", "ee", "oo", "rr", "jj", "ss"),
            "% ee oo rr jj ss %"
        );
        // Adjacent escapes with no separators between them.
        assert_eq!(errorcmd_replace("%s%%%j", "", "", "", "jj", "ss"), "ss%jj");
        assert_eq!(errorcmd_replace("ä%sö", "", "", "", "", "ß"), "äßö");
    }
}